<НА ГЛАВНУЮ

Оркестрация параллельных AI-агентов на Parsl: мультиинструментальные рабочие процессы

'Практическое руководство по оркестровке нескольких инструментов на Parsl и генерации итоговой сводки с помощью небольшого LLM.'

Кратко о задаче

В этом материале показано, как построить лёгкий конвейер AI-агента, в котором несколько независимых Python-инструментов запускаются параллельно с помощью Parsl. Результаты каждой задачи собираются и передаются в небольшой модель Hugging Face для получения краткой и понятной сводки.

Среда и конфигурация Parsl

Установите зависимости и настройте Parsl с локальным ThreadPoolExecutor, чтобы выполнять Python-приложения одновременно:

!pip install -q parsl transformers accelerate
 
 
import math, json, time, random
from typing import List, Dict, Any
import parsl
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor
from parsl import python_app
 
 
parsl.load(Config(executors=[ThreadPoolExecutor(label="local", max_threads=8)]))

Определение Parsl-приложений (инструментов)

Ниже показаны Parsl @python_app функции, работающие асинхронно в составе пайплайна агента. Включены вычисление чисел Фибоначчи, извлечение ключевых слов и симуляция внешнего инструмента. (В исходном тексте упоминается также подсчёт простых чисел; добавьте соответствующую функцию при необходимости.)

@python_app
def calc_fibonacci(n: int) -> Dict[str, Any]:
   def fib(k):
       a, b = 0, 1
       for _ in range(k): a, b = b, a + b
       return a
   t0 = time.time(); val = fib(n); dt = time.time() - t0
   return {"task": "fibonacci", "n": n, "value": val, "secs": round(dt, 4)}
 
 
@python_app
def extract_keywords(text: str, k: int = 8) -> Dict[str, Any]:
   import re, collections
   words = [w.lower() for w in re.findall(r"[a-zA-Z][a-zA-Z0-9\-]+", text)]
   stop = set("the a an and or to of is are was were be been in on for with as by from at this that it its if then else not no".split())
   cand = [w for w in words if w not in stop and len(w) > 3]
   freq = collections.Counter(cand)
   scored = sorted(freq.items(), key=lambda x: (x[1], len(x[0])), reverse=True)[:k]
   return {"task":"keywords","keywords":[w for w,_ in scored]}
 
 
@python_app
def simulate_tool(name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
   time.sleep(0.3 + random.random()*0.5)
   return {"task": name, "payload": payload, "status": "ok", "timestamp": time.time()}

Сводка через небольшой LLM

Используйте лёгкую модель Hugging Face, чтобы превратить собранные буллеты в краткую связную сводку. Функция формирует промпт со списком результатов и извлекает часть после маркера "Conclusion:".

def tiny_llm_summary(bullets: List[str]) -> str:
   from transformers import pipeline
   gen = pipeline("text-generation", model="sshleifer/tiny-gpt2")
   prompt = "Summarize these agent results clearly:\n- " + "\n- ".join(bullets) + "\nConclusion:"
   out = gen(prompt, max_length=160, do_sample=False)[0]["generated_text"]
   return out.split("Conclusion:", 1)[-1].strip()

Планирование вызовов инструментов

Функция plan переводит цель пользователя в список вызовов инструментов — проверяет ключевые слова и добавляет стандартные задачи: поиск в векторной БД, метрики и извлечение ключевых слов.

def plan(user_goal: str) -> List[Dict[str, Any]]:
   intents = []
   if "fibonacci" in user_goal.lower():
       intents.append({"tool":"calc_fibonacci", "args":{"n":35}})
   if "primes" in user_goal.lower():
       intents.append({"tool":"count_primes", "args":{"limit":100_000}})
   intents += [
       {"tool":"simulate_tool", "args":{"name":"vector_db_search","payload":{"q":user_goal}}},
       {"tool":"simulate_tool", "args":{"name":"metrics_fetch","payload":{"kpi":"latency_ms"}}},
       {"tool":"extract_keywords", "args":{"text":user_goal}}
   ]
   return intents

Запуск агента и агрегация результатов

Функция run_agent отправляет задачи Parsl-приложениями, ждёт их завершения, преобразует результаты в буллеты и формирует итоговую сводку через tiny_llm_summary.

def run_agent(user_goal: str) -> Dict[str, Any]:
   tasks = plan(user_goal)
   futures = []
   for t in tasks:
       if t["tool"]=="calc_fibonacci": futures.append(calc_fibonacci(**t["args"]))
       elif t["tool"]=="count_primes": futures.append(count_primes(**t["args"]))
       elif t["tool"]=="extract_keywords": futures.append(extract_keywords(**t["args"]))
       elif t["tool"]=="simulate_tool": futures.append(simulate_tool(**t["args"]))
   raw = [f.result() for f in futures]
 
 
   bullets = []
   for r in raw:
       if r["task"]=="fibonacci":
           bullets.append(f"Fibonacci({r['n']}) = {r['value']} computed in {r['secs']}s.")
       elif r["task"]=="count_primes":
           bullets.append(f"{r['count']} primes found ≤ {r['limit']}.")
       elif r["task"]=="keywords":
           bullets.append("Top keywords: " + ", ".join(r["keywords"]))
       else:
           bullets.append(f"Tool {r['task']} responded with status={r['status']}.")
 
 
   narrative = tiny_llm_summary(bullets)
   return {"goal": user_goal, "bullets": bullets, "summary": narrative, "raw": raw}

Пример выполнения

Запустите агент с комбинированной целью, чтобы проверить вычисления и генерацию сводки. Скрипт выводит буллеты, итоговую LLM-сводку и фрагмент raw JSON.

if __name__ == "__main__":
   goal = ("Analyze fibonacci(35) performance, count primes under 100k, "
           "and prepare a concise executive summary highlighting insights for planning.")
   result = run_agent(goal)
   print("\n=== Agent Bullets ===")
   for b in result["bullets"]: print("•", b)
   print("\n=== LLM Summary ===\n", result["summary"])
   print("\n=== Raw JSON ===\n", json.dumps(result["raw"], indent=2)[:800], "...")

Замечания по расширению

Архитектура остаётся модульной: добавляйте новые Parsl-приложения (например, GPU-модели или сетевые сервисы) и регулируйте исполнители под требования ресурсов. Преобразование структурированных результатов в естественный язык с помощью небольшой модели делает пайплайн удобным для отчётов, дашбордов и интерактивных агентов.

🇬🇧

Switch Language

Read this article in English

Switch to English