Aller au contenu principal

Streaming optimisé

Le streaming : indispensable en production

Le streaming transforme l’expérience utilisateur : au lieu d’attendre 5 secondes pour une réponse complète, l’utilisateur voit les premiers mots apparaître en quelques centaines de millisecondes. En production, ne pas utiliser le streaming est presque toujours une erreur.

Streaming avec la Responses API

Implémentation de base

import openai

client = openai.OpenAI()

def generer_en_streaming(prompt: str):
    """Génération en streaming avec la Responses API."""
    stream = client.responses.create(
        model="gpt-5.3",
        input=prompt,
        stream=True,
    )

    for event in stream:
        if event.type == "response.output_text.delta":
            print(event.delta, end="", flush=True)
        elif event.type == "response.completed":
            print()  # Nouvelle ligne à la fin
            return event.response

Streaming avec function calling

Le streaming fonctionne aussi avec les appels de fonctions. Vous recevez les arguments de la fonction au fil de la génération :

outils = [
    {
        "type": "function",
        "name": "rechercher_produit",
        "description": "Recherche un produit dans le catalogue.",
        "parameters": {
            "type": "object",
            "properties": {
                "requete": {"type": "string"},
                "categorie": {"type": "string"},
            },
            "required": ["requete"],
        },
    }
]

def streaming_avec_outils(prompt: str):
    stream = client.responses.create(
        model="gpt-5.3",
        input=prompt,
        tools=outils,
        stream=True,
    )

    for event in stream:
        match event.type:
            case "response.output_text.delta":
                print(event.delta, end="", flush=True)
            case "response.function_call_arguments.delta":
                # Arguments de la fonction en cours de génération
                print(f"[arg: {event.delta}]", end="")
            case "response.function_call_arguments.done":
                print(f"\n[Appel fonction terminé]")

Streaming côté serveur (SSE)

FastAPI + Server-Sent Events

Pour exposer le streaming à votre frontend :

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import openai
import json

app = FastAPI()
client = openai.OpenAI()

@app.post("/api/chat")
async def chat_streaming(requete: dict):
    prompt = requete["message"]

    async def generer():
        stream = client.responses.create(
            model="gpt-5.3",
            input=prompt,
            stream=True,
        )

        for event in stream:
            if event.type == "response.output_text.delta":
                donnees = json.dumps({"texte": event.delta})
                yield f"data: {donnees}\n\n"
            elif event.type == "response.completed":
                usage = event.response.usage
                donnees = json.dumps({
                    "fin": True,
                    "tokens_entree": usage.input_tokens,
                    "tokens_sortie": usage.output_tokens,
                })
                yield f"data: {donnees}\n\n"

    return StreamingResponse(
        generer(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Important pour nginx
        },
    )

Configuration nginx pour le streaming

Si vous utilisez nginx en reverse proxy, désactivez le buffering :

location /api/chat {
    proxy_pass http://127.0.0.1:8000;
    proxy_buffering off;
    proxy_cache off;
    proxy_set_header Connection "";
    chunked_transfer_encoding on;
}

Gestion des erreurs en streaming

import openai

def streaming_resilient(prompt: str, max_tentatives: int = 3):
    """Streaming avec gestion d'erreurs et retry."""
    for tentative in range(max_tentatives):
        try:
            stream = client.responses.create(
                model="gpt-5.3",
                input=prompt,
                stream=True,
            )

            texte_complet = ""
            for event in stream:
                if event.type == "response.output_text.delta":
                    texte_complet += event.delta
                    yield event.delta

            return  # Succès, on sort

        except openai.APIConnectionError:
            if tentative < max_tentatives - 1:
                import time
                time.sleep(2 ** tentative)  # Backoff exponentiel
            else:
                raise
        except openai.RateLimitError:
            import time
            time.sleep(5)  # Attente fixe pour rate limit

Optimisations avancées

Traitement par chunks

Plutôt que de traiter token par token, accumulez par phrases pour un affichage plus fluide :

def streaming_par_phrase(prompt: str):
    """Accumule et yield par phrase complète."""
    buffer = ""

    stream = client.responses.create(
        model="gpt-5.3",
        input=prompt,
        stream=True,
    )

    for event in stream:
        if event.type == "response.output_text.delta":
            buffer += event.delta
            # Chercher une fin de phrase
            for separateur in [".\n", "!\n", "?\n", ". ", "! ", "? "]:
                if separateur in buffer:
                    parties = buffer.split(separateur, 1)
                    yield parties[0] + separateur.rstrip()
                    buffer = parties[1] if len(parties) > 1 else ""

    if buffer.strip():
        yield buffer.strip()

Points clés à retenir

  • Utilisez toujours le streaming en production pour réduire la latence perçue
  • Configurez nginx avec proxy_buffering off pour le SSE
  • Gérez les erreurs avec retry et backoff exponentiel
  • Accumulez par phrases pour un affichage plus naturel