# NOTE: The three banner lines that preceded this comment ("Spaces:" /
# "Runtime error" x2) were Hugging Face Spaces UI capture artifacts, not
# part of the module; they are preserved here as a comment so the file parses.
| """ | |
| In-Memory MCP Services for Hugging Face Spaces | |
| These services run in-memory without requiring separate server processes | |
| """ | |
| import json | |
| import asyncio | |
| from typing import Dict, List, Optional, Any | |
| from pathlib import Path | |
| from datetime import datetime | |
| from services.web_search import get_search_service | |
| import logging | |
| logger = logging.getLogger(__name__) | |
class InMemoryStoreService:
    """In-memory store service (replaces store_server.py for HF Spaces).

    Holds prospects, companies, facts, contacts, handoffs and suppressions
    as plain lists of dicts, seeded from JSON files in ``data_dir``.
    Mutating operations are serialized with an asyncio lock (safe across
    asyncio tasks only — NOT across OS threads).
    """

    def __init__(self, data_dir: Optional[Path] = None):
        """Initialize the store and eagerly load persisted JSON data.

        Args:
            data_dir: Directory holding the JSON seed files. Defaults to
                ``<package root>/data`` (created if missing).
        """
        self.data_dir = data_dir or Path(__file__).parent.parent / "data"
        self.data_dir.mkdir(exist_ok=True)
        # In-memory storage (lists of dicts)
        self.prospects: List[Dict] = []
        self.companies: List[Dict] = []
        self.facts: List[Dict] = []
        self.contacts: List[Dict] = []
        self.handoffs: List[Dict] = []
        self.suppressions: List[Dict] = []
        # Lock for asyncio-task safety on mutating operations
        self.lock = asyncio.Lock()
        # Load initial data
        self._load_data()

    def _load_json_list(self, filename: str) -> List[Dict]:
        """Load one JSON file from the data dir; [] if absent or empty."""
        path = self.data_dir / filename
        if not path.exists():
            return []
        with open(path) as f:
            content = json.load(f)
        return content if content else []

    def _load_data(self):
        """Load all collections from their JSON files (best effort).

        A corrupt or unreadable file aborts the load, keeping whatever
        collections were already populated, and logs the error — same
        semantics as the original monolithic loader.
        """
        try:
            self.prospects = self._load_json_list("prospects.json")
            self.companies = self._load_json_list("companies_store.json")
            self.facts = self._load_json_list("facts.json")
            self.contacts = self._load_json_list("contacts.json")
            self.handoffs = self._load_json_list("handoffs.json")
            self.suppressions = self._load_json_list("suppression.json")
            logger.info("In-memory store loaded successfully")
        except (OSError, json.JSONDecodeError) as e:
            # Narrowed from a blanket `except Exception` to the two
            # failure modes file loading actually has.
            logger.error(f"Error loading store data: {e}")

    async def save_prospect(self, prospect: Dict) -> str:
        """Save or update a prospect (prevents duplicates by domain).

        An existing prospect with the same ``id`` — or, failing that, the
        same company domain (case-insensitive) — is replaced in place;
        otherwise the prospect is appended.

        Returns:
            "saved" in all cases.
        """
        async with self.lock:
            # Update in place when the id already exists.
            for i, existing in enumerate(self.prospects):
                if existing["id"] == prospect["id"]:
                    self.prospects[i] = prospect
                    return "saved"
            # No id match: a matching company domain means the same
            # prospect — update it instead of creating a duplicate.
            # (Single pass; the original re-scanned the list to find an
            # index it had already located.)
            domain = prospect.get("company", {}).get("domain", "")
            if domain:
                for i, existing in enumerate(self.prospects):
                    existing_domain = existing.get("company", {}).get("domain", "")
                    if existing_domain and existing_domain.lower() == domain.lower():
                        logger.warning(f"Duplicate prospect detected for domain: {domain}. Updating existing prospect.")
                        self.prospects[i] = prospect
                        return "saved"
            self.prospects.append(prospect)
            return "saved"

    async def get_prospect(self, prospect_id: str) -> Optional[Dict]:
        """Get a prospect by ID, or None if not found."""
        for p in self.prospects:
            if p["id"] == prospect_id:
                return p
        return None

    async def list_prospects(self) -> List[Dict]:
        """List all prospects.

        NOTE: returns the internal list (callers share the reference);
        kept as-is for backward compatibility.
        """
        return self.prospects

    async def save_company(self, company: Dict) -> str:
        """Save or update a company (matched by ``id``)."""
        async with self.lock:
            for i, c in enumerate(self.companies):
                if c["id"] == company["id"]:
                    self.companies[i] = company
                    return "saved"
            self.companies.append(company)
            return "saved"

    async def get_company(self, company_id: str) -> Optional[Dict]:
        """Get a company by ID, falling back to the seed file.

        Checks the in-memory list first, then ``companies.json``.
        NOTE: the seed-file read is synchronous inside an async method;
        acceptable for small demo files.
        """
        for c in self.companies:
            if c["id"] == company_id:
                return c
        seed_file = self.data_dir / "companies.json"
        if seed_file.exists():
            with open(seed_file) as f:
                seeds = json.load(f)
            for c in seeds:
                if c["id"] == company_id:
                    return c
        return None

    async def save_fact(self, fact: Dict) -> str:
        """Save a fact, skipping any whose ``id`` is already present."""
        async with self.lock:
            existing_ids = {f.get("id") for f in self.facts if f.get("id")}
            if fact.get("id") not in existing_ids:
                self.facts.append(fact)
            return "saved"

    async def save_contact(self, contact: Dict) -> str:
        """Save a contact (prevents duplicates by email).

        Returns:
            "saved" on insert/update, "duplicate_skipped" when another
            contact already has the same email (case-insensitive).
        """
        async with self.lock:
            # Check for duplicate by ID first (update in place).
            for i, c in enumerate(self.contacts):
                if c.get("id") == contact.get("id"):
                    self.contacts[i] = contact
                    return "saved"
            # If not found by ID, check for duplicate by email.
            email = contact.get("email", "").lower()
            if email:
                for existing in self.contacts:
                    if existing.get("email", "").lower() == email:
                        logger.warning(f"Duplicate contact detected for email: {email}. Skipping.")
                        return "duplicate_skipped"
            self.contacts.append(contact)
            return "saved"

    async def list_contacts_by_domain(self, domain: str) -> List[Dict]:
        """List contacts whose email ends with ``@<domain>``.

        NOTE: the match is case-sensitive, as in the original — callers
        presumably pass already-normalized domains; confirm if not.
        """
        suffix = f"@{domain}"
        return [
            c for c in self.contacts
            if isinstance(c, dict) and "email" in c and c["email"].endswith(suffix)
        ]

    async def check_suppression(self, supp_type: str, value: str) -> bool:
        """Check if an email/domain is suppressed.

        Entries whose ``expires_at`` is in the past are ignored; entries
        with an unparseable expiry are treated as still suppressed.

        Fix: the original compared a timezone-aware ``fromisoformat``
        result against naive ``datetime.utcnow()``, which raised
        TypeError — silently swallowed by a bare ``except`` — so expiry
        never actually took effect.
        """
        from datetime import timezone  # module-level import only brings datetime

        now = datetime.now(timezone.utc)
        for supp in self.suppressions:
            if not isinstance(supp, dict):
                continue
            if supp.get("type") != supp_type or supp.get("value") != value:
                continue
            expires_at = supp.get("expires_at")
            if expires_at:
                try:
                    expires = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
                    # Aware-to-aware comparison; naive timestamps are
                    # assumed UTC.
                    if expires.tzinfo is None:
                        expires = expires.replace(tzinfo=timezone.utc)
                    if expires < now:
                        continue  # expired entry: not suppressed
                except (ValueError, TypeError, AttributeError):
                    # Unparseable expiry: keep the entry suppressive.
                    pass
            return True
        return False

    async def save_handoff(self, packet: Dict) -> str:
        """Save a handoff packet (append-only)."""
        async with self.lock:
            self.handoffs.append(packet)
            return "saved"

    async def clear_all(self) -> str:
        """Clear all data.

        NOTE: suppressions are deliberately retained (matching the
        original) — presumably they should survive a demo reset; confirm.
        """
        async with self.lock:
            self.prospects = []
            self.companies = []
            self.facts = []
            self.contacts = []
            self.handoffs = []
            return "cleared"
