Rate limiting et abuse prevention
Pourquoi limiter les requêtes ?
Les applications IA sont des cibles privilégiées pour l’abus : chaque appel au modèle coûte de l’argent, consomme des tokens et peut générer du contenu nuisible à grande échelle. Sans rate limiting, un attaquant peut ruiner votre budget API en quelques minutes, lancer des attaques de prompt injection automatisées ou utiliser votre infrastructure pour générer du spam.
Rate limiting côté application
Limiteur par fenêtre glissante
import time
from collections import defaultdict
class RateLimiter:
"""Rate limiter par fenêtre glissante."""
def __init__(self, max_requetes: int, fenetre_secondes: int):
self.max_requetes = max_requetes
self.fenetre = fenetre_secondes
self.requetes: dict[str, list[float]] = defaultdict(list)
def autoriser(self, identifiant: str) -> tuple[bool, dict]:
"""Vérifie si une requête est autorisée pour cet identifiant."""
maintenant = time.time()
seuil = maintenant - self.fenetre
# Nettoyer les anciennes requêtes
self.requetes[identifiant] = [
t for t in self.requetes[identifiant] if t > seuil
]
nb_requetes = len(self.requetes[identifiant])
if nb_requetes >= self.max_requetes:
prochain_slot = self.requetes[identifiant][0] + self.fenetre
return False, {
"restantes": 0,
"reset_dans": round(prochain_slot - maintenant, 1),
}
self.requetes[identifiant].append(maintenant)
return True, {
"restantes": self.max_requetes - nb_requetes - 1,
"reset_dans": round(self.fenetre - (maintenant - self.requetes[identifiant][0]), 1),
}
# Configuration par rôle
limiteurs = {
"gratuit": RateLimiter(max_requetes=10, fenetre_secondes=60),
"premium": RateLimiter(max_requetes=60, fenetre_secondes=60),
"api": RateLimiter(max_requetes=200, fenetre_secondes=60),
}
Limiteur par coût (tokens)
class TokenBudgetLimiter:
"""Limite les dépenses en tokens par utilisateur."""
def __init__(self, budget_tokens_jour: int, cout_par_1k_input: float, cout_par_1k_output: float):
self.budget_jour = budget_tokens_jour
self.cout_input = cout_par_1k_input
self.cout_output = cout_par_1k_output
self.usage: dict[str, dict] = defaultdict(lambda: {"tokens_in": 0, "tokens_out": 0, "date": ""})
def verifier_budget(self, user_id: str, tokens_estimes: int) -> tuple[bool, dict]:
"""Vérifie si l'utilisateur a encore du budget."""
from datetime import date
aujourd_hui = str(date.today())
# Reset quotidien
if self.usage[user_id]["date"] != aujourd_hui:
self.usage[user_id] = {"tokens_in": 0, "tokens_out": 0, "date": aujourd_hui}
total_utilise = self.usage[user_id]["tokens_in"] + self.usage[user_id]["tokens_out"]
restant = self.budget_jour - total_utilise
if tokens_estimes > restant:
return False, {"restant": restant, "demande": tokens_estimes}
return True, {"restant": restant - tokens_estimes}
def enregistrer_usage(self, user_id: str, tokens_in: int, tokens_out: int):
"""Enregistre l'usage réel après un appel."""
self.usage[user_id]["tokens_in"] += tokens_in
self.usage[user_id]["tokens_out"] += tokens_out
# Exemple : 100K tokens/jour pour les utilisateurs gratuits
budget = TokenBudgetLimiter(
budget_tokens_jour=100_000,
cout_par_1k_input=0.005,
cout_par_1k_output=0.015,
)
Détection d’abus
Détection de patterns suspects
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class ProfilUtilisateur:
user_id: str
requetes_recentes: list[dict] = field(default_factory=list)
alertes: list[str] = field(default_factory=list)
class DetecteurAbus:
"""Détecte les patterns d'abus automatisé."""
def __init__(self):
self.profils: dict[str, ProfilUtilisateur] = {}
def analyser(self, user_id: str, message: str) -> list[str]:
"""Analyse un message pour détecter des patterns d'abus."""
if user_id not in self.profils:
self.profils[user_id] = ProfilUtilisateur(user_id=user_id)
profil = self.profils[user_id]
alertes = []
# Pattern 1 : messages identiques répétés
if profil.requetes_recentes:
derniers_messages = [r["message"] for r in profil.requetes_recentes[-5:]]
if derniers_messages.count(message) >= 3:
alertes.append("SPAM: message identique répété 3+ fois")
# Pattern 2 : fréquence anormale
maintenant = datetime.now()
recentes = [
r for r in profil.requetes_recentes
if (maintenant - r["timestamp"]).seconds < 10
]
if len(recentes) > 5:
alertes.append("FLOOD: 5+ requêtes en 10 secondes")
# Pattern 3 : messages très longs (tentative d'injection volumétrique)
if len(message) > 10000:
alertes.append(f"VOLUME: message de {len(message)} caractères")
# Pattern 4 : variation systématique (fuzzing automatisé)
if len(profil.requetes_recentes) >= 10:
longueurs = [len(r["message"]) for r in profil.requetes_recentes[-10:]]
if max(longueurs) - min(longueurs) < 5 and len(set(longueurs)) == 1:
alertes.append("FUZZING: messages de longueur identique (automate)")
# Enregistrer
profil.requetes_recentes.append({
"message": message,
"timestamp": maintenant,
"alertes": alertes,
})
return alertes
detecteur = DetecteurAbus()
Système de bannissement progressif
class GestionBannissement:
"""Bannissement progressif : avertissement, ralentissement, blocage."""
def __init__(self):
self.compteurs: dict[str, int] = defaultdict(int)
self.bans: dict[str, float] = {}
def signaler(self, user_id: str, severite: int = 1) -> str:
"""Signale un abus et applique la sanction appropriée."""
self.compteurs[user_id] += severite
score = self.compteurs[user_id]
if score >= 10:
self.bans[user_id] = time.time() + 86400 # Ban 24h
return "BAN_24H"
elif score >= 5:
self.bans[user_id] = time.time() + 3600 # Ban 1h
return "BAN_1H"
elif score >= 3:
return "RALENTIR" # Rate limit réduit
else:
return "AVERTIR"
def est_banni(self, user_id: str) -> bool:
"""Vérifie si un utilisateur est actuellement banni."""
if user_id in self.bans:
if time.time() < self.bans[user_id]:
return True
del self.bans[user_id]
return False
Intégration complète
from openai import OpenAI
class APISecurisee:
"""API avec rate limiting et détection d'abus intégrés."""
def __init__(self):
self.client = OpenAI()
self.rate_limiter = RateLimiter(max_requetes=20, fenetre_secondes=60)
self.budget = TokenBudgetLimiter(100_000, 0.005, 0.015)
self.detecteur = DetecteurAbus()
self.bans = GestionBannissement()
def traiter(self, user_id: str, message: str) -> str:
# Étape 1 : vérifier le bannissement
if self.bans.est_banni(user_id):
return "Votre accès est temporairement suspendu."
# Étape 2 : rate limiting
autorise, info = self.rate_limiter.autoriser(user_id)
if not autorise:
return f"Trop de requêtes. Réessayez dans {info['reset_dans']}s."
# Étape 3 : détection d'abus
alertes = self.detecteur.analyser(user_id, message)
if alertes:
action = self.bans.signaler(user_id, severite=len(alertes))
if action.startswith("BAN"):
return "Comportement abusif détecté. Accès suspendu."
# Étape 4 : vérifier le budget tokens
tokens_estimes = len(message) // 4 + 500
ok, budget_info = self.budget.verifier_budget(user_id, tokens_estimes)
if not ok:
return "Budget quotidien de tokens épuisé."
# Étape 5 : appel au modèle
response = self.client.chat.completions.create(
model="gpt-5.4",
messages=[{"role": "user", "content": message}],
)
# Enregistrer l'usage réel
usage = response.usage
self.budget.enregistrer_usage(user_id, usage.prompt_tokens, usage.completion_tokens)
return response.choices[0].message.content
Points clés à retenir
- Le rate limiting protège votre budget API et empêche les attaques automatisées
- Combinez limite par requête (fenêtre glissante) et limite par coût (tokens)
- La détection d’abus identifie les patterns : spam, flood, fuzzing, volume
- Le bannissement progressif (avertir, ralentir, bloquer) évite les faux positifs définitifs
- Chaque réponse doit inclure les headers de rate limit pour informer l’utilisateur