Spaces:
Runtime error
Runtime error
"""
Test Contact Finder - Verify real person discovery works
"""
| import asyncio | |
| import logging | |
| from services.enhanced_contact_finder import get_enhanced_contact_finder | |
| from services.prospect_discovery import get_prospect_discovery_service | |
# Setup logging: timestamped records at INFO and above for the whole process,
# plus a logger named after this module for the test functions below.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
async def test_enhanced_finder():
    """Test the enhanced contact finder with real companies.

    Awaits ``find_real_contacts`` on the object returned by
    ``get_enhanced_contact_finder()`` once per hard-coded company, asking for
    at most three contacts matching that company's target titles, and prints
    each returned contact's name, title and email.

    An exception raised while handling one company is printed and logged
    together with its traceback; the loop then continues with the next
    company.
    """
    banner = "=" * 80
    divider = "-" * 80

    print("\n" + banner)
    print("TESTING ENHANCED CONTACT FINDER")
    print(banner + "\n")

    # Test companies
    test_cases = [
        {
            "name": "Shopify",
            "domain": "shopify.com",
            "titles": ["CEO", "Chief Customer Officer", "VP Customer Experience"],
        },
        {
            "name": "Stripe",
            "domain": "stripe.com",
            "titles": ["CEO", "Head of Customer Success", "VP Support"],
        },
        {
            "name": "Airbnb",
            "domain": "airbnb.com",
            "titles": ["CEO", "Chief Customer Officer", "VP Community"],
        },
    ]

    finder = get_enhanced_contact_finder()

    for test in test_cases:
        print(f"\n{divider}")
        print(f"Testing: {test['name']} ({test['domain']})")
        print(f"{divider}\n")

        try:
            contacts = await finder.find_real_contacts(
                company_name=test['name'],
                domain=test['domain'],
                target_titles=test['titles'],
                max_contacts=3,
            )

            if contacts:
                print(f"[OK] Found {len(contacts)} REAL contacts:\n")
                for i, contact in enumerate(contacts, 1):
                    print(f" {i}. {contact.name}")
                    print(f" Title: {contact.title}")
                    print(f" Email: {contact.email}")
                    print()
            else:
                print("[FAIL] No contacts found (will use fallback)\n")
        except Exception as e:
            # Best-effort smoke test: report the failure and keep going.
            # logger.exception records the traceback, which logger.error
            # with a formatted message would drop.
            print(f"[ERROR] {e}\n")
            logger.exception("Error testing %s: %s", test['name'], e)
async def test_prospect_discovery():
    """Test the full prospect discovery service.

    Awaits ``discover_contacts`` on the object returned by
    ``get_prospect_discovery_service()`` once per hard-coded company, passing
    the company's name, domain and size, requesting at most three contacts
    with ``skip_search=False``, and prints each returned contact's name,
    title and email.

    An exception raised while handling one company is printed and logged
    together with its traceback; the loop then continues with the next
    company.
    """
    banner = "=" * 80
    divider = "-" * 80

    print("\n" + banner)
    print("TESTING PROSPECT DISCOVERY SERVICE")
    print(banner + "\n")

    test_cases = [
        {
            "name": "Zapier",
            "domain": "zapier.com",
            "size": 500,
        },
        {
            "name": "Notion",
            "domain": "notion.so",
            "size": 200,
        },
    ]

    discovery = get_prospect_discovery_service()

    for test in test_cases:
        print(f"\n{divider}")
        print(f"Testing: {test['name']} ({test['domain']}) - {test['size']} employees")
        print(f"{divider}\n")

        try:
            contacts = await discovery.discover_contacts(
                company_name=test['name'],
                domain=test['domain'],
                company_size=test['size'],
                max_contacts=3,
                skip_search=False,  # Use real search
            )

            if contacts:
                print(f"[OK] Found {len(contacts)} contacts:\n")
                for i, contact in enumerate(contacts, 1):
                    print(f" {i}. {contact.name}")
                    print(f" Title: {contact.title}")
                    print(f" Email: {contact.email}")
                    print()
            else:
                print("[FAIL] No contacts found\n")
        except Exception as e:
            # Best-effort smoke test: report the failure and keep going.
            # logger.exception records the traceback, which logger.error
            # with a formatted message would drop.
            print(f"[ERROR] {e}\n")
            logger.exception("Error testing %s: %s", test['name'], e)
