Architecture asynchrone et files d'attente
Pourquoi l’asynchrone est essentiel
En production, vous ne pouvez pas bloquer un processus en attendant qu’un LLM génère sa réponse. Les appels API prennent de 1 à 30 secondes selon le modèle et la complexité. Une architecture asynchrone avec files d’attente vous permet d’absorber les pics de charge, de gérer les retries, et de découpler vos services.
Architecture avec files d’attente
Pattern producteur / consommateur
import asyncio
from dataclasses import dataclass, field
from enum import Enum
class StatutTache(Enum):
EN_ATTENTE = "en_attente"
EN_COURS = "en_cours"
TERMINEE = "terminee"
ECHOUEE = "echouee"
@dataclass
class Tache:
id: str
prompt: str
modele: str = "gpt-5.3"
statut: StatutTache = StatutTache.EN_ATTENTE
resultat: str | None = None
tentatives: int = 0
max_tentatives: int = 3
class FileTraitement:
"""File d'attente asynchrone pour les appels LLM."""
def __init__(self, nb_workers: int = 5):
self.file: asyncio.Queue[Tache] = asyncio.Queue()
self.resultats: dict[str, Tache] = {}
self.nb_workers = nb_workers
async def ajouter(self, tache: Tache) -> str:
"""Ajoute une tâche à la file."""
self.resultats[tache.id] = tache
await self.file.put(tache)
return tache.id
async def worker(self, worker_id: int):
"""Worker qui traite les tâches de la file."""
import openai
client = openai.AsyncOpenAI()
while True:
tache = await self.file.get()
tache.statut = StatutTache.EN_COURS
tache.tentatives += 1
try:
response = await client.responses.create(
model=tache.modele,
input=tache.prompt,
)
tache.resultat = response.output_text
tache.statut = StatutTache.TERMINEE
except Exception as e:
if tache.tentatives < tache.max_tentatives:
tache.statut = StatutTache.EN_ATTENTE
await self.file.put(tache)
else:
tache.statut = StatutTache.ECHOUEE
tache.resultat = str(e)
finally:
self.file.task_done()
async def demarrer(self):
"""Lance les workers."""
workers = [
asyncio.create_task(self.worker(i))
for i in range(self.nb_workers)
]
return workers
async def attendre_fin(self):
"""Attend que toutes les tâches soient traitées."""
await self.file.join()
Utilisation
async def traiter_documents(documents: list[str]):
file = FileTraitement(nb_workers=10)
workers = await file.demarrer()
# Ajouter les tâches
for i, doc in enumerate(documents):
tache = Tache(
id=f"doc-{i}",
prompt=f"Résumez ce document : {doc}",
)
await file.ajouter(tache)
# Attendre la fin
await file.attendre_fin()
# Collecter les résultats
for tache_id, tache in file.resultats.items():
print(f"{tache_id}: {tache.statut.value}")
# Arrêter les workers
for w in workers:
w.cancel()
Rate limiting intelligent
OpenAI impose des limites de débit (tokens par minute, requêtes par minute). Votre architecture doit les respecter :
import asyncio
import time
class RateLimiter:
"""Limiteur de débit basé sur un token bucket."""
def __init__(self, rpm: int = 500, tpm: int = 200_000):
self.rpm = rpm
self.tpm = tpm
self.requetes_recentes: list[float] = []
self.tokens_recents: list[tuple[float, int]] = []
self.verrou = asyncio.Lock()
async def attendre(self, tokens_estimes: int = 1000):
"""Attend si nécessaire pour respecter les limites."""
async with self.verrou:
maintenant = time.time()
fenetre = 60.0
# Nettoyer les entrées hors fenêtre
self.requetes_recentes = [
t for t in self.requetes_recentes
if maintenant - t < fenetre
]
self.tokens_recents = [
(t, n) for t, n in self.tokens_recents
if maintenant - t < fenetre
]
# Vérifier RPM
if len(self.requetes_recentes) >= self.rpm:
attente = self.requetes_recentes[0] + fenetre - maintenant
await asyncio.sleep(attente)
# Vérifier TPM
tokens_utilises = sum(n for _, n in self.tokens_recents)
if tokens_utilises + tokens_estimes > self.tpm:
attente = self.tokens_recents[0][0] + fenetre - maintenant
await asyncio.sleep(attente)
self.requetes_recentes.append(time.time())
self.tokens_recents.append((time.time(), tokens_estimes))
Pattern webhook pour les tâches longues
Pour les traitements qui prennent plusieurs minutes, utilisez un pattern webhook :
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
@app.post("/api/analyse")
async def lancer_analyse(
requete: dict,
background_tasks: BackgroundTasks,
):
"""Lance une analyse en arrière-plan et notifie par webhook."""
tache_id = generer_id()
background_tasks.add_task(
executer_analyse,
tache_id=tache_id,
document=requete["document"],
webhook_url=requete.get("webhook_url"),
)
return {"tache_id": tache_id, "statut": "en_cours"}
async def executer_analyse(
tache_id: str,
document: str,
webhook_url: str | None,
):
"""Exécute l'analyse et envoie le résultat par webhook."""
import httpx
resultat = await appeler_llm(document)
if webhook_url:
async with httpx.AsyncClient() as http:
await http.post(webhook_url, json={
"tache_id": tache_id,
"statut": "terminee",
"resultat": resultat,
})
Points clés à retenir
- Utilisez des files d’attente asynchrones pour découpler l’ingestion du traitement
- Implémentez un rate limiter pour respecter les quotas OpenAI
- Gérez les retries avec backoff exponentiel dans vos workers
- Pour les tâches longues, préférez un pattern webhook plutôt que du polling