class InMemorySearchService:
    """In-memory search service backed by the shared web-search client."""

    def __init__(self):
        self.search = get_search_service()
        logger.info("In-memory search service initialized")

    async def query(self, q: str, max_results: int = 5) -> List[Dict]:
        """Run a web search and shape hits for the MCP protocol.

        Each hit carries both "text" (MCP format) and "body" (backward
        compatibility with WebSearchService), plus title/source/url, a
        UTC timestamp and a fixed 0.8 confidence. An empty query yields
        an empty list without hitting the network.
        """
        if not q:
            return []
        logger.info(f"In-memory search query: '{q}'")
        # Perform real web search
        hits = await self.search.search(q, max_results=max_results)
        formatted = [
            {
                "text": hit.get('body', ''),    # MCP protocol format
                "body": hit.get('body', ''),    # backward compatibility
                "title": hit.get('title', ''),
                "source": hit.get('source', ''),
                "url": hit.get('url', ''),
                "ts": datetime.utcnow().isoformat(),
                "confidence": 0.8,
            }
            for hit in hits
        ]
        logger.info(f"Returning {len(formatted)} search results")
        return formatted
class InMemoryEmailService:
    """In-memory email service (mock for Gradio demo).

    Keeps one thread per prospect in ``self.threads`` and a flat log of
    every message in ``self.messages``.
    """

    def __init__(self):
        # prospect_id -> thread dict ({"id", "prospect_id", "messages"})
        self.threads = {}
        # Flat chronological log of all messages ever "sent"
        self.messages = []
        logger.info("In-memory email service initialized")

    async def send(self, to: str, subject: str, body: str, prospect_id: str) -> str:
        """Send an email (simulated) and return the thread id.

        Fix: the original created a brand-new thread on every send and
        overwrote ``self.threads[prospect_id]``, discarding all previous
        messages for the prospect. The prospect's existing thread is now
        reused so the conversation accumulates.
        """
        thread = self.threads.get(prospect_id)
        if thread is None:
            thread = {
                "id": f"thread_{prospect_id}_{datetime.utcnow().timestamp()}",
                "prospect_id": prospect_id,
                "messages": []
            }
            self.threads[prospect_id] = thread
        message = {
            "id": f"msg_{len(self.messages)}",
            "thread_id": thread["id"],
            "prospect_id": prospect_id,
            "direction": "outbound",
            "subject": subject,
            "body": body,
            "sent_at": datetime.utcnow().isoformat()
        }
        thread["messages"].append(message)
        self.messages.append(message)
        logger.info(f"Simulated email sent to {to}")
        return thread["id"]

    async def get_thread(self, prospect_id: str) -> Optional[Dict]:
        """Get the email thread for a prospect, or None if none exists."""
        return self.threads.get(prospect_id)
