# NOTE: "Spaces: Runtime error" was stray page text ahead of the module; it is not Python code.
"""
Productivity-Enhancing MCP Services
Real-world services that increase sales automation efficiency
"""
import asyncio
import copy
import logging
import re
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
# Module-level logger; every service class in this file logs through it.
logger = logging.getLogger(__name__)
class MCPAnalyticsService:
    """
    Analytics Service - Track metrics, conversions, and performance
    Real-world use case: Monitor pipeline health and ROI

    State lives in memory on the instance: ``metrics`` holds lifetime
    counters plus per-day buckets under ``"daily_stats"``, and ``events`` is
    an append-only log of every tracked event.
    """

    # Maps an event type to (lifetime counter key, per-day counter key).
    # A per-day key of None means the event has no daily bucket.
    _EVENT_COUNTERS = {
        "pipeline_run": ("pipeline_runs", "pipeline_runs"),
        "prospect_discovered": ("prospects_discovered", "prospects"),
        "contact_found": ("contacts_found", "contacts"),
        "email_generated": ("emails_generated", "emails"),
        "email_sent": ("emails_sent", None),
        "reply_received": ("replies_received", None),
        "meeting_booked": ("meetings_booked", None),
    }

    # Counters created for each new day in metrics["daily_stats"].
    _DAILY_KEYS = ("pipeline_runs", "prospects", "contacts", "emails")

    def __init__(self):
        self.metrics = {
            "pipeline_runs": 0,
            "prospects_discovered": 0,
            "contacts_found": 0,
            "emails_generated": 0,
            "emails_sent": 0,
            "replies_received": 0,
            "meetings_booked": 0,
            "conversion_rate": 0.0,
            "average_response_time": 0.0,
            "top_performing_sequences": [],
            "daily_stats": {},
        }
        self.events = []
        logger.info("MCP Analytics Service initialized")

    async def track_event(self, event_type: str, data: Dict) -> str:
        """Record an event and update the aggregated counters.

        Args:
            event_type: Event name. Names listed in ``_EVENT_COUNTERS`` bump
                the matching counters; any other name is only logged to
                ``events``.
            data: Arbitrary payload stored with the event (by reference).

        Returns:
            The literal string ``"tracked"``.
        """
        # Read the clock once so the stored timestamp and the daily bucket
        # always refer to the same day, even for an event logged at midnight.
        now = datetime.utcnow()
        self.events.append({
            "type": event_type,
            "data": data,
            "timestamp": now.isoformat(),
        })

        today = now.strftime('%Y-%m-%d')
        daily = self.metrics["daily_stats"].setdefault(
            today, dict.fromkeys(self._DAILY_KEYS, 0)
        )

        counters = self._EVENT_COUNTERS.get(event_type)
        if counters is not None:
            total_key, daily_key = counters
            self.metrics[total_key] += 1
            if daily_key is not None:
                daily[daily_key] += 1

        # Conversion rate = meetings booked per email sent, as a percentage.
        # Left untouched until at least one email has been sent.
        emails_sent = self.metrics["emails_sent"]
        if emails_sent > 0:
            self.metrics["conversion_rate"] = (
                self.metrics["meetings_booked"] / emails_sent
            ) * 100

        logger.info("Analytics event tracked: %s", event_type)
        return "tracked"

    async def get_metrics(self) -> Dict:
        """Return the live metrics dict (the same object, not a copy)."""
        return self.metrics

    async def get_dashboard_data(self) -> Dict:
        """Return metrics arranged for display.

        Returns:
            A dict with a human-labelled ``summary``, the per-day
            ``daily_stats`` buckets and the ten most recent events.
        """
        metrics = self.metrics
        return {
            "summary": {
                "Total Pipeline Runs": metrics["pipeline_runs"],
                "Prospects Discovered": metrics["prospects_discovered"],
                "Contacts Found": metrics["contacts_found"],
                "Emails Generated": metrics["emails_generated"],
                "Emails Sent": metrics["emails_sent"],
                "Replies Received": metrics["replies_received"],
                "Meetings Booked": metrics["meetings_booked"],
                "Conversion Rate": f"{metrics['conversion_rate']:.2f}%",
            },
            "daily_stats": metrics["daily_stats"],
            "recent_events": self.events[-10:],  # Last 10 events
        }
