| """ | |
| Autonomous AI Agent with MCP Tool Calling using Groq API | |
| Groq offers FREE API access with fast inference on Llama, Mixtral models. | |
| No payment required - just need a free API key from console.groq.com | |
| """ | |
import os
import json
import uuid
import logging
import asyncio
from typing import List, Dict, Any, AsyncGenerator, Optional

from mcp.tools.definitions import MCP_TOOLS
from mcp.registry import MCPRegistry

logger = logging.getLogger(__name__)
# Groq FREE models
GROQ_MODELS = [
    "llama-3.1-70b-versatile",  # Best quality, free
    "llama-3.1-8b-instant",     # Fast, free
    "mixtral-8x7b-32768",       # Good for complex tasks
    "gemma2-9b-it",             # Google's model
]
DEFAULT_MODEL = "llama-3.1-70b-versatile"
class AutonomousMCPAgentGroq:
    """
    AI Agent using Groq API (FREE, fast inference)

    Get your free API key at: https://console.groq.com
    """

    def __init__(
        self,
        mcp_registry: MCPRegistry,
        api_key: Optional[str] = None,
        model: Optional[str] = None
    ):
        self.mcp_registry = mcp_registry
        self.api_key = api_key or os.getenv("GROQ_API_KEY")
        self.model = model or os.getenv("GROQ_MODEL", DEFAULT_MODEL)

        if not self.api_key:
            raise ValueError("GROQ_API_KEY is required. Get a free key at https://console.groq.com")

        # Build tool descriptions for the system prompt
        self.tools_description = self._build_tools_description()
        logger.info(f"Groq Agent initialized with model: {self.model}")
    def _build_tools_description(self) -> str:
        """Build tool descriptions for the system prompt"""
        tools_text = ""
        for tool in MCP_TOOLS:
            tools_text += f"\n- **{tool['name']}**: {tool['description']}"
            props = tool.get('input_schema', {}).get('properties', {})
            required = tool.get('input_schema', {}).get('required', [])
            if props:
                tools_text += "\n  Parameters:"
                for param, details in props.items():
                    req = "(required)" if param in required else "(optional)"
                    tools_text += f"\n  - {param} {req}: {details.get('description', '')}"
        return tools_text
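    # Example of what _build_tools_description renders for one tool (illustrative only;
    # the actual text depends entirely on the entries in MCP_TOOLS):
    #
    #   - **search_web**: Search the web for information
    #     Parameters:
    #     - query (required): The search query
    #     - max_results (optional): Maximum number of results to return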
    def _build_system_prompt(self) -> str:
        return f"""You are an AI sales agent with access to tools. Use tools to complete tasks.

AVAILABLE TOOLS:
{self.tools_description}

TO USE A TOOL, respond with JSON in this exact format:
```json
{{"tool": "tool_name", "parameters": {{"param1": "value1"}}}}
```

RULES:
1. Use search_web to find information
2. Use save_prospect, save_contact to store data
3. Use send_email to draft emails
4. After completing all tasks, provide a summary
5. Say "DONE" when finished

Be concise and focused."""
    async def run(self, task: str, max_iterations: int = 15) -> AsyncGenerator[Dict[str, Any], None]:
        """Run the agent on a task"""
        yield {
            "type": "agent_start",
            "message": f"Starting task with {self.model}",
            "model": self.model
        }

        system_prompt = self._build_system_prompt()
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": task}
        ]

        for iteration in range(1, max_iterations + 1):
            yield {
                "type": "iteration_start",
                "iteration": iteration,
                "message": f"Iteration {iteration}: AI reasoning..."
            }

            try:
                # Call Groq API
                response = self._call_groq(messages)
                assistant_content = response.get("choices", [{}])[0].get("message", {}).get("content", "")

                if not assistant_content:
                    continue

                # Check for completion
                if "DONE" in assistant_content.upper():
                    final_answer = assistant_content.replace("DONE", "").strip()
                    yield {
                        "type": "thought",
                        "thought": final_answer,
                        "message": "Task complete"
                    }
                    yield {
                        "type": "agent_complete",
                        "message": "Task complete!",
                        "final_answer": final_answer,
                        "iterations": iteration
                    }
                    return

                # Try to parse tool calls
                tool_calls = self._parse_tool_calls(assistant_content)

                if tool_calls:
                    messages.append({"role": "assistant", "content": assistant_content})
                    tool_results = []
                    for tool_call in tool_calls:
                        tool_name = tool_call.get("tool", "")
                        tool_params = tool_call.get("parameters", {})

                        yield {
                            "type": "tool_call",
                            "tool": tool_name,
                            "input": tool_params,
                            "message": f"Calling: {tool_name}"
                        }

                        try:
                            result = await self._execute_tool(tool_name, tool_params)
                            yield {
                                "type": "tool_result",
                                "tool": tool_name,
                                "result": result,
                                "message": f"Tool {tool_name} completed"
                            }
                            tool_results.append({"tool": tool_name, "result": result})
                        except Exception as e:
                            yield {
                                "type": "tool_error",
                                "tool": tool_name,
                                "error": str(e),
                                "message": f"Tool error: {e}"
                            }
                            tool_results.append({"tool": tool_name, "error": str(e)})

                    # Add tool results to conversation
                    results_text = "Tool results:\n" + json.dumps(tool_results, indent=2, default=str)[:2000]
                    messages.append({"role": "user", "content": results_text})
                else:
                    # No tool calls - just a response
                    yield {
                        "type": "thought",
                        "thought": assistant_content,
                        "message": f"AI: {assistant_content[:100]}..."
                    }
                    messages.append({"role": "assistant", "content": assistant_content})
                    messages.append({"role": "user", "content": "Continue with the task. Use tools to gather data. Say DONE when finished."})

            except Exception as e:
                logger.error(f"Error in iteration {iteration}: {e}")
                yield {
                    "type": "agent_error",
                    "error": str(e),
                    "message": f"Error: {e}"
                }
                return

        yield {
            "type": "agent_max_iterations",
            "message": f"Reached max iterations ({max_iterations})",
            "iterations": max_iterations
        }
    def _call_groq(self, messages: List[Dict]) -> Dict:
        """Call the Groq API (OpenAI-compatible chat completions endpoint)"""
        import requests

        url = "https://api.groq.com/openai/v1/chat/completions"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": self.model,
            "messages": messages,
            "max_tokens": 2048,
            "temperature": 0.7
        }
        response = requests.post(url, headers=headers, json=payload, timeout=60)
        response.raise_for_status()
        return response.json()
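    # The Groq endpoint is OpenAI-compatible, so run() only relies on this slice of the
    # response (abridged sketch; any other fields in the response are ignored by the caller):
    #
    #   {"choices": [{"message": {"role": "assistant", "content": "..."}}]}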
    def _parse_tool_calls(self, text: str) -> List[Dict]:
        """Parse tool calls from response text"""
        import re

        tool_calls = []
        # Match JSON blocks: ```json fences, plain fences, or bare inline tool objects
        patterns = [
            r'```json\s*(\{[^`]+\})\s*```',
            r'```\s*(\{[^`]+\})\s*```',
            r'(\{"tool":\s*"[^"]+",\s*"parameters":\s*\{[^}]*\}\})',
        ]
        for pattern in patterns:
            matches = re.findall(pattern, text, re.DOTALL)
            for match in matches:
                try:
                    data = json.loads(match.strip())
                    if "tool" in data:
                        tool_calls.append(data)
                except json.JSONDecodeError:
                    continue
            # Stop at the first pattern that yields calls so the looser patterns
            # below don't re-match the same JSON and duplicate a tool call
            if tool_calls:
                break
        return tool_calls
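    # Example (illustrative query text): given a model reply such as
    #
    #   Let me look that up.
    #   ```json
    #   {"tool": "search_web", "parameters": {"query": "Acme Corp funding"}}
    #   ```
    #
    # _parse_tool_calls returns [{"tool": "search_web", "parameters": {"query": "Acme Corp funding"}}].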
    async def _execute_tool(self, tool_name: str, tool_input: Dict[str, Any]) -> Any:
        """Execute an MCP tool"""
        if tool_name == "search_web":
            query = tool_input.get("query", "")
            max_results = tool_input.get("max_results", 5)
            results = await self.mcp_registry.search.query(query, max_results=max_results)
            return {"results": results[:max_results], "count": len(results[:max_results])}

        elif tool_name == "search_news":
            query = tool_input.get("query", "")
            max_results = tool_input.get("max_results", 5)
            results = await self.mcp_registry.search.query(f"{query} news", max_results=max_results)
            return {"results": results[:max_results], "count": len(results[:max_results])}

        elif tool_name == "save_prospect":
            prospect_data = {
                "id": tool_input.get("prospect_id", str(uuid.uuid4())),
                "company": {
                    "id": tool_input.get("company_id"),
                    "name": tool_input.get("company_name"),
                    "domain": tool_input.get("company_domain")
                },
                "fit_score": tool_input.get("fit_score", 0),
                "status": tool_input.get("status", "new"),
                "metadata": tool_input.get("metadata", {})
            }
            result = await self.mcp_registry.store.save_prospect(prospect_data)
            return {"status": result, "prospect_id": prospect_data["id"]}

        elif tool_name == "save_company":
            company_data = {
                "id": tool_input.get("company_id", str(uuid.uuid4())),
                "name": tool_input.get("name", ""),
                "domain": tool_input.get("domain", ""),
                "industry": tool_input.get("industry"),
                "description": tool_input.get("description"),
                "employee_count": tool_input.get("employee_count")
            }
            result = await self.mcp_registry.store.save_company(company_data)
            return {"status": result, "company_id": company_data["id"]}

        elif tool_name == "save_contact":
            contact_data = {
                "id": tool_input.get("contact_id", str(uuid.uuid4())),
                "company_id": tool_input.get("company_id", ""),
                "email": tool_input.get("email", ""),
                "first_name": tool_input.get("first_name"),
                "last_name": tool_input.get("last_name"),
                "title": tool_input.get("title"),
                "seniority": tool_input.get("seniority")
            }
            result = await self.mcp_registry.store.save_contact(contact_data)
            return {"status": result, "contact_id": contact_data["id"]}

        elif tool_name == "save_fact":
            fact_data = {
                "id": tool_input.get("fact_id", str(uuid.uuid4())),
                "company_id": tool_input.get("company_id", ""),
                "fact_type": tool_input.get("fact_type", ""),
                "content": tool_input.get("content", ""),
                "source_url": tool_input.get("source_url"),
                "confidence_score": tool_input.get("confidence_score", 0.8)
            }
            result = await self.mcp_registry.store.save_fact(fact_data)
            return {"status": result, "fact_id": fact_data["id"]}

        elif tool_name == "send_email":
            to = tool_input.get("to", "")
            subject = tool_input.get("subject", "")
            body = tool_input.get("body", "")
            prospect_id = tool_input.get("prospect_id", "")
            thread_id = await self.mcp_registry.email.send(to, subject, body, prospect_id)
            return {"status": "sent", "thread_id": thread_id, "to": to}

        elif tool_name == "list_prospects":
            prospects = await self.mcp_registry.store.list_prospects()
            return {"prospects": prospects, "count": len(prospects)}

        elif tool_name == "get_prospect":
            prospect_id = tool_input.get("prospect_id", "")
            prospect = await self.mcp_registry.store.get_prospect(prospect_id)
            return prospect or {"error": "Prospect not found"}

        elif tool_name == "suggest_meeting_slots":
            slots = await self.mcp_registry.calendar.suggest_slots()
            return {"slots": slots[:3], "count": len(slots[:3])}

        else:
            raise ValueError(f"Unknown tool: {tool_name}")
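
# Minimal usage sketch (illustrative, not part of the agent itself). It assumes MCPRegistry()
# can be constructed with no arguments and that GROQ_API_KEY is set in the environment;
# adapt the wiring to however your registry is actually built.
async def _demo() -> None:
    registry = MCPRegistry()  # assumption: no-arg constructor; replace with your real setup
    agent = AutonomousMCPAgentGroq(mcp_registry=registry)
    # Stream agent events and print a one-line summary of each
    async for event in agent.run("Research Acme Corp and save it as a prospect"):
        print(f"[{event['type']}] {event.get('message', '')}")


if __name__ == "__main__":
    asyncio.run(_demo())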