Offline Multi-Tool Reasoning Agent: Dynamic Planning, Error Recovery & Intelligent Tool Routing
A practical tutorial on building a fully offline multi-tool reasoning agent using Instructor, Transformers, and Pydantic. Includes code for tool mocks, schemas, routing, and recovery.
Building a Fully Offline Multi-Tool Reasoning Agent
This guide shows how to assemble a compact, fully offline agent that selects tools, validates inputs, plans multi-step workflows, and recovers from errors. The system relies on Instructor, Transformers, and Pydantic schemas to structure reasoning and orchestrate tool calls.
Environment setup and dependencies
Prepare the runtime by installing the required libraries and creating a generation pipeline that works offline. The snippet below checks whether Instructor is importable and installs the dependencies if it is not.
import subprocess
import sys

def install_dependencies():
    packages = [
        "instructor",
        "transformers>=4.35.0",
        "torch",
        "accelerate",
        "pydantic>=2.0.0",
        "numpy",
        "pandas"
    ]
    for package in packages:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package])
    # torch is only importable after the base install, so check for a GPU afterwards
    import torch
    if torch.cuda.is_available():
        print("GPU detected - installing quantization support")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "bitsandbytes"])
    else:
        print("No GPU detected - will use CPU (slower but works)")

try:
    import instructor
except ImportError:
    print("Installing dependencies...")
    install_dependencies()
    print("Installation complete!")
Set up the remaining imports and basic utilities you will reuse across the agent. Since the install pins pydantic>=2.0.0, the code below uses the v2-style field_validator instead of the deprecated validator.

from typing import Literal, Optional, List, Union, Dict, Any
from pydantic import BaseModel, Field, field_validator
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import instructor
import json
from datetime import datetime
import re

Pydantic schemas: structure and validation
Define strongly typed schemas that describe SQL queries, data transformations, API requests, code-generation artifacts, multi-step plans, and tool calls. These schemas give the agent precise expectations and built-in safety checks.
class SQLQuery(BaseModel):
    """Complex SQL generation with validation"""
    table: str
    columns: List[str]
    where_conditions: Optional[Dict[str, Any]] = None
    joins: Optional[List[Dict[str, str]]] = None
    aggregations: Optional[Dict[str, str]] = None
    order_by: Optional[List[str]] = None

    @field_validator('columns')
    @classmethod
    def validate_columns(cls, v):
        if not v:
            raise ValueError("Must specify at least one column")
        return v

class DataTransformation(BaseModel):
    """Schema for complex data pipeline operations"""
    operation: Literal["filter", "aggregate", "join", "pivot", "normalize"]
    source_data: str = Field(description="Reference to data source")
    parameters: Dict[str, Any]
    output_format: Literal["json", "csv", "dataframe"]

class APIRequest(BaseModel):
    """Multi-endpoint API orchestration"""
    endpoints: List[Dict[str, str]] = Field(description="List of endpoints to call")
    authentication: Dict[str, str]
    request_order: Literal["sequential", "parallel", "conditional"]
    error_handling: Literal["stop", "continue", "retry"]
    max_retries: int = Field(default=3, ge=0, le=10)

class CodeGeneration(BaseModel):
    """Generate and validate code snippets"""
    language: Literal["python", "javascript", "sql", "bash"]
    purpose: str
    code: str = Field(description="The generated code")
    dependencies: List[str] = Field(default_factory=list)
    test_cases: List[Dict[str, Any]] = Field(default_factory=list)

    @field_validator('code')
    @classmethod
    def validate_code_safety(cls, v, info):
        # 'language' is declared before 'code', so it is already in info.data
        dangerous = ['eval(', 'exec(', '__import__', 'os.system']
        if info.data.get('language') == 'python':
            if any(d in v for d in dangerous):
                raise ValueError("Code contains potentially dangerous operations")
        return v

class MultiToolPlan(BaseModel):
    """Plan for multi-step tool execution"""
    goal: str
    steps: List[Dict[str, Any]] = Field(description="Ordered list of tool calls")
    dependencies: Dict[str, List[str]] = Field(description="Step dependencies")
    fallback_strategy: Optional[str] = None
    estimated_duration: float = Field(description="Seconds")

class ToolCall(BaseModel):
    """Enhanced tool selection with context"""
    reasoning: str
    confidence: float = Field(ge=0.0, le=1.0)
    tool_name: Literal["sql_engine", "data_transformer", "api_orchestrator",
                       "code_generator", "planner", "none"]
    tool_input: Optional[Union[SQLQuery, DataTransformation, APIRequest,
                               CodeGeneration, MultiToolPlan]] = None
    requires_human_approval: bool = False

class ExecutionResult(BaseModel):
    """Rich result with metadata"""
    success: bool
    data: Any
    execution_time: float
    warnings: List[str] = Field(default_factory=list)
    metadata: Dict[str, Any] = Field(default_factory=dict)

These models provide validation (for example, rejecting empty column lists or unsafe Python constructs) and form the backbone of the agent's reasoning and routing logic.
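You can see the validators reject malformed inputs before any tool ever runs. A quick check, assuming the schemas above are defined in scope:

from pydantic import ValidationError

# An empty column list fails SQLQuery validation
try:
    SQLQuery(table="users", columns=[])
except ValidationError as e:
    print(e.errors()[0]["msg"])  # reports "Must specify at least one column"

# Unsafe Python constructs fail CodeGeneration validation
try:
    CodeGeneration(language="python", purpose="demo", code="os.system('rm -rf /')")
except ValidationError:
    print("Rejected unsafe code")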
Implementing tools (mocked execution)
To test routing and planning offline, implement deterministic mock tools: a SQL engine over in-memory mock tables, a data transformer, an API orchestrator, a code generator, and a planner. Each returns an ExecutionResult with metadata and warnings when applicable.
def sql_engine_tool(params: SQLQuery) -> ExecutionResult:
    import time
    start = time.time()
    mock_tables = {
        "users": [
            {"id": 1, "name": "Alice", "age": 30, "country": "USA"},
            {"id": 2, "name": "Bob", "age": 25, "country": "UK"},
            {"id": 3, "name": "Charlie", "age": 35, "country": "USA"},
        ],
        "orders": [
            {"id": 1, "user_id": 1, "amount": 100, "status": "completed"},
            {"id": 2, "user_id": 1, "amount": 200, "status": "pending"},
            {"id": 3, "user_id": 2, "amount": 150, "status": "completed"},
        ]
    }
    data = mock_tables.get(params.table, [])
    if params.where_conditions:
        data = [row for row in data if all(
            row.get(k) == v for k, v in params.where_conditions.items()
        )]
    data = [{col: row.get(col) for col in params.columns} for row in data]
    warnings = []
    if params.aggregations:
        warnings.append("Aggregation simplified in mock mode")
    return ExecutionResult(
        success=True,
        data=data,
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"rows_affected": len(data), "query_type": "SELECT"}
    )

def data_transformer_tool(params: DataTransformation) -> ExecutionResult:
    import time
    start = time.time()
    operations = {
        "filter": lambda d, p: [x for x in d if x.get(p['field']) == p['value']],
        "aggregate": lambda d, p: {"count": len(d), "operation": p.get('function', 'count')},
        # Only divide numeric values; leave strings (e.g. "category") untouched
        "normalize": lambda d, p: [
            {k: v / p.get('factor', 1) if isinstance(v, (int, float)) else v
             for k, v in x.items()} for x in d
        ]
    }
    mock_data = [{"value": i, "category": "A" if i % 2 else "B"} for i in range(10)]
    op_func = operations.get(params.operation)
    if op_func:
        result_data = op_func(mock_data, params.parameters)
    else:
        result_data = mock_data
    return ExecutionResult(
        success=True,
        data=result_data,
        execution_time=time.time() - start,
        warnings=[],
        metadata={"operation": params.operation, "input_rows": len(mock_data)}
    )

def api_orchestrator_tool(params: APIRequest) -> ExecutionResult:
    import time
    start = time.time()
    results = []
    warnings = []
    for i, endpoint in enumerate(params.endpoints):
        # Simulate a transient failure on the second endpoint when retries are enabled
        if params.error_handling == "retry" and i == 1:
            warnings.append(f"Endpoint {endpoint.get('url')} failed, retrying...")
        results.append({
            "endpoint": endpoint.get('url'),
            "status": 200,
            "data": f"Mock response from {endpoint.get('url')}"
        })
    return ExecutionResult(
        success=True,
        data=results,
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"endpoints_called": len(params.endpoints), "order": params.request_order}
    )

def code_generator_tool(params: CodeGeneration) -> ExecutionResult:
    import time
    start = time.time()
    warnings = []
    if len(params.code) > 1000:
        warnings.append("Generated code is quite long, consider refactoring")
    if not params.test_cases:
        warnings.append("No test cases provided for generated code")
    return ExecutionResult(
        success=True,
        data={"code": params.code, "language": params.language, "dependencies": params.dependencies},
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"lines_of_code": len(params.code.split('\n'))}
    )

def planner_tool(params: MultiToolPlan) -> ExecutionResult:
    import time
    start = time.time()
    warnings = []
    if len(params.steps) > 10:
        warnings.append("Plan has many steps, consider breaking into sub-plans")
    for step_id, deps in params.dependencies.items():
        if step_id in deps:
            warnings.append(f"Circular dependency detected in step {step_id}")
    return ExecutionResult(
        success=True,
        data={"plan": params.steps, "estimated_time": params.estimated_duration},
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"total_steps": len(params.steps)}
    )

TOOLS = {
    "sql_engine": sql_engine_tool,
    "data_transformer": data_transformer_tool,
    "api_orchestrator": api_orchestrator_tool,
    "code_generator": code_generator_tool,
    "planner": planner_tool
}

These implementations emulate real tools and return structured results that the agent can inspect, log, and use for recovery decisions.
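Because each tool takes a validated schema and returns an ExecutionResult, you can exercise them in isolation before wiring up the LLM router. A minimal smoke test using the schemas and tools defined above:

query = SQLQuery(
    table="users",
    columns=["name", "age"],
    where_conditions={"country": "USA"}
)
result = sql_engine_tool(query)
print(result.success)    # True
print(result.data)       # [{'name': 'Alice', 'age': 30}, {'name': 'Charlie', 'age': 35}]
print(result.metadata)   # {'rows_affected': 2, 'query_type': 'SELECT'}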
Agent architecture: routing, execution, and recovery
The AdvancedToolAgent wraps a local LLM pipeline (via Instructor) so that every routing decision comes back as a structured ToolCall (response_model=ToolCall). The agent loads a model, creates a generation pipeline, and exposes methods to route queries to tools and execute them with retries and history logging.
class AdvancedToolAgent:
    """Agent with complex reasoning, error recovery, and multi-step planning"""

    def __init__(self, model_name: str = "HuggingFaceH4/zephyr-7b-beta"):
        import torch
        print(f"Loading model: {model_name}")
        model_kwargs = {"device_map": "auto"}
        if torch.cuda.is_available():
            print("GPU detected - using 8-bit quantization")
            from transformers import BitsAndBytesConfig
            quantization_config = BitsAndBytesConfig(
                load_in_8bit=True,
                llm_int8_threshold=6.0
            )
            model_kwargs["quantization_config"] = quantization_config
        else:
            print("CPU mode - using smaller model for better performance")
            # Fall back to a small decoder-only model; a seq2seq model such as
            # flan-t5 would not load with AutoModelForCausalLM
            model_name = "distilgpt2"
            model_kwargs["torch_dtype"] = "auto"
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            **model_kwargs
        )
        self.pipe = pipeline(
            "text-generation", model=self.model, tokenizer=self.tokenizer,
            max_new_tokens=768, temperature=0.7, do_sample=True
        )
        self.client = instructor.from_pipe(self.pipe)
        self.execution_history = []
        print("Agent initialized!")

    def route_to_tool(self, user_query: str, context: Optional[str] = None) -> ToolCall:
        tool_descriptions = """
Advanced Tools:
- sql_engine: Execute complex SQL queries with joins, aggregations, filtering
- data_transformer: Multi-step data pipelines (filter→aggregate→normalize)
- api_orchestrator: Call multiple APIs with dependencies, retries, error handling
- code_generator: Generate safe, validated code with tests in multiple languages
- planner: Create multi-step execution plans with dependency management
- none: Answer directly using reasoning
"""
        prompt = f"""{tool_descriptions}
User query: {user_query}
{f'Context from previous steps: {context}' if context else ''}
Analyze the complexity and choose the appropriate tool. For multi-step tasks, use the planner."""
        return self.client(prompt, response_model=ToolCall)

    def execute_with_recovery(self, tool_call: ToolCall, max_retries: int = 2) -> ExecutionResult:
        for attempt in range(max_retries + 1):
            try:
                if tool_call.tool_name == "none":
                    return ExecutionResult(
                        success=True, data="Direct response", execution_time=0.0,
                        warnings=[], metadata={}
                    )
                tool_func = TOOLS.get(tool_call.tool_name)
                if not tool_func:
                    return ExecutionResult(
                        success=False, data=None, execution_time=0.0,
                        warnings=[f"Tool {tool_call.tool_name} not found"], metadata={}
                    )
                result = tool_func(tool_call.tool_input)
                self.execution_history.append({
                    "tool": tool_call.tool_name,
                    "success": result.success,
                    "timestamp": datetime.now().isoformat()
                })
                return result
            except Exception as e:
                if attempt < max_retries:
                    print(f"Attempt {attempt + 1} failed, retrying...")
                    continue
                return ExecutionResult(
                    success=False, data=None, execution_time=0.0,
                    warnings=[f"Failed after {max_retries + 1} attempts: {str(e)}"],
                    metadata={"error": str(e)}
                )

The agent logs execution history and handles missing tools, exceptions, and conditional behavior like returning direct answers for simple queries.
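The dispatch path the agent takes can also be tested without loading any model by hand-building a ToolCall and routing it through the TOOLS registry, mirroring what execute_with_recovery does internally. A minimal sketch, assuming the schemas and TOOLS defined above:

# Hand-built ToolCall exercising the dispatch path without the LLM router
call = ToolCall(
    reasoning="User wants filtered records",
    confidence=0.9,
    tool_name="data_transformer",
    tool_input=DataTransformation(
        operation="filter",
        source_data="mock",
        parameters={"field": "category", "value": "A"},
        output_format="json"
    )
)
result = TOOLS[call.tool_name](call.tool_input)
print(result.success, result.metadata)  # True {'operation': 'filter', 'input_rows': 10}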
Running queries and a demo harness
A run method ties analysis, routing, execution, and metadata reporting together. The demo main() runs several deliberately hard queries to exercise routing across SQL, transformation, API orchestration, code generation, and planning.
    # run() continues the AdvancedToolAgent class from the previous section
    def run(self, user_query: str, verbose: bool = True) -> Dict[str, Any]:
        if verbose:
            print(f"\n{'='*70}")
            print(f"Complex Query: {user_query}")
            print(f"{'='*70}")
            print("\nStep 1: Analyzing query complexity & routing...")
        tool_call = self.route_to_tool(user_query)
        if verbose:
            print(f"→ Tool: {tool_call.tool_name}")
            print(f"→ Confidence: {tool_call.confidence:.2%}")
            print(f"→ Reasoning: {tool_call.reasoning}")
            if tool_call.requires_human_approval:
                print("Requires human approval!")
            print("\nStep 2: Executing tool with error recovery...")
        result = self.execute_with_recovery(tool_call)
        if verbose:
            print(f"→ Success: {result.success}")
            print(f"→ Execution time: {result.execution_time:.3f}s")
            if result.warnings:
                print(f"→ Warnings: {', '.join(result.warnings)}")
            print(f"→ Data preview: {str(result.data)[:200]}...")
        if verbose and result.metadata:
            print("\nMetadata:")
            for key, value in result.metadata.items():
                print(f"  • {key}: {value}")
        if verbose:
            print(f"\n{'='*70}\n")
        return {
            "query": user_query,
            "tool_used": tool_call.tool_name,
            "result": result,
            "history_length": len(self.execution_history)
        }
def main():
    agent = AdvancedToolAgent()
    hard_queries = [
        "Generate a SQL query to find all users from USA who have completed orders worth more than $150, and join with their order details",
        "Create a data pipeline that filters records where category='A', then aggregates by count, and normalizes the results by a factor of 100",
        "I need to call 3 APIs sequentially: first authenticate at /auth, then fetch user data at /users/{id}, and finally update preferences at /preferences. If any step fails, retry up to 3 times",
        "Write a Python function that validates email addresses using regex, includes error handling, and has at least 2 test cases. Make sure it doesn't use any dangerous operations",
        "Create a multi-step plan to: 1) Extract data from a database, 2) Transform it using pandas, 3) Generate a report, 4) Send via email. Show dependencies between steps"
    ]
    print("\n" + " HARD MODE: COMPLEX QUERIES ".center(70, "=") + "\n")
    for i, query in enumerate(hard_queries, 1):
        print(f"\n{'#'*70}")
        print(f"# CHALLENGE {i}/{len(hard_queries)}")
        print(f"{'#'*70}")
        try:
            agent.run(query, verbose=True)
        except Exception as e:
            print(f"Critical error: {e}\n")
    print("\n" + f" COMPLETED {len(agent.execution_history)} TOOL EXECUTIONS ".center(70, "=") + "\n")
    if agent.execution_history:  # guard against division by zero when every run failed
        successes = sum(1 for h in agent.execution_history if h['success'])
        print(f"Success rate: {successes / len(agent.execution_history) * 100:.1f}%")

if __name__ == "__main__":
    main()

What this design delivers
- Clear, typed contracts for every tool via Pydantic models.
- Deterministic local tools for reproducible testing.
- An LLM-driven router (Instructor pipeline) that returns structured ToolCall objects.
- Execution with retries, warnings, metadata and history for observability.
This pattern scales: swap mock tools for real backends, adjust schemas, or extend the planner to orchestrate distributed workflows while staying fully offline for inference.
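As an example of swapping in a real backend, the mock SQL engine could be replaced by one backed by Python's built-in sqlite3, keeping the same schema and ExecutionResult contract. A minimal sketch; the database path and table layout are assumptions for illustration:

import sqlite3
import time

def sqlite_engine_tool(params: SQLQuery, db_path: str = "app.db") -> ExecutionResult:
    """Drop-in replacement for sql_engine_tool backed by a real SQLite file."""
    start = time.time()
    # Identifiers come from the validated schema; values are parameterized
    where_sql, where_args = "", []
    if params.where_conditions:
        clauses = [f"{k} = ?" for k in params.where_conditions]
        where_sql = " WHERE " + " AND ".join(clauses)
        where_args = list(params.where_conditions.values())
    sql = f"SELECT {', '.join(params.columns)} FROM {params.table}{where_sql}"
    with sqlite3.connect(db_path) as conn:
        conn.row_factory = sqlite3.Row
        rows = [dict(r) for r in conn.execute(sql, where_args)]
    return ExecutionResult(
        success=True,
        data=rows,
        execution_time=time.time() - start,
        warnings=[],
        metadata={"rows_affected": len(rows), "query_type": "SELECT"}
    )

# Register it in place of the mock; the router and agent code are unchanged
TOOLS["sql_engine"] = sqlite_engine_tool

Because the function signature and the ExecutionResult contract match the mock, routing, recovery, and history logging all keep working without modification.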