|
|
"""Knowledge Graph implementation for MCP tools and prompts. |
|
|
|
|
|
This module provides an in-memory knowledge graph structure for storing and querying |
|
|
MCP tool and prompt metadata. It serves as the central repository for semantic search |
|
|
and tool discovery in the KGraph-MCP system. |
|
|
|
|
|
Architecture Overview: |
|
|
The InMemoryKG class combines traditional graph storage with modern vector similarity |
|
|
search to enable intelligent tool and prompt discovery. It bridges structured metadata |
|
|
with semantic understanding through embedding-based search. |
|
|
|
|
|
Core Capabilities: |
|
|
1. Metadata Storage: Tools and prompts with rich attributes |
|
|
2. Semantic Search: Vector similarity using cosine distance |
|
|
3. Tag-based Filtering: Traditional categorical search |
|
|
4. MCP Integration: Remote server endpoint management |
|
|
5. JSON Data Loading: Persistent configuration support |
|
|
|
|
|
Search Algorithm Design: |
|
|
The system uses a hybrid approach combining semantic and symbolic reasoning: |
|
|
|
|
|
Semantic Layer (Primary): |
|
|
- Query → Embedding → Vector Search → Ranked Results |
|
|
- Uses OpenAI embeddings for deep semantic understanding |
|
|
- Fallback to mock embeddings for development/testing |
|
|
|
|
|
Symbolic Layer (Secondary): |
|
|
- Tag-based filtering for categorical precision |
|
|
- Keyword matching for exact term matching |
|
|
- Difficulty-based filtering for user experience optimization |
|
|
|
|
|
Performance Characteristics: |
|
|
- In-memory storage: O(1) direct access by ID |
|
|
- Vector search: O(n) cosine similarity computation |
|
|
- Tag filtering: O(n) set intersection operations |
|
|
- Hybrid search: Combines multiple ranking signals |
|
|
|
|
|
MVP Evolution Context: |
|
|
- MVP1: Basic tool storage and retrieval |
|
|
- MVP2: Added prompt support and semantic search |
|
|
- MVP4: Enhanced with MCP server integration |
|
|
- MVP5: Advanced with sampling preferences and optimization |
|
|
""" |
|
|
|
|
|
import json |
|
|
import logging |
|
|
from pathlib import Path |
|
|
from typing import TYPE_CHECKING |
|
|
from urllib.parse import urlparse |
|
|
|
|
|
import numpy as np |
|
|
import requests |
|
|
|
|
|
from .ontology import MCPPrompt, MCPTool |
|
|
|
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from .embedder import EmbeddingService |
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class InMemoryKG: |
|
|
"""In-memory knowledge graph for MCP tools and prompts with semantic search. |
|
|
|
|
|
This class provides the core storage and retrieval functionality for the KGraph-MCP |
|
|
system. It combines traditional database-like operations with modern vector similarity |
|
|
search to enable intelligent tool and prompt discovery. |
|
|
|
|
|
Design Philosophy: |
|
|
The knowledge graph follows a hybrid approach, maintaining both structured |
|
|
metadata for precise queries and vector representations for semantic similarity. |
|
|
This enables both exact matches (e.g., "find all Python tools") and fuzzy |
|
|
semantic matches (e.g., "find tools for analyzing customer sentiment"). |
|
|
|
|
|
Data Architecture: |
|
|
``` |
|
|
InMemoryKG |
|
|
├── _tools: dict[str, MCPTool] # Primary tool storage |
|
|
├── _prompts: dict[str, MCPPrompt] # Primary prompt storage |
|
|
├── tool_embeddings: list[list[float]] # Vector representations |
|
|
├── tool_ids_for_vectors: list[str] # ID mapping for vectors |
|
|
├── prompt_embeddings: list[list[float]] # Prompt vector representations |
|
|
├── prompt_ids_for_vectors: list[str] # Prompt ID mapping |
|
|
└── _mcp_endpoints: dict[str, dict] # Remote server registry |
|
|
``` |
|
|
|
|
|
Vector Index Design: |
|
|
The vector indices maintain parallel arrays where index i corresponds to: |
|
|
- tool_embeddings[i]: Embedding vector for tool |
|
|
- tool_ids_for_vectors[i]: Tool ID for that vector |
|
|
|
|
|
This design enables efficient similarity search while maintaining ID mapping. |
|
|
|
|
|
Search Strategy Hierarchy: |
|
|
1. Semantic Search (Preferred): Use embeddings for deep understanding |
|
|
2. Tag-based Search (Fallback): Use categorical metadata for filtering |
|
|
3. Keyword Search (Last Resort): Simple text matching for basic queries |
|
|
|
|
|
Example Usage: |
|
|
>>> kg = InMemoryKG() |
|
|
>>> kg.load_tools_from_json("data/tools.json") |
|
|
>>> kg.load_prompts_from_json("data/prompts.json") |
|
|
>>> kg.build_vector_index(embedding_service) |
|
|
>>> |
|
|
>>> # Semantic search for tools |
|
|
>>> similar_tools = kg.find_similar_tools(query_embedding, top_k=3) |
|
|
>>> |
|
|
>>> # Find prompts for a specific tool |
|
|
>>> tool_prompts = kg.find_prompts_by_tool_id("text_summarizer_001") |
|
|
""" |
|
|
|
|
|
def __init__(self) -> None:
    """Create an empty knowledge graph.

    Only empty containers are allocated here. Nothing is loaded and no
    embedding is computed until the load/build methods are called.
    """
    # Primary object stores, keyed by tool ID / prompt ID.
    self._tools: dict[str, MCPTool] = {}
    self._prompts: dict[str, MCPPrompt] = {}

    # Vector index kept as parallel lists: entry i of an ``*_embeddings``
    # list is the vector for the ID at entry i of the matching
    # ``*_ids_for_vectors`` list.
    self.tool_embeddings: list[list[float]] = []
    self.tool_ids_for_vectors: list[str] = []
    self.prompt_embeddings: list[list[float]] = []
    self.prompt_ids_for_vectors: list[str] = []

    # Registry of remote MCP endpoints, keyed by tool ID.
    self._mcp_endpoints: dict[str, dict] = {}
