Aller au contenu principal

Rate limits, quotas et tiers

Rate limits, quotas et tiers

Les rate limits définissent combien de requêtes vous pouvez envoyer à l’API par minute. Comprendre et gérer ces limites est crucial pour éviter les interruptions de service en production.

Le système de tiers

OpenAI organise les limites en tiers (niveaux) basés sur votre historique de facturation :

# Les tiers déterminent vos limites
# Tier 1 : Après premier paiement réussi
# Tier 2 : Après 50$ de paiements cumulés + 7 jours
# Tier 3 : Après 100$ de paiements cumulés + 7 jours
# Tier 4 : Après 250$ de paiements cumulés + 14 jours
# Tier 5 : Après 1000$ de paiements cumulés + 30 jours

# Les limites augmentent significativement avec chaque tier

Types de rate limits

Il existe trois types de limites qui s’appliquent simultanément :

# 1. RPM — Requests Per Minute (requêtes par minute)
# 2. TPM — Tokens Per Minute (tokens par minute)
# 3. RPD — Requests Per Day (requêtes par jour)

# Exemple pour GPT-5.3, Tier 3 :
# RPM : 5 000 requêtes/minute
# TPM : 2 000 000 tokens/minute
# RPD : 10 000 requêtes/jour

Lire les headers de rate limit

Chaque réponse inclut des headers indiquant votre consommation :

from openai import OpenAI
client = OpenAI()

response = client.responses.create(
    model="gpt-5.3",
    input="Bonjour !"
)

# Accéder aux headers via la réponse HTTP brute
# En utilisant le mode with_raw_response
raw = client.responses.with_raw_response.create(
    model="gpt-5.3",
    input="Bonjour !"
)

print(f"Limite RPM : {raw.headers.get('x-ratelimit-limit-requests')}")
print(f"Restant RPM : {raw.headers.get('x-ratelimit-remaining-requests')}")
print(f"Limite TPM : {raw.headers.get('x-ratelimit-limit-tokens')}")
print(f"Restant TPM : {raw.headers.get('x-ratelimit-remaining-tokens')}")
print(f"Reset dans : {raw.headers.get('x-ratelimit-reset-requests')}")

response = raw.parse()
print(f"Réponse : {response.output_text}")

Rate limiter côté client

Limiter le débit avec un sémaphore

import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

class RateLimiter:
    """Limite le nombre de requêtes simultanées."""

    def __init__(self, max_concurrent: int = 10, rpm: int = 500):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.rpm = rpm
        self.delay = 60.0 / rpm  # Délai entre chaque requête

    async def appeler(self, prompt: str) -> str:
        async with self.semaphore:
            await asyncio.sleep(self.delay)
            response = await async_client.responses.create(
                model="gpt-5.3",
                input=prompt
            )
            return response.output_text

async def traitement_batch():
    limiter = RateLimiter(max_concurrent=5, rpm=100)

    prompts = [f"Résumez le concept {i}" for i in range(50)]
    tasks = [limiter.appeler(p) for p in prompts]
    resultats = await asyncio.gather(*tasks)

    for i, r in enumerate(resultats):
        print(f"Résultat {i}: {r[:50]}...")

# asyncio.run(traitement_batch())

Token bucket simple

import time
import threading

class TokenBucket:
    """Rate limiter basé sur le token bucket algorithm."""

    def __init__(self, tokens_par_seconde: float, capacite_max: int):
        self.tokens_par_seconde = tokens_par_seconde
        self.capacite_max = capacite_max
        self.tokens = capacite_max
        self.last_refill = time.monotonic()
        self.lock = threading.Lock()

    def attendre(self, tokens_necessaires: int = 1):
        """Attend qu'assez de tokens soient disponibles."""
        while True:
            with self.lock:
                now = time.monotonic()
                elapsed = now - self.last_refill
                self.tokens = min(
                    self.capacite_max,
                    self.tokens + elapsed * self.tokens_par_seconde
                )
                self.last_refill = now

                if self.tokens >= tokens_necessaires:
                    self.tokens -= tokens_necessaires
                    return

            time.sleep(0.1)

# Utilisation : 50 requêtes par minute max
bucket = TokenBucket(tokens_par_seconde=50/60, capacite_max=50)

def appel_limite(prompt: str) -> str:
    bucket.attendre(1)
    response = client.responses.create(
        model="gpt-5.3",
        input=prompt
    )
    return response.output_text

Stratégies pour les gros volumes

1. Répartir entre plusieurs projets

from openai import OpenAI

# Chaque projet a ses propres rate limits
clients = [
    OpenAI(project="proj-aaaa"),  # Projet 1
    OpenAI(project="proj-bbbb"),  # Projet 2
    OpenAI(project="proj-cccc"),  # Projet 3
]

def round_robin_call(prompt: str, clients: list) -> str:
    """Distribue les appels entre plusieurs projets."""
    import itertools
    client_cycle = itertools.cycle(clients)
    client = next(client_cycle)
    return client.responses.create(
        model="gpt-5.3",
        input=prompt
    ).output_text

2. Utiliser le bon modèle pour le bon volume

# o4-mini a des rate limits plus élevés et coûte moins
# Utilisez-le pour les tâches à haut volume

# GPT-5.4 a des rate limits plus bas
# Réservez-le pour les tâches critiques

3. Mise en file d’attente

import queue
import threading

class APIQueue:
    """File d'attente pour les appels API."""

    def __init__(self, rpm: int = 100):
        self.queue = queue.Queue()
        self.delay = 60.0 / rpm
        self.running = True
        self.worker = threading.Thread(target=self._process, daemon=True)
        self.worker.start()

    def _process(self):
        while self.running:
            try:
                prompt, future = self.queue.get(timeout=1)
                response = client.responses.create(
                    model="gpt-5.3",
                    input=prompt
                )
                future["result"] = response.output_text
                future["event"].set()
                time.sleep(self.delay)
            except queue.Empty:
                continue

    def submit(self, prompt: str) -> str:
        future = {"result": None, "event": threading.Event()}
        self.queue.put((prompt, future))
        future["event"].wait()
        return future["result"]

Points clés à retenir

  • Les rate limits s’appliquent en RPM, TPM et RPD simultanément
  • Votre tier détermine vos limites — il augmente avec vos paiements cumulés
  • Lisez les headers x-ratelimit-* pour connaître votre consommation en temps réel
  • Implémentez un rate limiter côté client pour éviter les erreurs 429
  • Répartissez la charge entre projets pour les gros volumes
  • o4-mini offre les rate limits les plus élevés pour le traitement en masse