class MCPEnrichmentService:
    """
    Enrichment Service - Enrich prospect and contact data
    Real-world use case: Add company info, social profiles, tech stack

    Company data comes from a small in-memory mock database keyed by
    lower-case domain; unknown domains get a placeholder record.
    """

    def __init__(self):
        # Mock enrichment database
        self.enrichment_db = {
            "shopify.com": {
                "employee_count": "10,000+",
                "founded_year": 2006,
                "funding": "$2.9B",
                "tech_stack": ["Ruby on Rails", "React", "MySQL", "Redis"],
                "social_profiles": {
                    "linkedin": "https://linkedin.com/company/shopify",
                    "twitter": "https://twitter.com/shopify",
                },
                "industry_tags": ["E-commerce", "SaaS", "Retail Tech"],
                "revenue_range": "$1B - $5B",
            },
            "stripe.com": {
                "employee_count": "8,000+",
                "founded_year": 2010,
                "funding": "$2.2B",
                "tech_stack": ["Ruby", "Scala", "Go", "React"],
                "social_profiles": {
                    "linkedin": "https://linkedin.com/company/stripe",
                    "twitter": "https://twitter.com/stripe",
                },
                "industry_tags": ["Fintech", "Payments", "SaaS"],
                "revenue_range": "$5B+",
            },
        }
        logger.info("MCP Enrichment Service initialized")

    async def enrich_company(self, domain: str) -> Dict:
        """Return enrichment data for a company domain.

        Args:
            domain: Company domain, e.g. ``"shopify.com"``. Matching ignores
                case and surrounding whitespace.

        Returns:
            A fresh dict the caller may freely modify. ``enrichment_source``
            is ``"database"`` for a known domain and ``"estimated"`` for the
            placeholder record built for an unknown one.
        """
        logger.info("Enriching company data for: %s", domain)

        lookup_key = (domain or "").strip().lower()
        known = self.enrichment_db.get(lookup_key)
        if known:
            # Deep-copy so neither the source tag added here nor any later
            # caller-side edit leaks back into the shared database entry.
            enriched = copy.deepcopy(known)
            enriched["enrichment_source"] = "database"
            return enriched

        # Unknown domain: guess profile URLs from the first domain label.
        slug = lookup_key.split('.')[0]
        return {
            "employee_count": "Unknown",
            "founded_year": None,
            "funding": "Unknown",
            "tech_stack": [],
            "social_profiles": {
                "linkedin": f"https://linkedin.com/company/{slug}",
                "twitter": f"https://twitter.com/{slug}",
            },
            "industry_tags": [],
            "revenue_range": "Unknown",
            "enrichment_source": "estimated",
        }

    async def enrich_contact(self, email: str, name: str) -> Dict:
        """Return guessed social profiles and a seniority estimate.

        Profile URLs are derived from the part of ``email`` before the first
        ``@``; they are guesses and are not checked for existence.

        Args:
            email: Contact email address.
            name: Contact display name, echoed back in the result.

        Returns:
            Dict with the input email and name, three guessed profile URLs,
            ``estimated_seniority`` and an ISO ``enrichment_timestamp``.
        """
        logger.info("Enriching contact data for: %s", email)

        # An address without "@" yields an empty username, not the whole string.
        username, at_sign, _ = email.partition('@')
        if not at_sign:
            username = ''

        return {
            "email": email,
            "name": name,
            "linkedin_profile": f"https://linkedin.com/in/{username.replace('.', '-')}",
            "twitter_profile": f"https://twitter.com/{username.replace('.', '_')}",
            "github_profile": f"https://github.com/{username.replace('.', '')}",
            "estimated_seniority": self._estimate_seniority(email, name),
            "enrichment_timestamp": datetime.utcnow().isoformat(),
        }

    def _estimate_seniority(self, email: str, name: str) -> str:
        """Estimate seniority from keywords in the email's local part.

        Only the text before ``@`` is inspected, so a company domain such as
        ``headspace.com`` cannot trigger a keyword. ``name`` is accepted for
        interface compatibility and is not used.

        Returns:
            One of ``"Executive"``, ``"Senior"``, ``"Mid-Level"`` or
            ``"Individual Contributor"``.
        """
        local_part = email.lower().partition('@')[0]
        # Ordered from most to least senior; the first matching tier wins.
        tiers = (
            ("Executive", ('ceo', 'founder', 'chief')),
            ("Senior", ('vp', 'director', 'head')),
            ("Mid-Level", ('manager', 'lead')),
        )
        for label, keywords in tiers:
            if any(keyword in local_part for keyword in keywords):
                return label
        return "Individual Contributor"