|
|
|
|
|
@property
def tools(self) -> dict[str, MCPTool]:
    """Expose the tool store, keyed by tool ID.

    The object returned is the graph's own internal dictionary, not a
    copy, so changes made through it are visible to the graph.

    Returns:
        Mapping of tool ID to MCPTool.
    """
    tool_store = self._tools
    return tool_store
|
|
|
|
|
@property
def prompts(self) -> dict[str, MCPPrompt]:
    """Expose the prompt store, keyed by prompt ID.

    The object returned is the graph's own internal dictionary, not a
    copy, so changes made through it are visible to the graph.

    Returns:
        Mapping of prompt ID to MCPPrompt.
    """
    prompt_store = self._prompts
    return prompt_store
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def add_tool(self, tool: MCPTool) -> None:
    """Store a tool under its ``tool_id``.

    A tool already stored under the same ID is replaced. The vector index
    lists are not touched here, so a tool added after the index was built
    is not found by similarity search until the index is rebuilt.

    Args:
        tool: Tool to store; its ``tool_id`` and ``name`` are read.
    """
    self._tools.update({tool.tool_id: tool})
    logger.debug(f"Added tool to KG: {tool.tool_id} ({tool.name})")
|
|
|
|
|
def get_tool_by_id(self, tool_id: str) -> MCPTool | None:
    """Look up a single tool by its ID.

    Args:
        tool_id: ID the tool was stored under.

    Returns:
        The stored tool, or None when no tool has that ID.

    Example:
        >>> tool = kg.get_tool_by_id("text_summarizer_001")
        >>> if tool:
        ...     print(f"Found tool: {tool.name}")
    """
    try:
        return self._tools[tool_id]
    except KeyError:
        return None
|
|
|
|
|
def get_all_tools(self) -> list[MCPTool]:
    """Return every stored tool in a new list.

    The list is built fresh on each call, so adding to or removing from
    it does not change the graph. The tool objects themselves are shared.

    Returns:
        All stored tools, in storage order; empty when there are none.
    """
    return [*self._tools.values()]
|
|
|
|
|
def find_tools_by_tags(self, tags: list[str]) -> list[MCPTool]:
    """Return the tools that carry at least one of the given tags.

    Matching is exact (case-sensitive) and uses OR semantics: a tool is
    included as soon as one of its tags appears in ``tags``.

    Args:
        tags: Tags to look for.

    Returns:
        Matching tools in storage order; empty when nothing matches or
        ``tags`` is empty.

    Example:
        >>> tools = kg.find_tools_by_tags(["text", "nlp"])
        >>> # Returns tools tagged with either "text" OR "nlp"
    """
    wanted = set(tags)
    matching_tools = [
        candidate
        for candidate in self._tools.values()
        if wanted & set(candidate.tags)
    ]
    logger.debug(f"Found {len(matching_tools)} tools matching tags: {tags}")
    return matching_tools
|
|
|
|
|
def get_all_tags(self) -> set[str]:
    """Collect the distinct tags used by any stored tool.

    Returns:
        A new set holding every tag that appears on at least one tool;
        empty when no tools are stored.
    """
    return set().union(*(tool.tags for tool in self._tools.values()))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def add_prompt(self, prompt: MCPPrompt) -> None:
    """Store a prompt under its ``prompt_id``.

    A prompt already stored under the same ID is replaced. The vector
    index lists are not touched here, so a prompt added after the index
    was built is not found by similarity search until it is rebuilt.

    Args:
        prompt: Prompt to store; its ``prompt_id`` and ``name`` are read.
    """
    self._prompts.update({prompt.prompt_id: prompt})
    logger.debug(f"Added prompt to KG: {prompt.prompt_id} ({prompt.name})")
