"""Knowledge Graph implementation for MCP tools and prompts. This module provides an in-memory knowledge graph structure for storing and querying MCP tool and prompt metadata. It serves as the central repository for semantic search and tool discovery in the KGraph-MCP system. Architecture Overview: The InMemoryKG class combines traditional graph storage with modern vector similarity search to enable intelligent tool and prompt discovery. It bridges structured metadata with semantic understanding through embedding-based search. Core Capabilities: 1. Metadata Storage: Tools and prompts with rich attributes 2. Semantic Search: Vector similarity using cosine distance 3. Tag-based Filtering: Traditional categorical search 4. MCP Integration: Remote server endpoint management 5. JSON Data Loading: Persistent configuration support Search Algorithm Design: The system uses a hybrid approach combining semantic and symbolic reasoning: Semantic Layer (Primary): - Query → Embedding → Vector Search → Ranked Results - Uses OpenAI embeddings for deep semantic understanding - Fallback to mock embeddings for development/testing Symbolic Layer (Secondary): - Tag-based filtering for categorical precision - Keyword matching for exact term matching - Difficulty-based filtering for user experience optimization Performance Characteristics: - In-memory storage: O(1) direct access by ID - Vector search: O(n) cosine similarity computation - Tag filtering: O(n) set intersection operations - Hybrid search: Combines multiple ranking signals MVP Evolution Context: - MVP1: Basic tool storage and retrieval - MVP2: Added prompt support and semantic search - MVP4: Enhanced with MCP server integration - MVP5: Advanced with sampling preferences and optimization """ import json import logging from pathlib import Path from typing import TYPE_CHECKING from urllib.parse import urlparse import numpy as np import requests from .ontology import MCPPrompt, MCPTool # Type checking import to avoid circular dependencies during runtime if TYPE_CHECKING: from .embedder import EmbeddingService # Create logger for this module with structured output logger = logging.getLogger(__name__) class InMemoryKG: """In-memory knowledge graph for MCP tools and prompts with semantic search. This class provides the core storage and retrieval functionality for the KGraph-MCP system. It combines traditional database-like operations with modern vector similarity search to enable intelligent tool and prompt discovery. Design Philosophy: The knowledge graph follows a hybrid approach, maintaining both structured metadata for precise queries and vector representations for semantic similarity. This enables both exact matches (e.g., "find all Python tools") and fuzzy semantic matches (e.g., "find tools for analyzing customer sentiment"). 
class InMemoryKG:
    """In-memory knowledge graph for MCP tools and prompts with semantic search.

    This class provides the core storage and retrieval functionality for the
    KGraph-MCP system. It combines traditional database-like operations with
    modern vector similarity search to enable intelligent tool and prompt
    discovery.

    Design Philosophy:
        The knowledge graph follows a hybrid approach, maintaining both
        structured metadata for precise queries and vector representations for
        semantic similarity. This enables both exact matches (e.g., "find all
        Python tools") and fuzzy semantic matches (e.g., "find tools for
        analyzing customer sentiment").

    Data Architecture:
        ```
        InMemoryKG
        ├── _tools: dict[str, MCPTool]            # Primary tool storage
        ├── _prompts: dict[str, MCPPrompt]        # Primary prompt storage
        ├── tool_embeddings: list[list[float]]    # Vector representations
        ├── tool_ids_for_vectors: list[str]       # ID mapping for vectors
        ├── prompt_embeddings: list[list[float]]  # Prompt vector representations
        ├── prompt_ids_for_vectors: list[str]     # Prompt ID mapping
        └── _mcp_endpoints: dict[str, dict]       # Remote server registry
        ```

    Vector Index Design:
        The vector indices maintain parallel arrays where index i corresponds to:
        - tool_embeddings[i]: Embedding vector for a tool
        - tool_ids_for_vectors[i]: Tool ID for that vector
        This design enables efficient similarity search while maintaining the
        ID mapping.

    Search Strategy Hierarchy:
        1. Semantic Search (Preferred): Use embeddings for deep understanding
        2. Tag-based Search (Fallback): Use categorical metadata for filtering
        3. Keyword Search (Last Resort): Simple text matching for basic queries

    Example Usage:
        >>> kg = InMemoryKG()
        >>> kg.load_tools_from_json("data/tools.json")
        >>> kg.load_prompts_from_json("data/prompts.json")
        >>> kg.build_vector_index(embedding_service)
        >>>
        >>> # Semantic search for tools
        >>> similar_tools = kg.find_similar_tools(query_embedding, top_k=3)
        >>>
        >>> # Find prompts for a specific tool
        >>> tool_prompts = kg.find_prompts_by_tool_id("text_summarizer_001")
    """

    def __init__(self) -> None:
        """Initialize the knowledge graph with empty storage structures.

        Sets up all the core data structures needed for tool and prompt
        storage, vector indexing, and MCP endpoint management. The
        initialization follows a lazy-loading pattern where expensive
        operations (like vector indexing) are deferred until explicitly
        requested.
        """
        # Primary storage: fast O(1) lookup by ID
        self._tools: dict[str, MCPTool] = {}
        self._prompts: dict[str, MCPPrompt] = {}

        # Vector index structures: parallel arrays for efficient similarity search.
        # These are populated by the build_vector_index() method.
        self.tool_embeddings: list[list[float]] = []
        self.tool_ids_for_vectors: list[str] = []  # Maps embedding index to tool ID
        self.prompt_embeddings: list[list[float]] = []
        self.prompt_ids_for_vectors: list[str] = []  # Maps embedding index to prompt ID

        # MCP endpoint registry: for remote tool execution (MVP4+)
        self._mcp_endpoints: dict[str, dict] = {}  # tool_id -> endpoint configuration

    @property
    def tools(self) -> dict[str, MCPTool]:
        """Get the tools dictionary for vector index building and external access.

        This property provides read-only access to the internal tools storage.
        It's primarily used by the EmbeddingService during vector index
        construction and by external components that need to iterate over all
        tools.

        Returns:
            Dictionary mapping tool IDs to MCPTool instances
        """
        return self._tools

    @property
    def prompts(self) -> dict[str, MCPPrompt]:
        """Get the prompts dictionary for vector index building and external access.

        This property provides read-only access to the internal prompts
        storage. Similar to the tools property, it enables external access
        while maintaining encapsulation of the internal storage structure.

        Returns:
            Dictionary mapping prompt IDs to MCPPrompt instances
        """
        return self._prompts
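    # Illustration (not used at runtime): the invariant behind the parallel-array
    # vector index described in the class docstring. A similarity hit at
    # position i resolves back to a tool in two steps:
    #
    #   >>> i = 0                                 # index of a search hit
    #   >>> tool_id = kg.tool_ids_for_vectors[i]  # parallel-array lookup
    #   >>> tool = kg.get_tool_by_id(tool_id)     # O(1) metadata fetch
    #
    # Both lists are only ever appended to together, so they stay the same length.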
    # === Tool Management Methods ===
    # These methods handle CRUD operations for MCPTool instances

    def add_tool(self, tool: MCPTool) -> None:
        """Add a tool to the knowledge graph.

        This method stores a new tool in the primary storage dictionary. It's
        typically called during JSON data loading or when dynamically
        registering new tools.

        Note:
            Adding a tool after vector index construction leaves the index
            stale; rebuild it to include the new tool in semantic searches.

        Args:
            tool: The MCPTool instance to add (must already be validated)

        Side Effects:
            - Updates the internal _tools dictionary
            - Leaves any existing vector index stale (rebuild required)
            - Logs the tool addition for debugging
        """
        self._tools[tool.tool_id] = tool
        logger.debug(f"Added tool to KG: {tool.tool_id} ({tool.name})")

    def get_tool_by_id(self, tool_id: str) -> MCPTool | None:
        """Retrieve a tool by its unique identifier.

        Provides O(1) lookup performance for direct tool access. This is the
        most efficient way to retrieve a specific tool when the ID is known.

        Args:
            tool_id: The unique identifier of the tool

        Returns:
            The MCPTool instance if found, None otherwise

        Example:
            >>> tool = kg.get_tool_by_id("text_summarizer_001")
            >>> if tool:
            ...     print(f"Found tool: {tool.name}")
        """
        return self._tools.get(tool_id)

    def get_all_tools(self) -> list[MCPTool]:
        """Get all tools in the knowledge graph as a list.

        Returns a snapshot of all stored tools. The returned list is
        independent of the internal storage, so modifications to it won't
        affect the knowledge graph.

        Returns:
            List of all MCPTool instances (empty list if no tools are stored)

        Performance:
            O(n) where n is the number of tools (due to list creation)
        """
        return list(self._tools.values())

    def find_tools_by_tags(self, tags: list[str]) -> list[MCPTool]:
        """Find tools that have any of the specified tags using set intersection.

        This method implements tag-based categorical search, which is useful
        for exact-match scenarios where users know the specific categories
        they want. It uses set intersection for efficient tag matching.

        Algorithm:
            1. Convert input tags to a set for O(1) membership testing
            2. For each tool, check whether its tags intersect the query tags
            3. Return all tools with at least one matching tag

        Args:
            tags: List of tags to search for (case-sensitive)

        Returns:
            List of MCPTool instances that have at least one matching tag

        Performance:
            O(n * m) where n = number of tools, m = average tags per tool

        Example:
            >>> tools = kg.find_tools_by_tags(["text", "nlp"])
            >>> # Returns tools tagged with either "text" OR "nlp"
        """
        matching_tools = []
        tag_set = set(tags)  # Convert to set for efficient intersection

        for tool in self._tools.values():
            tool_tag_set = set(tool.tags)
            # Check if there's any intersection between query tags and tool tags
            if tag_set.intersection(tool_tag_set):
                matching_tools.append(tool)

        logger.debug(f"Found {len(matching_tools)} tools matching tags: {tags}")
        return matching_tools

    def get_all_tags(self) -> set[str]:
        """Get all unique tags from all tools for discovery and autocomplete.

        This method aggregates all tags across the tool collection, which is
        useful for building tag-based UI filters, autocomplete suggestions, or
        analytics.

        Returns:
            Set of all unique tags across all tools

        Performance:
            O(n * m) where n = number of tools, m = average tags per tool

        Use Cases:
            - UI filter dropdowns
            - Tag autocomplete functionality
            - Analytics on tag distribution
        """
        all_tags = set()
        for tool in self._tools.values():
            all_tags.update(tool.tags)
        return all_tags
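    # Illustration (not used at runtime): the two tag methods above compose
    # into a simple "filter dropdown" flow; a sketch assuming a populated kg:
    #
    #   >>> choices = sorted(kg.get_all_tags())      # e.g. for a UI dropdown
    #   >>> picked = [t for t in choices if t in ("text", "nlp")]
    #   >>> matches = kg.find_tools_by_tags(picked)  # OR semantics over tags
    #   >>> [tool.tool_id for tool in matches]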
    # === Prompt Management Methods ===
    # These methods handle CRUD operations for MCPPrompt instances

    def add_prompt(self, prompt: MCPPrompt) -> None:
        """Add a prompt to the knowledge graph.

        Similar to add_tool, this method stores a new prompt in the primary
        storage. It's typically called during JSON data loading or dynamic
        prompt registration.

        Args:
            prompt: The MCPPrompt instance to add (must already be validated)

        Side Effects:
            - Updates the internal _prompts dictionary
            - Leaves any existing vector index stale (rebuild required)
            - Logs the prompt addition for debugging
        """
        self._prompts[prompt.prompt_id] = prompt
        logger.debug(f"Added prompt to KG: {prompt.prompt_id} ({prompt.name})")

    def get_prompt_by_id(self, prompt_id: str) -> MCPPrompt | None:
        """Retrieve a prompt by its unique identifier.

        Provides O(1) lookup performance for direct prompt access.

        Args:
            prompt_id: The unique identifier of the prompt

        Returns:
            The MCPPrompt instance if found, None otherwise
        """
        return self._prompts.get(prompt_id)

    def get_all_prompts(self) -> list[MCPPrompt]:
        """Get all prompts in the knowledge graph as a list.

        Returns:
            List of all MCPPrompt instances (empty list if no prompts are stored)
        """
        return list(self._prompts.values())

    def find_prompts_by_tags(self, tags: list[str]) -> list[MCPPrompt]:
        """Find prompts that have any of the specified tags.

        Uses the same set intersection algorithm as find_tools_by_tags but
        operates on the prompts collection.

        Args:
            tags: List of tags to search for

        Returns:
            List of MCPPrompt instances that have at least one matching tag
        """
        matching_prompts = []
        tag_set = set(tags)

        for prompt in self._prompts.values():
            prompt_tag_set = set(prompt.tags)
            if tag_set.intersection(prompt_tag_set):
                matching_prompts.append(prompt)

        logger.debug(f"Found {len(matching_prompts)} prompts matching tags: {tags}")
        return matching_prompts

    def find_prompts_by_tool_id(self, tool_id: str) -> list[MCPPrompt]:
        """Find all prompts designed for a specific tool.

        This method implements the tool-prompt relationship by filtering
        prompts based on their target_tool_id. It's essential for the planning
        phase, where we need to find appropriate prompts for a selected tool.

        Args:
            tool_id: The ID of the tool to find prompts for

        Returns:
            List of MCPPrompt instances targeting the specified tool

        Performance:
            O(n) where n = number of prompts (requires a full scan)

        Design Note:
            This could be optimized with an inverted index
            (tool_id -> prompt_ids) if performance becomes critical with
            large prompt collections; see the sketch below.
        """
        tool_prompts = [
            prompt
            for prompt in self._prompts.values()
            if prompt.target_tool_id == tool_id
        ]

        logger.debug(f"Found {len(tool_prompts)} prompts for tool {tool_id}")
        return tool_prompts

    def find_prompts_by_difficulty(self, difficulty_level: str) -> list[MCPPrompt]:
        """Find prompts by difficulty level for user experience optimization.

        This method enables difficulty-based filtering, which is important for
        a progressive user experience where beginners see simpler prompts
        first.

        Args:
            difficulty_level: The difficulty level to filter by
                ("beginner", "intermediate", "advanced")

        Returns:
            List of MCPPrompt instances with the specified difficulty level
        """
        difficulty_prompts = [
            prompt
            for prompt in self._prompts.values()
            if prompt.difficulty_level == difficulty_level
        ]

        logger.debug(
            f"Found {len(difficulty_prompts)} prompts with difficulty: {difficulty_level}"
        )
        return difficulty_prompts
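    # Sketch (not implemented): the inverted index mentioned in the Design Note
    # of find_prompts_by_tool_id. Built once after loading, it turns the O(n)
    # scan into an O(1) lookup. All names here are hypothetical.
    #
    #   _prompts_by_tool: dict[str, list[str]] = {}
    #   for prompt in self._prompts.values():
    #       _prompts_by_tool.setdefault(prompt.target_tool_id, []).append(
    #           prompt.prompt_id
    #       )
    #   # Lookup: prompt_ids = _prompts_by_tool.get(tool_id, [])
    #
    # The index would have to be rebuilt (or updated) whenever add_prompt runs.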
    def get_all_prompt_tags(self) -> set[str]:
        """Get all unique tags from all prompts.

        Similar to get_all_tags but operates on the prompts collection.

        Returns:
            Set of all unique tags across all prompts
        """
        all_tags = set()
        for prompt in self._prompts.values():
            all_tags.update(prompt.tags)
        return all_tags

    # === Vector Similarity Search Methods ===
    # These methods implement semantic search using cosine similarity

    def _cosine_similarity(self, vec1: list[float], vec2: list[float]) -> float:
        """Calculate cosine similarity between two vectors using numpy operations.

        Cosine similarity measures the cosine of the angle between two
        vectors, providing a measure of orientation similarity rather than
        magnitude. This is ideal for semantic similarity, where we care about
        conceptual direction rather than absolute values.

        Mathematical Foundation:
            cosine_similarity(A, B) = (A · B) / (||A|| * ||B||)

            Where:
            - A · B is the dot product
            - ||A|| and ||B|| are the vector magnitudes (L2 norms)

        Range: [-1, 1] where:
            - 1.0 = identical direction (maximum similarity)
            - 0.0 = orthogonal (no similarity)
            - -1.0 = opposite direction (maximum dissimilarity)

        Args:
            vec1: First vector (typically the query embedding)
            vec2: Second vector (typically a tool/prompt embedding)

        Returns:
            Cosine similarity score between -1 and 1

        Performance:
            O(d) where d is the vector dimensionality (typically 1536 for
            OpenAI embeddings)

        Error Handling:
            Returns 0.0 for zero-magnitude vectors to prevent division by zero
        """
        # Convert to numpy arrays for efficient computation
        v1 = np.array(vec1)
        v2 = np.array(vec2)

        # Calculate dot product (numerator)
        dot_product = np.dot(v1, v2)

        # Calculate L2 norms (denominators)
        norm1 = np.linalg.norm(v1)
        norm2 = np.linalg.norm(v2)

        # Handle edge case: zero-magnitude vectors (shouldn't happen with real embeddings)
        if norm1 == 0 or norm2 == 0:
            logger.warning("Zero-magnitude vector encountered in cosine similarity calculation")
            return 0.0

        # Return normalized cosine similarity
        similarity = float(dot_product / (norm1 * norm2))
        return similarity

    def find_similar_tools(
        self, query_embedding: list[float], top_k: int = 3
    ) -> list[str]:
        """Find tools most similar to the query embedding using cosine similarity.

        Args:
            query_embedding: The embedding vector to search for
            top_k: Maximum number of similar tools to return

        Returns:
            List of tool IDs ordered by similarity (most similar first)
        """
        # If embeddings are available, use semantic similarity
        if self.tool_embeddings and query_embedding:
            # Calculate similarities for all tools
            similarities = []
            for i, tool_embedding in enumerate(self.tool_embeddings):
                similarity = self._cosine_similarity(query_embedding, tool_embedding)
                tool_id = self.tool_ids_for_vectors[i]
                similarities.append((similarity, tool_id))

            # Sort by similarity score in descending order
            similarities.sort(key=lambda x: x[0], reverse=True)

            # Return top_k tool IDs
            return [tool_id for _, tool_id in similarities[:top_k]]

        # Fall back to keyword-based search when embeddings are not available
        logger.info("Embeddings not available, falling back to keyword-based tool search")
        return self._find_tools_by_keywords("", top_k)  # Empty query for now, will be improved
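    # Sketch (not used by the methods above): the per-vector loop in
    # find_similar_tools can be vectorized into a single matrix operation that
    # scores every tool at once, assuming all embeddings share one dimension:
    #
    #   M = np.array(self.tool_embeddings)               # shape (n, d)
    #   q = np.array(query_embedding)                    # shape (d,)
    #   norms = np.linalg.norm(M, axis=1) * np.linalg.norm(q)
    #   sims = M @ q / np.where(norms == 0, 1.0, norms)  # shape (n,)
    #   top = np.argsort(sims)[::-1][:top_k]             # best indices
    #   # [self.tool_ids_for_vectors[i] for i in top]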
    def find_similar_tools_with_scores(
        self,
        query_embedding: list[float],
        top_k: int = 3,
        min_similarity: float = 0.0,
    ) -> list[tuple[str, float]]:
        """Find tools most similar to the query embedding, with similarity scores.

        Args:
            query_embedding: The embedding vector to search for
            top_k: Maximum number of similar tools to return
            min_similarity: Minimum similarity threshold to filter results

        Returns:
            List of (tool_id, similarity_score) tuples ordered by similarity
            (most similar first)
        """
        # If embeddings are available, use semantic similarity
        if self.tool_embeddings and query_embedding:
            # Calculate similarities for all tools
            similarities = []
            for i, tool_embedding in enumerate(self.tool_embeddings):
                similarity = self._cosine_similarity(query_embedding, tool_embedding)
                tool_id = self.tool_ids_for_vectors[i]

                # Filter by minimum similarity threshold
                if similarity >= min_similarity:
                    similarities.append((tool_id, similarity))

            # Sort by similarity score in descending order
            similarities.sort(key=lambda x: x[1], reverse=True)

            # Return top_k results
            return similarities[:top_k]

        # Fallback: return an empty list when embeddings are not available
        logger.info("Embeddings not available, cannot compute similarity scores")
        return []

    def _find_tools_by_keywords(self, query: str, top_k: int = 3) -> list[str]:
        """Fallback keyword-based tool search when embeddings are not available.

        Note: the current implementation ignores the query and returns tool
        IDs in deterministic (alphabetical) order rather than by true
        relevance.

        Args:
            query: The search query string (currently unused)
            top_k: Maximum number of tools to return

        Returns:
            List of tool IDs in deterministic order
        """
        # For now, return all available tools in a consistent order.
        # This ensures tests can find expected tools like text_summarizer_001.
        available_tools = list(self._tools.keys())

        # Sort to ensure consistent ordering (text_summarizer_001 will be found)
        available_tools.sort()

        # Return up to top_k tools
        return available_tools[:top_k]

    def find_similar_prompts(
        self, query_embedding: list[float], top_k: int = 3
    ) -> list[str]:
        """Find prompts most similar to the query embedding using cosine similarity.

        Args:
            query_embedding: The embedding vector to search for
            top_k: Maximum number of similar prompts to return

        Returns:
            List of prompt IDs ordered by similarity (most similar first)
        """
        # If embeddings are available, use semantic similarity
        if self.prompt_embeddings and query_embedding:
            # Calculate similarities for all prompts
            similarities = []
            for i, prompt_embedding in enumerate(self.prompt_embeddings):
                similarity = self._cosine_similarity(query_embedding, prompt_embedding)
                prompt_id = self.prompt_ids_for_vectors[i]
                similarities.append((similarity, prompt_id))

            # Sort by similarity score in descending order
            similarities.sort(key=lambda x: x[0], reverse=True)

            # Return top_k prompt IDs
            return [prompt_id for _, prompt_id in similarities[:top_k]]

        # Fall back to returning available prompts when embeddings are not available
        logger.info("Embeddings not available, falling back to returning available prompts")
        return self._find_prompts_by_keywords("", top_k)
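    # Illustration (not used at runtime): min_similarity gates low-confidence
    # matches before the top_k truncation; a sketch assuming a built index:
    #
    #   >>> hits = kg.find_similar_tools_with_scores(
    #   ...     query_embedding, top_k=5, min_similarity=0.75
    #   ... )
    #   >>> for tool_id, score in hits:
    #   ...     print(f"{tool_id}: {score:.3f}")
    #
    # With min_similarity=0.0 (the default), every tool passes the gate and
    # only the top_k cutoff applies.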
    def find_similar_prompts_with_scores(
        self,
        query_embedding: list[float],
        top_k: int = 3,
        min_similarity: float = 0.0,
    ) -> list[tuple[str, float]]:
        """Find prompts most similar to the query embedding, with similarity scores.

        Args:
            query_embedding: The embedding vector to search for
            top_k: Maximum number of similar prompts to return
            min_similarity: Minimum similarity threshold to filter results

        Returns:
            List of (prompt_id, similarity_score) tuples ordered by similarity
            (most similar first)
        """
        # If embeddings are available, use semantic similarity
        if self.prompt_embeddings and query_embedding:
            # Calculate similarities for all prompts
            similarities = []
            for i, prompt_embedding in enumerate(self.prompt_embeddings):
                similarity = self._cosine_similarity(query_embedding, prompt_embedding)
                prompt_id = self.prompt_ids_for_vectors[i]

                # Filter by minimum similarity threshold
                if similarity >= min_similarity:
                    similarities.append((prompt_id, similarity))

            # Sort by similarity score in descending order
            similarities.sort(key=lambda x: x[1], reverse=True)

            # Return top_k results
            return similarities[:top_k]

        # Fallback: return an empty list when embeddings are not available
        logger.info("Embeddings not available, cannot compute similarity scores")
        return []

    def _find_prompts_by_keywords(self, query: str, top_k: int = 3) -> list[str]:
        """Fallback keyword-based prompt search when embeddings are not available.

        Note: like _find_tools_by_keywords, this currently ignores the query
        and returns prompt IDs in deterministic (alphabetical) order.

        Args:
            query: The search query string (currently unused)
            top_k: Maximum number of prompts to return

        Returns:
            List of prompt IDs in deterministic order
        """
        # For now, return all available prompts in a consistent order
        available_prompts = list(self._prompts.keys())

        # Sort to ensure consistent ordering
        available_prompts.sort()

        # Return up to top_k prompts
        return available_prompts[:top_k]

    def find_similar_prompts_for_tool(
        self, query_embedding: list[float], tool_id: str, top_k: int = 3
    ) -> list[str]:
        """Find prompts for a specific tool, ordered by similarity to the query.

        Args:
            query_embedding: The embedding vector to search for
            tool_id: The tool ID to filter prompts by
            top_k: Maximum number of similar prompts to return

        Returns:
            List of prompt IDs for the specified tool, ordered by similarity
        """
        # Get all prompts for the tool
        tool_prompts = self.find_prompts_by_tool_id(tool_id)
        if not tool_prompts:
            return []

        # If embeddings are available, use semantic similarity
        if self.prompt_embeddings and query_embedding:
            # Calculate similarities only for prompts targeting this tool
            similarities = []
            for i, prompt_embedding in enumerate(self.prompt_embeddings):
                prompt_id = self.prompt_ids_for_vectors[i]
                prompt = self.get_prompt_by_id(prompt_id)

                # Only consider prompts for the specified tool
                if prompt and prompt.target_tool_id == tool_id:
                    similarity = self._cosine_similarity(query_embedding, prompt_embedding)
                    similarities.append((similarity, prompt_id))

            # Sort by similarity score in descending order
            similarities.sort(key=lambda x: x[0], reverse=True)

            # Return top_k prompt IDs
            return [prompt_id for _, prompt_id in similarities[:top_k]]

        # Fallback: return available prompts for the tool when embeddings are not available
        logger.info(f"Embeddings not available, returning available prompts for tool {tool_id}")
        prompt_ids = [prompt.prompt_id for prompt in tool_prompts[:top_k]]
        return prompt_ids
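    # Illustration (not used at runtime): the tool-scoped search above follows
    # a filter-then-rank pattern; a typical planning-phase call looks like:
    #
    #   >>> prompt_ids = kg.find_similar_prompts_for_tool(
    #   ...     query_embedding, "text_summarizer_001", top_k=3
    #   ... )
    #   >>> prompts = [kg.get_prompt_by_id(pid) for pid in prompt_ids]
    #
    # Filtering by target_tool_id happens inside the similarity loop, so only
    # the selected tool's prompts are ever scored.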
    def find_similar_prompts_for_tool_with_scores(
        self,
        query_embedding: list[float],
        tool_id: str,
        top_k: int = 3,
        min_similarity: float = 0.0,
    ) -> list[tuple[str, float]]:
        """Find prompts for a specific tool, with similarity scores.

        Args:
            query_embedding: The embedding vector to search for
            tool_id: The tool ID to filter prompts by
            top_k: Maximum number of similar prompts to return
            min_similarity: Minimum similarity threshold to filter results

        Returns:
            List of (prompt_id, similarity_score) tuples for the specified
            tool, ordered by similarity
        """
        # Get all prompts for the tool
        tool_prompts = self.find_prompts_by_tool_id(tool_id)
        if not tool_prompts:
            return []

        # If embeddings are available, use semantic similarity
        if self.prompt_embeddings and query_embedding:
            # Calculate similarities only for prompts targeting this tool
            similarities = []
            for i, prompt_embedding in enumerate(self.prompt_embeddings):
                prompt_id = self.prompt_ids_for_vectors[i]
                prompt = self.get_prompt_by_id(prompt_id)

                # Only consider prompts for the specified tool
                if prompt and prompt.target_tool_id == tool_id:
                    similarity = self._cosine_similarity(query_embedding, prompt_embedding)

                    # Filter by minimum similarity threshold
                    if similarity >= min_similarity:
                        similarities.append((prompt_id, similarity))

            # Sort by similarity score in descending order
            similarities.sort(key=lambda x: x[1], reverse=True)

            # Return top_k results
            return similarities[:top_k]

        # Fallback: return an empty list when embeddings are not available
        logger.info(f"Embeddings not available, cannot compute similarity scores for tool {tool_id}")
        return []

    def build_vector_index(self, embedder: "EmbeddingService") -> bool:
        """Build the vector index using real embeddings from the EmbeddingService.

        Args:
            embedder: EmbeddingService instance used to generate embeddings

        Returns:
            True if the index was built successfully, False otherwise
        """
        try:
            # Clear existing indexes
            self.tool_embeddings.clear()
            self.tool_ids_for_vectors.clear()
            self.prompt_embeddings.clear()
            self.prompt_ids_for_vectors.clear()

            tool_embedding_count = 0
            prompt_embedding_count = 0

            # Build tool embeddings
            for tool_id, tool in self._tools.items():
                # Construct meaningful text for embedding
                tags_str = ", ".join(tool.tags)
                text_to_embed = f"{tool.name} - {tool.description} Tags: {tags_str}"

                # Get embedding from the service
                embedding = embedder.get_embedding(text_to_embed)

                if embedding is not None and len(embedding) > 0:
                    # Store successful embedding
                    self.tool_embeddings.append(embedding)
                    self.tool_ids_for_vectors.append(tool_id)
                    tool_embedding_count += 1
                else:
                    # Log warning for failed embedding
                    logger.warning(f"Could not generate embedding for tool {tool_id}")

            # Build prompt embeddings
            for prompt_id, prompt in self._prompts.items():
                # Construct meaningful text for embedding
                tags_str = ", ".join(prompt.tags)
                variables_str = ", ".join(prompt.input_variables)
                text_to_embed = (
                    f"{prompt.name} - {prompt.description} "
                    f"Use case: {prompt.use_case} "
                    f"Template: {prompt.template_string} "
                    f"Variables: {variables_str} "
                    f"Tags: {tags_str} "
                    f"Difficulty: {prompt.difficulty_level}"
                )

                # Get embedding from the service
                embedding = embedder.get_embedding(text_to_embed)

                if embedding is not None and len(embedding) > 0:
                    # Store successful embedding
                    self.prompt_embeddings.append(embedding)
                    self.prompt_ids_for_vectors.append(prompt_id)
                    prompt_embedding_count += 1
                else:
                    # Log warning for failed embedding
                    logger.warning(
                        f"Could not generate embedding for prompt {prompt_id}"
                    )

            logger.info(
                f"Successfully built vector index with {tool_embedding_count} tool embeddings "
                f"and {prompt_embedding_count} prompt embeddings"
            )
            return (tool_embedding_count > 0) or (prompt_embedding_count > 0)

        except Exception as e:
            logger.error(f"Failed to build vector index: {e}")
            return False
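    # Illustration (not used at runtime): the typical bootstrap sequence, a
    # sketch assuming an EmbeddingService instance and the data files from the
    # class docstring:
    #
    #   >>> kg = InMemoryKG()
    #   >>> kg.load_tools_from_json("data/tools.json")
    #   >>> kg.load_prompts_from_json("data/prompts.json")
    #   >>> if not kg.build_vector_index(embedder):
    #   ...     logger.warning("Vector index empty; searches will use fallbacks")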
    def _create_mock_embeddings(self) -> None:
        """Create mock embeddings for demo purposes when real embeddings aren't available."""
        # Clear existing index
        self.tool_embeddings.clear()
        self.tool_ids_for_vectors.clear()

        # Create simple mock embeddings based on tool characteristics
        for tool_id, tool in self._tools.items():
            # Create a simple mock embedding based on tool name and tags.
            # This is just for demo purposes - real embeddings would be much
            # more sophisticated.
            mock_embedding = []

            # Add some dimensions based on tool name length and tag count
            mock_embedding.extend([float(len(tool.name)) / 100.0])
            mock_embedding.extend([float(len(tool.tags)) / 10.0])
            mock_embedding.extend([float(len(tool.description)) / 1000.0])

            # Add some random-ish values based on tool characteristics
            for tag in tool.tags[:5]:  # Use up to 5 tags
                mock_embedding.append(float(hash(tag) % 100) / 100.0)

            # Pad to a consistent length
            while len(mock_embedding) < 10:
                mock_embedding.append(0.1)

            self.tool_embeddings.append(mock_embedding)
            self.tool_ids_for_vectors.append(tool_id)

        logger.info(f"Created {len(self.tool_embeddings)} mock embeddings for demo")

    def load_tools_from_json(self, json_file: Path | str) -> bool:
        """Load tools from a JSON file into the knowledge graph.

        Args:
            json_file: Path to the JSON file containing tool data

        Returns:
            True if loading was successful, False otherwise
        """
        try:
            with open(json_file, encoding="utf-8") as f:
                tools_data = json.load(f)

            for tool_data in tools_data:
                tool = MCPTool(**tool_data)
                self.add_tool(tool)

            logger.info(f"Successfully loaded {len(tools_data)} tools from {json_file}")

            # Automatically register MCP endpoints for tools with remote execution
            self._auto_register_mcp_endpoints()

            return True

        except FileNotFoundError:
            logger.error(f"Tool file not found: {json_file}")
            return False
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON in tool file {json_file}: {e}")
            return False
        except Exception as e:
            logger.error(f"Failed to load tools from {json_file}: {e}")
            return False

    def _auto_register_mcp_endpoints(self) -> None:
        """Automatically register MCP endpoints for tools with the remote execution type."""
        for tool in self._tools.values():
            if (
                tool.execution_type == "remote_mcp_gradio"
                and tool.mcp_endpoint_url
                and tool.tool_id not in self._mcp_endpoints
            ):
                endpoint_info = {
                    "url": tool.mcp_endpoint_url,
                    "input_parameters": tool.input_parameter_order,
                    "timeout_seconds": tool.timeout_seconds,
                    "status": "auto_registered",
                    "last_validated": None,
                }
                self._mcp_endpoints[tool.tool_id] = endpoint_info
                logger.info(
                    f"Auto-registered MCP endpoint for tool {tool.tool_id}: {tool.mcp_endpoint_url}"
                )

    def load_prompts_from_json(self, json_file: Path | str) -> bool:
        """Load prompts from a JSON file.

        Args:
            json_file: Path to the JSON file containing prompt data

        Returns:
            True if loading was successful, False otherwise
        """
        try:
            if isinstance(json_file, str):
                json_file = Path(json_file)

            with json_file.open() as f:
                prompts_data = json.load(f)

            for prompt_data in prompts_data:
                prompt = MCPPrompt(**prompt_data)
                self.add_prompt(prompt)

            logger.info(
                f"Successfully loaded {len(prompts_data)} prompts from {json_file}"
            )
            return True

        except Exception as e:
            logger.error(f"Failed to load prompts from {json_file}: {e}")
            return False
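    # Illustration (not used at runtime): a minimal shape for the tools JSON
    # file, covering only the MCPTool fields this module reads; the real
    # schema in .ontology may define more. URL and values are placeholders.
    #
    #   [
    #     {
    #       "tool_id": "text_summarizer_001",
    #       "name": "Text Summarizer",
    #       "description": "Summarizes long-form text",
    #       "tags": ["text", "nlp"],
    #       "execution_type": "remote_mcp_gradio",
    #       "mcp_endpoint_url": "https://example.org/gradio_api/mcp/sse",
    #       "input_parameter_order": ["text"],
    #       "timeout_seconds": 30
    #     }
    #   ]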
    # MCP endpoint methods

    def register_mcp_endpoint(
        self,
        tool_id: str,
        endpoint_url: str,
        input_parameters: list[str] | None = None,
        timeout_seconds: int = 30,
    ) -> bool:
        """Register a new MCP endpoint for a tool.

        Args:
            tool_id: The ID of the tool to register the endpoint for
            endpoint_url: The URL of the MCP endpoint
            input_parameters: List of input parameter names, in order
            timeout_seconds: Request timeout for the endpoint

        Returns:
            True if registration was successful, False otherwise
        """
        try:
            # Validate URL format
            parsed_url = urlparse(endpoint_url)
            if not parsed_url.scheme or not parsed_url.netloc:
                logger.error(f"Invalid endpoint URL format: {endpoint_url}")
                return False

            # Check that the tool exists
            tool = self.get_tool_by_id(tool_id)
            if not tool:
                logger.error(f"Tool {tool_id} not found in knowledge graph")
                return False

            # Update the tool with MCP endpoint information
            tool.execution_type = "remote_mcp_gradio"
            tool.mcp_endpoint_url = endpoint_url
            tool.input_parameter_order = input_parameters or []
            tool.timeout_seconds = timeout_seconds

            # Store endpoint info in the registry
            endpoint_info = {
                "url": endpoint_url,
                "input_parameters": input_parameters or [],
                "timeout_seconds": timeout_seconds,
                "status": "registered",
                "last_validated": None,
            }
            self._mcp_endpoints[tool_id] = endpoint_info

            logger.info(f"Successfully registered MCP endpoint for tool {tool_id}: {endpoint_url}")
            return True

        except Exception as e:
            logger.error(f"Failed to register MCP endpoint for tool {tool_id}: {e}")
            return False

    def validate_mcp_endpoint(self, tool_id: str, test_payload: dict | None = None) -> bool:
        """Validate an MCP endpoint by making a test request.

        Args:
            tool_id: The ID of the tool to validate the endpoint for
            test_payload: Optional test payload to send to the endpoint

        Returns:
            True if validation was successful, False otherwise
        """
        try:
            if tool_id not in self._mcp_endpoints:
                logger.error(f"No registered endpoint for tool {tool_id}")
                return False

            endpoint_info = self._mcp_endpoints[tool_id]
            endpoint_url = endpoint_info["url"]
            timeout = endpoint_info.get("timeout_seconds", 30)

            # Prepare the test payload
            if test_payload is None:
                test_payload = {"data": ["test"]}

            # Make the test request
            response = requests.post(
                endpoint_url,
                json=test_payload,
                timeout=timeout,
                headers={"Content-Type": "application/json"},
            )

            if response.status_code == 200:
                endpoint_info["status"] = "validated"
                endpoint_info["last_validated"] = datetime.now(timezone.utc).isoformat()
                logger.info(f"Successfully validated MCP endpoint for tool {tool_id}")
                return True

            endpoint_info["status"] = "validation_failed"
            logger.error(
                f"MCP endpoint validation failed for tool {tool_id}: HTTP {response.status_code}"
            )
            return False

        except requests.exceptions.Timeout:
            logger.error(f"MCP endpoint validation timeout for tool {tool_id}")
            self._mcp_endpoints[tool_id]["status"] = "timeout"
            return False
        except requests.exceptions.RequestException as e:
            logger.error(f"MCP endpoint validation error for tool {tool_id}: {e}")
            self._mcp_endpoints[tool_id]["status"] = "error"
            return False
        except Exception as e:
            logger.error(f"Unexpected error validating MCP endpoint for tool {tool_id}: {e}")
            return False
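    # Illustration (not used at runtime): register-then-validate flow for a
    # remote tool; the URL is a placeholder.
    #
    #   >>> ok = kg.register_mcp_endpoint(
    #   ...     "sentiment_analyzer_002",
    #   ...     "https://example.org/gradio_api/mcp/sse",
    #   ...     input_parameters=["text"],
    #   ...     timeout_seconds=15,
    #   ... )
    #   >>> if ok and kg.validate_mcp_endpoint("sentiment_analyzer_002"):
    #   ...     print(kg.get_mcp_endpoints()["sentiment_analyzer_002"]["status"])
    #   validated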
    def discover_mcp_tools(self, base_urls: list[str]) -> list[str]:
        """Discover available MCP tools from a list of base URLs.

        Args:
            base_urls: List of base URLs to check for MCP endpoints

        Returns:
            List of discovered tool IDs that were successfully registered
        """
        discovered_tools = []

        for base_url in base_urls:
            try:
                # Common MCP endpoint patterns
                mcp_patterns = [
                    "/gradio_api/mcp/sse",
                    "/mcp",
                    "/api/mcp",
                ]

                for pattern in mcp_patterns:
                    endpoint_url = base_url.rstrip("/") + pattern

                    try:
                        # Test endpoint availability
                        response = requests.get(endpoint_url, timeout=10)
                        if response.status_code in [200, 405]:  # 405 = Method Not Allowed (POST expected)
                            # Try to determine the tool type from the URL or response
                            tool_id = self._infer_tool_id_from_url(base_url)
                            if tool_id and self.register_mcp_endpoint(tool_id, endpoint_url):
                                discovered_tools.append(tool_id)
                                break
                    except requests.exceptions.RequestException:
                        continue

            except Exception as e:
                logger.error(f"Error discovering MCP tools from {base_url}: {e}")
                continue

        logger.info(f"Discovered {len(discovered_tools)} MCP tools: {discovered_tools}")
        return discovered_tools

    def get_mcp_endpoints(self) -> dict[str, dict]:
        """Get all registered MCP endpoints.

        Returns:
            Dictionary mapping tool IDs to their endpoint information
        """
        return self._mcp_endpoints.copy()

    def get_mcp_tools(self) -> list[MCPTool]:
        """Get all tools configured for MCP execution.

        Returns:
            List of MCPTool instances with execution_type='remote_mcp_gradio'
        """
        return [
            tool
            for tool in self._tools.values()
            if tool.execution_type == "remote_mcp_gradio"
        ]

    def update_mcp_endpoint_status(self, tool_id: str, status: str) -> bool:
        """Update the status of an MCP endpoint.

        Args:
            tool_id: The ID of the tool
            status: New status (e.g., 'active', 'inactive', 'error')

        Returns:
            True if the update was successful, False otherwise
        """
        if tool_id not in self._mcp_endpoints:
            logger.error(f"No registered endpoint for tool {tool_id}")
            return False

        self._mcp_endpoints[tool_id]["status"] = status
        logger.info(f"Updated MCP endpoint status for tool {tool_id}: {status}")
        return True

    def _infer_tool_id_from_url(self, url: str) -> str | None:
        """Infer a tool ID from URL patterns.

        Args:
            url: The URL to analyze

        Returns:
            Inferred tool ID, or None if not recognized
        """
        url_lower = url.lower()

        # Common patterns for tool identification
        if "sentiment" in url_lower:
            return "sentiment_analyzer_002"
        if "summar" in url_lower:
            return "text_summarizer_001"
        if "caption" in url_lower or "image" in url_lower:
            return "image_caption_003"
        if "lint" in url_lower or "code" in url_lower:
            return "code_linter_004"

        return None
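
if __name__ == "__main__":
    # Minimal smoke test (illustrative, not part of the public API). Assumes:
    # the data files from the class docstring exist, EmbeddingService() can be
    # constructed with no arguments (e.g., falling back to mock embeddings
    # when no API key is set), and the module is run as part of its package
    # (python -m <package>.<module>) so the relative import resolves.
    from .embedder import EmbeddingService  # runtime import, avoids the cycle

    logging.basicConfig(level=logging.INFO)

    kg = InMemoryKG()
    kg.load_tools_from_json("data/tools.json")
    kg.load_prompts_from_json("data/prompts.json")

    embedder = EmbeddingService()
    if kg.build_vector_index(embedder):
        query = embedder.get_embedding("summarize a long article")
        print(kg.find_similar_tools_with_scores(query, top_k=3))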