Architecture RAG complète
Objectifs
- Comprendre l’architecture RAG de bout en bout
- Implémenter un pipeline RAG fonctionnel
- Identifier les composants à optimiser
Qu’est-ce que le RAG ?
RAG (Retrieval-Augmented Generation) combine la recherche documentaire avec la génération de texte par un LLM. Au lieu de se fier uniquement à ses connaissances internes, le modèle reçoit des documents pertinents comme contexte.
Pourquoi le RAG ?
- Données propriétaires : le LLM ne connaît pas vos documents internes
- Informations récentes : les données postérieures à l’entraînement du modèle
- Vérifiabilité : chaque réponse est ancrée dans des sources identifiables
- Coût : moins cher que le fine-tuning pour ajouter des connaissances
Les étapes du pipeline
1. Ingestion (offline)
Documents → Découpage → Embeddings → Stockage vectoriel
2. Requête (online)
Question → Embedding → Recherche → Contexte + Question → LLM → Réponse
Implémentation complète
from openai import OpenAI
import numpy as np
client = OpenAI()
class PipelineRAG:
def __init__(
self,
model_embedding: str = "text-embedding-3-large",
model_generation: str = "gpt-5.3"
):
self.client = OpenAI()
self.model_embedding = model_embedding
self.model_generation = model_generation
self.chunks: list[dict] = []
self.embeddings: np.ndarray | None = None
# --- Phase d'ingestion ---
def ingerer(self, documents: list[dict]):
"""Ingère des documents {id, texte, metadata}."""
self.chunks = documents
textes = [d["texte"] for d in documents]
# Générer les embeddings par lots
tous_emb = []
for i in range(0, len(textes), 100):
batch = textes[i:i + 100]
resp = self.client.embeddings.create(
input=batch, model=self.model_embedding
)
batch_emb = sorted(resp.data, key=lambda x: x.index)
tous_emb.extend([e.embedding for e in batch_emb])
self.embeddings = np.array(tous_emb, dtype=np.float32)
print(f"Ingestion terminée : {len(self.chunks)} chunks indexés")
# --- Phase de retrieval ---
def rechercher(self, question: str, k: int = 5) -> list[dict]:
"""Trouve les k chunks les plus pertinents."""
query_emb = self.client.embeddings.create(
input=question, model=self.model_embedding
).data[0].embedding
scores = self.embeddings @ np.array(query_emb)
top_k = np.argsort(scores)[-k:][::-1]
return [
{**self.chunks[i], "score": float(scores[i])}
for i in top_k
]
# --- Phase de génération ---
def generer(self, question: str, contextes: list[dict]) -> str:
"""Génère une réponse basée sur les contextes retrouvés."""
# Construire le prompt
contexte_formate = "\n\n---\n\n".join(
f"[Source : {c.get('id', 'inconnu')}]\n{c['texte']}"
for c in contextes
)
messages = [
{
"role": "system",
"content": (
"Vous êtes un assistant qui répond aux questions "
"en vous basant uniquement sur les documents fournis. "
"Si l'information n'est pas dans les documents, "
"dites-le clairement. Citez vos sources."
)
},
{
"role": "user",
"content": (
f"Documents de référence :\n\n{contexte_formate}\n\n"
f"---\n\nQuestion : {question}"
)
}
]
response = self.client.chat.completions.create(
model=self.model_generation,
messages=messages,
temperature=0.2
)
return response.choices[0].message.content
# --- Pipeline complet ---
def repondre(self, question: str, k: int = 5) -> dict:
"""Pipeline RAG complet : recherche → génération."""
contextes = self.rechercher(question, k=k)
reponse = self.generer(question, contextes)
return {
"question": question,
"reponse": reponse,
"sources": [
{"id": c["id"], "score": c["score"],
"extrait": c["texte"][:150]}
for c in contextes
]
}
Utilisation
# Préparer les documents
documents = [
{
"id": "politique-conges",
"texte": "Les employés bénéficient de 25 jours de congés payés "
"par an. Les congés doivent être posés au moins 2 semaines "
"à l'avance via le portail RH.",
"metadata": {"type": "rh", "date": "2026-01"}
},
{
"id": "politique-teletravail",
"texte": "Le télétravail est autorisé jusqu'à 3 jours par semaine. "
"Les mardi et jeudi sont des jours de présence obligatoire. "
"Un accord écrit avec le manager est requis.",
"metadata": {"type": "rh", "date": "2026-01"}
},
{
"id": "procedure-remboursement",
"texte": "Les notes de frais doivent être soumises dans les 30 jours "
"suivant la dépense. Joindre le justificatif original. "
"Validation par le N+1 requise au-delà de 200 euros.",
"metadata": {"type": "finance", "date": "2026-01"}
},
]
# Créer le pipeline
rag = PipelineRAG()
rag.ingerer(documents)
# Poser une question
resultat = rag.repondre("Combien de jours de télétravail par semaine ?")
print(resultat["reponse"])
print("\nSources utilisées :")
for s in resultat["sources"]:
print(f" [{s['score']:.3f}] {s['id']}: {s['extrait']}")
Les points d’optimisation
Chaque étape du pipeline peut être améliorée :
| Étape | Optimisations possibles |
|---|---|
| Découpage | Taille des chunks, overlap, chunking sémantique |
| Embedding | Choix du modèle, dimensions, enrichissement du texte |
| Recherche | Hybride, reranking, filtres par métadonnées |
| Prompt | Instructions système, format du contexte, few-shot |
| Génération | Modèle, température, citations |
Les leçons suivantes détaillent chacune de ces optimisations.
Résumé
- RAG = Recherche + Contexte + Génération par LLM
- Le pipeline se décompose en ingestion (offline) et requête (online)
- Chaque composant (chunking, embedding, recherche, prompt, génération) peut être optimisé indépendamment
- Le RAG est plus flexible et moins coûteux que le fine-tuning pour ajouter des connaissances