Aller au contenu principal

Production : scaling et caching

Objectifs

  • Optimiser les performances d’un système d’embeddings en production
  • Implémenter le caching pour réduire les coûts et la latence
  • Gérer le scaling et la haute disponibilité

Les défis de la production

En développement, vous appelez l’API OpenAI pour chaque requête. En production, cela pose trois problèmes :

  • Coût : chaque appel est facturé
  • Latence : un aller-retour réseau de 100-300 ms par requête
  • Rate limits : OpenAI limite le nombre d’appels par minute

Caching des embeddings

Cache en mémoire avec LRU

Pour les applications à trafic modéré, un cache LRU en mémoire est le plus simple :

from functools import lru_cache
from openai import OpenAI
import hashlib
import json

client = OpenAI()

@lru_cache(maxsize=10_000)
def embedding_cache(texte: str, model: str = "text-embedding-3-large") -> tuple:
    """Cache LRU pour les embeddings (tuple car hashable)."""
    response = client.embeddings.create(input=texte, model=model)
    return tuple(response.data[0].embedding)

# Utilisation - le 2e appel est instantané
emb1 = embedding_cache("Bonjour le monde")  # Appel API
emb2 = embedding_cache("Bonjour le monde")  # Depuis le cache

Cache persistant avec Redis

Pour un cache qui survit aux redémarrages :

import redis
import numpy as np
import json

class EmbeddingCacheRedis:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.client = OpenAI()
        self.ttl = 86400 * 30  # 30 jours

    def _cache_key(self, texte: str, model: str) -> str:
        h = hashlib.md5(f"{model}:{texte}".encode()).hexdigest()
        return f"emb:{h}"

    def get_embedding(
        self, texte: str, model: str = "text-embedding-3-large"
    ) -> list[float]:
        key = self._cache_key(texte, model)

        # Chercher dans le cache
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)

        # Générer et cacher
        response = self.client.embeddings.create(
            input=texte, model=model
        )
        embedding = response.data[0].embedding

        self.redis.setex(key, self.ttl, json.dumps(embedding))
        return embedding

    def get_embeddings_batch(
        self, textes: list[str], model: str = "text-embedding-3-large"
    ) -> list[list[float]]:
        """Batch avec cache partiel."""
        resultats = [None] * len(textes)
        a_calculer = []
        indices_a_calculer = []

        # Vérifier le cache pour chaque texte
        for i, texte in enumerate(textes):
            key = self._cache_key(texte, model)
            cached = self.redis.get(key)
            if cached:
                resultats[i] = json.loads(cached)
            else:
                a_calculer.append(texte)
                indices_a_calculer.append(i)

        # Calculer les manquants en un seul appel
        if a_calculer:
            response = self.client.embeddings.create(
                input=a_calculer, model=model
            )
            for resp_item in sorted(response.data, key=lambda x: x.index):
                idx = indices_a_calculer[resp_item.index]
                texte = a_calculer[resp_item.index]
                embedding = resp_item.embedding

                resultats[idx] = embedding
                key = self._cache_key(texte, model)
                self.redis.setex(key, self.ttl, json.dumps(embedding))

        print(f"Cache: {len(textes) - len(a_calculer)}/{len(textes)} hits")
        return resultats

# Utilisation
cache = EmbeddingCacheRedis()
emb = cache.get_embedding("test de cache")

Gestion des rate limits

import time
from openai import RateLimitError

class EmbeddingClientRobuste:
    def __init__(self, max_retries: int = 5):
        self.client = OpenAI()
        self.max_retries = max_retries

    def create(
        self, input: str | list[str], model: str = "text-embedding-3-large",
        **kwargs
    ):
        """Appel avec retry exponentiel."""
        for tentative in range(self.max_retries):
            try:
                return self.client.embeddings.create(
                    input=input, model=model, **kwargs
                )
            except RateLimitError as e:
                if tentative == self.max_retries - 1:
                    raise
                wait = 2 ** tentative
                print(f"Rate limit atteint, pause de {wait}s...")
                time.sleep(wait)

Traitement asynchrone

Pour les pipelines d’ingestion, utilisez l’async pour paralléliser :

import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