|
|
|
|
|
def get_prompt_by_id(self, prompt_id: str) -> MCPPrompt | None:
    """Look up a single prompt by its ID.

    Args:
        prompt_id: ID the prompt was stored under.

    Returns:
        The stored prompt, or None when no prompt has that ID.
    """
    try:
        return self._prompts[prompt_id]
    except KeyError:
        return None
|
|
|
|
|
def get_all_prompts(self) -> list[MCPPrompt]:
    """Return every stored prompt in a new list.

    Returns:
        All stored prompts, in storage order; empty when there are none.
    """
    return [*self._prompts.values()]
|
|
|
|
|
def find_prompts_by_tags(self, tags: list[str]) -> list[MCPPrompt]:
    """Return the prompts that carry at least one of the given tags.

    Matching is exact and uses OR semantics, the same rule
    ``find_tools_by_tags`` applies to tools.

    Args:
        tags: Tags to look for.

    Returns:
        Matching prompts in storage order; empty when nothing matches.
    """
    wanted = set(tags)
    matching_prompts = [
        candidate
        for candidate in self._prompts.values()
        if wanted & set(candidate.tags)
    ]
    logger.debug(f"Found {len(matching_prompts)} prompts matching tags: {tags}")
    return matching_prompts
|
|
|
|
|
def find_prompts_by_tool_id(self, tool_id: str) -> list[MCPPrompt]:
    """Return the prompts whose ``target_tool_id`` equals ``tool_id``.

    Every stored prompt is inspected, so the cost grows linearly with the
    number of prompts.

    Args:
        tool_id: ID of the tool whose prompts are wanted.

    Returns:
        Prompts targeting that tool, in storage order; empty when none do.
    """
    tool_prompts = list(
        filter(
            lambda candidate: candidate.target_tool_id == tool_id,
            self._prompts.values(),
        )
    )
    logger.debug(f"Found {len(tool_prompts)} prompts for tool {tool_id}")
    return tool_prompts
|
|
|
|
|
def find_prompts_by_difficulty(self, difficulty_level: str) -> list[MCPPrompt]:
    """Return the prompts whose ``difficulty_level`` equals the given value.

    The comparison is an exact equality test against each stored prompt.

    Args:
        difficulty_level: Level to filter by, for example "beginner",
            "intermediate" or "advanced".

    Returns:
        Prompts at that level, in storage order; empty when none match.
    """
    difficulty_prompts = []
    for candidate in self._prompts.values():
        if candidate.difficulty_level == difficulty_level:
            difficulty_prompts.append(candidate)

    logger.debug(f"Found {len(difficulty_prompts)} prompts with difficulty: {difficulty_level}")
    return difficulty_prompts
|
|
|
|
|
def get_all_prompt_tags(self) -> set[str]:
    """Collect the distinct tags used by any stored prompt.

    Returns:
        A new set holding every tag that appears on at least one prompt;
        empty when no prompts are stored.
    """
    return set().union(*(prompt.tags for prompt in self._prompts.values()))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _cosine_similarity(self, vec1: list[float], vec2: list[float]) -> float:
    """Return the cosine of the angle between two vectors.

    Formula:
        cosine_similarity(A, B) = (A · B) / (||A|| * ||B||)

    where ``A · B`` is the dot product and ``||A||``, ``||B||`` are the
    L2 norms. The result depends on direction only, not on magnitude:
    1.0 means same direction, 0.0 orthogonal, -1.0 opposite.

    Args:
        vec1: First vector.
        vec2: Second vector; must have the same length as ``vec1``.

    Returns:
        The similarity as a Python float. When either vector has zero
        magnitude a warning is logged and 0.0 is returned, which avoids
        a division by zero.
    """
    first, second = np.array(vec1), np.array(vec2)
    dot = np.dot(first, second)
    first_norm, second_norm = np.linalg.norm(first), np.linalg.norm(second)

    if first_norm != 0 and second_norm != 0:
        return float(dot / (first_norm * second_norm))

    logger.warning("Zero-magnitude vector encountered in cosine similarity calculation")
    return 0.0
|
|
|
|
|
def find_similar_tools(
    self, query_embedding: list[float], top_k: int = 3
) -> list[str]:
    """Rank indexed tools by cosine similarity to a query embedding.

    When the tool index is empty or no query embedding is given, the
    call is delegated to ``_find_tools_by_keywords`` with an empty query.

    Args:
        query_embedding: Embedding vector of the query.
        top_k: Maximum number of tool IDs to return.

    Returns:
        Tool IDs, most similar first. Tools with equal similarity keep
        their index order.
    """
    if not (self.tool_embeddings and query_embedding):
        logger.info("Embeddings not available, falling back to keyword-based tool search")
        return self._find_tools_by_keywords("", top_k)

    scored = [
        (
            self._cosine_similarity(query_embedding, vector),
            self.tool_ids_for_vectors[position],
        )
        for position, vector in enumerate(self.tool_embeddings)
    ]
    ranked = sorted(scored, key=lambda entry: entry[0], reverse=True)
    return [tool_id for _, tool_id in ranked[:top_k]]
