""" Research Agent for Disaster Risk Construction Planner Gathers construction recommendations using DuckDuckGo search and web fetching with LLM analysis """ import asyncio import os import logging from typing import List, Dict, Any, AsyncGenerator, Optional from langchain_openai import ChatOpenAI from langchain_community.tools import DuckDuckGoSearchResults from langchain_community.utilities import DuckDuckGoSearchAPIWrapper import httpx from bs4 import BeautifulSoup from models import ( RiskData, BuildingType, Recommendations, RecommendationDetail, BuildingCodeReference, HazardDetail ) # Configure logging logger = logging.getLogger(__name__) class ResearchAgent: """Agentic research agent using LLM with DuckDuckGo search""" def __init__(self): """Initialize research agent""" self.model_name = os.getenv('OPENAI_MODEL', 'gpt-4o-mini') # Initialize DuckDuckGo search tool try: search_wrapper = DuckDuckGoSearchAPIWrapper(max_results=5) self.search_tool = DuckDuckGoSearchResults(api_wrapper=search_wrapper) logger.info("DuckDuckGo search tool initialized") except Exception as e: logger.warning(f"Failed to initialize DuckDuckGo search: {e}") self.search_tool = None self.system_prompt = """You are an expert construction research agent for disaster-resistant building in the Philippines. Your role is to: 1. Search for construction guidelines and building codes using web search 2. Analyze construction recommendations from authoritative sources 3. Provide practical, actionable advice for construction professionals 4. Focus on disaster-resistant construction techniques specific to Philippine hazards 5. Reference Philippine building codes (NBCP, NSCP) and international standards When providing recommendations: - Prioritize hazards based on severity (CRITICAL > HIGH > MODERATE > LOW) - Explain technical terms in plain language - Provide specific construction techniques and materials - Include cost implications when relevant - Reference building codes and standards - Consider the specific building type requirements Always structure your response with: 1. General Construction Guidelines 2. Hazard-Specific Recommendations (by category) 3. Priority Actions 4. Building Code References """ async def get_agentic_recommendations( self, risks: RiskData, building_type: BuildingType ) -> Recommendations: """ Get agentic construction recommendations with LLM analysis Uses hybrid approach: 1. Extract risk types from risk data 2. Search for guidelines using DuckDuckGo MCP 3. Fetch page content using Fetch MCP 4. Use LLM to analyze and synthesize recommendations Args: risks: Risk assessment data building_type: Type of building Returns: Construction recommendations with LLM-enhanced analysis """ try: logger.info(f"Starting agentic research for {building_type}") # Extract risk types from RiskData risk_types = self._extract_risk_types(risks) logger.info(f"Identified risk types: {', '.join(risk_types)}") # Search for guidelines search_results = await self.search_guidelines(risk_types, building_type) logger.info(f"Found {len(search_results)} search results") # Fetch page content from top results page_contents = await self.fetch_page_content(search_results) logger.info(f"Fetched {len(page_contents)} page contents") # Check if LLM is available openai_api_key = os.getenv('OPENAI_API_KEY') if openai_api_key and openai_api_key != 'dummy-key-for-blaxel': try: logger.info("Using LLM for intelligent synthesis...") # Use LLM to synthesize recommendations recommendations = await self._synthesize_with_llm( page_contents, risks, building_type, risk_types ) logger.info("LLM synthesis completed successfully") return recommendations except Exception as llm_error: logger.warning(f"LLM synthesis failed: {str(llm_error)}, falling back to rule-based synthesis") else: logger.info("No OpenAI API key configured, using rule-based synthesis") # Fall back to rule-based synthesis recommendations = self.synthesize_recommendations( page_contents, risks, building_type ) return recommendations except Exception as e: logger.error(f"Agentic research failed: {str(e)}", exc_info=True) # Fall back to basic recommendations return self._generate_fallback_recommendations(risks, building_type) async def get_construction_recommendations( self, risks: RiskData, building_type: BuildingType ) -> Recommendations: """ Main entry point for research (backwards compatible) Args: risks: Risk assessment data building_type: Type of building Returns: Construction recommendations """ return await self.get_agentic_recommendations(risks, building_type) async def get_streaming_recommendations( self, risks: RiskData, building_type: BuildingType ) -> AsyncGenerator[str, None]: """ Get streaming construction recommendations with LLM analysis Args: risks: Risk assessment data building_type: Type of building Yields: Streaming recommendations from the LLM """ try: yield f"Researching construction recommendations for {building_type.replace('_', ' ')}...\n\n" # Extract risk types risk_types = self._extract_risk_types(risks) yield f"✓ Identified {len(risk_types)} risk types: {', '.join(risk_types)}\n\n" # Search for guidelines yield "Searching for construction guidelines...\n" search_results = await self.search_guidelines(risk_types, building_type) yield f"✓ Found {len(search_results)} relevant sources\n\n" # Fetch page content yield "Fetching detailed information...\n" page_contents = await self.fetch_page_content(search_results) yield f"✓ Retrieved {len(page_contents)} documents\n\n" # Check if LLM is available openai_api_key = os.getenv('OPENAI_API_KEY') if openai_api_key and openai_api_key != 'dummy-key-for-blaxel': yield "Analyzing with AI...\n\n" yield "=" * 60 + "\n\n" try: # Stream LLM analysis async for chunk in self._stream_llm_synthesis( page_contents, risks, building_type, risk_types ): yield chunk yield "\n\n" + "=" * 60 + "\n" yield "\n✓ Research complete\n" except Exception as llm_error: logger.error(f"LLM streaming failed: {str(llm_error)}") yield f"\n\nLLM analysis failed: {str(llm_error)}\n" yield "Showing structured recommendations instead...\n\n" else: yield "\nNote: LLM analysis not available (no OPENAI_API_KEY configured)\n" yield "Showing structured recommendations:\n\n" yield "=" * 60 + "\n\n" except Exception as e: logger.error(f"Streaming research failed: {str(e)}", exc_info=True) yield f"\n\nError during research: {str(e)}\n" def _extract_risk_types(self, risks: RiskData) -> List[str]: """ Extract active risk types from RiskData Args: risks: Risk assessment data Returns: List of active risk types """ risk_types = [] # Check seismic hazards seismic = risks.hazards.seismic if self._is_hazard_active(seismic.active_fault): risk_types.append("earthquake") if self._is_hazard_active(seismic.liquefaction): risk_types.append("liquefaction") if self._is_hazard_active(seismic.tsunami): risk_types.append("tsunami") # Check volcanic hazards volcanic = risks.hazards.volcanic if self._is_hazard_active(volcanic.active_volcano): risk_types.append("volcanic") if self._is_hazard_active(volcanic.ashfall): risk_types.append("ashfall") # Check hydrometeorological hazards hydro = risks.hazards.hydrometeorological if self._is_hazard_active(hydro.flood): risk_types.append("flood") if self._is_hazard_active(hydro.rain_induced_landslide): risk_types.append("landslide") if self._is_hazard_active(hydro.storm_surge): risk_types.append("storm surge") if self._is_hazard_active(hydro.severe_winds): risk_types.append("typhoon") return risk_types if risk_types else ["general disaster"] def _is_hazard_active(self, hazard: HazardDetail) -> bool: """ Check if a hazard is active/present Args: hazard: Hazard detail Returns: True if hazard is active """ if not hazard: return False status_lower = hazard.status.lower() return status_lower not in ["none", "not present", "no data", "n/a"] def _build_search_query(self, risk_types: List[str], building_type: BuildingType) -> str: """ Build search query for construction guidelines Args: risk_types: List of risk types building_type: Type of building Returns: Search query string """ building_type_str = building_type.replace("_", " ") risk_str = " ".join(risk_types[:2]) # Use top 2 risk types return f"Philippines {risk_str} resistant construction guidelines {building_type_str}" async def search_guidelines( self, risk_types: List[str], building_type: BuildingType ) -> List[Dict[str, Any]]: """ Search for disaster-resistant construction guidelines using DuckDuckGo Args: risk_types: List of risk types to search for building_type: Type of building Returns: List of search results with URLs and snippets """ if not self.search_tool: logger.warning("DuckDuckGo search tool not available") return [] try: all_results = [] building_type_str = building_type.replace("_", " ") # Build search queries for each risk type for risk_type in risk_types[:3]: # Limit to top 3 risk types query = f"Philippines {risk_type} resistant construction guidelines {building_type_str}" try: logger.info(f"Searching: {query}") # Use the search tool synchronously (it's not async) results_str = await asyncio.to_thread(self.search_tool.run, query) # Parse results - DuckDuckGo returns a string with results if results_str: # Results are in format: [snippet: ..., title: ..., link: ...] # Parse into structured format parsed_results = self._parse_search_results(results_str) all_results.extend(parsed_results) logger.info(f"Found {len(parsed_results)} results for {risk_type}") except Exception as e: logger.error(f"Error searching for {risk_type}: {e}") # Add general Philippines building code search try: code_query = f"Philippines National Building Code {building_type_str} disaster resistant" logger.info(f"Searching: {code_query}") results_str = await asyncio.to_thread(self.search_tool.run, code_query) if results_str: parsed_results = self._parse_search_results(results_str) all_results.extend(parsed_results) logger.info(f"Found {len(parsed_results)} building code results") except Exception as e: logger.error(f"Error searching for building codes: {e}") return all_results except Exception as e: logger.error(f"Error in search_guidelines: {e}") return [] def _parse_search_results(self, results_str: str) -> List[Dict[str, Any]]: """ Parse DuckDuckGo search results string into structured format Args: results_str: Raw search results string Returns: List of parsed results with title, url, snippet """ parsed = [] try: # Results are in format: [snippet: ..., title: ..., link: ...] # Split by result boundaries import re # Find all results using regex pattern = r'\[snippet:\s*([^,]+),\s*title:\s*([^,]+),\s*link:\s*([^\]]+)\]' matches = re.findall(pattern, results_str, re.DOTALL) for snippet, title, link in matches: parsed.append({ 'snippet': snippet.strip(), 'title': title.strip(), 'url': link.strip(), 'link': link.strip() }) # If regex parsing fails, try simple parsing if not parsed and results_str: # Just create a single result with the raw text parsed.append({ 'snippet': results_str[:500], 'title': 'Search Result', 'url': '', 'link': '' }) except Exception as e: logger.error(f"Error parsing search results: {e}") return parsed async def fetch_page_content(self, search_results: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Fetch content from web pages using httpx Args: search_results: List of search results with URLs Returns: List of page contents with URL and text """ page_contents = [] # Create httpx client with timeout async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client: # Fetch content from top results (limit to 5 to avoid timeout) for result in search_results[:5]: url = result.get('url') or result.get('link') title = result.get('title', '') snippet = result.get('snippet', '') if not url: # If no URL, just use snippet if snippet: page_contents.append({ 'url': 'N/A', 'title': title, 'content': snippet }) continue try: logger.info(f"Fetching content from: {url}") # Fetch the page response = await client.get(url, headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' }) if response.status_code == 200: # Parse HTML content soup = BeautifulSoup(response.text, 'html.parser') # Remove script and style elements for script in soup(['script', 'style', 'nav', 'footer', 'header']): script.decompose() # Get text content text = soup.get_text(separator=' ', strip=True) # Clean up whitespace text = ' '.join(text.split()) # Limit to 5000 characters to avoid token limits if len(text) > 5000: text = text[:5000] + '...' page_contents.append({ 'url': url, 'title': title, 'content': text }) logger.info(f"Successfully fetched {len(text)} characters from {url}") else: logger.warning(f"Failed to fetch {url}: HTTP {response.status_code}") # Fall back to snippet if snippet: page_contents.append({ 'url': url, 'title': title, 'content': snippet }) except httpx.TimeoutException: logger.warning(f"Timeout fetching {url}, using snippet") if snippet: page_contents.append({ 'url': url, 'title': title, 'content': snippet }) except Exception as e: logger.error(f"Error fetching {url}: {e}") # Fall back to snippet if snippet: page_contents.append({ 'url': url, 'title': title, 'content': snippet }) logger.info(f"Fetched content from {len(page_contents)} sources") return page_contents def synthesize_recommendations( self, page_contents: List[Dict[str, Any]], risks: RiskData, building_type: BuildingType ) -> Recommendations: """ Synthesize recommendations from fetched content Args: page_contents: List of page contents risks: Risk assessment data building_type: Type of building Returns: Structured recommendations """ # Extract recommendations by category seismic_recs = self._extract_seismic_recommendations(page_contents, risks) volcanic_recs = self._extract_volcanic_recommendations(page_contents, risks) hydro_recs = self._extract_hydrometeorological_recommendations(page_contents, risks) # Generate general guidelines general_guidelines = self._generate_general_guidelines(building_type, risks) # Extract building code references building_codes = self._extract_building_codes(page_contents) # Generate priority actions priority_actions = self._generate_priority_actions(risks, building_type) return Recommendations( general_guidelines=general_guidelines, seismic_recommendations=seismic_recs, volcanic_recommendations=volcanic_recs, hydrometeorological_recommendations=hydro_recs, priority_actions=priority_actions, building_codes=building_codes ) def _extract_seismic_recommendations( self, page_contents: List[Dict[str, Any]], risks: RiskData ) -> List[RecommendationDetail]: """Extract seismic-related recommendations""" recommendations = [] seismic = risks.hazards.seismic if self._is_hazard_active(seismic.active_fault) or self._is_hazard_active(seismic.ground_shaking): recommendations.append(RecommendationDetail( hazard_type="Earthquake/Ground Shaking", recommendation="Use reinforced concrete or steel frame construction with proper seismic detailing. Ensure adequate lateral bracing and shear walls.", rationale="Strong ground shaking can cause structural failure in buildings without proper seismic resistance.", source_url=page_contents[0]['url'] if page_contents else None )) if self._is_hazard_active(seismic.liquefaction): recommendations.append(RecommendationDetail( hazard_type="Liquefaction", recommendation="Implement deep foundation systems (piles or caissons) extending below liquefiable soil layers. Consider ground improvement techniques.", rationale="Liquefaction causes soil to lose strength, leading to foundation failure and building settlement.", source_url=page_contents[0]['url'] if page_contents else None )) if self._is_hazard_active(seismic.tsunami): recommendations.append(RecommendationDetail( hazard_type="Tsunami", recommendation="Elevate critical building systems above expected tsunami inundation levels. Use flood-resistant materials for lower floors.", rationale="Tsunami waves can cause severe flooding and structural damage to coastal buildings.", source_url=page_contents[0]['url'] if page_contents else None )) return recommendations def _extract_volcanic_recommendations( self, page_contents: List[Dict[str, Any]], risks: RiskData ) -> List[RecommendationDetail]: """Extract volcanic-related recommendations""" recommendations = [] volcanic = risks.hazards.volcanic if self._is_hazard_active(volcanic.ashfall): recommendations.append(RecommendationDetail( hazard_type="Volcanic Ashfall", recommendation="Design roofs with steep pitch (minimum 30 degrees) to prevent ash accumulation. Use reinforced roof structures to handle additional ash load.", rationale="Volcanic ash accumulation can cause roof collapse due to excessive weight.", source_url=page_contents[0]['url'] if page_contents else None )) if self._is_hazard_active(volcanic.pyroclastic_flow): recommendations.append(RecommendationDetail( hazard_type="Pyroclastic Flow", recommendation="Consider relocating construction site outside pyroclastic flow hazard zones. If not possible, use reinforced concrete construction with minimal openings.", rationale="Pyroclastic flows are extremely destructive and can destroy most structures.", source_url=page_contents[0]['url'] if page_contents else None )) if self._is_hazard_active(volcanic.lahar): recommendations.append(RecommendationDetail( hazard_type="Lahar", recommendation="Elevate building on raised foundation or stilts. Install protective barriers and drainage systems to divert lahar flows.", rationale="Lahars (volcanic mudflows) can bury and damage structures in their path.", source_url=page_contents[0]['url'] if page_contents else None )) return recommendations def _extract_hydrometeorological_recommendations( self, page_contents: List[Dict[str, Any]], risks: RiskData ) -> List[RecommendationDetail]: """Extract hydrometeorological-related recommendations""" recommendations = [] hydro = risks.hazards.hydrometeorological if self._is_hazard_active(hydro.flood): recommendations.append(RecommendationDetail( hazard_type="Flooding", recommendation="Elevate building above expected flood levels. Use flood-resistant materials (concrete, treated wood) for lower floors. Install proper drainage systems.", rationale="Flooding can cause structural damage, foundation erosion, and interior damage.", source_url=page_contents[0]['url'] if page_contents else None )) if self._is_hazard_active(hydro.rain_induced_landslide): recommendations.append(RecommendationDetail( hazard_type="Landslide", recommendation="Implement slope stabilization measures including retaining walls, proper drainage, and vegetation. Avoid building on steep slopes if possible.", rationale="Landslides can destroy buildings and cause loss of life.", source_url=page_contents[0]['url'] if page_contents else None )) if self._is_hazard_active(hydro.storm_surge): recommendations.append(RecommendationDetail( hazard_type="Storm Surge", recommendation="Elevate building on reinforced foundation. Use impact-resistant materials. Install storm shutters for windows and openings.", rationale="Storm surge can cause severe flooding and wave impact damage to coastal structures.", source_url=page_contents[0]['url'] if page_contents else None )) if self._is_hazard_active(hydro.severe_winds): recommendations.append(RecommendationDetail( hazard_type="Severe Winds/Typhoon", recommendation="Use hurricane straps and clips to secure roof to walls. Install impact-resistant windows. Ensure proper roof-to-wall connections.", rationale="Severe winds can cause roof failure, window breakage, and structural damage.", source_url=page_contents[0]['url'] if page_contents else None )) return recommendations def _generate_general_guidelines( self, building_type: BuildingType, risks: RiskData ) -> List[str]: """Generate general construction guidelines""" guidelines = [ "Follow the National Building Code of the Philippines (NBCP) and National Structural Code of the Philippines (NSCP)", "Hire licensed engineers and architects for design and supervision", "Use quality materials that meet Philippine Standards (PS) specifications", "Ensure proper construction supervision and quality control throughout the project", "Obtain all necessary building permits and comply with local zoning regulations" ] # Add building-type specific guidelines if "residential" in building_type: guidelines.append("Consider future expansion needs and ensure structural capacity for additional floors if planned") elif "commercial" in building_type or "institutional" in building_type: guidelines.append("Ensure compliance with fire safety codes and accessibility standards for public buildings") elif "industrial" in building_type: guidelines.append("Design for specific industrial loads and vibration requirements") # Add risk-specific guidelines if risks.summary.overall_risk_level in ["CRITICAL", "HIGH"]: guidelines.append("Consider engaging a disaster risk reduction specialist for site-specific recommendations") guidelines.append("Implement redundant safety systems and emergency evacuation plans") return guidelines def _extract_building_codes( self, page_contents: List[Dict[str, Any]] ) -> List[BuildingCodeReference]: """Extract building code references from content""" codes = [ BuildingCodeReference( code_name="National Building Code of the Philippines (NBCP)", section="General", requirement="All construction must comply with NBCP provisions for structural safety, fire safety, and sanitation" ), BuildingCodeReference( code_name="National Structural Code of the Philippines (NSCP)", section="Seismic Design", requirement="Buildings must be designed to resist seismic forces based on seismic zone classification" ), BuildingCodeReference( code_name="NSCP", section="Wind Design", requirement="Structures must be designed for wind loads based on location and exposure category" ) ] return codes def _generate_priority_actions( self, risks: RiskData, building_type: BuildingType ) -> List[str]: """Generate priority actions based on risk assessment""" actions = [] # High priority actions based on risk level if risks.summary.overall_risk_level == "CRITICAL": actions.append("URGENT: Conduct detailed geotechnical investigation before construction") actions.append("URGENT: Engage structural engineer with disaster-resistant design expertise") # Priority actions based on specific hazards seismic = risks.hazards.seismic if self._is_hazard_active(seismic.active_fault): actions.append("Conduct fault line mapping and avoid building directly on active faults") if self._is_hazard_active(seismic.liquefaction): actions.append("Perform soil liquefaction assessment and implement ground improvement if needed") volcanic = risks.hazards.volcanic if self._is_hazard_active(volcanic.active_volcano): actions.append("Review volcanic hazard maps and consider evacuation routes in site planning") hydro = risks.hazards.hydrometeorological if self._is_hazard_active(hydro.flood): actions.append("Determine 100-year flood elevation and design building accordingly") if self._is_hazard_active(hydro.rain_induced_landslide): actions.append("Conduct slope stability analysis and implement stabilization measures") # General priority actions actions.append("Obtain building permit and ensure compliance with local building codes") actions.append("Implement quality assurance program during construction") return actions async def _synthesize_with_llm( self, page_contents: List[Dict[str, Any]], risks: RiskData, building_type: BuildingType, risk_types: List[str] ) -> Recommendations: """ Use LLM to synthesize construction recommendations Args: page_contents: Fetched web page contents risks: Risk assessment data building_type: Type of building risk_types: List of identified risk types Returns: Structured recommendations with LLM analysis """ try: # Get OpenAI API key openai_api_key = os.getenv('OPENAI_API_KEY') # Initialize LLM model = ChatOpenAI( model=self.model_name, api_key=openai_api_key, temperature=0.7 ) logger.info(f"Using OpenAI model: {self.model_name}") # Create context from page contents context = self._create_research_context(page_contents, risks, building_type, risk_types) # Create prompt for LLM prompt = f"""{self.system_prompt} Based on the following research and risk assessment, provide comprehensive construction recommendations: {context} Provide detailed recommendations in the following format: ## General Guidelines - List 5-7 general construction guidelines ## Seismic Recommendations For each active seismic hazard, provide: - Hazard type - Specific recommendation - Rationale ## Volcanic Recommendations For each active volcanic hazard, provide: - Hazard type - Specific recommendation - Rationale ## Hydrometeorological Recommendations For each active hydrometeorological hazard, provide: - Hazard type - Specific recommendation - Rationale ## Priority Actions - List 5-8 priority actions in order of importance ## Building Code References - List relevant Philippine building codes (NBCP, NSCP) with sections and requirements """ # Get LLM response logger.info("Invoking LLM for synthesis...") response = await model.ainvoke(prompt) # Extract content llm_output = response.content if hasattr(response, 'content') else str(response) logger.info(f"LLM synthesis completed: {len(llm_output)} characters") # Parse LLM output into structured recommendations recommendations = self._parse_llm_recommendations(llm_output, risks, building_type) # Add LLM analysis to recommendations if hasattr(recommendations, 'llm_analysis'): recommendations.llm_analysis = llm_output return recommendations except Exception as e: logger.error(f"LLM synthesis failed: {str(e)}") raise async def _stream_llm_synthesis( self, page_contents: List[Dict[str, Any]], risks: RiskData, building_type: BuildingType, risk_types: List[str] ) -> AsyncGenerator[str, None]: """ Stream LLM synthesis of construction recommendations Args: page_contents: Fetched web page contents risks: Risk assessment data building_type: Type of building risk_types: List of identified risk types Yields: Streaming recommendations from LLM """ try: # Get OpenAI API key openai_api_key = os.getenv('OPENAI_API_KEY') # Initialize LLM model = ChatOpenAI( model=self.model_name, api_key=openai_api_key, temperature=0.7 ) logger.info(f"Using OpenAI model: {self.model_name}") # Create context context = self._create_research_context(page_contents, risks, building_type, risk_types) # Create prompt prompt = f"""{self.system_prompt} Based on the following research and risk assessment, provide comprehensive construction recommendations: {context} Provide detailed, practical recommendations for disaster-resistant construction.""" # Stream LLM response logger.info("Starting LLM streaming synthesis...") async for chunk in model.astream(prompt): if hasattr(chunk, 'content') and chunk.content: yield chunk.content logger.info("Streaming synthesis completed") except Exception as e: logger.error(f"LLM streaming failed: {str(e)}") yield f"\n\nError: {str(e)}\n" def _create_research_context( self, page_contents: List[Dict[str, Any]], risks: RiskData, building_type: BuildingType, risk_types: List[str] ) -> str: """Create context for LLM from research data""" context_parts = [] # Building and location info context_parts.append(f"## Building Information") context_parts.append(f"Building Type: {building_type.replace('_', ' ').title()}") context_parts.append(f"Location: {risks.location.name}, {risks.location.administrative_area}") context_parts.append(f"Coordinates: {risks.location.coordinates.latitude}, {risks.location.coordinates.longitude}") # Risk summary context_parts.append(f"\n## Risk Assessment Summary") context_parts.append(f"Overall Risk Level: {risks.summary.overall_risk_level}") context_parts.append(f"High Risk Hazards: {risks.summary.high_risk_count}") context_parts.append(f"Moderate Risk Hazards: {risks.summary.moderate_risk_count}") if risks.summary.critical_hazards: context_parts.append(f"Critical Hazards: {', '.join(risks.summary.critical_hazards)}") # Active hazards context_parts.append(f"\n## Active Hazards") context_parts.append(f"Risk Types: {', '.join(risk_types)}") # Seismic hazards seismic = risks.hazards.seismic if self._is_hazard_active(seismic.active_fault): context_parts.append(f"\n### Seismic Hazards") context_parts.append(f"- Active Fault: {seismic.active_fault.description}") if seismic.active_fault.distance: context_parts.append(f" Distance: {seismic.active_fault.distance}") if self._is_hazard_active(seismic.ground_shaking): context_parts.append(f"- Ground Shaking: {seismic.ground_shaking.description}") if self._is_hazard_active(seismic.liquefaction): context_parts.append(f"- Liquefaction: {seismic.liquefaction.description}") # Volcanic hazards volcanic = risks.hazards.volcanic if self._is_hazard_active(volcanic.active_volcano): context_parts.append(f"\n### Volcanic Hazards") context_parts.append(f"- Active Volcano: {volcanic.active_volcano.description}") if volcanic.active_volcano.distance: context_parts.append(f" Distance: {volcanic.active_volcano.distance}") if self._is_hazard_active(volcanic.ashfall): context_parts.append(f"- Ashfall: {volcanic.ashfall.description}") # Hydrometeorological hazards hydro = risks.hazards.hydrometeorological if self._is_hazard_active(hydro.flood): context_parts.append(f"\n### Hydrometeorological Hazards") context_parts.append(f"- Flood: {hydro.flood.description}") if self._is_hazard_active(hydro.rain_induced_landslide): context_parts.append(f"- Landslide: {hydro.rain_induced_landslide.description}") if self._is_hazard_active(hydro.storm_surge): context_parts.append(f"- Storm Surge: {hydro.storm_surge.description}") if self._is_hazard_active(hydro.severe_winds): context_parts.append(f"- Severe Winds: {hydro.severe_winds.description}") # Research sources if page_contents: context_parts.append(f"\n## Research Sources") for i, content in enumerate(page_contents[:3], 1): # Limit to top 3 context_parts.append(f"\n### Source {i}: {content.get('title', 'Unknown')}") context_parts.append(f"URL: {content.get('url', 'N/A')}") # Truncate content to avoid token limits content_text = content.get('content', '') if isinstance(content_text, str): content_text = content_text[:2000] # Limit to 2000 chars per source context_parts.append(f"Content: {content_text}") return "\n".join(context_parts) def _parse_llm_recommendations( self, llm_output: str, risks: RiskData, building_type: BuildingType ) -> Recommendations: """ Parse LLM output into structured Recommendations Falls back to rule-based recommendations if parsing fails """ try: # Try to extract structured data from LLM output # This is a simple parser - could be enhanced with more sophisticated parsing general_guidelines = [] seismic_recs = [] volcanic_recs = [] hydro_recs = [] priority_actions = [] building_codes = [] # Split by sections sections = llm_output.split('##') for section in sections: section_lower = section.lower() if 'general' in section_lower and 'guideline' in section_lower: # Extract bullet points lines = section.split('\n') for line in lines: line = line.strip() if line.startswith('-') or line.startswith('•'): general_guidelines.append(line.lstrip('-•').strip()) elif 'priority' in section_lower and 'action' in section_lower: lines = section.split('\n') for line in lines: line = line.strip() if line.startswith('-') or line.startswith('•'): priority_actions.append(line.lstrip('-•').strip()) # If parsing didn't extract enough data, fall back to rule-based if len(general_guidelines) < 3: logger.warning("LLM output parsing incomplete, using rule-based fallback") return self.synthesize_recommendations([], risks, building_type) # Use rule-based for hazard-specific recommendations # (LLM output format may vary, so we use reliable rule-based approach) seismic_recs = self._extract_seismic_recommendations([], risks) volcanic_recs = self._extract_volcanic_recommendations([], risks) hydro_recs = self._extract_hydrometeorological_recommendations([], risks) building_codes = self._extract_building_codes([]) # Ensure we have priority actions if len(priority_actions) < 3: priority_actions = self._generate_priority_actions(risks, building_type) return Recommendations( general_guidelines=general_guidelines[:7], # Limit to 7 seismic_recommendations=seismic_recs, volcanic_recommendations=volcanic_recs, hydrometeorological_recommendations=hydro_recs, priority_actions=priority_actions[:8], # Limit to 8 building_codes=building_codes ) except Exception as e: logger.error(f"Failed to parse LLM recommendations: {str(e)}") # Fall back to rule-based return self.synthesize_recommendations([], risks, building_type) def _generate_fallback_recommendations( self, risks: RiskData, building_type: BuildingType ) -> Recommendations: """Generate basic fallback recommendations when all else fails""" return self.synthesize_recommendations([], risks, building_type) # Blaxel agent entry point async def main(input_data: Dict[str, Any]) -> Dict[str, Any]: """ Main entry point for Blaxel agent Args: input_data: Dictionary containing risks and building_type Returns: Dictionary containing recommendations """ agent = ResearchAgent() # Parse input risks = input_data.get('risks') building_type = input_data.get('building_type') if not risks or not building_type: return { 'success': False, 'error': 'Missing required parameters: risks and building_type' } try: recommendations = await agent.get_construction_recommendations(risks, building_type) return { 'success': True, 'recommendations': recommendations } except Exception as e: return { 'success': False, 'error': str(e) } if __name__ == "__main__": import sys import json # For local testing if len(sys.argv) > 1: input_data = json.loads(sys.argv[1]) result = asyncio.run(main(input_data)) print(json.dumps(result, indent=2))