Spaces:
Running
Running
File size: 3,947 Bytes
0646b18 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
from langchain_core.tools import StructuredTool
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime, timedelta
import uuid
class Email(BaseModel):
"""Model for an individual email"""
id: str
sender: str
recipient: str
subject: str
body: str
timestamp: datetime
is_read: bool = False
class EmailsResponse(BaseModel):
"""Response model containing list of emails"""
emails: List[Email]
total_emails: int
class SendEmailRequest(BaseModel):
"""Request model for sending emails"""
recipient: str
subject: str
body: str
class SendEmailResponse(BaseModel):
"""Response model for sending emails"""
success: bool
message: str
email_id: Optional[str] = None
# Dummy email storage
DUMMY_EMAILS = [
Email(
id="email_001",
sender="[email protected]",
recipient="[email protected]",
subject="Weekly Team Meeting",
body="Hi there! Just a reminder about our weekly team meeting scheduled for Friday at 2 PM. Please come prepared with your project updates.",
timestamp=datetime.now() - timedelta(hours=2),
is_read=False,
),
Email(
id="email_002",
sender="[email protected]",
recipient="[email protected]",
subject="Your Account Has Been Updated",
body="We've successfully updated your account settings as requested. If you have any questions, please don't hesitate to contact our support team.",
timestamp=datetime.now() - timedelta(days=1),
is_read=True,
),
Email(
id="email_003",
sender="[email protected]",
recipient="[email protected]",
subject="Top 10 AI Trends for 2024",
body="Discover the latest AI trends that are shaping the future of technology. From machine learning breakthroughs to ethical AI considerations.",
timestamp=datetime.now() - timedelta(days=2),
is_read=False,
),
Email(
id="email_004",
sender="[email protected]",
recipient="[email protected]",
subject="Holiday Schedule Announcement",
body="Please find attached the holiday schedule for the upcoming quarter. Note the changes to the Thanksgiving week schedule.",
timestamp=datetime.now() - timedelta(days=3),
is_read=True,
),
]
def read_emails(limit: int = 10, unread_only: bool = False) -> EmailsResponse:
"""Read emails from Gmail inbox with dummy data"""
emails = DUMMY_EMAILS.copy()
if unread_only:
emails = [email for email in emails if not email.is_read]
# Sort by timestamp (newest first)
emails.sort(key=lambda x: x.timestamp, reverse=True)
# Apply limit
emails = emails[:limit]
return EmailsResponse(emails=emails, total_emails=len(emails))
def send_email(recipient: str, subject: str, body: str) -> SendEmailResponse:
"""Send an email using Gmail with dummy data"""
try:
# Generate a unique email ID
email_id = f"email_{uuid.uuid4().hex[:8]}"
# Create the email object
new_email = Email(
id=email_id,
sender="[email protected]",
recipient=recipient,
subject=subject,
body=body,
timestamp=datetime.now(),
is_read=True, # Sent emails are marked as read
)
# In a real implementation, this would send the email
# For now, we'll just add it to our dummy storage
DUMMY_EMAILS.insert(0, new_email)
return SendEmailResponse(
success=True, message=f"Email sent successfully to {recipient}", email_id=email_id
)
except Exception as e:
return SendEmailResponse(success=False, message=f"Failed to send email: {str(e)}")
read_tool = StructuredTool.from_function(read_emails)
send_tool = StructuredTool.from_function(send_email)
tools = [read_tool, send_tool]
|