|
|
|
|
|
def find_similar_tools_with_scores(
    self, query_embedding: list[float], top_k: int = 3, min_similarity: float = 0.0
) -> list[tuple[str, float]]:
    """Rank indexed tools by cosine similarity and return the scores too.

    Unlike ``find_similar_tools`` there is no keyword fallback: when the
    tool index is empty or no query embedding is given, the result is an
    empty list.

    Args:
        query_embedding: Embedding vector of the query.
        top_k: Maximum number of results to return.
        min_similarity: Results scoring below this value are dropped.

    Returns:
        ``(tool_id, similarity)`` pairs, most similar first.
    """
    if not (self.tool_embeddings and query_embedding):
        logger.info("Embeddings not available, cannot compute similarity scores")
        return []

    hits = []
    for position, vector in enumerate(self.tool_embeddings):
        score = self._cosine_similarity(query_embedding, vector)
        candidate_id = self.tool_ids_for_vectors[position]
        if score >= min_similarity:
            hits.append((candidate_id, score))

    return sorted(hits, key=lambda entry: entry[1], reverse=True)[:top_k]
|
|
|
|
|
def _find_tools_by_keywords(self, query: str, top_k: int = 3) -> list[str]:
    """Fallback tool lookup used when no embeddings are available.

    The current implementation does not inspect ``query``: it returns the
    first ``top_k`` tool IDs in ascending sorted order, which keeps the
    fallback deterministic.

    Args:
        query: Search text (currently unused).
        top_k: Maximum number of tool IDs to return.

    Returns:
        Up to ``top_k`` tool IDs in sorted order.
    """
    return sorted(self._tools)[:top_k]
|
|
|
|
|
def find_similar_prompts(
    self, query_embedding: list[float], top_k: int = 3
) -> list[str]:
    """Rank indexed prompts by cosine similarity to a query embedding.

    When the prompt index is empty or no query embedding is given, the
    call is delegated to ``_find_prompts_by_keywords`` with an empty
    query.

    Args:
        query_embedding: Embedding vector of the query.
        top_k: Maximum number of prompt IDs to return.

    Returns:
        Prompt IDs, most similar first. Prompts with equal similarity
        keep their index order.
    """
    if not (self.prompt_embeddings and query_embedding):
        logger.info("Embeddings not available, falling back to returning available prompts")
        return self._find_prompts_by_keywords("", top_k)

    scored = [
        (
            self._cosine_similarity(query_embedding, vector),
            self.prompt_ids_for_vectors[position],
        )
        for position, vector in enumerate(self.prompt_embeddings)
    ]
    ranked = sorted(scored, key=lambda entry: entry[0], reverse=True)
    return [prompt_id for _, prompt_id in ranked[:top_k]]
|
|
|
|
|
def find_similar_prompts_with_scores(
    self, query_embedding: list[float], top_k: int = 3, min_similarity: float = 0.0
) -> list[tuple[str, float]]:
    """Rank indexed prompts by cosine similarity and return the scores too.

    There is no keyword fallback: when the prompt index is empty or no
    query embedding is given, the result is an empty list.

    Args:
        query_embedding: Embedding vector of the query.
        top_k: Maximum number of results to return.
        min_similarity: Results scoring below this value are dropped.

    Returns:
        ``(prompt_id, similarity)`` pairs, most similar first.
    """
    if not (self.prompt_embeddings and query_embedding):
        logger.info("Embeddings not available, cannot compute similarity scores")
        return []

    hits = []
    for position, vector in enumerate(self.prompt_embeddings):
        score = self._cosine_similarity(query_embedding, vector)
        candidate_id = self.prompt_ids_for_vectors[position]
        if score >= min_similarity:
            hits.append((candidate_id, score))

    return sorted(hits, key=lambda entry: entry[1], reverse=True)[:top_k]
|
|
|
|
|
def _find_prompts_by_keywords(self, query: str, top_k: int = 3) -> list[str]:
    """Fallback prompt lookup used when no embeddings are available.

    The current implementation does not inspect ``query``: it returns the
    first ``top_k`` prompt IDs in ascending sorted order, which keeps the
    fallback deterministic.

    Args:
        query: Search text (currently unused).
        top_k: Maximum number of prompt IDs to return.

    Returns:
        Up to ``top_k`` prompt IDs in sorted order.
    """
    return sorted(self._prompts)[:top_k]
