Spaces:
Runtime error
Runtime error
| """ | |
| Autonomous AI Agent with MCP Tool Calling | |
| This agent uses Claude 3.5 Sonnet (or compatible LLM) to autonomously | |
| decide which MCP tools to call based on the user's task. | |
| This is TRUE AI-driven MCP usage - no hardcoded workflow! | |
| """ | |
| import os | |
| import json | |
| import uuid | |
| import logging | |
| from typing import List, Dict, Any, AsyncGenerator | |
| from anthropic import AsyncAnthropic | |
| from mcp.tools.definitions import MCP_TOOLS | |
| from mcp.registry import MCPRegistry | |
| logger = logging.getLogger(__name__) | |
| class AutonomousMCPAgent: | |
| """ | |
| AI Agent that autonomously uses MCP servers as tools. | |
| Key Features: | |
| - Uses Claude 3.5 Sonnet for tool calling | |
| - Autonomously decides which MCP tools to use | |
| - No hardcoded workflow - AI makes all decisions | |
| - Proper MCP protocol implementation | |
| """ | |
| def __init__(self, mcp_registry: MCPRegistry, api_key: str = None): | |
| """ | |
| Initialize the autonomous agent | |
| Args: | |
| mcp_registry: MCP registry with all servers | |
| api_key: Anthropic API key (or use ANTHROPIC_API_KEY env var) | |
| """ | |
| self.mcp_registry = mcp_registry | |
| self.api_key = api_key or os.getenv("ANTHROPIC_API_KEY") | |
| if not self.api_key: | |
| raise ValueError( | |
| "Anthropic API key required for autonomous agent. " | |
| "Set ANTHROPIC_API_KEY environment variable or pass api_key parameter." | |
| ) | |
| self.client = AsyncAnthropic(api_key=self.api_key) | |
| self.model = "claude-3-5-sonnet-20241022" | |
| # System prompt for the agent | |
| self.system_prompt = """You are an autonomous AI agent for B2B sales automation. | |
| You have access to MCP (Model Context Protocol) servers that provide tools for: | |
| - Web search (find company information, news, insights) | |
| - Data storage (save prospects, companies, contacts, facts) | |
| - Email management (send emails, track threads) | |
| - Calendar (schedule meetings) | |
| Your goal is to help with B2B sales tasks like: | |
| - Finding and researching potential customers | |
| - Enriching company data with facts and insights | |
| - Finding decision-maker contacts | |
| - Drafting personalized outreach emails | |
| - Managing prospect pipeline | |
| IMPORTANT: | |
| 1. Think step-by-step about what information you need | |
| 2. Use tools autonomously to gather information | |
| 3. Save important data to the store for persistence | |
| 4. Be thorough in research before making recommendations | |
| 5. Always check suppression list before suggesting email sends | |
| You should: | |
| - Search for company information when needed | |
| - Save prospects and companies to the database | |
| - Find and save contacts | |
| - Generate personalized outreach based on research | |
| - Track your progress and findings | |
| Work autonomously - decide which tools to use and when!""" | |
| logger.info(f"Autonomous MCP Agent initialized with model: {self.model}") | |
| async def run( | |
| self, | |
| task: str, | |
| max_iterations: int = 15 | |
| ) -> AsyncGenerator[Dict[str, Any], None]: | |
| """ | |
| Run the agent autonomously on a task. | |
| The agent will: | |
| 1. Understand the task | |
| 2. Decide which MCP tools to call | |
| 3. Execute tools autonomously | |
| 4. Continue until task is complete or max iterations reached | |
| Args: | |
| task: The task to complete (e.g., "Research and create outreach for Shopify") | |
| max_iterations: Maximum tool calls to prevent infinite loops | |
| Yields: | |
| Events showing agent's progress and tool calls | |
| """ | |
| yield { | |
| "type": "agent_start", | |
| "message": f"🤖 Autonomous AI Agent starting task: {task}", | |
| "model": self.model | |
| } | |
| # Initialize conversation | |
| messages = [ | |
| { | |
| "role": "user", | |
| "content": task | |
| } | |
| ] | |
| iteration = 0 | |
| while iteration < max_iterations: | |
| iteration += 1 | |
| yield { | |
| "type": "iteration_start", | |
| "iteration": iteration, | |
| "message": f"🔄 Iteration {iteration}: AI deciding next action..." | |
| } | |
| try: | |
| # Call Claude with tools | |
| response = await self.client.messages.create( | |
| model=self.model, | |
| max_tokens=4096, | |
| system=self.system_prompt, | |
| messages=messages, | |
| tools=MCP_TOOLS | |
| ) | |
| # Add assistant response to conversation | |
| messages.append({ | |
| "role": "assistant", | |
| "content": response.content | |
| }) | |
| # Check if AI wants to use tools | |
| tool_calls = [block for block in response.content if block.type == "tool_use"] | |
| if not tool_calls: | |
| # AI is done - no more tools to call | |
| final_text = next( | |
| (block.text for block in response.content if hasattr(block, "text")), | |
| "Task completed!" | |
| ) | |
| yield { | |
| "type": "agent_complete", | |
| "message": f"✅ Task complete!", | |
| "final_response": final_text, | |
| "iterations": iteration | |
| } | |
| break | |
| # Execute tool calls | |
| tool_results = [] | |
| for tool_call in tool_calls: | |
| tool_name = tool_call.name | |
| tool_input = tool_call.input | |
| yield { | |
| "type": "tool_call", | |
| "tool": tool_name, | |
| "input": tool_input, | |
| "message": f"🔧 AI calling tool: {tool_name}" | |
| } | |
| # Execute the MCP tool | |
| try: | |
| result = await self._execute_mcp_tool(tool_name, tool_input) | |
| yield { | |
| "type": "tool_result", | |
| "tool": tool_name, | |
| "result": result, | |
| "message": f"✓ Tool {tool_name} completed" | |
| } | |
| # Add tool result to conversation | |
| tool_results.append({ | |
| "type": "tool_result", | |
| "tool_use_id": tool_call.id, | |
| "content": json.dumps(result, default=str) | |
| }) | |
| except Exception as e: | |
| error_msg = str(e) | |
| logger.error(f"Tool execution failed: {tool_name} - {error_msg}") | |
| yield { | |
| "type": "tool_error", | |
| "tool": tool_name, | |
| "error": error_msg, | |
| "message": f"❌ Tool {tool_name} failed: {error_msg}" | |
| } | |
| tool_results.append({ | |
| "type": "tool_result", | |
| "tool_use_id": tool_call.id, | |
| "content": json.dumps({"error": error_msg}), | |
| "is_error": True | |
| }) | |
| # Add tool results to conversation | |
| messages.append({ | |
| "role": "user", | |
| "content": tool_results | |
| }) | |
| except Exception as e: | |
| logger.error(f"Agent iteration failed: {e}") | |
| yield { | |
| "type": "agent_error", | |
| "error": str(e), | |
| "message": f"❌ Agent error: {str(e)}" | |
| } | |
| break | |
| if iteration >= max_iterations: | |
| yield { | |
| "type": "agent_max_iterations", | |
| "message": f"⚠️ Reached maximum iterations ({max_iterations})", | |
| "iterations": iteration | |
| } | |
| async def _execute_mcp_tool(self, tool_name: str, tool_input: Dict[str, Any]) -> Any: | |
| """ | |
| Execute an MCP tool by routing to the appropriate MCP server. | |
| This is where we actually call the MCP servers! | |
| """ | |
| # ============ SEARCH MCP SERVER ============ | |
| if tool_name == "search_web": | |
| query = tool_input["query"] | |
| max_results = tool_input.get("max_results", 5) | |
| results = await self.mcp_registry.search.query(query, max_results=max_results) | |
| return { | |
| "results": results, | |
| "count": len(results) | |
| } | |
| elif tool_name == "search_news": | |
| query = tool_input["query"] | |
| max_results = tool_input.get("max_results", 5) | |
| results = await self.mcp_registry.search.query(f"{query} news", max_results=max_results) | |
| return { | |
| "results": results, | |
| "count": len(results) | |
| } | |
| # ============ STORE MCP SERVER ============ | |
| elif tool_name == "save_prospect": | |
| prospect_data = { | |
| "id": tool_input.get("prospect_id", str(uuid.uuid4())), | |
| "company": { | |
| "id": tool_input.get("company_id"), | |
| "name": tool_input.get("company_name"), | |
| "domain": tool_input.get("company_domain") | |
| }, | |
| "fit_score": tool_input.get("fit_score", 0), | |
| "status": tool_input.get("status", "new"), | |
| "metadata": tool_input.get("metadata", {}) | |
| } | |
| result = await self.mcp_registry.store.save_prospect(prospect_data) | |
| return {"status": result, "prospect_id": prospect_data["id"]} | |
| elif tool_name == "get_prospect": | |
| prospect_id = tool_input["prospect_id"] | |
| prospect = await self.mcp_registry.store.get_prospect(prospect_id) | |
| return prospect or {"error": "Prospect not found"} | |
| elif tool_name == "list_prospects": | |
| prospects = await self.mcp_registry.store.list_prospects() | |
| status_filter = tool_input.get("status") | |
| if status_filter: | |
| prospects = [p for p in prospects if p.get("status") == status_filter] | |
| return { | |
| "prospects": prospects, | |
| "count": len(prospects) | |
| } | |
| elif tool_name == "save_company": | |
| company_data = { | |
| "id": tool_input.get("company_id", str(uuid.uuid4())), | |
| "name": tool_input["name"], | |
| "domain": tool_input["domain"], | |
| "industry": tool_input.get("industry"), | |
| "description": tool_input.get("description"), | |
| "employee_count": tool_input.get("employee_count") | |
| } | |
| result = await self.mcp_registry.store.save_company(company_data) | |
| return {"status": result, "company_id": company_data["id"]} | |
| elif tool_name == "get_company": | |
| company_id = tool_input["company_id"] | |
| company = await self.mcp_registry.store.get_company(company_id) | |
| return company or {"error": "Company not found"} | |
| elif tool_name == "save_fact": | |
| fact_data = { | |
| "id": tool_input.get("fact_id", str(uuid.uuid4())), | |
| "company_id": tool_input["company_id"], | |
| "fact_type": tool_input["fact_type"], | |
| "content": tool_input["content"], | |
| "source_url": tool_input.get("source_url"), | |
| "confidence_score": tool_input.get("confidence_score", 0.8) | |
| } | |
| result = await self.mcp_registry.store.save_fact(fact_data) | |
| return {"status": result, "fact_id": fact_data["id"]} | |
| elif tool_name == "save_contact": | |
| contact_data = { | |
| "id": tool_input.get("contact_id", str(uuid.uuid4())), | |
| "company_id": tool_input["company_id"], | |
| "email": tool_input["email"], | |
| "first_name": tool_input.get("first_name"), | |
| "last_name": tool_input.get("last_name"), | |
| "title": tool_input.get("title"), | |
| "seniority": tool_input.get("seniority") | |
| } | |
| result = await self.mcp_registry.store.save_contact(contact_data) | |
| return {"status": result, "contact_id": contact_data["id"]} | |
| elif tool_name == "list_contacts_by_domain": | |
| domain = tool_input["domain"] | |
| contacts = await self.mcp_registry.store.list_contacts_by_domain(domain) | |
| return { | |
| "contacts": contacts, | |
| "count": len(contacts) | |
| } | |
| elif tool_name == "check_suppression": | |
| supp_type = tool_input["suppression_type"] | |
| value = tool_input["value"] | |
| is_suppressed = await self.mcp_registry.store.check_suppression(supp_type, value) | |
| return { | |
| "suppressed": is_suppressed, | |
| "value": value, | |
| "type": supp_type | |
| } | |
| # ============ EMAIL MCP SERVER ============ | |
| elif tool_name == "send_email": | |
| to = tool_input["to"] | |
| subject = tool_input["subject"] | |
| body = tool_input["body"] | |
| prospect_id = tool_input["prospect_id"] | |
| thread_id = await self.mcp_registry.email.send(to, subject, body, prospect_id) | |
| return { | |
| "status": "sent", | |
| "thread_id": thread_id, | |
| "to": to | |
| } | |
| elif tool_name == "get_email_thread": | |
| prospect_id = tool_input["prospect_id"] | |
| thread = await self.mcp_registry.email.get_thread(prospect_id) | |
| return thread or {"error": "No email thread found"} | |
| # ============ CALENDAR MCP SERVER ============ | |
| elif tool_name == "suggest_meeting_slots": | |
| num_slots = tool_input.get("num_slots", 3) | |
| slots = await self.mcp_registry.calendar.suggest_slots() | |
| return { | |
| "slots": slots[:num_slots], | |
| "count": len(slots[:num_slots]) | |
| } | |
| elif tool_name == "generate_calendar_invite": | |
| start_time = tool_input["start_time"] | |
| end_time = tool_input["end_time"] | |
| title = tool_input["title"] | |
| slot = { | |
| "start_iso": start_time, | |
| "end_iso": end_time, | |
| "title": title | |
| } | |
| ics = await self.mcp_registry.calendar.generate_ics(slot) | |
| return { | |
| "ics_content": ics, | |
| "meeting": slot | |
| } | |
| else: | |
| raise ValueError(f"Unknown MCP tool: {tool_name}") | |