Spaces:
Runtime error
Runtime error
| import os | |
| import json | |
| import logging | |
| import asyncio | |
| import time | |
| from datetime import datetime | |
| from enum import Enum | |
| from typing import Dict, List, Optional, Set, Union, Any | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| import hashlib | |
| import tempfile | |
| import shutil | |
| import gradio as gr | |
| import networkx as nx | |
| from langchain.prompts import PromptTemplate, MessagesPlaceholder | |
| from langchain.memory import ConversationBufferMemory | |
| from langchain.agents import Tool, AgentType | |
| from langchain_community.llms import HuggingFacePipeline | |
| from langchain_community.agents import initialize_agent | |
| import torch | |
| from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline | |
| import subprocess | |
| import asyncio | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | |
| logger = logging.getLogger(__name__) | |
| # Load the LLM and tokenizer | |
| def load_model(): | |
| """Load the model and tokenizer.""" | |
| try: | |
| from transformers import AutoModelForCausalLM, AutoTokenizer | |
| model_name = "gpt2" # Using a smaller model for testing | |
| tokenizer = AutoTokenizer.from_pretrained(model_name) | |
| model = AutoModelForCausalLM.from_pretrained(model_name) | |
| return tokenizer, model | |
| except Exception as e: | |
| logger.error(f"Failed to load model: {str(e)}") | |
| raise | |
| # Initialize models lazily | |
| tokenizer = None | |
| model = None | |
| hf_pipeline = None | |
| llm = None | |
| def get_llm(): | |
| """Get or initialize the language model.""" | |
| global llm, tokenizer, model, hf_pipeline | |
| try: | |
| if llm is None: | |
| tokenizer, model = load_model() | |
| hf_pipeline = pipeline( | |
| "text-generation", | |
| model=model, | |
| tokenizer=tokenizer, | |
| max_new_tokens=500, | |
| pad_token_id=tokenizer.eos_token_id, | |
| temperature=0.7, | |
| return_full_text=False, | |
| ) | |
| llm = HuggingFacePipeline( | |
| pipeline=hf_pipeline, | |
| model_kwargs={"max_length": 2048} | |
| ) | |
| return llm | |
| except Exception as e: | |
| logger.error(f"Failed to get LLM: {str(e)}") | |
| raise | |
| def get_agent(agent_type): | |
| """Get or initialize an agent with the specified type.""" | |
| try: | |
| llm = get_llm() | |
| tools = [ | |
| Tool( | |
| name="Code Formatter", | |
| func=lambda x: subprocess.run(["black", "-"], input=x.encode(), capture_output=True).stdout.decode(), | |
| description="Formats code using Black.", | |
| ), | |
| Tool( | |
| name="API Generator", | |
| func=lambda x: json.dumps({"endpoints": {"example": "POST - Example endpoint."}}), | |
| description="Generates API details from code.", | |
| ), | |
| Tool( | |
| name="Task Decomposer", | |
| func=lambda x: json.dumps({"tasks": ["Design UI", "Develop Backend", "Test App", "Deploy App"]}), | |
| description="Breaks down app requirements into smaller tasks.", | |
| ), | |
| ] | |
| memory = ConversationBufferMemory( | |
| memory_key="chat_history", | |
| return_messages=True | |
| ) | |
| agent_kwargs = { | |
| "extra_prompt_messages": [MessagesPlaceholder(variable_name="chat_history")], | |
| } | |
| return initialize_agent( | |
| tools=tools, | |
| llm=llm, | |
| agent=AgentType.CHAT_CONVERSATIONAL_REACT_DESCRIPTION, | |
| verbose=True, | |
| memory=memory, | |
| agent_kwargs=agent_kwargs, | |
| handle_parsing_errors=True, | |
| ) | |
| except Exception as e: | |
| logger.error(f"Failed to get agent: {str(e)}") | |
| raise | |
| # Enhanced prompt templates with more specific instructions | |
| ui_designer_prompt = PromptTemplate( | |
| input_variables=["input"], | |
| template="""You are an expert UI Designer specializing in modern, responsive web applications. | |
| Task: {input} | |
| Focus on: | |
| 1. Clean, intuitive user interface | |
| 2. Responsive design principles | |
| 3. Modern UI components | |
| 4. Accessibility standards | |
| 5. Cross-browser compatibility | |
| Generate code using: | |
| - HTML5 semantic elements | |
| - Modern CSS (Flexbox/Grid) | |
| - React/Vue.js best practices | |
| - Material UI or Tailwind CSS | |
| Provide detailed component structure and styling.""" | |
| ) | |
| backend_developer_prompt = PromptTemplate( | |
| input_variables=["input"], | |
| template="""You are an expert Backend Developer specializing in scalable applications. | |
| Task: {input} | |
| Focus on: | |
| 1. RESTful API design | |
| 2. Database schema optimization | |
| 3. Security best practices | |
| 4. Error handling | |
| 5. Performance optimization | |
| Include: | |
| - API endpoint definitions | |
| - Database models | |
| - Authentication/Authorization | |
| - Input validation | |
| - Error handling middleware | |
| - Rate limiting | |
| - Logging | |
| Use modern backend frameworks (FastAPI/Django/Express).""" | |
| ) | |
| qa_engineer_prompt = PromptTemplate( | |
| input_variables=["input"], | |
| template="""You are an expert QA Engineer focusing on comprehensive testing. | |
| Task: {input} | |
| Implement: | |
| 1. Unit tests | |
| 2. Integration tests | |
| 3. API endpoint tests | |
| 4. UI component tests | |
| 5. Performance tests | |
| Include: | |
| - Test cases for edge cases | |
| - Input validation tests | |
| - Error handling tests | |
| - Load testing scenarios | |
| - Security testing checks""" | |
| ) | |
| devops_engineer_prompt = PromptTemplate( | |
| input_variables=["input"], | |
| template="""You are an expert DevOps Engineer specializing in modern deployment practices. | |
| Task: {input} | |
| Provide: | |
| 1. Dockerfile configuration | |
| 2. Docker Compose setup | |
| 3. CI/CD pipeline configuration | |
| 4. Environment configuration | |
| 5. Monitoring setup | |
| Include: | |
| - Development/Production configs | |
| - Environment variables | |
| - Health checks | |
| - Logging setup | |
| - Monitoring integration | |
| - Backup strategies""" | |
| ) | |
| def generate_project_structure(app_name, features): | |
| """Generate a complete project structure based on features.""" | |
| return f""" | |
| {app_name}/ | |
| ├── frontend/ | |
| │ ├── src/ | |
| │ │ ├── components/ | |
| │ │ ├── pages/ | |
| │ │ ├── hooks/ | |
| │ │ ├── utils/ | |
| │ │ └── styles/ | |
| │ ├── package.json | |
| │ └── README.md | |
| ├── backend/ | |
| │ ├── src/ | |
| │ │ ├── routes/ | |
| │ │ ├── controllers/ | |
| │ │ ├── models/ | |
| │ │ ├── middleware/ | |
| │ │ └── utils/ | |
| │ ├── requirements.txt | |
| │ └── README.md | |
| ├── tests/ | |
| │ ├── unit/ | |
| │ ├── integration/ | |
| │ └── e2e/ | |
| ├── docs/ | |
| │ ├── API.md | |
| │ ├── SETUP.md | |
| │ └── DEPLOYMENT.md | |
| ├── docker-compose.yml | |
| ├── .env.example | |
| └── README.md | |
| """ | |
| def generate_documentation(app_name, features, api_details): | |
| """Generate comprehensive documentation.""" | |
| return f""" | |
| # {app_name} | |
| ## Overview | |
| A modern web application with the following features: | |
| {features} | |
| ## Quick Start | |
| ```bash | |
| # Clone the repository | |
| git clone <repository-url> | |
| # Install dependencies | |
| cd {app_name} | |
| # Frontend | |
| cd frontend && npm install | |
| # Backend | |
| cd ../backend && pip install -r requirements.txt | |
| # Run the application | |
| docker-compose up | |
| ``` | |
| ## API Documentation | |
| {api_details} | |
| ## Development | |
| - Frontend: React.js with TypeScript | |
| - Backend: Python with FastAPI | |
| - Database: PostgreSQL | |
| - Cache: Redis | |
| - Testing: Jest, Pytest | |
| ## Deployment | |
| Includes Docker configuration for easy deployment: | |
| - Frontend container | |
| - Backend container | |
| - Database container | |
| - Redis container | |
| ## Testing | |
| ```bash | |
| # Run frontend tests | |
| cd frontend && npm test | |
| # Run backend tests | |
| cd backend && pytest | |
| ``` | |
| ## Contributing | |
| Please read CONTRIBUTING.md for details on our code of conduct and the process for submitting pull requests. | |
| ## License | |
| This project is licensed under the MIT License - see the LICENSE.md file for details | |
| """ | |
| # AI Flow States and Types | |
| class FlowState(Enum): | |
| PENDING = "pending" | |
| RUNNING = "running" | |
| COMPLETED = "completed" | |
| FAILED = "failed" | |
| class AgentRole(Enum): | |
| ARCHITECT = "architect" | |
| UI_DESIGNER = "ui_designer" | |
| BACKEND_DEVELOPER = "backend_developer" | |
| DATABASE_ENGINEER = "database_engineer" | |
| SECURITY_EXPERT = "security_expert" | |
| QA_ENGINEER = "qa_engineer" | |
| DEVOPS_ENGINEER = "devops_engineer" | |
| DOCUMENTATION_WRITER = "documentation_writer" | |
| class AgentContext: | |
| """Context information for each agent in the flow.""" | |
| role: AgentRole | |
| state: FlowState | |
| artifacts: Dict[str, str] | |
| dependencies: List[AgentRole] | |
| feedback: List[str] | |
| class AIFlow: | |
| """Manages the flow of work between different AI agents.""" | |
| def __init__(self): | |
| self.flow_graph = nx.DiGraph() | |
| self.contexts: Dict[AgentRole, AgentContext] = {} | |
| self.global_context = {} | |
| def initialize_flow(self): | |
| """Initialize the AI Flow with agent relationships and dependencies.""" | |
| # Define agent relationships | |
| flow_structure = { | |
| AgentRole.ARCHITECT: [AgentRole.UI_DESIGNER, AgentRole.BACKEND_DEVELOPER, AgentRole.DATABASE_ENGINEER], | |
| AgentRole.UI_DESIGNER: [AgentRole.QA_ENGINEER], | |
| AgentRole.BACKEND_DEVELOPER: [AgentRole.SECURITY_EXPERT, AgentRole.QA_ENGINEER], | |
| AgentRole.DATABASE_ENGINEER: [AgentRole.SECURITY_EXPERT], | |
| AgentRole.SECURITY_EXPERT: [AgentRole.QA_ENGINEER], | |
| AgentRole.QA_ENGINEER: [AgentRole.DEVOPS_ENGINEER], | |
| AgentRole.DEVOPS_ENGINEER: [AgentRole.DOCUMENTATION_WRITER], | |
| AgentRole.DOCUMENTATION_WRITER: [] | |
| } | |
| # Build the flow graph | |
| for role, dependencies in flow_structure.items(): | |
| self.flow_graph.add_node(role) | |
| for dep in dependencies: | |
| self.flow_graph.add_edge(role, dep) | |
| # Initialize context for each agent | |
| self.contexts[role] = AgentContext( | |
| role=role, | |
| state=FlowState.PENDING, | |
| artifacts={}, | |
| dependencies=dependencies, | |
| feedback=[] | |
| ) | |
| async def execute_flow(self, requirements: str): | |
| """Execute the AI Flow with parallel processing where possible.""" | |
| try: | |
| self.initialize_flow() | |
| self.global_context["requirements"] = requirements | |
| # Get all paths through the flow graph | |
| paths = list(nx.all_simple_paths( | |
| self.flow_graph, | |
| AgentRole.ARCHITECT, | |
| AgentRole.DOCUMENTATION_WRITER | |
| )) | |
| # Execute paths in parallel | |
| await self._execute_paths(paths) | |
| return self._compile_results() | |
| except Exception as e: | |
| logger.error(f"Flow execution failed: {str(e)}") | |
| raise | |
| async def _execute_paths(self, paths: List[List[AgentRole]]): | |
| """Execute all paths in the flow graph.""" | |
| try: | |
| results = [] | |
| for path in paths: | |
| path_results = [] | |
| for role in path: | |
| # Get the agent's prompt based on previous results | |
| prompt = self._generate_prompt(role, path_results) | |
| # Execute the agent's task | |
| result = await self._execute_agent_task(role, prompt) | |
| path_results.append(result) | |
| # Store result in context | |
| self.context_manager.add_memory( | |
| f"{role.value}_result", | |
| result, | |
| {"timestamp": datetime.now()} | |
| ) | |
| results.extend(path_results) | |
| # Store all results in context | |
| self.context_manager.add_memory( | |
| "path_results", | |
| results, | |
| {"timestamp": datetime.now()} | |
| ) | |
| return results | |
| except Exception as e: | |
| logger.error(f"Failed to execute paths: {str(e)}") | |
| raise | |
| def _generate_prompt(self, role: AgentRole, previous_results: List[str]) -> str: | |
| """Generate a prompt for an agent based on previous results.""" | |
| requirements = self.context_manager.global_context.get("requirements", "") | |
| # Base prompt with requirements | |
| prompt = f"Requirements: {requirements}\n\n" | |
| # Add context from previous results | |
| if previous_results: | |
| prompt += "Previous work:\n" | |
| for i, result in enumerate(previous_results): | |
| prompt += f"{i+1}. {result}\n" | |
| # Add role-specific instructions | |
| if role == AgentRole.ARCHITECT: | |
| prompt += "\nAs the Architect, design the high-level system architecture." | |
| elif role == AgentRole.UI_DESIGNER: | |
| prompt += "\nAs the UI Designer, create the user interface design." | |
| elif role == AgentRole.BACKEND_DEVELOPER: | |
| prompt += "\nAs the Backend Developer, implement the server-side logic." | |
| elif role == AgentRole.DATABASE_ENGINEER: | |
| prompt += "\nAs the Database Engineer, design the data model and storage." | |
| elif role == AgentRole.SECURITY_EXPERT: | |
| prompt += "\nAs the Security Expert, ensure security best practices." | |
| elif role == AgentRole.QA_ENGINEER: | |
| prompt += "\nAs the QA Engineer, create test cases and validation." | |
| elif role == AgentRole.DEVOPS_ENGINEER: | |
| prompt += "\nAs the DevOps Engineer, set up deployment and CI/CD." | |
| elif role == AgentRole.DOCUMENTATION_WRITER: | |
| prompt += "\nAs the Documentation Writer, create comprehensive documentation." | |
| return prompt | |
| def _compile_results(self) -> str: | |
| """Compile all results into a final output.""" | |
| try: | |
| results = [] | |
| # Get all results from memory | |
| for role in AgentRole: | |
| result = self.context_manager.get_memory(f"{role.value}_result") | |
| if result: | |
| results.append(f"## {role.value}\n{result['value']}\n") | |
| return "\n".join(results) | |
| except Exception as e: | |
| logger.error(f"Failed to compile results: {str(e)}") | |
| raise | |
| async def _execute_agent_task(self, role: AgentRole, prompt: str) -> str: | |
| """Execute a specific agent's task with the given prompt.""" | |
| try: | |
| if role == AgentRole.ARCHITECT: | |
| agent = get_agent("architect") | |
| elif role == AgentRole.UI_DESIGNER: | |
| agent = get_agent("ui_designer") | |
| elif role == AgentRole.BACKEND_DEVELOPER: | |
| agent = get_agent("backend_developer") | |
| elif role == AgentRole.DATABASE_ENGINEER: | |
| agent = get_agent("database_engineer") | |
| elif role == AgentRole.SECURITY_EXPERT: | |
| agent = get_agent("security_expert") | |
| elif role == AgentRole.QA_ENGINEER: | |
| agent = get_agent("qa_engineer") | |
| elif role == AgentRole.DEVOPS_ENGINEER: | |
| agent = get_agent("devops_engineer") | |
| elif role == AgentRole.DOCUMENTATION_WRITER: | |
| agent = get_agent("documentation_writer") | |
| else: | |
| raise ValueError(f"Unknown agent role: {role}") | |
| # Execute the agent's task | |
| result = await asyncio.to_thread(agent.run, prompt) | |
| # Log the execution | |
| logger.info(f"Agent {role.value} completed task") | |
| return result | |
| except Exception as e: | |
| logger.error(f"Agent {role.value} failed: {str(e)}") | |
| raise | |
| class FileContext: | |
| """Context for file operations and tracking.""" | |
| path: Path | |
| content: str | |
| last_modified: datetime | |
| dependencies: Set[Path] | |
| checksum: str | |
| def from_path(cls, path: Path): | |
| content = path.read_text() | |
| return cls( | |
| path=path, | |
| content=content, | |
| last_modified=datetime.fromtimestamp(path.stat().st_mtime), | |
| dependencies=set(), | |
| checksum=hashlib.md5(content.encode()).hexdigest() | |
| ) | |
| class MemoryItem: | |
| """Represents a single memory item in the system.""" | |
| key: str | |
| value: Any | |
| context: dict | |
| timestamp: datetime | |
| importance: float = 1.0 | |
| references: Set[str] = field(default_factory=set) | |
| class ContextManager: | |
| """Manages real-time context awareness across the system.""" | |
| def __init__(self): | |
| self.file_contexts: Dict[Path, FileContext] = {} | |
| self.global_context: Dict[str, Any] = {} | |
| self.command_history: List[Dict] = [] | |
| self.memory_store: Dict[str, MemoryItem] = {} | |
| def update_file_context(self, path: Path) -> FileContext: | |
| """Update context for a specific file.""" | |
| context = FileContext.from_path(path) | |
| self.file_contexts[path] = context | |
| return context | |
| def get_related_files(self, path: Path) -> Set[Path]: | |
| """Find files related to the given file.""" | |
| if path not in self.file_contexts: | |
| self.update_file_context(path) | |
| context = self.file_contexts[path] | |
| return context.dependencies | |
| def track_command(self, command: str, args: List[str], result: Any): | |
| """Track command execution and results.""" | |
| self.command_history.append({ | |
| 'command': command, | |
| 'args': args, | |
| 'result': result, | |
| 'timestamp': datetime.now(), | |
| }) | |
| def add_memory(self, key: str, value: Any, context: dict = None): | |
| """Add an item to the memory store.""" | |
| self.memory_store[key] = MemoryItem( | |
| key=key, | |
| value=value, | |
| context=context or {}, | |
| timestamp=datetime.now() | |
| ) | |
| def get_memory(self, key: str) -> Any: | |
| """Retrieve an item from memory.""" | |
| item = self.memory_store.get(key) | |
| return item.value if item else None | |
| class FileOperationManager: | |
| """Manages multi-file operations and tracking.""" | |
| def __init__(self, context_manager: ContextManager): | |
| self.context_manager = context_manager | |
| self.pending_changes: Dict[Path, str] = {} | |
| async def edit_files(self, changes: Dict[Path, str]): | |
| """Apply changes to multiple files atomically.""" | |
| try: | |
| # Validate all changes first | |
| for path, content in changes.items(): | |
| if not self._validate_change(path, content): | |
| raise ValueError(f"Invalid change for {path}") | |
| # Apply changes | |
| for path, content in changes.items(): | |
| await self._apply_change(path, content) | |
| # Update contexts | |
| for path in changes: | |
| self.context_manager.update_file_context(path) | |
| except Exception as e: | |
| logger.error(f"Failed to apply multi-file changes: {str(e)}") | |
| raise | |
| def _validate_change(self, path: Path, content: str) -> bool: | |
| """Validate a proposed file change.""" | |
| try: | |
| # Check file exists or can be created | |
| if not path.parent.exists(): | |
| path.parent.mkdir(parents=True) | |
| # Validate syntax if it's a Python file | |
| if path.suffix == '.py': | |
| compile(content, str(path), 'exec') | |
| return True | |
| except Exception as e: | |
| logger.error(f"Validation failed for {path}: {str(e)}") | |
| return False | |
| async def _apply_change(self, path: Path, content: str): | |
| """Apply a single file change.""" | |
| path.write_text(content) | |
| class CommandManager: | |
| """Manages command suggestions and execution.""" | |
| def __init__(self, context_manager: ContextManager): | |
| self.context_manager = context_manager | |
| self.command_templates: Dict[str, str] = {} | |
| def suggest_commands(self, context: dict) -> List[Dict]: | |
| """Suggest relevant commands based on context.""" | |
| suggestions = [] | |
| for cmd_name, template in self.command_templates.items(): | |
| if self._is_relevant(cmd_name, context): | |
| suggestions.append({ | |
| 'command': cmd_name, | |
| 'template': template, | |
| 'confidence': self._calculate_confidence(cmd_name, context) | |
| }) | |
| return sorted(suggestions, key=lambda x: x['confidence'], reverse=True) | |
| async def execute_command(self, command: str, args: List[str]) -> Any: | |
| """Execute a command and track its result.""" | |
| try: | |
| # Execute the command | |
| result = await self._run_command(command, args) | |
| # Track the execution | |
| self.context_manager.track_command(command, args, result) | |
| return result | |
| except Exception as e: | |
| logger.error(f"Command execution failed: {str(e)}") | |
| raise | |
| def _is_relevant(self, cmd_name: str, context: dict) -> bool: | |
| """Determine if a command is relevant to the current context.""" | |
| # Implementation depends on specific rules | |
| return True | |
| def _calculate_confidence(self, cmd_name: str, context: dict) -> float: | |
| """Calculate confidence score for a command suggestion.""" | |
| # Implementation depends on specific metrics | |
| return 1.0 | |
| class RuleSystem: | |
| """Manages system rules and constraints.""" | |
| def __init__(self): | |
| self.rules: Dict[str, callable] = {} | |
| self.constraints: Dict[str, callable] = {} | |
| def add_rule(self, name: str, rule_func: callable): | |
| """Add a new rule to the system.""" | |
| self.rules[name] = rule_func | |
| def add_constraint(self, name: str, constraint_func: callable): | |
| """Add a new constraint to the system.""" | |
| self.constraints[name] = constraint_func | |
| def evaluate_rules(self, context: dict) -> Dict[str, bool]: | |
| """Evaluate all rules against the current context.""" | |
| return {name: rule(context) for name, rule in self.rules.items()} | |
| def check_constraints(self, context: dict) -> Dict[str, bool]: | |
| """Check all constraints against the current context.""" | |
| return {name: constraint(context) for name, constraint in self.constraints.items()} | |
| class ProjectBuilder: | |
| """Handles autonomous creation of project files and folders.""" | |
| def __init__(self, base_path: Path): | |
| self.base_path = Path(base_path) | |
| self.current_build = None | |
| self.file_manifest = [] | |
| async def create_project(self, app_name: str, structure: dict) -> Path: | |
| """Create a new project with the specified structure.""" | |
| try: | |
| # Create temporary build directory | |
| build_dir = Path(tempfile.mkdtemp()) | |
| self.current_build = build_dir / app_name | |
| self.current_build.mkdir(parents=True) | |
| # Create project structure | |
| await self._create_structure(self.current_build, structure) | |
| return self.current_build | |
| except Exception as e: | |
| logger.error(f"Project creation failed: {str(e)}") | |
| if self.current_build and self.current_build.exists(): | |
| shutil.rmtree(self.current_build) | |
| raise | |
| async def _create_structure(self, parent: Path, structure: dict): | |
| """Recursively create project structure.""" | |
| for name, content in structure.items(): | |
| path = parent / name | |
| if isinstance(content, dict): | |
| path.mkdir(exist_ok=True) | |
| await self._create_structure(path, content) | |
| else: | |
| path.write_text(str(content)) | |
| self.file_manifest.append(path) | |
| class OutputManager: | |
| """Manages project outputs and creates downloadable artifacts.""" | |
| def __init__(self, project_builder: ProjectBuilder): | |
| self.project_builder = project_builder | |
| self.output_dir = Path(tempfile.mkdtemp()) | |
| self.downloads = {} | |
| def create_download(self, app_name: str) -> str: | |
| """Create a downloadable zip file of the project.""" | |
| try: | |
| if not self.project_builder.current_build: | |
| raise ValueError("No project has been built yet") | |
| # Create zip file | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| zip_name = f"{app_name}_{timestamp}.zip" | |
| zip_path = self.output_dir / zip_name | |
| with ZipFile(zip_path, 'w') as zipf: | |
| for file_path in self.project_builder.file_manifest: | |
| rel_path = file_path.relative_to(self.project_builder.current_build) | |
| zipf.write(file_path, rel_path) | |
| # Store download info | |
| self.downloads[zip_name] = { | |
| 'path': zip_path, | |
| 'created_at': datetime.now(), | |
| 'size': zip_path.stat().st_size | |
| } | |
| return str(zip_path) | |
| except Exception as e: | |
| logger.error(f"Failed to create download: {str(e)}") | |
| raise | |
| class EnhancedAIFlow(AIFlow): | |
| """Enhanced AI Flow with project building and output management.""" | |
| def __init__(self): | |
| super().__init__() | |
| self.project_builder = ProjectBuilder(Path(tempfile.mkdtemp())) | |
| self.output_manager = OutputManager(self.project_builder) | |
| self.context_manager = ContextManager() | |
| self.file_manager = FileOperationManager(self.context_manager) | |
| self.command_manager = CommandManager(self.context_manager) | |
| self.rule_system = RuleSystem() | |
| self.flow_graph = nx.DiGraph() | |
| self.contexts: Dict[AgentRole, AgentContext] = {} | |
| self.global_context = {} | |
| self.requirements = "" | |
| def initialize_flow(self): | |
| """Initialize the AI Flow with agent relationships and dependencies.""" | |
| # Create nodes for each agent role | |
| for role in AgentRole: | |
| self.flow_graph.add_node(role) | |
| self.contexts[role] = AgentContext( | |
| role=role, | |
| state=FlowState.PENDING, | |
| artifacts={}, | |
| dependencies=[], | |
| feedback=[] | |
| ) | |
| # Define dependencies | |
| dependencies = { | |
| AgentRole.UI_DESIGNER: [AgentRole.ARCHITECT], | |
| AgentRole.BACKEND_DEVELOPER: [AgentRole.ARCHITECT], | |
| AgentRole.DATABASE_ENGINEER: [AgentRole.ARCHITECT, AgentRole.BACKEND_DEVELOPER], | |
| AgentRole.SECURITY_EXPERT: [AgentRole.ARCHITECT, AgentRole.BACKEND_DEVELOPER], | |
| AgentRole.QA_ENGINEER: [AgentRole.UI_DESIGNER, AgentRole.BACKEND_DEVELOPER], | |
| AgentRole.DEVOPS_ENGINEER: [AgentRole.BACKEND_DEVELOPER, AgentRole.DATABASE_ENGINEER], | |
| AgentRole.DOCUMENTATION_WRITER: [AgentRole.ARCHITECT, AgentRole.UI_DESIGNER, AgentRole.BACKEND_DEVELOPER] | |
| } | |
| # Add edges based on dependencies | |
| for role, deps in dependencies.items(): | |
| for dep in deps: | |
| self.flow_graph.add_edge(dep, role) | |
| self.contexts[role].dependencies.extend(deps) | |
| def _generate_prompt(self, role: AgentRole) -> str: | |
| """Generate a prompt for an agent based on context and dependencies.""" | |
| try: | |
| context = self.contexts[role] | |
| dependencies_output = [] | |
| # Gather outputs from dependencies (limited to last 1000 chars) | |
| for dep_role in context.dependencies: | |
| dep_context = self.contexts[dep_role] | |
| if dep_context.state == FlowState.COMPLETED and "output" in dep_context.artifacts: | |
| output = dep_context.artifacts['output'] | |
| if len(output) > 1000: | |
| output = output[:997] + "..." | |
| dependencies_output.append(f"## {dep_role.value} Output:\n{output}") | |
| # Build role-specific prompts (with size limits) | |
| role_prompts = { | |
| AgentRole.ARCHITECT: """Design the high-level architecture (brief overview): | |
| Requirements: {requirements} | |
| Focus: system design, components, tech stack, data flow, scalability""", | |
| AgentRole.UI_DESIGNER: """Design the UI (key elements): | |
| Requirements: {requirements} | |
| Previous: {dependencies} | |
| Focus: UX, layout, responsiveness, themes""", | |
| AgentRole.BACKEND_DEVELOPER: """Implement core backend logic: | |
| Requirements: {requirements} | |
| Architecture: {dependencies} | |
| Focus: API, business logic, validation""", | |
| AgentRole.DATABASE_ENGINEER: """Design data layer: | |
| Requirements: {requirements} | |
| Context: {dependencies} | |
| Focus: schema, relationships, optimization""", | |
| AgentRole.SECURITY_EXPERT: """Review security: | |
| Requirements: {requirements} | |
| Context: {dependencies} | |
| Focus: auth, data protection, best practices""", | |
| AgentRole.QA_ENGINEER: """Design testing: | |
| Requirements: {requirements} | |
| Implementation: {dependencies} | |
| Focus: coverage, automation, edge cases""", | |
| AgentRole.DEVOPS_ENGINEER: """Setup deployment: | |
| Requirements: {requirements} | |
| Context: {dependencies} | |
| Focus: CI/CD, infrastructure, monitoring""", | |
| AgentRole.DOCUMENTATION_WRITER: """Create docs: | |
| Requirements: {requirements} | |
| System: {dependencies} | |
| Focus: setup, API docs, guides""" | |
| } | |
| # Get the base prompt for the role | |
| base_prompt = role_prompts.get(role, "") | |
| # Truncate requirements if too long | |
| requirements = self.requirements | |
| if len(requirements) > 1000: | |
| requirements = requirements[:997] + "..." | |
| # Format the prompt with requirements and dependencies | |
| formatted_prompt = base_prompt.format( | |
| requirements=requirements, | |
| dependencies="\n\n".join(dependencies_output) if dependencies_output else "No previous context available." | |
| ) | |
| return formatted_prompt | |
| except Exception as e: | |
| logger.error(f"Failed to generate prompt for {role}: {str(e)}") | |
| raise | |
| async def execute_flow(self, requirements: str) -> str: | |
| """Execute the AI Flow and build the project.""" | |
| try: | |
| # Initialize flow with requirements | |
| self.requirements = requirements | |
| self.initialize_flow() | |
| # Extract app name from requirements | |
| app_name = requirements.split()[0].lower().replace(" ", "_") | |
| # Execute agents in parallel where possible | |
| paths = list(nx.all_simple_paths(self.flow_graph, AgentRole.ARCHITECT, AgentRole.DOCUMENTATION_WRITER)) | |
| results = await self._execute_paths(paths) | |
| # Generate project structure and documentation | |
| project_structure = generate_project_structure(app_name, self.contexts[AgentRole.ARCHITECT].artifacts) | |
| documentation = generate_documentation(app_name, requirements, self.contexts[AgentRole.DOCUMENTATION_WRITER].artifacts) | |
| return f""" | |
| # {app_name.title()} - Generated Application | |
| ## Project Structure | |
| ``` | |
| {project_structure} | |
| ``` | |
| ## Documentation | |
| {documentation} | |
| ## Next Steps | |
| 1. Review the generated architecture and components | |
| 2. Set up the development environment | |
| 3. Implement the components following the provided structure | |
| 4. Run the test suite | |
| 5. Deploy using the provided configurations | |
| ## Support | |
| For any issues or questions, please refer to the documentation or create an issue in the repository. | |
| """ | |
| except Exception as e: | |
| logger.error(f"Failed to execute flow: {str(e)}") | |
| raise | |
| finally: | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| async def _execute_paths(self, paths: List[List[AgentRole]]) -> List[str]: | |
| """Execute all paths in the flow graph.""" | |
| try: | |
| # Execute paths in parallel | |
| tasks = [] | |
| for path in paths: | |
| for role in path: | |
| if self.contexts[role].state == FlowState.PENDING: | |
| tasks.append(self._execute_agent(role)) | |
| self.contexts[role].state = FlowState.RUNNING | |
| # Wait for all tasks to complete | |
| results = await asyncio.gather(*tasks, return_exceptions=True) | |
| # Process results | |
| for result in results: | |
| if isinstance(result, Exception): | |
| raise result | |
| return results | |
| except Exception as e: | |
| logger.error(f"Failed to execute paths: {str(e)}") | |
| raise | |
| async def _execute_agent(self, role: AgentRole) -> str: | |
| """Execute a single agent's tasks with enhanced context.""" | |
| try: | |
| # Generate prompt | |
| prompt = self._generate_prompt(role) | |
| # Execute agent's task | |
| result = await self._execute_agent_task(role, prompt) | |
| # Update context | |
| self.contexts[role].state = FlowState.COMPLETED | |
| self.contexts[role].artifacts["output"] = result | |
| return result | |
| except Exception as e: | |
| logger.error(f"Failed to execute agent {role}: {str(e)}") | |
| self.contexts[role].state = FlowState.FAILED | |
| raise | |
| async def _execute_agent_task(self, role: AgentRole, prompt: str) -> str: | |
| """Execute a specific agent's task with the given prompt.""" | |
| try: | |
| # Get agent | |
| agent = get_agent(role) | |
| # Execute the agent's task | |
| result = await asyncio.to_thread(agent.run, prompt) | |
| # Process and return the result | |
| return result | |
| except Exception as e: | |
| logger.error(f"Agent task execution failed for {role}: {str(e)}") | |
| raise | |
| # Update the multi_agent_workflow function to use AI Flows | |
| async def multi_agent_workflow(requirements: str) -> str: | |
| """Execute a multi-agent workflow using AI Flows to generate a complex app.""" | |
| try: | |
| # Create AI Flow instance | |
| ai_flow = EnhancedAIFlow() | |
| # Generate the app | |
| result = await ai_flow.execute_flow(requirements) | |
| return result | |
| except Exception as e: | |
| logger.error(f"Multi-agent workflow failed: {str(e)}") | |
| raise | |
| # Update the app_generator function to handle async execution | |
| async def app_generator(requirements: str) -> Dict[str, str]: | |
| """Generate an app based on the provided requirements using AI Flows.""" | |
| try: | |
| # Create AI Flow instance | |
| ai_flow = EnhancedAIFlow() | |
| # Generate the app | |
| result = await ai_flow.execute_flow(requirements) | |
| # Create downloadable output | |
| download_path = ai_flow.output_manager.create_download("generated_app") | |
| return { | |
| "output": result, | |
| "download_path": str(download_path) if download_path else None | |
| } | |
| except Exception as e: | |
| logger.error(f"App generation failed: {str(e)}") | |
| raise | |
| async def stream_output(requirements, progress=gr.Progress()): | |
| """Stream the output during app generation.""" | |
| try: | |
| # Initialize | |
| stream_handler.update(" Starting app generation...", "Initializing") | |
| yield "Starting...", None, " Starting app generation...", "Initializing" | |
| # Update progress | |
| phases = [ | |
| (" Analyzing requirements...", "Analyzing"), | |
| (" Generating architecture...", "Designing"), | |
| (" Creating project structure...", "Creating"), | |
| (" Implementing features...", "Implementing"), | |
| (" Finalizing...", "Finalizing") | |
| ] | |
| for msg, status in progress.tqdm(phases): | |
| stream_handler.update(msg, status) | |
| yield None, None, "\n".join(stream_handler.output), status | |
| await asyncio.sleep(1) # Non-blocking sleep | |
| # Generate the app | |
| stream_handler.update(" Running AI Flow system...", "Processing") | |
| yield None, None, "\n".join(stream_handler.output), "Processing" | |
| try: | |
| # Run the app generator with a timeout | |
| async with asyncio.timeout(60): # 60 second timeout | |
| result = await app_generator(requirements) | |
| # Update output with result | |
| if result["output"]: | |
| stream_handler.update("\n" + result["output"], "Completed") | |
| yield result["output"], result["download_path"], "\n".join(stream_handler.output), "Completed" | |
| else: | |
| raise Exception("No output generated") | |
| except asyncio.TimeoutError: | |
| stream_handler.update("\nApp generation timed out after 60 seconds", "Failed") | |
| yield None, None, "\n".join(stream_handler.output), "Failed" | |
| raise | |
| except Exception as e: | |
| error_msg = f"\nError: {str(e)}" | |
| stream_handler.update(error_msg, "Failed") | |
| yield None, None, "\n".join(stream_handler.output), "Failed" | |
| raise | |
| finally: | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| class StreamHandler: | |
| """Handles streaming output for the Gradio interface.""" | |
| def __init__(self): | |
| self.output = [] | |
| self.current_status = "" | |
| def update(self, message: str, status: str = None): | |
| """Update the output stream.""" | |
| timestamp = datetime.now().strftime("%H:%M:%S") | |
| formatted_message = f"[{timestamp}] {message}" | |
| self.output.append(formatted_message) | |
| if status: | |
| self.current_status = status | |
| # Keep only the last 100 lines | |
| if len(self.output) > 100: | |
| self.output = self.output[-100:] | |
| return "\n".join(self.output), self.current_status | |
| # Gradio UI | |
| with gr.Blocks(theme=gr.themes.Soft()) as ui: | |
| stream_handler = StreamHandler() | |
| gr.Markdown("# Autonomous App Generator with AI Flow") | |
| gr.Markdown(""" | |
| ## Instructions | |
| 1. Describe the app you want to build in detail | |
| 2. Include any specific requirements or features | |
| 3. Click 'Generate App' to start the process | |
| 4. Download your generated app from the provided link | |
| ### Example: | |
| ``` | |
| Create a personal task management application with: | |
| - User authentication (email/password, Google OAuth) | |
| - Task management (CRUD, priorities, due dates, reminders) | |
| - Modern UI with dark/light theme | |
| - Real-time updates using WebSocket | |
| - PostgreSQL and Redis for storage | |
| ``` | |
| """) | |
| with gr.Row(): | |
| with gr.Column(scale=4): | |
| requirements_input = gr.Textbox( | |
| label="App Requirements", | |
| placeholder="Describe the app you want to build...", | |
| lines=10 | |
| ) | |
| with gr.Row(): | |
| generate_button = gr.Button("Generate App", variant="primary") | |
| cancel_button = gr.Button("Cancel", variant="stop") | |
| status = gr.Textbox( | |
| label="Status", | |
| value="Ready", | |
| interactive=False | |
| ) | |
| with gr.Column(scale=6): | |
| with gr.Tabs(): | |
| with gr.TabItem("Output"): | |
| output = gr.Markdown( | |
| label="Generated App Details", | |
| value="Your app details will appear here..." | |
| ) | |
| with gr.TabItem("Download"): | |
| file_output = gr.File( | |
| label="Download Generated App", | |
| interactive=False | |
| ) | |
| with gr.TabItem("Live Log"): | |
| log_output = gr.Textbox( | |
| label="Generation Logs", | |
| value="Logs will appear here...", | |
| lines=10, | |
| max_lines=20, | |
| interactive=False, | |
| show_copy_button=True | |
| ) | |
| async def stream_output(requirements, progress=gr.Progress()): | |
| """Stream the output during app generation.""" | |
| try: | |
| # Initialize | |
| stream_handler.update(" Starting app generation...", "Initializing") | |
| yield "Starting...", None, " Starting app generation...", "Initializing" | |
| # Update progress | |
| phases = [ | |
| (" Analyzing requirements...", "Analyzing"), | |
| (" Generating architecture...", "Designing"), | |
| (" Creating project structure...", "Creating"), | |
| (" Implementing features...", "Implementing"), | |
| (" Finalizing...", "Finalizing") | |
| ] | |
| for msg, status in progress.tqdm(phases): | |
| stream_handler.update(msg, status) | |
| yield None, None, "\n".join(stream_handler.output), status | |
| await asyncio.sleep(1) # Non-blocking sleep | |
| # Generate the app | |
| stream_handler.update(" Running AI Flow system...", "Processing") | |
| yield None, None, "\n".join(stream_handler.output), "Processing" | |
| try: | |
| # Run the app generator with a timeout | |
| async with asyncio.timeout(60): # 60 second timeout | |
| result = await app_generator(requirements) | |
| # Update output with result | |
| if result["output"]: | |
| stream_handler.update("\n" + result["output"], "Completed") | |
| yield result["output"], result["download_path"], "\n".join(stream_handler.output), "Completed" | |
| else: | |
| raise Exception("No output generated") | |
| except asyncio.TimeoutError: | |
| stream_handler.update("\nApp generation timed out after 60 seconds", "Failed") | |
| yield None, None, "\n".join(stream_handler.output), "Failed" | |
| raise | |
| except Exception as e: | |
| error_msg = f"\nError: {str(e)}" | |
| stream_handler.update(error_msg, "Failed") | |
| yield None, None, "\n".join(stream_handler.output), "Failed" | |
| raise | |
| finally: | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| def cancel_generation(): | |
| """Cancel the current generation process.""" | |
| stream_handler.update(" Generation cancelled by user", "Cancelled") | |
| return "Generation cancelled", None, "\n".join(stream_handler.output), "Cancelled" | |
| generate_button.click( | |
| stream_output, | |
| inputs=[requirements_input], | |
| outputs=[output, file_output, log_output, status], | |
| show_progress=True | |
| ) | |
| cancel_button.click( | |
| cancel_generation, | |
| outputs=[output, file_output, log_output, status] | |
| ) | |
| # Run the Gradio app | |
| if __name__ == "__main__": | |
| try: | |
| ui.launch( | |
| share=True, # Enable sharing | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| show_error=True | |
| ) | |
| except Exception as e: | |
| logger.error(f"Failed to launch Gradio interface: {str(e)}") |