async def test_email_generation_with_contacts():
    """Test that emails use contact names.

    Builds a ``Prospect`` for a made-up company with two contacts, awaits
    ``Writer(MCPRegistry()).run(prospect)``, prints the subject and body of
    the resulting ``email_draft``, and reports whether the first contact's
    first name occurs in the body.

    Any exception raised by the writer or while inspecting its result is
    printed and logged together with its traceback instead of propagating.
    """
    print("\n" + "=" * 80)
    print("TESTING EMAIL GENERATION WITH CONTACTS")
    print("=" * 80 + "\n")

    # Imported here rather than at module level, as in the original layout.
    from app.schema import Prospect, Company, Contact
    from agents.writer import Writer
    from mcp.registry import MCPRegistry
    import uuid

    company_domain = "testecommerce.com"

    # Create a test prospect with real-looking contact.
    # Well-formed placeholder addresses on the test company's domain.
    prospect = Prospect(
        id=str(uuid.uuid4()),
        company=Company(
            id=str(uuid.uuid4()),
            name="Test E-commerce Co",
            domain=company_domain,
            industry="E-commerce",
            size=150,
            pains=["High customer churn", "Poor support response times"],
            notes=["Growing fast, needs to scale support"],
        ),
        contacts=[
            Contact(
                id=str(uuid.uuid4()),
                name="Sarah Johnson",
                email=f"sarah.johnson@{company_domain}",
                title="VP Customer Experience",
                prospect_id="",
            ),
            Contact(
                id=str(uuid.uuid4()),
                name="Michael Chen",
                email=f"michael.chen@{company_domain}",
                title="Director of Customer Success",
                prospect_id="",
            ),
        ],
    )

    print(f"Company: {prospect.company.name}")
    print("Contacts:")
    for contact in prospect.contacts:
        print(f" - {contact.name} ({contact.title}) - {contact.email}")
    print()

    # Generate email
    registry = MCPRegistry()
    writer = Writer(registry)

    print("Generating personalized email...\n")

    try:
        result = await writer.run(prospect)

        if result.email_draft:
            print("[OK] Email Generated:\n")
            print(f"Subject: {result.email_draft.get('subject', 'N/A')}")
            print(f"\nBody:\n{result.email_draft.get('body', 'N/A')}")
            print()

            # Check if contact name is used: personalization is judged by a
            # plain substring match on the first contact's first name.
            body = result.email_draft.get('body', '')
            first_name = prospect.contacts[0].name.split()[0]

            if first_name in body:
                print(f"[OK] Contact first name '{first_name}' found in email!")
            else:
                print(f"[FAIL] Contact first name '{first_name}' NOT found in email!")
                print(" This means personalization failed!")
        else:
            print("[FAIL] No email draft generated")
    except Exception as e:
        # Report the failure without aborting the rest of the suite;
        # logger.exception keeps the traceback for debugging.
        print(f"[ERROR] {e}")
        logger.exception("Error generating email: %s", e)
async def main():
    """Run every test stage in order, printing a heading before each one."""
    rule = "=" * 80

    print("\n[TEST] CONTACT FINDER TEST SUITE")
    print(rule)

    # (heading, coroutine function) pairs, awaited strictly one after another.
    stages = (
        # Test 1: Enhanced Contact Finder
        ("\n\n[TEST 1] Enhanced Contact Finder", test_enhanced_finder),
        # Test 2: Prospect Discovery Service
        ("\n\n[TEST 2] Prospect Discovery Service", test_prospect_discovery),
        # Test 3: Email Generation with Contacts
        ("\n\n[TEST 3] Email Generation with Contacts", test_email_generation_with_contacts),
    )
    for heading, stage in stages:
        print(heading)
        await stage()

    print("\n\n" + rule)
    print("[DONE] ALL TESTS COMPLETE")
    print(rule + "\n")
# Script entry point: only run the suite when executed directly, not on import.
if __name__ == "__main__":
    suite = main()
    asyncio.run(suite)