
Offline Multi-Tool Reasoning Agent: Dynamic Planning, Error Recovery & Intelligent Tool Routing

A practical tutorial on building a fully offline multi-tool reasoning agent using Instructor, Transformers, and Pydantic, including code for tool mocks, schemas, routing, and recovery.

Building a Fully Offline Multi-Tool Reasoning Agent

This guide shows how to assemble a compact, fully offline agent that selects tools, validates inputs, plans multi-step workflows, and recovers from errors. The system relies on Instructor, Transformers, and Pydantic schemas to structure reasoning and orchestrate tool calls.

Environment setup and dependencies

Prepare the runtime by installing the required libraries. The snippet below checks whether Instructor is importable and, if not, installs the full dependency set, adding quantization support when a GPU is available.

import subprocess
import sys
 
 
def install_dependencies():
   packages = [
       "instructor",
       "transformers>=4.35.0",
       "torch",
       "accelerate",
       "pydantic>=2.0.0",
       "numpy",
       "pandas"
   ]
   for package in packages:
       subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package])
   # torch is only importable after the base install, so check for a GPU last
   import torch
   if torch.cuda.is_available():
       print("GPU detected - installing quantization support")
       subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "bitsandbytes"])
   else:
       print("No GPU detected - will use CPU (slower but works)")
 
 
try:
   import instructor
except ImportError:
   print(" Installing dependencies...")
   install_dependencies()
   print(" Installation complete!")

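Because inference is meant to be fully offline, you can also pin the Hugging Face libraries to the local cache so nothing is fetched from the network at runtime. A small optional sketch, assuming the model weights were already downloaded during a previous online run:

import os

# Use only locally cached model files; both variables are honored by
# transformers and huggingface_hub. Set them before importing transformers.
os.environ["TRANSFORMERS_OFFLINE"] = "1"
os.environ["HF_HUB_OFFLINE"] = "1"
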
Set up the remaining imports and basic utilities you will reuse across the agent.

from typing import Literal, Optional, List, Union, Dict, Any
from pydantic import BaseModel, Field, field_validator
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import instructor
import json
from datetime import datetime
import re

Pydantic schemas: structure and validation

Define strongly typed schemas to describe SQL queries, data transformations, API requests, code-generation artifacts, multi-step plans, and tool calls. These schemas give the agent precise expectations and safety checks.

class SQLQuery(BaseModel):
   """Complex SQL generation with validation"""
   table: str
   columns: List[str]
   where_conditions: Optional[Dict[str, Any]] = None
   joins: Optional[List[Dict[str, str]]] = None
   aggregations: Optional[Dict[str, str]] = None
   order_by: Optional[List[str]] = None
  
   @field_validator('columns')
   @classmethod
   def validate_columns(cls, v):
       if not v:
           raise ValueError("Must specify at least one column")
       return v
 
 
class DataTransformation(BaseModel):
   """Schema for complex data pipeline operations"""
   operation: Literal["filter", "aggregate", "join", "pivot", "normalize"]
   source_data: str = Field(description="Reference to data source")
   parameters: Dict[str, Any]
   output_format: Literal["json", "csv", "dataframe"]
  
class APIRequest(BaseModel):
   """Multi-endpoint API orchestration"""
   endpoints: List[Dict[str, str]] = Field(description="List of endpoints to call")
   authentication: Dict[str, str]
   request_order: Literal["sequential", "parallel", "conditional"]
   error_handling: Literal["stop", "continue", "retry"]
   max_retries: int = Field(default=3, ge=0, le=10)
 
 
class CodeGeneration(BaseModel):
   """Generate and validate code snippets"""
   language: Literal["python", "javascript", "sql", "bash"]
   purpose: str
   code: str = Field(description="The generated code")
   dependencies: List[str] = Field(default_factory=list)
   test_cases: List[Dict[str, Any]] = Field(default_factory=list)
  
   @field_validator('code')
   @classmethod
   def validate_code_safety(cls, v, info):
       dangerous = ['eval(', 'exec(', '__import__', 'os.system']
       if info.data.get('language') == 'python':
           if any(d in v for d in dangerous):
               raise ValueError("Code contains potentially dangerous operations")
       return v
 
 
class MultiToolPlan(BaseModel):
   """Plan for multi-step tool execution"""
   goal: str
   steps: List[Dict[str, Any]] = Field(description="Ordered list of tool calls")
   dependencies: Dict[str, List[str]] = Field(description="Step dependencies")
   fallback_strategy: Optional[str] = None
   estimated_duration: float = Field(description="Seconds")
 
 
class ToolCall(BaseModel):
   """Enhanced tool selection with context"""
   reasoning: str
   confidence: float = Field(ge=0.0, le=1.0)
   tool_name: Literal["sql_engine", "data_transformer", "api_orchestrator",
                      "code_generator", "planner", "none"]
   tool_input: Optional[Union[SQLQuery, DataTransformation, APIRequest,
                              CodeGeneration, MultiToolPlan]] = None
   requires_human_approval: bool = False
 
 
class ExecutionResult(BaseModel):
   """Rich result with metadata"""
   success: bool
   data: Any
   execution_time: float
   warnings: List[str] = Field(default_factory=list)
   metadata: Dict[str, Any] = Field(default_factory=dict)

These models provide validation (for example, rejecting empty column lists and unsafe Python constructs) and form the backbone of the agent's reasoning and routing logic.
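
A quick sanity check illustrates the validators at work; Pydantic raises a ValidationError whenever a schema constraint is violated:

from pydantic import ValidationError

# A well-formed query passes validation
SQLQuery(table="users", columns=["id", "name"])

# An empty column list is rejected by validate_columns
try:
    SQLQuery(table="users", columns=[])
except ValidationError as e:
    print("Rejected:", e.errors()[0]["msg"])

# Python code containing eval() is rejected by validate_code_safety
try:
    CodeGeneration(language="python", purpose="demo", code="eval('2+2')")
except ValidationError as e:
    print("Rejected:", e.errors()[0]["msg"])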

Implementing tools (mocked execution)

To test routing and planning offline, implement deterministic mock tools: a SQL engine on mock tables, a data transformer, an API orchestrator, a code generator, and a planner. Each returns an ExecutionResult with metadata and warnings when applicable.

def sql_engine_tool(params: SQLQuery) -> ExecutionResult:
   import time
   start = time.time()
   mock_tables = {
       "users": [
           {"id": 1, "name": "Alice", "age": 30, "country": "USA"},
           {"id": 2, "name": "Bob", "age": 25, "country": "UK"},
           {"id": 3, "name": "Charlie", "age": 35, "country": "USA"},
       ],
       "orders": [
           {"id": 1, "user_id": 1, "amount": 100, "status": "completed"},
           {"id": 2, "user_id": 1, "amount": 200, "status": "pending"},
           {"id": 3, "user_id": 2, "amount": 150, "status": "completed"},
       ]
   }
   data = mock_tables.get(params.table, [])
   if params.where_conditions:
       data = [row for row in data if all(
           row.get(k) == v for k, v in params.where_conditions.items()
       )]
   data = [{col: row.get(col) for col in params.columns} for row in data]
   warnings = []
   if params.aggregations:
       warnings.append("Aggregation simplified in mock mode")
   return ExecutionResult(
       success=True,
       data=data,
       execution_time=time.time() - start,
       warnings=warnings,
       metadata={"rows_affected": len(data), "query_type": "SELECT"}
   )
 
 
 
def data_transformer_tool(params: DataTransformation) -> ExecutionResult:
   import time
   start = time.time()
   operations = {
       "filter": lambda d, p: [x for x in d if x.get(p['field']) == p['value']],
       "aggregate": lambda d, p: {"count": len(d), "operation": p.get('function', 'count')},
       # Scale only numeric fields; dividing a string by a factor would raise TypeError
       "normalize": lambda d, p: [
           {k: v / p.get('factor', 1) if isinstance(v, (int, float)) else v
            for k, v in x.items()}
           for x in d
       ]
   }
   mock_data = [{"value": i, "category": "A" if i % 2 else "B"} for i in range(10)]
   op_func = operations.get(params.operation)
   if op_func:
       result_data = op_func(mock_data, params.parameters)
   else:
       result_data = mock_data
   return ExecutionResult(
       success=True,
       data=result_data,
       execution_time=time.time() - start,
       warnings=[],
       metadata={"operation": params.operation, "input_rows": len(mock_data)}
   )
 
 
 
def api_orchestrator_tool(params: APIRequest) -> ExecutionResult:
   import time
   start = time.time()
   results = []
   warnings = []
   for i, endpoint in enumerate(params.endpoints):
       if params.error_handling == "retry" and i == 1:
           warnings.append(f"Endpoint {endpoint.get('url')} failed, retrying...")
       results.append({
           "endpoint": endpoint.get('url'),
           "status": 200,
           "data": f"Mock response from {endpoint.get('url')}"
       })
   return ExecutionResult(
       success=True,
       data=results,
       execution_time=time.time() - start,
       warnings=warnings,
       metadata={"endpoints_called": len(params.endpoints), "order": params.request_order}
   )
 
 
 
def code_generator_tool(params: CodeGeneration) -> ExecutionResult:
   import time
   start = time.time()
   warnings = []
   if len(params.code) > 1000:
       warnings.append("Generated code is quite long, consider refactoring")
   if not params.test_cases:
       warnings.append("No test cases provided for generated code")
   return ExecutionResult(
       success=True,
       data={"code": params.code, "language": params.language, "dependencies": params.dependencies},
       execution_time=time.time() - start,
       warnings=warnings,
       metadata={"lines_of_code": len(params.code.split('\n'))}
   )
 
 
 
def planner_tool(params: MultiToolPlan) -> ExecutionResult:
   import time
   start = time.time()
   warnings = []
   if len(params.steps) > 10:
       warnings.append("Plan has many steps, consider breaking into sub-plans")
   for step_id, deps in params.dependencies.items():
       if step_id in deps:
           warnings.append(f"Circular dependency detected in step {step_id}")
   return ExecutionResult(
       success=True,
       data={"plan": params.steps, "estimated_time": params.estimated_duration},
       execution_time=time.time() - start,
       warnings=warnings,
       metadata={"total_steps": len(params.steps)}
   )
 
 
TOOLS = {
   "sql_engine": sql_engine_tool,
   "data_transformer": data_transformer_tool,
   "api_orchestrator": api_orchestrator_tool,
   "code_generator": code_generator_tool,
   "planner": planner_tool
}

These implementations emulate real tools and return structured results that the agent can inspect, log, and use for recovery decisions.
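
Because each tool is a plain function over a validated schema, you can exercise it directly before wiring in the LLM router. For example, querying the mock users table:

result = sql_engine_tool(SQLQuery(
    table="users",
    columns=["name", "country"],
    where_conditions={"country": "USA"}
))
print(result.success)    # True
print(result.data)       # [{'name': 'Alice', 'country': 'USA'}, {'name': 'Charlie', 'country': 'USA'}]
print(result.metadata)   # {'rows_affected': 2, 'query_type': 'SELECT'}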

Agent architecture: routing, execution, and recovery

The AdvancedToolAgent wraps a local LLM pipeline in an Instructor client so that prompts return structured ToolCall objects (response_model=ToolCall). The agent loads a model, builds a text-generation pipeline, and exposes methods that route queries to tools and execute them with retries and history logging.

class AdvancedToolAgent:
   """Agent with complex reasoning, error recovery, and multi-step planning"""
  
   def __init__(self, model_name: str = "HuggingFaceH4/zephyr-7b-beta"):
       import torch
       print(f" Loading model: {model_name}")
       model_kwargs = {"device_map": "auto"}
       if torch.cuda.is_available():
           print(" GPU detected - using 8-bit quantization")
           from transformers import BitsAndBytesConfig
           quantization_config = BitsAndBytesConfig(
               load_in_8bit=True,
               llm_int8_threshold=6.0
           )
           model_kwargs["quantization_config"] = quantization_config
       else:
           print("CPU mode - using a smaller model for better performance")
           # Must be a causal LM; a seq2seq model such as flan-t5 would fail
           # to load with AutoModelForCausalLM below
           model_name = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
           model_kwargs["torch_dtype"] = "auto"
       self.tokenizer = AutoTokenizer.from_pretrained(model_name)
       self.model = AutoModelForCausalLM.from_pretrained(
           model_name,
           **model_kwargs
       )
       self.pipe = pipeline(
           "text-generation", model=self.model, tokenizer=self.tokenizer,
           max_new_tokens=768, temperature=0.7, do_sample=True
       )
       # Wrap the generation pipeline so that calls can return validated
       # Pydantic objects via response_model
       self.client = instructor.from_pipe(self.pipe)
       self.execution_history = []
       print("Agent initialized!")
  
   def route_to_tool(self, user_query: str, context: Optional[str] = None) -> ToolCall:
       tool_descriptions = """
Advanced Tools:
- sql_engine: Execute complex SQL queries with joins, aggregations, filtering
- data_transformer: Multi-step data pipelines (filter→aggregate→normalize)
- api_orchestrator: Call multiple APIs with dependencies, retries, error handling
- code_generator: Generate safe, validated code with tests in multiple languages
- planner: Create multi-step execution plans with dependency management
- none: Answer directly using reasoning
"""
       prompt = f"""{tool_descriptions}
 
 
User query: {user_query}
{f'Context from previous steps: {context}' if context else ''}
 
 
Analyze the complexity and choose the appropriate tool. For multi-step tasks, use the planner."""
       return self.client(prompt, response_model=ToolCall)
  
   def execute_with_recovery(self, tool_call: ToolCall, max_retries: int = 2) -> ExecutionResult:
       for attempt in range(max_retries + 1):
           try:
               if tool_call.tool_name == "none":
                   return ExecutionResult(
                       success=True, data="Direct response", execution_time=0.0,
                       warnings=[], metadata={}
                   )
               tool_func = TOOLS.get(tool_call.tool_name)
               if not tool_func:
                   return ExecutionResult(
                       success=False, data=None, execution_time=0.0,
                       warnings=[f"Tool {tool_call.tool_name} not found"], metadata={}
                   )
               result = tool_func(tool_call.tool_input)
               self.execution_history.append({
                   "tool": tool_call.tool_name,
                   "success": result.success,
                   "timestamp": datetime.now().isoformat()
               })
               return result
           except Exception as e:
               if attempt < max_retries:
                   print(f"     Attempt {attempt + 1} failed, retrying...")
                   continue
               return ExecutionResult(
                   success=False, data=None, execution_time=0.0,
                   warnings=[f"Failed after {max_retries + 1} attempts: {str(e)}"],
                   metadata={"error": str(e)}
               )

The agent logs execution history and handles missing tools, exceptions, and conditional behavior like returning direct answers for simple queries.
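
You can also exercise the execution path without invoking the model at all, which is useful for unit-testing recovery behavior. A minimal sketch, assuming an initialized AdvancedToolAgent named agent, that builds the ToolCall by hand instead of via route_to_tool:

manual_call = ToolCall(
    reasoning="Unit test: route a simple lookup to the SQL engine",
    confidence=1.0,
    tool_name="sql_engine",
    tool_input=SQLQuery(table="orders", columns=["id", "amount"]),
)

result = agent.execute_with_recovery(manual_call)
print(result.success, result.metadata)   # True {'rows_affected': 3, 'query_type': 'SELECT'}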

Running queries and a demo harness

A run method ties routing, execution, and metadata reporting together, and the demo main() issues several representative hard queries to exercise routing across SQL, transformation, API orchestration, code generation, and planning.

  def run(self, user_query: str, verbose: bool = True) -> Dict[str, Any]:
       if verbose:
           print(f"\n{'='*70}")
           print(f" Complex Query: {user_query}")
           print(f"{'='*70}")
       if verbose:
           print("\n Step 1: Analyzing query complexity & routing...")
       tool_call = self.route_to_tool(user_query)
       if verbose:
           print(f"   → Tool: {tool_call.tool_name}")
           print(f"   → Confidence: {tool_call.confidence:.2%}")
           print(f"   → Reasoning: {tool_call.reasoning}")
           if tool_call.requires_human_approval:
               print(f"     Requires human approval!")
       if verbose:
           print("\n  Step 2: Executing tool with error recovery...")
       result = self.execute_with_recovery(tool_call)
       if verbose:
           print(f"   → Success: {result.success}")
           print(f"   → Execution time: {result.execution_time:.3f}s")
           if result.warnings:
               print(f"   → Warnings: {', '.join(result.warnings)}")
           print(f"   → Data preview: {str(result.data)[:200]}...")
       if verbose and result.metadata:
           print(f"\n Metadata:")
           for key, value in result.metadata.items():
               print(f"   • {key}: {value}")
       if verbose:
           print(f"\n{'='*70}\n")
       return {
           "query": user_query,
           "tool_used": tool_call.tool_name,
           "result": result,
           "history_length": len(self.execution_history)
       }
 
 
 
def main():
   agent = AdvancedToolAgent()
   hard_queries = [
       "Generate a SQL query to find all users from USA who have completed orders worth more than $150, and join with their order details",
       "Create a data pipeline that filters records where category='A', then aggregates by count, and normalizes the results by a factor of 100",
       "I need to call 3 APIs sequentially: first authenticate at /auth, then fetch user data at /users/{id}, and finally update preferences at /preferences. If any step fails, retry up to 3 times",
       "Write a Python function that validates email addresses using regex, includes error handling, and has at least 2 test cases. Make sure it doesn't use any dangerous operations",
       "Create a multi-step plan to: 1) Extract data from a database, 2) Transform it using pandas, 3) Generate a report, 4) Send via email. Show dependencies between steps"
   ]
   print("\n" + " HARD MODE: COMPLEX QUERIES ".center(70, "=") + "\n")
   for i, query in enumerate(hard_queries, 1):
       print(f"\n{'#'*70}")
       print(f"# CHALLENGE {i}/{len(hard_queries)}")
       print(f"{'#'*70}")
       try:
           agent.run(query, verbose=True)
       except Exception as e:
           print(f"Critical error: {e}\n")
   print("\n" + f" COMPLETED {len(agent.execution_history)} TOOL EXECUTIONS ".center(70, "=") + "\n")
   if agent.execution_history:
       success_rate = sum(1 for h in agent.execution_history if h['success']) / len(agent.execution_history)
       print(f"Success rate: {success_rate * 100:.1f}%")
 
 
if __name__ == "__main__":
   main()

What this design delivers

  • Clear, typed contracts for every tool via Pydantic models.
  • Deterministic local tools for reproducible testing.
  • An LLM-driven router (Instructor pipeline) that returns structured ToolCall objects.
  • Execution with retries, warnings, metadata and history for observability.

This pattern scales: swap mock tools for real backends, adjust schemas, or extend the planner to orchestrate distributed workflows while staying fully offline for inference.
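
For instance, swapping the mock SQL engine for a real backend only requires a new function with the same signature. A minimal sketch using Python's built-in sqlite3 module; the database file name here is hypothetical, and in production the table and column identifiers should be checked against an allow-list, since identifiers cannot be passed as bound parameters:

import sqlite3
import time

def sqlite_engine_tool(params: SQLQuery) -> ExecutionResult:
    """Drop-in replacement for sql_engine_tool backed by a local SQLite file."""
    start = time.time()
    sql = f"SELECT {', '.join(params.columns)} FROM {params.table}"
    args = []
    if params.where_conditions:
        sql += " WHERE " + " AND ".join(f"{k} = ?" for k in params.where_conditions)
        args = list(params.where_conditions.values())
    with sqlite3.connect("local.db") as conn:  # hypothetical database file
        conn.row_factory = sqlite3.Row
        rows = [dict(r) for r in conn.execute(sql, args)]
    return ExecutionResult(
        success=True, data=rows, execution_time=time.time() - start,
        warnings=[], metadata={"rows_affected": len(rows), "query_type": "SELECT"}
    )

# Register the real backend in place of the mock
TOOLS["sql_engine"] = sqlite_engine_tool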
