Aller au contenu principal

Streaming : recevoir les réponses en temps réel

Streaming : recevoir les réponses en temps réel

Le streaming permet de recevoir la réponse token par token au lieu d’attendre la génération complète. C’est indispensable pour offrir une expérience utilisateur réactive, notamment dans les chatbots et interfaces conversationnelles.

Pourquoi utiliser le streaming ?

Sans streaming, votre utilisateur attend parfois 10 à 30 secondes avant de voir quoi que ce soit. Avec le streaming, les premiers mots apparaissent en moins d’une seconde.

from openai import OpenAI
client = OpenAI()

# SANS streaming — attente complète
response = client.responses.create(
    model="gpt-5.3",
    input="Écrivez un paragraphe sur l'intelligence artificielle."
)
print(response.output_text)  # Tout d'un coup après l'attente

# AVEC streaming — token par token
stream = client.responses.create(
    model="gpt-5.3",
    input="Écrivez un paragraphe sur l'intelligence artificielle.",
    stream=True
)

for event in stream:
    if event.type == "response.output_text.delta":
        print(event.delta, end="", flush=True)
print()  # Nouvelle ligne à la fin

Les types d’événements

Le flux de streaming émet différents types d’événements :

stream = client.responses.create(
    model="gpt-5.3",
    input="Bonjour !",
    stream=True
)

for event in stream:
    match event.type:
        case "response.created":
            print(f"[Réponse créée] ID: {event.response.id}")
        case "response.output_item.added":
            print(f"[Nouvel élément de sortie]")
        case "response.output_text.delta":
            print(f"[Texte] {event.delta}", end="")
        case "response.output_text.done":
            print(f"\n[Texte complet]")
        case "response.completed":
            print(f"[Terminé] Tokens: {event.response.usage.total_tokens}")

# Résultat :
# [Réponse créée] ID: resp_abc123
# [Nouvel élément de sortie]
# [Texte] Bonjour[Texte]  ![Texte]  Comment[Texte]  puis[Texte] -je...
# [Texte complet]
# [Terminé] Tokens: 25

Streaming avec accumulation

Pour collecter la réponse complète tout en affichant le streaming :

def stream_et_collecter(prompt: str) -> str:
    """Stream la réponse tout en la collectant."""
    stream = client.responses.create(
        model="gpt-5.3",
        input=prompt,
        stream=True
    )

    texte_complet = ""
    for event in stream:
        if event.type == "response.output_text.delta":
            texte_complet += event.delta
            print(event.delta, end="", flush=True)

    print()  # Nouvelle ligne
    return texte_complet

resultat = stream_et_collecter("Listez 3 capitales européennes.")
print(f"\nTexte collecté ({len(resultat)} caractères)")

Streaming avec function calling

Le streaming fonctionne aussi avec les appels de fonctions :

stream = client.responses.create(
    model="gpt-5.3",
    input="Quelle heure est-il à Tokyo ?",
    tools=[{
        "type": "function",
        "name": "get_heure",
        "description": "Obtenir l'heure dans une ville",
        "parameters": {
            "type": "object",
            "properties": {
                "ville": {"type": "string"}
            },
            "required": ["ville"]
        }
    }],
    stream=True
)

for event in stream:
    match event.type:
        case "response.output_text.delta":
            print(event.delta, end="")
        case "response.function_call_arguments.delta":
            print(f"[Args] {event.delta}", end="")
        case "response.function_call_arguments.done":
            print(f"\n[Appel de fonction complet]")

# Résultat :
# [Args] {"ville": "Tokyo"}
# [Appel de fonction complet]

Streaming asynchrone

Pour les applications async (FastAPI, etc.) :

from openai import AsyncOpenAI
import asyncio

async_client = AsyncOpenAI()

async def stream_async(prompt: str):
    """Stream asynchrone de la réponse."""
    stream = await async_client.responses.create(
        model="gpt-5.3",
        input=prompt,
        stream=True
    )

    async for event in stream:
        if event.type == "response.output_text.delta":
            print(event.delta, end="", flush=True)
    print()

asyncio.run(stream_async("Racontez une blague courte."))

Intégration avec FastAPI

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI

app = FastAPI()
client = OpenAI()

@app.get("/chat")
async def chat(question: str):
    def generate():
        stream = client.responses.create(
            model="gpt-5.3",
            input=question,
            stream=True
        )
        for event in stream:
            if event.type == "response.output_text.delta":
                yield event.delta

    return StreamingResponse(generate(), media_type="text/plain")

Gestion des erreurs en streaming

from openai import APIError

def stream_avec_erreurs(prompt: str):
    """Stream avec gestion d'erreurs robuste."""
    try:
        stream = client.responses.create(
            model="gpt-5.3",
            input=prompt,
            stream=True
        )

        for event in stream:
            if event.type == "response.output_text.delta":
                print(event.delta, end="", flush=True)
        print()

    except APIError as e:
        print(f"\nErreur API pendant le streaming : {e.message}")
    except Exception as e:
        print(f"\nErreur inattendue : {e}")

stream_avec_erreurs("Expliquez le streaming en 2 phrases.")

Points clés à retenir

  • Activez le streaming avec stream=True pour une expérience temps réel
  • Filtrez les événements par event.type pour traiter chaque type de données
  • response.output_text.delta contient les fragments de texte
  • response.completed signale la fin et inclut les métriques d’usage
  • Utilisez AsyncOpenAI pour le streaming dans les applications asynchrones
  • Gérez toujours les erreurs même en mode streaming