class MCPValidationService:
    """
    Validation Service - Validate emails, domains, and contact information
    Real-world use case: Reduce bounce rates and improve deliverability

    All checks are syntactic or list-based; no network or DNS lookups are
    performed.
    """

    # Both patterns are applied with fullmatch(), so the whole string must
    # conform and a trailing newline is rejected.
    _EMAIL_RE = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
    _DOMAIN_RE = re.compile(
        r'([a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}'
    )

    def __init__(self):
        # Common disposable email domains
        self.disposable_domains = [
            "tempmail.com", "throwaway.email", "guerrillamail.com",
            "10minutemail.com", "mailinator.com"
        ]
        # Mailbox prefixes treated as role-based (shared) addresses
        self.invalid_patterns = [
            "noreply@", "no-reply@", "donotreply@",
            "info@", "admin@", "support@", "sales@"
        ]
        # Not read or written by any method in this class.
        self.validation_cache = {}
        logger.info("MCP Validation Service initialized")

    async def validate_email(self, email: str) -> Dict:
        """Validate an email address and score its deliverability.

        Args:
            email: Address to check. A non-string value is reported as an
                invalid format rather than raising.

        Returns:
            Dict with ``is_valid``, ``is_disposable``, ``is_role_based``,
            ``is_catchall`` (always False), ``deliverability_score`` (0-100),
            ``validation_issues`` and ``validated_at``. A malformed address
            returns early with a score of 0.
        """
        logger.info("Validating email: %s", email)
        result = {
            "email": email,
            "is_valid": False,
            "is_disposable": False,
            "is_role_based": False,
            "is_catchall": False,
            "deliverability_score": 0,
            "validation_issues": [],
            "validated_at": datetime.utcnow().isoformat(),
        }
        issues = result["validation_issues"]

        # Basic format validation
        if not isinstance(email, str) or not self._EMAIL_RE.fullmatch(email):
            issues.append("Invalid email format")
            return result

        # Domains and mailbox names are compared case-insensitively.
        normalized = email.lower()
        domain = normalized.rpartition('@')[2]

        if domain in self.disposable_domains:
            result["is_disposable"] = True
            issues.append("Disposable email domain")

        if normalized.startswith(tuple(self.invalid_patterns)):
            result["is_role_based"] = True
            issues.append("Role-based email (low engagement)")

        # Start at 100 and subtract a penalty per detected problem.
        score = 100
        if result["is_disposable"]:
            score -= 50
        if result["is_role_based"]:
            score -= 30

        result["is_valid"] = not issues
        result["deliverability_score"] = max(0, score)
        return result

    async def validate_domain(self, domain: str) -> Dict:
        """Validate the syntax of a domain name.

        Args:
            domain: Domain to check. A non-string value is reported as an
                invalid format rather than raising.

        Returns:
            Dict with ``is_valid``, ``has_mx_records``, ``is_active``,
            ``validation_issues`` and ``validated_at``. The three flags are
            set to True for any well-formed domain; no lookup is made.
        """
        logger.info("Validating domain: %s", domain)
        result = {
            "domain": domain,
            "is_valid": False,
            "has_mx_records": False,
            "is_active": False,
            "validation_issues": [],
            "validated_at": datetime.utcnow().isoformat(),
        }

        # Basic domain format validation
        if not isinstance(domain, str) or not self._DOMAIN_RE.fullmatch(domain):
            result["validation_issues"].append("Invalid domain format")
            return result

        # For demo purposes, assume most domains are valid
        # In production, would do DNS lookups
        result["is_valid"] = True
        result["has_mx_records"] = True
        result["is_active"] = True
        return result

    async def batch_validate_emails(self, emails: List[str]) -> List[Dict]:
        """Validate several emails, returning results in input order."""
        logger.info("Batch validating %d emails", len(emails))
        return [await self.validate_email(email) for email in emails]