async def generer_embeddings_async(
    textes: list[str],
    model: str = "text-embedding-3-large",
    max_concurrent: int = 5
) -> list[list[float]]:
    """Génère des embeddings en parallèle avec sémaphore."""
    semaphore = asyncio.Semaphore(max_concurrent)
    resultats = [None] * len(textes)

    async def traiter_batch(batch_idx, batch):
        async with semaphore:
            response = await async_client.embeddings.create(
                input=batch, model=model
            )
            return batch_idx, [
                d.embedding
                for d in sorted(response.data, key=lambda x: x.index)
            ]

    # Découper en lots de 100
    taches = []
    batch_size = 100
    for i in range(0, len(textes), batch_size):
        batch = textes[i:i + batch_size]
        taches.append(traiter_batch(i, batch))

    for coro in asyncio.as_completed(taches):
        batch_idx, embeddings = await coro
        for j, emb in enumerate(embeddings):
            resultats[batch_idx + j] = emb

    return resultats

# Utilisation
embeddings = asyncio.run(
    generer_embeddings_async(mes_textes, max_concurrent=3)
)

Optimiser le stockage

Quantification des vecteurs

Réduisez la taille des vecteurs sans perte significative de qualité :

import numpy as np

def quantifier_int8(embeddings: np.ndarray) -> tuple:
    """Quantifie les embeddings en int8 (4x moins de mémoire)."""
    min_val = embeddings.min(axis=1, keepdims=True)
    max_val = embeddings.max(axis=1, keepdims=True)
    scale = (max_val - min_val) / 255.0

    quantifie = ((embeddings - min_val) / scale).astype(np.int8)
    return quantifie, min_val, scale

def dequantifier_int8(quantifie, min_val, scale):
    """Restaure les embeddings depuis int8."""
    return quantifie.astype(np.float32) * scale + min_val

# Comparaison taille
embeddings = np.random.randn(10000, 3072).astype(np.float32)
print(f"float32 : {embeddings.nbytes / 1e6:.1f} Mo")  # ~122 Mo

quantifie, min_val, scale = quantifier_int8(embeddings)
print(f"int8 :    {quantifie.nbytes / 1e6:.1f} Mo")    # ~30 Mo

Réduction de dimensions à l’API

Utilisez le paramètre dimensions pour réduire le stockage :

# 3072 dim -> ~24 Ko par vecteur
# 1024 dim -> ~8 Ko par vecteur  (réduction API)
# 256 dim  -> ~2 Ko par vecteur  (réduction API)

response = client.embeddings.create(
    input="texte",
    model="text-embedding-3-large",
    dimensions=1024  # 3x moins de stockage
)

Architecture de production

class PipelineProduction:
    """Pipeline RAG optimisé pour la production."""

    def __init__(self):
        self.cache = EmbeddingCacheRedis()
        self.client_robuste = EmbeddingClientRobuste()

    def ingerer(self, documents: list[dict]):
        """Ingestion avec cache et retry."""
        textes = [d["texte"] for d in documents]
        embeddings = self.cache.get_embeddings_batch(textes)
        # Stocker dans la base vectorielle...

    def rechercher(self, question: str, k: int = 5):
        """Recherche avec cache de la requête."""
        query_emb = self.cache.get_embedding(question)
        # Rechercher dans la base vectorielle...
        return query_emb

Checklist de mise en production

ÉlémentDescription
Cache RedisRéduire les appels API et la latence
Retry exponentielGérer les rate limits gracieusement
MonitoringTracker le taux de cache hit, la latence, les erreurs
Batch processingTraiter les gros volumes en lots asynchrones
QuantificationRéduire le stockage de 4x avec int8
Réduction de dimUtiliser 1024 ou 512 dim si la qualité le permet
BackupSauvegarder régulièrement les embeddings et les métadonnées
AlertingAlerter sur les erreurs API et les dégradations de performance

Résumé

  • Le cache Redis réduit les coûts et la latence de 90%+
  • Le retry exponentiel gère les rate limits d’OpenAI
  • Le traitement asynchrone accélère l’ingestion de gros corpus
  • La quantification int8 réduit le stockage de 4x
  • La réduction de dimensions à l’API est le levier le plus simple