Production : scaling et caching
Objectifs
- Optimiser les performances d’un système d’embeddings en production
- Implémenter le caching pour réduire les coûts et la latence
- Gérer le scaling et la haute disponibilité
Les défis de la production
En développement, vous appelez l’API OpenAI pour chaque requête. En production, cela pose trois problèmes :
- Coût : chaque appel est facturé
- Latence : un aller-retour réseau de 100-300 ms par requête
- Rate limits : OpenAI limite le nombre d’appels par minute
Caching des embeddings
Cache en mémoire avec LRU
Pour les applications à trafic modéré, un cache LRU en mémoire est le plus simple :
from functools import lru_cache
from openai import OpenAI
import hashlib
import json
client = OpenAI()
@lru_cache(maxsize=10_000)
def embedding_cache(texte: str, model: str = "text-embedding-3-large") -> tuple:
"""Cache LRU pour les embeddings (tuple car hashable)."""
response = client.embeddings.create(input=texte, model=model)
return tuple(response.data[0].embedding)
# Utilisation - le 2e appel est instantané
emb1 = embedding_cache("Bonjour le monde") # Appel API
emb2 = embedding_cache("Bonjour le monde") # Depuis le cache
Cache persistant avec Redis
Pour un cache qui survit aux redémarrages :
import redis
import numpy as np
import json
class EmbeddingCacheRedis:
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis = redis.from_url(redis_url)
self.client = OpenAI()
self.ttl = 86400 * 30 # 30 jours
def _cache_key(self, texte: str, model: str) -> str:
h = hashlib.md5(f"{model}:{texte}".encode()).hexdigest()
return f"emb:{h}"
def get_embedding(
self, texte: str, model: str = "text-embedding-3-large"
) -> list[float]:
key = self._cache_key(texte, model)
# Chercher dans le cache
cached = self.redis.get(key)
if cached:
return json.loads(cached)
# Générer et cacher
response = self.client.embeddings.create(
input=texte, model=model
)
embedding = response.data[0].embedding
self.redis.setex(key, self.ttl, json.dumps(embedding))
return embedding
def get_embeddings_batch(
self, textes: list[str], model: str = "text-embedding-3-large"
) -> list[list[float]]:
"""Batch avec cache partiel."""
resultats = [None] * len(textes)
a_calculer = []
indices_a_calculer = []
# Vérifier le cache pour chaque texte
for i, texte in enumerate(textes):
key = self._cache_key(texte, model)
cached = self.redis.get(key)
if cached:
resultats[i] = json.loads(cached)
else:
a_calculer.append(texte)
indices_a_calculer.append(i)
# Calculer les manquants en un seul appel
if a_calculer:
response = self.client.embeddings.create(
input=a_calculer, model=model
)
for resp_item in sorted(response.data, key=lambda x: x.index):
idx = indices_a_calculer[resp_item.index]
texte = a_calculer[resp_item.index]
embedding = resp_item.embedding
resultats[idx] = embedding
key = self._cache_key(texte, model)
self.redis.setex(key, self.ttl, json.dumps(embedding))
print(f"Cache: {len(textes) - len(a_calculer)}/{len(textes)} hits")
return resultats
# Utilisation
cache = EmbeddingCacheRedis()
emb = cache.get_embedding("test de cache")
Gestion des rate limits
import time
from openai import RateLimitError
class EmbeddingClientRobuste:
def __init__(self, max_retries: int = 5):
self.client = OpenAI()
self.max_retries = max_retries
def create(
self, input: str | list[str], model: str = "text-embedding-3-large",
**kwargs
):
"""Appel avec retry exponentiel."""
for tentative in range(self.max_retries):
try:
return self.client.embeddings.create(
input=input, model=model, **kwargs
)
except RateLimitError as e:
if tentative == self.max_retries - 1:
raise
wait = 2 ** tentative
print(f"Rate limit atteint, pause de {wait}s...")
time.sleep(wait)
Traitement asynchrone
Pour les pipelines d’ingestion, utilisez l’async pour paralléliser :
import asyncio
from openai import AsyncOpenAI
async_client = AsyncOpenAI()
async def generer_embeddings_async(
textes: list[str],
model: str = "text-embedding-3-large",
max_concurrent: int = 5
) -> list[list[float]]:
"""Génère des embeddings en parallèle avec sémaphore."""
semaphore = asyncio.Semaphore(max_concurrent)
resultats = [None] * len(textes)
async def traiter_batch(batch_idx, batch):
async with semaphore:
response = await async_client.embeddings.create(
input=batch, model=model
)
return batch_idx, [
d.embedding
for d in sorted(response.data, key=lambda x: x.index)
]
# Découper en lots de 100
taches = []
batch_size = 100
for i in range(0, len(textes), batch_size):
batch = textes[i:i + batch_size]
taches.append(traiter_batch(i, batch))
for coro in asyncio.as_completed(taches):
batch_idx, embeddings = await coro
for j, emb in enumerate(embeddings):
resultats[batch_idx + j] = emb
return resultats
# Utilisation
embeddings = asyncio.run(
generer_embeddings_async(mes_textes, max_concurrent=3)
)
Optimiser le stockage
Quantification des vecteurs
Réduisez la taille des vecteurs sans perte significative de qualité :
import numpy as np
def quantifier_int8(embeddings: np.ndarray) -> tuple:
"""Quantifie les embeddings en int8 (4x moins de mémoire)."""
min_val = embeddings.min(axis=1, keepdims=True)
max_val = embeddings.max(axis=1, keepdims=True)
scale = (max_val - min_val) / 255.0
quantifie = ((embeddings - min_val) / scale).astype(np.int8)
return quantifie, min_val, scale
def dequantifier_int8(quantifie, min_val, scale):
"""Restaure les embeddings depuis int8."""
return quantifie.astype(np.float32) * scale + min_val
# Comparaison taille
embeddings = np.random.randn(10000, 3072).astype(np.float32)
print(f"float32 : {embeddings.nbytes / 1e6:.1f} Mo") # ~122 Mo
quantifie, min_val, scale = quantifier_int8(embeddings)
print(f"int8 : {quantifie.nbytes / 1e6:.1f} Mo") # ~30 Mo
Réduction de dimensions à l’API
Utilisez le paramètre dimensions pour réduire le stockage :
# 3072 dim -> ~24 Ko par vecteur
# 1024 dim -> ~8 Ko par vecteur (réduction API)
# 256 dim -> ~2 Ko par vecteur (réduction API)
response = client.embeddings.create(
input="texte",
model="text-embedding-3-large",
dimensions=1024 # 3x moins de stockage
)
Architecture de production
class PipelineProduction:
"""Pipeline RAG optimisé pour la production."""
def __init__(self):
self.cache = EmbeddingCacheRedis()
self.client_robuste = EmbeddingClientRobuste()
def ingerer(self, documents: list[dict]):
"""Ingestion avec cache et retry."""
textes = [d["texte"] for d in documents]
embeddings = self.cache.get_embeddings_batch(textes)
# Stocker dans la base vectorielle...
def rechercher(self, question: str, k: int = 5):
"""Recherche avec cache de la requête."""
query_emb = self.cache.get_embedding(question)
# Rechercher dans la base vectorielle...
return query_emb
Checklist de mise en production
| Élément | Description |
|---|---|
| Cache Redis | Réduire les appels API et la latence |
| Retry exponentiel | Gérer les rate limits gracieusement |
| Monitoring | Tracker le taux de cache hit, la latence, les erreurs |
| Batch processing | Traiter les gros volumes en lots asynchrones |
| Quantification | Réduire le stockage de 4x avec int8 |
| Réduction de dim | Utiliser 1024 ou 512 dim si la qualité le permet |
| Backup | Sauvegarder régulièrement les embeddings et les métadonnées |
| Alerting | Alerter sur les erreurs API et les dégradations de performance |
Résumé
- Le cache Redis réduit les coûts et la latence de 90%+
- Le retry exponentiel gère les rate limits d’OpenAI
- Le traitement asynchrone accélère l’ingestion de gros corpus
- La quantification int8 réduit le stockage de 4x
- La réduction de dimensions à l’API est le levier le plus simple