| """ | |
| Autonomous AI Agent with MCP Tool Calling using Ollama Python Client | |
| Uses the ollama Python package for LLM inference. | |
| Based on: https://github.com/ollama/ollama-python | |
| Example usage (from the guide): | |
| from ollama import chat | |
| response = chat( | |
| model='granite4:1b', | |
| messages=[ | |
| {'role': 'system', 'content': 'You are a helpful assistant.'}, | |
| {'role': 'user', 'content': user_input} | |
| ], | |
| options={'temperature': 0.0, 'top_p': 1.0} | |
| ) | |
| output = response.message.content | |
| """ | |
import os
import re
import json
import uuid
import logging
import asyncio
from typing import List, Dict, Any, AsyncGenerator, Optional

from mcp.tools.definitions import MCP_TOOLS
from mcp.registry import MCPRegistry

logger = logging.getLogger(__name__)

# Default model - IBM Granite 4 1B
DEFAULT_MODEL = "granite4:1b"
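# The model must be available locally before the agent can use it,
# e.g. pull it once with: ollama pull granite4:1b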

class AutonomousMCPAgentOllama:
    """
    AI Agent using the Ollama Python client (free, local LLM).

    Uses ollama.chat() directly as per the official documentation.
    Temperature=0.0 and top_p=1.0 are recommended for Granite family models.
    """

    def __init__(
        self,
        mcp_registry: MCPRegistry,
        model: Optional[str] = None
    ):
        self.mcp_registry = mcp_registry
        self.model = model or os.getenv("OLLAMA_MODEL", DEFAULT_MODEL)
        self.tools_description = self._build_tools_description()
        logger.info(f"Ollama Agent initialized with model: {self.model}")

    def _build_tools_description(self) -> str:
        """Build tool descriptions for the system prompt."""
        tools_text = ""
        for tool in MCP_TOOLS:
            tools_text += f"\n- **{tool['name']}**: {tool['description']}"
            props = tool.get('input_schema', {}).get('properties', {})
            required = tool.get('input_schema', {}).get('required', [])
            if props:
                tools_text += "\n  Parameters:"
                for param, details in props.items():
                    req = "(required)" if param in required else "(optional)"
                    tools_text += f"\n  - {param} {req}: {details.get('description', '')}"
        return tools_text
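
    # Illustrative rendering for one hypothetical MCP_TOOLS entry (the real
    # names and descriptions come from mcp.tools.definitions):
    #
    # - **search_web**: Search the web
    #   Parameters:
    #   - query (required): The search query
    #   - max_results (optional): Maximum number of results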

    def _build_system_prompt(self) -> str:
        return f"""You are an AI sales agent with access to tools.

AVAILABLE TOOLS:
{self.tools_description}

TO USE A TOOL, respond with JSON:
```json
{{"tool": "tool_name", "parameters": {{"param1": "value1"}}}}
```

RULES:
1. Use search_web to find information
2. Use save_prospect, save_contact to store data
3. Use send_email to draft emails
4. Say "DONE" when finished with a summary

Be concise."""

    async def run(self, task: str, max_iterations: int = 15) -> AsyncGenerator[Dict[str, Any], None]:
        """Run the agent on a task"""
        yield {
            "type": "agent_start",
            "message": f"Starting with Ollama ({self.model})",
            "model": self.model
        }

        system_prompt = self._build_system_prompt()
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": task}
        ]

        for iteration in range(1, max_iterations + 1):
            yield {
                "type": "iteration_start",
                "iteration": iteration,
                "message": f"Iteration {iteration}: Thinking..."
            }

            try:
                # Call Ollama using the Python client
                response = await self._call_ollama(messages)
                assistant_content = response.get("content", "")

                if not assistant_content:
                    continue

                # Check for completion (strip any casing of "done",
                # matching the case-insensitive check below)
                if "DONE" in assistant_content.upper():
                    final_text = re.sub(r"done", "", assistant_content, flags=re.IGNORECASE).strip()
                    yield {
                        "type": "thought",
                        "thought": final_text,
                        "message": "Task complete"
                    }
                    yield {
                        "type": "agent_complete",
                        "message": "Task complete!",
                        "final_answer": final_text,
                        "iterations": iteration
                    }
                    return

                # Parse tool calls
                tool_calls = self._parse_tool_calls(assistant_content)

                if tool_calls:
                    messages.append({"role": "assistant", "content": assistant_content})
                    tool_results = []

                    for tool_call in tool_calls:
                        tool_name = tool_call.get("tool", "")
                        tool_params = tool_call.get("parameters", {})

                        yield {
                            "type": "tool_call",
                            "tool": tool_name,
                            "input": tool_params,
                            "message": f"Calling: {tool_name}"
                        }

                        try:
                            result = await self._execute_tool(tool_name, tool_params)
                            yield {
                                "type": "tool_result",
                                "tool": tool_name,
                                "result": result,
                                "message": f"{tool_name} completed"
                            }
                            tool_results.append({"tool": tool_name, "result": result})
                        except Exception as e:
                            yield {
                                "type": "tool_error",
                                "tool": tool_name,
                                "error": str(e)
                            }
                            tool_results.append({"tool": tool_name, "error": str(e)})

                    # Add results to conversation
                    results_text = "Tool results:\n" + json.dumps(tool_results, indent=2, default=str)[:2000]
                    messages.append({"role": "user", "content": results_text})
                else:
                    # No tool calls
                    yield {
                        "type": "thought",
                        "thought": assistant_content,
                        "message": f"AI: {assistant_content[:100]}..."
                    }
                    messages.append({"role": "assistant", "content": assistant_content})
                    messages.append({"role": "user", "content": "Continue. Use tools to complete the task. Say DONE when finished."})
            except Exception as e:
                logger.error(f"Error: {e}")
                yield {
                    "type": "agent_error",
                    "error": str(e),
                    "message": f"Error: {e}"
                }
                return

        yield {
            "type": "agent_max_iterations",
            "message": f"Reached max iterations ({max_iterations})",
            "iterations": max_iterations
        }
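
    # Event types yielded by run(): agent_start, iteration_start, thought,
    # tool_call, tool_result, tool_error, agent_complete, agent_error,
    # agent_max_iterations.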

    async def _call_ollama(self, messages: List[Dict]) -> Dict:
        """
        Call Ollama using the official Python client.

        Uses ollama.chat() directly as per the guide:
        https://github.com/ollama/ollama-python

        Temperature=0.0 and top_p=1.0 are recommended for Granite models.
        """
        try:
            from ollama import chat, ResponseError
        except ImportError:
            raise ImportError("ollama package not installed. Run: pip install ollama")

        try:
            # Use ollama.chat() directly as shown in the guide.
            # The blocking call runs in an executor so it does not stall the event loop.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: chat(
                    model=self.model,
                    messages=messages,
                    options={
                        "temperature": 0.0,  # Deterministic output for tool calling
                        "top_p": 1.0  # Full probability mass (Granite recommended)
                    }
                )
            )

            # Extract response content: response.message.content
            content = ""
            if hasattr(response, 'message') and hasattr(response.message, 'content'):
                content = response.message.content
            elif isinstance(response, dict):
                content = response.get("message", {}).get("content", "")

            return {"content": content}
        except ResponseError as e:
            # Handle Ollama-specific errors (model not available, etc.)
            logger.error(f"Ollama ResponseError: {e}")
            raise RuntimeError(
                f"Ollama error: {e}. Make sure Ollama is running and the model '{self.model}' is pulled."
            )
        except Exception as e:
            logger.error(f"Ollama call failed: {e}")
            raise RuntimeError(f"Ollama error: {e}")

    def _parse_tool_calls(self, text: str) -> List[Dict]:
        """Parse tool calls from the model response, skipping duplicates."""
        tool_calls = []
        seen = set()
        patterns = [
            r'```json\s*(\{[^`]+\})\s*```',
            r'```\s*(\{[^`]+\})\s*```',
            r'(\{"tool":\s*"[^"]+",\s*"parameters":\s*\{[^}]*\}\})',
        ]
        for pattern in patterns:
            matches = re.findall(pattern, text, re.DOTALL)
            for match in matches:
                try:
                    data = json.loads(match.strip())
                except json.JSONDecodeError:
                    continue
                # The bare-JSON pattern can re-match a block already captured
                # by a fenced pattern, so deduplicate on the serialized call.
                key = json.dumps(data, sort_keys=True)
                if "tool" in data and key not in seen:
                    seen.add(key)
                    tool_calls.append(data)
        return tool_calls
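
    # For example, given a model reply containing:
    #   ```json
    #   {"tool": "search_web", "parameters": {"query": "acme corp"}}
    #   ```
    # _parse_tool_calls returns:
    #   [{"tool": "search_web", "parameters": {"query": "acme corp"}}]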

    async def _execute_tool(self, tool_name: str, tool_input: Dict[str, Any]) -> Any:
        """Execute an MCP tool"""
        if tool_name == "search_web":
            query = tool_input.get("query", "")
            max_results = tool_input.get("max_results", 5)
            results = await self.mcp_registry.search.query(query, max_results=max_results)
            return {"results": results[:max_results], "count": len(results[:max_results])}

        elif tool_name == "search_news":
            query = tool_input.get("query", "")
            max_results = tool_input.get("max_results", 5)
            results = await self.mcp_registry.search.query(f"{query} news", max_results=max_results)
            return {"results": results[:max_results], "count": len(results[:max_results])}

        elif tool_name == "save_prospect":
            prospect_data = {
                "id": tool_input.get("prospect_id", str(uuid.uuid4())),
                "company": {
                    "id": tool_input.get("company_id"),
                    "name": tool_input.get("company_name"),
                    "domain": tool_input.get("company_domain")
                },
                "fit_score": tool_input.get("fit_score", 0),
                "status": tool_input.get("status", "new"),
                "metadata": tool_input.get("metadata", {})
            }
            result = await self.mcp_registry.store.save_prospect(prospect_data)
            return {"status": result, "prospect_id": prospect_data["id"]}

        elif tool_name == "save_company":
            company_data = {
                "id": tool_input.get("company_id", str(uuid.uuid4())),
                "name": tool_input.get("name", ""),
                "domain": tool_input.get("domain", ""),
                "industry": tool_input.get("industry"),
                "description": tool_input.get("description"),
                "employee_count": tool_input.get("employee_count")
            }
            result = await self.mcp_registry.store.save_company(company_data)
            return {"status": result, "company_id": company_data["id"]}

        elif tool_name == "save_contact":
            contact_data = {
                "id": tool_input.get("contact_id", str(uuid.uuid4())),
                "company_id": tool_input.get("company_id", ""),
                "email": tool_input.get("email", ""),
                "first_name": tool_input.get("first_name"),
                "last_name": tool_input.get("last_name"),
                "title": tool_input.get("title"),
                "seniority": tool_input.get("seniority")
            }
            result = await self.mcp_registry.store.save_contact(contact_data)
            return {"status": result, "contact_id": contact_data["id"]}

        elif tool_name == "save_fact":
            fact_data = {
                "id": tool_input.get("fact_id", str(uuid.uuid4())),
                "company_id": tool_input.get("company_id", ""),
                "fact_type": tool_input.get("fact_type", ""),
                "content": tool_input.get("content", ""),
                "source_url": tool_input.get("source_url"),
                "confidence_score": tool_input.get("confidence_score", 0.8)
            }
            result = await self.mcp_registry.store.save_fact(fact_data)
            return {"status": result, "fact_id": fact_data["id"]}

        elif tool_name == "send_email":
            to = tool_input.get("to", "")
            subject = tool_input.get("subject", "")
            body = tool_input.get("body", "")
            prospect_id = tool_input.get("prospect_id", "")
            thread_id = await self.mcp_registry.email.send(to, subject, body, prospect_id)
            return {"status": "drafted", "thread_id": thread_id, "to": to}

        elif tool_name == "list_prospects":
            prospects = await self.mcp_registry.store.list_prospects()
            return {"prospects": prospects, "count": len(prospects)}

        elif tool_name == "get_prospect":
            prospect_id = tool_input.get("prospect_id", "")
            prospect = await self.mcp_registry.store.get_prospect(prospect_id)
            return prospect or {"error": "Not found"}

        elif tool_name == "suggest_meeting_slots":
            slots = await self.mcp_registry.calendar.suggest_slots()
            return {"slots": slots[:3], "count": len(slots[:3])}

        else:
            raise ValueError(f"Unknown tool: {tool_name}")
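
# Minimal usage sketch. Assumptions not shown in this file: MCPRegistry can be
# constructed without arguments, an Ollama server is running locally, and the
# configured model has already been pulled. The task string is illustrative.
if __name__ == "__main__":
    async def _demo():
        agent = AutonomousMCPAgentOllama(mcp_registry=MCPRegistry())
        async for event in agent.run("Research Acme Corp and save it as a prospect"):
            print(event["type"], "-", event.get("message", ""))

    asyncio.run(_demo())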