<НА ГЛАВНУЮ

Автономный мультиагентный оркестратор данных на лёгких моделях Qwen

'Руководство демонстрирует, как лёгкие Qwen-агенты автоматически анализируют инжест, проверяют качество и оптимизируют инфраструктуру в мультиагентном конвейере данных.'

Автономный мультиагентный оркестратор данных на лёгких моделях Qwen

В этом руководстве показано, как создать систему агентной стратегии данных и инфраструктуры на основе компактной модели Qwen2.5-0.5B-Instruct. Подход включает базовый агент LLM, специализированные агенты для отдельных задач данных и оркестратор, координирующий их взаимодействие в рамках конвейера.

Базовая архитектура лёгкого LLM-агента

Сначала создаём гибкий класс агента, который загружает модель Qwen и предоставляет интерфейс генерации ответов. Базовый агент управляет загрузкой токенизатора/модели, выбором устройства и простой историей диалога для контекстного ответа.

!pip install -q transformers torch accelerate datasets huggingface_hub
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import json, time
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime
import pandas as pd
 
 
class LightweightLLMAgent:
   def __init__(self, role: str, model_name: str = "Qwen/Qwen2.5-0.5B-Instruct"):
       self.role = role
       self.model_name = model_name
       self.device = "cuda" if torch.cuda.is_available() else "cpu"
       print(f"Loading {model_name} for {role} agent on {self.device}...")
       self.tokenizer = AutoTokenizer.from_pretrained(model_name)
       self.model = AutoModelForCausalLM.from_pretrained(
           model_name,
           torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
           device_map="auto"
       )
       self.conversation_history = []
 
 
   def generate_response(self, prompt: str, max_tokens: int = 150) -> str:
       messages = [
           {"role": "system", "content": f"You are a {self.role} agent in a data infrastructure system."},
           {"role": "user", "content": prompt}
       ]
       text = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
       model_inputs = self.tokenizer([text], return_tensors="pt").to(self.device)
       with torch.no_grad():
           generated_ids = self.model.generate(
               model_inputs.input_ids,
               max_new_tokens=max_tokens,
               temperature=0.7,
               do_sample=True,
               top_p=0.95
           )
       generated_ids = [output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)]
       response = self.tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
       self.conversation_history.append({"prompt": prompt, "response": response})
       return response

Этот базовый агент служит основой для специализированных ролей, которые затем реализуют конкретную логику для задач данных.

Специализированные агенты: инжест и качество данных

Реализуем агенты, наследующиеся от базового класса. Аgent по инжесту анализирует источник и предлагает стратегию, агент по качеству оценивает полноту, согласованность и проблемы, а также вычисляет уровень серьёзности.

class DataIngestionAgent(LightweightLLMAgent):
   def __init__(self):
       super().__init__(role="Data Ingestion Specialist")
   def analyze_data_source(self, source_info: Dict) -> Dict:
       prompt = f"""Analyze this data source and provide ingestion strategy:
Source Type: {source_info.get('type', 'unknown')}
Volume: {source_info.get('volume', 'unknown')}
Frequency: {source_info.get('frequency', 'unknown')}
Provide a brief strategy focusing on: 1) Ingestion method, 2) Key considerations."""
       strategy = self.generate_response(prompt, max_tokens=100)
       return {"source": source_info, "strategy": strategy, "timestamp": datetime.now().isoformat()}
 
 
class DataQualityAgent(LightweightLLMAgent):
   def __init__(self):
       super().__init__(role="Data Quality Analyst")
   def assess_data_quality(self, data_sample: Dict) -> Dict:
       prompt = f"""Assess data quality for this sample:
Completeness: {data_sample.get('completeness', 'N/A')}%
Consistency: {data_sample.get('consistency', 'N/A')}%
Issues Found: {data_sample.get('issues', 0)}
Provide brief quality assessment and top 2 recommendations."""
       assessment = self.generate_response(prompt, max_tokens=100)
       return {"assessment": assessment, "severity": self._calculate_severity(data_sample), "timestamp": datetime.now().isoformat()}
   def _calculate_severity(self, data_sample: Dict) -> str:
       completeness = data_sample.get('completeness', 100)
       consistency = data_sample.get('consistency', 100)
       avg_score = (completeness + consistency) / 2
       if avg_score >= 90: return "LOW"
       elif avg_score >= 70: return "MEDIUM"
       else: return "HIGH"

Такие агенты генерируют структурированные, применимые рекомендации, которые можно использовать в оркестраторе.

Агент оптимизации инфраструктуры

Агент оптимизации постоянно анализирует метрики и предлагает улучшения, а также оценивает приоритет на основе загрузки CPU и памяти.

class InfrastructureOptimizationAgent(LightweightLLMAgent):
   def __init__(self):
       super().__init__(role="Infrastructure Optimization Specialist")
   def optimize_resources(self, metrics: Dict) -> Dict:
       prompt = f"""Analyze infrastructure metrics and suggest optimizations:
CPU Usage: {metrics.get('cpu_usage', 0)}%
Memory Usage: {metrics.get('memory_usage', 0)}%
Storage: {metrics.get('storage_used', 0)}GB / {metrics.get('storage_total', 0)}GB
Query Latency: {metrics.get('query_latency', 0)}ms
Provide 2 optimization recommendations."""
       recommendations = self.generate_response(prompt, max_tokens=100)
       return {"current_metrics": metrics, "recommendations": recommendations, "priority": self._calculate_priority(metrics), "timestamp": datetime.now().isoformat()}
   def _calculate_priority(self, metrics: Dict) -> str:
       cpu = metrics.get('cpu_usage', 0)
       memory = metrics.get('memory_usage', 0)
       if cpu > 85 or memory > 85: return "CRITICAL"
       elif cpu > 70 or memory > 70: return "HIGH"
       else: return "NORMAL"

