"""SimplePlannerAgent implementation for KGraph-MCP."""

import logging
from typing import Any

from kg_services.embedder import EmbeddingService
from kg_services.knowledge_graph import InMemoryKG
from kg_services.ontology import MCPPrompt, MCPTool, PlannedStep

logger = logging.getLogger(__name__)

class SimplePlannerAgent:
    """
    A simplified planner agent that suggests tools and prompts based on user queries
    using semantic similarity search.

    This agent orchestrates the EmbeddingService and InMemoryKG to process
    natural language queries and return relevant MCPTool+MCPPrompt combinations
    as PlannedStep objects.
    """

    def __init__(self, kg: InMemoryKG, embedder: EmbeddingService) -> None:
        """
        Initialize the planner with a knowledge graph and an embedding service.

        Args:
            kg: InMemoryKG instance for tool/prompt storage and similarity search
            embedder: EmbeddingService instance for generating query embeddings
        """
        self.kg = kg
        self.embedder = embedder
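    # Typical wiring (illustrative sketch; `populated_kg` and `embedding_service` are
    # placeholders for an InMemoryKG already seeded with MCPTool/MCPPrompt records and
    # a configured EmbeddingService):
    #
    #   planner = SimplePlannerAgent(kg=populated_kg, embedder=embedding_service)
    #   steps = planner.generate_plan("analyze the sentiment of this review")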
    def construct_conceptual_sampling_request(
        self,
        plan: PlannedStep,
        task_context_text: str
    ) -> dict[str, Any]:
        """
        Construct MCP sampling/createMessage request parameters from KG-stored preferences.

        This method demonstrates MVP5's KG-informed model selection by extracting
        sampling preferences from the prompt in the planned step and building
        a valid MCP sampling request structure.

        Args:
            plan: PlannedStep containing a tool and a prompt with sampling preferences
            task_context_text: User's task context for the sampling request

        Returns:
            Dict containing MCP-compliant sampling/createMessage parameters

        Example:
            >>> plan = PlannedStep(tool=some_tool, prompt=some_prompt)
            >>> params = agent.construct_conceptual_sampling_request(plan, "Analyze this text")
            >>> # Returns: {"messages": [...], "maxTokens": 256, "modelPreferences": {...}}
        """
        prompt_prefs = plan.prompt

        # Wrap the user's task context as a single MCP user message.
        messages = [{
            "role": "user",
            "content": {
                "type": "text",
                "text": task_context_text
            }
        }]

        # Base parameters; fall back to 256 tokens when no preference is stored.
        sampling_params = {
            "messages": messages,
            "maxTokens": prompt_prefs.default_max_tokens_sampling or 256
        }

        if prompt_prefs.default_system_prompt_hint:
            sampling_params["systemPrompt"] = prompt_prefs.default_system_prompt_hint

        if prompt_prefs.default_sampling_temperature is not None:
            sampling_params["temperature"] = prompt_prefs.default_sampling_temperature

        # Assemble model preferences from KG-stored hints and priority scores.
        model_preferences = {}

        if prompt_prefs.preferred_model_hints:
            model_preferences["hints"] = [
                {"name": hint} for hint in prompt_prefs.preferred_model_hints
            ]

        priorities = {}
        if prompt_prefs.cost_priority_score is not None:
            priorities["cost"] = prompt_prefs.cost_priority_score
        if prompt_prefs.speed_priority_score is not None:
            priorities["speed"] = prompt_prefs.speed_priority_score
        if prompt_prefs.intelligence_priority_score is not None:
            priorities["intelligence"] = prompt_prefs.intelligence_priority_score

        if priorities:
            model_preferences["priorities"] = priorities

        if prompt_prefs.sampling_context_inclusion_hint:
            model_preferences["contextInclusion"] = prompt_prefs.sampling_context_inclusion_hint

        if model_preferences:
            sampling_params["modelPreferences"] = model_preferences

        # Attach internal metadata so downstream consumers can trace the plan origin.
        sampling_params["_kgraph_mcp_metadata"] = {
            "tool_id": plan.tool.tool_id,
            "tool_name": plan.tool.name,
            "prompt_id": plan.prompt.prompt_id,
            "prompt_name": plan.prompt.name,
            "relevance_score": plan.relevance_score
        }

        logger.info(
            f"Constructed sampling request for {plan.tool.name} using {plan.prompt.name} prompt. "
            f"Model hints: {prompt_prefs.preferred_model_hints}, "
            f"Priorities: cost={prompt_prefs.cost_priority_score}, "
            f"speed={prompt_prefs.speed_priority_score}, "
            f"intelligence={prompt_prefs.intelligence_priority_score}"
        )
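        # Illustrative shape of the constructed request (a sketch only; the concrete
        # values depend entirely on the preferences stored on the selected prompt):
        #
        #   {
        #       "messages": [{"role": "user", "content": {"type": "text", "text": "..."}}],
        #       "maxTokens": 512,
        #       "systemPrompt": "You are a concise analyst.",
        #       "temperature": 0.3,
        #       "modelPreferences": {
        #           "hints": [{"name": "some-model-family"}],
        #           "priorities": {"cost": 0.4, "speed": 0.3, "intelligence": 0.9},
        #       },
        #       "_kgraph_mcp_metadata": {...},
        #   }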
        return sampling_params
    def generate_plan(self, user_query: str, top_k: int = 3) -> list[PlannedStep]:
        """
        Generate a plan of tool+prompt combinations for a user query.

        This method:
        1. Validates the user query
        2. Generates an embedding for the query
        3. Finds similar tools using semantic search
        4. For each tool, finds the most relevant prompts
        5. Creates PlannedStep objects with relevance scores
        6. Returns a ranked list of tool+prompt combinations

        Args:
            user_query: Natural language query from the user
            top_k: Maximum number of planned steps to return (default: 3)

        Returns:
            List of PlannedStep objects, ordered by relevance.
            Returns an empty list if the query is invalid or no matches are found.
        """
        if not user_query or not user_query.strip():
            logger.warning("Empty or whitespace-only query provided")
            return []

        query_embedding = self.embedder.get_embedding(user_query)
        if query_embedding is None:
            logger.warning(f"Could not generate embedding for query: {user_query}")
            # Fall back to a mock embedding so the demo flow works without a real model.
            query_embedding = self._create_mock_query_embedding(user_query)
            if not query_embedding:
                return []

        # Over-fetch candidate tools so threshold filtering still leaves enough steps.
        similar_tool_ids = self.kg.find_similar_tools(
            query_embedding, top_k=top_k * 2
        )

        planned_steps: list[PlannedStep] = []

        for tool_id in similar_tool_ids:
            tool = self.kg.get_tool_by_id(tool_id)
            if tool is None:
                continue

            # Find the prompts for this tool that best match the query.
            tool_prompts = self.kg.find_similar_prompts_for_tool(
                query_embedding, tool_id, top_k=2
            )

            # If semantic search finds nothing, fall back to the tool's first registered prompt.
            if not tool_prompts:
                all_tool_prompts = self.kg.find_prompts_by_tool_id(tool_id)
                if all_tool_prompts:
                    tool_prompts = [all_tool_prompts[0].prompt_id]

            for prompt_id in tool_prompts:
                prompt = self.kg.get_prompt_by_id(prompt_id)
                if prompt is None:
                    continue

                try:
                    relevance_score = self._calculate_relevance_score(
                        query_embedding, tool, prompt
                    )

                    planned_step = PlannedStep(
                        tool=tool, prompt=prompt, relevance_score=relevance_score
                    )
                    planned_steps.append(planned_step)

                except ValueError as e:
                    logger.warning(
                        f"Could not create PlannedStep for tool {tool_id} and prompt {prompt_id}: {e}"
                    )
                    continue

        # Drop steps below the minimum relevance threshold.
        min_relevance_threshold = 0.1
        filtered_steps = [
            step for step in planned_steps
            if step.relevance_score is not None and step.relevance_score >= min_relevance_threshold
        ]

        # Rank by relevance and keep only the top_k steps.
        filtered_steps.sort(key=lambda x: x.relevance_score or 0.0, reverse=True)
        final_steps = filtered_steps[:top_k]

        if final_steps:
            step_summaries = [step.summary for step in final_steps]
            scores_summary = [f"{step.relevance_score:.3f}" for step in final_steps]
            logger.info(
                f"Planner generated {len(final_steps)} planned steps (scores: {scores_summary}): "
                f"{step_summaries} for query: '{user_query}'"
            )
        elif planned_steps:
            logger.info(
                f"Planner filtered out all {len(planned_steps)} planned steps below "
                f"relevance threshold {min_relevance_threshold} for query: '{user_query}'"
            )
        else:
            logger.info(
                f"Planner could not generate any planned steps for query: '{user_query}'"
            )

        return final_steps
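    # Filtering and ranking example (worked through from the logic above): with
    # top_k=3 and five candidate steps scoring [0.82, 0.45, 0.30, 0.12, 0.05], the
    # 0.05 step falls below the 0.1 threshold, the rest are sorted descending, and
    # the top three ([0.82, 0.45, 0.30]) are returned.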
    def _calculate_relevance_score(
        self, query_embedding: list[float], tool: MCPTool, prompt: MCPPrompt
    ) -> float:
        """
        Calculate a relevance score for a tool+prompt combination using actual similarity scores.

        This combines:
        1. Tool semantic similarity to the query (actual cosine similarity)
        2. Prompt semantic similarity to the query (actual cosine similarity)
        3. Difficulty level weighting (simpler prompts ranked higher for ambiguous queries)

        Args:
            query_embedding: Embedding vector for the user query
            tool: MCPTool instance
            prompt: MCPPrompt instance

        Returns:
            Relevance score between 0.0 and 1.0
        """
        try:
            # Look up this tool's cosine similarity to the query.
            tool_similarities_with_scores = self.kg.find_similar_tools_with_scores(
                query_embedding, top_k=50, min_similarity=0.0
            )

            tool_score = 0.0
            for tool_id, similarity in tool_similarities_with_scores:
                if tool_id == tool.tool_id:
                    tool_score = similarity
                    break

            # Look up this prompt's cosine similarity to the query.
            prompt_similarities_with_scores = self.kg.find_similar_prompts_with_scores(
                query_embedding, top_k=50, min_similarity=0.0
            )

            prompt_score = 0.0
            for prompt_id, similarity in prompt_similarities_with_scores:
                if prompt_id == prompt.prompt_id:
                    prompt_score = similarity
                    break

            # Small bonus that favors simpler prompts.
            difficulty_bonus = {
                "beginner": 0.05,
                "intermediate": 0.02,
                "advanced": 0.0,
            }.get(prompt.difficulty_level, 0.0)

            # Clamp negative similarities to zero before weighting.
            tool_score = max(0.0, tool_score)
            prompt_score = max(0.0, prompt_score)

            relevance_score = (
                (0.7 * tool_score) + (0.25 * prompt_score) + difficulty_bonus
            )
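            # Worked example: tool_score=0.80, prompt_score=0.60, "beginner" prompt
            # -> 0.7 * 0.80 + 0.25 * 0.60 + 0.05 = 0.56 + 0.15 + 0.05 = 0.76.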
            final_score = min(1.0, relevance_score)

            logger.debug(
                f"Relevance score for {tool.tool_id}+{prompt.prompt_id}: "
                f"tool_sim={tool_score:.3f}, prompt_sim={prompt_score:.3f}, "
                f"difficulty_bonus={difficulty_bonus:.3f}, final={final_score:.3f}"
            )

            return final_score

        except Exception as e:
            logger.warning(f"Error calculating relevance score: {e}")
            return 0.0
    def suggest_tools(self, user_query: str, top_k: int = 3) -> list[MCPTool]:
        """
        Suggest relevant tools for a user query using semantic similarity.

        DEPRECATED: This method is maintained for backward compatibility.
        New code should use generate_plan() instead.

        This method:
        1. Validates the user query
        2. Generates an embedding for the query
        3. Finds similar tools using the knowledge graph
        4. Retrieves and returns the actual MCPTool objects

        Args:
            user_query: Natural language query from the user
            top_k: Maximum number of tools to suggest (default: 3)

        Returns:
            List of relevant MCPTool objects, ordered by relevance.
            Returns an empty list if the query is invalid or no tools are found.
        """
        if not user_query or not user_query.strip():
            logger.warning("Empty or whitespace-only query provided")
            return []

        query_embedding = self.embedder.get_embedding(user_query)
        if query_embedding is None:
            logger.warning(f"Could not generate embedding for query: {user_query}")
            # Fall back to a mock embedding so the demo flow works without a real model.
            query_embedding = self._create_mock_query_embedding(user_query)
            if not query_embedding:
                return []

        similar_tool_ids = self.kg.find_similar_tools(query_embedding, top_k=top_k)

        suggested_tools: list[MCPTool] = []
        for tool_id in similar_tool_ids:
            tool = self.kg.get_tool_by_id(tool_id)
            if tool is not None:
                suggested_tools.append(tool)

        logger.info(
            f"Planner suggested tools: {[t.name for t in suggested_tools]} for query: '{user_query}'"
        )
        return suggested_tools
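    # Migration sketch (illustrative): suggest_tools() returns bare MCPTool objects,
    # while roughly equivalent tools, plus their matched prompts and relevance scores,
    # can be obtained with:
    #
    #   [step.tool for step in planner.generate_plan(query, top_k=3)]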
    def _create_mock_query_embedding(self, query: str) -> list[float]:
        """
        Create a mock embedding for a query when real embeddings aren't available.

        This is a simple fallback for demo purposes that creates embeddings
        based on basic text characteristics.

        Args:
            query: The user query string

        Returns:
            Mock embedding vector as a list of floats
        """
        mock_embedding = []

        # Length features: character count and word count, scaled down.
        words = query.lower().split()
        mock_embedding.append(float(len(query)) / 100.0)
        mock_embedding.append(float(len(words)) / 10.0)

        # Keyword-presence features for common task domains.
        key_words = [
            "text",
            "sentiment",
            "image",
            "code",
            "analyze",
            "summarize",
            "caption",
            "lint",
        ]
        for word in key_words:
            if word in query.lower():
                mock_embedding.append(0.8)
            else:
                mock_embedding.append(0.1)

        # Pad to a minimum dimensionality of 10.
        while len(mock_embedding) < 10:
            mock_embedding.append(0.1)
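        # Worked example: for the query "analyze sentiment of this text" (30 chars,
        # 5 words) the vector is
        #   [0.30, 0.50, 0.8, 0.8, 0.1, 0.1, 0.8, 0.1, 0.1, 0.1]
        # -- length features first, then one slot per keyword ("text", "sentiment",
        # and "analyze" match), already 10-dimensional so no padding is added.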
logger.debug(f"Created mock embedding for query: '{query[:30]}...'") |
|
|
return mock_embedding |
|
|
|
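
if __name__ == "__main__":
    # Minimal demo sketch. Assumptions not verified against the rest of the repo:
    # InMemoryKG and EmbeddingService can be constructed without arguments, and the
    # KG has already been seeded with MCPTool/MCPPrompt records before planning
    # (with an empty KG this demo simply prints nothing).
    logging.basicConfig(level=logging.INFO)

    demo_kg = InMemoryKG()  # assumed no-arg constructor
    demo_embedder = EmbeddingService()  # assumed no-arg constructor
    planner = SimplePlannerAgent(kg=demo_kg, embedder=demo_embedder)

    demo_query = "Summarize this article about climate policy"
    for step in planner.generate_plan(demo_query, top_k=3):
        print(f"{step.tool.name} + {step.prompt.name} (score={step.relevance_score})")
        request = planner.construct_conceptual_sampling_request(step, demo_query)
        print(f"  maxTokens={request['maxTokens']}, modelPreferences={request.get('modelPreferences')}")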