"""Tool Discovery Engine for MVP 1: KG-Powered Tool Suggester.""" |
|
|
|
|
|
import logging |
|
|
import os |
|
|
from dataclasses import dataclass |
|
|
from datetime import datetime |
|
|
from typing import Any |
|
|
|
|
|
import numpy as np |
|
|
from openai import OpenAI |
|
|
from pydantic import BaseModel, Field |
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class MCPTool(BaseModel): |
|
|
"""MCP Tool data model for the Knowledge Graph.""" |
|
|
|
|
|
id: str = Field(description="Unique tool identifier") |
|
|
name: str = Field(description="Human-readable tool name") |
|
|
description: str = Field(description="Detailed tool description") |
|
|
category: str = Field(description="Tool category") |
|
|
capabilities: list[str] = Field( |
|
|
default_factory=list, description="Tool capabilities" |
|
|
) |
|
|
input_types: list[str] = Field( |
|
|
default_factory=list, description="Supported input types" |
|
|
) |
|
|
output_types: list[str] = Field( |
|
|
default_factory=list, description="Supported output types" |
|
|
) |
|
|
tags: list[str] = Field(default_factory=list, description="Search tags") |
|
|
complexity: str = Field(default="medium", description="Tool complexity level") |
|
|
created_at: datetime = Field(default_factory=datetime.now) |
|
|
|
|
|
|
|
|
class ToolSearchCriteria(BaseModel): |
|
|
"""Search criteria for tool discovery.""" |
|
|
|
|
|
query: str = Field(description="User's natural language query") |
|
|
max_results: int = Field(default=3, description="Maximum number of results") |
|
|
category_filter: str | None = Field(None, description="Filter by tool category") |
|
|
complexity_filter: str | None = Field(None, description="Filter by complexity") |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class ToolMatch: |
|
|
"""Represents a tool match with similarity score.""" |
|
|
|
|
|
tool: MCPTool |
|
|
similarity_score: float |
|
|
tool_id: str |
|
|
|
|
|
def __post_init__(self): |
|
|
if not self.tool_id: |
|
|
self.tool_id = self.tool.id |
|
|
|
|
|
|
|
|


class EmbeddingService:
    """Service for generating and comparing embeddings."""

    def __init__(self):
        """Initialize embedding service with OpenAI client."""
        self.client = None
        self._initialize_client()

    def _initialize_client(self):
        """Initialize OpenAI client if API key available."""
        api_key = os.getenv("OPENAI_API_KEY")
        if api_key:
            try:
                self.client = OpenAI(api_key=api_key)
                logger.info("OpenAI client initialized successfully")
            except Exception as e:
                logger.warning(f"Failed to initialize OpenAI client: {e}")
                self.client = None
        else:
            logger.warning("OPENAI_API_KEY not found, using mock embeddings")

    def get_embedding(self, text: str) -> list[float]:
        """Generate embedding for text."""
        if self.client:
            return self._get_openai_embedding(text)
        return self._get_mock_embedding(text)

    def _get_openai_embedding(self, text: str) -> list[float]:
        """Get embedding from OpenAI API."""
        try:
            response = self.client.embeddings.create(
                model="text-embedding-3-small", input=text, encoding_format="float"
            )
            return response.data[0].embedding
        except Exception as e:
            logger.error(f"Failed to get OpenAI embedding: {e}")
            return self._get_mock_embedding(text)

    def _get_mock_embedding(self, text: str) -> list[float]:
        """Generate mock embedding for testing/demo purposes."""
        # Seed from the text so the same input always yields the same vector
        # within a process (str hash is salted per interpreter run unless
        # PYTHONHASHSEED is fixed).
        np.random.seed(hash(text) % 2**32)
        embedding = np.random.rand(384).tolist()

        # Bias the first few dimensions for common task keywords so related
        # queries and tool descriptions score higher than random noise.
        keywords = {
            "summarize": [1.0, 0.8, 0.6],
            "sentiment": [0.8, 1.0, 0.7],
            "image": [0.6, 0.7, 1.0],
            "translate": [0.7, 0.6, 0.8],
            "analyze": [0.9, 0.8, 0.7],
        }

        text_lower = text.lower()
        for keyword, weights in keywords.items():
            if keyword in text_lower:
                for i, weight in enumerate(weights):
                    if i < len(embedding):
                        embedding[i] *= weight

        return embedding

    def calculate_similarity(
        self, embedding1: list[float], embedding2: list[float]
    ) -> float:
        """Calculate cosine similarity between two embeddings."""
        vec1 = np.array(embedding1)
        vec2 = np.array(embedding2)

        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)

        if norm1 == 0 or norm2 == 0:
            return 0.0

        vec1_normalized = vec1 / norm1
        vec2_normalized = vec2 / norm2

        similarity = np.dot(vec1_normalized, vec2_normalized)
        return float(similarity)


class KnowledgeGraphService:
    """In-memory Knowledge Graph for tool metadata."""

    def __init__(self):
        """Initialize in-memory storage."""
        self.tools: dict[str, dict[str, Any]] = {}
        self.embeddings: dict[str, list[float]] = {}
        self.embedding_service = EmbeddingService()

    def store_tool(self, tool_data: dict[str, Any]) -> None:
        """Store tool metadata and generate embedding."""
        tool_id = tool_data["id"]
        self.tools[tool_id] = tool_data

        # Embed the tool description once at storage time so searches only
        # need to embed the query.
        description = tool_data.get("description", "")
        embedding = self.embedding_service.get_embedding(description)
        self.embeddings[tool_id] = embedding

        logger.info(f"Stored tool: {tool_id}")

    def get_tool(self, tool_id: str) -> dict[str, Any] | None:
        """Retrieve tool by ID."""
        return self.tools.get(tool_id)

    def find_tools_by_capability(self, capability: str) -> list[dict[str, Any]]:
        """Find tools that have a specific capability."""
        matching_tools = []
        for tool_data in self.tools.values():
            capabilities = tool_data.get("capabilities", [])
            if capability in capabilities:
                matching_tools.append(tool_data)
        return matching_tools

    def search_by_similarity(
        self, query_embedding: list[float], max_results: int = 3
    ) -> list[tuple[str, float]]:
        """Search tools by embedding similarity."""
        similarities = []

        for tool_id, tool_embedding in self.embeddings.items():
            similarity = self.embedding_service.calculate_similarity(
                query_embedding, tool_embedding
            )
            similarities.append((tool_id, similarity))

        # Highest similarity first, truncated to the requested result count.
        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:max_results]

    def get_all_tools(self) -> list[dict[str, Any]]:
        """Get all stored tools."""
        return list(self.tools.values())


class ToolDiscoveryEngine:
    """Main tool discovery engine for MVP 1."""

    def __init__(self):
        """Initialize discovery engine with curated tools."""
        self.kg_service = KnowledgeGraphService()
        self.embedding_service = EmbeddingService()
        self._load_curated_tools()

    def _load_curated_tools(self) -> None:
        """Load the curated mini-KG of five diverse MCP tools for MVP 1."""
        curated_tools = [
            {
                "id": "summarizer",
                "name": "Text Summarizer",
                "description": "Summarizes long text documents into concise key points and bullet points. Perfect for news articles, research papers, and lengthy content.",
                "category": "text_processing",
                "capabilities": ["summarization", "text_analysis", "key_extraction"],
                "input_types": ["text", "document"],
                "output_types": ["text", "bullet_points"],
                "tags": ["summarize", "compress", "key points", "extract"],
                "complexity": "easy",
            },
            {
                "id": "sentiment_analyzer",
                "name": "Sentiment Analyzer",
                "description": "Analyzes the emotional tone and sentiment of text content. Detects positive, negative, or neutral sentiment with confidence scores.",
                "category": "text_analysis",
                "capabilities": [
                    "sentiment_analysis",
                    "emotion_detection",
                    "mood_analysis",
                ],
                "input_types": ["text", "social_media"],
                "output_types": ["sentiment_score", "emotion_labels"],
                "tags": ["sentiment", "emotion", "mood", "analyze", "feeling"],
                "complexity": "medium",
            },
            {
                "id": "image_generator",
                "name": "Image Generator",
                "description": "Generates creative images from text descriptions using AI. Creates artwork, illustrations, and visual content from natural language prompts.",
                "category": "creative",
                "capabilities": ["image_generation", "art_creation", "visual_design"],
                "input_types": ["text_prompt", "description"],
                "output_types": ["image", "artwork"],
                "tags": ["image", "generate", "create", "art", "visual", "picture"],
                "complexity": "medium",
            },
            {
                "id": "translator",
                "name": "Language Translator",
                "description": "Translates text between multiple languages with high accuracy. Supports over 100 languages and preserves context and meaning.",
                "category": "language",
                "capabilities": ["translation", "language_detection", "multilingual"],
                "input_types": ["text", "document"],
                "output_types": ["translated_text"],
                "tags": ["translate", "language", "multilingual", "convert"],
                "complexity": "easy",
            },
            {
                "id": "code_analyzer",
                "name": "Code Analyzer",
                "description": "Analyzes code quality, detects bugs, suggests improvements, and provides security recommendations for various programming languages.",
                "category": "development",
                "capabilities": [
                    "code_analysis",
                    "bug_detection",
                    "security_audit",
                    "quality_assessment",
                ],
                "input_types": ["source_code", "repository"],
                "output_types": ["analysis_report", "recommendations"],
                "tags": ["code", "analyze", "bugs", "security", "quality", "review"],
                "complexity": "advanced",
            },
        ]

        # Store each tool in the knowledge graph (this also embeds its description).
        for tool_data in curated_tools:
            self.kg_service.store_tool(tool_data)

        logger.info(f"Loaded {len(curated_tools)} curated MCP tools")

    def load_curated_tools(self) -> list[MCPTool]:
        """Return curated tools as MCPTool objects."""
        tools = []
        for tool_data in self.kg_service.get_all_tools():
            tool = MCPTool(**tool_data)
            tools.append(tool)
        return tools

    def get_tool_by_id(self, tool_id: str) -> MCPTool | None:
        """Get specific tool by ID."""
        tool_data = self.kg_service.get_tool(tool_id)
        if tool_data:
            return MCPTool(**tool_data)
        return None

    def search_tools(self, criteria: ToolSearchCriteria) -> list[ToolMatch]:
        """Search for tools based on user query using semantic similarity."""
        # Embed the natural language query.
        query_embedding = self.embedding_service.get_embedding(criteria.query)

        # Rank stored tools by cosine similarity to the query.
        similar_tools = self.kg_service.search_by_similarity(
            query_embedding, criteria.max_results
        )

        results = []
        for tool_id, similarity_score in similar_tools:
            tool_data = self.kg_service.get_tool(tool_id)
            if tool_data:
                # Optional filters are applied to the top-k similarity matches.
                if (
                    criteria.category_filter
                    and tool_data.get("category") != criteria.category_filter
                ):
                    continue
                if (
                    criteria.complexity_filter
                    and tool_data.get("complexity") != criteria.complexity_filter
                ):
                    continue

                tool = MCPTool(**tool_data)
                match = ToolMatch(
                    tool=tool, similarity_score=similarity_score, tool_id=tool_id
                )
                results.append(match)

        results.sort(key=lambda x: x.similarity_score, reverse=True)

        logger.info(
            f"Found {len(results)} matching tools for query: '{criteria.query}'"
        )
        return results

    def filter_recipes(self, tools: list[MCPTool], **filters) -> list[MCPTool]:
        """Filter tools by various criteria."""
        filtered_tools = tools

        if "category" in filters:
            filtered_tools = [
                t for t in filtered_tools if t.category == filters["category"]
            ]

        if "complexity" in filters:
            filtered_tools = [
                t for t in filtered_tools if t.complexity == filters["complexity"]
            ]

        if "capabilities" in filters:
            required_caps = filters["capabilities"]
            filtered_tools = [
                t
                for t in filtered_tools
                if any(cap in t.capabilities for cap in required_caps)
            ]

        return filtered_tools

    def sort_recipes(
        self, tools: list[MCPTool], sort_by: str = "name"
    ) -> list[MCPTool]:
        """Sort tools by specified criteria."""
        if sort_by == "name":
            return sorted(tools, key=lambda t: t.name)
        if sort_by == "complexity":
            complexity_order = {"easy": 1, "medium": 2, "advanced": 3}
            return sorted(tools, key=lambda t: complexity_order.get(t.complexity, 2))
        if sort_by == "category":
            return sorted(tools, key=lambda t: t.category)
        return tools


# Recipe-oriented aliases for the same discovery API.
RecipeRecommendationEngine = ToolDiscoveryEngine
SearchResult = ToolMatch
RecommendationScore = float


__all__ = [
    "EmbeddingService",
    "KnowledgeGraphService",
    "MCPTool",
    "RecipeRecommendationEngine",
    "RecommendationScore",
    "SearchResult",
    "ToolDiscoveryEngine",
    "ToolMatch",
    "ToolSearchCriteria",
]
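

# Minimal usage sketch (illustrative addition, not part of the module's API): it
# exercises only the classes defined above. Without OPENAI_API_KEY set, the
# EmbeddingService falls back to mock embeddings, so the printed scores are only
# indicative of the ranking behaviour, not of real embedding quality.
if __name__ == "__main__":
    engine = ToolDiscoveryEngine()
    criteria = ToolSearchCriteria(query="summarize this news article", max_results=3)
    for match in engine.search_tools(criteria):
        print(f"{match.tool.name} ({match.tool.category}): {match.similarity_score:.3f}")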