class InMemoryCalendarService:
    """In-memory calendar service (mock for Gradio demo)."""

    def __init__(self):
        logger.info("In-memory calendar service initialized")

    async def suggest_slots(self) -> List[Dict[str, str]]:
        """Suggest three mock meeting slots starting tomorrow.

        Returns:
            Dicts with "start_iso", "end_iso" (naive local ISO strings)
            and "title" keys. Microseconds are zeroed so the ISO strings
            are clean (the original carried datetime.now()'s microseconds
            through).
        """
        from datetime import datetime, timedelta
        base_date = (datetime.now() + timedelta(days=1)).replace(
            minute=0, second=0, microsecond=0
        )
        # (day offset, start hour, end hour, title)
        plan = [
            (0, 10, 11, "Initial consultation"),
            (1, 14, 15, "Product demo"),
            (2, 9, 10, "Follow-up discussion"),
        ]
        return [
            {
                "start_iso": (base_date + timedelta(days=off)).replace(hour=h1).isoformat(),
                "end_iso": (base_date + timedelta(days=off)).replace(hour=h2).isoformat(),
                "title": title,
            }
            for off, h1, h2, title in plan
        ]

    async def generate_ics(self, slot: Dict) -> str:
        """Generate a minimal ICS calendar file for the given slot.

        Fix: the original ignored ``slot`` entirely and returned an empty
        calendar via an f-string with no placeholders. The slot is now
        embedded as a VEVENT; a slot without start/end still yields the
        original empty-calendar string.

        ICS wants "basic" timestamps (YYYYMMDDTHHMMSS), so ISO separators
        are stripped from the naive local times the mock slots use.
        """
        def _ics_ts(iso: str) -> str:
            # "2024-01-02T10:00:00" -> "20240102T100000" (drop any fraction)
            return iso.replace("-", "").replace(":", "")[:15]

        lines = ["BEGIN:VCALENDAR", "VERSION:2.0"]
        start = slot.get("start_iso", "")
        end = slot.get("end_iso", "")
        if start and end:
            lines += [
                "BEGIN:VEVENT",
                f"DTSTART:{_ics_ts(start)}",
                f"DTEND:{_ics_ts(end)}",
                f"SUMMARY:{slot.get('title', 'Meeting')}",
                "END:VEVENT",
            ]
        lines.append("END:VCALENDAR")
        return "\n".join(lines)
# Singleton instances — lazily created by the get_in_memory_* accessors
# below; None until first use.
_store_service: Optional[InMemoryStoreService] = None
_search_service: Optional[InMemorySearchService] = None
_email_service: Optional[InMemoryEmailService] = None
_calendar_service: Optional[InMemoryCalendarService] = None
def get_in_memory_store() -> InMemoryStoreService:
    """Return the process-wide store service, creating it on first use."""
    global _store_service
    _store_service = _store_service or InMemoryStoreService()
    return _store_service
def get_in_memory_search() -> InMemorySearchService:
    """Return the process-wide search service, creating it on first use."""
    global _search_service
    _search_service = _search_service or InMemorySearchService()
    return _search_service
def get_in_memory_email() -> InMemoryEmailService:
    """Return the process-wide email service, creating it on first use."""
    global _email_service
    _email_service = _email_service or InMemoryEmailService()
    return _email_service
def get_in_memory_calendar() -> InMemoryCalendarService:
    """Return the process-wide calendar service, creating it on first use."""
    global _calendar_service
    _calendar_service = _calendar_service or InMemoryCalendarService()
    return _calendar_service