Aller au contenu principal

Scaling et architectures multi-agents

Scaling et architectures multi-agents

Votre agent fonctionne pour 10 utilisateurs. Comment le faire tourner pour 10 000 ? Dans cette dernière leçon, vous apprendrez à scaler vos agents, à concevoir des architectures multi-agents performantes, et à gérer les contraintes de production à grande échelle.

Architecture de production

Le backend agent avec FastAPI

from fastapi import FastAPI, Request, Depends, HTTPException
from fastapi.responses import StreamingResponse
from agents import Agent, Runner, function_tool
from contextlib import asynccontextmanager
import asyncio

# Pool d'agents pré-configurés
agents_registry = {}

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialiser les agents au démarrage
    agents_registry["commercial"] = Agent(
        name="Agent commercial",
        instructions="Vous assistez les clients pour leurs achats.",
        model="gpt-5.3",
        tools=[rechercher_produit, calculer_prix],
    )
    agents_registry["support"] = Agent(
        name="Agent support",
        instructions="Vous faites du support technique.",
        model="gpt-5.3",
        tools=[consulter_ticket, creer_ticket],
    )
    yield
    agents_registry.clear()

app = FastAPI(lifespan=lifespan)

@app.post("/api/chat/{agent_type}")
async def chat(agent_type: str, request: Request):
    agent = agents_registry.get(agent_type)
    if not agent:
        raise HTTPException(404, f"Agent '{agent_type}' non trouvé")

    body = await request.json()

    async def stream():
        result = Runner.run_streamed(agent, body["messages"])
        async for event in result.stream_events():
            if event.type == "raw_response_event":
                if hasattr(event.data, "delta") and event.data.delta:
                    yield f"data: {event.data.delta}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

Gestion de la concurrence

Chaque requête crée un Runner indépendant. Le SDK gère nativement la concurrence :

import asyncio
from agents import Agent, Runner

agent = Agent(
    name="Agent concurrent",
    instructions="Vous répondez aux questions.",
    model="gpt-5.3",
)

async def traiter_requetes_paralleles(messages: list[str]):
    """Traite plusieurs requêtes en parallèle."""
    taches = [Runner.run(agent, msg) for msg in messages]
    resultats = await asyncio.gather(*taches, return_exceptions=True)

    for msg, res in zip(messages, resultats):
        if isinstance(res, Exception):
            print(f"Erreur pour '{msg}': {res}")
        else:
            print(f"'{msg}' -> {res.final_output[:50]}...")

    return resultats

Patterns multi-agents

Pattern superviseur

Un agent superviseur coordonne des agents spécialisés :

from agents import Agent, Runner, Handoff

agent_recherche = Agent(
    name="Recherche",
    instructions="Vous recherchez des informations sur le web et dans les bases de données.",
    tools=[WebSearchTool(), rechercher_dans_crm],
    model="gpt-5.3",
)

agent_analyse = Agent(
    name="Analyse",
    instructions="Vous analysez les données et produisez des insights.",
    model="o3-pro",
)

agent_redaction = Agent(
    name="Rédaction",
    instructions="Vous rédigez des rapports professionnels en français.",
    model="gpt-5.3",
)

agent_superviseur = Agent(
    name="Superviseur",
    instructions="""Vous coordonnez une équipe d'agents spécialisés.
    Pour chaque demande :
    1. Analysez ce qui est nécessaire
    2. Déléguez aux agents spécialisés via handoff
    3. Le dernier agent produit la réponse finale

    Agents disponibles :
    - Recherche : pour collecter des données
    - Analyse : pour analyser et interpréter
    - Rédaction : pour produire le livrable final""",
    handoffs=[
        Handoff(agent=agent_recherche, description="Collecter des données"),
        Handoff(agent=agent_analyse, description="Analyser des données"),
        Handoff(agent=agent_redaction, description="Rédiger un rapport"),
    ],
    model="o4-mini",  # Rapide pour le routage
)

Pattern pipeline

Des agents connectés en séquence pour un workflow linéaire :

