|
|
|
|
|
"""End-to-End Testing for Real MCP Execution Flow. |
|
|
|
|
|
This module implements comprehensive end-to-end testing for Task 65 (MVP4 Sprint 3: End-to-End Testing).
|
|
It covers the complete flow from user query to real MCP tool execution, including: |
|
|
|
|
|
- Full workflow: Query → Planning → Input Collection → Real MCP Execution |
|
|
- Live MCP server communication testing |
|
|
- Error handling and fallback scenarios |
|
|
- Performance and reliability testing |
|
|
- Integration testing across all system components |
|
|
|
|
|
This test suite validates that the complete KGraph-MCP system works end-to-end |
|
|
with real MCP servers running on localhost. |
|
|
""" |
|
|
|
|
|
import time |
|
|
from typing import Any |
|
|
from unittest.mock import Mock, patch |
|
|
|
|
|
import pytest |
|
|
import requests |
|
|
from fastapi.testclient import TestClient |
|
|
|
|
|
from agents.executor import McpExecutorAgent |
|
|
from app import app, app_with_ui, initialize_agent_system |
|
|
from kg_services.ontology import MCPPrompt, MCPTool, PlannedStep |
|
|
|
|
|
|
|
|
class TestE2EMcpExecutionFlow: |
|
|
"""Test complete end-to-end MCP execution workflows.""" |
|
|
|
|
|
@pytest.fixture |
|
|
def client(self): |
|
|
"""Provide test client with initialized system.""" |
|
|
return TestClient(app) |
|
|
|
|
|
@pytest.fixture |
|
|
def initialized_system(self): |
|
|
"""Provide fully initialized system with real agents.""" |
|
|
import app as app_module |
|
|
|
|
|
planner, executor = initialize_agent_system() |
|
|
app_module.planner_agent = planner |
|
|
app_module.executor_agent = executor |
|
|
|
|
|
return { |
|
|
"planner": planner, |
|
|
"executor": executor, |
|
|
"client": TestClient(app_with_ui) |
|
|
} |
|
|
|
|
|
@pytest.fixture |
|
|
def mcp_executor(self): |
|
|
"""Provide McpExecutorAgent for direct testing.""" |
|
|
return McpExecutorAgent() |
|
|
|
|
|
def test_complete_sentiment_analysis_mcp_flow(self, initialized_system): |
|
|
"""Test complete flow: query → plan → real MCP execution for sentiment analysis.""" |
|
|
client = initialized_system["client"] |
|
|
|
|
|
|
|
|
if initialized_system["planner"] is None: |
|
|
pytest.skip("Agent system not initialized - missing data files or API keys") |
|
|
|
|
|
|
|
|
plan_request = { |
|
|
"query": "I need to analyze the sentiment of customer feedback about our new product", |
|
|
"top_k": 3 |
|
|
} |
|
|
|
|
|
response = client.post("/api/plan/generate", json=plan_request) |
|
|
|
|
|
|
|
|
if response.status_code == 503: |
|
|
pytest.skip("Agent system not available - check initialization and API keys") |
|
|
|
|
|
assert response.status_code == 200 |
|
|
|
|
|
plan_data = response.json() |
|
|
|
|
|
|
|
|
if "detail" in plan_data: |
|
|
pytest.skip(f"System error: {plan_data['detail']}") |
|
|
|
|
|
assert plan_data["status"] == "success" |
|
|
assert len(plan_data["planned_steps"]) > 0 |
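        # Step 2: locate a sentiment-analysis tool among the planned steps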
|
|
|
|
|
|
|
|
sentiment_step = None |
|
|
for step in plan_data["planned_steps"]: |
|
|
tool_name = step["tool"]["name"].lower() |
|
|
tool_desc = step["tool"]["description"].lower() |
|
|
if "sentiment" in tool_name or "sentiment" in tool_desc: |
|
|
sentiment_step = step |
|
|
break |
|
|
|
|
|
assert sentiment_step is not None, "No sentiment analysis tool found in plan" |
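        # Step 3: confirm the tool is wired for live MCP execution via its Gradio endpoint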
|
|
|
|
|
|
|
|
tool_info = sentiment_step["tool"] |
|
|
assert tool_info.get("execution_type") == "remote_mcp_gradio" |
|
|
assert tool_info.get("mcp_endpoint_url") is not None |
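        # Step 4: execute against the live MCP server when it is reachable; otherwise skip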
|
|
|
|
|
|
|
|
if self._is_mcp_server_available(tool_info["mcp_endpoint_url"]): |
|
|
|
|
|
planned_step = self._create_planned_step_from_api_response(sentiment_step) |
|
|
|
|
|
|
|
|
test_inputs = { |
|
|
"input_text": "This new product is absolutely amazing! I love how easy it is to use and the quality is outstanding. Highly recommend!" |
|
|
} |
|
|
|
|
|
|
|
|
executor = initialized_system["executor"] |
|
|
if hasattr(executor, "execute_plan_step"): |
|
|
result = executor.execute_plan_step(planned_step, test_inputs) |
|
|
|
|
|
|
|
|
assert result["status"] == "success_live_mcp" |
|
|
assert "tool_specific_output" in result |
|
|
assert result["execution_mode"] == "live_mcp" |
|
|
assert "mcp_endpoint" in result |
|
|
|
|
|
|
|
|
output = result["tool_specific_output"] |
|
|
assert output is not None |
|
|
assert len(output) > 0 |
|
|
|
|
|
print(f"✅ Real MCP Sentiment Analysis Result: {output[:200]}...") |
|
|
else: |
|
|
pytest.skip("Executor doesn't support real MCP execution") |
|
|
else: |
|
|
pytest.skip(f"MCP server not available at {tool_info['mcp_endpoint_url']}") |
|
|
|
|
|
def test_complete_text_summarization_mcp_flow(self, initialized_system): |
|
|
"""Test complete flow: query → plan → real MCP execution for text summarization.""" |
|
|
client = initialized_system["client"] |
|
|
|
|
|
if initialized_system["planner"] is None: |
|
|
pytest.skip("Agent system not initialized") |
|
|
|
|
|
|
|
|
plan_request = { |
|
|
"query": "I need to summarize a long technical document for my team meeting", |
|
|
"top_k": 3 |
|
|
} |
|
|
|
|
|
response = client.post("/api/plan/generate", json=plan_request) |
|
|
|
|
|
|
|
|
if response.status_code == 503: |
|
|
pytest.skip("Agent system not available - check initialization and API keys") |
|
|
|
|
|
assert response.status_code == 200 |
|
|
|
|
|
plan_data = response.json() |
|
|
|
|
|
|
|
|
if "detail" in plan_data: |
|
|
pytest.skip(f"System error: {plan_data['detail']}") |
|
|
|
|
|
assert plan_data["status"] == "success" |
|
|
|
|
|
|
|
|
summarizer_step = None |
|
|
for step in plan_data["planned_steps"]: |
|
|
tool_name = step["tool"]["name"].lower() |
|
|
tool_desc = step["tool"]["description"].lower() |
|
|
if "summar" in tool_name or "summar" in tool_desc: |
|
|
summarizer_step = step |
|
|
break |
|
|
|
|
|
assert summarizer_step is not None, "No text summarization tool found in plan" |
|
|
|
|
|
|
|
|
tool_info = summarizer_step["tool"] |
|
|
if (tool_info.get("execution_type") == "remote_mcp_gradio" and |
|
|
self._is_mcp_server_available(tool_info["mcp_endpoint_url"])): |
|
|
|
|
|
planned_step = self._create_planned_step_from_api_response(summarizer_step) |
|
|
|
|
|
|
|
|
long_text = """ |
|
|
Artificial Intelligence (AI) has revolutionized numerous industries and continues to shape the future of technology. |
|
|
Machine learning algorithms, particularly deep learning neural networks, have achieved remarkable breakthroughs in |
|
|
computer vision, natural language processing, and predictive analytics. These advancements have enabled applications |
|
|
ranging from autonomous vehicles and medical diagnosis to personalized recommendations and automated customer service. |
|
|
|
|
|
The integration of AI into business processes has led to increased efficiency, reduced costs, and improved decision-making |
|
|
capabilities. Companies across various sectors are leveraging AI to optimize operations, enhance customer experiences, |
|
|
and gain competitive advantages. However, the rapid adoption of AI also raises important considerations regarding ethics, |
|
|
privacy, and the future of work. |
|
|
|
|
|
As AI technology continues to evolve, it is crucial for organizations to develop comprehensive strategies for responsible |
|
|
AI implementation, ensuring that these powerful tools are used to benefit society while mitigating potential risks and |
|
|
challenges. The future of AI promises even more sophisticated applications and transformative impacts across all aspects |
|
|
of human life and business operations. |
|
|
""" |
|
|
|
|
|
test_inputs = { |
|
|
"text": long_text.strip(), |
|
|
"max_length": "100", |
|
|
"min_length": "50" |
|
|
} |
|
|
|
|
|
executor = initialized_system["executor"] |
|
|
if hasattr(executor, "execute_plan_step"): |
|
|
result = executor.execute_plan_step(planned_step, test_inputs) |
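                # Accept a live success, the simulated fallback, or a cold-start timeout; anything else fails the test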
|
|
|
|
|
|
|
|
if result["status"] == "success_live_mcp": |
|
|
|
|
|
assert "tool_specific_output" in result |
|
|
assert result["execution_mode"] == "live_mcp" |
|
|
output = result["tool_specific_output"] |
|
|
assert output is not None |
|
|
assert len(output) > 0 |
|
|
print(f"✅ Real MCP Summarization Result: {output[:200]}...") |
|
|
elif result["status"] == "simulated_success": |
|
|
|
|
|
assert "tool_specific_output" in result |
|
|
assert result["execution_mode"] == "simulated" |
|
|
output = result["tool_specific_output"] |
|
|
assert output is not None |
|
|
assert len(output) > 0 |
|
|
print(f"✅ Simulated Summarization Result (cold start fallback): {output[:200]}...") |
|
|
elif result["status"].startswith("error_live_mcp") and "timeout" in result.get("message", "").lower(): |
|
|
|
|
|
print(f"✅ Expected HuggingFace Space cold start timeout: {result.get('message', 'timeout')[:100]}...") |
|
|
pytest.skip("HuggingFace Space experiencing cold start timeout - normal for serverless deployments") |
|
|
else: |
|
|
|
|
|
pytest.fail(f"Unexpected execution result: {result['status']} - {result.get('message', 'No message')}") |
|
|
|
|
|
|
|
|
output = result["tool_specific_output"] |
|
|
assert len(output) < len(long_text) |
|
|
else: |
|
|
pytest.skip("Executor doesn't support real MCP execution") |
|
|
else: |
|
|
pytest.skip("MCP server not available or not configured for real execution") |
|
|
|
|
|
def test_mcp_execution_error_handling(self, mcp_executor): |
|
|
"""Test error handling in real MCP execution scenarios.""" |
|
|
|
|
|
invalid_tool = MCPTool( |
|
|
tool_id="test_invalid", |
|
|
name="Invalid Test Tool", |
|
|
description="Tool for testing error handling", |
|
|
tags=["test"], |
|
|
invocation_command_stub="test", |
|
|
execution_type="remote_mcp_gradio", |
|
|
mcp_endpoint_url="http://localhost:9999/invalid", |
|
|
timeout_seconds=5 |
|
|
) |
|
|
|
|
|
test_prompt = MCPPrompt( |
|
|
prompt_id="test_prompt", |
|
|
name="Test Prompt", |
|
|
description="Test prompt", |
|
|
target_tool_id="test_invalid", |
|
|
template_string="Test: {{input}}", |
|
|
input_variables=["input"] |
|
|
) |
|
|
|
|
|
planned_step = PlannedStep( |
|
|
tool=invalid_tool, |
|
|
prompt=test_prompt, |
|
|
relevance_score=0.9 |
|
|
) |
|
|
|
|
|
test_inputs = {"input": "test data"} |
|
|
|
|
|
|
|
|
result = mcp_executor.execute_plan_step(planned_step, test_inputs) |
|
|
|
|
|
|
|
|
assert result["status"].startswith("error_") |
|
|
assert "error_information" in result |
|
|
assert "recovery_suggestions" in result["error_information"] |
|
|
assert result["execution_mode"] == "live_mcp_failed" |
|
|
|
|
|
|
|
|
error_info = result["error_information"] |
|
|
assert error_info["error_category"] in ["network", "server_error", "connection"] |
|
|
assert isinstance(error_info["recovery_suggestions"], list) |
|
|
assert len(error_info["recovery_suggestions"]) > 0 |
|
|
|
|
|
def test_mcp_execution_timeout_handling(self, mcp_executor): |
|
|
"""Test timeout handling in MCP execution.""" |
|
|
|
|
|
timeout_tool = MCPTool( |
|
|
tool_id="test_timeout", |
|
|
name="Timeout Test Tool", |
|
|
description="Tool for testing timeout handling", |
|
|
tags=["test"], |
|
|
invocation_command_stub="test", |
|
|
execution_type="remote_mcp_gradio", |
|
|
mcp_endpoint_url="http://httpbin.org/delay/10", |
|
|
timeout_seconds=1 |
|
|
) |
|
|
|
|
|
test_prompt = MCPPrompt( |
|
|
prompt_id="timeout_prompt", |
|
|
name="Timeout Test Prompt", |
|
|
description="Test timeout prompt", |
|
|
target_tool_id="test_timeout", |
|
|
template_string="Test: {{input}}", |
|
|
input_variables=["input"] |
|
|
) |
|
|
|
|
|
planned_step = PlannedStep( |
|
|
tool=timeout_tool, |
|
|
prompt=test_prompt, |
|
|
relevance_score=0.9 |
|
|
) |
|
|
|
|
|
test_inputs = {"input": "test data"} |
|
|
|
|
|
|
|
|
result = mcp_executor.execute_plan_step(planned_step, test_inputs) |
|
|
|
|
|
|
|
|
assert result["status"] == "error_live_mcp_timeout" |
|
|
assert "timeout" in result["message"].lower() |
|
|
assert result["error_information"]["error_category"] == "network" |
|
|
|
|
|
|
|
|
suggestions = result["error_information"]["recovery_suggestions"] |
|
|
assert any("timeout" in suggestion.lower() for suggestion in suggestions) |
|
|
|
|
|
def test_mcp_execution_retry_mechanism(self, mcp_executor): |
|
|
"""Test retry mechanism for MCP execution failures.""" |
|
|
with patch("requests.Session.post") as mock_post: |
|
|
|
|
|
mock_response_fail = Mock() |
|
|
mock_response_fail.raise_for_status.side_effect = requests.exceptions.HTTPError( |
|
|
response=Mock(status_code=503, text="Service Unavailable") |
|
|
) |
|
|
mock_response_fail.status_code = 503 |
|
|
mock_response_fail.text = "Service Unavailable" |
|
|
|
|
|
mock_response_success = Mock() |
|
|
mock_response_success.raise_for_status.return_value = None |
|
|
mock_response_success.json.return_value = {"data": ["Success after retry"]} |
|
|
mock_response_success.status_code = 200 |
|
|
|
|
|
|
|
|
mock_post.side_effect = [ |
|
|
mock_response_fail, |
|
|
mock_response_fail, |
|
|
mock_response_success |
|
|
] |
|
|
|
|
|
|
|
|
retry_tool = MCPTool( |
|
|
tool_id="test_retry", |
|
|
name="Retry Test Tool", |
|
|
description="Tool for testing retry mechanism", |
|
|
tags=["test"], |
|
|
invocation_command_stub="test", |
|
|
execution_type="remote_mcp_gradio", |
|
|
mcp_endpoint_url="http://localhost:7860/test", |
|
|
timeout_seconds=30 |
|
|
) |
|
|
|
|
|
test_prompt = MCPPrompt( |
|
|
prompt_id="retry_prompt", |
|
|
name="Retry Test Prompt", |
|
|
description="Test retry prompt", |
|
|
target_tool_id="test_retry", |
|
|
template_string="Test: {{input}}", |
|
|
input_variables=["input"] |
|
|
) |
|
|
|
|
|
planned_step = PlannedStep( |
|
|
tool=retry_tool, |
|
|
prompt=test_prompt, |
|
|
relevance_score=0.9 |
|
|
) |
|
|
|
|
|
test_inputs = {"input": "test data"} |
|
|
|
|
|
|
|
|
result = mcp_executor.execute_plan_step(planned_step, test_inputs) |
|
|
|
|
|
|
|
|
assert result["status"] == "success_live_mcp" |
|
|
assert result["attempts_made"] == 3 |
|
|
assert mock_post.call_count == 3 |
|
|
|
|
|
def test_fallback_to_simulation(self, mcp_executor): |
|
|
"""Test fallback to simulation when MCP execution fails.""" |
|
|
|
|
|
sim_tool = MCPTool( |
|
|
tool_id="test_simulation", |
|
|
name="Simulation Test Tool", |
|
|
description="Tool for testing simulation fallback", |
|
|
tags=["test"], |
|
|
invocation_command_stub="test", |
|
|
execution_type="simulated" |
|
|
) |
|
|
|
|
|
test_prompt = MCPPrompt( |
|
|
prompt_id="sim_prompt", |
|
|
name="Simulation Test Prompt", |
|
|
description="Test simulation prompt", |
|
|
target_tool_id="test_simulation", |
|
|
template_string="Test: {{input}}", |
|
|
input_variables=["input"] |
|
|
) |
|
|
|
|
|
planned_step = PlannedStep( |
|
|
tool=sim_tool, |
|
|
prompt=test_prompt, |
|
|
relevance_score=0.9 |
|
|
) |
|
|
|
|
|
test_inputs = {"input": "test data"} |
|
|
|
|
|
|
|
|
result = mcp_executor.execute_plan_step(planned_step, test_inputs) |
|
|
|
|
|
|
|
|
assert result["status"] in ["simulated_success", "simulated_error_timeout", "simulated_error_rate_limit", |
|
|
"simulated_error_invalid_input", "simulated_error_service_unavailable", |
|
|
"simulated_error_authentication_failed"] |
|
|
assert result["execution_mode"] in ["simulated", "simulated_error"] |
|
|
|
|
|
|
|
|
if result["status"] == "simulated_success": |
|
|
assert "tool_specific_output" in result |
|
|
assert result["tool_specific_output"] is not None |
|
|
|
|
|
def test_input_parameter_ordering(self, mcp_executor): |
|
|
"""Test that input parameters are correctly ordered for MCP calls.""" |
|
|
with patch("requests.Session.post") as mock_post: |
|
|
mock_response = Mock() |
|
|
mock_response.raise_for_status.return_value = None |
|
|
mock_response.json.return_value = {"data": ["Parameter order test result"]} |
|
|
mock_post.return_value = mock_response |
|
|
|
|
|
|
|
|
ordered_tool = MCPTool( |
|
|
tool_id="test_order", |
|
|
name="Parameter Order Test Tool", |
|
|
description="Tool for testing parameter ordering", |
|
|
tags=["test"], |
|
|
invocation_command_stub="test", |
|
|
execution_type="remote_mcp_gradio", |
|
|
mcp_endpoint_url="http://localhost:7860/test", |
|
|
input_parameter_order=["text", "max_length", "min_length"], |
|
|
timeout_seconds=30 |
|
|
) |
|
|
|
|
|
test_prompt = MCPPrompt( |
|
|
prompt_id="order_prompt", |
|
|
name="Order Test Prompt", |
|
|
description="Test parameter order prompt", |
|
|
target_tool_id="test_order", |
|
|
template_string="Summarize: {{text}} with max {{max_length}} and min {{min_length}}", |
|
|
input_variables=["text", "max_length", "min_length"] |
|
|
) |
|
|
|
|
|
planned_step = PlannedStep( |
|
|
tool=ordered_tool, |
|
|
prompt=test_prompt, |
|
|
relevance_score=0.9 |
|
|
) |
|
|
|
|
|
test_inputs = { |
|
|
"min_length": "50", |
|
|
"text": "Test document content", |
|
|
"max_length": "150" |
|
|
} |
|
|
|
|
|
|
|
|
result = mcp_executor.execute_plan_step(planned_step, test_inputs) |
|
|
|
|
|
|
|
|
assert result["status"] == "success_live_mcp" |
|
|
|
|
|
|
|
|
call_args = mock_post.call_args |
|
|
payload = call_args[1]["json"] |
|
|
expected_order = ["Test document content", 150, 50] |
|
|
assert payload["data"] == expected_order |
|
|
|
|
|
def test_performance_requirements(self, initialized_system): |
|
|
"""Test that end-to-end execution meets performance requirements.""" |
|
|
client = initialized_system["client"] |
|
|
|
|
|
if initialized_system["planner"] is None: |
|
|
pytest.skip("Agent system not initialized") |
|
|
|
|
|
|
|
|
start_time = time.time() |
|
|
|
|
|
plan_request = { |
|
|
"query": "I need sentiment analysis for customer reviews", |
|
|
"top_k": 3 |
|
|
} |
|
|
|
|
|
response = client.post("/api/plan/generate", json=plan_request) |
|
|
planning_time = time.time() - start_time |
|
|
|
|
|
|
|
|
if response.status_code == 503: |
|
|
pytest.skip("Agent system not available - check initialization and API keys") |
|
|
|
|
|
assert response.status_code == 200 |
|
|
assert planning_time < 2.0 |
|
|
|
|
|
plan_data = response.json() |
|
|
|
|
|
|
|
|
if "detail" in plan_data: |
|
|
pytest.skip(f"System error: {plan_data['detail']}") |
|
|
|
|
|
assert plan_data["status"] == "success" |
|
|
|
|
|
print(f"✅ Planning completed in {planning_time:.2f}s") |
|
|
|
|
|
def test_mcp_executor_direct_testing(self, mcp_executor): |
|
|
"""Test MCP executor directly without requiring external APIs.""" |
|
|
|
|
|
|
|
|
|
|
|
sim_tool = MCPTool( |
|
|
tool_id="test_sentiment_sim", |
|
|
name="Test Sentiment Analyzer", |
|
|
description="Test sentiment analysis tool", |
|
|
tags=["sentiment", "test"], |
|
|
invocation_command_stub="test_sentiment", |
|
|
execution_type="simulated" |
|
|
) |
|
|
|
|
|
sim_prompt = MCPPrompt( |
|
|
prompt_id="test_sentiment_prompt", |
|
|
name="Test Sentiment Prompt", |
|
|
description="Test sentiment analysis prompt", |
|
|
target_tool_id="test_sentiment_sim", |
|
|
template_string="Analyze sentiment: {{text}}", |
|
|
input_variables=["text"] |
|
|
) |
|
|
|
|
|
planned_step = PlannedStep( |
|
|
tool=sim_tool, |
|
|
prompt=sim_prompt, |
|
|
relevance_score=0.95 |
|
|
) |
|
|
|
|
|
test_inputs = {"text": "This product is amazing and I love it!"} |
|
|
|
|
|
|
|
|
result = mcp_executor.execute_plan_step(planned_step, test_inputs) |
|
|
|
|
|
|
|
|
assert result["status"] in ["simulated_success", "simulated_error_timeout", "simulated_error_rate_limit", |
|
|
"simulated_error_invalid_input", "simulated_error_service_unavailable", |
|
|
"simulated_error_authentication_failed"] |
|
|
assert result["execution_mode"] in ["simulated", "simulated_error"] |
|
|
assert result["tool_id_used"] == "test_sentiment_sim" |
|
|
assert result["tool_name_used"] == "Test Sentiment Analyzer" |
|
|
|
|
|
|
|
|
if result["status"] == "simulated_success": |
|
|
assert "tool_specific_output" in result |
|
|
assert result["tool_specific_output"] is not None |
|
|
assert "sentiment" in result["tool_specific_output"].lower() |
|
|
|
|
|
print(f"✅ Simulated execution test passed (status: {result['status']})") |
|
|
|
|
|
|
|
|
mcp_tool_unreachable = MCPTool( |
|
|
tool_id="test_mcp_unreachable", |
|
|
name="MCP Tool Unreachable", |
|
|
description="MCP tool with unreachable endpoint", |
|
|
tags=["test"], |
|
|
invocation_command_stub="test", |
|
|
execution_type="remote_mcp_gradio", |
|
|
mcp_endpoint_url="http://localhost:9999/unreachable", |
|
|
timeout_seconds=5 |
|
|
) |
|
|
|
|
|
|
|
|
mcp_prompt = MCPPrompt( |
|
|
prompt_id="test_mcp_unreachable_prompt", |
|
|
name="Test MCP Unreachable Prompt", |
|
|
description="Test prompt for unreachable MCP endpoint", |
|
|
target_tool_id="test_mcp_unreachable", |
|
|
template_string="Test unreachable endpoint: {{text}}", |
|
|
input_variables=["text"] |
|
|
) |
|
|
|
|
|
mcp_planned_step = PlannedStep( |
|
|
tool=mcp_tool_unreachable, |
|
|
prompt=mcp_prompt, |
|
|
relevance_score=0.8 |
|
|
) |
|
|
|
|
|
|
|
|
mcp_result = mcp_executor.execute_plan_step(mcp_planned_step, test_inputs) |
|
|
|
|
|
|
|
|
assert mcp_result["status"].startswith("error_") |
|
|
assert mcp_result["execution_mode"] == "live_mcp_failed" |
|
|
assert "error_information" in mcp_result |
|
|
|
|
|
print(f"✅ Unreachable endpoint handling test passed (status: {mcp_result['status']})") |
|
|
|
|
|
|
|
|
with pytest.raises(ValueError, match="Plan must be a PlannedStep instance"): |
|
|
mcp_executor.execute_plan_step("invalid_plan", test_inputs) |
|
|
|
|
|
with pytest.raises(ValueError, match="Inputs must be a dictionary"): |
|
|
mcp_executor.execute_plan_step(planned_step, "invalid_inputs") |
|
|
|
|
|
print("✅ Input validation test passed") |
|
|
|
|
|
|
|
|
summarizer_tool = MCPTool( |
|
|
tool_id="test_summarizer", |
|
|
name="Test Text Summarizer", |
|
|
description="Test text summarization tool", |
|
|
tags=["summarization", "test"], |
|
|
invocation_command_stub="test_summarize", |
|
|
execution_type="simulated" |
|
|
) |
|
|
|
|
|
summarizer_prompt = MCPPrompt( |
|
|
prompt_id="test_summarizer_prompt", |
|
|
name="Test Summarization Prompt", |
|
|
description="Test summarization prompt", |
|
|
target_tool_id="test_summarizer", |
|
|
template_string="Summarize: {{text}} with max length {{max_length}}", |
|
|
input_variables=["text", "max_length"] |
|
|
) |
|
|
|
|
|
summarizer_step = PlannedStep( |
|
|
tool=summarizer_tool, |
|
|
prompt=summarizer_prompt, |
|
|
relevance_score=0.9 |
|
|
) |
|
|
|
|
|
summarizer_inputs = { |
|
|
"text": "This is a long document that needs to be summarized for better understanding.", |
|
|
"max_length": "50" |
|
|
} |
|
|
|
|
|
summarizer_result = mcp_executor.execute_plan_step(summarizer_step, summarizer_inputs) |
|
|
|
|
|
|
|
|
assert summarizer_result["status"] in ["simulated_success", "simulated_error_timeout", "simulated_error_rate_limit", |
|
|
"simulated_error_invalid_input", "simulated_error_service_unavailable", |
|
|
"simulated_error_authentication_failed"] |
|
|
|
|
|
|
|
|
if summarizer_result["status"] == "simulated_success": |
|
|
assert "summary" in summarizer_result["tool_specific_output"].lower() |
|
|
|
|
|
print(f"✅ Multi-tool type test passed (status: {summarizer_result['status']})") |
|
|
|
|
|
def _is_mcp_server_available(self, endpoint_url: str) -> bool: |
|
|
"""Check if MCP server is available at the given endpoint.""" |
|
|
try: |
|
|
response = requests.get(endpoint_url.replace("/gradio_api/mcp/sse", "/"), timeout=5) |
|
|
return response.status_code == 200 |
|
|
        except requests.exceptions.RequestException:
|
|
return False |
|
|
|
|
|
def _create_planned_step_from_api_response(self, step_data: dict[str, Any]) -> PlannedStep: |
|
|
"""Create PlannedStep object from API response data.""" |
|
|
tool_info = step_data["tool"] |
|
|
prompt_info = step_data["prompt"] |
|
|
|
|
|
tool = MCPTool( |
|
|
tool_id=tool_info["tool_id"], |
|
|
name=tool_info["name"], |
|
|
description=tool_info["description"], |
|
|
tags=tool_info.get("tags", []), |
|
|
invocation_command_stub=tool_info.get("invocation_command_stub", ""), |
|
|
execution_type=tool_info.get("execution_type", "simulated"), |
|
|
mcp_endpoint_url=tool_info.get("mcp_endpoint_url"), |
|
|
input_parameter_order=tool_info.get("input_parameter_order", []), |
|
|
timeout_seconds=tool_info.get("timeout_seconds", 30) |
|
|
) |
|
|
|
|
|
prompt = MCPPrompt( |
|
|
prompt_id=prompt_info["prompt_id"], |
|
|
name=prompt_info["name"], |
|
|
description=prompt_info["description"], |
|
|
target_tool_id=prompt_info.get("target_tool_id", tool_info["tool_id"]), |
|
|
template_string=prompt_info["template_string"], |
|
|
input_variables=prompt_info["input_variables"], |
|
|
difficulty_level=prompt_info.get("difficulty_level", "beginner") |
|
|
) |
|
|
|
|
|
return PlannedStep( |
|
|
tool=tool, |
|
|
prompt=prompt, |
|
|
relevance_score=step_data["relevance_score"] |
|
|
) |
|
|
|
|
|
|
|
|
class TestE2EMcpIntegrationScenarios: |
|
|
"""Test integration scenarios across the complete system.""" |
|
|
|
|
|
@pytest.fixture |
|
|
def client(self): |
|
|
"""Provide test client with Gradio UI mounted.""" |
|
|
return TestClient(app_with_ui) |
|
|
|
|
|
def test_health_check_before_execution(self, client): |
|
|
"""Test that system health check works before attempting execution.""" |
|
|
response = client.get("/health") |
|
|
assert response.status_code == 200 |
|
|
|
|
|
health_data = response.json() |
|
|
assert health_data["status"] == "healthy" |
|
|
assert "timestamp" in health_data |
|
|
|
|
|
def test_api_documentation_accessibility(self, client): |
|
|
"""Test that API documentation is accessible.""" |
|
|
response = client.get("/docs") |
|
|
assert response.status_code == 200 |
|
|
|
|
|
def test_gradio_ui_integration(self, client): |
|
|
"""Test that Gradio UI is accessible.""" |
|
|
response = client.get("/ui/") |
|
|
assert response.status_code == 200 |
|
|
|
|
|
def test_error_propagation_through_system(self, client): |
|
|
"""Test that errors propagate correctly through the system.""" |
|
|
|
|
|
response = client.post("/api/plan/generate", json={}) |
|
|
assert response.status_code == 422 |
|
|
|
|
|
|
|
|
response = client.post( |
|
|
"/api/plan/generate", |
|
|
json={"query": 123, "top_k": "invalid"} |
|
|
) |
|
|
assert response.status_code == 422 |
|
|
|
|
|
def test_system_resilience_under_load(self, client): |
|
|
"""Test system resilience under concurrent load.""" |
|
|
import concurrent.futures |
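        # Fire 20 concurrent planning requests across 10 workers; each must return 200 or a clean 503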
|
|
|
|
|
def make_request(): |
|
|
return client.post( |
|
|
"/api/plan/generate", |
|
|
json={"query": "test sentiment analysis", "top_k": 1} |
|
|
) |
|
|
|
|
|
|
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: |
|
|
futures = [executor.submit(make_request) for _ in range(20)] |
|
|
responses = [future.result() for future in concurrent.futures.as_completed(futures)] |
|
|
|
|
|
|
|
|
assert len(responses) == 20 |
|
|
for response in responses: |
|
|
assert response.status_code in [200, 503] |
|
|
|
|
|
def test_data_consistency_across_components(self, client): |
|
|
"""Test that data remains consistent across system components.""" |
|
|
|
|
|
response = client.post( |
|
|
"/api/plan/generate", |
|
|
json={"query": "sentiment analysis", "top_k": 1} |
|
|
) |
|
|
        if response.status_code == 503:
            pytest.skip("Agent system not available - check initialization and API keys")
        assert response.status_code == 200
|
|
|
|
|
plan_data = response.json() |
|
|
if len(plan_data["planned_steps"]) > 0: |
|
|
step = plan_data["planned_steps"][0] |
|
|
|
|
|
|
|
|
assert "tool" in step |
|
|
assert "prompt" in step |
|
|
assert "relevance_score" in step |
|
|
|
|
|
|
|
|
tool_id = step["tool"]["tool_id"] |
|
|
target_tool_id = step["prompt"].get("target_tool_id") |
|
|
if target_tool_id: |
|
|
assert tool_id == target_tool_id |
|
|
|
|
|
|
|
|
class TestE2EMcpEdgeCases: |
|
|
"""Test edge cases in end-to-end MCP execution.""" |
|
|
|
|
|
@pytest.fixture |
|
|
def mcp_executor(self): |
|
|
"""Provide McpExecutorAgent for testing.""" |
|
|
return McpExecutorAgent() |
|
|
|
|
|
def test_empty_input_handling(self, mcp_executor): |
|
|
"""Test handling of empty inputs.""" |
|
|
tool = MCPTool( |
|
|
tool_id="empty_test", |
|
|
name="Empty Input Test Tool", |
|
|
description="Tool for testing empty inputs", |
|
|
tags=["test"], |
|
|
invocation_command_stub="test", |
|
|
execution_type="simulated" |
|
|
) |
|
|
|
|
|
prompt = MCPPrompt( |
|
|
prompt_id="empty_prompt", |
|
|
name="Empty Test Prompt", |
|
|
description="Test empty prompt", |
|
|
target_tool_id="empty_test", |
|
|
template_string="Test: {{input}}", |
|
|
input_variables=["input"] |
|
|
) |
|
|
|
|
|
planned_step = PlannedStep(tool=tool, prompt=prompt, relevance_score=0.9) |
|
|
|
|
|
|
|
|
empty_inputs = {} |
|
|
result = mcp_executor.execute_plan_step(planned_step, empty_inputs) |
|
|
|
|
|
|
|
|
assert result["status"] in ["simulated_success", "simulated_error_missing_input", |
|
|
"simulated_error_timeout", "simulated_error_rate_limit", |
|
|
"simulated_error_invalid_input", "simulated_error_service_unavailable", |
|
|
"simulated_error_authentication_failed"] |
|
|
|
|
|
def test_large_input_handling(self, mcp_executor): |
|
|
"""Test handling of very large inputs.""" |
|
|
tool = MCPTool( |
|
|
tool_id="large_test", |
|
|
name="Large Input Test Tool", |
|
|
description="Tool for testing large inputs", |
|
|
tags=["test"], |
|
|
invocation_command_stub="test", |
|
|
execution_type="simulated" |
|
|
) |
|
|
|
|
|
prompt = MCPPrompt( |
|
|
prompt_id="large_prompt", |
|
|
name="Large Test Prompt", |
|
|
description="Test large prompt", |
|
|
target_tool_id="large_test", |
|
|
template_string="Test: {{input}}", |
|
|
input_variables=["input"] |
|
|
) |
|
|
|
|
|
planned_step = PlannedStep(tool=tool, prompt=prompt, relevance_score=0.9) |
|
|
|
|
|
|
|
|
large_input = "x" * 100000 |
|
|
large_inputs = {"input": large_input} |
|
|
|
|
|
result = mcp_executor.execute_plan_step(planned_step, large_inputs) |
|
|
|
|
|
|
|
|
assert result["status"] in ["simulated_success", "simulated_error_input_too_large", |
|
|
"simulated_error_timeout", "simulated_error_rate_limit", |
|
|
"simulated_error_invalid_input", "simulated_error_service_unavailable", |
|
|
"simulated_error_authentication_failed"] |
|
|
|
|
|
def test_special_characters_in_input(self, mcp_executor): |
|
|
"""Test handling of special characters and Unicode in inputs.""" |
|
|
tool = MCPTool( |
|
|
tool_id="unicode_test", |
|
|
name="Unicode Test Tool", |
|
|
description="Tool for testing Unicode inputs", |
|
|
tags=["test"], |
|
|
invocation_command_stub="test", |
|
|
execution_type="simulated" |
|
|
) |
|
|
|
|
|
prompt = MCPPrompt( |
|
|
prompt_id="unicode_prompt", |
|
|
name="Unicode Test Prompt", |
|
|
description="Test Unicode prompt", |
|
|
target_tool_id="unicode_test", |
|
|
template_string="Test: {{input}}", |
|
|
input_variables=["input"] |
|
|
) |
|
|
|
|
|
planned_step = PlannedStep(tool=tool, prompt=prompt, relevance_score=0.9) |
|
|
|
|
|
|
|
|
special_inputs = { |
|
|
"input": "Test with émojis 🎯, special chars @#$%, and Unicode: 你好世界" |
|
|
} |
|
|
|
|
|
result = mcp_executor.execute_plan_step(planned_step, special_inputs) |
|
|
|
|
|
|
|
|
assert result["status"] in ["simulated_success", "simulated_error_timeout", "simulated_error_rate_limit", |
|
|
"simulated_error_invalid_input", "simulated_error_service_unavailable", |
|
|
"simulated_error_authentication_failed"] |
|
|
assert special_inputs["input"] in str(result["inputs_received"]) |
|
|
|
|
|
def test_malformed_tool_configuration(self, mcp_executor): |
|
|
"""Test handling of malformed tool configurations.""" |
|
|
|
|
|
with pytest.raises(ValueError, match="name cannot be empty"): |
|
|
MCPTool( |
|
|
tool_id="malformed_test", |
|
|
name="", |
|
|
description="Tool with malformed config", |
|
|
tags=["test"], |
|
|
invocation_command_stub="test", |
|
|
execution_type="simulated" |
|
|
) |
|
|
|
|
|
|
|
|
with pytest.raises(ValueError, match="tool_id cannot be empty"): |
|
|
MCPTool( |
|
|
tool_id="", |
|
|
name="Valid Name", |
|
|
description="Tool with malformed config", |
|
|
tags=["test"], |
|
|
invocation_command_stub="test", |
|
|
execution_type="simulated" |
|
|
) |
|
|
|
|
|
|
|
|
with pytest.raises(ValueError, match="execution_type must be"): |
|
|
MCPTool( |
|
|
tool_id="malformed_test", |
|
|
name="Valid Name", |
|
|
description="Tool with malformed config", |
|
|
tags=["test"], |
|
|
invocation_command_stub="test", |
|
|
execution_type="invalid_type" |
|
|
) |
|
|
|
|
|
|
|
|
with pytest.raises(ValueError, match="mcp_endpoint_url is required"): |
|
|
MCPTool( |
|
|
tool_id="malformed_test", |
|
|
name="Valid Name", |
|
|
description="Tool with malformed config", |
|
|
tags=["test"], |
|
|
invocation_command_stub="test", |
|
|
execution_type="remote_mcp_gradio", |
|
|
mcp_endpoint_url=None |
|
|
) |
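        # A well-formed remote tool whose endpoint cannot resolve should fail gracefully at execution time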
|
|
|
|
|
|
|
|
|
|
|
problematic_tool = MCPTool( |
|
|
tool_id="problematic_test", |
|
|
name="Problematic Test Tool", |
|
|
description="Tool that will fail during execution", |
|
|
tags=["test"], |
|
|
invocation_command_stub="test", |
|
|
execution_type="remote_mcp_gradio", |
|
|
mcp_endpoint_url="http://nonexistent.invalid/endpoint", |
|
|
timeout_seconds=1 |
|
|
) |
|
|
|
|
|
prompt = MCPPrompt( |
|
|
prompt_id="problematic_prompt", |
|
|
name="Problematic Test Prompt", |
|
|
description="Test prompt for problematic tool", |
|
|
target_tool_id="problematic_test", |
|
|
template_string="Test: {{input}}", |
|
|
input_variables=["input"] |
|
|
) |
|
|
|
|
|
planned_step = PlannedStep(tool=problematic_tool, prompt=prompt, relevance_score=0.9) |
|
|
test_inputs = {"input": "test"} |
|
|
|
|
|
|
|
|
result = mcp_executor.execute_plan_step(planned_step, test_inputs) |
|
|
|
|
|
|
|
|
assert result["status"].startswith("error_") |
|
|
assert "error_information" in result |
|
|
assert result["execution_mode"] == "live_mcp_failed" |
|
|
|