"""
Research Agent for Disaster Risk Construction Planner

Gathers construction recommendations using DuckDuckGo search and web fetching with LLM analysis
"""

import asyncio
import os
import logging
from typing import List, Dict, Any, AsyncGenerator, Optional

from langchain_openai import ChatOpenAI
from langchain_community.tools import DuckDuckGoSearchResults
from langchain_community.utilities import DuckDuckGoSearchAPIWrapper
import httpx
from bs4 import BeautifulSoup

from models import (
    RiskData,
    BuildingType,
    Recommendations,
    RecommendationDetail,
    BuildingCodeReference,
    HazardDetail
)

logger = logging.getLogger(__name__)


class ResearchAgent:
    """Agentic research agent using LLM with DuckDuckGo search"""

    def __init__(self):
        """Initialize research agent"""
        self.model_name = os.getenv('OPENAI_MODEL', 'gpt-4o-mini')

        try:
            search_wrapper = DuckDuckGoSearchAPIWrapper(max_results=5)
            self.search_tool = DuckDuckGoSearchResults(api_wrapper=search_wrapper)
            logger.info("DuckDuckGo search tool initialized")
        except Exception as e:
            logger.warning(f"Failed to initialize DuckDuckGo search: {e}")
            self.search_tool = None

        self.system_prompt = """You are an expert construction research agent for disaster-resistant building in the Philippines.

Your role is to:
1. Search for construction guidelines and building codes using web search
2. Analyze construction recommendations from authoritative sources
3. Provide practical, actionable advice for construction professionals
4. Focus on disaster-resistant construction techniques specific to Philippine hazards
5. Reference Philippine building codes (NBCP, NSCP) and international standards

When providing recommendations:
- Prioritize hazards based on severity (CRITICAL > HIGH > MODERATE > LOW)
- Explain technical terms in plain language
- Provide specific construction techniques and materials
- Include cost implications when relevant
- Reference building codes and standards
- Consider the specific building type requirements

Always structure your response with:
1. General Construction Guidelines
2. Hazard-Specific Recommendations (by category)
3. Priority Actions
4. Building Code References
"""

    async def get_agentic_recommendations(
        self,
        risks: RiskData,
        building_type: BuildingType
    ) -> Recommendations:
        """
        Get agentic construction recommendations with LLM analysis

        Uses a hybrid approach:
        1. Extract risk types from the risk data
        2. Search for guidelines using DuckDuckGo
        3. Fetch page content with httpx
        4. Use the LLM to analyze and synthesize recommendations

        Args:
            risks: Risk assessment data
            building_type: Type of building

        Returns:
            Construction recommendations with LLM-enhanced analysis
        """
        try:
            logger.info(f"Starting agentic research for {building_type}")

            # Step 1: identify which hazards are active for this site
            risk_types = self._extract_risk_types(risks)
            logger.info(f"Identified risk types: {', '.join(risk_types)}")

            # Step 2: search for construction guidelines
            search_results = await self.search_guidelines(risk_types, building_type)
            logger.info(f"Found {len(search_results)} search results")

            # Step 3: fetch the content of the most relevant pages
            page_contents = await self.fetch_page_content(search_results)
            logger.info(f"Fetched {len(page_contents)} page contents")

            # Step 4: synthesize recommendations, preferring LLM analysis when a key is available
            openai_api_key = os.getenv('OPENAI_API_KEY')

            if openai_api_key and openai_api_key != 'dummy-key-for-blaxel':
                try:
                    logger.info("Using LLM for intelligent synthesis...")
                    recommendations = await self._synthesize_with_llm(
                        page_contents,
                        risks,
                        building_type,
                        risk_types
                    )
                    logger.info("LLM synthesis completed successfully")
                    return recommendations

                except Exception as llm_error:
                    logger.warning(f"LLM synthesis failed: {str(llm_error)}, falling back to rule-based synthesis")
            else:
                logger.info("No OpenAI API key configured, using rule-based synthesis")

            # Fallback: rule-based synthesis from the fetched content
            recommendations = self.synthesize_recommendations(
                page_contents,
                risks,
                building_type
            )

            return recommendations

        except Exception as e:
            logger.error(f"Agentic research failed: {str(e)}", exc_info=True)
            return self._generate_fallback_recommendations(risks, building_type)

    async def get_construction_recommendations(
        self,
        risks: RiskData,
        building_type: BuildingType
    ) -> Recommendations:
        """
        Main entry point for research (backwards compatible)

        Args:
            risks: Risk assessment data
            building_type: Type of building

        Returns:
            Construction recommendations
        """
        return await self.get_agentic_recommendations(risks, building_type)
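
    # Typical call pattern (sketch; the RiskData payload and BuildingType members
    # are defined in models.py and only assumed here for illustration):
    #     agent = ResearchAgent()
    #     recs = await agent.get_construction_recommendations(risks, building_type)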

    async def get_streaming_recommendations(
        self,
        risks: RiskData,
        building_type: BuildingType
    ) -> AsyncGenerator[str, None]:
        """
        Get streaming construction recommendations with LLM analysis

        Args:
            risks: Risk assessment data
            building_type: Type of building

        Yields:
            Streaming recommendations from the LLM
        """
        try:
            yield f"Researching construction recommendations for {building_type.replace('_', ' ')}...\n\n"

            # Step 1: identify active risk types
            risk_types = self._extract_risk_types(risks)
            yield f"✓ Identified {len(risk_types)} risk types: {', '.join(risk_types)}\n\n"

            # Step 2: search for guidelines
            yield "Searching for construction guidelines...\n"
            search_results = await self.search_guidelines(risk_types, building_type)
            yield f"✓ Found {len(search_results)} relevant sources\n\n"

            # Step 3: fetch page content
            yield "Fetching detailed information...\n"
            page_contents = await self.fetch_page_content(search_results)
            yield f"✓ Retrieved {len(page_contents)} documents\n\n"

            # Step 4: stream the LLM synthesis if an API key is configured
            openai_api_key = os.getenv('OPENAI_API_KEY')

            if openai_api_key and openai_api_key != 'dummy-key-for-blaxel':
                yield "Analyzing with AI...\n\n"
                yield "=" * 60 + "\n\n"

                try:
                    async for chunk in self._stream_llm_synthesis(
                        page_contents,
                        risks,
                        building_type,
                        risk_types
                    ):
                        yield chunk

                    yield "\n\n" + "=" * 60 + "\n"
                    yield "\n✓ Research complete\n"

                except Exception as llm_error:
                    logger.error(f"LLM streaming failed: {str(llm_error)}")
                    yield f"\n\nLLM analysis failed: {str(llm_error)}\n"
                    yield "Showing structured recommendations instead...\n\n"
            else:
                yield "\nNote: LLM analysis not available (no OPENAI_API_KEY configured)\n"
                yield "Showing structured recommendations:\n\n"
                yield "=" * 60 + "\n\n"

        except Exception as e:
            logger.error(f"Streaming research failed: {str(e)}", exc_info=True)
            yield f"\n\nError during research: {str(e)}\n"

    def _extract_risk_types(self, risks: RiskData) -> List[str]:
        """
        Extract active risk types from RiskData

        Args:
            risks: Risk assessment data

        Returns:
            List of active risk types
        """
        risk_types = []

        # Seismic hazards
        seismic = risks.hazards.seismic
        if self._is_hazard_active(seismic.active_fault):
            risk_types.append("earthquake")
        if self._is_hazard_active(seismic.liquefaction):
            risk_types.append("liquefaction")
        if self._is_hazard_active(seismic.tsunami):
            risk_types.append("tsunami")

        # Volcanic hazards
        volcanic = risks.hazards.volcanic
        if self._is_hazard_active(volcanic.active_volcano):
            risk_types.append("volcanic")
        if self._is_hazard_active(volcanic.ashfall):
            risk_types.append("ashfall")

        # Hydrometeorological hazards
        hydro = risks.hazards.hydrometeorological
        if self._is_hazard_active(hydro.flood):
            risk_types.append("flood")
        if self._is_hazard_active(hydro.rain_induced_landslide):
            risk_types.append("landslide")
        if self._is_hazard_active(hydro.storm_surge):
            risk_types.append("storm surge")
        if self._is_hazard_active(hydro.severe_winds):
            risk_types.append("typhoon")

        return risk_types if risk_types else ["general disaster"]

    def _is_hazard_active(self, hazard: HazardDetail) -> bool:
        """
        Check if a hazard is active/present

        Args:
            hazard: Hazard detail

        Returns:
            True if hazard is active
        """
        if not hazard:
            return False

        status_lower = hazard.status.lower()
        return status_lower not in ["none", "not present", "no data", "n/a"]
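
    # A hazard counts as active whenever its status is anything other than the
    # "absent" values above; e.g. a status of "HIGH" would be treated as active,
    # while "none" or "no data" would not (example status strings are illustrative).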

    def _build_search_query(self, risk_types: List[str], building_type: BuildingType) -> str:
        """
        Build search query for construction guidelines

        Args:
            risk_types: List of risk types
            building_type: Type of building

        Returns:
            Search query string
        """
        building_type_str = building_type.replace("_", " ")
        risk_str = " ".join(risk_types[:2])

        return f"Philippines {risk_str} resistant construction guidelines {building_type_str}"

    async def search_guidelines(
        self,
        risk_types: List[str],
        building_type: BuildingType
    ) -> List[Dict[str, Any]]:
        """
        Search for disaster-resistant construction guidelines using DuckDuckGo

        Args:
            risk_types: List of risk types to search for
            building_type: Type of building

        Returns:
            List of search results with URLs and snippets
        """
        if not self.search_tool:
            logger.warning("DuckDuckGo search tool not available")
            return []

        try:
            all_results = []
            building_type_str = building_type.replace("_", " ")

            # Search for guidelines for each of the top three risk types
            for risk_type in risk_types[:3]:
                query = f"Philippines {risk_type} resistant construction guidelines {building_type_str}"

                try:
                    logger.info(f"Searching: {query}")

                    # The search tool is synchronous, so run it in a worker thread
                    results_str = await asyncio.to_thread(self.search_tool.run, query)

                    if results_str:
                        parsed_results = self._parse_search_results(results_str)
                        all_results.extend(parsed_results)
                        logger.info(f"Found {len(parsed_results)} results for {risk_type}")

                except Exception as e:
                    logger.error(f"Error searching for {risk_type}: {e}")

            # Also search for relevant Philippine building code references
            try:
                code_query = f"Philippines National Building Code {building_type_str} disaster resistant"
                logger.info(f"Searching: {code_query}")

                results_str = await asyncio.to_thread(self.search_tool.run, code_query)

                if results_str:
                    parsed_results = self._parse_search_results(results_str)
                    all_results.extend(parsed_results)
                    logger.info(f"Found {len(parsed_results)} building code results")

            except Exception as e:
                logger.error(f"Error searching for building codes: {e}")

            return all_results

        except Exception as e:
            logger.error(f"Error in search_guidelines: {e}")
            return []

    def _parse_search_results(self, results_str: str) -> List[Dict[str, Any]]:
        """
        Parse DuckDuckGo search results string into structured format

        Args:
            results_str: Raw search results string

        Returns:
            List of parsed results with title, url, snippet
        """
        parsed = []

        try:
            import re

            # DuckDuckGoSearchResults returns entries as "[snippet: ..., title: ..., link: ...]"
            pattern = r'\[snippet:\s*([^,]+),\s*title:\s*([^,]+),\s*link:\s*([^\]]+)\]'
            matches = re.findall(pattern, results_str, re.DOTALL)

            for snippet, title, link in matches:
                parsed.append({
                    'snippet': snippet.strip(),
                    'title': title.strip(),
                    'url': link.strip(),
                    'link': link.strip()
                })

            # Fallback: keep the raw text (truncated) if the pattern did not match
            if not parsed and results_str:
                parsed.append({
                    'snippet': results_str[:500],
                    'title': 'Search Result',
                    'url': '',
                    'link': ''
                })

        except Exception as e:
            logger.error(f"Error parsing search results: {e}")

        return parsed
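
    # Example of the raw string the regex above expects (illustrative; the exact
    # format can vary between langchain-community versions, which is why the
    # raw-text fallback above exists):
    #   "[snippet: Seismic design tips ..., title: NSCP seismic provisions, link: https://example.org/nscp], ..."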

    async def fetch_page_content(self, search_results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Fetch content from web pages using httpx

        Args:
            search_results: List of search results with URLs

        Returns:
            List of page contents with URL and text
        """
        page_contents = []

        async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
            # Fetch at most the first five results
            for result in search_results[:5]:
                url = result.get('url') or result.get('link')
                title = result.get('title', '')
                snippet = result.get('snippet', '')

                if not url:
                    # No URL available; fall back to the search snippet
                    if snippet:
                        page_contents.append({
                            'url': 'N/A',
                            'title': title,
                            'content': snippet
                        })
                    continue

                try:
                    logger.info(f"Fetching content from: {url}")

                    response = await client.get(url, headers={
                        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
                    })

                    if response.status_code == 200:
                        soup = BeautifulSoup(response.text, 'html.parser')

                        # Drop non-content elements before extracting text
                        for script in soup(['script', 'style', 'nav', 'footer', 'header']):
                            script.decompose()

                        text = soup.get_text(separator=' ', strip=True)

                        # Collapse whitespace and cap the length
                        text = ' '.join(text.split())
                        if len(text) > 5000:
                            text = text[:5000] + '...'

                        page_contents.append({
                            'url': url,
                            'title': title,
                            'content': text
                        })

                        logger.info(f"Successfully fetched {len(text)} characters from {url}")
                    else:
                        logger.warning(f"Failed to fetch {url}: HTTP {response.status_code}")
                        if snippet:
                            page_contents.append({
                                'url': url,
                                'title': title,
                                'content': snippet
                            })

                except httpx.TimeoutException:
                    logger.warning(f"Timeout fetching {url}, using snippet")
                    if snippet:
                        page_contents.append({
                            'url': url,
                            'title': title,
                            'content': snippet
                        })

                except Exception as e:
                    logger.error(f"Error fetching {url}: {e}")
                    if snippet:
                        page_contents.append({
                            'url': url,
                            'title': title,
                            'content': snippet
                        })

        logger.info(f"Fetched content from {len(page_contents)} sources")
        return page_contents
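
    # Each returned entry has the shape {'url': ..., 'title': ..., 'content': ...},
    # where 'content' is cleaned page text capped at roughly 5,000 characters, or
    # the original search snippet when the page could not be fetched.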

    def synthesize_recommendations(
        self,
        page_contents: List[Dict[str, Any]],
        risks: RiskData,
        building_type: BuildingType
    ) -> Recommendations:
        """
        Synthesize recommendations from fetched content

        Args:
            page_contents: List of page contents
            risks: Risk assessment data
            building_type: Type of building

        Returns:
            Structured recommendations
        """
        # Hazard-specific recommendations per category
        seismic_recs = self._extract_seismic_recommendations(page_contents, risks)
        volcanic_recs = self._extract_volcanic_recommendations(page_contents, risks)
        hydro_recs = self._extract_hydrometeorological_recommendations(page_contents, risks)

        # General guidelines for the building type and overall risk level
        general_guidelines = self._generate_general_guidelines(building_type, risks)

        # Building code references
        building_codes = self._extract_building_codes(page_contents)

        # Priority actions driven by the risk assessment
        priority_actions = self._generate_priority_actions(risks, building_type)

        return Recommendations(
            general_guidelines=general_guidelines,
            seismic_recommendations=seismic_recs,
            volcanic_recommendations=volcanic_recs,
            hydrometeorological_recommendations=hydro_recs,
            priority_actions=priority_actions,
            building_codes=building_codes
        )

    def _extract_seismic_recommendations(
        self,
        page_contents: List[Dict[str, Any]],
        risks: RiskData
    ) -> List[RecommendationDetail]:
        """Extract seismic-related recommendations"""
        recommendations = []

        seismic = risks.hazards.seismic

        if self._is_hazard_active(seismic.active_fault) or self._is_hazard_active(seismic.ground_shaking):
            recommendations.append(RecommendationDetail(
                hazard_type="Earthquake/Ground Shaking",
                recommendation="Use reinforced concrete or steel frame construction with proper seismic detailing. Ensure adequate lateral bracing and shear walls.",
                rationale="Strong ground shaking can cause structural failure in buildings without proper seismic resistance.",
                source_url=page_contents[0]['url'] if page_contents else None
            ))

        if self._is_hazard_active(seismic.liquefaction):
            recommendations.append(RecommendationDetail(
                hazard_type="Liquefaction",
                recommendation="Implement deep foundation systems (piles or caissons) extending below liquefiable soil layers. Consider ground improvement techniques.",
                rationale="Liquefaction causes soil to lose strength, leading to foundation failure and building settlement.",
                source_url=page_contents[0]['url'] if page_contents else None
            ))

        if self._is_hazard_active(seismic.tsunami):
            recommendations.append(RecommendationDetail(
                hazard_type="Tsunami",
                recommendation="Elevate critical building systems above expected tsunami inundation levels. Use flood-resistant materials for lower floors.",
                rationale="Tsunami waves can cause severe flooding and structural damage to coastal buildings.",
                source_url=page_contents[0]['url'] if page_contents else None
            ))

        return recommendations

    def _extract_volcanic_recommendations(
        self,
        page_contents: List[Dict[str, Any]],
        risks: RiskData
    ) -> List[RecommendationDetail]:
        """Extract volcanic-related recommendations"""
        recommendations = []

        volcanic = risks.hazards.volcanic

        if self._is_hazard_active(volcanic.ashfall):
            recommendations.append(RecommendationDetail(
                hazard_type="Volcanic Ashfall",
                recommendation="Design roofs with steep pitch (minimum 30 degrees) to prevent ash accumulation. Use reinforced roof structures to handle additional ash load.",
                rationale="Volcanic ash accumulation can cause roof collapse due to excessive weight.",
                source_url=page_contents[0]['url'] if page_contents else None
            ))

        if self._is_hazard_active(volcanic.pyroclastic_flow):
            recommendations.append(RecommendationDetail(
                hazard_type="Pyroclastic Flow",
                recommendation="Consider relocating construction site outside pyroclastic flow hazard zones. If not possible, use reinforced concrete construction with minimal openings.",
                rationale="Pyroclastic flows are extremely destructive and can destroy most structures.",
                source_url=page_contents[0]['url'] if page_contents else None
            ))

        if self._is_hazard_active(volcanic.lahar):
            recommendations.append(RecommendationDetail(
                hazard_type="Lahar",
                recommendation="Elevate building on raised foundation or stilts. Install protective barriers and drainage systems to divert lahar flows.",
                rationale="Lahars (volcanic mudflows) can bury and damage structures in their path.",
                source_url=page_contents[0]['url'] if page_contents else None
            ))

        return recommendations

    def _extract_hydrometeorological_recommendations(
        self,
        page_contents: List[Dict[str, Any]],
        risks: RiskData
    ) -> List[RecommendationDetail]:
        """Extract hydrometeorological-related recommendations"""
        recommendations = []

        hydro = risks.hazards.hydrometeorological

        if self._is_hazard_active(hydro.flood):
            recommendations.append(RecommendationDetail(
                hazard_type="Flooding",
                recommendation="Elevate building above expected flood levels. Use flood-resistant materials (concrete, treated wood) for lower floors. Install proper drainage systems.",
                rationale="Flooding can cause structural damage, foundation erosion, and interior damage.",
                source_url=page_contents[0]['url'] if page_contents else None
            ))

        if self._is_hazard_active(hydro.rain_induced_landslide):
            recommendations.append(RecommendationDetail(
                hazard_type="Landslide",
                recommendation="Implement slope stabilization measures including retaining walls, proper drainage, and vegetation. Avoid building on steep slopes if possible.",
                rationale="Landslides can destroy buildings and cause loss of life.",
                source_url=page_contents[0]['url'] if page_contents else None
            ))

        if self._is_hazard_active(hydro.storm_surge):
            recommendations.append(RecommendationDetail(
                hazard_type="Storm Surge",
                recommendation="Elevate building on reinforced foundation. Use impact-resistant materials. Install storm shutters for windows and openings.",
                rationale="Storm surge can cause severe flooding and wave impact damage to coastal structures.",
                source_url=page_contents[0]['url'] if page_contents else None
            ))

        if self._is_hazard_active(hydro.severe_winds):
            recommendations.append(RecommendationDetail(
                hazard_type="Severe Winds/Typhoon",
                recommendation="Use hurricane straps and clips to secure roof to walls. Install impact-resistant windows. Ensure proper roof-to-wall connections.",
                rationale="Severe winds can cause roof failure, window breakage, and structural damage.",
                source_url=page_contents[0]['url'] if page_contents else None
            ))

        return recommendations

    def _generate_general_guidelines(
        self,
        building_type: BuildingType,
        risks: RiskData
    ) -> List[str]:
        """Generate general construction guidelines"""
        guidelines = [
            "Follow the National Building Code of the Philippines (NBCP) and National Structural Code of the Philippines (NSCP)",
            "Hire licensed engineers and architects for design and supervision",
            "Use quality materials that meet Philippine Standards (PS) specifications",
            "Ensure proper construction supervision and quality control throughout the project",
            "Obtain all necessary building permits and comply with local zoning regulations"
        ]

        # Building-type specific guidance
        if "residential" in building_type:
            guidelines.append("Consider future expansion needs and ensure structural capacity for additional floors if planned")
        elif "commercial" in building_type or "institutional" in building_type:
            guidelines.append("Ensure compliance with fire safety codes and accessibility standards for public buildings")
        elif "industrial" in building_type:
            guidelines.append("Design for specific industrial loads and vibration requirements")

        # Additional guidance for high-risk sites
        if risks.summary.overall_risk_level in ["CRITICAL", "HIGH"]:
            guidelines.append("Consider engaging a disaster risk reduction specialist for site-specific recommendations")
            guidelines.append("Implement redundant safety systems and emergency evacuation plans")

        return guidelines

    def _extract_building_codes(
        self,
        page_contents: List[Dict[str, Any]]
    ) -> List[BuildingCodeReference]:
        """Extract building code references from content"""
        # Baseline Philippine code references; page_contents is currently unused here
        codes = [
            BuildingCodeReference(
                code_name="National Building Code of the Philippines (NBCP)",
                section="General",
                requirement="All construction must comply with NBCP provisions for structural safety, fire safety, and sanitation"
            ),
            BuildingCodeReference(
                code_name="National Structural Code of the Philippines (NSCP)",
                section="Seismic Design",
                requirement="Buildings must be designed to resist seismic forces based on seismic zone classification"
            ),
            BuildingCodeReference(
                code_name="NSCP",
                section="Wind Design",
                requirement="Structures must be designed for wind loads based on location and exposure category"
            )
        ]

        return codes

    def _generate_priority_actions(
        self,
        risks: RiskData,
        building_type: BuildingType
    ) -> List[str]:
        """Generate priority actions based on risk assessment"""
        actions = []

        # Urgent actions for critical overall risk
        if risks.summary.overall_risk_level == "CRITICAL":
            actions.append("URGENT: Conduct detailed geotechnical investigation before construction")
            actions.append("URGENT: Engage structural engineer with disaster-resistant design expertise")

        # Hazard-specific actions
        seismic = risks.hazards.seismic
        if self._is_hazard_active(seismic.active_fault):
            actions.append("Conduct fault line mapping and avoid building directly on active faults")

        if self._is_hazard_active(seismic.liquefaction):
            actions.append("Perform soil liquefaction assessment and implement ground improvement if needed")

        volcanic = risks.hazards.volcanic
        if self._is_hazard_active(volcanic.active_volcano):
            actions.append("Review volcanic hazard maps and consider evacuation routes in site planning")

        hydro = risks.hazards.hydrometeorological
        if self._is_hazard_active(hydro.flood):
            actions.append("Determine 100-year flood elevation and design building accordingly")

        if self._is_hazard_active(hydro.rain_induced_landslide):
            actions.append("Conduct slope stability analysis and implement stabilization measures")

        # Actions that apply to every project
        actions.append("Obtain building permit and ensure compliance with local building codes")
        actions.append("Implement quality assurance program during construction")

        return actions

    async def _synthesize_with_llm(
        self,
        page_contents: List[Dict[str, Any]],
        risks: RiskData,
        building_type: BuildingType,
        risk_types: List[str]
    ) -> Recommendations:
        """
        Use LLM to synthesize construction recommendations

        Args:
            page_contents: Fetched web page contents
            risks: Risk assessment data
            building_type: Type of building
            risk_types: List of identified risk types

        Returns:
            Structured recommendations with LLM analysis
        """
        try:
            openai_api_key = os.getenv('OPENAI_API_KEY')

            # Initialize the chat model
            model = ChatOpenAI(
                model=self.model_name,
                api_key=openai_api_key,
                temperature=0.7
            )
            logger.info(f"Using OpenAI model: {self.model_name}")

            # Build the research context from fetched pages and the risk assessment
            context = self._create_research_context(page_contents, risks, building_type, risk_types)

            prompt = f"""{self.system_prompt}

Based on the following research and risk assessment, provide comprehensive construction recommendations:

{context}

Provide detailed recommendations in the following format:

## General Guidelines
- List 5-7 general construction guidelines

## Seismic Recommendations
For each active seismic hazard, provide:
- Hazard type
- Specific recommendation
- Rationale

## Volcanic Recommendations
For each active volcanic hazard, provide:
- Hazard type
- Specific recommendation
- Rationale

## Hydrometeorological Recommendations
For each active hydrometeorological hazard, provide:
- Hazard type
- Specific recommendation
- Rationale

## Priority Actions
- List 5-8 priority actions in order of importance

## Building Code References
- List relevant Philippine building codes (NBCP, NSCP) with sections and requirements
"""

            # Invoke the LLM and parse its output into the structured model
            logger.info("Invoking LLM for synthesis...")
            response = await model.ainvoke(prompt)

            llm_output = response.content if hasattr(response, 'content') else str(response)
            logger.info(f"LLM synthesis completed: {len(llm_output)} characters")

            recommendations = self._parse_llm_recommendations(llm_output, risks, building_type)

            # Attach the raw LLM analysis when the model supports it
            if hasattr(recommendations, 'llm_analysis'):
                recommendations.llm_analysis = llm_output

            return recommendations

        except Exception as e:
            logger.error(f"LLM synthesis failed: {str(e)}")
            raise

    async def _stream_llm_synthesis(
        self,
        page_contents: List[Dict[str, Any]],
        risks: RiskData,
        building_type: BuildingType,
        risk_types: List[str]
    ) -> AsyncGenerator[str, None]:
        """
        Stream LLM synthesis of construction recommendations

        Args:
            page_contents: Fetched web page contents
            risks: Risk assessment data
            building_type: Type of building
            risk_types: List of identified risk types

        Yields:
            Streaming recommendations from LLM
        """
        try:
            openai_api_key = os.getenv('OPENAI_API_KEY')

            model = ChatOpenAI(
                model=self.model_name,
                api_key=openai_api_key,
                temperature=0.7
            )
            logger.info(f"Using OpenAI model: {self.model_name}")

            context = self._create_research_context(page_contents, risks, building_type, risk_types)

            prompt = f"""{self.system_prompt}

Based on the following research and risk assessment, provide comprehensive construction recommendations:

{context}

Provide detailed, practical recommendations for disaster-resistant construction."""

            logger.info("Starting LLM streaming synthesis...")

            # Stream tokens as they arrive from the model
            async for chunk in model.astream(prompt):
                if hasattr(chunk, 'content') and chunk.content:
                    yield chunk.content

            logger.info("Streaming synthesis completed")

        except Exception as e:
            logger.error(f"LLM streaming failed: {str(e)}")
            yield f"\n\nError: {str(e)}\n"

    def _create_research_context(
        self,
        page_contents: List[Dict[str, Any]],
        risks: RiskData,
        building_type: BuildingType,
        risk_types: List[str]
    ) -> str:
        """Create context for LLM from research data"""
        context_parts = []

        # Building information
        context_parts.append(f"## Building Information")
        context_parts.append(f"Building Type: {building_type.replace('_', ' ').title()}")
        context_parts.append(f"Location: {risks.location.name}, {risks.location.administrative_area}")
        context_parts.append(f"Coordinates: {risks.location.coordinates.latitude}, {risks.location.coordinates.longitude}")

        # Risk assessment summary
        context_parts.append(f"\n## Risk Assessment Summary")
        context_parts.append(f"Overall Risk Level: {risks.summary.overall_risk_level}")
        context_parts.append(f"High Risk Hazards: {risks.summary.high_risk_count}")
        context_parts.append(f"Moderate Risk Hazards: {risks.summary.moderate_risk_count}")
        if risks.summary.critical_hazards:
            context_parts.append(f"Critical Hazards: {', '.join(risks.summary.critical_hazards)}")

        # Active hazards
        context_parts.append(f"\n## Active Hazards")
        context_parts.append(f"Risk Types: {', '.join(risk_types)}")

        # Seismic hazard details
        seismic = risks.hazards.seismic
        if self._is_hazard_active(seismic.active_fault):
            context_parts.append(f"\n### Seismic Hazards")
            context_parts.append(f"- Active Fault: {seismic.active_fault.description}")
            if seismic.active_fault.distance:
                context_parts.append(f"  Distance: {seismic.active_fault.distance}")
        if self._is_hazard_active(seismic.ground_shaking):
            context_parts.append(f"- Ground Shaking: {seismic.ground_shaking.description}")
        if self._is_hazard_active(seismic.liquefaction):
            context_parts.append(f"- Liquefaction: {seismic.liquefaction.description}")

        # Volcanic hazard details
        volcanic = risks.hazards.volcanic
        if self._is_hazard_active(volcanic.active_volcano):
            context_parts.append(f"\n### Volcanic Hazards")
            context_parts.append(f"- Active Volcano: {volcanic.active_volcano.description}")
            if volcanic.active_volcano.distance:
                context_parts.append(f"  Distance: {volcanic.active_volcano.distance}")
        if self._is_hazard_active(volcanic.ashfall):
            context_parts.append(f"- Ashfall: {volcanic.ashfall.description}")

        # Hydrometeorological hazard details
        hydro = risks.hazards.hydrometeorological
        if self._is_hazard_active(hydro.flood):
            context_parts.append(f"\n### Hydrometeorological Hazards")
            context_parts.append(f"- Flood: {hydro.flood.description}")
        if self._is_hazard_active(hydro.rain_induced_landslide):
            context_parts.append(f"- Landslide: {hydro.rain_induced_landslide.description}")
        if self._is_hazard_active(hydro.storm_surge):
            context_parts.append(f"- Storm Surge: {hydro.storm_surge.description}")
        if self._is_hazard_active(hydro.severe_winds):
            context_parts.append(f"- Severe Winds: {hydro.severe_winds.description}")

        # Fetched research sources
        if page_contents:
            context_parts.append(f"\n## Research Sources")
            for i, content in enumerate(page_contents[:3], 1):
                context_parts.append(f"\n### Source {i}: {content.get('title', 'Unknown')}")
                context_parts.append(f"URL: {content.get('url', 'N/A')}")

                content_text = content.get('content', '')
                if isinstance(content_text, str):
                    content_text = content_text[:2000]
                context_parts.append(f"Content: {content_text}")

        return "\n".join(context_parts)

    def _parse_llm_recommendations(
        self,
        llm_output: str,
        risks: RiskData,
        building_type: BuildingType
    ) -> Recommendations:
        """
        Parse LLM output into structured Recommendations

        Falls back to rule-based recommendations if parsing fails
        """
        try:
            # Containers for the parsed sections
            general_guidelines = []
            seismic_recs = []
            volcanic_recs = []
            hydro_recs = []
            priority_actions = []
            building_codes = []

            # Split the markdown-style output into "##" sections
            sections = llm_output.split('##')

            for section in sections:
                section_lower = section.lower()

                if 'general' in section_lower and 'guideline' in section_lower:
                    # Collect bulleted general guidelines
                    lines = section.split('\n')
                    for line in lines:
                        line = line.strip()
                        if line.startswith('-') or line.startswith('•'):
                            general_guidelines.append(line.lstrip('-•').strip())

                elif 'priority' in section_lower and 'action' in section_lower:
                    lines = section.split('\n')
                    for line in lines:
                        line = line.strip()
                        if line.startswith('-') or line.startswith('•'):
                            priority_actions.append(line.lstrip('-•').strip())

            # If parsing produced too little, fall back to rule-based synthesis
            if len(general_guidelines) < 3:
                logger.warning("LLM output parsing incomplete, using rule-based fallback")
                return self.synthesize_recommendations([], risks, building_type)

            # Hazard-specific recommendations and code references come from the
            # rule-based extractors, driven by the risk assessment
            seismic_recs = self._extract_seismic_recommendations([], risks)
            volcanic_recs = self._extract_volcanic_recommendations([], risks)
            hydro_recs = self._extract_hydrometeorological_recommendations([], risks)
            building_codes = self._extract_building_codes([])

            if len(priority_actions) < 3:
                priority_actions = self._generate_priority_actions(risks, building_type)

            return Recommendations(
                general_guidelines=general_guidelines[:7],
                seismic_recommendations=seismic_recs,
                volcanic_recommendations=volcanic_recs,
                hydrometeorological_recommendations=hydro_recs,
                priority_actions=priority_actions[:8],
                building_codes=building_codes
            )

        except Exception as e:
            logger.error(f"Failed to parse LLM recommendations: {str(e)}")
            return self.synthesize_recommendations([], risks, building_type)

    def _generate_fallback_recommendations(
        self,
        risks: RiskData,
        building_type: BuildingType
    ) -> Recommendations:
        """Generate basic fallback recommendations when all else fails"""
        return self.synthesize_recommendations([], risks, building_type)


async def main(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Main entry point for Blaxel agent

    Args:
        input_data: Dictionary containing risks and building_type

    Returns:
        Dictionary containing recommendations
    """
    agent = ResearchAgent()

    # Validate required inputs
    risks = input_data.get('risks')
    building_type = input_data.get('building_type')

    if not risks or not building_type:
        return {
            'success': False,
            'error': 'Missing required parameters: risks and building_type'
        }

    try:
        recommendations = await agent.get_construction_recommendations(risks, building_type)

        return {
            'success': True,
            'recommendations': recommendations
        }
    except Exception as e:
        return {
            'success': False,
            'error': str(e)
        }


if __name__ == "__main__":
    import sys
    import json

    if len(sys.argv) > 1:
        input_data = json.loads(sys.argv[1])
        result = asyncio.run(main(input_data))
        print(json.dumps(result, indent=2))
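
# Example CLI invocation (illustrative; the module filename, payload shape and the
# "residential" value are assumptions, and the JSON must match the RiskData schema
# and BuildingType values defined in models.py):
#   python research_agent.py '{"risks": {...}, "building_type": "residential"}'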