Détection d'anomalies et de duplicates
Objectifs
- Détecter les textes anormaux dans un corpus
- Identifier les duplicates sémantiques
- Construire un pipeline de déduplication
Détection d’anomalies
Un texte est « anormal » s’il est sémantiquement éloigné de tous les autres dans le corpus. Les embeddings permettent de mesurer cette distance.
Approche par distance au centroïde
from openai import OpenAI
import numpy as np
client = OpenAI()
def detecter_anomalies(
textes: list[str],
seuil_percentile: float = 95,
model: str = "text-embedding-3-large"
) -> list[dict]:
"""Détecte les textes anormaux par distance au centroïde."""
response = client.embeddings.create(
input=textes, model=model
)
embeddings = np.array(
[d.embedding for d in sorted(response.data, key=lambda x: x.index)]
)
# Centroïde du corpus
centroide = embeddings.mean(axis=0)
centroide /= np.linalg.norm(centroide)
# Distance de chaque texte au centroïde
similarites = embeddings @ centroide
seuil = np.percentile(similarites, 100 - seuil_percentile)
anomalies = []
for i, sim in enumerate(similarites):
if sim < seuil:
anomalies.append({
"index": i,
"texte": textes[i],
"similarite": float(sim),
"est_anomalie": True
})
return anomalies
# Exemple
corpus = [
"Notre politique de congés prévoit 25 jours par an",
"Les congés doivent être posés 2 semaines à l'avance",
"Le solde de congés est visible sur le portail RH",
"Les congés exceptionnels sont accordés sur justificatif",
"SELECT * FROM users WHERE admin=1; DROP TABLE users;--", # Anomalie !
"La politique de télétravail autorise 3 jours par semaine",
]
anomalies = detecter_anomalies(corpus)
for a in anomalies:
print(f" ANOMALIE [{a['similarite']:.3f}] : {a['texte'][:60]}...")
Approche par k-plus proches voisins
def detecter_anomalies_knn(
textes: list[str],
k: int = 3,
seuil_percentile: float = 90,
model: str = "text-embedding-3-large"
) -> list[dict]:
"""Détecte les anomalies par distance aux k voisins les plus proches."""
response = client.embeddings.create(
input=textes, model=model
)
embeddings = np.array(
[d.embedding for d in sorted(response.data, key=lambda x: x.index)]
)
# Matrice de similarité
sim_matrix = embeddings @ embeddings.T
# Pour chaque texte, moyenne des k plus proches voisins
scores_isolation = []
for i in range(len(textes)):
sims = sim_matrix[i].copy()
sims[i] = -1 # Exclure soi-même
top_k_sims = np.sort(sims)[-k:]
scores_isolation.append(top_k_sims.mean())
seuil = np.percentile(scores_isolation, 100 - seuil_percentile)
anomalies = []
for i, score in enumerate(scores_isolation):
if score < seuil:
anomalies.append({
"index": i,
"texte": textes[i],
"score_isolation": float(score)
})
return anomalies
Détection de duplicates sémantiques
Deux textes peuvent être des duplicates même avec des mots différents. Les embeddings les détectent.
Trouver les paires similaires
def trouver_duplicates(
textes: list[str],
seuil: float = 0.90,
model: str = "text-embedding-3-large"
) -> list[dict]:
"""Trouve les paires de textes sémantiquement identiques."""
response = client.embeddings.create(
input=textes, model=model
)
embeddings = np.array(
[d.embedding for d in sorted(response.data, key=lambda x: x.index)]
)
# Matrice de similarité
sim_matrix = embeddings @ embeddings.T
duplicates = []
for i in range(len(textes)):
for j in range(i + 1, len(textes)):
if sim_matrix[i][j] >= seuil:
duplicates.append({
"texte_a": textes[i],
"texte_b": textes[j],
"index_a": i,
"index_b": j,
"similarite": float(sim_matrix[i][j])
})
return sorted(duplicates, key=lambda x: x["similarite"], reverse=True)
# Exemple
textes = [
"Comment réinitialiser mon mot de passe ?",
"Je n'arrive pas à me connecter à mon compte",
"Mot de passe oublié, comment faire ?", # Duplicate de [0]
"Quelle est la politique de remboursement ?",
"Impossible d'accéder à mon espace personnel", # Duplicate de [1]
"Comment obtenir un remboursement ?", # Duplicate de [3]
]
dupes = trouver_duplicates(textes, seuil=0.85)
print(f"{len(dupes)} paires de duplicates trouvées :")
for d in dupes:
print(f" [{d['similarite']:.3f}]")
print(f" A: {d['texte_a']}")
print(f" B: {d['texte_b']}")
Pipeline de déduplication
class Deduplicateur:
def __init__(self, seuil: float = 0.90):
self.client = OpenAI()
self.seuil = seuil
def dedupliquer(self, textes: list[str]) -> dict:
"""Supprime les duplicates sémantiques."""
response = self.client.embeddings.create(
input=textes, model="text-embedding-3-large"
)
embeddings = np.array(
[d.embedding for d in sorted(response.data, key=lambda x: x.index)]
)
# Union-Find pour regrouper les duplicates
parent = list(range(len(textes)))
def find(x):
while parent[x] != x:
parent[x] = parent[parent[x]]
x = parent[x]
return x
def union(a, b):
pa, pb = find(a), find(b)
if pa != pb:
parent[pb] = pa
# Trouver les paires similaires
sim_matrix = embeddings @ embeddings.T
for i in range(len(textes)):
for j in range(i + 1, len(textes)):
if sim_matrix[i][j] >= self.seuil:
union(i, j)
# Garder un représentant par groupe
groupes = {}
for i in range(len(textes)):
root = find(i)
if root not in groupes:
groupes[root] = []
groupes[root].append(i)
# Sélectionner les textes uniques (le plus court de chaque groupe)
uniques = []
supprimes = []
for root, membres in groupes.items():
representant = min(membres, key=lambda i: len(textes[i]))
uniques.append(textes[representant])
for m in membres:
if m != representant:
supprimes.append(textes[m])
return {
"uniques": uniques,
"supprimes": supprimes,
"n_original": len(textes),
"n_deduplique": len(uniques),
"n_supprimes": len(supprimes)
}
# Utilisation
dedup = Deduplicateur(seuil=0.85)
resultat = dedup.dedupliquer(textes)
print(f"Avant : {resultat['n_original']} textes")
print(f"Après : {resultat['n_deduplique']} textes")
print(f"Supprimés : {resultat['n_supprimes']}")
Résumé
- La distance au centroïde ou aux k-NN détecte les textes anormaux
- La similarité cosinus au-dessus d’un seuil identifie les duplicates
- L’Union-Find regroupe efficacement les duplicates transitifs
- Calibrez les seuils sur vos données (0.85-0.95 typiquement pour les duplicates)