Aller au contenu principal

Détection d'anomalies et de duplicates

Objectifs

  • Détecter les textes anormaux dans un corpus
  • Identifier les duplicates sémantiques
  • Construire un pipeline de déduplication

Détection d’anomalies

Un texte est « anormal » s’il est sémantiquement éloigné de tous les autres dans le corpus. Les embeddings permettent de mesurer cette distance.

Approche par distance au centroïde

from openai import OpenAI
import numpy as np

client = OpenAI()

def detecter_anomalies(
    textes: list[str],
    seuil_percentile: float = 95,
    model: str = "text-embedding-3-large"
) -> list[dict]:
    """Détecte les textes anormaux par distance au centroïde."""
    response = client.embeddings.create(
        input=textes, model=model
    )
    embeddings = np.array(
        [d.embedding for d in sorted(response.data, key=lambda x: x.index)]
    )

    # Centroïde du corpus
    centroide = embeddings.mean(axis=0)
    centroide /= np.linalg.norm(centroide)

    # Distance de chaque texte au centroïde
    similarites = embeddings @ centroide
    seuil = np.percentile(similarites, 100 - seuil_percentile)

    anomalies = []
    for i, sim in enumerate(similarites):
        if sim < seuil:
            anomalies.append({
                "index": i,
                "texte": textes[i],
                "similarite": float(sim),
                "est_anomalie": True
            })

    return anomalies

# Exemple
corpus = [
    "Notre politique de congés prévoit 25 jours par an",
    "Les congés doivent être posés 2 semaines à l'avance",
    "Le solde de congés est visible sur le portail RH",
    "Les congés exceptionnels sont accordés sur justificatif",
    "SELECT * FROM users WHERE admin=1; DROP TABLE users;--",  # Anomalie !
    "La politique de télétravail autorise 3 jours par semaine",
]

anomalies = detecter_anomalies(corpus)
for a in anomalies:
    print(f"  ANOMALIE [{a['similarite']:.3f}] : {a['texte'][:60]}...")

Approche par k-plus proches voisins

def detecter_anomalies_knn(
    textes: list[str],
    k: int = 3,
    seuil_percentile: float = 90,
    model: str = "text-embedding-3-large"
) -> list[dict]:
    """Détecte les anomalies par distance aux k voisins les plus proches."""
    response = client.embeddings.create(
        input=textes, model=model
    )
    embeddings = np.array(
        [d.embedding for d in sorted(response.data, key=lambda x: x.index)]
    )

    # Matrice de similarité
    sim_matrix = embeddings @ embeddings.T

    # Pour chaque texte, moyenne des k plus proches voisins
    scores_isolation = []
    for i in range(len(textes)):
        sims = sim_matrix[i].copy()
        sims[i] = -1  # Exclure soi-même
        top_k_sims = np.sort(sims)[-k:]
        scores_isolation.append(top_k_sims.mean())

    seuil = np.percentile(scores_isolation, 100 - seuil_percentile)

    anomalies = []
    for i, score in enumerate(scores_isolation):
        if score < seuil:
            anomalies.append({
                "index": i,
                "texte": textes[i],
                "score_isolation": float(score)
            })

    return anomalies

Détection de duplicates sémantiques

Deux textes peuvent être des duplicates même avec des mots différents. Les embeddings les détectent.

Trouver les paires similaires

def trouver_duplicates(
    textes: list[str],
    seuil: float = 0.90,
    model: str = "text-embedding-3-large"
) -> list[dict]:
    """Trouve les paires de textes sémantiquement identiques."""
    response = client.embeddings.create(
        input=textes, model=model
    )
    embeddings = np.array(
        [d.embedding for d in sorted(response.data, key=lambda x: x.index)]
    )

    # Matrice de similarité
    sim_matrix = embeddings @ embeddings.T

    duplicates = []
    for i in range(len(textes)):
        for j in range(i + 1, len(textes)):
            if sim_matrix[i][j] >= seuil:
                duplicates.append({
                    "texte_a": textes[i],
                    "texte_b": textes[j],
                    "index_a": i,
                    "index_b": j,
                    "similarite": float(sim_matrix[i][j])
                })

    return sorted(duplicates, key=lambda x: x["similarite"], reverse=True)

# Exemple
textes = [
    "Comment réinitialiser mon mot de passe ?",
    "Je n'arrive pas à me connecter à mon compte",
    "Mot de passe oublié, comment faire ?",           # Duplicate de [0]
    "Quelle est la politique de remboursement ?",
    "Impossible d'accéder à mon espace personnel",    # Duplicate de [1]
    "Comment obtenir un remboursement ?",              # Duplicate de [3]
]

dupes = trouver_duplicates(textes, seuil=0.85)
print(f"{len(dupes)} paires de duplicates trouvées :")
for d in dupes:
    print(f"  [{d['similarite']:.3f}]")
    print(f"    A: {d['texte_a']}")
    print(f"    B: {d['texte_b']}")

Pipeline de déduplication

class Deduplicateur:
    def __init__(self, seuil: float = 0.90):
        self.client = OpenAI()
        self.seuil = seuil

    def dedupliquer(self, textes: list[str]) -> dict:
        """Supprime les duplicates sémantiques."""
        response = self.client.embeddings.create(
            input=textes, model="text-embedding-3-large"
        )
        embeddings = np.array(
            [d.embedding for d in sorted(response.data, key=lambda x: x.index)]
        )

        # Union-Find pour regrouper les duplicates
        parent = list(range(len(textes)))

        def find(x):
            while parent[x] != x:
                parent[x] = parent[parent[x]]
                x = parent[x]
            return x

        def union(a, b):
            pa, pb = find(a), find(b)
            if pa != pb:
                parent[pb] = pa

        # Trouver les paires similaires
        sim_matrix = embeddings @ embeddings.T
        for i in range(len(textes)):
            for j in range(i + 1, len(textes)):
                if sim_matrix[i][j] >= self.seuil:
                    union(i, j)

        # Garder un représentant par groupe
        groupes = {}
        for i in range(len(textes)):
            root = find(i)
            if root not in groupes:
                groupes[root] = []
            groupes[root].append(i)

        # Sélectionner les textes uniques (le plus court de chaque groupe)
        uniques = []
        supprimes = []
        for root, membres in groupes.items():
            representant = min(membres, key=lambda i: len(textes[i]))
            uniques.append(textes[representant])
            for m in membres:
                if m != representant:
                    supprimes.append(textes[m])

        return {
            "uniques": uniques,
            "supprimes": supprimes,
            "n_original": len(textes),
            "n_deduplique": len(uniques),
            "n_supprimes": len(supprimes)
        }

# Utilisation
dedup = Deduplicateur(seuil=0.85)
resultat = dedup.dedupliquer(textes)
print(f"Avant : {resultat['n_original']} textes")
print(f"Après : {resultat['n_deduplique']} textes")
print(f"Supprimés : {resultat['n_supprimes']}")

Résumé

  • La distance au centroïde ou aux k-NN détecte les textes anormaux
  • La similarité cosinus au-dessus d’un seuil identifie les duplicates
  • L’Union-Find regroupe efficacement les duplicates transitifs
  • Calibrez les seuils sur vos données (0.85-0.95 typiquement pour les duplicates)