|
|
|
|
|
def find_similar_prompts_for_tool(
    self, query_embedding: list[float], tool_id: str, top_k: int = 3
) -> list[str]:
    """Rank the prompts of one tool by similarity to a query embedding.

    Only indexed prompts whose ``target_tool_id`` equals ``tool_id`` are
    scored. When the prompt index is empty or no query embedding is
    given, the first ``top_k`` prompts of the tool are returned unranked.

    Args:
        query_embedding: Embedding vector of the query.
        tool_id: ID of the tool whose prompts are considered.
        top_k: Maximum number of prompt IDs to return.

    Returns:
        Prompt IDs for the tool, most similar first; empty when the tool
        has no prompts.
    """
    tool_prompts = self.find_prompts_by_tool_id(tool_id)
    if not tool_prompts:
        return []

    if not (self.prompt_embeddings and query_embedding):
        logger.info(f"Embeddings not available, returning available prompts for tool {tool_id}")
        return [prompt.prompt_id for prompt in tool_prompts[:top_k]]

    scored = []
    for position, vector in enumerate(self.prompt_embeddings):
        candidate_id = self.prompt_ids_for_vectors[position]
        prompt = self.get_prompt_by_id(candidate_id)
        # Skip vectors whose prompt is gone or belongs to another tool.
        if not (prompt and prompt.target_tool_id == tool_id):
            continue
        scored.append((self._cosine_similarity(query_embedding, vector), candidate_id))

    ranked = sorted(scored, key=lambda entry: entry[0], reverse=True)
    return [prompt_id for _, prompt_id in ranked[:top_k]]
|
|
|
|
|
def find_similar_prompts_for_tool_with_scores(
    self, query_embedding: list[float], tool_id: str, top_k: int = 3, min_similarity: float = 0.0
) -> list[tuple[str, float]]:
    """Rank the prompts of one tool by similarity and return the scores.

    Only indexed prompts whose ``target_tool_id`` equals ``tool_id`` are
    scored. There is no unranked fallback: without a prompt index or a
    query embedding the result is an empty list.

    Args:
        query_embedding: Embedding vector of the query.
        tool_id: ID of the tool whose prompts are considered.
        top_k: Maximum number of results to return.
        min_similarity: Results scoring below this value are dropped.

    Returns:
        ``(prompt_id, similarity)`` pairs for the tool, most similar
        first; empty when the tool has no prompts.
    """
    tool_prompts = self.find_prompts_by_tool_id(tool_id)
    if not tool_prompts:
        return []

    if not (self.prompt_embeddings and query_embedding):
        logger.info(f"Embeddings not available, cannot compute similarity scores for tool {tool_id}")
        return []

    hits = []
    for position, vector in enumerate(self.prompt_embeddings):
        candidate_id = self.prompt_ids_for_vectors[position]
        prompt = self.get_prompt_by_id(candidate_id)
        # Skip vectors whose prompt is gone or belongs to another tool.
        if not (prompt and prompt.target_tool_id == tool_id):
            continue
        score = self._cosine_similarity(query_embedding, vector)
        if score >= min_similarity:
            hits.append((candidate_id, score))

    return sorted(hits, key=lambda entry: entry[1], reverse=True)[:top_k]
|
|
|
|
|
def build_vector_index(self, embedder: "EmbeddingService") -> bool:
    """Rebuild the tool and prompt vector indices from scratch.

    The four index lists are emptied in place. Then one text is composed
    per tool and per prompt and passed to ``embedder.get_embedding``.
    Items for which the embedder returns None or an empty vector are
    skipped with a warning.

    Args:
        embedder: Object providing ``get_embedding(text)``.

    Returns:
        True when at least one tool or prompt embedding was stored.
        False when nothing was embedded or an exception was raised; in
        the latter case the error is logged and the index keeps whatever
        was stored before the failure.
    """
    try:
        # Clear in place so references held elsewhere stay valid.
        for stale in (
            self.tool_embeddings,
            self.tool_ids_for_vectors,
            self.prompt_embeddings,
            self.prompt_ids_for_vectors,
        ):
            stale.clear()

        tool_embedding_count = 0
        prompt_embedding_count = 0

        for tool_id, tool in self._tools.items():
            tags_str = ", ".join(tool.tags)
            text_to_embed = f"{tool.name} - {tool.description} Tags: {tags_str}"
            embedding = embedder.get_embedding(text_to_embed)

            if embedding is None or len(embedding) == 0:
                logger.warning(f"Could not generate embedding for tool {tool_id}")
                continue

            # Append to both lists together so positions stay aligned.
            self.tool_embeddings.append(embedding)
            self.tool_ids_for_vectors.append(tool_id)
            tool_embedding_count += 1

        for prompt_id, prompt in self._prompts.items():
            tags_str = ", ".join(prompt.tags)
            variables_str = ", ".join(prompt.input_variables)
            text_to_embed = (
                f"{prompt.name} - {prompt.description} "
                f"Use case: {prompt.use_case} "
                f"Template: {prompt.template_string} "
                f"Variables: {variables_str} "
                f"Tags: {tags_str} "
                f"Difficulty: {prompt.difficulty_level}"
            )
            embedding = embedder.get_embedding(text_to_embed)

            if embedding is None or len(embedding) == 0:
                logger.warning(
                    f"Could not generate embedding for prompt {prompt_id}"
                )
                continue

            self.prompt_embeddings.append(embedding)
            self.prompt_ids_for_vectors.append(prompt_id)
            prompt_embedding_count += 1

        logger.info(
            f"Successfully built vector index with {tool_embedding_count} tool embeddings "
            f"and {prompt_embedding_count} prompt embeddings"
        )
        return (tool_embedding_count > 0) or (prompt_embedding_count > 0)

    except Exception as e:
        logger.error(f"Failed to build vector index: {e}")
        return False
