Gestion d'erreurs et fallbacks
Gestion d’erreurs et fallbacks
En production, les fonctions echouent. API externe indisponible, donnees corrompues, timeout reseau : votre code doit gerer chaque cas et informer le modele de maniere exploitable pour qu’il adapte sa reponse.
Renvoyer les erreurs au modele
Quand une fonction echoue, renvoyez un message d’erreur structure dans function_call_output. Le modele s’en sert pour reformuler sa reponse ou tenter une approche alternative :
import json
from openai import OpenAI
client = OpenAI()
def executer_fonction(name: str, arguments: str, call_id: str) -> dict:
try:
args = json.loads(arguments)
resultat = dispatcher[name](**args)
return {
"type": "function_call_output",
"call_id": call_id,
"output": json.dumps(resultat)
}
except KeyError:
return {
"type": "function_call_output",
"call_id": call_id,
"output": json.dumps({
"error": "fonction_inconnue",
"message": f"La fonction '{name}' n'existe pas"
})
}
except json.JSONDecodeError:
return {
"type": "function_call_output",
"call_id": call_id,
"output": json.dumps({
"error": "arguments_invalides",
"message": "Les arguments JSON sont malformes"
})
}
except Exception as e:
return {
"type": "function_call_output",
"call_id": call_id,
"output": json.dumps({
"error": "erreur_interne",
"message": str(e)
})
}
Retry avec backoff exponentiel
Pour les erreurs transitoires, retentez avant de signaler l’echec :
import time
import random
def executer_avec_retry(func, args, max_retries=3, base_delay=1.0):
"""Retry avec backoff exponentiel et jitter."""
for tentative in range(max_retries + 1):
try:
return func(**args)
except (ConnectionError, TimeoutError) as e:
if tentative == max_retries:
raise
delay = base_delay * (2 ** tentative) + random.uniform(0, 0.5)
time.sleep(delay)
except ValueError:
# Erreur de donnees : pas de retry
raise
def executer_fonction_robuste(name, arguments, call_id):
args = json.loads(arguments)
try:
resultat = executer_avec_retry(dispatcher[name], args)
return {
"type": "function_call_output",
"call_id": call_id,
"output": json.dumps(resultat)
}
except Exception as e:
return {
"type": "function_call_output",
"call_id": call_id,
"output": json.dumps({
"error": type(e).__name__,
"message": str(e),
"retries_effectues": 3
})
}
Timeouts par fonction
Differentes fonctions meritent des timeouts differents :
import signal
from functools import wraps
TIMEOUTS = {
"rechercher_client": 5,
"generer_rapport": 30,
"envoyer_email": 10,
}
def avec_timeout(func, timeout_seconds):
"""Execute une fonction avec un timeout."""
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(func)
try:
return future.result(timeout=timeout_seconds)
except concurrent.futures.TimeoutError:
raise TimeoutError(
f"Fonction interrompue apres {timeout_seconds}s"
)
def executer_avec_timeout(name, arguments, call_id):
args = json.loads(arguments)
timeout = TIMEOUTS.get(name, 10) # 10s par defaut
try:
resultat = avec_timeout(
lambda: dispatcher[name](**args),
timeout
)
return {
"type": "function_call_output",
"call_id": call_id,
"output": json.dumps(resultat)
}
except TimeoutError as e:
return {
"type": "function_call_output",
"call_id": call_id,
"output": json.dumps({
"error": "timeout",
"message": str(e),
"suggestion": "Reessayez avec des criteres plus precis"
})
}
Fallback entre fonctions
Quand une source de donnees est indisponible, basculez sur une alternative :
async def rechercher_produit(terme: str) -> dict:
"""Recherche avec fallback : API principale -> cache -> base locale."""
# Tentative 1 : API catalogue
try:
return await api_catalogue.rechercher(terme)
except Exception:
pass
# Tentative 2 : cache Redis
try:
cached = await redis_client.get(f"produit:{terme}")
if cached:
resultat = json.loads(cached)
resultat["_source"] = "cache"
resultat["_avertissement"] = "Donnees potentiellement obsoletes"
return resultat
except Exception:
pass
# Tentative 3 : base locale SQLite
try:
row = db.execute(
"SELECT * FROM produits WHERE nom LIKE ?",
(f"%{terme}%",)
).fetchone()
if row:
return {
"nom": row["nom"],
"prix": row["prix"],
"_source": "base_locale",
"_avertissement": "Donnees de la derniere synchronisation"
}
except Exception:
pass
return {
"error": "indisponible",
"message": "Aucune source de donnees accessible pour cette recherche"
}
Circuit breaker
Pour eviter de surcharger un service defaillant :
from datetime import datetime, timedelta
class CircuitBreaker:
def __init__(self, seuil_echecs=5, duree_ouverture=60):
self.seuil = seuil_echecs
self.duree = duree_ouverture
self.echecs = 0
self.dernier_echec = None
self.etat = "ferme" # ferme, ouvert, semi-ouvert
def peut_executer(self) -> bool:
if self.etat == "ferme":
return True
if self.etat == "ouvert":
if datetime.now() - self.dernier_echec > timedelta(seconds=self.duree):
self.etat = "semi-ouvert"
return True
return False
return True # semi-ouvert : laisser passer un essai
def enregistrer_succes(self):
self.echecs = 0
self.etat = "ferme"
def enregistrer_echec(self):
self.echecs += 1
self.dernier_echec = datetime.now()
if self.echecs >= self.seuil:
self.etat = "ouvert"
# Un circuit breaker par service externe
breakers = {
"api_meteo": CircuitBreaker(seuil_echecs=3, duree_ouverture=30),
"api_catalogue": CircuitBreaker(seuil_echecs=5, duree_ouverture=60),
}
Boucle robuste complete
Voici le pattern complet pour une boucle de function calling en production :
def boucle_robuste(prompt, tools, max_tours=10):
messages = prompt
for tour in range(max_tours):
response = client.responses.create(
model="gpt-5.3",
input=messages,
tools=tools
)
appels = [i for i in response.output if i.type == "function_call"]
if not appels:
return response.output_text
resultats = []
for appel in appels:
resultat = executer_fonction_robuste(
appel.name, appel.arguments, appel.call_id
)
resultats.append(resultat)
messages = response.output + resultats
return "Workflow interrompu : limite de tours atteinte"
Points cles a retenir
- Renvoyez toujours un
function_call_output, meme en cas d’erreur - Structurez les erreurs en JSON pour que le modele les exploite
- Retentez uniquement les erreurs transitoires avec backoff exponentiel
- Implementez des fallbacks entre sources de donnees
- Un circuit breaker protege les services defaillants