Streaming : recevoir les réponses en temps réel
Streaming : recevoir les réponses en temps réel
Le streaming permet de recevoir la réponse token par token au lieu d’attendre la génération complète. C’est indispensable pour offrir une expérience utilisateur réactive, notamment dans les chatbots et interfaces conversationnelles.
Pourquoi utiliser le streaming ?
Sans streaming, votre utilisateur attend parfois 10 à 30 secondes avant de voir quoi que ce soit. Avec le streaming, les premiers mots apparaissent en moins d’une seconde.
from openai import OpenAI
client = OpenAI()
# SANS streaming — attente complète
response = client.responses.create(
model="gpt-5.3",
input="Écrivez un paragraphe sur l'intelligence artificielle."
)
print(response.output_text) # Tout d'un coup après l'attente
# AVEC streaming — token par token
stream = client.responses.create(
model="gpt-5.3",
input="Écrivez un paragraphe sur l'intelligence artificielle.",
stream=True
)
for event in stream:
if event.type == "response.output_text.delta":
print(event.delta, end="", flush=True)
print() # Nouvelle ligne à la fin
Les types d’événements
Le flux de streaming émet différents types d’événements :
stream = client.responses.create(
model="gpt-5.3",
input="Bonjour !",
stream=True
)
for event in stream:
match event.type:
case "response.created":
print(f"[Réponse créée] ID: {event.response.id}")
case "response.output_item.added":
print(f"[Nouvel élément de sortie]")
case "response.output_text.delta":
print(f"[Texte] {event.delta}", end="")
case "response.output_text.done":
print(f"\n[Texte complet]")
case "response.completed":
print(f"[Terminé] Tokens: {event.response.usage.total_tokens}")
# Résultat :
# [Réponse créée] ID: resp_abc123
# [Nouvel élément de sortie]
# [Texte] Bonjour[Texte] ![Texte] Comment[Texte] puis[Texte] -je...
# [Texte complet]
# [Terminé] Tokens: 25
Streaming avec accumulation
Pour collecter la réponse complète tout en affichant le streaming :
def stream_et_collecter(prompt: str) -> str:
"""Stream la réponse tout en la collectant."""
stream = client.responses.create(
model="gpt-5.3",
input=prompt,
stream=True
)
texte_complet = ""
for event in stream:
if event.type == "response.output_text.delta":
texte_complet += event.delta
print(event.delta, end="", flush=True)
print() # Nouvelle ligne
return texte_complet
resultat = stream_et_collecter("Listez 3 capitales européennes.")
print(f"\nTexte collecté ({len(resultat)} caractères)")
Streaming avec function calling
Le streaming fonctionne aussi avec les appels de fonctions :
stream = client.responses.create(
model="gpt-5.3",
input="Quelle heure est-il à Tokyo ?",
tools=[{
"type": "function",
"name": "get_heure",
"description": "Obtenir l'heure dans une ville",
"parameters": {
"type": "object",
"properties": {
"ville": {"type": "string"}
},
"required": ["ville"]
}
}],
stream=True
)
for event in stream:
match event.type:
case "response.output_text.delta":
print(event.delta, end="")
case "response.function_call_arguments.delta":
print(f"[Args] {event.delta}", end="")
case "response.function_call_arguments.done":
print(f"\n[Appel de fonction complet]")
# Résultat :
# [Args] {"ville": "Tokyo"}
# [Appel de fonction complet]
Streaming asynchrone
Pour les applications async (FastAPI, etc.) :
from openai import AsyncOpenAI
import asyncio
async_client = AsyncOpenAI()
async def stream_async(prompt: str):
"""Stream asynchrone de la réponse."""
stream = await async_client.responses.create(
model="gpt-5.3",
input=prompt,
stream=True
)
async for event in stream:
if event.type == "response.output_text.delta":
print(event.delta, end="", flush=True)
print()
asyncio.run(stream_async("Racontez une blague courte."))
Intégration avec FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
app = FastAPI()
client = OpenAI()
@app.get("/chat")
async def chat(question: str):
def generate():
stream = client.responses.create(
model="gpt-5.3",
input=question,
stream=True
)
for event in stream:
if event.type == "response.output_text.delta":
yield event.delta
return StreamingResponse(generate(), media_type="text/plain")
Gestion des erreurs en streaming
from openai import APIError
def stream_avec_erreurs(prompt: str):
"""Stream avec gestion d'erreurs robuste."""
try:
stream = client.responses.create(
model="gpt-5.3",
input=prompt,
stream=True
)
for event in stream:
if event.type == "response.output_text.delta":
print(event.delta, end="", flush=True)
print()
except APIError as e:
print(f"\nErreur API pendant le streaming : {e.message}")
except Exception as e:
print(f"\nErreur inattendue : {e}")
stream_avec_erreurs("Expliquez le streaming en 2 phrases.")
Points clés à retenir
- Activez le streaming avec
stream=Truepour une expérience temps réel - Filtrez les événements par
event.typepour traiter chaque type de données response.output_text.deltacontient les fragments de texteresponse.completedsignale la fin et inclut les métriques d’usage- Utilisez
AsyncOpenAIpour le streaming dans les applications asynchrones - Gérez toujours les erreurs même en mode streaming