| """ | |
| Autonomous AI Agent with MCP Tool Calling using Local Transformers | |
| This agent uses Hugging Face Transformers library to run models locally, | |
| avoiding inference API delays and availability issues. | |
| Uses Qwen3-0.6B for fast, local inference with tool calling support. | |
| """ | |
| import os | |
| import json | |
| import uuid | |
| import logging | |
| import asyncio | |
| import re | |
| from typing import List, Dict, Any, AsyncGenerator, Optional | |
| from mcp.tools.definitions import MCP_TOOLS, list_all_tools | |
| from mcp.registry import MCPRegistry | |
| logger = logging.getLogger(__name__) | |
| # Default model - small but capable | |
| DEFAULT_MODEL = "Qwen/Qwen3-0.6B" | |
class AutonomousMCPAgentTransformers:
    """
    AI Agent that autonomously uses MCP servers as tools using local Transformers.
    Runs models locally for fast, reliable inference without API dependencies.
    """

    def __init__(
        self,
        mcp_registry: MCPRegistry,
        model_name: str = None,
        device: str = None
    ):
        """
        Initialize the autonomous agent with local Transformers.

        Args:
            mcp_registry: MCP registry with all servers
            model_name: Model to use (default: Qwen/Qwen3-0.6B)
            device: Device to run on ('cuda', 'cpu', or 'auto')
        """
        self.mcp_registry = mcp_registry
        self.model_name = model_name or os.getenv("TRANSFORMERS_MODEL", DEFAULT_MODEL)
        self.device = device or os.getenv("TRANSFORMERS_DEVICE", "auto")

        # Lazy-load the model and tokenizer on first use
        self.pipeline = None
        self.tokenizer = None
        self.model = None
        self._initialized = False

        # Create tool definitions for the prompt
        self.tools_description = self._create_tools_description()

        logger.info("Autonomous MCP Agent (Transformers) initialized")
        logger.info(f"  Model: {self.model_name}")
        logger.info(f"  Device: {self.device}")
        logger.info(f"  Available tools: {len(MCP_TOOLS)}")
    def _initialize_model(self):
        """Lazy initialization of the model"""
        if self._initialized:
            return

        logger.info(f"Loading model {self.model_name}...")
        try:
            from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
            import torch

            # Determine device
            if self.device == "auto":
                device = "cuda" if torch.cuda.is_available() else "cpu"
            else:
                device = self.device
            logger.info(f"Using device: {device}")

            # Load tokenizer
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_name,
                trust_remote_code=True
            )

            # Load model with appropriate settings
            model_kwargs = {
                "trust_remote_code": True,
            }
            if device == "cuda":
                model_kwargs["torch_dtype"] = torch.float16
                model_kwargs["device_map"] = "auto"
            else:
                model_kwargs["torch_dtype"] = torch.float32

            self.model = AutoModelForCausalLM.from_pretrained(
                self.model_name,
                **model_kwargs
            )
            if device == "cpu":
                self.model = self.model.to(device)

            # Create a pipeline for convenience (generation below calls
            # self.model.generate directly)
            self.pipeline = pipeline(
                "text-generation",
                model=self.model,
                tokenizer=self.tokenizer,
                device=None if device == "cuda" else device,  # device_map handles cuda
            )

            self._initialized = True
            logger.info(f"Model {self.model_name} loaded successfully")
        except ImportError as e:
            raise ImportError(
                "transformers package not installed or missing dependencies!\n"
                "Install with: pip install transformers torch\n"
                f"Error: {e}"
            )
        except Exception as e:
            logger.error(f"Failed to load model: {e}")
            raise
    def _create_tools_description(self) -> str:
        """Create a description of available tools for the prompt"""
        tools_text = "Available tools:\n\n"
        for tool in MCP_TOOLS:
            tools_text += f"- **{tool['name']}**: {tool['description']}\n"
            if tool.get('input_schema', {}).get('properties'):
                tools_text += "  Parameters:\n"
                for param, details in tool['input_schema']['properties'].items():
                    required = param in tool['input_schema'].get('required', [])
                    req_str = " (required)" if required else " (optional)"
                    tools_text += f"    - {param}{req_str}: {details.get('description', '')}\n"
            tools_text += "\n"
        return tools_text
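
    # Illustrative example (not from the original file) of what the rendered
    # description looks like for the search_web tool, assuming its schema
    # defines "query" (required) and "max_results" (optional):
    #
    #   - **search_web**: Search the web for information
    #     Parameters:
    #       - query (required): The search query
    #       - max_results (optional): Maximum number of results to return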
    def _build_system_prompt(self) -> str:
        """Build the system prompt with tool instructions"""
        return f"""You are an autonomous AI agent for B2B sales automation.

You have access to MCP (Model Context Protocol) tools that let you:
- Search the web for company information and news
- Save prospects, companies, contacts, and facts to a database
- Send emails and manage email threads
- Schedule meetings and generate calendar invites

{self.tools_description}

To use a tool, respond with a JSON block in this exact format:

```tool
{{"tool": "tool_name", "parameters": {{"param1": "value1", "param2": "value2"}}}}
```

You can call multiple tools by including multiple tool blocks.
After using tools and gathering information, provide your final response.
When the task is complete, end with "TASK_COMPLETE" on a new line.

Be concise and efficient. Focus on completing the task."""
    def _parse_tool_calls(self, response: str) -> List[Dict[str, Any]]:
        """Parse tool calls from the model's response"""
        tool_calls = []

        # Patterns to match tool JSON blocks:
        # ```tool ... ```, ```json ... ```, or a bare JSON object with a "tool" key
        patterns = [
            r'```tool\s*\n?(.*?)\n?```',
            r'```json\s*\n?(.*?)\n?```',
            r'\{"tool":\s*"[^"]+",\s*"parameters":\s*\{[^}]*\}\}',
        ]

        for pattern in patterns[:2]:  # First two patterns use capture groups
            matches = re.findall(pattern, response, re.DOTALL | re.IGNORECASE)
            for match in matches:
                try:
                    tool_data = json.loads(match.strip())
                    if "tool" in tool_data:
                        tool_calls.append(tool_data)
                except json.JSONDecodeError:
                    continue

        # Try the bare-JSON pattern (note: it only matches parameter objects
        # without nested braces)
        direct_matches = re.findall(patterns[2], response)
        for match in direct_matches:
            try:
                tool_data = json.loads(match)
                if tool_data not in tool_calls:  # Avoid duplicates
                    tool_calls.append(tool_data)
            except json.JSONDecodeError:
                continue

        return tool_calls
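
    # Illustrative sketch (not in the original file): given a model response
    # like the one below, _parse_tool_calls returns the decoded tool dicts.
    #
    #   response = (
    #       'Let me look the company up first.\n'
    #       '```tool\n'
    #       '{"tool": "search_web", "parameters": {"query": "Acme Corp news"}}\n'
    #       '```'
    #   )
    #   agent._parse_tool_calls(response)
    #   # -> [{"tool": "search_web", "parameters": {"query": "Acme Corp news"}}]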
    def _generate_response(self, messages: List[Dict[str, str]], max_new_tokens: int = 512) -> str:
        """Generate a response from the model"""
        self._initialize_model()

        try:
            # Apply chat template
            inputs = self.tokenizer.apply_chat_template(
                messages,
                add_generation_prompt=True,
                tokenize=True,
                return_dict=True,
                return_tensors="pt",
            )

            # Move to model device
            if hasattr(self.model, 'device'):
                inputs = {k: v.to(self.model.device) for k, v in inputs.items()}

            # Generate
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                do_sample=True,
                temperature=0.7,
                top_p=0.9,
                pad_token_id=self.tokenizer.eos_token_id,
            )

            # Decode only the new tokens
            input_length = inputs["input_ids"].shape[-1]
            response = self.tokenizer.decode(
                outputs[0][input_length:],
                skip_special_tokens=True
            )
            return response.strip()
        except Exception as e:
            logger.error(f"Generation error: {e}")
            raise
    async def run(
        self,
        task: str,
        max_iterations: int = 10
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Run the agent autonomously on a task.

        Args:
            task: The task to complete
            max_iterations: Maximum tool-calling iterations, to prevent infinite loops

        Yields:
            Events showing the agent's progress and tool calls
        """
        yield {
            "type": "agent_start",
            "message": "Autonomous AI Agent (Transformers) starting task",
            "task": task,
            "model": self.model_name
        }

        # Initialize model (lazy load)
        try:
            self._initialize_model()
            yield {
                "type": "model_loaded",
                "message": f"Model {self.model_name} ready"
            }
        except Exception as e:
            yield {
                "type": "agent_error",
                "error": str(e),
                "message": f"Failed to load model: {e}"
            }
            return

        # Build conversation
        system_prompt = self._build_system_prompt()
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": task}
        ]

        iteration = 0
        accumulated_results = []
        while iteration < max_iterations:
            iteration += 1
            yield {
                "type": "iteration_start",
                "iteration": iteration,
                "message": f"Iteration {iteration}: Thinking..."
            }

            try:
                # Generate the response off the event loop (generation is blocking)
                response = await asyncio.get_running_loop().run_in_executor(
                    None,
                    self._generate_response,
                    messages,
                    512
                )
                logger.info(f"Model response (iteration {iteration}): {response[:200]}...")

                # Check for task completion
                if "TASK_COMPLETE" in response:
                    # Extract the final answer (everything before TASK_COMPLETE)
                    final_answer = response.replace("TASK_COMPLETE", "").strip()
                    yield {
                        "type": "thought",
                        "thought": final_answer,
                        "message": f"AI Response: {final_answer[:100]}..."
                    }
                    yield {
                        "type": "agent_complete",
                        "message": "Task complete!",
                        "final_answer": final_answer,
                        "iterations": iteration
                    }
                    return
                # Parse tool calls
                tool_calls = self._parse_tool_calls(response)

                if tool_calls:
                    # Execute each tool call
                    tool_results = []
                    for tool_call in tool_calls:
                        tool_name = tool_call.get("tool", "")
                        tool_params = tool_call.get("parameters", {})

                        yield {
                            "type": "tool_call",
                            "tool": tool_name,
                            "input": tool_params,
                            "message": f"Action: {tool_name}"
                        }

                        try:
                            result = await self._execute_mcp_tool(tool_name, tool_params)
                            yield {
                                "type": "tool_result",
                                "tool": tool_name,
                                "result": result,
                                "message": f"Tool {tool_name} completed"
                            }
                            tool_results.append({
                                "tool": tool_name,
                                "result": result
                            })
                            accumulated_results.append({
                                "tool": tool_name,
                                "params": tool_params,
                                "result": result
                            })
                        except Exception as e:
                            error_msg = str(e)
                            logger.error(f"Tool execution failed: {tool_name} - {error_msg}")
                            yield {
                                "type": "tool_error",
                                "tool": tool_name,
                                "error": error_msg,
                                "message": f"Tool {tool_name} failed: {error_msg}"
                            }
                            tool_results.append({
                                "tool": tool_name,
                                "error": error_msg
                            })

                    # Add assistant response and tool results to the conversation
                    messages.append({"role": "assistant", "content": response})

                    # Format tool results for the model
                    results_text = "Tool results:\n"
                    for tr in tool_results:
                        if "error" in tr:
                            results_text += f"- {tr['tool']}: Error - {tr['error']}\n"
                        else:
                            result_str = json.dumps(tr['result'], default=str)[:500]
                            results_text += f"- {tr['tool']}: {result_str}\n"
                    messages.append({"role": "user", "content": results_text})
                else:
                    # No tool calls - this might be a thought or partial response
                    yield {
                        "type": "thought",
                        "thought": response,
                        "message": f"AI Response: {response[:100]}..."
                    }
                    # Add to the conversation and prompt for continuation
                    messages.append({"role": "assistant", "content": response})
                    messages.append({
                        "role": "user",
                        "content": "Continue with the task. Use the available tools to gather information and complete the task. When done, say TASK_COMPLETE."
                    })
            except Exception as e:
                error_msg = str(e)
                logger.error(f"Error in iteration {iteration}: {error_msg}", exc_info=True)
                yield {
                    "type": "agent_error",
                    "error": error_msg,
                    "message": f"Error: {error_msg}"
                }
                # If some tools already succeeded, fall through and report the
                # partial results; otherwise stop here
                if accumulated_results:
                    break
                return

        # Loop finished without TASK_COMPLETE (max iterations reached, or an
        # error occurred after partial results)
        yield {
            "type": "agent_max_iterations",
            "message": f"Reached maximum iterations ({max_iterations})",
            "iterations": iteration,
            "accumulated_results": accumulated_results
        }
    async def _execute_mcp_tool(self, tool_name: str, tool_input: Dict[str, Any]) -> Any:
        """
        Execute an MCP tool by routing to the appropriate MCP server.
        """
        # ============ SEARCH MCP SERVER ============
        if tool_name == "search_web":
            query = tool_input.get("query", "")
            max_results = tool_input.get("max_results", 5)
            results = await self.mcp_registry.search.query(query, max_results=max_results)
            return {
                "results": results[:max_results],
                "count": len(results[:max_results])
            }

        elif tool_name == "search_news":
            query = tool_input.get("query", "")
            max_results = tool_input.get("max_results", 5)
            results = await self.mcp_registry.search.query(f"{query} news", max_results=max_results)
            return {
                "results": results[:max_results],
                "count": len(results[:max_results])
            }
        # ============ STORE MCP SERVER ============
        elif tool_name == "save_prospect":
            prospect_data = {
                "id": tool_input.get("prospect_id", str(uuid.uuid4())),
                "company": {
                    "id": tool_input.get("company_id"),
                    "name": tool_input.get("company_name"),
                    "domain": tool_input.get("company_domain")
                },
                "fit_score": tool_input.get("fit_score", 0),
                "status": tool_input.get("status", "new"),
                "metadata": tool_input.get("metadata", {})
            }
            result = await self.mcp_registry.store.save_prospect(prospect_data)
            return {"status": result, "prospect_id": prospect_data["id"]}

        elif tool_name == "get_prospect":
            prospect_id = tool_input.get("prospect_id", "")
            prospect = await self.mcp_registry.store.get_prospect(prospect_id)
            return prospect or {"error": "Prospect not found"}

        elif tool_name == "list_prospects":
            prospects = await self.mcp_registry.store.list_prospects()
            status_filter = tool_input.get("status")
            if status_filter:
                prospects = [p for p in prospects if p.get("status") == status_filter]
            return {
                "prospects": prospects,
                "count": len(prospects)
            }

        elif tool_name == "save_company":
            company_data = {
                "id": tool_input.get("company_id", str(uuid.uuid4())),
                "name": tool_input.get("name", ""),
                "domain": tool_input.get("domain", ""),
                "industry": tool_input.get("industry"),
                "description": tool_input.get("description"),
                "employee_count": tool_input.get("employee_count")
            }
            result = await self.mcp_registry.store.save_company(company_data)
            return {"status": result, "company_id": company_data["id"]}

        elif tool_name == "get_company":
            company_id = tool_input.get("company_id", "")
            company = await self.mcp_registry.store.get_company(company_id)
            return company or {"error": "Company not found"}

        elif tool_name == "save_fact":
            fact_data = {
                "id": tool_input.get("fact_id", str(uuid.uuid4())),
                "company_id": tool_input.get("company_id", ""),
                "fact_type": tool_input.get("fact_type", ""),
                "content": tool_input.get("content", ""),
                "source_url": tool_input.get("source_url"),
                "confidence_score": tool_input.get("confidence_score", 0.8)
            }
            result = await self.mcp_registry.store.save_fact(fact_data)
            return {"status": result, "fact_id": fact_data["id"]}

        elif tool_name == "save_contact":
            contact_data = {
                "id": tool_input.get("contact_id", str(uuid.uuid4())),
                "company_id": tool_input.get("company_id", ""),
                "email": tool_input.get("email", ""),
                "first_name": tool_input.get("first_name"),
                "last_name": tool_input.get("last_name"),
                "title": tool_input.get("title"),
                "seniority": tool_input.get("seniority")
            }
            result = await self.mcp_registry.store.save_contact(contact_data)
            return {"status": result, "contact_id": contact_data["id"]}

        elif tool_name == "list_contacts_by_domain":
            domain = tool_input.get("domain", "")
            contacts = await self.mcp_registry.store.list_contacts_by_domain(domain)
            return {
                "contacts": contacts,
                "count": len(contacts)
            }

        elif tool_name == "check_suppression":
            supp_type = tool_input.get("suppression_type", "email")
            value = tool_input.get("value", "")
            is_suppressed = await self.mcp_registry.store.check_suppression(supp_type, value)
            return {
                "suppressed": is_suppressed,
                "value": value,
                "type": supp_type
            }
        # ============ EMAIL MCP SERVER ============
        elif tool_name == "send_email":
            to = tool_input.get("to", "")
            subject = tool_input.get("subject", "")
            body = tool_input.get("body", "")
            prospect_id = tool_input.get("prospect_id", "")
            thread_id = await self.mcp_registry.email.send(to, subject, body, prospect_id)
            return {
                "status": "sent",
                "thread_id": thread_id,
                "to": to
            }

        elif tool_name == "get_email_thread":
            prospect_id = tool_input.get("prospect_id", "")
            thread = await self.mcp_registry.email.get_thread(prospect_id)
            return thread or {"error": "No email thread found"}

        # ============ CALENDAR MCP SERVER ============
        elif tool_name == "suggest_meeting_slots":
            num_slots = tool_input.get("num_slots", 3)
            slots = await self.mcp_registry.calendar.suggest_slots()
            return {
                "slots": slots[:num_slots],
                "count": len(slots[:num_slots])
            }

        elif tool_name == "generate_calendar_invite":
            start_time = tool_input.get("start_time", "")
            end_time = tool_input.get("end_time", "")
            title = tool_input.get("title", "Meeting")
            slot = {
                "start_iso": start_time,
                "end_iso": end_time,
                "title": title
            }
            ics = await self.mcp_registry.calendar.generate_ics(slot)
            return {
                "ics_content": ics,
                "meeting": slot
            }

        else:
            raise ValueError(f"Unknown MCP tool: {tool_name}")
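
# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module). It assumes
# MCPRegistry() can be constructed with no arguments, which may not match the
# real registry; adjust construction to your setup.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    async def _demo():
        registry = MCPRegistry()  # hypothetical no-arg construction
        agent = AutonomousMCPAgentTransformers(registry)
        # Stream and print the agent's progress events
        async for event in agent.run("Research Acme Corp and save it as a prospect"):
            print(f"[{event['type']}] {event.get('message', '')}")

    asyncio.run(_demo())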