async def pipeline_analyse_concurrentielle(entreprise: str):
    # Étape 1 : Collecte parallèle de données
    tache_web = Runner.run(agent_recherche, f"Chercher des informations sur {entreprise}")
    tache_crm = Runner.run(agent_crm, f"Données CRM pour {entreprise}")
    resultats = await asyncio.gather(tache_web, tache_crm)

    donnees = f"""Données web : {resultats[0].final_output}
    Données CRM : {resultats[1].final_output}"""

    # Étape 2 : Analyse
    analyse = await Runner.run(agent_analyse, f"Analysez ces données : {donnees}")

    # Étape 3 : Rapport
    rapport = await Runner.run(
        agent_redaction,
        f"Rédigez un rapport d'analyse concurrentielle : {analyse.final_output}"
    )

    return rapport.final_output

Pattern map-reduce

Traiter un grand volume de données en parallèle puis agréger :

async def analyser_feedbacks(feedbacks: list[str]):
    # MAP : analyser chaque feedback en parallèle
    agent_classificateur = Agent(
        name="Classificateur",
        instructions="Classifiez le feedback : positif, négatif ou neutre. Identifiez le thème principal.",
        model="o4-mini",
        output_type=ClassificationFeedback,
    )

    taches = [Runner.run(agent_classificateur, fb) for fb in feedbacks]
    classifications = await asyncio.gather(*taches)

    # REDUCE : synthétiser les résultats
    resume_data = [
        {"feedback": fb, "class": c.final_output.dict()}
        for fb, c in zip(feedbacks, classifications)
    ]

    agent_synthetiseur = Agent(
        name="Synthétiseur",
        instructions="Synthétisez les classifications de feedbacks en un rapport avec statistiques et insights.",
        model="gpt-5.3",
    )

    import json
    rapport = await Runner.run(
        agent_synthetiseur,
        f"Synthétisez ces {len(feedbacks)} feedbacks classifiés : {json.dumps(resume_data, ensure_ascii=False)}"
    )

    return rapport.final_output

Gestion des rate limits

import asyncio
from agents import Runner

class GestionnaireRateLimits:
    def __init__(self, max_concurrent: int = 10):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.compteur_erreurs = 0

    async def executer(self, agent, message, **kwargs):
        async with self.semaphore:
            try:
                result = await Runner.run(agent, message, **kwargs)
                self.compteur_erreurs = 0
                return result
            except Exception as e:
                if "rate_limit" in str(e).lower():
                    self.compteur_erreurs += 1
                    attente = min(2 ** self.compteur_erreurs, 60)
                    await asyncio.sleep(attente)
                    return await self.executer(agent, message, **kwargs)
                raise

gestionnaire = GestionnaireRateLimits(max_concurrent=20)

Optimisation des coûts à grande échelle

# Stratégie de modèles par complexité
def choisir_modele(complexite: str) -> str:
    modeles = {
        "simple": "gpt-5.3",      # Questions simples : pas cher
        "moyen": "gpt-5.3",       # La plupart des cas
        "complexe": "o4-mini",     # Raisonnement rapide
        "critique": "o3-pro",      # Cas critiques : qualité maximale
    }
    return modeles.get(complexite, "gpt-5.3")

# Cache des réponses fréquentes
from functools import lru_cache
import hashlib

reponses_cache = {}

async def executer_avec_cache(agent, message: str):
    cle = hashlib.md5(message.encode()).hexdigest()
    if cle in reponses_cache:
        return reponses_cache[cle]

    result = await Runner.run(agent, message)
    reponses_cache[cle] = result.final_output
    return result.final_output

Points clés à retenir

  • Pré-configurez vos agents au démarrage et réutilisez-les entre les requêtes
  • Le SDK gère nativement la concurrence via asyncio.gather()
  • Le pattern superviseur coordonne des agents spécialisés via handoffs
  • Le pattern map-reduce parallélise le traitement de gros volumes
  • Gérez les rate limits avec un sémaphore et un backoff exponentiel
  • Optimisez les coûts avec le choix de modèle par complexité et le caching