Aller au contenu principal

Production : scaling et monitoring

Production : scaling et monitoring

Passer d’un prototype d’agent vocal à un service de production fiable et scalable exige une attention particulière à l’infrastructure, au monitoring et à la gestion des erreurs. Cette dernière leçon couvre les aspects opérationnels essentiels pour déployer un agent vocal en conditions réelles.

Architecture de production

En production, l’agent vocal fonctionne derrière un load balancer et un pool de workers :

1

Load Balancer (nginx / HAProxy)

Distribue les connexions WebSocket entrantes entre les workers. Supporte le sticky sessions pour maintenir l'affinité.

2

Workers (FastAPI + uvicorn)

Chaque worker gère plusieurs sessions Realtime simultanément grâce à asyncio.

3

Base de données (PostgreSQL)

Stocke les logs de conversation, les métriques et la configuration des agents.

4

Monitoring (Prometheus + Grafana)

Collecte les métriques en temps réel : latence, nombre de sessions, erreurs, coûts.

Gestion des sessions concurrentes

Chaque connexion WebSocket maintient une session Realtime API. Le nombre de sessions simultanées est limité par vos rate limits OpenAI et la capacité de vos workers :

import asyncio
from dataclasses import dataclass

@dataclass
class SessionManager:
    """Gère les sessions Realtime API concurrentes."""
    max_sessions: int = 50
    active_sessions: dict = None
    semaphore: asyncio.Semaphore = None

    def __post_init__(self):
        self.active_sessions = {}
        self.semaphore = asyncio.Semaphore(self.max_sessions)

    async def create_session(self, session_id: str, config: dict):
        """Crée une nouvelle session avec contrôle de concurrence."""
        async with self.semaphore:
            if session_id in self.active_sessions:
                raise ValueError(f"Session {session_id} déjà active")

            self.active_sessions[session_id] = {
                "created_at": asyncio.get_event_loop().time(),
                "config": config,
                "status": "active"
            }
            return session_id

    async def close_session(self, session_id: str):
        """Ferme proprement une session."""
        if session_id in self.active_sessions:
            del self.active_sessions[session_id]

    @property
    def active_count(self) -> int:
        return len(self.active_sessions)

    @property
    def available_slots(self) -> int:
        return self.max_sessions - self.active_count

session_manager = SessionManager(max_sessions=50)

Logging structuré

Enregistrez chaque interaction pour le débogage et la conformité :

import logging
import json
from datetime import datetime

class ConversationLogger:
    """Logger structuré pour les conversations vocales."""

    def __init__(self, session_id: str):
        self.session_id = session_id
        self.logger = logging.getLogger(f"voice.{session_id}")
        self.events = []

    def log_event(self, event_type: str, data: dict = None):
        entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "session_id": self.session_id,
            "event_type": event_type,
            "data": data or {}
        }
        self.events.append(entry)
        self.logger.info(json.dumps(entry))

    def log_transcription(self, role: str, text: str):
        self.log_event("transcription", {
            "role": role,
            "text": text
        })

    def log_function_call(self, name: str, args: dict, result: dict, duration_ms: float):
        self.log_event("function_call", {
            "name": name,
            "arguments": args,
            "result": result,
            "duration_ms": round(duration_ms)
        })

    def log_error(self, error_type: str, message: str):
        self.log_event("error", {
            "type": error_type,
            "message": message
        })

    async def save_to_db(self, db_pool):
        """Persiste le log complet en base de données."""
        async with db_pool.acquire() as conn:
            await conn.execute(
                "INSERT INTO conversation_logs (session_id, events, created_at) "
                "VALUES ($1, $2, NOW())",
                self.session_id, json.dumps(self.events)
            )

Métriques Prometheus

Exposez des métriques pour le monitoring temps réel :

from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Métriques clés
sessions_active = Gauge(
    "voice_sessions_active",
    "Nombre de sessions vocales actives"
)

session_duration = Histogram(
    "voice_session_duration_seconds",
    "Durée des sessions vocales",
    buckets=[30, 60, 120, 300, 600, 1800]
)

response_latency = Histogram(
    "voice_response_latency_ms",
    "Latence de réponse (fin de parole → premier audio)",
    buckets=[200, 400, 600, 800, 1000, 1500, 2000, 3000]
)

errors_total = Counter(
    "voice_errors_total",
    "Nombre total d'erreurs",
    ["error_type"]
)

tokens_used = Counter(
    "voice_tokens_total",
    "Tokens consommés",
    ["type"]  # input_audio, output_audio, input_text, output_text
)

# Démarrer le serveur de métriques
start_http_server(9090)

Gestion des coûts

La Realtime API facture l’audio en tokens. Contrôlez les coûts avec des limites de session :

class CostController:
    """Contrôle les coûts d'une session Realtime."""

    def __init__(self, max_duration_s: int = 600, max_tokens: int = 50000):
        self.max_duration_s = max_duration_s
        self.max_tokens = max_tokens
        self.total_tokens = 0
        self.start_time = None

    def start(self):
        self.start_time = asyncio.get_event_loop().time()

    def update_tokens(self, usage: dict):
        self.total_tokens += usage.get("total_tokens", 0)

    @property
    def elapsed_seconds(self) -> float:
        if not self.start_time:
            return 0
        return asyncio.get_event_loop().time() - self.start_time

    @property
    def should_terminate(self) -> bool:
        if self.elapsed_seconds > self.max_duration_s:
            return True
        if self.total_tokens > self.max_tokens:
            return True
        return False

    @property
    def warning_message(self) -> str:
        remaining = self.max_duration_s - self.elapsed_seconds
        if remaining < 60:
            return f"Cette session se terminera dans {int(remaining)} secondes."
        return ""

Gestion des erreurs en production

async def production_event_loop(ws, logger, cost_ctrl):
    """Boucle d'événements robuste pour la production."""
    cost_ctrl.start()

    try:
        async for raw in ws:
            event = json.loads(raw)
            t = event["type"]

            # Vérifier les limites de coût
            if cost_ctrl.should_terminate:
                logger.log_event("session_limit_reached")
                await ws.send(json.dumps({
                    "type": "conversation.item.create",
                    "item": {
                        "type": "message",
                        "role": "user",
                        "content": [{"type": "input_text",
                                     "text": "La session arrive à sa limite de durée."}]
                    }
                }))
                break

            if t == "response.done":
                usage = event["response"].get("usage", {})
                cost_ctrl.update_tokens(usage)
                tokens_used.labels(type="total").inc(usage.get("total_tokens", 0))

            elif t == "error":
                error = event["error"]
                logger.log_error(error["type"], error["message"])
                errors_total.labels(error_type=error["type"]).inc()

                if error["type"] == "rate_limit_error":
                    await asyncio.sleep(1)

    except websockets.exceptions.ConnectionClosed as e:
        logger.log_error("connection_closed", str(e))
    except Exception as e:
        logger.log_error("unexpected", str(e))
    finally:
        session_duration.observe(cost_ctrl.elapsed_seconds)
        sessions_active.dec()

Points clés à retenir

  • L’architecture de production sépare load balancer, workers asyncio, base de données et monitoring
  • Un SessionManager avec sémaphore contrôle le nombre de sessions simultanées
  • Le logging structuré JSON permet le débogage et la conformité (enregistrement des conversations)
  • Les métriques Prometheus (TTFA, durée, tokens, erreurs) sont indispensables pour l’opération quotidienne
  • Le CostController limite la durée et les tokens par session pour éviter les dérapages budgétaires