Aller au contenu principal

Rate limiting et abuse prevention

Pourquoi limiter les requêtes ?

Les applications IA sont des cibles privilégiées pour l’abus : chaque appel au modèle coûte de l’argent, consomme des tokens et peut générer du contenu nuisible à grande échelle. Sans rate limiting, un attaquant peut ruiner votre budget API en quelques minutes, lancer des attaques de prompt injection automatisées ou utiliser votre infrastructure pour générer du spam.

Rate limiting côté application

Limiteur par fenêtre glissante

import time
from collections import defaultdict

class RateLimiter:
    """Rate limiter par fenêtre glissante."""

    def __init__(self, max_requetes: int, fenetre_secondes: int):
        self.max_requetes = max_requetes
        self.fenetre = fenetre_secondes
        self.requetes: dict[str, list[float]] = defaultdict(list)

    def autoriser(self, identifiant: str) -> tuple[bool, dict]:
        """Vérifie si une requête est autorisée pour cet identifiant."""
        maintenant = time.time()
        seuil = maintenant - self.fenetre

        # Nettoyer les anciennes requêtes
        self.requetes[identifiant] = [
            t for t in self.requetes[identifiant] if t > seuil
        ]

        nb_requetes = len(self.requetes[identifiant])

        if nb_requetes >= self.max_requetes:
            prochain_slot = self.requetes[identifiant][0] + self.fenetre
            return False, {
                "restantes": 0,
                "reset_dans": round(prochain_slot - maintenant, 1),
            }

        self.requetes[identifiant].append(maintenant)
        return True, {
            "restantes": self.max_requetes - nb_requetes - 1,
            "reset_dans": round(self.fenetre - (maintenant - self.requetes[identifiant][0]), 1),
        }

# Configuration par rôle
limiteurs = {
    "gratuit": RateLimiter(max_requetes=10, fenetre_secondes=60),
    "premium": RateLimiter(max_requetes=60, fenetre_secondes=60),
    "api": RateLimiter(max_requetes=200, fenetre_secondes=60),
}

Limiteur par coût (tokens)

class TokenBudgetLimiter:
    """Limite les dépenses en tokens par utilisateur."""

    def __init__(self, budget_tokens_jour: int, cout_par_1k_input: float, cout_par_1k_output: float):
        self.budget_jour = budget_tokens_jour
        self.cout_input = cout_par_1k_input
        self.cout_output = cout_par_1k_output
        self.usage: dict[str, dict] = defaultdict(lambda: {"tokens_in": 0, "tokens_out": 0, "date": ""})

    def verifier_budget(self, user_id: str, tokens_estimes: int) -> tuple[bool, dict]:
        """Vérifie si l'utilisateur a encore du budget."""
        from datetime import date
        aujourd_hui = str(date.today())

        # Reset quotidien
        if self.usage[user_id]["date"] != aujourd_hui:
            self.usage[user_id] = {"tokens_in": 0, "tokens_out": 0, "date": aujourd_hui}

        total_utilise = self.usage[user_id]["tokens_in"] + self.usage[user_id]["tokens_out"]
        restant = self.budget_jour - total_utilise

        if tokens_estimes > restant:
            return False, {"restant": restant, "demande": tokens_estimes}

        return True, {"restant": restant - tokens_estimes}

    def enregistrer_usage(self, user_id: str, tokens_in: int, tokens_out: int):
        """Enregistre l'usage réel après un appel."""
        self.usage[user_id]["tokens_in"] += tokens_in
        self.usage[user_id]["tokens_out"] += tokens_out

# Exemple : 100K tokens/jour pour les utilisateurs gratuits
budget = TokenBudgetLimiter(
    budget_tokens_jour=100_000,
    cout_par_1k_input=0.005,
    cout_par_1k_output=0.015,
)

Détection d’abus

Détection de patterns suspects

from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class ProfilUtilisateur:
    user_id: str
    requetes_recentes: list[dict] = field(default_factory=list)
    alertes: list[str] = field(default_factory=list)

