Parallélisme et concurrence
Concurrence vs parallélisme pour les appels LLM
Les appels API sont des opérations I/O-bound : votre programme attend une réponse réseau. La concurrence (asyncio) est donc bien plus efficace que le parallélisme (multiprocessing) pour ce cas d’usage. Cette leçon vous montre comment maximiser le débit de vos appels.
Concurrence avec asyncio
Le client asynchrone OpenAI
import asyncio
import openai
client = openai.AsyncOpenAI()
async def appel_unique(prompt: str, modele: str = "gpt-5.3") -> str:
"""Un appel API asynchrone."""
response = await client.responses.create(
model=modele,
input=prompt,
)
return response.output_text
async def appels_concurrents(prompts: list[str]) -> list[str]:
"""Lance tous les appels en concurrence."""
taches = [appel_unique(prompt) for prompt in prompts]
resultats = await asyncio.gather(*taches, return_exceptions=True)
for i, resultat in enumerate(resultats):
if isinstance(resultat, Exception):
print(f"Erreur sur le prompt {i}: {resultat}")
return [r for r in resultats if not isinstance(r, Exception)]
Limiter la concurrence avec un sémaphore
Lancer 1 000 appels simultanés va déclencher un rate limit. Utilisez un sémaphore :
async def appels_avec_limite(
prompts: list[str],
max_concurrence: int = 20,
) -> list[str]:
"""Limite le nombre d'appels simultanés."""
semaphore = asyncio.Semaphore(max_concurrence)
async def appel_limite(prompt: str) -> str:
async with semaphore:
return await appel_unique(prompt)
taches = [appel_limite(prompt) for prompt in prompts]
return await asyncio.gather(*taches, return_exceptions=True)
Pattern map-reduce avec LLM
Pour traiter un gros document, découpez-le en chunks, traitez en parallèle, puis agrégez :
async def map_reduce_resume(
document: str,
taille_chunk: int = 3000,
) -> str:
"""Résume un long document par map-reduce parallèle."""
# Phase MAP : découper et résumer en parallèle
chunks = [
document[i:i + taille_chunk]
for i in range(0, len(document), taille_chunk)
]
prompts_map = [
f"Résumez ce passage en 3 points clés :\n\n{chunk}"
for chunk in chunks
]
resumes_partiels = await appels_avec_limite(prompts_map, max_concurrence=10)
# Phase REDUCE : combiner les résumés
resumes_texte = "\n\n---\n\n".join(
r for r in resumes_partiels if isinstance(r, str)
)
resume_final = await appel_unique(
f"Synthétisez ces résumés partiels en un résumé cohérent "
f"de 5 points clés :\n\n{resumes_texte}"
)
return resume_final
Parallélisme avec ProcessPoolExecutor
Dans de rares cas (post-traitement CPU-intensif), combinez asyncio et multiprocessing :
import asyncio
from concurrent.futures import ProcessPoolExecutor
def post_traitement_lourd(texte: str) -> dict:
"""Traitement CPU-bound sur le résultat (NLP, parsing, etc.)."""
# Exemple : extraction d'entités, scoring, etc.
mots = texte.split()
return {
"nb_mots": len(mots),
"mots_uniques": len(set(mots)),
}
async def pipeline_complet(prompts: list[str]) -> list[dict]:
"""Pipeline : appels LLM async + post-traitement CPU parallèle."""
# Étape 1 : appels LLM en concurrence
resultats_llm = await appels_avec_limite(prompts)
# Étape 2 : post-traitement en parallèle (CPU)
loop = asyncio.get_event_loop()
with ProcessPoolExecutor(max_workers=4) as pool:
taches = [
loop.run_in_executor(pool, post_traitement_lourd, r)
for r in resultats_llm
if isinstance(r, str)
]
resultats_finaux = await asyncio.gather(*taches)
return resultats_finaux
Mesurer le débit
import time
async def benchmark_debit(nb_requetes: int = 100):
"""Mesure le débit réel de votre pipeline."""
prompts = [
f"Donnez un fait intéressant numéro {i}."
for i in range(nb_requetes)
]
debut = time.perf_counter()
resultats = await appels_avec_limite(prompts, max_concurrence=20)
duree = time.perf_counter() - debut
succes = sum(1 for r in resultats if isinstance(r, str))
print(f"Requêtes : {nb_requetes}")
print(f"Succès : {succes}")
print(f"Durée : {duree:.1f}s")
print(f"Débit : {succes / duree:.1f} req/s")
Points clés à retenir
- Utilisez
AsyncOpenAIetasyncio.gatherpour la concurrence I/O - Limitez la concurrence avec un sémaphore pour respecter les quotas
- Le pattern map-reduce parallélise le traitement de longs documents
- Réservez
ProcessPoolExecutorau post-traitement CPU-intensif