|
|
|
|
|
def _create_mock_embeddings(self) -> None:
    """Fill the tool vector index with deterministic 10-dimensional mock vectors.

    Intended for demos and tests where no real embedding service is
    available. Only the tool index is rebuilt; prompt vectors are left
    untouched.

    Vector layout per tool:
        [0] len(name) / 100
        [1] number of tags / 10
        [2] len(description) / 1000
        [3..7] one feature per tag (first five tags only)
        remaining slots padded with 0.1 up to length 10

    Tag features use CRC-32 rather than the built-in ``hash()``: string
    hashes are salted per interpreter process, so ``hash()`` produced
    different mock vectors on every run.
    """
    # Local import keeps this fix independent of the module import block.
    import zlib

    self.tool_embeddings.clear()
    self.tool_ids_for_vectors.clear()

    for tool_id, tool in self._tools.items():
        mock_embedding = [
            len(tool.name) / 100.0,
            len(tool.tags) / 10.0,
            len(tool.description) / 1000.0,
        ]

        # Stable per-tag feature in [0.0, 0.99].
        mock_embedding.extend(
            (zlib.crc32(str(tag).encode("utf-8")) % 100) / 100.0
            for tag in tool.tags[:5]
        )

        # Pad so every vector has the same length (at most 8 slots are used above).
        mock_embedding.extend([0.1] * (10 - len(mock_embedding)))

        self.tool_embeddings.append(mock_embedding)
        self.tool_ids_for_vectors.append(tool_id)

    logger.info(f"Created {len(self.tool_embeddings)} mock embeddings for demo")
|
|
|
|
|
def load_tools_from_json(self, json_file: Path | str) -> bool:
    """Load tool definitions from a UTF-8 JSON file into the graph.

    The file must contain a JSON array of objects; each object is passed
    as keyword arguments to ``MCPTool`` and the result is stored with
    ``add_tool``. After all tools are stored, MCP endpoints are
    auto-registered for the tools that declare one.

    Tools are added one by one, so entries preceding a bad one stay in
    the graph when loading fails part-way.

    Args:
        json_file: Path of the JSON file.

    Returns:
        True on success. False when the file is missing, is not valid
        JSON, or any other error occurs; the error is logged.
    """
    try:
        with open(json_file, encoding="utf-8") as handle:
            tools_data = json.load(handle)

        for entry in tools_data:
            self.add_tool(MCPTool(**entry))

        logger.info(f"Successfully loaded {len(tools_data)} tools from {json_file}")

        # Pick up endpoints declared by the tools that were just loaded.
        self._auto_register_mcp_endpoints()
    except FileNotFoundError:
        logger.error(f"Tool file not found: {json_file}")
        return False
    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON in tool file {json_file}: {e}")
        return False
    except Exception as e:
        logger.error(f"Failed to load tools from {json_file}: {e}")
        return False

    return True
|
|
|
|
|
def _auto_register_mcp_endpoints(self) -> None:
    """Register an MCP endpoint for every eligible tool not yet registered.

    A tool is eligible when its ``execution_type`` is
    ``"remote_mcp_gradio"`` and it has a non-empty ``mcp_endpoint_url``.
    Tools that already have a registry entry are left as they are, so an
    explicit registration is never overwritten.
    """
    for tool in self._tools.values():
        if (
            tool.execution_type != "remote_mcp_gradio"
            or not tool.mcp_endpoint_url
            or tool.tool_id in self._mcp_endpoints
        ):
            continue

        self._mcp_endpoints[tool.tool_id] = {
            "url": tool.mcp_endpoint_url,
            "input_parameters": tool.input_parameter_order,
            "timeout_seconds": tool.timeout_seconds,
            "status": "auto_registered",
            "last_validated": None
        }
        logger.info(f"Auto-registered MCP endpoint for tool {tool.tool_id}: {tool.mcp_endpoint_url}")
|
|
|
|
|
def load_prompts_from_json(self, json_file: Path | str) -> bool:
    """Load prompt definitions from a UTF-8 JSON file into the graph.

    The file must contain a JSON array of objects; each object is passed
    as keyword arguments to ``MCPPrompt`` and the result is stored with
    ``add_prompt``.

    The file is always decoded as UTF-8, matching ``load_tools_from_json``.
    Previously the platform default encoding was used, which could
    corrupt or reject non-ASCII prompt text on some systems.

    Prompts are added one by one, so entries preceding a bad one stay in
    the graph when loading fails part-way.

    Args:
        json_file: Path of the JSON file.

    Returns:
        True on success. False when the file is missing, is not valid
        JSON, or any other error occurs; the error is logged.
    """
    try:
        json_file = Path(json_file)

        with json_file.open(encoding="utf-8") as f:
            prompts_data = json.load(f)

        for prompt_data in prompts_data:
            self.add_prompt(MCPPrompt(**prompt_data))

        logger.info(
            f"Successfully loaded {len(prompts_data)} prompts from {json_file}"
        )
        return True

    except FileNotFoundError:
        logger.error(f"Prompt file not found: {json_file}")
        return False
    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON in prompt file {json_file}: {e}")
        return False
    except Exception as e:
        logger.error(f"Failed to load prompts from {json_file}: {e}")
        return False
