"""SimplePlannerAgent implementation for KGraph-MCP."""
import logging
from typing import Any
from kg_services.embedder import EmbeddingService
from kg_services.knowledge_graph import InMemoryKG
from kg_services.ontology import MCPPrompt, MCPTool, PlannedStep
logger = logging.getLogger(__name__)
class SimplePlannerAgent:
"""
A simplified planner agent that suggests tools and prompts based on user queries
using semantic similarity search.
This agent orchestrates the EmbeddingService and InMemoryKG to process
natural language queries and return relevant MCPTool+MCPPrompt combinations
as PlannedStep objects.
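
    Example (illustrative; assumes kg is already populated with embedded
    tools/prompts and embedder is configured):
        >>> agent = SimplePlannerAgent(kg, embedder)
        >>> steps = agent.generate_plan("summarize this article")
        >>> # Each step pairs an MCPTool with an MCPPrompt plus a relevance score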
"""
def __init__(self, kg: InMemoryKG, embedder: EmbeddingService) -> None:
"""
Initialize the planner with knowledge graph and embedding service.
Args:
kg: InMemoryKG instance for tool/prompt storage and similarity search
embedder: EmbeddingService instance for generating query embeddings
"""
self.kg = kg
self.embedder = embedder

    def construct_conceptual_sampling_request(
        self,
        plan: PlannedStep,
        task_context_text: str
    ) -> dict[str, Any]:
        """
        Constructs MCP sampling/createMessage request parameters based on KG-stored preferences.

        This method demonstrates MVP5's KG-informed model selection by extracting
        sampling preferences from the prompt in the planned step and building
        a valid MCP sampling request structure.

        Args:
            plan: PlannedStep containing tool and prompt with sampling preferences
            task_context_text: User's task context for the sampling request

        Returns:
            Dict containing MCP-compliant sampling/createMessage parameters

        Example:
            >>> plan = PlannedStep(tool=some_tool, prompt=some_prompt)
            >>> params = agent.construct_conceptual_sampling_request(plan, "Analyze this text")
            >>> # Returns: {"messages": [...], "maxTokens": 256, "modelPreferences": {...}}
        """
        prompt_prefs = plan.prompt

        # Build messages array (required field)
        messages = [{
            "role": "user",
            "content": {
                "type": "text",
                "text": task_context_text
            }
        }]

        # Start with required fields
        sampling_params = {
            "messages": messages,
            "maxTokens": prompt_prefs.default_max_tokens_sampling or 256
        }

        # Add system prompt if available
        if prompt_prefs.default_system_prompt_hint:
            sampling_params["systemPrompt"] = prompt_prefs.default_system_prompt_hint

        # Add sampling temperature if specified
        if prompt_prefs.default_sampling_temperature is not None:
            sampling_params["temperature"] = prompt_prefs.default_sampling_temperature

        # Build model preferences object if any preferences exist
        model_preferences = {}

        # Add model hints
        if prompt_prefs.preferred_model_hints:
            model_preferences["hints"] = [
                {"name": hint} for hint in prompt_prefs.preferred_model_hints
            ]

        # Add priority scores
        priorities = {}
        if prompt_prefs.cost_priority_score is not None:
            priorities["cost"] = prompt_prefs.cost_priority_score
        if prompt_prefs.speed_priority_score is not None:
            priorities["speed"] = prompt_prefs.speed_priority_score
        if prompt_prefs.intelligence_priority_score is not None:
            priorities["intelligence"] = prompt_prefs.intelligence_priority_score
        if priorities:
            model_preferences["priorities"] = priorities

        # Add context inclusion preference
        if prompt_prefs.sampling_context_inclusion_hint:
            model_preferences["contextInclusion"] = prompt_prefs.sampling_context_inclusion_hint

        # Only add modelPreferences if we have any preferences
        if model_preferences:
            sampling_params["modelPreferences"] = model_preferences

        # Add metadata for debugging/logging
        sampling_params["_kgraph_mcp_metadata"] = {
            "tool_id": plan.tool.tool_id,
            "tool_name": plan.tool.name,
            "prompt_id": plan.prompt.prompt_id,
            "prompt_name": plan.prompt.name,
            "relevance_score": plan.relevance_score
        }

        logger.info(
            f"Constructed sampling request for {plan.tool.name} using {plan.prompt.name} prompt. "
            f"Model hints: {prompt_prefs.preferred_model_hints}, "
            f"Priorities: cost={prompt_prefs.cost_priority_score}, "
            f"speed={prompt_prefs.speed_priority_score}, "
            f"intelligence={prompt_prefs.intelligence_priority_score}"
        )
        return sampling_params

    def generate_plan(self, user_query: str, top_k: int = 3) -> list[PlannedStep]:
        """
        Generate a plan with tool+prompt combinations based on user query.

        This method:
        1. Validates the user query
        2. Generates an embedding for the query
        3. Finds similar tools using semantic search
        4. For each tool, finds the most relevant prompts
        5. Creates PlannedStep objects with relevance scores
        6. Returns ranked list of tool+prompt combinations

        Args:
            user_query: Natural language query from user
            top_k: Maximum number of planned steps to return (default: 3)

        Returns:
            List of PlannedStep objects, ordered by relevance.
            Returns empty list if query is invalid or no matches found.
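
        Example (illustrative; assumes the KG already holds embedded tools and prompts):
            >>> steps = agent.generate_plan("analyze the sentiment of this review", top_k=2)
            >>> [(s.tool.name, s.prompt.name, s.relevance_score) for s in steps]
            >>> # e.g. [("Sentiment Analyzer", "Basic Sentiment Prompt", 0.82), ...]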
"""
# Handle empty or whitespace-only queries
if not user_query or not user_query.strip():
logger.warning("Empty or whitespace-only query provided")
return []
# Get embedding for the user query
query_embedding = self.embedder.get_embedding(user_query)
if query_embedding is None:
logger.warning(f"Could not generate embedding for query: {user_query}")
# For demo purposes, create a simple mock embedding for the query
query_embedding = self._create_mock_query_embedding(user_query)
if not query_embedding:
return []

        # Phase 1: Find similar tools
        similar_tool_ids = self.kg.find_similar_tools(
            query_embedding, top_k=top_k * 2
        )  # Get more tools initially

        # Phase 2: For each tool, find relevant prompts and create PlannedSteps
        planned_steps: list[PlannedStep] = []
        for tool_id in similar_tool_ids:
            tool = self.kg.get_tool_by_id(tool_id)
            if tool is None:
                continue

            # Find prompts for this tool using semantic similarity
            tool_prompts = self.kg.find_similar_prompts_for_tool(
                query_embedding, tool_id, top_k=2
            )

            # If no specific prompts found for this tool, try general prompts
            if not tool_prompts:
                all_tool_prompts = self.kg.find_prompts_by_tool_id(tool_id)
                if all_tool_prompts:
                    # Take the first available prompt as fallback
                    tool_prompts = [all_tool_prompts[0].prompt_id]

            # Create PlannedStep for each relevant prompt
            for prompt_id in tool_prompts:
                prompt = self.kg.get_prompt_by_id(prompt_id)
                if prompt is None:
                    continue

                try:
                    # Calculate relevance score based on semantic similarity
                    relevance_score = self._calculate_relevance_score(
                        query_embedding, tool, prompt
                    )
                    planned_step = PlannedStep(
                        tool=tool, prompt=prompt, relevance_score=relevance_score
                    )
                    planned_steps.append(planned_step)
                except ValueError as e:
                    logger.warning(
                        f"Could not create PlannedStep for tool {tool_id} and prompt {prompt_id}: {e}"
                    )
                    continue

        # Phase 3: Filter by minimum relevance threshold and sort
        min_relevance_threshold = 0.1  # Filter out very low relevance results
        filtered_steps = [
            step for step in planned_steps
            if step.relevance_score is not None
            and step.relevance_score >= min_relevance_threshold
        ]

        # Sort by relevance score and return top_k
        filtered_steps.sort(key=lambda x: x.relevance_score or 0.0, reverse=True)
        final_steps = filtered_steps[:top_k]

        # Log the planning results
        if final_steps:
            step_summaries = [step.summary for step in final_steps]
            scores_summary = [f"{step.relevance_score:.3f}" for step in final_steps]
            logger.info(
                f"Planner generated {len(final_steps)} planned steps (scores: {scores_summary}): "
                f"{step_summaries} for query: '{user_query}'"
            )
        elif planned_steps:
            logger.info(
                f"Planner filtered out all {len(planned_steps)} planned steps below "
                f"relevance threshold {min_relevance_threshold} for query: '{user_query}'"
            )
        else:
            logger.info(
                f"Planner could not generate any planned steps for query: '{user_query}'"
            )

        return final_steps

    def _calculate_relevance_score(
        self, query_embedding: list[float], tool: MCPTool, prompt: MCPPrompt
    ) -> float:
        """
        Calculate relevance score for a tool+prompt combination using actual similarity scores.

        This combines:
        1. Tool semantic similarity to query (actual cosine similarity)
        2. Prompt semantic similarity to query (actual cosine similarity)
        3. Difficulty level weighting (simpler prompts ranked higher for ambiguous queries)

        Args:
            query_embedding: Embedding vector for the user query
            tool: MCPTool instance
            prompt: MCPPrompt instance

        Returns:
            Relevance score between 0.0 and 1.0
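
        Example (worked through with illustrative numbers):
            With tool_sim=0.80, prompt_sim=0.60, and a "beginner" prompt:
            0.7 * 0.80 + 0.25 * 0.60 + 0.05 = 0.76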
"""
try:
# Get actual tool similarity score
tool_similarities_with_scores = self.kg.find_similar_tools_with_scores(
query_embedding, top_k=50, min_similarity=0.0 # Get all to find specific tool
)
tool_score = 0.0
for tool_id, similarity in tool_similarities_with_scores:
if tool_id == tool.tool_id:
tool_score = similarity
break
# Get actual prompt similarity score
prompt_similarities_with_scores = self.kg.find_similar_prompts_with_scores(
query_embedding, top_k=50, min_similarity=0.0 # Get all to find specific prompt
)
prompt_score = 0.0
for prompt_id, similarity in prompt_similarities_with_scores:
if prompt_id == prompt.prompt_id:
prompt_score = similarity
break
# Difficulty bonus (beginner=0.05, intermediate=0.02, advanced=0.0)
# Reduced bonus to prevent inflation of scores
difficulty_bonus = {
"beginner": 0.05,
"intermediate": 0.02,
"advanced": 0.0,
}.get(prompt.difficulty_level, 0.0)
# Weighted combination using actual similarity scores
# Normalize cosine similarity from [-1, 1] to [0, 1] if needed
tool_score = max(0.0, tool_score)
prompt_score = max(0.0, prompt_score)
relevance_score = (
(0.7 * tool_score) + (0.25 * prompt_score) + difficulty_bonus
)
# Cap at 1.0 and apply minimum threshold
final_score = min(1.0, relevance_score)
# Log score calculation for debugging
logger.debug(
f"Relevance score for {tool.tool_id}+{prompt.prompt_id}: "
f"tool_sim={tool_score:.3f}, prompt_sim={prompt_score:.3f}, "
f"difficulty_bonus={difficulty_bonus:.3f}, final={final_score:.3f}"
)
return final_score
except Exception as e:
logger.warning(f"Error calculating relevance score: {e}")
return 0.0 # Return 0.0 for failed calculations instead of 0.5

    def suggest_tools(self, user_query: str, top_k: int = 3) -> list[MCPTool]:
        """
        Suggest relevant tools based on user query using semantic similarity.

        DEPRECATED: This method is maintained for backward compatibility.
        New code should use generate_plan() instead.

        This method:
        1. Validates the user query
        2. Generates an embedding for the query
        3. Finds similar tools using the knowledge graph
        4. Retrieves and returns the actual MCPTool objects

        Args:
            user_query: Natural language query from user
            top_k: Maximum number of tools to suggest (default: 3)

        Returns:
            List of relevant MCPTool objects, ordered by relevance.
            Returns empty list if query is invalid or no tools found.
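
        Example (illustrative):
            >>> tools = agent.suggest_tools("lint my Python code", top_k=1)
            >>> [t.name for t in tools]
            >>> # e.g. ["Code Linter"] if such a tool is registered in the KG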
"""
# Handle empty or whitespace-only queries
if not user_query or not user_query.strip():
logger.warning("Empty or whitespace-only query provided")
return []
# Get embedding for the user query
query_embedding = self.embedder.get_embedding(user_query)
if query_embedding is None:
logger.warning(f"Could not generate embedding for query: {user_query}")
# For demo purposes, create a simple mock embedding for the query
query_embedding = self._create_mock_query_embedding(user_query)
if not query_embedding:
return []
# Find similar tools using the knowledge graph
similar_tool_ids = self.kg.find_similar_tools(query_embedding, top_k=top_k)
# Retrieve actual MCPTool objects
suggested_tools: list[MCPTool] = []
for tool_id in similar_tool_ids:
tool = self.kg.get_tool_by_id(tool_id)
if tool is not None:
suggested_tools.append(tool)
# Log the suggestion for observability
logger.info(
f"Planner suggested tools: {[t.name for t in suggested_tools]} for query: '{user_query}'"
)
return suggested_tools

    def _create_mock_query_embedding(self, query: str) -> list[float]:
        """
        Create a mock embedding for a query when real embeddings aren't available.

        This is a simple fallback for demo purposes that creates embeddings
        based on basic text characteristics.

        Args:
            query: The user query string

        Returns:
            Mock embedding vector as list of floats
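
        Example (values follow the length/word-count/keyword heuristic below):
            >>> agent._create_mock_query_embedding("analyze sentiment")
            >>> # -> [0.17, 0.2, 0.1, 0.8, 0.1, 0.1, 0.8, 0.1, 0.1, 0.1]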
"""
# Create simple mock embedding based on query characteristics
mock_embedding = []
# Add dimensions based on query length and word count
words = query.lower().split()
mock_embedding.extend([float(len(query)) / 100.0])
mock_embedding.extend([float(len(words)) / 10.0])
# Add some values based on key words that might match tool tags
key_words = [
"text",
"sentiment",
"image",
"code",
"analyze",
"summarize",
"caption",
"lint",
]
for word in key_words:
if word in query.lower():
mock_embedding.append(0.8) # High similarity for matching keywords
else:
mock_embedding.append(0.1) # Low similarity for non-matching
# Pad to consistent length (same as mock tool embeddings)
while len(mock_embedding) < 10:
mock_embedding.append(0.1)
logger.debug(f"Created mock embedding for query: '{query[:30]}...'")
return mock_embedding
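

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only): it assumes InMemoryKG and
    # EmbeddingService can be constructed without arguments and that the KG has
    # already been populated with embedded MCPTool/MCPPrompt entries. Adjust the
    # constructor calls to the actual kg_services APIs in this repository.
    logging.basicConfig(level=logging.INFO)

    kg = InMemoryKG()  # assumed no-arg constructor
    embedder = EmbeddingService()  # assumed no-arg constructor
    agent = SimplePlannerAgent(kg=kg, embedder=embedder)

    demo_query = "Summarize this article and analyze its sentiment"
    for step in agent.generate_plan(demo_query):
        # Build the conceptual MCP sampling request for each planned step
        params = agent.construct_conceptual_sampling_request(step, demo_query)
        print(step.tool.name, step.prompt.name, params["maxTokens"])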