"""SimplePlannerAgent implementation for KGraph-MCP."""
import logging
from typing import Any
from kg_services.embedder import EmbeddingService
from kg_services.knowledge_graph import InMemoryKG
from kg_services.ontology import MCPPrompt, MCPTool, PlannedStep
logger = logging.getLogger(__name__)
class SimplePlannerAgent:
"""
A simplified planner agent that suggests tools and prompts based on user queries
using semantic similarity search.
This agent orchestrates the EmbeddingService and InMemoryKG to process
natural language queries and return relevant MCPTool+MCPPrompt combinations
as PlannedStep objects.
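
    Example (illustrative usage sketch; assumes kg and embedder are
    already-constructed and populated instances from kg_services):
        >>> agent = SimplePlannerAgent(kg=kg, embedder=embedder)
        >>> steps = agent.generate_plan("Summarize this article", top_k=3)
        >>> # Returns up to 3 PlannedStep objects ranked by relevance_score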
"""
def __init__(self, kg: InMemoryKG, embedder: EmbeddingService) -> None:
"""
Initialize the planner with knowledge graph and embedding service.
Args:
kg: InMemoryKG instance for tool/prompt storage and similarity search
embedder: EmbeddingService instance for generating query embeddings
"""
self.kg = kg
self.embedder = embedder
def construct_conceptual_sampling_request(
self,
plan: PlannedStep,
task_context_text: str
) -> dict[str, Any]:
"""
Constructs MCP sampling/createMessage request parameters based on KG-stored preferences.
This method demonstrates MVP5's KG-informed model selection by extracting
sampling preferences from the prompt in the planned step and building
a valid MCP sampling request structure.
Args:
plan: PlannedStep containing tool and prompt with sampling preferences
task_context_text: User's task context for the sampling request
Returns:
            Dict of conceptual sampling/createMessage parameters modeled on the MCP request structure
Example:
>>> plan = PlannedStep(tool=some_tool, prompt=some_prompt)
>>> params = agent.construct_conceptual_sampling_request(plan, "Analyze this text")
>>> # Returns: {"messages": [...], "maxTokens": 256, "modelPreferences": {...}}
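            Illustrative full shape of the returned dict (values are hypothetical
            and depend on the prompt's stored preferences; optional keys appear
            only when the corresponding preference is set):
            {
                "messages": [...],
                "maxTokens": 256,
                "systemPrompt": "...",
                "temperature": 0.3,
                "modelPreferences": {"hints": [...], "priorities": {...}, "contextInclusion": "..."},
                "_kgraph_mcp_metadata": {...}
            }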
"""
prompt_prefs = plan.prompt
# Build messages array (required field)
messages = [{
"role": "user",
"content": {
"type": "text",
"text": task_context_text
}
}]
# Start with required fields
sampling_params = {
"messages": messages,
"maxTokens": prompt_prefs.default_max_tokens_sampling or 256
}
# Add system prompt if available
if prompt_prefs.default_system_prompt_hint:
sampling_params["systemPrompt"] = prompt_prefs.default_system_prompt_hint
# Add sampling temperature if specified
if prompt_prefs.default_sampling_temperature is not None:
sampling_params["temperature"] = prompt_prefs.default_sampling_temperature
# Build model preferences object if any preferences exist
model_preferences = {}
# Add model hints
if prompt_prefs.preferred_model_hints:
model_preferences["hints"] = [
{"name": hint} for hint in prompt_prefs.preferred_model_hints
]
# Add priority scores
priorities = {}
if prompt_prefs.cost_priority_score is not None:
priorities["cost"] = prompt_prefs.cost_priority_score
if prompt_prefs.speed_priority_score is not None:
priorities["speed"] = prompt_prefs.speed_priority_score
if prompt_prefs.intelligence_priority_score is not None:
priorities["intelligence"] = prompt_prefs.intelligence_priority_score
if priorities:
model_preferences["priorities"] = priorities
# Add context inclusion preference
if prompt_prefs.sampling_context_inclusion_hint:
model_preferences["contextInclusion"] = prompt_prefs.sampling_context_inclusion_hint
# Only add modelPreferences if we have any preferences
if model_preferences:
sampling_params["modelPreferences"] = model_preferences
# Add metadata for debugging/logging
sampling_params["_kgraph_mcp_metadata"] = {
"tool_id": plan.tool.tool_id,
"tool_name": plan.tool.name,
"prompt_id": plan.prompt.prompt_id,
"prompt_name": plan.prompt.name,
"relevance_score": plan.relevance_score
}
logger.info(
f"Constructed sampling request for {plan.tool.name} using {plan.prompt.name} prompt. "
f"Model hints: {prompt_prefs.preferred_model_hints}, "
f"Priorities: cost={prompt_prefs.cost_priority_score}, "
f"speed={prompt_prefs.speed_priority_score}, "
f"intelligence={prompt_prefs.intelligence_priority_score}"
)
return sampling_params
def generate_plan(self, user_query: str, top_k: int = 3) -> list[PlannedStep]:
"""
Generate a plan with tool+prompt combinations based on user query.
This method:
1. Validates the user query
2. Generates an embedding for the query
3. Finds similar tools using semantic search
4. For each tool, finds the most relevant prompts
5. Creates PlannedStep objects with relevance scores
6. Returns ranked list of tool+prompt combinations
Args:
user_query: Natural language query from user
top_k: Maximum number of planned steps to return (default: 3)
Returns:
List of PlannedStep objects, ordered by relevance.
Returns empty list if query is invalid or no matches found.
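
        Example (illustrative; actual results depend on the KG contents):
            >>> steps = agent.generate_plan("lint my python code", top_k=2)
            >>> # Returns e.g. [PlannedStep(tool=<lint tool>, prompt=<lint prompt>,
            >>> #                relevance_score=0.82), ...] with at most 2 items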
"""
# Handle empty or whitespace-only queries
if not user_query or not user_query.strip():
logger.warning("Empty or whitespace-only query provided")
return []
# Get embedding for the user query
query_embedding = self.embedder.get_embedding(user_query)
if query_embedding is None:
logger.warning(f"Could not generate embedding for query: {user_query}")
# For demo purposes, create a simple mock embedding for the query
query_embedding = self._create_mock_query_embedding(user_query)
if not query_embedding:
return []
# Phase 1: Find similar tools
similar_tool_ids = self.kg.find_similar_tools(
query_embedding, top_k=top_k * 2
) # Get more tools initially
# Phase 2: For each tool, find relevant prompts and create PlannedSteps
planned_steps: list[PlannedStep] = []
for tool_id in similar_tool_ids:
tool = self.kg.get_tool_by_id(tool_id)
if tool is None:
continue
# Find prompts for this tool using semantic similarity
tool_prompts = self.kg.find_similar_prompts_for_tool(
query_embedding, tool_id, top_k=2
)
# If no specific prompts found for this tool, try general prompts
if not tool_prompts:
all_tool_prompts = self.kg.find_prompts_by_tool_id(tool_id)
if all_tool_prompts:
# Take the first available prompt as fallback
tool_prompts = [all_tool_prompts[0].prompt_id]
# Create PlannedStep for each relevant prompt
for prompt_id in tool_prompts:
prompt = self.kg.get_prompt_by_id(prompt_id)
if prompt is None:
continue
try:
# Calculate relevance score based on semantic similarity
relevance_score = self._calculate_relevance_score(
query_embedding, tool, prompt
)
planned_step = PlannedStep(
tool=tool, prompt=prompt, relevance_score=relevance_score
)
planned_steps.append(planned_step)
except ValueError as e:
logger.warning(
f"Could not create PlannedStep for tool {tool_id} and prompt {prompt_id}: {e}"
)
continue
# Phase 3: Filter by minimum relevance threshold and sort
min_relevance_threshold = 0.1 # Filter out very low relevance results
filtered_steps = [
step for step in planned_steps
if step.relevance_score is not None and step.relevance_score >= min_relevance_threshold
]
# Sort by relevance score and return top_k
filtered_steps.sort(key=lambda x: x.relevance_score or 0.0, reverse=True)
final_steps = filtered_steps[:top_k]
# Log the planning results
if final_steps:
step_summaries = [step.summary for step in final_steps]
scores_summary = [f"{step.relevance_score:.3f}" for step in final_steps]
logger.info(
f"Planner generated {len(final_steps)} planned steps (scores: {scores_summary}): "
f"{step_summaries} for query: '{user_query}'"
)
elif planned_steps:
logger.info(
f"Planner filtered out all {len(planned_steps)} planned steps below "
f"relevance threshold {min_relevance_threshold} for query: '{user_query}'"
)
else:
logger.info(
f"Planner could not generate any planned steps for query: '{user_query}'"
)
return final_steps
def _calculate_relevance_score(
self, query_embedding: list[float], tool: MCPTool, prompt: MCPPrompt
) -> float:
"""
Calculate relevance score for a tool+prompt combination using actual similarity scores.
This combines:
1. Tool semantic similarity to query (actual cosine similarity)
2. Prompt semantic similarity to query (actual cosine similarity)
3. Difficulty level weighting (simpler prompts ranked higher for ambiguous queries)
Args:
query_embedding: Embedding vector for the user query
tool: MCPTool instance
prompt: MCPPrompt instance
Returns:
Relevance score between 0.0 and 1.0
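
        Example (worked calculation with hypothetical similarity values):
            tool_sim=0.80, prompt_sim=0.60, difficulty_level="beginner"
            => (0.7 * 0.80) + (0.25 * 0.60) + 0.05 = 0.76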
"""
try:
# Get actual tool similarity score
tool_similarities_with_scores = self.kg.find_similar_tools_with_scores(
query_embedding, top_k=50, min_similarity=0.0 # Get all to find specific tool
)
tool_score = 0.0
for tool_id, similarity in tool_similarities_with_scores:
if tool_id == tool.tool_id:
tool_score = similarity
break
# Get actual prompt similarity score
prompt_similarities_with_scores = self.kg.find_similar_prompts_with_scores(
query_embedding, top_k=50, min_similarity=0.0 # Get all to find specific prompt
)
prompt_score = 0.0
for prompt_id, similarity in prompt_similarities_with_scores:
if prompt_id == prompt.prompt_id:
prompt_score = similarity
break
# Difficulty bonus (beginner=0.05, intermediate=0.02, advanced=0.0)
# Reduced bonus to prevent inflation of scores
difficulty_bonus = {
"beginner": 0.05,
"intermediate": 0.02,
"advanced": 0.0,
}.get(prompt.difficulty_level, 0.0)
# Weighted combination using actual similarity scores
            # Clamp negative cosine similarities to 0.0 so the combined score stays in [0, 1]
tool_score = max(0.0, tool_score)
prompt_score = max(0.0, prompt_score)
relevance_score = (
(0.7 * tool_score) + (0.25 * prompt_score) + difficulty_bonus
)
            # Cap at 1.0 (the minimum relevance threshold is applied later in generate_plan)
final_score = min(1.0, relevance_score)
# Log score calculation for debugging
logger.debug(
f"Relevance score for {tool.tool_id}+{prompt.prompt_id}: "
f"tool_sim={tool_score:.3f}, prompt_sim={prompt_score:.3f}, "
f"difficulty_bonus={difficulty_bonus:.3f}, final={final_score:.3f}"
)
return final_score
except Exception as e:
logger.warning(f"Error calculating relevance score: {e}")
            return 0.0  # Fail closed: a 0.0 score keeps failed calculations below the relevance threshold
def suggest_tools(self, user_query: str, top_k: int = 3) -> list[MCPTool]:
"""
Suggest relevant tools based on user query using semantic similarity.
DEPRECATED: This method is maintained for backward compatibility.
New code should use generate_plan() instead.
This method:
1. Validates the user query
2. Generates an embedding for the query
3. Finds similar tools using the knowledge graph
4. Retrieves and returns the actual MCPTool objects
Args:
user_query: Natural language query from user
top_k: Maximum number of tools to suggest (default: 3)
Returns:
List of relevant MCPTool objects, ordered by relevance.
Returns empty list if query is invalid or no tools found.
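
        Example (illustrative; results depend on the tools registered in the KG):
            >>> tools = agent.suggest_tools("analyze sentiment of customer reviews")
            >>> # Returns e.g. [<sentiment-analysis MCPTool>, ...] with at most top_k items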
"""
# Handle empty or whitespace-only queries
if not user_query or not user_query.strip():
logger.warning("Empty or whitespace-only query provided")
return []
# Get embedding for the user query
query_embedding = self.embedder.get_embedding(user_query)
if query_embedding is None:
logger.warning(f"Could not generate embedding for query: {user_query}")
# For demo purposes, create a simple mock embedding for the query
query_embedding = self._create_mock_query_embedding(user_query)
if not query_embedding:
return []
# Find similar tools using the knowledge graph
similar_tool_ids = self.kg.find_similar_tools(query_embedding, top_k=top_k)
# Retrieve actual MCPTool objects
suggested_tools: list[MCPTool] = []
for tool_id in similar_tool_ids:
tool = self.kg.get_tool_by_id(tool_id)
if tool is not None:
suggested_tools.append(tool)
# Log the suggestion for observability
logger.info(
f"Planner suggested tools: {[t.name for t in suggested_tools]} for query: '{user_query}'"
)
return suggested_tools
def _create_mock_query_embedding(self, query: str) -> list[float]:
"""
Create a mock embedding for a query when real embeddings aren't available.
This is a simple fallback for demo purposes that creates embeddings
based on basic text characteristics.
Args:
query: The user query string
Returns:
Mock embedding vector as list of floats
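
        Example (worked through for the query "analyze sentiment"):
            17 chars -> 0.17, 2 words -> 0.2, then one slot per keyword
            ("sentiment" and "analyze" match -> 0.8, the others -> 0.1):
            [0.17, 0.2, 0.1, 0.8, 0.1, 0.1, 0.8, 0.1, 0.1, 0.1]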
"""
# Create simple mock embedding based on query characteristics
mock_embedding = []
# Add dimensions based on query length and word count
words = query.lower().split()
        mock_embedding.append(float(len(query)) / 100.0)
        mock_embedding.append(float(len(words)) / 10.0)
# Add some values based on key words that might match tool tags
key_words = [
"text",
"sentiment",
"image",
"code",
"analyze",
"summarize",
"caption",
"lint",
]
for word in key_words:
if word in query.lower():
                mock_embedding.append(0.8)  # Strong feature value for matching keywords
            else:
                mock_embedding.append(0.1)  # Weak feature value otherwise
# Pad to consistent length (same as mock tool embeddings)
while len(mock_embedding) < 10:
mock_embedding.append(0.1)
logger.debug(f"Created mock embedding for query: '{query[:30]}...'")
return mock_embedding