cuga-agent / docs /examples /cuga_with_runtime_tools /langchain_example_tool.py
Sami Marreed
feat: docker-v1 with optimized frontend
0646b18
from langchain_core.tools import StructuredTool
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime, timedelta
import uuid
class Email(BaseModel):
"""Model for an individual email"""
id: str
sender: str
recipient: str
subject: str
body: str
timestamp: datetime
is_read: bool = False
class EmailsResponse(BaseModel):
"""Response model containing list of emails"""
emails: List[Email]
total_emails: int
class SendEmailRequest(BaseModel):
"""Request model for sending emails"""
recipient: str
subject: str
body: str
class SendEmailResponse(BaseModel):
"""Response model for sending emails"""
success: bool
message: str
email_id: Optional[str] = None
# Dummy email storage
DUMMY_EMAILS = [
Email(
id="email_001",
sender="[email protected]",
recipient="[email protected]",
subject="Weekly Team Meeting",
body="Hi there! Just a reminder about our weekly team meeting scheduled for Friday at 2 PM. Please come prepared with your project updates.",
timestamp=datetime.now() - timedelta(hours=2),
is_read=False,
),
Email(
id="email_002",
sender="[email protected]",
recipient="[email protected]",
subject="Your Account Has Been Updated",
body="We've successfully updated your account settings as requested. If you have any questions, please don't hesitate to contact our support team.",
timestamp=datetime.now() - timedelta(days=1),
is_read=True,
),
Email(
id="email_003",
sender="[email protected]",
recipient="[email protected]",
subject="Top 10 AI Trends for 2024",
body="Discover the latest AI trends that are shaping the future of technology. From machine learning breakthroughs to ethical AI considerations.",
timestamp=datetime.now() - timedelta(days=2),
is_read=False,
),
Email(
id="email_004",
sender="[email protected]",
recipient="[email protected]",
subject="Holiday Schedule Announcement",
body="Please find attached the holiday schedule for the upcoming quarter. Note the changes to the Thanksgiving week schedule.",
timestamp=datetime.now() - timedelta(days=3),
is_read=True,
),
]
def read_emails(limit: int = 10, unread_only: bool = False) -> EmailsResponse:
"""Read emails from Gmail inbox with dummy data"""
emails = DUMMY_EMAILS.copy()
if unread_only:
emails = [email for email in emails if not email.is_read]
# Sort by timestamp (newest first)
emails.sort(key=lambda x: x.timestamp, reverse=True)
# Apply limit
emails = emails[:limit]
return EmailsResponse(emails=emails, total_emails=len(emails))
def send_email(recipient: str, subject: str, body: str) -> SendEmailResponse:
"""Send an email using Gmail with dummy data"""
try:
# Generate a unique email ID
email_id = f"email_{uuid.uuid4().hex[:8]}"
# Create the email object
new_email = Email(
id=email_id,
sender="[email protected]",
recipient=recipient,
subject=subject,
body=body,
timestamp=datetime.now(),
is_read=True, # Sent emails are marked as read
)
# In a real implementation, this would send the email
# For now, we'll just add it to our dummy storage
DUMMY_EMAILS.insert(0, new_email)
return SendEmailResponse(
success=True, message=f"Email sent successfully to {recipient}", email_id=email_id
)
except Exception as e:
return SendEmailResponse(success=False, message=f"Failed to send email: {str(e)}")
read_tool = StructuredTool.from_function(read_emails)
send_tool = StructuredTool.from_function(send_email)
tools = [read_tool, send_tool]