|
|
|
|
|
|
|
|
def register_mcp_endpoint(self, tool_id: str, endpoint_url: str,
                          input_parameters: list[str] | None = None,
                          timeout_seconds: int = 30) -> bool:
    """Attach a remote MCP endpoint to an existing tool.

    On success the tool is switched to the ``"remote_mcp_gradio"``
    execution type, its endpoint fields are updated, and a registry entry
    with status ``"registered"`` is stored (replacing any earlier entry
    for the same tool).

    The parameter list is copied before it is stored. The tool and the
    registry entry therefore each own their list, and later changes to
    the caller's list do not leak into the graph.

    Args:
        tool_id: ID of a tool already stored in the graph.
        endpoint_url: Endpoint URL; must have both a scheme and a host.
        input_parameters: Ordered input parameter names; None means none.
        timeout_seconds: Request timeout recorded for the endpoint.

    Returns:
        True when the endpoint was registered. False when the URL is
        malformed, the tool is unknown, or an unexpected error occurs;
        the reason is logged.
    """
    try:
        parsed_url = urlparse(endpoint_url)
        if not parsed_url.scheme or not parsed_url.netloc:
            logger.error(f"Invalid endpoint URL format: {endpoint_url}")
            return False

        tool = self.get_tool_by_id(tool_id)
        if not tool:
            logger.error(f"Tool {tool_id} not found in knowledge graph")
            return False

        # Keep the tool's own metadata in step with the registry entry.
        tool.execution_type = "remote_mcp_gradio"
        tool.mcp_endpoint_url = endpoint_url
        tool.input_parameter_order = list(input_parameters or [])
        tool.timeout_seconds = timeout_seconds

        endpoint_info = {
            "url": endpoint_url,
            "input_parameters": list(input_parameters or []),
            "timeout_seconds": timeout_seconds,
            "status": "registered",
            "last_validated": None
        }
        self._mcp_endpoints[tool_id] = endpoint_info

        logger.info(f"Successfully registered MCP endpoint for tool {tool_id}: {endpoint_url}")
        return True

    except Exception as e:
        logger.error(f"Failed to register MCP endpoint for tool {tool_id}: {e}")
        return False
