Оффлайн многоинструментный агент рассуждения: динамическое планирование, восстановление ошибок и интеллектуальная маршрутизация
'Практическое руководство по сборке оффлайн многоинструментного агента с валидацией схем, маршрутизацией через Instructor и логикой восстановления ошибок. Включены демо-коды и моки инструментов.'
Создание полностью оффлайн многоинструментного агента рассуждения
В этом руководстве показано, как собрать компактного, полностью оффлайн агента, который выбирает инструменты, валидирует входные данные, планирует многозадачные рабочие процессы и восстанавливается после ошибок. Система использует Instructor, Transformers и Pydantic-схемы для структурирования рассуждений и оркестрации вызовов инструментов.
Подготовка окружения и зависимости
Установите необходимые библиотеки и создайте генеративный пайплайн, пригодный для оффлайн-режима. Ниже показан фрагмент, который устанавливает пакеты и пытается импортировать Instructor.
import subprocess
import sys
def install_dependencies():
import torch
packages = [
"instructor",
"transformers>=4.35.0",
"torch",
"accelerate",
"pydantic>=2.0.0",
"numpy",
"pandas"
]
if torch.cuda.is_available():
packages.append("bitsandbytes")
print(" GPU detected - installing quantization support")
else:
print(" No GPU detected - will use CPU (slower but works)")
for package in packages:
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package])
try:
import instructor
except ImportError:
print(" Installing dependencies...")
install_dependencies()
print(" Installation complete!")Подготовьте остальные импорты и утилиты, которые будут использоваться в агенте.
from typing import Literal, Optional, List, Union, Dict, Any
from pydantic import BaseModel, Field, validator
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import instructor
import json
from datetime import datetime
import rePydantic-схемы: структура и валидация
Определите строгие типизированные схемы для описания SQL-запросов, преобразований данных, API-запросов, артефактов генерации кода, многошаговых планов и вызовов инструментов. Эти схемы задают ожидания и обеспечивают проверку безопасности.
class SQLQuery(BaseModel):
"""Complex SQL generation with validation"""
table: str
columns: List[str]
where_conditions: Optional[Dict[str, Any]] = None
joins: Optional[List[Dict[str, str]]] = None
aggregations: Optional[Dict[str, str]] = None
order_by: Optional[List[str]] = None
@validator('columns')
def validate_columns(cls, v):
if not v:
raise ValueError("Must specify at least one column")
return v
class DataTransformation(BaseModel):
"""Schema for complex data pipeline operations"""
operation: Literal["filter", "aggregate", "join", "pivot", "normalize"]
source_data: str = Field(description="Reference to data source")
parameters: Dict[str, Any]
output_format: Literal["json", "csv", "dataframe"]
class APIRequest(BaseModel):
"""Multi-endpoint API orchestration"""
endpoints: List[Dict[str, str]] = Field(description="List of endpoints to call")
authentication: Dict[str, str]
request_order: Literal["sequential", "parallel", "conditional"]
error_handling: Literal["stop", "continue", "retry"]
max_retries: int = Field(default=3, ge=0, le=10)
class CodeGeneration(BaseModel):
"""Generate and validate code snippets"""
language: Literal["python", "javascript", "sql", "bash"]
purpose: str
code: str = Field(description="The generated code")
dependencies: List[str] = Field(default_factory=list)
test_cases: List[Dict[str, Any]] = Field(default_factory=list)
@validator('code')
def validate_code_safety(cls, v, values):
dangerous = ['eval(', 'exec(', '__import__', 'os.system']
if values.get('language') == 'python':
if any(d in v for d in dangerous):
raise ValueError("Code contains potentially dangerous operations")
return v
class MultiToolPlan(BaseModel):
"""Plan for multi-step tool execution"""
goal: str
steps: List[Dict[str, Any]] = Field(description="Ordered list of tool calls")
dependencies: Dict[str, List[str]] = Field(description="Step dependencies")
fallback_strategy: Optional[str] = None
estimated_duration: float = Field(description="Seconds")
class ToolCall(BaseModel):
"""Enhanced tool selection with context"""
reasoning: str
confidence: float = Field(ge=0.0, le=1.0)
tool_name: Literal["sql_engine", "data_transformer", "api_orchestrator",
"code_generator", "planner", "none"]
tool_input: Optional[Union[SQLQuery, DataTransformation, APIRequest,
CodeGeneration, MultiToolPlan]] = None
requires_human_approval: bool = False
class ExecutionResult(BaseModel):
"""Rich result with metadata"""
success: bool
data: Any
execution_time: float
warnings: List[str] = Field(default_factory=list)
metadata: Dict[str, Any] = Field(default_factory=dict)Эти модели обеспечивают проверку (например, предотвращая пустые списки колонок или небезопасные конструкции в Python) и служат основой для логики маршрутизации и выполнения.
Реализация инструментов (моки)
Для оффлайн-тестирования реализуйте детерминированные инструменты: SQL-движок на mock-таблицах, трансформер данных, оркестратор API, генератор кода и планировщик. Каждый возвращает ExecutionResult с метаданными и предупреждениями.
def sql_engine_tool(params: SQLQuery) -> ExecutionResult:
import time
start = time.time()
mock_tables = {
"users": [
{"id": 1, "name": "Alice", "age": 30, "country": "USA"},
{"id": 2, "name": "Bob", "age": 25, "country": "UK"},
{"id": 3, "name": "Charlie", "age": 35, "country": "USA"},
],
"orders": [
{"id": 1, "user_id": 1, "amount": 100, "status": "completed"},
{"id": 2, "user_id": 1, "amount": 200, "status": "pending"},
{"id": 3, "user_id": 2, "amount": 150, "status": "completed"},
]
}
data = mock_tables.get(params.table, [])
if params.where_conditions:
data = [row for row in data if all(
row.get(k) == v for k, v in params.where_conditions.items()
)]
data = [{col: row.get(col) for col in params.columns} for row in data]
warnings = []
if params.aggregations:
warnings.append("Aggregation simplified in mock mode")
return ExecutionResult(
success=True,
data=data,
execution_time=time.time() - start,
warnings=warnings,
metadata={"rows_affected": len(data), "query_type": "SELECT"}
)
def data_transformer_tool(params: DataTransformation) -> ExecutionResult:
import time
start = time.time()
operations = {
"filter": lambda d, p: [x for x in d if x.get(p['field']) == p['value']],
"aggregate": lambda d, p: {"count": len(d), "operation": p.get('function', 'count')},
"normalize": lambda d, p: [{k: v/p.get('factor', 1) for k, v in x.items()} for x in d]
}
mock_data = [{"value": i, "category": "A" if i % 2 else "B"} for i in range(10)]
op_func = operations.get(params.operation)
if op_func:
result_data = op_func(mock_data, params.parameters)
else:
result_data = mock_data
return ExecutionResult(
success=True,
data=result_data,
execution_time=time.time() - start,
warnings=[],
metadata={"operation": params.operation, "input_rows": len(mock_data)}
)
def api_orchestrator_tool(params: APIRequest) -> ExecutionResult:
import time
start = time.time()
results = []
warnings = []
for i, endpoint in enumerate(params.endpoints):
if params.error_handling == "retry" and i == 1:
warnings.append(f"Endpoint {endpoint.get('url')} failed, retrying...")
results.append({
"endpoint": endpoint.get('url'),
"status": 200,
"data": f"Mock response from {endpoint.get('url')}"
})
return ExecutionResult(
success=True,
data=results,
execution_time=time.time() - start,
warnings=warnings,
metadata={"endpoints_called": len(params.endpoints), "order": params.request_order}
)
def code_generator_tool(params: CodeGeneration) -> ExecutionResult:
import time
start = time.time()
warnings = []
if len(params.code) > 1000:
warnings.append("Generated code is quite long, consider refactoring")
if not params.test_cases:
warnings.append("No test cases provided for generated code")
return ExecutionResult(
success=True,
data={"code": params.code, "language": params.language, "dependencies": params.dependencies},
execution_time=time.time() - start,
warnings=warnings,
metadata={"lines_of_code": len(params.code.split('\n'))}
)
def planner_tool(params: MultiToolPlan) -> ExecutionResult:
import time
start = time.time()
warnings = []
if len(params.steps) > 10:
warnings.append("Plan has many steps, consider breaking into sub-plans")
for step_id, deps in params.dependencies.items():
if step_id in deps:
warnings.append(f"Circular dependency detected in step {step_id}")
return ExecutionResult(
success=True,
data={"plan": params.steps, "estimated_time": params.estimated_duration},
execution_time=time.time() - start,
warnings=warnings,
metadata={"total_steps": len(params.steps)}
)
TOOLS = {
"sql_engine": sql_engine_tool,
"data_transformer": data_transformer_tool,
"api_orchestrator": api_orchestrator_tool,
"code_generator": code_generator_tool,
"planner": planner_tool
}Эти моки имитируют реальные инструменты и возвращают структурированные результаты, которые агент использует для принятия решений и логирования.
Архитектура агента: маршрутизация, выполнение и восстановление
AdvancedToolAgent оборачивает локальный LLM-пайплайн (через Instructor) и ожидает структурированные ToolCall-ответы. Агент загружает модель, создает пайплайн и предоставляет методы для маршрутизации запросов к инструментам и их выполнения с повторными попытками и логированием истории.
class AdvancedToolAgent:
"""Agent with complex reasoning, error recovery, and multi-step planning"""
def __init__(self, model_name: str = "HuggingFaceH4/zephyr-7b-beta"):
import torch
print(f" Loading model: {model_name}")
model_kwargs = {"device_map": "auto"}
if torch.cuda.is_available():
print(" GPU detected - using 8-bit quantization")
from transformers import BitsAndBytesConfig
quantization_config = BitsAndBytesConfig(
load_in_8bit=True,
llm_int8_threshold=6.0
)
model_kwargs["quantization_config"] = quantization_config
else:
print(" CPU mode - using smaller model for better performance")
model_name = "google/flan-t5-base"
model_kwargs["torch_dtype"] = "auto"
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForCausalLM.from_pretrained(
model_name,
**model_kwargs
)
self.pipe = pipeline(
"text-generation", model=self.model, tokenizer=self.tokenizer,
max_new_tokens=768, temperature=0.7, do_sample=True
)
self.client = instructor.from_pipe(self.pipe)
self.execution_history = []
print(" Agent initialized!")
def route_to_tool(self, user_query: str, context: Optional[str] = None) -> ToolCall:
tool_descriptions = """
Advanced Tools:
- sql_engine: Execute complex SQL queries with joins, aggregations, filtering
- data_transformer: Multi-step data pipelines (filter→aggregate→normalize)
- api_orchestrator: Call multiple APIs with dependencies, retries, error handling
- code_generator: Generate safe, validated code with tests in multiple languages
- planner: Create multi-step execution plans with dependency management
- none: Answer directly using reasoning
"""
prompt = f"""{tool_descriptions}
User query: {user_query}
{f'Context from previous steps: {context}' if context else ''}
Analyze the complexity and choose the appropriate tool. For multi-step tasks, use the planner."""
return self.client(prompt, response_model=ToolCall)
def execute_with_recovery(self, tool_call: ToolCall, max_retries: int = 2) -> ExecutionResult:
for attempt in range(max_retries + 1):
try:
if tool_call.tool_name == "none":
return ExecutionResult(
success=True, data="Direct response", execution_time=0.0,
warnings=[], metadata={}
)
tool_func = TOOLS.get(tool_call.tool_name)
if not tool_func:
return ExecutionResult(
success=False, data=None, execution_time=0.0,
warnings=[f"Tool {tool_call.tool_name} not found"], metadata={}
)
result = tool_func(tool_call.tool_input)
self.execution_history.append({
"tool": tool_call.tool_name,
"success": result.success,
"timestamp": datetime.now().isoformat()
})
return result
except Exception as e:
if attempt < max_retries:
print(f" Attempt {attempt + 1} failed, retrying...")
continue
return ExecutionResult(
success=False, data=None, execution_time=0.0,
warnings=[f"Failed after {max_retries + 1} attempts: {str(e)}"],
metadata={"error": str(e)}
)Агент регистрирует историю выполнения и справляется с отсутствием инструментов, исключениями и простыми ответами без вызова инструментов.
Запуск запросов и демонстрация
Метод run объединяет анализ, маршрутизацию, выполнение и отчётность. Демонстрация main() запускает несколько тяжёлых запросов, чтобы показать работу маршрутизации между SQL, трансформацией данных, оркестрацией API, генерацией кода и планированием.
def run(self, user_query: str, verbose: bool = True) -> Dict[str, Any]:
if verbose:
print(f"\n{'='*70}")
print(f" Complex Query: {user_query}")
print(f"{'='*70}")
if verbose:
print("\n Step 1: Analyzing query complexity & routing...")
tool_call = self.route_to_tool(user_query)
if verbose:
print(f" → Tool: {tool_call.tool_name}")
print(f" → Confidence: {tool_call.confidence:.2%}")
print(f" → Reasoning: {tool_call.reasoning}")
if tool_call.requires_human_approval:
print(f" Requires human approval!")
if verbose:
print("\n Step 2: Executing tool with error recovery...")
result = self.execute_with_recovery(tool_call)
if verbose:
print(f" → Success: {result.success}")
print(f" → Execution time: {result.execution_time:.3f}s")
if result.warnings:
print(f" → Warnings: {', '.join(result.warnings)}")
print(f" → Data preview: {str(result.data)[:200]}...")
if verbose and result.metadata:
print(f"\n Metadata:")
for key, value in result.metadata.items():
print(f" • {key}: {value}")
if verbose:
print(f"\n{'='*70}\n")
return {
"query": user_query,
"tool_used": tool_call.tool_name,
"result": result,
"history_length": len(self.execution_history)
}
def main():
agent = AdvancedToolAgent()
hard_queries = [
"Generate a SQL query to find all users from USA who have completed orders worth more than $150, and join with their order details",
"Create a data pipeline that filters records where category='A', then aggregates by count, and normalizes the results by a factor of 100",
"I need to call 3 APIs sequentially: first authenticate at /auth, then fetch user data at /users/{id}, and finally update preferences at /preferences. If any step fails, retry up to 3 times",
"Write a Python function that validates email addresses using regex, includes error handling, and has at least 2 test cases. Make sure it doesn't use any dangerous operations",
"Create a multi-step plan to: 1) Extract data from a database, 2) Transform it using pandas, 3) Generate a report, 4) Send via email. Show dependencies between steps"
]
print("\n" + " HARD MODE: COMPLEX QUERIES ".center(70, "=") + "\n")
for i, query in enumerate(hard_queries, 1):
print(f"\n{'#'*70}")
print(f"# CHALLENGE {i}/{len(hard_queries)}")
print(f"{'#'*70}")
try:
agent.run(query, verbose=True)
except Exception as e:
print(f" Critical error: {e}\n")
print("\n" + f" COMPLETED {len(agent.execution_history)} TOOL EXECUTIONS ".center(70, "=") + "\n")
print(f" Success rate: {sum(1 for h in agent.execution_history if h['success']) / len(agent.execution_history) * 100:.1f}%")
if __name__ == "__main__":
main()Итоговые выводы по подходу
- Чёткие контракты для инструментов через Pydantic.
- Детеминированные локальные инструменты для воспроизводимых тестов.
- Модельный маршрутизатор (Instructor) возвращает структурированные вызовы.
- Выполнение с ретраями, предупреждениями, метаданными и историей для наблюдаемости.
Шаблон легко масштабируется: замените моки реальными сервисами, расширьте схемы или добавьте распределённый планировщик, сохранив при этом возможность оффлайн-инференса.
Switch Language
Read this article in English