"""Performance tests for KGraph-MCP system optimization.""" import asyncio import concurrent.futures import time from unittest.mock import MagicMock, patch import pytest from fastapi.testclient import TestClient try: from kg_services.embedder_async import AsyncEmbeddingService from kg_services.ontology import MCPTool from kg_services.performance import ( AsyncBatchProcessor, EmbeddingCache, LRUCache, PerformanceMonitor, get_system_performance_report, optimize_memory_usage, ) PERFORMANCE_MODULES_AVAILABLE = True except ImportError: PERFORMANCE_MODULES_AVAILABLE = False @pytest.mark.skipif(not PERFORMANCE_MODULES_AVAILABLE, reason="Performance modules not available") @pytest.mark.performance class TestLRUCache: """Test LRU cache performance and functionality.""" @pytest.mark.asyncio async def test_cache_basic_operations(self): """Test basic cache operations.""" cache = LRUCache(max_size=3, ttl_seconds=60) # Test set and get await cache.set("key1", "value1") result = await cache.get("key1") assert result == "value1" # Test cache miss result = await cache.get("nonexistent") assert result is None @pytest.mark.asyncio async def test_cache_eviction(self): """Test LRU eviction when max size is reached.""" cache = LRUCache(max_size=2, ttl_seconds=60) # Fill cache to max size await cache.set("key1", "value1") await cache.set("key2", "value2") # Add third item (should evict oldest) await cache.set("key3", "value3") # key1 should be evicted assert await cache.get("key1") is None assert await cache.get("key2") == "value2" assert await cache.get("key3") == "value3" @pytest.mark.asyncio async def test_cache_ttl(self): """Test time-to-live functionality.""" cache = LRUCache(max_size=10, ttl_seconds=0.1) # Very short TTL await cache.set("key1", "value1") assert await cache.get("key1") == "value1" # Wait for TTL to expire await asyncio.sleep(0.2) assert await cache.get("key1") is None @pytest.mark.asyncio async def test_cache_stats(self): """Test cache statistics.""" cache = LRUCache(max_size=10, ttl_seconds=60) await cache.set("key1", "value1") await cache.set("key2", "value2") stats = cache.get_stats() assert stats["total_entries"] == 2 assert stats["max_size"] == 10 assert stats["ttl_seconds"] == 60 @pytest.mark.skipif(not PERFORMANCE_MODULES_AVAILABLE, reason="Performance modules not available") @pytest.mark.performance class TestEmbeddingCache: """Test embedding cache performance and functionality.""" @pytest.mark.asyncio async def test_embedding_cache_basic(self): """Test basic embedding cache operations.""" cache = EmbeddingCache(max_size=100, compression_precision=6) # Test set and get embedding = [0.1, 0.2, 0.3, 0.4, 0.5] await cache.set_embedding("test text", embedding) result = await cache.get_embedding("test text") assert result is not None assert len(result) == len(embedding) # Check compression precision assert result[0] == round(0.1, 6) @pytest.mark.asyncio async def test_embedding_cache_stats(self): """Test embedding cache statistics.""" cache = EmbeddingCache(max_size=100, compression_precision=6) # Initial stats stats = cache.get_cache_stats() assert stats["hit_count"] == 0 assert stats["miss_count"] == 0 assert stats["hit_ratio"] == 0 # Add embedding and test hit/miss embedding = [0.1, 0.2, 0.3] await cache.set_embedding("test", embedding) # Cache hit await cache.get_embedding("test") # Cache miss await cache.get_embedding("nonexistent") stats = cache.get_cache_stats() assert stats["hit_count"] == 1 assert stats["miss_count"] == 1 assert stats["hit_ratio"] == 0.5 @pytest.mark.skipif(not 
@pytest.mark.performance
class TestPerformanceMonitor:
    """Test performance monitoring functionality."""

    def test_performance_monitor_basic(self):
        """Test basic performance monitoring."""
        monitor = PerformanceMonitor()

        # Record some requests
        monitor.record_request(0.1, success=True)
        monitor.record_request(0.2, success=True)
        monitor.record_request(0.3, success=False)

        stats = monitor.get_performance_stats()
        assert stats["total_requests"] == 3
        assert stats["error_count"] == 1
        # Use approx for derived float metrics to avoid rounding-order flakiness
        assert stats["error_rate"] == pytest.approx(1 / 3)
        assert stats["avg_response_time_ms"] == pytest.approx((0.1 + 0.2 + 0.3) / 3 * 1000)
        assert stats["max_response_time_ms"] == pytest.approx(0.3 * 1000)

    def test_performance_monitor_edge_cases(self):
        """Test performance monitor edge cases."""
        monitor = PerformanceMonitor()

        # No requests recorded
        stats = monitor.get_performance_stats()
        assert stats["total_requests"] == 0
        assert stats["avg_response_time_ms"] == 0
        assert stats["error_rate"] == 0


