""" Autonomous AI Agent with MCP Tool Calling using Local Transformers This agent uses Hugging Face Transformers library to run models locally, avoiding inference API delays and availability issues. Uses Qwen3-0.6B for fast, local inference with tool calling support. """ import os import json import uuid import logging import asyncio import re from typing import List, Dict, Any, AsyncGenerator, Optional from mcp.tools.definitions import MCP_TOOLS, list_all_tools from mcp.registry import MCPRegistry logger = logging.getLogger(__name__) # Default model - small but capable DEFAULT_MODEL = "Qwen/Qwen3-0.6B" class AutonomousMCPAgentTransformers: """ AI Agent that autonomously uses MCP servers as tools using local Transformers. Runs models locally for fast, reliable inference without API dependencies. """ def __init__( self, mcp_registry: MCPRegistry, model_name: str = None, device: str = None ): """ Initialize the autonomous agent with local Transformers Args: mcp_registry: MCP registry with all servers model_name: Model to use (default: Qwen/Qwen3-0.6B) device: Device to run on ('cuda', 'cpu', or 'auto') """ self.mcp_registry = mcp_registry self.model_name = model_name or os.getenv("TRANSFORMERS_MODEL", DEFAULT_MODEL) self.device = device or os.getenv("TRANSFORMERS_DEVICE", "auto") # Lazy load model and tokenizer self.pipeline = None self.tokenizer = None self.model = None self._initialized = False # Create tool definitions for the prompt self.tools_description = self._create_tools_description() logger.info(f"Autonomous MCP Agent (Transformers) initialized") logger.info(f" Model: {self.model_name}") logger.info(f" Device: {self.device}") logger.info(f" Available tools: {len(MCP_TOOLS)}") def _initialize_model(self): """Lazy initialization of the model""" if self._initialized: return logger.info(f"Loading model {self.model_name}...") try: from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM import torch # Determine device if self.device == "auto": device = "cuda" if torch.cuda.is_available() else "cpu" else: device = self.device logger.info(f"Using device: {device}") # Load tokenizer self.tokenizer = AutoTokenizer.from_pretrained( self.model_name, trust_remote_code=True ) # Load model with appropriate settings model_kwargs = { "trust_remote_code": True, } if device == "cuda": model_kwargs["torch_dtype"] = torch.float16 model_kwargs["device_map"] = "auto" else: model_kwargs["torch_dtype"] = torch.float32 self.model = AutoModelForCausalLM.from_pretrained( self.model_name, **model_kwargs ) if device == "cpu": self.model = self.model.to(device) # Create pipeline for easier generation self.pipeline = pipeline( "text-generation", model=self.model, tokenizer=self.tokenizer, device=None if device == "cuda" else device, # device_map handles cuda ) self._initialized = True logger.info(f"Model {self.model_name} loaded successfully") except ImportError as e: raise ImportError( f"transformers package not installed or missing dependencies!\n" f"Install with: pip install transformers torch\n" f"Error: {e}" ) except Exception as e: logger.error(f"Failed to load model: {e}") raise def _create_tools_description(self) -> str: """Create a description of available tools for the prompt""" tools_text = "Available tools:\n\n" for tool in MCP_TOOLS: tools_text += f"- **{tool['name']}**: {tool['description']}\n" if tool.get('input_schema', {}).get('properties'): tools_text += " Parameters:\n" for param, details in tool['input_schema']['properties'].items(): required = param in tool['input_schema'].get('required', []) req_str = " (required)" if required else " (optional)" tools_text += f" - {param}{req_str}: {details.get('description', '')}\n" tools_text += "\n" return tools_text def _build_system_prompt(self) -> str: """Build the system prompt with tool instructions""" return f"""You are an autonomous AI agent for B2B sales automation. You have access to MCP (Model Context Protocol) tools that let you: - Search the web for company information and news - Save prospects, companies, contacts, and facts to a database - Send emails and manage email threads - Schedule meetings and generate calendar invites {self.tools_description} To use a tool, respond with a JSON block in this exact format: ```tool {{"tool": "tool_name", "parameters": {{"param1": "value1", "param2": "value2"}}}} ``` You can call multiple tools by including multiple tool blocks. After using tools and gathering information, provide your final response. When the task is complete, end with "TASK_COMPLETE" on a new line. Be concise and efficient. Focus on completing the task.""" def _parse_tool_calls(self, response: str) -> List[Dict[str, Any]]: """Parse tool calls from the model's response""" tool_calls = [] # Pattern to match tool JSON blocks # Match ```tool ... ``` or ```json ... ``` or just JSON objects with "tool" key patterns = [ r'```tool\s*\n?(.*?)\n?```', r'```json\s*\n?(.*?)\n?```', r'\{"tool":\s*"[^"]+",\s*"parameters":\s*\{[^}]*\}\}', ] for pattern in patterns[:2]: # First two patterns use groups matches = re.findall(pattern, response, re.DOTALL | re.IGNORECASE) for match in matches: try: tool_data = json.loads(match.strip()) if "tool" in tool_data: tool_calls.append(tool_data) except json.JSONDecodeError: continue # Try direct JSON pattern direct_matches = re.findall(patterns[2], response) for match in direct_matches: try: tool_data = json.loads(match) if tool_data not in tool_calls: # Avoid duplicates tool_calls.append(tool_data) except json.JSONDecodeError: continue return tool_calls def _generate_response(self, messages: List[Dict[str, str]], max_new_tokens: int = 512) -> str: """Generate a response from the model""" self._initialize_model() try: # Apply chat template inputs = self.tokenizer.apply_chat_template( messages, add_generation_prompt=True, tokenize=True, return_dict=True, return_tensors="pt", ) # Move to model device if hasattr(self.model, 'device'): inputs = {k: v.to(self.model.device) for k, v in inputs.items()} # Generate outputs = self.model.generate( **inputs, max_new_tokens=max_new_tokens, do_sample=True, temperature=0.7, top_p=0.9, pad_token_id=self.tokenizer.eos_token_id, ) # Decode only the new tokens input_length = inputs["input_ids"].shape[-1] response = self.tokenizer.decode( outputs[0][input_length:], skip_special_tokens=True ) return response.strip() except Exception as e: logger.error(f"Generation error: {e}") raise async def run( self, task: str, max_iterations: int = 10 ) -> AsyncGenerator[Dict[str, Any], None]: """ Run the agent autonomously on a task. Args: task: The task to complete max_iterations: Maximum tool calls to prevent infinite loops Yields: Events showing agent's progress and tool calls """ yield { "type": "agent_start", "message": f"Autonomous AI Agent (Transformers) starting task", "task": task, "model": self.model_name } # Initialize model (lazy load) try: self._initialize_model() yield { "type": "model_loaded", "message": f"Model {self.model_name} ready" } except Exception as e: yield { "type": "agent_error", "error": str(e), "message": f"Failed to load model: {e}" } return # Build conversation system_prompt = self._build_system_prompt() messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": task} ] iteration = 0 accumulated_results = [] while iteration < max_iterations: iteration += 1 yield { "type": "iteration_start", "iteration": iteration, "message": f"Iteration {iteration}: Thinking..." } try: # Generate response response = await asyncio.get_event_loop().run_in_executor( None, self._generate_response, messages, 512 ) logger.info(f"Model response (iteration {iteration}): {response[:200]}...") # Check for task completion if "TASK_COMPLETE" in response: # Extract final answer (everything before TASK_COMPLETE) final_answer = response.replace("TASK_COMPLETE", "").strip() yield { "type": "thought", "thought": final_answer, "message": f"AI Response: {final_answer[:100]}..." } yield { "type": "agent_complete", "message": "Task complete!", "final_answer": final_answer, "iterations": iteration } return # Parse tool calls tool_calls = self._parse_tool_calls(response) if tool_calls: # Execute each tool call tool_results = [] for tool_call in tool_calls: tool_name = tool_call.get("tool", "") tool_params = tool_call.get("parameters", {}) yield { "type": "tool_call", "tool": tool_name, "input": tool_params, "message": f"Action: {tool_name}" } try: result = await self._execute_mcp_tool(tool_name, tool_params) yield { "type": "tool_result", "tool": tool_name, "result": result, "message": f"Tool {tool_name} completed" } tool_results.append({ "tool": tool_name, "result": result }) accumulated_results.append({ "tool": tool_name, "params": tool_params, "result": result }) except Exception as e: error_msg = str(e) logger.error(f"Tool execution failed: {tool_name} - {error_msg}") yield { "type": "tool_error", "tool": tool_name, "error": error_msg, "message": f"Tool {tool_name} failed: {error_msg}" } tool_results.append({ "tool": tool_name, "error": error_msg }) # Add assistant response and tool results to conversation messages.append({"role": "assistant", "content": response}) # Format tool results for the model results_text = "Tool results:\n" for tr in tool_results: if "error" in tr: results_text += f"- {tr['tool']}: Error - {tr['error']}\n" else: result_str = json.dumps(tr['result'], default=str)[:500] results_text += f"- {tr['tool']}: {result_str}\n" messages.append({"role": "user", "content": results_text}) else: # No tool calls - this might be a thought or partial response yield { "type": "thought", "thought": response, "message": f"AI Response: {response[:100]}..." } # Add to conversation and prompt for continuation messages.append({"role": "assistant", "content": response}) messages.append({ "role": "user", "content": "Continue with the task. Use the available tools to gather information and complete the task. When done, say TASK_COMPLETE." }) except Exception as e: error_msg = str(e) logger.error(f"Error in iteration {iteration}: {error_msg}", exc_info=True) yield { "type": "agent_error", "error": error_msg, "message": f"Error: {error_msg}" } # Try to continue if we have results if accumulated_results: break return # Max iterations reached yield { "type": "agent_max_iterations", "message": f"Reached maximum iterations ({max_iterations})", "iterations": iteration, "accumulated_results": accumulated_results } async def _execute_mcp_tool(self, tool_name: str, tool_input: Dict[str, Any]) -> Any: """ Execute an MCP tool by routing to the appropriate MCP server. """ # ============ SEARCH MCP SERVER ============ if tool_name == "search_web": query = tool_input.get("query", "") max_results = tool_input.get("max_results", 5) results = await self.mcp_registry.search.query(query, max_results=max_results) return { "results": results[:max_results], "count": len(results[:max_results]) } elif tool_name == "search_news": query = tool_input.get("query", "") max_results = tool_input.get("max_results", 5) results = await self.mcp_registry.search.query(f"{query} news", max_results=max_results) return { "results": results[:max_results], "count": len(results[:max_results]) } # ============ STORE MCP SERVER ============ elif tool_name == "save_prospect": prospect_data = { "id": tool_input.get("prospect_id", str(uuid.uuid4())), "company": { "id": tool_input.get("company_id"), "name": tool_input.get("company_name"), "domain": tool_input.get("company_domain") }, "fit_score": tool_input.get("fit_score", 0), "status": tool_input.get("status", "new"), "metadata": tool_input.get("metadata", {}) } result = await self.mcp_registry.store.save_prospect(prospect_data) return {"status": result, "prospect_id": prospect_data["id"]} elif tool_name == "get_prospect": prospect_id = tool_input.get("prospect_id", "") prospect = await self.mcp_registry.store.get_prospect(prospect_id) return prospect or {"error": "Prospect not found"} elif tool_name == "list_prospects": prospects = await self.mcp_registry.store.list_prospects() status_filter = tool_input.get("status") if status_filter: prospects = [p for p in prospects if p.get("status") == status_filter] return { "prospects": prospects, "count": len(prospects) } elif tool_name == "save_company": company_data = { "id": tool_input.get("company_id", str(uuid.uuid4())), "name": tool_input.get("name", ""), "domain": tool_input.get("domain", ""), "industry": tool_input.get("industry"), "description": tool_input.get("description"), "employee_count": tool_input.get("employee_count") } result = await self.mcp_registry.store.save_company(company_data) return {"status": result, "company_id": company_data["id"]} elif tool_name == "get_company": company_id = tool_input.get("company_id", "") company = await self.mcp_registry.store.get_company(company_id) return company or {"error": "Company not found"} elif tool_name == "save_fact": fact_data = { "id": tool_input.get("fact_id", str(uuid.uuid4())), "company_id": tool_input.get("company_id", ""), "fact_type": tool_input.get("fact_type", ""), "content": tool_input.get("content", ""), "source_url": tool_input.get("source_url"), "confidence_score": tool_input.get("confidence_score", 0.8) } result = await self.mcp_registry.store.save_fact(fact_data) return {"status": result, "fact_id": fact_data["id"]} elif tool_name == "save_contact": contact_data = { "id": tool_input.get("contact_id", str(uuid.uuid4())), "company_id": tool_input.get("company_id", ""), "email": tool_input.get("email", ""), "first_name": tool_input.get("first_name"), "last_name": tool_input.get("last_name"), "title": tool_input.get("title"), "seniority": tool_input.get("seniority") } result = await self.mcp_registry.store.save_contact(contact_data) return {"status": result, "contact_id": contact_data["id"]} elif tool_name == "list_contacts_by_domain": domain = tool_input.get("domain", "") contacts = await self.mcp_registry.store.list_contacts_by_domain(domain) return { "contacts": contacts, "count": len(contacts) } elif tool_name == "check_suppression": supp_type = tool_input.get("suppression_type", "email") value = tool_input.get("value", "") is_suppressed = await self.mcp_registry.store.check_suppression(supp_type, value) return { "suppressed": is_suppressed, "value": value, "type": supp_type } # ============ EMAIL MCP SERVER ============ elif tool_name == "send_email": to = tool_input.get("to", "") subject = tool_input.get("subject", "") body = tool_input.get("body", "") prospect_id = tool_input.get("prospect_id", "") thread_id = await self.mcp_registry.email.send(to, subject, body, prospect_id) return { "status": "sent", "thread_id": thread_id, "to": to } elif tool_name == "get_email_thread": prospect_id = tool_input.get("prospect_id", "") thread = await self.mcp_registry.email.get_thread(prospect_id) return thread or {"error": "No email thread found"} # ============ CALENDAR MCP SERVER ============ elif tool_name == "suggest_meeting_slots": num_slots = tool_input.get("num_slots", 3) slots = await self.mcp_registry.calendar.suggest_slots() return { "slots": slots[:num_slots], "count": len(slots[:num_slots]) } elif tool_name == "generate_calendar_invite": start_time = tool_input.get("start_time", "") end_time = tool_input.get("end_time", "") title = tool_input.get("title", "Meeting") slot = { "start_iso": start_time, "end_iso": end_time, "title": title } ics = await self.mcp_registry.calendar.generate_ics(slot) return { "ics_content": ics, "meeting": slot } else: raise ValueError(f"Unknown MCP tool: {tool_name}")