Aller au contenu principal

Architecture asynchrone et files d'attente

Pourquoi l’asynchrone est essentiel

En production, vous ne pouvez pas bloquer un processus en attendant qu’un LLM génère sa réponse. Les appels API prennent de 1 à 30 secondes selon le modèle et la complexité. Une architecture asynchrone avec files d’attente vous permet d’absorber les pics de charge, de gérer les retries, et de découpler vos services.

Architecture avec files d’attente

Pattern producteur / consommateur

import asyncio
from dataclasses import dataclass, field
from enum import Enum

class StatutTache(Enum):
    EN_ATTENTE = "en_attente"
    EN_COURS = "en_cours"
    TERMINEE = "terminee"
    ECHOUEE = "echouee"

@dataclass
class Tache:
    id: str
    prompt: str
    modele: str = "gpt-5.3"
    statut: StatutTache = StatutTache.EN_ATTENTE
    resultat: str | None = None
    tentatives: int = 0
    max_tentatives: int = 3

class FileTraitement:
    """File d'attente asynchrone pour les appels LLM."""

    def __init__(self, nb_workers: int = 5):
        self.file: asyncio.Queue[Tache] = asyncio.Queue()
        self.resultats: dict[str, Tache] = {}
        self.nb_workers = nb_workers

    async def ajouter(self, tache: Tache) -> str:
        """Ajoute une tâche à la file."""
        self.resultats[tache.id] = tache
        await self.file.put(tache)
        return tache.id

    async def worker(self, worker_id: int):
        """Worker qui traite les tâches de la file."""
        import openai
        client = openai.AsyncOpenAI()

        while True:
            tache = await self.file.get()
            tache.statut = StatutTache.EN_COURS
            tache.tentatives += 1

            try:
                response = await client.responses.create(
                    model=tache.modele,
                    input=tache.prompt,
                )
                tache.resultat = response.output_text
                tache.statut = StatutTache.TERMINEE

            except Exception as e:
                if tache.tentatives < tache.max_tentatives:
                    tache.statut = StatutTache.EN_ATTENTE
                    await self.file.put(tache)
                else:
                    tache.statut = StatutTache.ECHOUEE
                    tache.resultat = str(e)

            finally:
                self.file.task_done()

    async def demarrer(self):
        """Lance les workers."""
        workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.nb_workers)
        ]
        return workers

    async def attendre_fin(self):
        """Attend que toutes les tâches soient traitées."""
        await self.file.join()

Utilisation

async def traiter_documents(documents: list[str]):
    file = FileTraitement(nb_workers=10)
    workers = await file.demarrer()

    # Ajouter les tâches
    for i, doc in enumerate(documents):
        tache = Tache(
            id=f"doc-{i}",
            prompt=f"Résumez ce document : {doc}",
        )
        await file.ajouter(tache)

    # Attendre la fin
    await file.attendre_fin()

    # Collecter les résultats
    for tache_id, tache in file.resultats.items():
        print(f"{tache_id}: {tache.statut.value}")

    # Arrêter les workers
    for w in workers:
        w.cancel()

Rate limiting intelligent

OpenAI impose des limites de débit (tokens par minute, requêtes par minute). Votre architecture doit les respecter :

import asyncio
import time

class RateLimiter:
    """Limiteur de débit basé sur un token bucket."""

    def __init__(self, rpm: int = 500, tpm: int = 200_000):
        self.rpm = rpm
        self.tpm = tpm
        self.requetes_recentes: list[float] = []
        self.tokens_recents: list[tuple[float, int]] = []
        self.verrou = asyncio.Lock()

    async def attendre(self, tokens_estimes: int = 1000):
        """Attend si nécessaire pour respecter les limites."""
        async with self.verrou:
            maintenant = time.time()
            fenetre = 60.0

            # Nettoyer les entrées hors fenêtre
            self.requetes_recentes = [
                t for t in self.requetes_recentes
                if maintenant - t < fenetre
            ]
            self.tokens_recents = [
                (t, n) for t, n in self.tokens_recents
                if maintenant - t < fenetre
            ]

            # Vérifier RPM
            if len(self.requetes_recentes) >= self.rpm:
                attente = self.requetes_recentes[0] + fenetre - maintenant
                await asyncio.sleep(attente)

            # Vérifier TPM
            tokens_utilises = sum(n for _, n in self.tokens_recents)
            if tokens_utilises + tokens_estimes > self.tpm:
                attente = self.tokens_recents[0][0] + fenetre - maintenant
                await asyncio.sleep(attente)

            self.requetes_recentes.append(time.time())
            self.tokens_recents.append((time.time(), tokens_estimes))

Pattern webhook pour les tâches longues

Pour les traitements qui prennent plusieurs minutes, utilisez un pattern webhook :

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

@app.post("/api/analyse")
async def lancer_analyse(
    requete: dict,
    background_tasks: BackgroundTasks,
):
    """Lance une analyse en arrière-plan et notifie par webhook."""
    tache_id = generer_id()

    background_tasks.add_task(
        executer_analyse,
        tache_id=tache_id,
        document=requete["document"],
        webhook_url=requete.get("webhook_url"),
    )

    return {"tache_id": tache_id, "statut": "en_cours"}


async def executer_analyse(
    tache_id: str,
    document: str,
    webhook_url: str | None,
):
    """Exécute l'analyse et envoie le résultat par webhook."""
    import httpx

    resultat = await appeler_llm(document)

    if webhook_url:
        async with httpx.AsyncClient() as http:
            await http.post(webhook_url, json={
                "tache_id": tache_id,
                "statut": "terminee",
                "resultat": resultat,
            })

Points clés à retenir

  • Utilisez des files d’attente asynchrones pour découpler l’ingestion du traitement
  • Implémentez un rate limiter pour respecter les quotas OpenAI
  • Gérez les retries avec backoff exponentiel dans vos workers
  • Pour les tâches longues, préférez un pattern webhook plutôt que du polling