Aller au contenu principal

Gestion d'erreurs et fallbacks

Gestion d’erreurs et fallbacks

En production, les fonctions echouent. API externe indisponible, donnees corrompues, timeout reseau : votre code doit gerer chaque cas et informer le modele de maniere exploitable pour qu’il adapte sa reponse.

Renvoyer les erreurs au modele

Quand une fonction echoue, renvoyez un message d’erreur structure dans function_call_output. Le modele s’en sert pour reformuler sa reponse ou tenter une approche alternative :

import json
from openai import OpenAI

client = OpenAI()

def executer_fonction(name: str, arguments: str, call_id: str) -> dict:
    try:
        args = json.loads(arguments)
        resultat = dispatcher[name](**args)
        return {
            "type": "function_call_output",
            "call_id": call_id,
            "output": json.dumps(resultat)
        }
    except KeyError:
        return {
            "type": "function_call_output",
            "call_id": call_id,
            "output": json.dumps({
                "error": "fonction_inconnue",
                "message": f"La fonction '{name}' n'existe pas"
            })
        }
    except json.JSONDecodeError:
        return {
            "type": "function_call_output",
            "call_id": call_id,
            "output": json.dumps({
                "error": "arguments_invalides",
                "message": "Les arguments JSON sont malformes"
            })
        }
    except Exception as e:
        return {
            "type": "function_call_output",
            "call_id": call_id,
            "output": json.dumps({
                "error": "erreur_interne",
                "message": str(e)
            })
        }

Retry avec backoff exponentiel

Pour les erreurs transitoires, retentez avant de signaler l’echec :

import time
import random

def executer_avec_retry(func, args, max_retries=3, base_delay=1.0):
    """Retry avec backoff exponentiel et jitter."""
    for tentative in range(max_retries + 1):
        try:
            return func(**args)
        except (ConnectionError, TimeoutError) as e:
            if tentative == max_retries:
                raise
            delay = base_delay * (2 ** tentative) + random.uniform(0, 0.5)
            time.sleep(delay)
        except ValueError:
            # Erreur de donnees : pas de retry
            raise

def executer_fonction_robuste(name, arguments, call_id):
    args = json.loads(arguments)
    try:
        resultat = executer_avec_retry(dispatcher[name], args)
        return {
            "type": "function_call_output",
            "call_id": call_id,
            "output": json.dumps(resultat)
        }
    except Exception as e:
        return {
            "type": "function_call_output",
            "call_id": call_id,
            "output": json.dumps({
                "error": type(e).__name__,
                "message": str(e),
                "retries_effectues": 3
            })
        }

Timeouts par fonction

Differentes fonctions meritent des timeouts differents :

import signal
from functools import wraps

TIMEOUTS = {
    "rechercher_client": 5,
    "generer_rapport": 30,
    "envoyer_email": 10,
}

def avec_timeout(func, timeout_seconds):
    """Execute une fonction avec un timeout."""
    import concurrent.futures
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(func)
        try:
            return future.result(timeout=timeout_seconds)
        except concurrent.futures.TimeoutError:
            raise TimeoutError(
                f"Fonction interrompue apres {timeout_seconds}s"
            )

def executer_avec_timeout(name, arguments, call_id):
    args = json.loads(arguments)
    timeout = TIMEOUTS.get(name, 10)  # 10s par defaut

    try:
        resultat = avec_timeout(
            lambda: dispatcher[name](**args),
            timeout
        )
        return {
            "type": "function_call_output",
            "call_id": call_id,
            "output": json.dumps(resultat)
        }
    except TimeoutError as e:
        return {
            "type": "function_call_output",
            "call_id": call_id,
            "output": json.dumps({
                "error": "timeout",
                "message": str(e),
                "suggestion": "Reessayez avec des criteres plus precis"
            })
        }

Fallback entre fonctions

Quand une source de donnees est indisponible, basculez sur une alternative :

async def rechercher_produit(terme: str) -> dict:
    """Recherche avec fallback : API principale -> cache -> base locale."""
    # Tentative 1 : API catalogue
    try:
        return await api_catalogue.rechercher(terme)
    except Exception:
        pass

    # Tentative 2 : cache Redis
    try:
        cached = await redis_client.get(f"produit:{terme}")
        if cached:
            resultat = json.loads(cached)
            resultat["_source"] = "cache"
            resultat["_avertissement"] = "Donnees potentiellement obsoletes"
            return resultat
    except Exception:
        pass

    # Tentative 3 : base locale SQLite
    try:
        row = db.execute(
            "SELECT * FROM produits WHERE nom LIKE ?",
            (f"%{terme}%",)
        ).fetchone()
        if row:
            return {
                "nom": row["nom"],
                "prix": row["prix"],
                "_source": "base_locale",
                "_avertissement": "Donnees de la derniere synchronisation"
            }
    except Exception:
        pass

    return {
        "error": "indisponible",
        "message": "Aucune source de donnees accessible pour cette recherche"
    }

Circuit breaker

Pour eviter de surcharger un service defaillant :

from datetime import datetime, timedelta

class CircuitBreaker:
    def __init__(self, seuil_echecs=5, duree_ouverture=60):
        self.seuil = seuil_echecs
        self.duree = duree_ouverture
        self.echecs = 0
        self.dernier_echec = None
        self.etat = "ferme"  # ferme, ouvert, semi-ouvert

    def peut_executer(self) -> bool:
        if self.etat == "ferme":
            return True
        if self.etat == "ouvert":
            if datetime.now() - self.dernier_echec > timedelta(seconds=self.duree):
                self.etat = "semi-ouvert"
                return True
            return False
        return True  # semi-ouvert : laisser passer un essai

    def enregistrer_succes(self):
        self.echecs = 0
        self.etat = "ferme"

    def enregistrer_echec(self):
        self.echecs += 1
        self.dernier_echec = datetime.now()
        if self.echecs >= self.seuil:
            self.etat = "ouvert"

# Un circuit breaker par service externe
breakers = {
    "api_meteo": CircuitBreaker(seuil_echecs=3, duree_ouverture=30),
    "api_catalogue": CircuitBreaker(seuil_echecs=5, duree_ouverture=60),
}

Boucle robuste complete

Voici le pattern complet pour une boucle de function calling en production :

def boucle_robuste(prompt, tools, max_tours=10):
    messages = prompt
    for tour in range(max_tours):
        response = client.responses.create(
            model="gpt-5.3",
            input=messages,
            tools=tools
        )

        appels = [i for i in response.output if i.type == "function_call"]
        if not appels:
            return response.output_text

        resultats = []
        for appel in appels:
            resultat = executer_fonction_robuste(
                appel.name, appel.arguments, appel.call_id
            )
            resultats.append(resultat)

        messages = response.output + resultats

    return "Workflow interrompu : limite de tours atteinte"

Points cles a retenir

  • Renvoyez toujours un function_call_output, meme en cas d’erreur
  • Structurez les erreurs en JSON pour que le modele les exploite
  • Retentez uniquement les erreurs transitoires avec backoff exponentiel
  • Implementez des fallbacks entre sources de donnees
  • Un circuit breaker protege les services defaillants