Aller au contenu principal

Batch API : traitement asynchrone massif

Traiter des milliers de requêtes à moitié prix

La Batch API d’OpenAI vous permet d’envoyer des lots de requêtes à traiter de manière asynchrone. Le compromis est simple : vous acceptez un délai de traitement (jusqu’à 24 heures) et en échange vous payez 50 % moins cher. Pour tout traitement qui n’est pas temps réel, c’est le choix optimal.

Quand utiliser la Batch API

  • Analyse de documents : traiter des centaines de contrats, rapports, emails
  • Génération de contenu : créer des fiches produit, descriptions, traductions
  • Évaluation : exécuter des benchmarks sur des milliers de cas de test
  • Classification : catégoriser un backlog de tickets ou de feedbacks
  • Extraction de données : parser des milliers de factures ou de CV

Créer un batch

Étape 1 : Préparer le fichier JSONL

Chaque ligne du fichier est une requête indépendante :

import json

requetes = [
    {
        "custom_id": f"doc-{i}",
        "method": "POST",
        "url": "/v1/responses",
        "body": {
            "model": "gpt-5.3",
            "input": f"Résumez ce document en 3 points clés : {doc}",
            "max_output_tokens": 500,
        },
    }
    for i, doc in enumerate(liste_documents)
]

# Écrire le fichier JSONL
with open("batch_input.jsonl", "w") as f:
    for requete in requetes:
        f.write(json.dumps(requete) + "\n")

Étape 2 : Uploader et lancer le batch

import openai

client = openai.OpenAI()

# Upload du fichier
fichier = client.files.create(
    file=open("batch_input.jsonl", "rb"),
    purpose="batch",
)

# Création du batch
batch = client.batches.create(
    input_file_id=fichier.id,
    endpoint="/v1/responses",
    completion_window="24h",
    metadata={"projet": "analyse-contrats", "version": "2.1"},
)

print(f"Batch créé : {batch.id}")
print(f"Statut : {batch.status}")

Étape 3 : Suivre et récupérer les résultats

import time

def attendre_batch(batch_id: str, intervalle: int = 60) -> dict:
    """Attend la fin d'un batch et retourne les résultats."""
    while True:
        batch = client.batches.retrieve(batch_id)
        print(
            f"Statut: {batch.status} | "
            f"Terminés: {batch.request_counts.completed}/"
            f"{batch.request_counts.total} | "
            f"Échoués: {batch.request_counts.failed}"
        )

        if batch.status == "completed":
            break
        elif batch.status in ("failed", "expired", "cancelled"):
            raise RuntimeError(f"Batch échoué : {batch.status}")

        time.sleep(intervalle)

    # Télécharger les résultats
    contenu = client.files.content(batch.output_file_id)
    resultats = []
    for ligne in contenu.text.strip().split("\n"):
        resultats.append(json.loads(ligne))

    return resultats


resultats = attendre_batch(batch.id)

# Traiter les résultats
for resultat in resultats:
    custom_id = resultat["custom_id"]
    if resultat["response"]["status_code"] == 200:
        texte = resultat["response"]["body"]["output"][0]["content"][0]["text"]
        print(f"{custom_id}: {texte[:100]}...")
    else:
        print(f"{custom_id}: ERREUR {resultat[response][status_code]}")

Gestion des erreurs

def traiter_erreurs_batch(resultats: list[dict]) -> tuple[list, list]:
    """Sépare les succès des échecs pour retraitement."""
    succes = []
    echecs = []

    for r in resultats:
        if r["response"]["status_code"] == 200:
            succes.append(r)
        else:
            echecs.append(r)

    if echecs:
        print(f"Attention : {len(echecs)} requêtes échouées sur {len(resultats)}")
        # Réécrire un fichier JSONL avec les échecs pour retry
        with open("batch_retry.jsonl", "w") as f:
            for echec in echecs:
                requete_originale = {
                    "custom_id": echec["custom_id"],
                    "method": "POST",
                    "url": "/v1/responses",
                    "body": echec["request"]["body"],
                }
                f.write(json.dumps(requete_originale) + "\n")

    return succes, echecs

Optimisation des coûts

Comparaison des coûts

Pour un traitement de 10 000 documents avec GPT-5.3 :

  • API synchrone : coût standard (100 %)
  • Batch API : 50 % du coût standard
  • Batch API + prompt caching : les tokens en cache sont aussi réduits

Regrouper intelligemment

def creer_batches_optimises(
    requetes: list[dict],
    taille_max: int = 50_000,
) -> list[list[dict]]:
    """Découpe en batches de taille optimale."""
    batches = []
    batch_courant = []

    for requete in requetes:
        batch_courant.append(requete)
        if len(batch_courant) >= taille_max:
            batches.append(batch_courant)
            batch_courant = []

    if batch_courant:
        batches.append(batch_courant)

    return batches

Points clés à retenir

  • La Batch API offre 50 % de réduction pour les traitements non temps réel
  • Préparez vos requêtes en fichier JSONL avec un custom_id unique par requête
  • Prévoyez un mécanisme de retry pour les requêtes échouées
  • Combinez avec le prompt caching pour maximiser les économies