Этот агент помогает поддерживать производительность и экономичность ресурсов.

Оркестратор агентной системы

Оркестратор координирует запуск агентов по стадиям: анализ инжеста, проверка качества и оптимизация инфраструктуры. Он ведёт лог исполнения и формирует сводные отчёты.

class AgenticDataOrchestrator:
   def __init__(self):
       print("\n" + "="*70)
       print("Initializing Agentic Data Infrastructure System")
       print("="*70 + "\n")
       self.ingestion_agent = DataIngestionAgent()
       self.quality_agent = DataQualityAgent()
       self.optimization_agent = InfrastructureOptimizationAgent()
       self.execution_log = []
   def process_data_pipeline(self, pipeline_config: Dict) -> Dict:
       results = {"pipeline_id": pipeline_config.get("id", "unknown"), "start_time": datetime.now().isoformat(), "stages": []}
       print("\n[Stage 1] Data Ingestion Analysis")
       ingestion_result = self.ingestion_agent.analyze_data_source(pipeline_config.get("source", {}))
       print(f"Strategy: {ingestion_result['strategy'][:150]}...")
       results["stages"].append({"stage": "ingestion", "result": ingestion_result})
       print("\n[Stage 2] Data Quality Assessment")
       quality_result = self.quality_agent.assess_data_quality(pipeline_config.get("quality_metrics", {}))
       print(f"Assessment: {quality_result['assessment'][:150]}...")
       print(f"Severity: {quality_result['severity']}")
       results["stages"].append({"stage": "quality", "result": quality_result})
       print("\n[Stage 3] Infrastructure Optimization")
       optimization_result = self.optimization_agent.optimize_resources(pipeline_config.get("infrastructure_metrics", {}))
       print(f"Recommendations: {optimization_result['recommendations'][:150]}...")
       print(f"Priority: {optimization_result['priority']}")
       results["stages"].append({"stage": "optimization", "result": optimization_result})
       results["end_time"] = datetime.now().isoformat()
       results["status"] = "completed"
       self.execution_log.append(results)
       return results
   def generate_summary_report(self) -> pd.DataFrame:
       if not self.execution_log: return pd.DataFrame()
       summary_data = []
       for log in self.execution_log:
           summary_data.append({"Pipeline ID": log["pipeline_id"], "Start Time": log["start_time"], "Status": log["status"], "Stages Completed": len(log["stages"])})
       return pd.DataFrame(summary_data)

Примеры и запуск

Два примера — e-commerce и IoT — показывают, как оркестратор инициирует агентов и собирает результаты. Сводный отчёт подтверждает корректность выполнения и вклад каждого агента.

def main():
   orchestrator = AgenticDataOrchestrator()
   print("\n" + "="*70)
   print("EXAMPLE 1: E-commerce Data Pipeline")
   print("="*70)
   ecommerce_pipeline = {
       "id": "ecommerce_pipeline_001",
       "source": {"type": "REST API", "volume": "10GB/day", "frequency": "real-time"},
       "quality_metrics": {"completeness": 87, "consistency": 92, "issues": 15},
       "infrastructure_metrics": {"cpu_usage": 78, "memory_usage": 82, "storage_used": 450, "storage_total": 1000, "query_latency": 250}
   }
   result1 = orchestrator.process_data_pipeline(ecommerce_pipeline)
   print("\n\n" + "="*70)
   print("EXAMPLE 2: IoT Sensor Data Pipeline")
   print("="*70)
   iot_pipeline = {
       "id": "iot_pipeline_002",
       "source": {"type": "Message Queue (Kafka)", "volume": "50GB/day", "frequency": "streaming"},
       "quality_metrics": {"completeness": 95, "consistency": 88, "issues": 8},
       "infrastructure_metrics": {"cpu_usage": 65, "memory_usage": 71, "storage_used": 780, "storage_total": 2000, "query_latency": 180}
   }
   result2 = orchestrator.process_data_pipeline(iot_pipeline)
   print("\n\n" + "="*70)
   print("EXECUTION SUMMARY REPORT")
   print("="*70 + "\n")
   summary_df = orchestrator.generate_summary_report()
   print(summary_df.to_string(index=False))
   print("\n" + "="*70)
   print("Tutorial Complete!")
   print("="*70)
   print("\nKey Concepts Demonstrated:")
   print("✓ Lightweight LLM agent architecture")
   print("✓ Specialized agents for different data tasks")
   print("✓ Multi-agent orchestration")
   print("✓ Infrastructure monitoring and optimization")
   print("✓ Autonomous decision-making in data pipelines")
 
 
if __name__ == "__main__":
   main()

Приведённый паттерн показывает, как компактные открытые модели могут эффективно питать автономных агентов, а оркестрация превращает их в расширяемый рабочий процесс для MLOps и инженерии данных.

🇬🇧

Switch Language

Read this article in English

Switch to English