Le streaming en temps reel
Objectifs
- Comprendre le fonctionnement du streaming
- Travailler avec les evenements de flux
Pourquoi le streaming ?
Par defaut, client.messages.create() renvoie le contenu une fois que toute la generation est terminee. Pour des reponses longues, l’utilisateur attend sans rien voir. Le streaming resout ce probleme en envoyant le contenu au fur et a mesure de sa generation, comme sur claude.ai.
Activer le streaming
Il suffit d’ajouter stream=True :
stream = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=500,
stream=True,
messages=[{"role": "user", "content": "Ecris un court essai sur l'ocean"}]
)
L’objet retourne est un generateur qui emet des evenements server-sent (SSE).
Les types d’evenements
Chaque flux contient les evenements suivants, dans cet ordre :
- MessageStartEvent : debut du message (contenu vide)
- ContentBlockStartEvent : debut d’un bloc de contenu
- ContentBlockDeltaEvent (un ou plusieurs) : les morceaux de texte genere
- ContentBlockStopEvent : fin du bloc de contenu
- MessageDeltaEvent : informations de fin (tokens utilises)
- MessageStopEvent : fin du message
Le texte genere se trouve dans les ContentBlockDeltaEvent, accessibles via event.delta.text.
Extraire le texte
for event in stream:
if event.type == "content_block_delta":
print(event.delta.text, end="", flush=True)
Les parametres end="" et flush=True permettent un affichage progressif sur la meme ligne.
Impact sur le Time to First Token (TTFT)
Le grand avantage du streaming est la reduction du temps avant le premier token :
| Approche | Premier token | Reponse complete |
|---|---|---|
| Sans streaming | ~4 secondes | ~4 secondes |
| Avec streaming | ~0,5 secondes | ~4 secondes |
Le temps total reste le meme, mais l’utilisateur voit du contenu 8x plus vite.
Les helpers du SDK
Le SDK offre client.messages.stream() qui simplifie le travail :
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=500,
messages=[{"role": "user", "content": "Raconte une histoire courte"}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
get_final_message()
Pour acceder au message complet une fois le streaming termine :
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=500,
messages=[{"role": "user", "content": "Raconte une histoire"}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
final = stream.get_final_message()
print(f"\nTokens utilises : {final.usage}")
Handlers d’evenements personnalises
Vous pouvez definir des classes avec des gestionnaires d’evenements :
on_text(text, snapshot): declenche a chaque nouveau texteon_stream_event(event): declenche pour tout evenementon_message(message): declenche quand le message est completon_exception(exception): declenche en cas d’erreuron_end(): dernier evenement du flux
Exercice
Ecrivez un chatbot en ligne de commande qui utilise le streaming pour afficher les reponses de Claude en temps reel, token par token.