Production : scaling et monitoring
Production : scaling et monitoring
Passer d’un prototype d’agent vocal à un service de production fiable et scalable exige une attention particulière à l’infrastructure, au monitoring et à la gestion des erreurs. Cette dernière leçon couvre les aspects opérationnels essentiels pour déployer un agent vocal en conditions réelles.
Architecture de production
En production, l’agent vocal fonctionne derrière un load balancer et un pool de workers :
Load Balancer (nginx / HAProxy)
Distribue les connexions WebSocket entrantes entre les workers. Supporte le sticky sessions pour maintenir l'affinité.
Workers (FastAPI + uvicorn)
Chaque worker gère plusieurs sessions Realtime simultanément grâce à asyncio.
Base de données (PostgreSQL)
Stocke les logs de conversation, les métriques et la configuration des agents.
Monitoring (Prometheus + Grafana)
Collecte les métriques en temps réel : latence, nombre de sessions, erreurs, coûts.
Gestion des sessions concurrentes
Chaque connexion WebSocket maintient une session Realtime API. Le nombre de sessions simultanées est limité par vos rate limits OpenAI et la capacité de vos workers :
import asyncio
from dataclasses import dataclass
@dataclass
class SessionManager:
"""Gère les sessions Realtime API concurrentes."""
max_sessions: int = 50
active_sessions: dict = None
semaphore: asyncio.Semaphore = None
def __post_init__(self):
self.active_sessions = {}
self.semaphore = asyncio.Semaphore(self.max_sessions)
async def create_session(self, session_id: str, config: dict):
"""Crée une nouvelle session avec contrôle de concurrence."""
async with self.semaphore:
if session_id in self.active_sessions:
raise ValueError(f"Session {session_id} déjà active")
self.active_sessions[session_id] = {
"created_at": asyncio.get_event_loop().time(),
"config": config,
"status": "active"
}
return session_id
async def close_session(self, session_id: str):
"""Ferme proprement une session."""
if session_id in self.active_sessions:
del self.active_sessions[session_id]
@property
def active_count(self) -> int:
return len(self.active_sessions)
@property
def available_slots(self) -> int:
return self.max_sessions - self.active_count
session_manager = SessionManager(max_sessions=50)
Logging structuré
Enregistrez chaque interaction pour le débogage et la conformité :
import logging
import json
from datetime import datetime
class ConversationLogger:
"""Logger structuré pour les conversations vocales."""
def __init__(self, session_id: str):
self.session_id = session_id
self.logger = logging.getLogger(f"voice.{session_id}")
self.events = []
def log_event(self, event_type: str, data: dict = None):
entry = {
"timestamp": datetime.utcnow().isoformat(),
"session_id": self.session_id,
"event_type": event_type,
"data": data or {}
}
self.events.append(entry)
self.logger.info(json.dumps(entry))
def log_transcription(self, role: str, text: str):
self.log_event("transcription", {
"role": role,
"text": text
})
def log_function_call(self, name: str, args: dict, result: dict, duration_ms: float):
self.log_event("function_call", {
"name": name,
"arguments": args,
"result": result,
"duration_ms": round(duration_ms)
})
def log_error(self, error_type: str, message: str):
self.log_event("error", {
"type": error_type,
"message": message
})
async def save_to_db(self, db_pool):
"""Persiste le log complet en base de données."""
async with db_pool.acquire() as conn:
await conn.execute(
"INSERT INTO conversation_logs (session_id, events, created_at) "
"VALUES ($1, $2, NOW())",
self.session_id, json.dumps(self.events)
)
Métriques Prometheus
Exposez des métriques pour le monitoring temps réel :
from prometheus_client import Counter, Histogram, Gauge, start_http_server
# Métriques clés
sessions_active = Gauge(
"voice_sessions_active",
"Nombre de sessions vocales actives"
)
session_duration = Histogram(
"voice_session_duration_seconds",
"Durée des sessions vocales",
buckets=[30, 60, 120, 300, 600, 1800]
)
response_latency = Histogram(
"voice_response_latency_ms",
"Latence de réponse (fin de parole → premier audio)",
buckets=[200, 400, 600, 800, 1000, 1500, 2000, 3000]
)
errors_total = Counter(
"voice_errors_total",
"Nombre total d'erreurs",
["error_type"]
)
tokens_used = Counter(
"voice_tokens_total",
"Tokens consommés",
["type"] # input_audio, output_audio, input_text, output_text
)
# Démarrer le serveur de métriques
start_http_server(9090)
Gestion des coûts
La Realtime API facture l’audio en tokens. Contrôlez les coûts avec des limites de session :
class CostController:
"""Contrôle les coûts d'une session Realtime."""
def __init__(self, max_duration_s: int = 600, max_tokens: int = 50000):
self.max_duration_s = max_duration_s
self.max_tokens = max_tokens
self.total_tokens = 0
self.start_time = None
def start(self):
self.start_time = asyncio.get_event_loop().time()
def update_tokens(self, usage: dict):
self.total_tokens += usage.get("total_tokens", 0)
@property
def elapsed_seconds(self) -> float:
if not self.start_time:
return 0
return asyncio.get_event_loop().time() - self.start_time
@property
def should_terminate(self) -> bool:
if self.elapsed_seconds > self.max_duration_s:
return True
if self.total_tokens > self.max_tokens:
return True
return False
@property
def warning_message(self) -> str:
remaining = self.max_duration_s - self.elapsed_seconds
if remaining < 60:
return f"Cette session se terminera dans {int(remaining)} secondes."
return ""
Gestion des erreurs en production
async def production_event_loop(ws, logger, cost_ctrl):
"""Boucle d'événements robuste pour la production."""
cost_ctrl.start()
try:
async for raw in ws:
event = json.loads(raw)
t = event["type"]
# Vérifier les limites de coût
if cost_ctrl.should_terminate:
logger.log_event("session_limit_reached")
await ws.send(json.dumps({
"type": "conversation.item.create",
"item": {
"type": "message",
"role": "user",
"content": [{"type": "input_text",
"text": "La session arrive à sa limite de durée."}]
}
}))
break
if t == "response.done":
usage = event["response"].get("usage", {})
cost_ctrl.update_tokens(usage)
tokens_used.labels(type="total").inc(usage.get("total_tokens", 0))
elif t == "error":
error = event["error"]
logger.log_error(error["type"], error["message"])
errors_total.labels(error_type=error["type"]).inc()
if error["type"] == "rate_limit_error":
await asyncio.sleep(1)
except websockets.exceptions.ConnectionClosed as e:
logger.log_error("connection_closed", str(e))
except Exception as e:
logger.log_error("unexpected", str(e))
finally:
session_duration.observe(cost_ctrl.elapsed_seconds)
sessions_active.dec()
Points clés à retenir
- L’architecture de production sépare load balancer, workers asyncio, base de données et monitoring
- Un
SessionManageravec sémaphore contrôle le nombre de sessions simultanées - Le logging structuré JSON permet le débogage et la conformité (enregistrement des conversations)
- Les métriques Prometheus (TTFA, durée, tokens, erreurs) sont indispensables pour l’opération quotidienne
- Le
CostControllerlimite la durée et les tokens par session pour éviter les dérapages budgétaires