Scaling et architectures multi-agents
Scaling et architectures multi-agents
Votre agent fonctionne pour 10 utilisateurs. Comment le faire tourner pour 10 000 ? Dans cette dernière leçon, vous apprendrez à scaler vos agents, à concevoir des architectures multi-agents performantes, et à gérer les contraintes de production à grande échelle.
Architecture de production
Le backend agent avec FastAPI
from fastapi import FastAPI, Request, Depends, HTTPException
from fastapi.responses import StreamingResponse
from agents import Agent, Runner, function_tool
from contextlib import asynccontextmanager
import asyncio
# Pool d'agents pré-configurés
agents_registry = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Initialiser les agents au démarrage
agents_registry["commercial"] = Agent(
name="Agent commercial",
instructions="Vous assistez les clients pour leurs achats.",
model="gpt-5.3",
tools=[rechercher_produit, calculer_prix],
)
agents_registry["support"] = Agent(
name="Agent support",
instructions="Vous faites du support technique.",
model="gpt-5.3",
tools=[consulter_ticket, creer_ticket],
)
yield
agents_registry.clear()
app = FastAPI(lifespan=lifespan)
@app.post("/api/chat/{agent_type}")
async def chat(agent_type: str, request: Request):
agent = agents_registry.get(agent_type)
if not agent:
raise HTTPException(404, f"Agent '{agent_type}' non trouvé")
body = await request.json()
async def stream():
result = Runner.run_streamed(agent, body["messages"])
async for event in result.stream_events():
if event.type == "raw_response_event":
if hasattr(event.data, "delta") and event.data.delta:
yield f"data: {event.data.delta}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(stream(), media_type="text/event-stream")
Gestion de la concurrence
Chaque requête crée un Runner indépendant. Le SDK gère nativement la concurrence :
import asyncio
from agents import Agent, Runner
agent = Agent(
name="Agent concurrent",
instructions="Vous répondez aux questions.",
model="gpt-5.3",
)
async def traiter_requetes_paralleles(messages: list[str]):
"""Traite plusieurs requêtes en parallèle."""
taches = [Runner.run(agent, msg) for msg in messages]
resultats = await asyncio.gather(*taches, return_exceptions=True)
for msg, res in zip(messages, resultats):
if isinstance(res, Exception):
print(f"Erreur pour '{msg}': {res}")
else:
print(f"'{msg}' -> {res.final_output[:50]}...")
return resultats
Patterns multi-agents
Pattern superviseur
Un agent superviseur coordonne des agents spécialisés :
from agents import Agent, Runner, Handoff
agent_recherche = Agent(
name="Recherche",
instructions="Vous recherchez des informations sur le web et dans les bases de données.",
tools=[WebSearchTool(), rechercher_dans_crm],
model="gpt-5.3",
)
agent_analyse = Agent(
name="Analyse",
instructions="Vous analysez les données et produisez des insights.",
model="o3-pro",
)
agent_redaction = Agent(
name="Rédaction",
instructions="Vous rédigez des rapports professionnels en français.",
model="gpt-5.3",
)
agent_superviseur = Agent(
name="Superviseur",
instructions="""Vous coordonnez une équipe d'agents spécialisés.
Pour chaque demande :
1. Analysez ce qui est nécessaire
2. Déléguez aux agents spécialisés via handoff
3. Le dernier agent produit la réponse finale
Agents disponibles :
- Recherche : pour collecter des données
- Analyse : pour analyser et interpréter
- Rédaction : pour produire le livrable final""",
handoffs=[
Handoff(agent=agent_recherche, description="Collecter des données"),
Handoff(agent=agent_analyse, description="Analyser des données"),
Handoff(agent=agent_redaction, description="Rédiger un rapport"),
],
model="o4-mini", # Rapide pour le routage
)
Pattern pipeline
Des agents connectés en séquence pour un workflow linéaire :
async def pipeline_analyse_concurrentielle(entreprise: str):
# Étape 1 : Collecte parallèle de données
tache_web = Runner.run(agent_recherche, f"Chercher des informations sur {entreprise}")
tache_crm = Runner.run(agent_crm, f"Données CRM pour {entreprise}")
resultats = await asyncio.gather(tache_web, tache_crm)
donnees = f"""Données web : {resultats[0].final_output}
Données CRM : {resultats[1].final_output}"""
# Étape 2 : Analyse
analyse = await Runner.run(agent_analyse, f"Analysez ces données : {donnees}")
# Étape 3 : Rapport
rapport = await Runner.run(
agent_redaction,
f"Rédigez un rapport d'analyse concurrentielle : {analyse.final_output}"
)
return rapport.final_output
Pattern map-reduce
Traiter un grand volume de données en parallèle puis agréger :
async def analyser_feedbacks(feedbacks: list[str]):
# MAP : analyser chaque feedback en parallèle
agent_classificateur = Agent(
name="Classificateur",
instructions="Classifiez le feedback : positif, négatif ou neutre. Identifiez le thème principal.",
model="o4-mini",
output_type=ClassificationFeedback,
)
taches = [Runner.run(agent_classificateur, fb) for fb in feedbacks]
classifications = await asyncio.gather(*taches)
# REDUCE : synthétiser les résultats
resume_data = [
{"feedback": fb, "class": c.final_output.dict()}
for fb, c in zip(feedbacks, classifications)
]
agent_synthetiseur = Agent(
name="Synthétiseur",
instructions="Synthétisez les classifications de feedbacks en un rapport avec statistiques et insights.",
model="gpt-5.3",
)
import json
rapport = await Runner.run(
agent_synthetiseur,
f"Synthétisez ces {len(feedbacks)} feedbacks classifiés : {json.dumps(resume_data, ensure_ascii=False)}"
)
return rapport.final_output
Gestion des rate limits
import asyncio
from agents import Runner
class GestionnaireRateLimits:
def __init__(self, max_concurrent: int = 10):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.compteur_erreurs = 0
async def executer(self, agent, message, **kwargs):
async with self.semaphore:
try:
result = await Runner.run(agent, message, **kwargs)
self.compteur_erreurs = 0
return result
except Exception as e:
if "rate_limit" in str(e).lower():
self.compteur_erreurs += 1
attente = min(2 ** self.compteur_erreurs, 60)
await asyncio.sleep(attente)
return await self.executer(agent, message, **kwargs)
raise
gestionnaire = GestionnaireRateLimits(max_concurrent=20)
Optimisation des coûts à grande échelle
# Stratégie de modèles par complexité
def choisir_modele(complexite: str) -> str:
modeles = {
"simple": "gpt-5.3", # Questions simples : pas cher
"moyen": "gpt-5.3", # La plupart des cas
"complexe": "o4-mini", # Raisonnement rapide
"critique": "o3-pro", # Cas critiques : qualité maximale
}
return modeles.get(complexite, "gpt-5.3")
# Cache des réponses fréquentes
from functools import lru_cache
import hashlib
reponses_cache = {}
async def executer_avec_cache(agent, message: str):
cle = hashlib.md5(message.encode()).hexdigest()
if cle in reponses_cache:
return reponses_cache[cle]
result = await Runner.run(agent, message)
reponses_cache[cle] = result.final_output
return result.final_output
Points clés à retenir
- Pré-configurez vos agents au démarrage et réutilisez-les entre les requêtes
- Le SDK gère nativement la concurrence via
asyncio.gather() - Le pattern superviseur coordonne des agents spécialisés via handoffs
- Le pattern map-reduce parallélise le traitement de gros volumes
- Gérez les rate limits avec un sémaphore et un backoff exponentiel
- Optimisez les coûts avec le choix de modèle par complexité et le caching