Aller au contenu principal

Parallélisme et concurrence

Concurrence vs parallélisme pour les appels LLM

Les appels API sont des opérations I/O-bound : votre programme attend une réponse réseau. La concurrence (asyncio) est donc bien plus efficace que le parallélisme (multiprocessing) pour ce cas d’usage. Cette leçon vous montre comment maximiser le débit de vos appels.

Concurrence avec asyncio

Le client asynchrone OpenAI

import asyncio
import openai

client = openai.AsyncOpenAI()

async def appel_unique(prompt: str, modele: str = "gpt-5.3") -> str:
    """Un appel API asynchrone."""
    response = await client.responses.create(
        model=modele,
        input=prompt,
    )
    return response.output_text

async def appels_concurrents(prompts: list[str]) -> list[str]:
    """Lance tous les appels en concurrence."""
    taches = [appel_unique(prompt) for prompt in prompts]
    resultats = await asyncio.gather(*taches, return_exceptions=True)

    for i, resultat in enumerate(resultats):
        if isinstance(resultat, Exception):
            print(f"Erreur sur le prompt {i}: {resultat}")

    return [r for r in resultats if not isinstance(r, Exception)]

Limiter la concurrence avec un sémaphore

Lancer 1 000 appels simultanés va déclencher un rate limit. Utilisez un sémaphore :

async def appels_avec_limite(
    prompts: list[str],
    max_concurrence: int = 20,
) -> list[str]:
    """Limite le nombre d'appels simultanés."""
    semaphore = asyncio.Semaphore(max_concurrence)

    async def appel_limite(prompt: str) -> str:
        async with semaphore:
            return await appel_unique(prompt)

    taches = [appel_limite(prompt) for prompt in prompts]
    return await asyncio.gather(*taches, return_exceptions=True)

Pattern map-reduce avec LLM

Pour traiter un gros document, découpez-le en chunks, traitez en parallèle, puis agrégez :

async def map_reduce_resume(
    document: str,
    taille_chunk: int = 3000,
) -> str:
    """Résume un long document par map-reduce parallèle."""

    # Phase MAP : découper et résumer en parallèle
    chunks = [
        document[i:i + taille_chunk]
        for i in range(0, len(document), taille_chunk)
    ]

    prompts_map = [
        f"Résumez ce passage en 3 points clés :\n\n{chunk}"
        for chunk in chunks
    ]

    resumes_partiels = await appels_avec_limite(prompts_map, max_concurrence=10)

    # Phase REDUCE : combiner les résumés
    resumes_texte = "\n\n---\n\n".join(
        r for r in resumes_partiels if isinstance(r, str)
    )

    resume_final = await appel_unique(
        f"Synthétisez ces résumés partiels en un résumé cohérent "
        f"de 5 points clés :\n\n{resumes_texte}"
    )

    return resume_final

Parallélisme avec ProcessPoolExecutor

Dans de rares cas (post-traitement CPU-intensif), combinez asyncio et multiprocessing :

import asyncio
from concurrent.futures import ProcessPoolExecutor

def post_traitement_lourd(texte: str) -> dict:
    """Traitement CPU-bound sur le résultat (NLP, parsing, etc.)."""
    # Exemple : extraction d'entités, scoring, etc.
    mots = texte.split()
    return {
        "nb_mots": len(mots),
        "mots_uniques": len(set(mots)),
    }

async def pipeline_complet(prompts: list[str]) -> list[dict]:
    """Pipeline : appels LLM async + post-traitement CPU parallèle."""
    # Étape 1 : appels LLM en concurrence
    resultats_llm = await appels_avec_limite(prompts)

    # Étape 2 : post-traitement en parallèle (CPU)
    loop = asyncio.get_event_loop()
    with ProcessPoolExecutor(max_workers=4) as pool:
        taches = [
            loop.run_in_executor(pool, post_traitement_lourd, r)
            for r in resultats_llm
            if isinstance(r, str)
        ]
        resultats_finaux = await asyncio.gather(*taches)

    return resultats_finaux

Mesurer le débit

import time

async def benchmark_debit(nb_requetes: int = 100):
    """Mesure le débit réel de votre pipeline."""
    prompts = [
        f"Donnez un fait intéressant numéro {i}."
        for i in range(nb_requetes)
    ]

    debut = time.perf_counter()
    resultats = await appels_avec_limite(prompts, max_concurrence=20)
    duree = time.perf_counter() - debut

    succes = sum(1 for r in resultats if isinstance(r, str))
    print(f"Requêtes : {nb_requetes}")
    print(f"Succès : {succes}")
    print(f"Durée : {duree:.1f}s")
    print(f"Débit : {succes / duree:.1f} req/s")

Points clés à retenir

  • Utilisez AsyncOpenAI et asyncio.gather pour la concurrence I/O
  • Limitez la concurrence avec un sémaphore pour respecter les quotas
  • Le pattern map-reduce parallélise le traitement de longs documents
  • Réservez ProcessPoolExecutor au post-traitement CPU-intensif