class DetecteurAbus:
    """Détecte les patterns d'abus automatisé."""

    def __init__(self):
        self.profils: dict[str, ProfilUtilisateur] = {}

    def analyser(self, user_id: str, message: str) -> list[str]:
        """Analyse un message pour détecter des patterns d'abus."""
        if user_id not in self.profils:
            self.profils[user_id] = ProfilUtilisateur(user_id=user_id)

        profil = self.profils[user_id]
        alertes = []

        # Pattern 1 : messages identiques répétés
        if profil.requetes_recentes:
            derniers_messages = [r["message"] for r in profil.requetes_recentes[-5:]]
            if derniers_messages.count(message) >= 3:
                alertes.append("SPAM: message identique répété 3+ fois")

        # Pattern 2 : fréquence anormale
        maintenant = datetime.now()
        recentes = [
            r for r in profil.requetes_recentes
            if (maintenant - r["timestamp"]).seconds < 10
        ]
        if len(recentes) > 5:
            alertes.append("FLOOD: 5+ requêtes en 10 secondes")

        # Pattern 3 : messages très longs (tentative d'injection volumétrique)
        if len(message) > 10000:
            alertes.append(f"VOLUME: message de {len(message)} caractères")

        # Pattern 4 : variation systématique (fuzzing automatisé)
        if len(profil.requetes_recentes) >= 10:
            longueurs = [len(r["message"]) for r in profil.requetes_recentes[-10:]]
            if max(longueurs) - min(longueurs) < 5 and len(set(longueurs)) == 1:
                alertes.append("FUZZING: messages de longueur identique (automate)")

        # Enregistrer
        profil.requetes_recentes.append({
            "message": message,
            "timestamp": maintenant,
            "alertes": alertes,
        })

        return alertes

detecteur = DetecteurAbus()

Système de bannissement progressif

class GestionBannissement:
    """Bannissement progressif : avertissement, ralentissement, blocage."""

    def __init__(self):
        self.compteurs: dict[str, int] = defaultdict(int)
        self.bans: dict[str, float] = {}

    def signaler(self, user_id: str, severite: int = 1) -> str:
        """Signale un abus et applique la sanction appropriée."""
        self.compteurs[user_id] += severite
        score = self.compteurs[user_id]

        if score >= 10:
            self.bans[user_id] = time.time() + 86400  # Ban 24h
            return "BAN_24H"
        elif score >= 5:
            self.bans[user_id] = time.time() + 3600  # Ban 1h
            return "BAN_1H"
        elif score >= 3:
            return "RALENTIR"  # Rate limit réduit
        else:
            return "AVERTIR"

    def est_banni(self, user_id: str) -> bool:
        """Vérifie si un utilisateur est actuellement banni."""
        if user_id in self.bans:
            if time.time() < self.bans[user_id]:
                return True
            del self.bans[user_id]
        return False

Intégration complète

from openai import OpenAI

class APISecurisee:
    """API avec rate limiting et détection d'abus intégrés."""

    def __init__(self):
        self.client = OpenAI()
        self.rate_limiter = RateLimiter(max_requetes=20, fenetre_secondes=60)
        self.budget = TokenBudgetLimiter(100_000, 0.005, 0.015)
        self.detecteur = DetecteurAbus()
        self.bans = GestionBannissement()

    def traiter(self, user_id: str, message: str) -> str:
        # Étape 1 : vérifier le bannissement
        if self.bans.est_banni(user_id):
            return "Votre accès est temporairement suspendu."

        # Étape 2 : rate limiting
        autorise, info = self.rate_limiter.autoriser(user_id)
        if not autorise:
            return f"Trop de requêtes. Réessayez dans {info['reset_dans']}s."

        # Étape 3 : détection d'abus
        alertes = self.detecteur.analyser(user_id, message)
        if alertes:
            action = self.bans.signaler(user_id, severite=len(alertes))
            if action.startswith("BAN"):
                return "Comportement abusif détecté. Accès suspendu."

        # Étape 4 : vérifier le budget tokens
        tokens_estimes = len(message) // 4 + 500
        ok, budget_info = self.budget.verifier_budget(user_id, tokens_estimes)
        if not ok:
            return "Budget quotidien de tokens épuisé."

        # Étape 5 : appel au modèle
        response = self.client.chat.completions.create(
            model="gpt-5.4",
            messages=[{"role": "user", "content": message}],
        )

        # Enregistrer l'usage réel
        usage = response.usage
        self.budget.enregistrer_usage(user_id, usage.prompt_tokens, usage.completion_tokens)

        return response.choices[0].message.content

Points clés à retenir

  • Le rate limiting protège votre budget API et empêche les attaques automatisées
  • Combinez limite par requête (fenêtre glissante) et limite par coût (tokens)
  • La détection d’abus identifie les patterns : spam, flood, fuzzing, volume
  • Le bannissement progressif (avertir, ralentir, bloquer) évite les faux positifs définitifs
  • Chaque réponse doit inclure les headers de rate limit pour informer l’utilisateur