Aller au contenu principal

Le streaming en temps reel

Objectifs

  • Comprendre le fonctionnement du streaming
  • Travailler avec les evenements de flux

Pourquoi le streaming ?

Par defaut, client.messages.create() renvoie le contenu une fois que toute la generation est terminee. Pour des reponses longues, l’utilisateur attend sans rien voir. Le streaming resout ce probleme en envoyant le contenu au fur et a mesure de sa generation, comme sur claude.ai.

Activer le streaming

Il suffit d’ajouter stream=True :

stream = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=500,
    stream=True,
    messages=[{"role": "user", "content": "Ecris un court essai sur l'ocean"}]
)

L’objet retourne est un generateur qui emet des evenements server-sent (SSE).

Les types d’evenements

Chaque flux contient les evenements suivants, dans cet ordre :

  1. MessageStartEvent : debut du message (contenu vide)
  2. ContentBlockStartEvent : debut d’un bloc de contenu
  3. ContentBlockDeltaEvent (un ou plusieurs) : les morceaux de texte genere
  4. ContentBlockStopEvent : fin du bloc de contenu
  5. MessageDeltaEvent : informations de fin (tokens utilises)
  6. MessageStopEvent : fin du message

Le texte genere se trouve dans les ContentBlockDeltaEvent, accessibles via event.delta.text.

Extraire le texte

for event in stream:
    if event.type == "content_block_delta":
        print(event.delta.text, end="", flush=True)

Les parametres end="" et flush=True permettent un affichage progressif sur la meme ligne.

Impact sur le Time to First Token (TTFT)

Le grand avantage du streaming est la reduction du temps avant le premier token :

ApprochePremier tokenReponse complete
Sans streaming~4 secondes~4 secondes
Avec streaming~0,5 secondes~4 secondes

Le temps total reste le meme, mais l’utilisateur voit du contenu 8x plus vite.

Les helpers du SDK

Le SDK offre client.messages.stream() qui simplifie le travail :

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=500,
    messages=[{"role": "user", "content": "Raconte une histoire courte"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

get_final_message()

Pour acceder au message complet une fois le streaming termine :

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=500,
    messages=[{"role": "user", "content": "Raconte une histoire"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    final = stream.get_final_message()
    print(f"\nTokens utilises : {final.usage}")

Handlers d’evenements personnalises

Vous pouvez definir des classes avec des gestionnaires d’evenements :

  • on_text(text, snapshot) : declenche a chaque nouveau texte
  • on_stream_event(event) : declenche pour tout evenement
  • on_message(message) : declenche quand le message est complet
  • on_exception(exception) : declenche en cas d’erreur
  • on_end() : dernier evenement du flux

Exercice

Ecrivez un chatbot en ligne de commande qui utilise le streaming pour afficher les reponses de Claude en temps reel, token par token.