Autonomous Multi-Agent Data Orchestrator with Lightweight Qwen Models
'A hands-on tutorial showing how lightweight Qwen2.5-0.5B-Instruct agents manage ingestion, quality, and infrastructure optimization in multi-agent data pipelines.'
Autonomous Multi-Agent Data Orchestrator with Lightweight Qwen Models
This tutorial demonstrates how to build an agentic data and infrastructure strategy system powered by the lightweight Qwen2.5-0.5B-Instruct model. The design focuses on a modular LLM agent foundation, specialized agents for distinct data responsibilities, and an orchestrator that coordinates autonomous collaboration across a data pipeline.
Lightweight LLM agent foundation
We start by creating a flexible agent class that loads the Qwen model and exposes a conversational generation interface. This base agent manages tokenizer/model loading, device selection, and a simple conversation history to support context-aware replies.
!pip install -q transformers torch accelerate datasets huggingface_hub
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import json, time
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime
import pandas as pd
class LightweightLLMAgent:
def __init__(self, role: str, model_name: str = "Qwen/Qwen2.5-0.5B-Instruct"):
self.role = role
self.model_name = model_name
self.device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Loading {model_name} for {role} agent on {self.device}...")
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
device_map="auto"
)
self.conversation_history = []
def generate_response(self, prompt: str, max_tokens: int = 150) -> str:
messages = [
{"role": "system", "content": f"You are a {self.role} agent in a data infrastructure system."},
{"role": "user", "content": prompt}
]
text = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
model_inputs = self.tokenizer([text], return_tensors="pt").to(self.device)
with torch.no_grad():
generated_ids = self.model.generate(
model_inputs.input_ids,
max_new_tokens=max_tokens,
temperature=0.7,
do_sample=True,
top_p=0.95
)
generated_ids = [output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)]
response = self.tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
self.conversation_history.append({"prompt": prompt, "response": response})
return responseThis base agent becomes the foundation for specialized roles that focus on ingestion, quality, and infrastructure.
Specialized agents: ingestion and quality
Next, we implement domain-specific agents that inherit from the lightweight agent. The Data Ingestion agent analyzes a source and returns a recommended ingestion strategy. The Data Quality agent assesses completeness, consistency, and issues, and rates severity.
class DataIngestionAgent(LightweightLLMAgent):
def __init__(self):
super().__init__(role="Data Ingestion Specialist")
def analyze_data_source(self, source_info: Dict) -> Dict:
prompt = f"""Analyze this data source and provide ingestion strategy:
Source Type: {source_info.get('type', 'unknown')}
Volume: {source_info.get('volume', 'unknown')}
Frequency: {source_info.get('frequency', 'unknown')}
Provide a brief strategy focusing on: 1) Ingestion method, 2) Key considerations."""
strategy = self.generate_response(prompt, max_tokens=100)
return {"source": source_info, "strategy": strategy, "timestamp": datetime.now().isoformat()}
class DataQualityAgent(LightweightLLMAgent):
def __init__(self):
super().__init__(role="Data Quality Analyst")
def assess_data_quality(self, data_sample: Dict) -> Dict:
prompt = f"""Assess data quality for this sample:
Completeness: {data_sample.get('completeness', 'N/A')}%
Consistency: {data_sample.get('consistency', 'N/A')}%
Issues Found: {data_sample.get('issues', 0)}
Provide brief quality assessment and top 2 recommendations."""
assessment = self.generate_response(prompt, max_tokens=100)
return {"assessment": assessment, "severity": self._calculate_severity(data_sample), "timestamp": datetime.now().isoformat()}
def _calculate_severity(self, data_sample: Dict) -> str:
completeness = data_sample.get('completeness', 100)
consistency = data_sample.get('consistency', 100)
avg_score = (completeness + consistency) / 2
if avg_score >= 90: return "LOW"
elif avg_score >= 70: return "MEDIUM"
else: return "HIGH"These agents give structured, actionable outputs that can be consumed by orchestration logic or operators.
Infrastructure optimization agent
An agent dedicated to infrastructure continuously analyzes metrics and proposes optimizations. It also assesses priority based on CPU and memory pressure.
class InfrastructureOptimizationAgent(LightweightLLMAgent):
def __init__(self):
super().__init__(role="Infrastructure Optimization Specialist")
def optimize_resources(self, metrics: Dict) -> Dict:
prompt = f"""Analyze infrastructure metrics and suggest optimizations:
CPU Usage: {metrics.get('cpu_usage', 0)}%
Memory Usage: {metrics.get('memory_usage', 0)}%
Storage: {metrics.get('storage_used', 0)}GB / {metrics.get('storage_total', 0)}GB
Query Latency: {metrics.get('query_latency', 0)}ms
Provide 2 optimization recommendations."""
recommendations = self.generate_response(prompt, max_tokens=100)
return {"current_metrics": metrics, "recommendations": recommendations, "priority": self._calculate_priority(metrics), "timestamp": datetime.now().isoformat()}
def _calculate_priority(self, metrics: Dict) -> str:
cpu = metrics.get('cpu_usage', 0)
memory = metrics.get('memory_usage', 0)
if cpu > 85 or memory > 85: return "CRITICAL"
elif cpu > 70 or memory > 70: return "HIGH"
else: return "NORMAL"This agent helps maintain performance and resource efficiency by providing prioritized recommendations.
Agentic orchestrator
The orchestrator coordinates the specialized agents in a sequential pipeline: ingestion analysis, data quality assessment, and infrastructure optimization. It logs execution and can produce summary reports.
class AgenticDataOrchestrator:
def __init__(self):
print("\n" + "="*70)
print("Initializing Agentic Data Infrastructure System")
print("="*70 + "\n")
self.ingestion_agent = DataIngestionAgent()
self.quality_agent = DataQualityAgent()
self.optimization_agent = InfrastructureOptimizationAgent()
self.execution_log = []
def process_data_pipeline(self, pipeline_config: Dict) -> Dict:
results = {"pipeline_id": pipeline_config.get("id", "unknown"), "start_time": datetime.now().isoformat(), "stages": []}
print("\n[Stage 1] Data Ingestion Analysis")
ingestion_result = self.ingestion_agent.analyze_data_source(pipeline_config.get("source", {}))
print(f"Strategy: {ingestion_result['strategy'][:150]}...")
results["stages"].append({"stage": "ingestion", "result": ingestion_result})
print("\n[Stage 2] Data Quality Assessment")
quality_result = self.quality_agent.assess_data_quality(pipeline_config.get("quality_metrics", {}))
print(f"Assessment: {quality_result['assessment'][:150]}...")
print(f"Severity: {quality_result['severity']}")
results["stages"].append({"stage": "quality", "result": quality_result})
print("\n[Stage 3] Infrastructure Optimization")
optimization_result = self.optimization_agent.optimize_resources(pipeline_config.get("infrastructure_metrics", {}))
print(f"Recommendations: {optimization_result['recommendations'][:150]}...")
print(f"Priority: {optimization_result['priority']}")
results["stages"].append({"stage": "optimization", "result": optimization_result})
results["end_time"] = datetime.now().isoformat()
results["status"] = "completed"
self.execution_log.append(results)
return results
def generate_summary_report(self) -> pd.DataFrame:
if not self.execution_log: return pd.DataFrame()
summary_data = []
for log in self.execution_log:
summary_data.append({"Pipeline ID": log["pipeline_id"], "Start Time": log["start_time"], "Status": log["status"], "Stages Completed": len(log["stages"])})
return pd.DataFrame(summary_data)Examples and running the system
Two example pipelines (e-commerce and IoT) illustrate how the orchestrator triggers each agent and aggregates results. The final summary report helps validate pipeline executions and what each agent contributed.
def main():
orchestrator = AgenticDataOrchestrator()
print("\n" + "="*70)
print("EXAMPLE 1: E-commerce Data Pipeline")
print("="*70)
ecommerce_pipeline = {
"id": "ecommerce_pipeline_001",
"source": {"type": "REST API", "volume": "10GB/day", "frequency": "real-time"},
"quality_metrics": {"completeness": 87, "consistency": 92, "issues": 15},
"infrastructure_metrics": {"cpu_usage": 78, "memory_usage": 82, "storage_used": 450, "storage_total": 1000, "query_latency": 250}
}
result1 = orchestrator.process_data_pipeline(ecommerce_pipeline)
print("\n\n" + "="*70)
print("EXAMPLE 2: IoT Sensor Data Pipeline")
print("="*70)
iot_pipeline = {
"id": "iot_pipeline_002",
"source": {"type": "Message Queue (Kafka)", "volume": "50GB/day", "frequency": "streaming"},
"quality_metrics": {"completeness": 95, "consistency": 88, "issues": 8},
"infrastructure_metrics": {"cpu_usage": 65, "memory_usage": 71, "storage_used": 780, "storage_total": 2000, "query_latency": 180}
}
result2 = orchestrator.process_data_pipeline(iot_pipeline)
print("\n\n" + "="*70)
print("EXECUTION SUMMARY REPORT")
print("="*70 + "\n")
summary_df = orchestrator.generate_summary_report()
print(summary_df.to_string(index=False))
print("\n" + "="*70)
print("Tutorial Complete!")
print("="*70)
print("\nKey Concepts Demonstrated:")
print("✓ Lightweight LLM agent architecture")
print("✓ Specialized agents for different data tasks")
print("✓ Multi-agent orchestration")
print("✓ Infrastructure monitoring and optimization")
print("✓ Autonomous decision-making in data pipelines")
if __name__ == "__main__":
main()The pattern shown here highlights how compact open-source models can power efficient, autonomous agents. The orchestrator pattern brings these agents together into a repeatable workflow that can be extended for production MLOps and data engineering needs.
Сменить язык
Читать эту статью на русском