# MCP Enterprise Upgrade Guide

## Overview

This guide documents the comprehensive enterprise-grade upgrades to the CX AI Agent MCP (Model Context Protocol) servers. The upgrades transform the basic MCP implementation into production-ready, scalable, and secure enterprise infrastructure.

---

## Table of Contents

1. [Architecture Overview](#architecture-overview)
2. [Database Layer](#database-layer)
3. [Authentication & Authorization](#authentication--authorization)
4. [Observability](#observability)
5. [Deployment](#deployment)
6. [Configuration](#configuration)
7. [Migration Guide](#migration-guide)
8. [API Reference](#api-reference)

---

## Architecture Overview

### Before: Basic JSON Storage

```
┌─────────────────────┐
│     MCP Server      │
│   (HTTP/JSON-RPC)   │
│                     │
│   ┌─────────────┐   │
│   │ JSON Files  │   │
│   └─────────────┘   │
└─────────────────────┘
```

### After: Enterprise Architecture

```
┌──────────────────────────────────────────┐
│       Load Balancer / API Gateway        │
└──────────────┬───────────────────────────┘
               │
    ┌──────────┼──────────┐
    │          │          │
┌───▼───┐  ┌───▼───┐  ┌───▼───┐
│  MCP  │  │  MCP  │  │  MCP  │
│Server │  │Server │  │Server │
│  #1   │  │  #2   │  │  #3   │
└───┬───┘  └───┬───┘  └───┬───┘
    │          │          │
    └──────────┼──────────┘
               │
     ┌─────────▼──────────┐
     │                    │
     │   ┌────────────┐   │
     │   │ PostgreSQL │   │
     │   │   +ACID    │   │
     │   └────────────┘   │
     │                    │
     │   ┌────────────┐   │
     │   │   Redis    │   │
     │   │  (Cache)   │   │
     │   └────────────┘   │
     │                    │
     │   ┌────────────┐   │
     │   │ Prometheus │   │
     │   │ (Metrics)  │   │
     │   └────────────┘   │
     └────────────────────┘
```

---

## Database Layer

### Features

✅ **SQLAlchemy ORM with Async Support**
- Async database operations with `asyncio` and `asyncpg`
- Type-safe models with SQLAlchemy 2.0
- Automatic relationship loading

✅ **Multi-Database Support**
- SQLite (development/single-instance)
- PostgreSQL (production/multi-instance)
- MySQL (optional)

✅ **Enterprise Schema Design**
- Proper foreign keys and relationships
- Comprehensive indexes for performance
- Audit trail with `AuditLog` table
- Multi-tenancy support built in

✅ **Connection Pooling**
- Configurable pool size
- Pool pre-ping for connection health
- Automatic connection recycling

✅ **Database Migrations**
- Alembic integration for schema versioning
- Automatic migration generation
- Rollback support

### Database Models

#### Core Models

- `Company` - Company/account information
- `Prospect` - Sales prospects with scoring
- `Contact` - Decision-maker contacts
- `Fact` - Enrichment data and insights
- `Activity` - All prospect interactions (emails, calls, meetings)
- `Suppression` - Compliance (opt-outs, bounces)
- `Handoff` - AI-to-human transitions
- `AuditLog` - Compliance and security audit trail

#### Key Features

```python
# Multi-tenancy
tenant_id: Optional[str]  # On all tenant-aware models

# Automatic timestamps
created_at: datetime
updated_at: datetime

# Soft deletes
is_active: bool

# Rich relationships
company.prospects     # All prospects for a company
prospect.activities   # All activities for a prospect
```
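These conventions translate directly into the SQLAlchemy 2.0 models. As a rough sketch only — the actual `Prospect` model lives in `mcp.database`, and the column types, lengths, and defaults below are illustrative assumptions — a tenant-aware model might look like:

```python
from datetime import datetime
from typing import Optional

from sqlalchemy import Boolean, DateTime, Float, String, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class Prospect(Base):
    """Illustrative shape of the Prospect model (not the real definition)."""
    __tablename__ = "prospects"

    id: Mapped[str] = mapped_column(String(64), primary_key=True)
    company_id: Mapped[str] = mapped_column(String(64), index=True)
    fit_score: Mapped[float] = mapped_column(Float, default=0.0)
    status: Mapped[str] = mapped_column(String(32), default="new", index=True)

    # Multi-tenancy: nullable so single-tenant deployments can omit it
    tenant_id: Mapped[Optional[str]] = mapped_column(String(64), index=True)

    # Automatic timestamps
    created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(
        DateTime, server_default=func.now(), onupdate=func.now()
    )

    # Soft-delete flag
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
```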
### Usage

#### Initialize Database

```python
from mcp.database import init_database

# Create tables
await init_database()
```

#### Using Repositories

```python
from mcp.database import get_db_manager, CompanyRepository

# Get database session
db_manager = get_db_manager()
async with db_manager.get_session() as session:
    repo = CompanyRepository(session, tenant_id="acme_corp")

    # Create company
    company = await repo.create({
        "id": "shopify",
        "name": "Shopify",
        "domain": "shopify.com",
        "industry": "E-commerce",
        "employee_count": 10000
    })

    # Get company
    company = await repo.get_by_domain("shopify.com")

    # List companies
    companies = await repo.list(industry="E-commerce", limit=100)
```

#### Using Database Store Service

```python
from mcp.database import DatabaseStoreService

# Create service instance
store = DatabaseStoreService(tenant_id="acme_corp")

# Save prospect
await store.save_prospect({
    "id": "prospect_123",
    "company_id": "shopify",
    "fit_score": 85.0,
    "status": "new"
})

# Get prospect
prospect = await store.get_prospect("prospect_123")

# List prospects
prospects = await store.list_prospects()
```

### Migrations

#### Create Migration

```bash
python -m mcp.database.migrate create "add_new_field"
```

#### Apply Migrations

```bash
# Upgrade to latest
python -m mcp.database.migrate upgrade

# Upgrade to specific revision
python -m mcp.database.migrate upgrade abc123
```

#### Rollback

```bash
python -m mcp.database.migrate downgrade
```

### Configuration

```bash
# Database URL (SQLite)
DATABASE_URL=sqlite+aiosqlite:///./data/cx_agent.db

# Database URL (PostgreSQL)
DATABASE_URL=postgresql+asyncpg://user:password@localhost/cx_agent

# Connection pool settings
DB_POOL_SIZE=20
DB_MAX_OVERFLOW=10
DB_POOL_TIMEOUT=30
DB_POOL_RECYCLE=3600
DB_POOL_PRE_PING=true

# SQLite WAL mode (better concurrency)
SQLITE_WAL=true

# Echo SQL (debugging)
DB_ECHO=false
```

---

## Authentication & Authorization

### Features

✅ **API Key Authentication**
- Secure key generation (`mcp_<32-char-hex>`)
- SHA-256 key hashing (never store plain keys)
- Key expiration support
- Per-key rate limiting
- Multiple authentication methods (header, bearer token)

✅ **Request Signing (HMAC)**
- HMAC-SHA256 request signing
- Timestamp verification (5-minute window)
- Replay attack prevention

✅ **Rate Limiting**
- Token bucket algorithm
- Per-client rate limiting
- Per-endpoint rate limiting
- Global rate limiting (optional)
- Redis-based distributed rate limiting

✅ **Multi-Tenancy**
- Tenant isolation at the data layer
- Tenant-specific API keys
- Tenant-aware rate limits

### API Key Authentication

#### Generate API Key

```python
from mcp.auth import get_key_manager

manager = get_key_manager()

# Generate a new key
plain_key, api_key_obj = manager.create_key(
    name="Production API Key",
    tenant_id="acme_corp",
    expires_in_days=365,
    rate_limit=1000  # requests per minute
)

# Save plain_key securely! It's shown only once.
print(f"API Key: {plain_key}")
```

#### Validate API Key

```python
api_key = manager.validate_key(plain_key)
if api_key and api_key.is_valid():
    print(f"Valid key: {api_key.name}")
```

#### Revoke API Key

```python
manager.revoke_key(key_hash)
```

### Using API Keys

#### HTTP Header

```bash
curl -H "X-API-Key: mcp_abc123..." http://localhost:9004/rpc
```

#### Bearer Token

```bash
curl -H "Authorization: Bearer mcp_abc123..." http://localhost:9004/rpc
```

### Request Signing

```python
from datetime import datetime
import json

from mcp.auth import RequestSigningAuth

signer = RequestSigningAuth(secret_key="your_secret_key")

# Sign request
method = "POST"
path = "/rpc"
body = json.dumps({"method": "store.get_prospect", "params": {"id": "123"}})
timestamp = datetime.utcnow().isoformat() + "Z"

signature = signer.sign_request(method, path, body, timestamp)

# Send request with signature
headers = {
    "X-Signature": signature,
    "X-Timestamp": timestamp,
    "Content-Type": "application/json"
}
```
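On the server side, verification recomputes the signature and enforces the 5-minute timestamp window. The sketch below is an assumption about how the payload is canonicalized (`method|path|timestamp|body`); the authoritative logic is `RequestSigningAuth` in `mcp.auth`, and this helper is purely illustrative:

```python
import hashlib
import hmac
from datetime import datetime, timedelta


def verify_signature(secret_key: str, method: str, path: str, body: str,
                     timestamp: str, signature: str,
                     max_skew: timedelta = timedelta(minutes=5)) -> bool:
    """Hypothetical verifier mirroring RequestSigningAuth.sign_request.

    Assumes the signed payload is 'METHOD|path|timestamp|body'; the real
    canonical string is defined by RequestSigningAuth.
    """
    # Reject stale timestamps to prevent replay attacks (5-minute window)
    sent_at = datetime.fromisoformat(timestamp.rstrip("Z"))
    if abs(datetime.utcnow() - sent_at) > max_skew:
        return False

    payload = f"{method}|{path}|{timestamp}|{body}".encode()
    expected = hmac.new(secret_key.encode(), payload, hashlib.sha256).hexdigest()

    # Constant-time comparison avoids timing side channels
    return hmac.compare_digest(expected, signature)
```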
### Rate Limiting

#### Configure Limits

```python
from mcp.auth import get_rate_limiter

limiter = get_rate_limiter()

# Set endpoint-specific limits
limiter.endpoint_limits["/rpc"] = {
    "capacity": 100,      # Max 100 requests
    "refill_rate": 10.0   # Refill 10 per second
}
```

#### Check Rate Limit

```python
allowed, retry_after = await limiter.check_rate_limit(request)
if not allowed:
    print(f"Rate limited! Retry after {retry_after} seconds")
```
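Under the hood, these limits drive a token bucket per client: tokens refill continuously at `refill_rate` and each request spends one. A simplified single-process sketch of the algorithm (the real limiter also supports Redis-backed distributed state):

```python
import time
from dataclasses import dataclass, field


@dataclass
class TokenBucket:
    """Simplified token bucket; one instance per client/endpoint pair."""
    capacity: int = 100        # burst size (max stored tokens)
    refill_rate: float = 10.0  # tokens added per second
    tokens: float = 0.0
    last_refill: float = field(default_factory=time.monotonic)

    def __post_init__(self) -> None:
        self.tokens = float(self.capacity)  # start full

    def try_consume(self, cost: float = 1.0) -> tuple[bool, float]:
        """Return (allowed, retry_after_seconds)."""
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

        if self.tokens >= cost:
            self.tokens -= cost
            return True, 0.0
        # Not enough tokens: report how long until `cost` tokens accrue
        return False, (cost - self.tokens) / self.refill_rate
```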
### Configuration

```bash
# Primary API key
MCP_API_KEY=mcp_your_primary_key_here

# Additional API keys (comma-separated)
MCP_API_KEYS=mcp_key1,mcp_key2,mcp_key3

# Secret key for request signing
MCP_SECRET_KEY=your_hmac_secret_key_here
```

---

## Observability

### Features

✅ **Structured Logging**
- JSON logging for production
- Correlation ID tracking
- Request/response logging
- Performance timing
- ELK/Datadog/Splunk compatible

✅ **Prometheus Metrics**
- HTTP request metrics (count, duration, size)
- MCP-specific metrics
- Business metrics (prospects, contacts, emails)
- Database metrics
- Cache metrics
- Authentication metrics
- Error tracking

✅ **Performance Tracking**
- Automatic request timing
- MCP call duration tracking
- Database query performance
- Context managers for custom tracking

### Structured Logging

#### Configuration

```python
from mcp.observability import configure_logging

# Development (human-readable)
configure_logging(level="DEBUG", json_output=False)

# Production (JSON)
configure_logging(level="INFO", json_output=True)
```

#### Usage

```python
from mcp.observability import get_logger, set_correlation_id

logger = get_logger(__name__)

# Set correlation ID
set_correlation_id("request-abc-123")

# Log messages
logger.info("Processing request", user_id="user123", action="create_prospect")
logger.warning("Rate limit approaching", remaining=10)
logger.error("Database error", exc_info=True)
```

#### Log Output (Development)

```
2025-01-20 10:30:15 [info     ] Processing request  [cx_ai_agent] correlation_id=request-abc-123 user_id=user123 action=create_prospect
```

#### Log Output (Production JSON)

```json
{
  "event": "Processing request",
  "timestamp": "2025-01-20T10:30:15",
  "level": "info",
  "correlation_id": "request-abc-123",
  "service": "cx_ai_agent",
  "environment": "production",
  "user_id": "user123",
  "action": "create_prospect"
}
```

### Prometheus Metrics

#### Available Metrics

**HTTP Metrics:**
- `mcp_http_requests_total` - Total requests by method, path, status
- `mcp_http_request_duration_seconds` - Request duration histogram
- `mcp_http_request_size_bytes` - Request size
- `mcp_http_response_size_bytes` - Response size

**MCP Metrics:**
- `mcp_calls_total` - Total MCP calls by server, method, status
- `mcp_call_duration_seconds` - MCP call duration histogram

**Business Metrics:**
- `mcp_prospects_total` - Total prospects by status, tenant
- `mcp_contacts_total` - Total contacts by tenant
- `mcp_companies_total` - Total companies by tenant
- `mcp_emails_sent_total` - Total emails sent
- `mcp_meetings_booked_total` - Total meetings booked

**Database Metrics:**
- `mcp_db_connections` - Active database connections
- `mcp_db_queries_total` - Total queries by operation, table
- `mcp_db_query_duration_seconds` - Query duration histogram

**Cache Metrics:**
- `mcp_cache_hits_total` - Total cache hits
- `mcp_cache_misses_total` - Total cache misses

**Auth Metrics:**
- `mcp_auth_attempts_total` - Auth attempts by result
- `mcp_rate_limit_exceeded_total` - Rate limit exceeded events

#### Usage

```python
from mcp.observability import get_metrics

metrics = get_metrics()

# Record HTTP request
metrics.record_http_request(
    method="POST",
    path="/rpc",
    status=200,
    duration=0.05
)

# Record MCP call
metrics.record_mcp_call(
    server="search",
    method="search.query",
    duration=0.1,
    success=True
)

# Update business metrics
metrics.prospects_total.labels(status="qualified", tenant_id="acme").set(150)
```
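For code paths without built-in instrumentation, a small context manager can feed the same API. This sketch assumes only `record_mcp_call` as documented above; the library's own context managers for custom tracking may differ:

```python
import time
from contextlib import contextmanager

from mcp.observability import get_metrics


@contextmanager
def track_mcp_call(server: str, method: str):
    """Time a block and record it via the metrics API shown above."""
    start = time.perf_counter()
    success = True
    try:
        yield
    except Exception:
        success = False
        raise
    finally:
        get_metrics().record_mcp_call(
            server=server,
            method=method,
            duration=time.perf_counter() - start,
            success=success,
        )


# Usage: wrap the call being measured
with track_mcp_call("store", "store.list_prospects"):
    ...
```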
#### Metrics Endpoint

```bash
curl http://localhost:9004/metrics
```

#### Grafana Dashboard

Example Prometheus queries:

```promql
# Request rate
rate(mcp_http_requests_total[5m])

# P95 latency
histogram_quantile(0.95, rate(mcp_http_request_duration_seconds_bucket[5m]))

# Error rate
rate(mcp_http_requests_total{status=~"5.."}[5m])

# MCP call success rate
rate(mcp_calls_total{status="success"}[5m]) / rate(mcp_calls_total[5m])
```

### Configuration

```bash
# Service name (for logging and metrics)
SERVICE_NAME=cx_ai_agent

# Environment
ENVIRONMENT=production

# Version
VERSION=2.0.0

# Log level
LOG_LEVEL=INFO
```

---

## Deployment

### Development (Local)

#### 1. Install Dependencies

```bash
pip install -r requirements.txt
```

#### 2. Set Environment Variables

```bash
export DATABASE_URL=sqlite+aiosqlite:///./data/cx_agent.db
export MCP_API_KEY=mcp_dev_key_for_testing_only
export LOG_LEVEL=DEBUG
```

#### 3. Initialize Database

```bash
python -c "
import asyncio
from mcp.database import init_database
asyncio.run(init_database())
"
```

#### 4. Start MCP Server

```bash
python mcp/servers/store_server_enterprise.py
```

### Production (Docker)

#### Dockerfile

```dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Initialize database (only meaningful for SQLite; with PostgreSQL,
# run migrations against the live database at deploy time instead)
RUN python -c "import asyncio; from mcp.database import init_database; asyncio.run(init_database())"

# Expose port
EXPOSE 9004

# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:9004/health || exit 1

# Run server
CMD ["python", "mcp/servers/store_server_enterprise.py"]
```

#### docker-compose.yml

```yaml
version: '3.8'

services:
  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_DB: cx_agent
      POSTGRES_USER: cx_user
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U cx_user"]
      interval: 10s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 3

  mcp-store:
    build: .
    ports:
      - "9004:9004"
    environment:
      DATABASE_URL: postgresql+asyncpg://cx_user:${DB_PASSWORD}@postgres/cx_agent
      REDIS_URL: redis://redis:6379/0
      MCP_API_KEY: ${MCP_API_KEY}
      MCP_SECRET_KEY: ${MCP_SECRET_KEY}
      SERVICE_NAME: mcp-store
      ENVIRONMENT: production
      LOG_LEVEL: INFO
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9004/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  prometheus:
    image: prom/prometheus:latest
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    ports:
      - "9090:9090"
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: ${GRAFANA_PASSWORD}
    volumes:
      - grafana_data:/var/lib/grafana

volumes:
  postgres_data:
  prometheus_data:
  grafana_data:
```

### Kubernetes Deployment

#### deployment.yaml

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-store
  labels:
    app: mcp-store
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-store
  template:
    metadata:
      labels:
        app: mcp-store
    spec:
      containers:
      - name: mcp-store
        image: cx-agent/mcp-store:latest
        ports:
        - containerPort: 9004
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: db-credentials
              key: url
        - name: MCP_API_KEY
          valueFrom:
            secretKeyRef:
              name: mcp-credentials
              key: api_key
        - name: REDIS_URL
          value: redis://redis-service:6379/0
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 9004
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 9004
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: mcp-store-service
spec:
  selector:
    app: mcp-store
  ports:
  - port: 9004
    targetPort: 9004
  type: LoadBalancer
```

---

## Configuration

### Environment Variables

#### Database

```bash
DATABASE_URL=postgresql+asyncpg://user:pass@localhost/cx_agent
DB_POOL_SIZE=20
DB_MAX_OVERFLOW=10
DB_POOL_TIMEOUT=30
DB_POOL_RECYCLE=3600
DB_POOL_PRE_PING=true
SQLITE_WAL=true
DB_ECHO=false
```

#### Authentication

```bash
MCP_API_KEY=mcp_primary_key_here
MCP_API_KEYS=mcp_key1,mcp_key2,mcp_key3
MCP_SECRET_KEY=hmac_secret_key_here
```

#### Observability

```bash
SERVICE_NAME=cx_ai_agent
ENVIRONMENT=production
VERSION=2.0.0
LOG_LEVEL=INFO
```

#### Redis (Optional)

```bash
REDIS_URL=redis://localhost:6379/0
```

---

## Migration Guide

### From JSON to Database

#### 1. Backup JSON Files

```bash
cp data/prospects.json data/prospects.json.backup
cp data/companies_store.json data/companies_store.json.backup
cp data/contacts.json data/contacts.json.backup
```

#### 2. Initialize Database

```bash
python -m mcp.database.migrate upgrade
```

#### 3. Migrate Data

```python
import asyncio
import json

from mcp.database import DatabaseStoreService


async def migrate():
    store = DatabaseStoreService()

    # Migrate prospects
    with open("data/prospects.json") as f:
        prospects = json.load(f)
    for prospect in prospects:
        await store.save_prospect(prospect)

    # Migrate companies
    with open("data/companies_store.json") as f:
        companies = json.load(f)
    for company in companies:
        await store.save_company(company)

    # Migrate contacts
    with open("data/contacts.json") as f:
        contacts = json.load(f)
    for contact in contacts:
        await store.save_contact(contact)

    print("Migration completed!")


asyncio.run(migrate())
```

#### 4. Test

```bash
# Test database access
python -c "
import asyncio
from mcp.database import DatabaseStoreService

async def test():
    store = DatabaseStoreService()
    prospects = await store.list_prospects()
    print(f'Migrated {len(prospects)} prospects')

asyncio.run(test())
"
```

#### 5. Switch to Database Backend

```bash
# Update environment
export USE_IN_MEMORY_MCP=false
export DATABASE_URL=sqlite+aiosqlite:///./data/cx_agent.db
```

---

## API Reference

### MCP Store Methods

#### `store.save_prospect(prospect: Dict) -> str`
Save or update a prospect.

#### `store.get_prospect(id: str) -> Optional[Dict]`
Get a prospect by ID.

#### `store.list_prospects() -> List[Dict]`
List all prospects (tenant-filtered).

#### `store.save_company(company: Dict) -> str`
Save or update a company.

#### `store.get_company(id: str) -> Optional[Dict]`
Get a company by ID.

#### `store.save_contact(contact: Dict) -> str`
Save a contact.

#### `store.list_contacts_by_domain(domain: str) -> List[Dict]`
List contacts by email domain.

#### `store.check_suppression(type: str, value: str) -> bool`
Check whether an email or domain is suppressed.

#### `store.save_handoff(packet: Dict) -> str`
Save a handoff packet.

#### `store.clear_all() -> str`
Clear all data (use with caution!).
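All of these methods are exposed over the `/rpc` JSON-RPC endpoint shown earlier. A minimal Python client call might look like the sketch below; the exact JSON-RPC envelope fields (for example `jsonrpc` and `id`) depend on the server implementation and are an assumption here:

```python
import requests

# Hypothetical client call using the request shape from the signing example
resp = requests.post(
    "http://localhost:9004/rpc",
    headers={"X-API-Key": "mcp_abc123..."},
    json={"method": "store.get_prospect", "params": {"id": "prospect_123"}},
    timeout=10,
)
resp.raise_for_status()
print(resp.json())
```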

---

## Next Steps

1. **Review Performance**: Monitor metrics in Grafana
2. **Scale Up**: Add more replicas in Kubernetes
3. **Add More Features**:
   - Real email sending (AWS SES)
   - Real calendar integration (Google/Outlook)
   - Advanced analytics
   - Machine learning scoring
4. **Security Hardening**:
   - TLS/SSL certificates
   - WAF (Web Application Firewall)
   - DDoS protection
5. **Compliance**:
   - GDPR compliance features
   - Data retention policies
   - Privacy controls

---

## Support

For issues or questions:

1. Check logs: `docker logs mcp-store`
2. Check metrics: `http://localhost:9004/metrics`
3. Check health: `http://localhost:9004/health`

---

## License

Enterprise License - All Rights Reserved