Spaces:
Running
Running
| from langchain_core.tools import StructuredTool | |
| from pydantic import BaseModel | |
| from typing import List, Optional | |
| from datetime import datetime, timedelta | |
| import uuid | |
| class Email(BaseModel): | |
| """Model for an individual email""" | |
| id: str | |
| sender: str | |
| recipient: str | |
| subject: str | |
| body: str | |
| timestamp: datetime | |
| is_read: bool = False | |
| class EmailsResponse(BaseModel): | |
| """Response model containing list of emails""" | |
| emails: List[Email] | |
| total_emails: int | |
| class SendEmailRequest(BaseModel): | |
| """Request model for sending emails""" | |
| recipient: str | |
| subject: str | |
| body: str | |
| class SendEmailResponse(BaseModel): | |
| """Response model for sending emails""" | |
| success: bool | |
| message: str | |
| email_id: Optional[str] = None | |
| # Dummy email storage | |
| DUMMY_EMAILS = [ | |
| Email( | |
| id="email_001", | |
| sender="[email protected]", | |
| recipient="[email protected]", | |
| subject="Weekly Team Meeting", | |
| body="Hi there! Just a reminder about our weekly team meeting scheduled for Friday at 2 PM. Please come prepared with your project updates.", | |
| timestamp=datetime.now() - timedelta(hours=2), | |
| is_read=False, | |
| ), | |
| Email( | |
| id="email_002", | |
| sender="[email protected]", | |
| recipient="[email protected]", | |
| subject="Your Account Has Been Updated", | |
| body="We've successfully updated your account settings as requested. If you have any questions, please don't hesitate to contact our support team.", | |
| timestamp=datetime.now() - timedelta(days=1), | |
| is_read=True, | |
| ), | |
| Email( | |
| id="email_003", | |
| sender="[email protected]", | |
| recipient="[email protected]", | |
| subject="Top 10 AI Trends for 2024", | |
| body="Discover the latest AI trends that are shaping the future of technology. From machine learning breakthroughs to ethical AI considerations.", | |
| timestamp=datetime.now() - timedelta(days=2), | |
| is_read=False, | |
| ), | |
| Email( | |
| id="email_004", | |
| sender="[email protected]", | |
| recipient="[email protected]", | |
| subject="Holiday Schedule Announcement", | |
| body="Please find attached the holiday schedule for the upcoming quarter. Note the changes to the Thanksgiving week schedule.", | |
| timestamp=datetime.now() - timedelta(days=3), | |
| is_read=True, | |
| ), | |
| ] | |
| def read_emails(limit: int = 10, unread_only: bool = False) -> EmailsResponse: | |
| """Read emails from Gmail inbox with dummy data""" | |
| emails = DUMMY_EMAILS.copy() | |
| if unread_only: | |
| emails = [email for email in emails if not email.is_read] | |
| # Sort by timestamp (newest first) | |
| emails.sort(key=lambda x: x.timestamp, reverse=True) | |
| # Apply limit | |
| emails = emails[:limit] | |
| return EmailsResponse(emails=emails, total_emails=len(emails)) | |
| def send_email(recipient: str, subject: str, body: str) -> SendEmailResponse: | |
| """Send an email using Gmail with dummy data""" | |
| try: | |
| # Generate a unique email ID | |
| email_id = f"email_{uuid.uuid4().hex[:8]}" | |
| # Create the email object | |
| new_email = Email( | |
| id=email_id, | |
| sender="[email protected]", | |
| recipient=recipient, | |
| subject=subject, | |
| body=body, | |
| timestamp=datetime.now(), | |
| is_read=True, # Sent emails are marked as read | |
| ) | |
| # In a real implementation, this would send the email | |
| # For now, we'll just add it to our dummy storage | |
| DUMMY_EMAILS.insert(0, new_email) | |
| return SendEmailResponse( | |
| success=True, message=f"Email sent successfully to {recipient}", email_id=email_id | |
| ) | |
| except Exception as e: | |
| return SendEmailResponse(success=False, message=f"Failed to send email: {str(e)}") | |
| read_tool = StructuredTool.from_function(read_emails) | |
| send_tool = StructuredTool.from_function(send_email) | |
| tools = [read_tool, send_tool] | |