Automatiser des workflows multi-étapes
Orchestrer des tâches complexes
Un workflow réel dépasse rarement une seule action. Remplir un CRM implique de naviguer entre les pages, de chercher un contact, de mettre à jour ses informations, puis de créer une opportunité. Cette leçon vous apprend à structurer ces automatisations multi-étapes.
Architecture en étapes
Découpez votre workflow en étapes atomiques, chacune avec un objectif clair et un critère de succès :
workflow = [
{
"name": "connexion",
"task": "Connecte-toi au CRM avec les identifiants fournis.",
"success_url_contains": "/dashboard",
},
{
"name": "recherche_contact",
"task": "Cherche le contact 'Jean Dupont' dans la barre de recherche.",
"success_text_contains": "Jean Dupont",
},
{
"name": "mise_a_jour",
"task": "Ouvre la fiche de Jean Dupont et mets à jour son téléphone : 06 98 76 54 32.",
},
{
"name": "creation_opportunite",
"task": "Crée une opportunité : 'Migration Cloud Q2 2026', montant 50000 EUR, probabilité 60%.",
},
]
L’orchestrateur
Créez un orchestrateur qui exécute les étapes séquentiellement avec gestion d’erreurs :
def run_workflow(workflow: list, page, max_retries: int = 2):
"""Exécute un workflow multi-étapes avec retry."""
results = []
for step in workflow:
step_name = step["name"]
print(f"\n--- Étape : {step_name} ---")
for attempt in range(max_retries + 1):
try:
result = run_agent(step["task"], page=page)
# Vérification optionnelle par URL
url_check = step.get("success_url_contains")
if url_check and url_check not in page.url:
print(f" URL attendue non trouvée, retry...")
continue
print(f" Succès (tentative {attempt + 1})")
results.append({
"step": step_name,
"status": "success",
"attempts": attempt + 1,
})
break
except Exception as e:
print(f" Erreur : {e}")
if attempt == max_retries:
results.append({
"step": step_name,
"status": "failed",
"error": str(e),
})
return results
Passer des données entre les étapes
Souvent, une étape produit des données nécessaires à la suivante. Utilisez un contexte partagé :
class WorkflowContext:
"""Stocke les données partagées entre les étapes."""
def __init__(self):
self.data = {}
def set(self, key: str, value):
self.data[key] = value
def get(self, key: str, default=None):
return self.data.get(key, default)
def inject(self, template: str) -> str:
"""Remplace les variables {clé} dans le template."""
task = template
for key, value in self.data.items():
task = task.replace("{" + key + "}", str(value))
return task
# Exemple d'utilisation
ctx = WorkflowContext()
# Étape 1 : l'agent extrait un prix
prix_extrait = run_agent(
"Lis le prix total affiché et retourne uniquement le nombre.",
page=page
)
ctx.set("prix", prix_extrait)
# Étape 2 : utilise le prix extrait
task_validation = ctx.inject(
"Le montant attendu est {prix} EUR. Vérifie et valide la commande."
)
run_agent(task_validation, page=page)
Workflows conditionnels
La prochaine étape peut dépendre du résultat de la précédente :
def conditional_workflow(page):
"""Workflow avec branchement conditionnel."""
# Étape 1 : Lire le statut
status = run_agent(
"Lis le statut de la commande. "
"Retourne uniquement : en_cours, livré ou annulé.",
page=page
)
# Branchement selon le résultat
if "en_cours" in status:
run_agent(
"La commande est en cours. Clique sur 'Suivre le colis'.",
page=page
)
elif "livré" in status:
run_agent(
"La commande est livrée. Clique sur 'Laisser un avis'.",
page=page
)
elif "annulé" in status:
run_agent(
"La commande est annulée. Clique sur 'Recommander'.",
page=page
)
Parallélisation
Pour les tâches indépendantes, lancez plusieurs agents en parallèle :
import asyncio
from playwright.async_api import async_playwright
async def parallel_extraction(urls: list[str]):
"""Extrait des données de plusieurs pages en parallèle."""
async with async_playwright() as pw:
browser = await pw.chromium.launch(headless=True)
async def extract_one(url: str):
page = await browser.new_page(
viewport={"width": 1280, "height": 720}
)
await page.goto(url)
result = await run_agent_async(
"Extrais le titre et le prix du produit.", page=page
)
await page.close()
return {"url": url, "data": result}
results = await asyncio.gather(
*[extract_one(url) for url in urls]
)
return results
# Lancer
urls = [
"https://shop.example.com/product/1",
"https://shop.example.com/product/2",
"https://shop.example.com/product/3",
]
data = asyncio.run(parallel_extraction(urls))
Points clés à retenir
- Découpez les workflows en étapes atomiques avec des critères de succès explicites
- Ajoutez du retry automatique pour chaque étape (2-3 tentatives)
- Utilisez un contexte partagé pour passer des données entre les étapes
- Les workflows conditionnels permettent de s’adapter au contenu de la page
- Parallélisez les tâches indépendantes pour gagner du temps