@pytest.mark.skipif(not PERFORMANCE_MODULES_AVAILABLE, reason="Performance modules not available")
@pytest.mark.performance
class TestAsyncBatchProcessor:
    """Test async batch processor performance."""

    @pytest.mark.asyncio
    async def test_batch_processor_basic(self):
        """Test basic batch processing."""
        processor = AsyncBatchProcessor(batch_size=2, max_concurrent=2)

        def mock_processor(item):
            return item * 2

        items = [1, 2, 3, 4, 5]
        results = await processor.process_batch(items, mock_processor)

        assert len(results) == 5
        assert results == [2, 4, 6, 8, 10]

    @pytest.mark.asyncio
    async def test_batch_processor_with_async_function(self):
        """Test batch processing with async function."""
        processor = AsyncBatchProcessor(batch_size=2, max_concurrent=2)

        async def async_processor(item):
            await asyncio.sleep(0.01)  # Simulate async work
            return item * 3

        items = [1, 2, 3]
        results = await processor.process_batch(items, async_processor)

        assert len(results) == 3
        assert results == [3, 6, 9]

    @pytest.mark.asyncio
    async def test_batch_processor_error_handling(self):
        """Test batch processor error handling."""
        processor = AsyncBatchProcessor(batch_size=2, max_concurrent=2)

        def error_processor(item):
            if item == 2:
                raise ValueError("Test error")
            return item * 2

        items = [1, 2, 3]
        results = await processor.process_batch(items, error_processor)

        assert len(results) == 3
        assert results[0] == 2  # 1 * 2
        assert results[1] is None  # Error case
        assert results[2] == 6  # 3 * 2


@pytest.mark.skipif(not PERFORMANCE_MODULES_AVAILABLE, reason="Performance modules not available")
@pytest.mark.performance
class TestAsyncEmbeddingService:
    """Test async embedding service performance."""

    @pytest.mark.asyncio
    async def test_async_embedding_service_mock(self):
        """Test async embedding service with mock embeddings."""
        service = AsyncEmbeddingService(embedding_dim=128)

        # Should use mock embeddings when no OpenAI key
        embedding = await service.get_embedding("test text")

        assert embedding is not None
        assert len(embedding) == 128
        assert all(0 <= val <= 1 for val in embedding)

    @pytest.mark.asyncio
    async def test_async_embedding_service_caching(self):
        """Test embedding service caching."""
        service = AsyncEmbeddingService(embedding_dim=64)

        # First request (cache miss)
        embedding1 = await service.get_embedding("test text", use_cache=True)
        assert embedding1 is not None

        # Second request (should be cache hit)
        embedding2 = await service.get_embedding("test text", use_cache=True)
        assert embedding2 == embedding1

        # Check performance stats
        stats = service.get_performance_stats()
