Aller au contenu principal

Automatiser des workflows multi-étapes

Orchestrer des tâches complexes

Un workflow réel dépasse rarement une seule action. Remplir un CRM implique de naviguer entre les pages, de chercher un contact, de mettre à jour ses informations, puis de créer une opportunité. Cette leçon vous apprend à structurer ces automatisations multi-étapes.

Architecture en étapes

Découpez votre workflow en étapes atomiques, chacune avec un objectif clair et un critère de succès :

workflow = [
    {
        "name": "connexion",
        "task": "Connecte-toi au CRM avec les identifiants fournis.",
        "success_url_contains": "/dashboard",
    },
    {
        "name": "recherche_contact",
        "task": "Cherche le contact 'Jean Dupont' dans la barre de recherche.",
        "success_text_contains": "Jean Dupont",
    },
    {
        "name": "mise_a_jour",
        "task": "Ouvre la fiche de Jean Dupont et mets à jour son téléphone : 06 98 76 54 32.",
    },
    {
        "name": "creation_opportunite",
        "task": "Crée une opportunité : 'Migration Cloud Q2 2026', montant 50000 EUR, probabilité 60%.",
    },
]

L’orchestrateur

Créez un orchestrateur qui exécute les étapes séquentiellement avec gestion d’erreurs :

def run_workflow(workflow: list, page, max_retries: int = 2):
    """Exécute un workflow multi-étapes avec retry."""
    results = []

    for step in workflow:
        step_name = step["name"]
        print(f"\n--- Étape : {step_name} ---")

        for attempt in range(max_retries + 1):
            try:
                result = run_agent(step["task"], page=page)

                # Vérification optionnelle par URL
                url_check = step.get("success_url_contains")
                if url_check and url_check not in page.url:
                    print(f"  URL attendue non trouvée, retry...")
                    continue

                print(f"  Succès (tentative {attempt + 1})")
                results.append({
                    "step": step_name,
                    "status": "success",
                    "attempts": attempt + 1,
                })
                break

            except Exception as e:
                print(f"  Erreur : {e}")
                if attempt == max_retries:
                    results.append({
                        "step": step_name,
                        "status": "failed",
                        "error": str(e),
                    })

    return results

Passer des données entre les étapes

Souvent, une étape produit des données nécessaires à la suivante. Utilisez un contexte partagé :

class WorkflowContext:
    """Stocke les données partagées entre les étapes."""

    def __init__(self):
        self.data = {}

    def set(self, key: str, value):
        self.data[key] = value

    def get(self, key: str, default=None):
        return self.data.get(key, default)

    def inject(self, template: str) -> str:
        """Remplace les variables {clé} dans le template."""
        task = template
        for key, value in self.data.items():
            task = task.replace("{" + key + "}", str(value))
        return task


# Exemple d'utilisation
ctx = WorkflowContext()

# Étape 1 : l'agent extrait un prix
prix_extrait = run_agent(
    "Lis le prix total affiché et retourne uniquement le nombre.",
    page=page
)
ctx.set("prix", prix_extrait)

# Étape 2 : utilise le prix extrait
task_validation = ctx.inject(
    "Le montant attendu est {prix} EUR. Vérifie et valide la commande."
)
run_agent(task_validation, page=page)

Workflows conditionnels

La prochaine étape peut dépendre du résultat de la précédente :

def conditional_workflow(page):
    """Workflow avec branchement conditionnel."""
    # Étape 1 : Lire le statut
    status = run_agent(
        "Lis le statut de la commande. "
        "Retourne uniquement : en_cours, livré ou annulé.",
        page=page
    )

    # Branchement selon le résultat
    if "en_cours" in status:
        run_agent(
            "La commande est en cours. Clique sur 'Suivre le colis'.",
            page=page
        )
    elif "livré" in status:
        run_agent(
            "La commande est livrée. Clique sur 'Laisser un avis'.",
            page=page
        )
    elif "annulé" in status:
        run_agent(
            "La commande est annulée. Clique sur 'Recommander'.",
            page=page
        )

Parallélisation

Pour les tâches indépendantes, lancez plusieurs agents en parallèle :

import asyncio
from playwright.async_api import async_playwright

async def parallel_extraction(urls: list[str]):
    """Extrait des données de plusieurs pages en parallèle."""
    async with async_playwright() as pw:
        browser = await pw.chromium.launch(headless=True)

        async def extract_one(url: str):
            page = await browser.new_page(
                viewport={"width": 1280, "height": 720}
            )
            await page.goto(url)
            result = await run_agent_async(
                "Extrais le titre et le prix du produit.", page=page
            )
            await page.close()
            return {"url": url, "data": result}

        results = await asyncio.gather(
            *[extract_one(url) for url in urls]
        )

    return results

# Lancer
urls = [
    "https://shop.example.com/product/1",
    "https://shop.example.com/product/2",
    "https://shop.example.com/product/3",
]
data = asyncio.run(parallel_extraction(urls))

Points clés à retenir

  • Découpez les workflows en étapes atomiques avec des critères de succès explicites
  • Ajoutez du retry automatique pour chaque étape (2-3 tentatives)
  • Utilisez un contexte partagé pour passer des données entre les étapes
  • Les workflows conditionnels permettent de s’adapter au contenu de la page
  • Parallélisez les tâches indépendantes pour gagner du temps