Оркестрация параллельных AI-агентов на Parsl: мультиинструментальные рабочие процессы
'Практическое руководство по оркестровке нескольких инструментов на Parsl и генерации итоговой сводки с помощью небольшого LLM.'
Кратко о задаче
В этом материале показано, как построить лёгкий конвейер AI-агента, в котором несколько независимых Python-инструментов запускаются параллельно с помощью Parsl. Результаты каждой задачи собираются и передаются в небольшой модель Hugging Face для получения краткой и понятной сводки.
Среда и конфигурация Parsl
Установите зависимости и настройте Parsl с локальным ThreadPoolExecutor, чтобы выполнять Python-приложения одновременно:
!pip install -q parsl transformers accelerate
import math, json, time, random
from typing import List, Dict, Any
import parsl
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor
from parsl import python_app
parsl.load(Config(executors=[ThreadPoolExecutor(label="local", max_threads=8)]))Определение Parsl-приложений (инструментов)
Ниже показаны Parsl @python_app функции, работающие асинхронно в составе пайплайна агента. Включены вычисление чисел Фибоначчи, извлечение ключевых слов и симуляция внешнего инструмента. (В исходном тексте упоминается также подсчёт простых чисел; добавьте соответствующую функцию при необходимости.)
@python_app
def calc_fibonacci(n: int) -> Dict[str, Any]:
def fib(k):
a, b = 0, 1
for _ in range(k): a, b = b, a + b
return a
t0 = time.time(); val = fib(n); dt = time.time() - t0
return {"task": "fibonacci", "n": n, "value": val, "secs": round(dt, 4)}
@python_app
def extract_keywords(text: str, k: int = 8) -> Dict[str, Any]:
import re, collections
words = [w.lower() for w in re.findall(r"[a-zA-Z][a-zA-Z0-9\-]+", text)]
stop = set("the a an and or to of is are was were be been in on for with as by from at this that it its if then else not no".split())
cand = [w for w in words if w not in stop and len(w) > 3]
freq = collections.Counter(cand)
scored = sorted(freq.items(), key=lambda x: (x[1], len(x[0])), reverse=True)[:k]
return {"task":"keywords","keywords":[w for w,_ in scored]}
@python_app
def simulate_tool(name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
time.sleep(0.3 + random.random()*0.5)
return {"task": name, "payload": payload, "status": "ok", "timestamp": time.time()}Сводка через небольшой LLM
Используйте лёгкую модель Hugging Face, чтобы превратить собранные буллеты в краткую связную сводку. Функция формирует промпт со списком результатов и извлекает часть после маркера "Conclusion:".
def tiny_llm_summary(bullets: List[str]) -> str:
from transformers import pipeline
gen = pipeline("text-generation", model="sshleifer/tiny-gpt2")
prompt = "Summarize these agent results clearly:\n- " + "\n- ".join(bullets) + "\nConclusion:"
out = gen(prompt, max_length=160, do_sample=False)[0]["generated_text"]
return out.split("Conclusion:", 1)[-1].strip()Планирование вызовов инструментов
Функция plan переводит цель пользователя в список вызовов инструментов — проверяет ключевые слова и добавляет стандартные задачи: поиск в векторной БД, метрики и извлечение ключевых слов.
def plan(user_goal: str) -> List[Dict[str, Any]]:
intents = []
if "fibonacci" in user_goal.lower():
intents.append({"tool":"calc_fibonacci", "args":{"n":35}})
if "primes" in user_goal.lower():
intents.append({"tool":"count_primes", "args":{"limit":100_000}})
intents += [
{"tool":"simulate_tool", "args":{"name":"vector_db_search","payload":{"q":user_goal}}},
{"tool":"simulate_tool", "args":{"name":"metrics_fetch","payload":{"kpi":"latency_ms"}}},
{"tool":"extract_keywords", "args":{"text":user_goal}}
]
return intentsЗапуск агента и агрегация результатов
Функция run_agent отправляет задачи Parsl-приложениями, ждёт их завершения, преобразует результаты в буллеты и формирует итоговую сводку через tiny_llm_summary.
def run_agent(user_goal: str) -> Dict[str, Any]:
tasks = plan(user_goal)
futures = []
for t in tasks:
if t["tool"]=="calc_fibonacci": futures.append(calc_fibonacci(**t["args"]))
elif t["tool"]=="count_primes": futures.append(count_primes(**t["args"]))
elif t["tool"]=="extract_keywords": futures.append(extract_keywords(**t["args"]))
elif t["tool"]=="simulate_tool": futures.append(simulate_tool(**t["args"]))
raw = [f.result() for f in futures]
bullets = []
for r in raw:
if r["task"]=="fibonacci":
bullets.append(f"Fibonacci({r['n']}) = {r['value']} computed in {r['secs']}s.")
elif r["task"]=="count_primes":
bullets.append(f"{r['count']} primes found ≤ {r['limit']}.")
elif r["task"]=="keywords":
bullets.append("Top keywords: " + ", ".join(r["keywords"]))
else:
bullets.append(f"Tool {r['task']} responded with status={r['status']}.")
narrative = tiny_llm_summary(bullets)
return {"goal": user_goal, "bullets": bullets, "summary": narrative, "raw": raw}Пример выполнения
Запустите агент с комбинированной целью, чтобы проверить вычисления и генерацию сводки. Скрипт выводит буллеты, итоговую LLM-сводку и фрагмент raw JSON.
if __name__ == "__main__":
goal = ("Analyze fibonacci(35) performance, count primes under 100k, "
"and prepare a concise executive summary highlighting insights for planning.")
result = run_agent(goal)
print("\n=== Agent Bullets ===")
for b in result["bullets"]: print("•", b)
print("\n=== LLM Summary ===\n", result["summary"])
print("\n=== Raw JSON ===\n", json.dumps(result["raw"], indent=2)[:800], "...")Замечания по расширению
Архитектура остаётся модульной: добавляйте новые Parsl-приложения (например, GPU-модели или сетевые сервисы) и регулируйте исполнители под требования ресурсов. Преобразование структурированных результатов в естественный язык с помощью небольшой модели делает пайплайн удобным для отчётов, дашбордов и интерактивных агентов.
Switch Language
Read this article in English