|
|
|
|
|
def validate_mcp_endpoint(self, tool_id: str, test_payload: dict = None) -> bool: |
|
|
"""Validate an MCP endpoint by making a test request. |
|
|
|
|
|
Args: |
|
|
tool_id: The ID of the tool to validate the endpoint for |
|
|
test_payload: Optional test payload to send to the endpoint |
|
|
|
|
|
Returns: |
|
|
True if validation was successful, False otherwise |
|
|
""" |
|
|
try: |
|
|
if tool_id not in self._mcp_endpoints: |
|
|
logger.error(f"No registered endpoint for tool {tool_id}") |
|
|
return False |
|
|
|
|
|
endpoint_info = self._mcp_endpoints[tool_id] |
|
|
endpoint_url = endpoint_info["url"] |
|
|
timeout = endpoint_info.get("timeout_seconds", 30) |
|
|
|
|
|
|
|
|
if test_payload is None: |
|
|
test_payload = {"data": ["test"]} |
|
|
|
|
|
|
|
|
response = requests.post( |
|
|
endpoint_url, |
|
|
json=test_payload, |
|
|
timeout=timeout, |
|
|
headers={"Content-Type": "application/json"} |
|
|
) |
|
|
|
|
|
if response.status_code == 200: |
|
|
endpoint_info["status"] = "validated" |
|
|
endpoint_info["last_validated"] = "now" |
|
|
logger.info(f"Successfully validated MCP endpoint for tool {tool_id}") |
|
|
return True |
|
|
endpoint_info["status"] = "validation_failed" |
|
|
logger.error(f"MCP endpoint validation failed for tool {tool_id}: HTTP {response.status_code}") |
|
|
return False |
|
|
|
|
|
except requests.exceptions.Timeout: |
|
|
logger.error(f"MCP endpoint validation timeout for tool {tool_id}") |
|
|
self._mcp_endpoints[tool_id]["status"] = "timeout" |
|
|
return False |
|
|
except requests.exceptions.RequestException as e: |
|
|
logger.error(f"MCP endpoint validation error for tool {tool_id}: {e}") |
|
|
self._mcp_endpoints[tool_id]["status"] = "error" |
|
|
return False |
|
|
except Exception as e: |
|
|
logger.error(f"Unexpected error validating MCP endpoint for tool {tool_id}: {e}") |
|
|
return False |
|
|
|
|
|
def discover_mcp_tools(self, base_urls: list[str]) -> list[str]:
    """Discover available MCP tools from a list of base URLs.

    For each base URL a tool ID is inferred from the URL text; URLs that map
    to no known tool are skipped without any network traffic. Otherwise the
    candidate endpoint paths are probed in order with a GET request, and the
    first one that answers with HTTP 200 or 405 and registers successfully
    is kept.

    Args:
        base_urls: List of base URLs to check for MCP endpoints

    Returns:
        List of discovered tool IDs that were successfully registered
    """
    # Candidate endpoint paths, tried in this order for every base URL.
    mcp_patterns = ("/gradio_api/mcp/sse", "/mcp", "/api/mcp")
    discovered_tools = []

    for base_url in base_urls:
        try:
            # Registration needs a tool ID, so resolve it before spending
            # network round-trips on a URL that cannot be registered anyway.
            tool_id = self._infer_tool_id_from_url(base_url)
            if not tool_id:
                continue

            root_url = base_url.rstrip("/")
            for pattern in mcp_patterns:
                endpoint_url = root_url + pattern

                try:
                    # stream=True returns as soon as the headers arrive; a
                    # plain GET would try to read the whole body, which
                    # stalls on a long-lived event stream. The context
                    # manager releases the connection afterwards.
                    with requests.get(endpoint_url, timeout=10, stream=True) as response:
                        status_code = response.status_code
                except requests.exceptions.RequestException:
                    # Unreachable candidate: try the next path.
                    continue

                # 405 is accepted alongside 200: the path is routed even
                # though it rejects GET.
                if status_code in (200, 405) and self.register_mcp_endpoint(tool_id, endpoint_url):
                    discovered_tools.append(tool_id)
                    break

        except Exception as e:
            # One bad base URL must not abort discovery for the others.
            logger.error(f"Error discovering MCP tools from {base_url}: {e}")

    logger.info(f"Discovered {len(discovered_tools)} MCP tools: {discovered_tools}")
    return discovered_tools
|
|
|
|
|
def get_mcp_endpoints(self) -> dict[str, dict]:
    """Get all registered MCP endpoints.

    The result is an independent snapshot: neither the outer mapping nor the
    per-endpoint dictionaries (or the lists inside them) are shared with the
    registry, so callers cannot alter registry state by editing it.

    Returns:
        Dictionary mapping tool IDs to their endpoint information
    """
    # Function-scope import keeps this method self-contained.
    import copy

    # A shallow copy would still hand out the live inner dictionaries.
    return copy.deepcopy(self._mcp_endpoints)
|
|
|
|
|
def get_mcp_tools(self) -> list[MCPTool]:
    """Return the stored tools that are set up for MCP execution.

    Tools are taken from the internal tool mapping in its iteration order.

    Returns:
        List of MCPTool instances with execution_type='remote_mcp_gradio'
    """
    def runs_via_mcp(candidate) -> bool:
        # A tool qualifies when its execution type is the remote MCP marker.
        return candidate.execution_type == "remote_mcp_gradio"

    return list(filter(runs_via_mcp, self._tools.values()))
|
|
|
|
|
def update_mcp_endpoint_status(self, tool_id: str, status: str) -> bool:
    """Set the status field of a registered MCP endpoint.

    Args:
        tool_id: The ID of the tool whose endpoint entry is updated
        status: New status (e.g., 'active', 'inactive', 'error'); stored
            as given

    Returns:
        True if the endpoint exists and was updated, False if no endpoint
        is registered for the tool
    """
    is_registered = tool_id in self._mcp_endpoints
    if is_registered:
        self._mcp_endpoints[tool_id]["status"] = status
        logger.info(f"Updated MCP endpoint status for tool {tool_id}: {status}")
        return True

    # Unknown tool: report it and leave the registry untouched.
    logger.error(f"No registered endpoint for tool {tool_id}")
    return False
|
|
|
|
|
def _infer_tool_id_from_url(self, url: str) -> str | None: |
|
|
"""Infer tool ID from URL patterns. |
|
|
|
|
|
Args: |
|
|
url: The URL to analyze |
|
|
|
|
|
Returns: |
|
|
Inferred tool ID or None if not recognized |
|
|
""" |
|
|
url_lower = url.lower() |
|
|
|
|
|
|
|
|
if "sentiment" in url_lower: |
|
|
return "sentiment_analyzer_002" |
|
|
if "summar" in url_lower: |
|
|
return "text_summarizer_001" |
|
|
if "caption" in url_lower or "image" in url_lower: |
|
|
return "image_caption_003" |
|
|
if "lint" in url_lower or "code" in url_lower: |
|
|
return "code_linter_004" |
|
|
|
|
|
return None |
|
|
|