stats["total_embedding_requests"] == 2 assert stats["cache_hits"] == 1 assert stats["cache_hit_rate"] == 0.5 @pytest.mark.asyncio async def test_async_embedding_service_tools(self): """Test embedding service with tools.""" service = AsyncEmbeddingService(embedding_dim=64) tool = MCPTool( tool_id="test-tool", name="Test Tool", description="A test tool for embeddings", tags=["test", "embedding"], invocation_command_stub="test_command" ) embedding = await service.embed_tool_description(tool) assert embedding is not None assert len(embedding) == 64 @pytest.mark.asyncio async def test_find_similar_tools_performance(self): """Test performance of finding similar tools.""" service = AsyncEmbeddingService(embedding_dim=32) # Smaller for speed # Create test tools tools = [] for i in range(10): tool = MCPTool( tool_id=f"tool-{i}", name=f"Tool {i}", description=f"Test tool number {i}", tags=["test"], invocation_command_stub=f"tool_{i}" ) tools.append(tool) # Measure performance start_time = time.time() similar_tools = await service.find_similar_tools("test tool", tools, top_k=3) end_time = time.time() # Should complete quickly assert end_time - start_time < 2.0 # Less than 2 seconds assert len(similar_tools) == 3 assert all(isinstance(tool, MCPTool) for tool, _ in similar_tools) assert all(isinstance(score, (int, float)) for _, score in similar_tools) @pytest.mark.performance class TestSystemPerformanceOptimization: """Test system-level performance optimization.""" @pytest.mark.asyncio async def test_memory_optimization(self): """Test memory optimization functionality.""" # Mock memory usage with patch("psutil.Process") as mock_process: mock_memory_info = MagicMock() mock_memory_info.rss = 600 * 1024 * 1024 # 600 MB mock_process.return_value.memory_info.return_value = mock_memory_info result = await optimize_memory_usage(target_memory_mb=500) assert "current_memory_mb" in result assert "optimization_needed" in result assert "actions_taken" in result def test_system_performance_report(self): """Test system performance report generation.""" report = get_system_performance_report() assert "performance_monitor" in report assert "embedding_cache" in report assert "main_cache" in report assert "system_info" in report # Check system info structure system_info = report["system_info"] assert "cpu_count" in system_info assert "available_memory_gb" in system_info assert "total_memory_gb" in system_info @pytest.mark.performance class TestConcurrentPerformance: """Test performance under concurrent load.""" @pytest.mark.asyncio async def test_concurrent_cache_access(self): """Test cache performance under concurrent access.""" cache = LRUCache(max_size=100, ttl_seconds=60) async def cache_worker(worker_id: int): for i in range(10): key = f"worker_{worker_id}_key_{i}" value = f"worker_{worker_id}_value_{i}" await cache.set(key, value) result = await cache.get(key) assert result == value # Run multiple workers concurrently tasks = [cache_worker(i) for i in range(5)] await asyncio.gather(*tasks) @pytest.mark.asyncio async def test_concurrent_embedding_requests(self): """Test embedding service under concurrent load.""" service = AsyncEmbeddingService(embedding_dim=32) async def embedding_worker(worker_id: int): text = f"worker {worker_id} test text" embedding = await service.get_embedding(text) assert embedding is not None assert len(embedding) == 32 # Run multiple workers concurrently start_time = time.time() tasks = [embedding_worker(i) for i in range(10)] await asyncio.gather(*tasks) end_time = time.time() # Should 
        # Should handle concurrent requests efficiently
        assert end_time - start_time < 5.0  # Less than 5 seconds for 10 concurrent requests

    def test_concurrent_performance_monitoring(self):
        """Test performance monitoring under concurrent load."""
        monitor = PerformanceMonitor()

        def worker():
            for _ in range(100):
                monitor.record_request(0.01, success=True)

        # Run multiple threads
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(worker) for _ in range(5)]
            concurrent.futures.wait(futures)

        stats = monitor.get_performance_stats()
        assert stats["total_requests"] == 500  # 5 workers * 100 requests each
        assert stats["error_count"] == 0


@pytest.mark.performance
class TestAPIPerformanceEndpoints:
    """Test performance API endpoints."""

    def test_performance_stats_endpoint(self, client: TestClient):
        """Test performance stats API endpoint."""
        response = client.get("/api/performance/stats")

        assert response.status_code == 200
        data = response.json()
        assert "performance_monitor" in data
        assert "embedding_cache" in data
        assert "main_cache" in data
        assert "system_info" in data

    def test_performance_health_endpoint(self, client: TestClient):
        """Test performance health check endpoint."""
        response = client.get("/api/performance/health")

        assert response.status_code == 200
        data = response.json()
        assert "status" in data
        assert "warnings" in data
        assert "key_metrics" in data
        assert data["status"] in ["healthy", "degraded", "error"]

    def test_memory_optimization_endpoint(self, client: TestClient):
        """Test memory optimization endpoint."""
        response = client.post(
            "/api/performance/optimize-memory",
            json={"target_memory_mb": 400},
        )

        assert response.status_code == 200
        data = response.json()
        assert "current_memory_mb" in data
        assert "target_memory_mb" in data
        assert "optimization_needed" in data
        assert "actions_taken" in data

    def test_cache_stats_endpoint(self, client: TestClient):
        """Test cache statistics endpoint."""
        response = client.get("/api/performance/cache/stats")

        assert response.status_code == 200
        data = response.json()
        assert "embedding_cache" in data
        assert "main_cache" in data

    def test_clear_cache_endpoint(self, client: TestClient):
        """Test cache clearing endpoint."""
        response = client.delete("/api/performance/cache")

        assert response.status_code == 200
        data = response.json()
        assert "message" in data
        assert "cleared" in data["message"]


@pytest.mark.performance
class TestBasicPerformanceRequirements:
    """Test basic performance requirements are met."""

    def test_health_endpoint_response_time(self, client):
        """Test that health endpoint responds quickly."""
        start_time = time.time()
        response = client.get("/health")
        end_time = time.time()

        assert response.status_code == 200
        assert end_time - start_time < 0.1  # Should respond in < 100ms

    def test_concurrent_requests_basic(self, client):
        """Test basic concurrent request handling."""

        def make_request():
            return client.get("/health")

        # Submit multiple requests concurrently (concurrent.futures is imported at module level)
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(make_request) for _ in range(10)]
            responses = [future.result() for future in concurrent.futures.as_completed(futures)]

        # All requests should succeed
        for response in responses:
            assert response.status_code == 200


@pytest.fixture
def client():
    """Provide test client for API endpoints."""
    try:
        from app import app

        return TestClient(app)
    except ImportError:
        pytest.skip("App module not available")