class MCPSummaryService:
    """
    Summary Service - Generate AI-powered summaries for companies and prospects
    Real-world use case: Create comprehensive, informative summaries for sales teams
    ENHANCED: Now uses LLM service with strict grounding to prevent hallucination
    """

    # Title substrings that mark a contact as a decision maker.
    # NOTE(review): plain substring matching, so 'cto' also matches titles
    # such as "contractor" - confirm whether that is acceptable.
    _DECISION_MAKER_KEYWORDS = ('ceo', 'cto', 'cfo', 'vp', 'director', 'head', 'chief')

    def __init__(self):
        # Imported here rather than at module level, so the LLM module is
        # only loaded when a summary service is actually created.
        from services.llm_service import get_llm_service
        self.llm = get_llm_service()
        logger.info("MCP Summary Service initialized with LLM grounding support")

    async def _grounded_summary(
        self,
        data: Dict,
        enrichment_data: Optional[Dict],
        summary_type: str
    ) -> str:
        """Merge enrichment into ``data`` and request a grounded LLM summary."""
        # The name is read before merging, so enrichment cannot rename the company.
        name = data.get('name', 'Unknown Company')
        merged = {**data, **enrichment_data} if enrichment_data else data
        return await self.llm.generate_grounded_summary(
            company_name=name,
            extracted_data=merged,
            raw_facts=merged.get('raw_facts', []),
            summary_type=summary_type
        )

    async def generate_company_summary(
        self,
        company_data: Dict,
        enrichment_data: Optional[Dict] = None
    ) -> str:
        """
        Generate a grounded summary for a prospect company.

        Args:
            company_data: Basic company information (INCLUDING raw_facts if available)
            enrichment_data: Optional enriched data; its keys override
                same-named keys in company_data
        Returns:
            Whatever the LLM service returns for summary_type "prospect"
        """
        logger.info(
            "Generating GROUNDED summary for company: %s",
            company_data.get('name', 'Unknown')
        )
        return await self._grounded_summary(company_data, enrichment_data, "prospect")

    async def generate_prospect_summary(
        self,
        prospect_data: Dict,
        company_enrichment: Optional[Dict] = None,
        contact_data: Optional[List[Dict]] = None
    ) -> str:
        """
        Generate a summary for a sales prospect.

        The result is the company summary followed by an optional sentence
        about the contacts found and an optional fit assessment.

        Args:
            prospect_data: Prospect record; reads 'company' and 'fit_score'.
                A missing or None value is treated as absent.
            company_enrichment: Enriched company data
            contact_data: List of contacts found
        Returns:
            Detailed prospect summary
        """
        logger.info("Generating prospect summary")
        company = prospect_data.get('company') or {}
        company_summary = await self.generate_company_summary(company, company_enrichment)
        return "".join((
            company_summary,
            self._contact_sentence(contact_data),
            self._fit_sentence(prospect_data.get('fit_score')),
        ))

    @classmethod
    def _contact_sentence(cls, contact_data: Optional[List[Dict]]) -> str:
        """Describe how many contacts and decision makers were found."""
        if not contact_data:
            return ""
        total = len(contact_data)
        sentence = f" We have identified {total} key contact{'s' if total > 1 else ''}"
        # A title that is missing or None counts as no title.
        titles = (str(contact.get('title') or '').lower() for contact in contact_data)
        makers = sum(
            1 for title in titles
            if any(keyword in title for keyword in cls._DECISION_MAKER_KEYWORDS)
        )
        if makers:
            sentence += f", including {makers} decision-maker{'s' if makers > 1 else ''}"
        return sentence + "."

    @staticmethod
    def _fit_sentence(fit_score: Optional[float]) -> str:
        """Translate a fit score into a priority sentence; empty when score <= 0."""
        score = fit_score or 0.0
        if score <= 0:
            return ""
        if score >= 0.8:
            return " **High Priority:** This prospect shows excellent fit based on company size, industry, and technology profile."
        if score >= 0.6:
            return " **Good Fit:** This prospect demonstrates strong alignment with our ideal customer profile."
        if score >= 0.4:
            return " **Moderate Fit:** This prospect shows potential but may require additional qualification."
        return " **Low Priority:** This prospect shows limited fit with our target criteria."

    async def generate_client_summary(
        self,
        client_data: Dict,
        enrichment_data: Optional[Dict] = None
    ) -> str:
        """
        Generate a grounded summary for the CLIENT company (the company we're selling FOR).

        Args:
            client_data: Client profile data with offerings, value props, etc. (INCLUDING raw_facts)
            enrichment_data: Optional enriched data; its keys override
                same-named keys in client_data
        Returns:
            Whatever the LLM service returns for summary_type "client"
        """
        logger.info(
            "Generating GROUNDED client summary for: %s",
            client_data.get('name', 'Unknown')
        )
        return await self._grounded_summary(client_data, enrichment_data, "client")
# Singleton instances
# Each slot starts as None and is filled by the matching get_*_service()
# accessor the first time that accessor is called.
_analytics_service: Optional[MCPAnalyticsService] = None
_enrichment_service: Optional[MCPEnrichmentService] = None
_validation_service: Optional[MCPValidationService] = None
_summary_service: Optional[MCPSummaryService] = None
def get_analytics_service() -> MCPAnalyticsService:
    """Return the shared MCPAnalyticsService, building it on first use."""
    global _analytics_service
    if _analytics_service is not None:
        return _analytics_service
    _analytics_service = MCPAnalyticsService()
    return _analytics_service
def get_enrichment_service() -> MCPEnrichmentService:
    """Return the shared MCPEnrichmentService, building it on first use."""
    global _enrichment_service
    if _enrichment_service is not None:
        return _enrichment_service
    _enrichment_service = MCPEnrichmentService()
    return _enrichment_service
def get_validation_service() -> MCPValidationService:
    """Return the shared MCPValidationService, building it on first use."""
    global _validation_service
    if _validation_service is not None:
        return _validation_service
    _validation_service = MCPValidationService()
    return _validation_service
def get_summary_service() -> MCPSummaryService:
    """Return the shared MCPSummaryService, building it on first use."""
    global _summary_service
    if _summary_service is not None:
        return _summary_service
    _summary_service = MCPSummaryService()
    return _summary_service