# Dexter Edep
# Adjust research agent
# 30ea2d5
"""
Research Agent for Disaster Risk Construction Planner
Gathers construction recommendations using DuckDuckGo search and web fetching with LLM analysis
"""
import asyncio
import os
import logging
from typing import List, Dict, Any, AsyncGenerator, Optional
from langchain_openai import ChatOpenAI
from langchain_community.tools import DuckDuckGoSearchResults
from langchain_community.utilities import DuckDuckGoSearchAPIWrapper
import httpx
from bs4 import BeautifulSoup
from models import (
RiskData,
BuildingType,
Recommendations,
RecommendationDetail,
BuildingCodeReference,
HazardDetail
)
# Module-level logger named after this module; no handlers or levels are
# configured here, so output depends on the logging setup of the importer.
logger = logging.getLogger(__name__)
class ResearchAgent:
    """Research agent that combines DuckDuckGo web search with LLM synthesis."""

    def __init__(self):
        """
        Set up the model name, the search tool and the system prompt.

        The model name comes from the OPENAI_MODEL environment variable and
        defaults to 'gpt-4o-mini'. When the DuckDuckGo tool cannot be built,
        ``search_tool`` is set to None instead of raising.
        """
        self.model_name = os.getenv('OPENAI_MODEL', 'gpt-4o-mini')

        # Build the wrapper and the tool in one expression; any failure is
        # logged and leaves the agent without a search tool.
        try:
            self.search_tool = DuckDuckGoSearchResults(
                api_wrapper=DuckDuckGoSearchAPIWrapper(max_results=5)
            )
            logger.info("DuckDuckGo search tool initialized")
        except Exception as e:
            logger.warning(f"Failed to initialize DuckDuckGo search: {e}")
            self.search_tool = None

        # The prompt is sent verbatim to the LLM, so its lines stay flush-left.
        self.system_prompt = """You are an expert construction research agent for disaster-resistant building in the Philippines.
Your role is to:
1. Search for construction guidelines and building codes using web search
2. Analyze construction recommendations from authoritative sources
3. Provide practical, actionable advice for construction professionals
4. Focus on disaster-resistant construction techniques specific to Philippine hazards
5. Reference Philippine building codes (NBCP, NSCP) and international standards
When providing recommendations:
- Prioritize hazards based on severity (CRITICAL > HIGH > MODERATE > LOW)
- Explain technical terms in plain language
- Provide specific construction techniques and materials
- Include cost implications when relevant
- Reference building codes and standards
- Consider the specific building type requirements
Always structure your response with:
1. General Construction Guidelines
2. Hazard-Specific Recommendations (by category)
3. Priority Actions
4. Building Code References
"""
async def get_agentic_recommendations(
    self,
    risks: RiskData,
    building_type: BuildingType
) -> Recommendations:
    """
    Research and synthesize construction recommendations.

    Pipeline:
    1. Extract the active risk types from the risk data
    2. Search the web for guidelines (DuckDuckGo search tool)
    3. Fetch the content of the top result pages
    4. Synthesize with the LLM when a usable OPENAI_API_KEY is configured,
       otherwise (or when the LLM step fails) use rule-based synthesis

    Args:
        risks: Risk assessment data
        building_type: Type of building

    Returns:
        Construction recommendations; if the whole pipeline raises, the
        basic fallback recommendations are returned instead
    """
    try:
        logger.info(f"Starting agentic research for {building_type}")

        risk_types = self._extract_risk_types(risks)
        logger.info(f"Identified risk types: {', '.join(risk_types)}")

        search_results = await self.search_guidelines(risk_types, building_type)
        logger.info(f"Found {len(search_results)} search results")

        page_contents = await self.fetch_page_content(search_results)
        logger.info(f"Fetched {len(page_contents)} page contents")

        # The placeholder key used for Blaxel deployments counts as "no key"
        openai_api_key = os.getenv('OPENAI_API_KEY')
        llm_enabled = bool(openai_api_key) and openai_api_key != 'dummy-key-for-blaxel'

        if not llm_enabled:
            logger.info("No OpenAI API key configured, using rule-based synthesis")
        else:
            try:
                logger.info("Using LLM for intelligent synthesis...")
                recommendations = await self._synthesize_with_llm(
                    page_contents,
                    risks,
                    building_type,
                    risk_types
                )
                logger.info("LLM synthesis completed successfully")
                return recommendations
            except Exception as llm_error:
                # An LLM failure is not fatal: continue to the rule-based path
                logger.warning(f"LLM synthesis failed: {str(llm_error)}, falling back to rule-based synthesis")

        return self.synthesize_recommendations(page_contents, risks, building_type)
    except Exception as e:
        logger.error(f"Agentic research failed: {str(e)}", exc_info=True)
        # Last resort: recommendations built without any fetched content
        return self._generate_fallback_recommendations(risks, building_type)
async def get_construction_recommendations(
self,
risks: RiskData,
building_type: BuildingType
) -> Recommendations:
"""
Main entry point for research (backwards compatible)
Args:
risks: Risk assessment data
building_type: Type of building
Returns:
Construction recommendations
"""
return await self.get_agentic_recommendations(risks, building_type)
async def get_streaming_recommendations(
self,
risks: RiskData,
building_type: BuildingType
) -> AsyncGenerator[str, None]:
"""
Get streaming construction recommendations with LLM analysis
Args:
risks: Risk assessment data
building_type: Type of building
Yields:
Streaming recommendations from the LLM
"""
try:
yield f"Researching construction recommendations for {building_type.replace('_', ' ')}...\n\n"
# Extract risk types
risk_types = self._extract_risk_types(risks)
yield f"✓ Identified {len(risk_types)} risk types: {', '.join(risk_types)}\n\n"
# Search for guidelines
yield "Searching for construction guidelines...\n"
search_results = await self.search_guidelines(risk_types, building_type)
yield f"✓ Found {len(search_results)} relevant sources\n\n"
# Fetch page content
yield "Fetching detailed information...\n"
page_contents = await self.fetch_page_content(search_results)
yield f"✓ Retrieved {len(page_contents)} documents\n\n"
# Check if LLM is available
openai_api_key = os.getenv('OPENAI_API_KEY')
if openai_api_key and openai_api_key != 'dummy-key-for-blaxel':
yield "Analyzing with AI...\n\n"
yield "=" * 60 + "\n\n"
try:
# Stream LLM analysis
async for chunk in self._stream_llm_synthesis(
page_contents,
risks,
building_type,
risk_types
):
yield chunk
yield "\n\n" + "=" * 60 + "\n"
yield "\n✓ Research complete\n"
except Exception as llm_error:
logger.error(f"LLM streaming failed: {str(llm_error)}")
yield f"\n\nLLM analysis failed: {str(llm_error)}\n"
yield "Showing structured recommendations instead...\n\n"
else:
yield "\nNote: LLM analysis not available (no OPENAI_API_KEY configured)\n"
yield "Showing structured recommendations:\n\n"
yield "=" * 60 + "\n\n"
except Exception as e:
logger.error(f"Streaming research failed: {str(e)}", exc_info=True)
yield f"\n\nError during research: {str(e)}\n"
def _extract_risk_types(self, risks: RiskData) -> List[str]:
"""
Extract active risk types from RiskData
Args:
risks: Risk assessment data
Returns:
List of active risk types
"""
risk_types = []
# Check seismic hazards
seismic = risks.hazards.seismic
if self._is_hazard_active(seismic.active_fault):
risk_types.append("earthquake")
if self._is_hazard_active(seismic.liquefaction):
risk_types.append("liquefaction")
if self._is_hazard_active(seismic.tsunami):
risk_types.append("tsunami")
# Check volcanic hazards
volcanic = risks.hazards.volcanic
if self._is_hazard_active(volcanic.active_volcano):
risk_types.append("volcanic")
if self._is_hazard_active(volcanic.ashfall):
risk_types.append("ashfall")
# Check hydrometeorological hazards
hydro = risks.hazards.hydrometeorological
if self._is_hazard_active(hydro.flood):
risk_types.append("flood")
if self._is_hazard_active(hydro.rain_induced_landslide):
risk_types.append("landslide")
if self._is_hazard_active(hydro.storm_surge):
risk_types.append("storm surge")
if self._is_hazard_active(hydro.severe_winds):
risk_types.append("typhoon")
return risk_types if risk_types else ["general disaster"]
def _is_hazard_active(self, hazard: HazardDetail) -> bool:
"""
Check if a hazard is active/present
Args:
hazard: Hazard detail
Returns:
True if hazard is active
"""
if not hazard:
return False
status_lower = hazard.status.lower()
return status_lower not in ["none", "not present", "no data", "n/a"]
def _build_search_query(self, risk_types: List[str], building_type: BuildingType) -> str:
"""
Build search query for construction guidelines
Args:
risk_types: List of risk types
building_type: Type of building
Returns:
Search query string
"""
building_type_str = building_type.replace("_", " ")
risk_str = " ".join(risk_types[:2]) # Use top 2 risk types
return f"Philippines {risk_str} resistant construction guidelines {building_type_str}"
async def search_guidelines(
    self,
    risk_types: List[str],
    building_type: BuildingType
) -> List[Dict[str, Any]]:
    """
    Search DuckDuckGo for disaster-resistant construction guidelines.

    One query is issued for each of the first three risk types, followed by
    one general building-code query. A failing query is logged and skipped.

    Args:
        risk_types: List of risk types to search for
        building_type: Type of building

    Returns:
        List of search results with URLs and snippets; empty when the
        search tool is unavailable or the search as a whole fails
    """
    if not self.search_tool:
        logger.warning("DuckDuckGo search tool not available")
        return []

    try:
        collected = []
        building_type_str = building_type.replace("_", " ")

        # Per-hazard searches, capped at the first three risk types
        for risk_type in risk_types[:3]:
            query = f"Philippines {risk_type} resistant construction guidelines {building_type_str}"
            try:
                logger.info(f"Searching: {query}")
                # The tool's run() is blocking, so it executes in a worker thread
                raw_output = await asyncio.to_thread(self.search_tool.run, query)
                if not raw_output:
                    continue
                parsed_results = self._parse_search_results(raw_output)
                collected += parsed_results
                logger.info(f"Found {len(parsed_results)} results for {risk_type}")
            except Exception as e:
                logger.error(f"Error searching for {risk_type}: {e}")

        # One extra query aimed at the national building code
        try:
            code_query = f"Philippines National Building Code {building_type_str} disaster resistant"
            logger.info(f"Searching: {code_query}")
            raw_output = await asyncio.to_thread(self.search_tool.run, code_query)
            if raw_output:
                parsed_results = self._parse_search_results(raw_output)
                collected += parsed_results
                logger.info(f"Found {len(parsed_results)} building code results")
        except Exception as e:
            logger.error(f"Error searching for building codes: {e}")

        return collected
    except Exception as e:
        logger.error(f"Error in search_guidelines: {e}")
        return []
def _parse_search_results(self, results_str: str) -> List[Dict[str, Any]]:
"""
Parse DuckDuckGo search results string into structured format
Args:
results_str: Raw search results string
Returns:
List of parsed results with title, url, snippet
"""
parsed = []
try:
# Results are in format: [snippet: ..., title: ..., link: ...]
# Split by result boundaries
import re
# Find all results using regex
pattern = r'\[snippet:\s*([^,]+),\s*title:\s*([^,]+),\s*link:\s*([^\]]+)\]'
matches = re.findall(pattern, results_str, re.DOTALL)
for snippet, title, link in matches:
parsed.append({
'snippet': snippet.strip(),
'title': title.strip(),
'url': link.strip(),
'link': link.strip()
})
# If regex parsing fails, try simple parsing
if not parsed and results_str:
# Just create a single result with the raw text
parsed.append({
'snippet': results_str[:500],
'title': 'Search Result',
'url': '',
'link': ''
})
except Exception as e:
logger.error(f"Error parsing search results: {e}")
return parsed
async def fetch_page_content(self, search_results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Download and clean the text of the first five search results.

    Each page is fetched with httpx, stripped of script/style/nav/footer/
    header elements and truncated to 5000 characters. Whenever a page
    cannot be used (no URL, non-200 status, timeout, any other error) the
    search snippet is recorded instead, if there is one.

    Args:
        search_results: List of search results with URLs

    Returns:
        List of dicts with 'url', 'title' and 'content' keys
    """
    page_contents = []

    def keep_snippet(source_url, title, snippet):
        # Fallback entry; results without a snippet contribute nothing
        if snippet:
            page_contents.append({
                'url': source_url,
                'title': title,
                'content': snippet
            })

    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }

    async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
        # Only the top five results are fetched, to bound the total time
        for result in search_results[:5]:
            url = result.get('url') or result.get('link')
            title = result.get('title', '')
            snippet = result.get('snippet', '')

            if not url:
                keep_snippet('N/A', title, snippet)
                continue

            try:
                logger.info(f"Fetching content from: {url}")
                response = await client.get(url, headers=request_headers)

                if response.status_code != 200:
                    logger.warning(f"Failed to fetch {url}: HTTP {response.status_code}")
                    keep_snippet(url, title, snippet)
                    continue

                soup = BeautifulSoup(response.text, 'html.parser')
                # Drop elements that carry no article text
                for unwanted in soup(['script', 'style', 'nav', 'footer', 'header']):
                    unwanted.decompose()

                # Collapse every run of whitespace to a single space
                text = ' '.join(soup.get_text(separator=' ', strip=True).split())
                # Cap the length to keep later LLM prompts within token limits
                if len(text) > 5000:
                    text = text[:5000] + '...'

                page_contents.append({
                    'url': url,
                    'title': title,
                    'content': text
                })
                logger.info(f"Successfully fetched {len(text)} characters from {url}")
            except httpx.TimeoutException:
                logger.warning(f"Timeout fetching {url}, using snippet")
                keep_snippet(url, title, snippet)
            except Exception as e:
                logger.error(f"Error fetching {url}: {e}")
                keep_snippet(url, title, snippet)

    logger.info(f"Fetched content from {len(page_contents)} sources")
    return page_contents
def synthesize_recommendations(
    self,
    page_contents: List[Dict[str, Any]],
    risks: RiskData,
    building_type: BuildingType
) -> Recommendations:
    """
    Build rule-based recommendations from the risk data.

    Args:
        page_contents: List of page contents (used for source URLs)
        risks: Risk assessment data
        building_type: Type of building

    Returns:
        Structured recommendations
    """
    # Keyword arguments are listed in the order the parts are computed:
    # hazard categories first, then guidelines, codes and priority actions.
    return Recommendations(
        seismic_recommendations=self._extract_seismic_recommendations(page_contents, risks),
        volcanic_recommendations=self._extract_volcanic_recommendations(page_contents, risks),
        hydrometeorological_recommendations=self._extract_hydrometeorological_recommendations(
            page_contents, risks
        ),
        general_guidelines=self._generate_general_guidelines(building_type, risks),
        building_codes=self._extract_building_codes(page_contents),
        priority_actions=self._generate_priority_actions(risks, building_type)
    )
def _extract_seismic_recommendations(
self,
page_contents: List[Dict[str, Any]],
risks: RiskData
) -> List[RecommendationDetail]:
"""Extract seismic-related recommendations"""
recommendations = []
seismic = risks.hazards.seismic
if self._is_hazard_active(seismic.active_fault) or self._is_hazard_active(seismic.ground_shaking):
recommendations.append(RecommendationDetail(
hazard_type="Earthquake/Ground Shaking",
recommendation="Use reinforced concrete or steel frame construction with proper seismic detailing. Ensure adequate lateral bracing and shear walls.",
rationale="Strong ground shaking can cause structural failure in buildings without proper seismic resistance.",
source_url=page_contents[0]['url'] if page_contents else None
))
if self._is_hazard_active(seismic.liquefaction):
recommendations.append(RecommendationDetail(
hazard_type="Liquefaction",
recommendation="Implement deep foundation systems (piles or caissons) extending below liquefiable soil layers. Consider ground improvement techniques.",
rationale="Liquefaction causes soil to lose strength, leading to foundation failure and building settlement.",
source_url=page_contents[0]['url'] if page_contents else None
))
if self._is_hazard_active(seismic.tsunami):
recommendations.append(RecommendationDetail(
hazard_type="Tsunami",
recommendation="Elevate critical building systems above expected tsunami inundation levels. Use flood-resistant materials for lower floors.",
rationale="Tsunami waves can cause severe flooding and structural damage to coastal buildings.",
source_url=page_contents[0]['url'] if page_contents else None
))
return recommendations
def _extract_volcanic_recommendations(
self,
page_contents: List[Dict[str, Any]],
risks: RiskData
) -> List[RecommendationDetail]:
"""Extract volcanic-related recommendations"""
recommendations = []
volcanic = risks.hazards.volcanic
if self._is_hazard_active(volcanic.ashfall):
recommendations.append(RecommendationDetail(
hazard_type="Volcanic Ashfall",
recommendation="Design roofs with steep pitch (minimum 30 degrees) to prevent ash accumulation. Use reinforced roof structures to handle additional ash load.",
rationale="Volcanic ash accumulation can cause roof collapse due to excessive weight.",
source_url=page_contents[0]['url'] if page_contents else None
))
if self._is_hazard_active(volcanic.pyroclastic_flow):
recommendations.append(RecommendationDetail(
hazard_type="Pyroclastic Flow",
recommendation="Consider relocating construction site outside pyroclastic flow hazard zones. If not possible, use reinforced concrete construction with minimal openings.",
rationale="Pyroclastic flows are extremely destructive and can destroy most structures.",
source_url=page_contents[0]['url'] if page_contents else None
))
if self._is_hazard_active(volcanic.lahar):
recommendations.append(RecommendationDetail(
hazard_type="Lahar",
recommendation="Elevate building on raised foundation or stilts. Install protective barriers and drainage systems to divert lahar flows.",
rationale="Lahars (volcanic mudflows) can bury and damage structures in their path.",
source_url=page_contents[0]['url'] if page_contents else None
))
return recommendations
def _extract_hydrometeorological_recommendations(
self,
page_contents: List[Dict[str, Any]],
risks: RiskData
) -> List[RecommendationDetail]:
"""Extract hydrometeorological-related recommendations"""
recommendations = []
hydro = risks.hazards.hydrometeorological
if self._is_hazard_active(hydro.flood):
recommendations.append(RecommendationDetail(
hazard_type="Flooding",
recommendation="Elevate building above expected flood levels. Use flood-resistant materials (concrete, treated wood) for lower floors. Install proper drainage systems.",
rationale="Flooding can cause structural damage, foundation erosion, and interior damage.",
source_url=page_contents[0]['url'] if page_contents else None
))
if self._is_hazard_active(hydro.rain_induced_landslide):
recommendations.append(RecommendationDetail(
hazard_type="Landslide",
recommendation="Implement slope stabilization measures including retaining walls, proper drainage, and vegetation. Avoid building on steep slopes if possible.",
rationale="Landslides can destroy buildings and cause loss of life.",
source_url=page_contents[0]['url'] if page_contents else None
))
if self._is_hazard_active(hydro.storm_surge):
recommendations.append(RecommendationDetail(
hazard_type="Storm Surge",
recommendation="Elevate building on reinforced foundation. Use impact-resistant materials. Install storm shutters for windows and openings.",
rationale="Storm surge can cause severe flooding and wave impact damage to coastal structures.",
source_url=page_contents[0]['url'] if page_contents else None
))
if self._is_hazard_active(hydro.severe_winds):
recommendations.append(RecommendationDetail(
hazard_type="Severe Winds/Typhoon",
recommendation="Use hurricane straps and clips to secure roof to walls. Install impact-resistant windows. Ensure proper roof-to-wall connections.",
rationale="Severe winds can cause roof failure, window breakage, and structural damage.",
source_url=page_contents[0]['url'] if page_contents else None
))
return recommendations
def _generate_general_guidelines(
self,
building_type: BuildingType,
risks: RiskData
) -> List[str]:
"""Generate general construction guidelines"""
guidelines = [
"Follow the National Building Code of the Philippines (NBCP) and National Structural Code of the Philippines (NSCP)",
"Hire licensed engineers and architects for design and supervision",
"Use quality materials that meet Philippine Standards (PS) specifications",
"Ensure proper construction supervision and quality control throughout the project",
"Obtain all necessary building permits and comply with local zoning regulations"
]
# Add building-type specific guidelines
if "residential" in building_type:
guidelines.append("Consider future expansion needs and ensure structural capacity for additional floors if planned")
elif "commercial" in building_type or "institutional" in building_type:
guidelines.append("Ensure compliance with fire safety codes and accessibility standards for public buildings")
elif "industrial" in building_type:
guidelines.append("Design for specific industrial loads and vibration requirements")
# Add risk-specific guidelines
if risks.summary.overall_risk_level in ["CRITICAL", "HIGH"]:
guidelines.append("Consider engaging a disaster risk reduction specialist for site-specific recommendations")
guidelines.append("Implement redundant safety systems and emergency evacuation plans")
return guidelines
def _extract_building_codes(
    self,
    page_contents: List[Dict[str, Any]]
) -> List[BuildingCodeReference]:
    """
    Return the standard Philippine building code references.

    The list is fixed; ``page_contents`` is accepted but not inspected.
    """
    # (code name, section, requirement)
    reference_rows = (
        (
            "National Building Code of the Philippines (NBCP)",
            "General",
            "All construction must comply with NBCP provisions for structural safety, fire safety, and sanitation",
        ),
        (
            "National Structural Code of the Philippines (NSCP)",
            "Seismic Design",
            "Buildings must be designed to resist seismic forces based on seismic zone classification",
        ),
        (
            "NSCP",
            "Wind Design",
            "Structures must be designed for wind loads based on location and exposure category",
        ),
    )
    return [
        BuildingCodeReference(code_name=name, section=section, requirement=requirement)
        for name, section, requirement in reference_rows
    ]
def _generate_priority_actions(
self,
risks: RiskData,
building_type: BuildingType
) -> List[str]:
"""Generate priority actions based on risk assessment"""
actions = []
# High priority actions based on risk level
if risks.summary.overall_risk_level == "CRITICAL":
actions.append("URGENT: Conduct detailed geotechnical investigation before construction")
actions.append("URGENT: Engage structural engineer with disaster-resistant design expertise")
# Priority actions based on specific hazards
seismic = risks.hazards.seismic
if self._is_hazard_active(seismic.active_fault):
actions.append("Conduct fault line mapping and avoid building directly on active faults")
if self._is_hazard_active(seismic.liquefaction):
actions.append("Perform soil liquefaction assessment and implement ground improvement if needed")
volcanic = risks.hazards.volcanic
if self._is_hazard_active(volcanic.active_volcano):
actions.append("Review volcanic hazard maps and consider evacuation routes in site planning")
hydro = risks.hazards.hydrometeorological
if self._is_hazard_active(hydro.flood):
actions.append("Determine 100-year flood elevation and design building accordingly")
if self._is_hazard_active(hydro.rain_induced_landslide):
actions.append("Conduct slope stability analysis and implement stabilization measures")
# General priority actions
actions.append("Obtain building permit and ensure compliance with local building codes")
actions.append("Implement quality assurance program during construction")
return actions
async def _synthesize_with_llm(
    self,
    page_contents: List[Dict[str, Any]],
    risks: RiskData,
    building_type: BuildingType,
    risk_types: List[str]
) -> Recommendations:
    """
    Ask the LLM for recommendations and parse them into a structure.

    Args:
        page_contents: Fetched web page contents
        risks: Risk assessment data
        building_type: Type of building
        risk_types: List of identified risk types

    Returns:
        Structured recommendations; the raw LLM text is stored on
        ``llm_analysis`` when the result object has that attribute

    Raises:
        Exception: Any failure is logged and re-raised to the caller
    """
    try:
        # The API key is read at call time from the environment
        model = ChatOpenAI(
            model=self.model_name,
            api_key=os.getenv('OPENAI_API_KEY'),
            temperature=0.7
        )
        logger.info(f"Using OpenAI model: {self.model_name}")

        context = self._create_research_context(page_contents, risks, building_type, risk_types)

        # System prompt, research context and the required output layout
        prompt = f"""{self.system_prompt}
Based on the following research and risk assessment, provide comprehensive construction recommendations:
{context}
Provide detailed recommendations in the following format:
## General Guidelines
- List 5-7 general construction guidelines
## Seismic Recommendations
For each active seismic hazard, provide:
- Hazard type
- Specific recommendation
- Rationale
## Volcanic Recommendations
For each active volcanic hazard, provide:
- Hazard type
- Specific recommendation
- Rationale
## Hydrometeorological Recommendations
For each active hydrometeorological hazard, provide:
- Hazard type
- Specific recommendation
- Rationale
## Priority Actions
- List 5-8 priority actions in order of importance
## Building Code References
- List relevant Philippine building codes (NBCP, NSCP) with sections and requirements
"""

        logger.info("Invoking LLM for synthesis...")
        response = await model.ainvoke(prompt)

        # Responses without a ``content`` attribute are stringified
        if hasattr(response, 'content'):
            llm_output = response.content
        else:
            llm_output = str(response)
        logger.info(f"LLM synthesis completed: {len(llm_output)} characters")

        recommendations = self._parse_llm_recommendations(llm_output, risks, building_type)

        # Keep the raw analysis only when the result object has a slot for it
        if hasattr(recommendations, 'llm_analysis'):
            recommendations.llm_analysis = llm_output
        return recommendations
    except Exception as e:
        logger.error(f"LLM synthesis failed: {str(e)}")
        raise
async def _stream_llm_synthesis(
    self,
    page_contents: List[Dict[str, Any]],
    risks: RiskData,
    building_type: BuildingType,
    risk_types: List[str]
) -> AsyncGenerator[str, None]:
    """
    Stream the LLM's construction recommendations chunk by chunk.

    Failures are not raised: they are logged and reported to the consumer
    as a final "Error: ..." chunk.

    Args:
        page_contents: Fetched web page contents
        risks: Risk assessment data
        building_type: Type of building
        risk_types: List of identified risk types

    Yields:
        Non-empty text chunks from the LLM
    """
    try:
        # The API key is read at call time from the environment
        model = ChatOpenAI(
            model=self.model_name,
            api_key=os.getenv('OPENAI_API_KEY'),
            temperature=0.7
        )
        logger.info(f"Using OpenAI model: {self.model_name}")

        context = self._create_research_context(page_contents, risks, building_type, risk_types)

        prompt = f"""{self.system_prompt}
Based on the following research and risk assessment, provide comprehensive construction recommendations:
{context}
Provide detailed, practical recommendations for disaster-resistant construction."""

        logger.info("Starting LLM streaming synthesis...")
        async for chunk in model.astream(prompt):
            # Chunks that are empty or carry no content are skipped
            piece = getattr(chunk, 'content', None)
            if piece:
                yield piece
        logger.info("Streaming synthesis completed")
    except Exception as e:
        logger.error(f"LLM streaming failed: {str(e)}")
        yield f"\n\nError: {str(e)}\n"
def _create_research_context(
self,
page_contents: List[Dict[str, Any]],
risks: RiskData,
building_type: BuildingType,
risk_types: List[str]
) -> str:
"""Create context for LLM from research data"""
context_parts = []
# Building and location info
context_parts.append(f"## Building Information")
context_parts.append(f"Building Type: {building_type.replace('_', ' ').title()}")
context_parts.append(f"Location: {risks.location.name}, {risks.location.administrative_area}")
context_parts.append(f"Coordinates: {risks.location.coordinates.latitude}, {risks.location.coordinates.longitude}")
# Risk summary
context_parts.append(f"\n## Risk Assessment Summary")
context_parts.append(f"Overall Risk Level: {risks.summary.overall_risk_level}")
context_parts.append(f"High Risk Hazards: {risks.summary.high_risk_count}")
context_parts.append(f"Moderate Risk Hazards: {risks.summary.moderate_risk_count}")
if risks.summary.critical_hazards:
context_parts.append(f"Critical Hazards: {', '.join(risks.summary.critical_hazards)}")
# Active hazards
context_parts.append(f"\n## Active Hazards")
context_parts.append(f"Risk Types: {', '.join(risk_types)}")
# Seismic hazards
seismic = risks.hazards.seismic
if self._is_hazard_active(seismic.active_fault):
context_parts.append(f"\n### Seismic Hazards")
context_parts.append(f"- Active Fault: {seismic.active_fault.description}")
if seismic.active_fault.distance:
context_parts.append(f" Distance: {seismic.active_fault.distance}")
if self._is_hazard_active(seismic.ground_shaking):
context_parts.append(f"- Ground Shaking: {seismic.ground_shaking.description}")
if self._is_hazard_active(seismic.liquefaction):
context_parts.append(f"- Liquefaction: {seismic.liquefaction.description}")
# Volcanic hazards
volcanic = risks.hazards.volcanic
if self._is_hazard_active(volcanic.active_volcano):
context_parts.append(f"\n### Volcanic Hazards")
context_parts.append(f"- Active Volcano: {volcanic.active_volcano.description}")
if volcanic.active_volcano.distance:
context_parts.append(f" Distance: {volcanic.active_volcano.distance}")
if self._is_hazard_active(volcanic.ashfall):
context_parts.append(f"- Ashfall: {volcanic.ashfall.description}")
# Hydrometeorological hazards
hydro = risks.hazards.hydrometeorological
if self._is_hazard_active(hydro.flood):
context_parts.append(f"\n### Hydrometeorological Hazards")
context_parts.append(f"- Flood: {hydro.flood.description}")
if self._is_hazard_active(hydro.rain_induced_landslide):
context_parts.append(f"- Landslide: {hydro.rain_induced_landslide.description}")
if self._is_hazard_active(hydro.storm_surge):
context_parts.append(f"- Storm Surge: {hydro.storm_surge.description}")
if self._is_hazard_active(hydro.severe_winds):
context_parts.append(f"- Severe Winds: {hydro.severe_winds.description}")
# Research sources
if page_contents:
context_parts.append(f"\n## Research Sources")
for i, content in enumerate(page_contents[:3], 1): # Limit to top 3
context_parts.append(f"\n### Source {i}: {content.get('title', 'Unknown')}")
context_parts.append(f"URL: {content.get('url', 'N/A')}")
# Truncate content to avoid token limits
content_text = content.get('content', '')
if isinstance(content_text, str):
content_text = content_text[:2000] # Limit to 2000 chars per source
context_parts.append(f"Content: {content_text}")
return "\n".join(context_parts)
def _parse_llm_recommendations(
    self,
    llm_output: str,
    risks: RiskData,
    building_type: BuildingType
) -> Recommendations:
    """
    Parse LLM output into structured Recommendations.

    Only the general guidelines and priority actions are taken from the LLM
    text; hazard-specific recommendations and building codes always come
    from the rule-based generators because the LLM's layout for those
    sections varies. Sections are identified by their heading line, and
    list items may be written with "-", "•", "*" or numbers ("1." / "1)").

    Falls back to fully rule-based recommendations when fewer than three
    guidelines are found or when parsing raises.

    Args:
        llm_output: Raw markdown-style text returned by the LLM
        risks: Risk assessment data
        building_type: Type of building

    Returns:
        Structured recommendations (at most 7 guidelines and 8 actions)
    """
    import re

    def bullet_text(raw_line):
        # Returns the item text of a list line, or '' for any other line
        stripped = raw_line.strip()
        if stripped.startswith(('-', '•')):
            return stripped.lstrip('-•').strip()
        # "* item" and "1. item" / "1) item"; whitespace is required after
        # the marker so that "**bold**" lines are not mistaken for bullets
        marked = re.match(r'(?:\*|\d+[.)])\s+(.*)', stripped)
        return marked.group(1).strip() if marked else ''

    try:
        general_guidelines = []
        priority_actions = []

        # Split at markdown headings of level two or deeper
        for section in re.split(r'(?m)^\s*#{2,}', llm_output):
            heading, _, body = section.partition('\n')
            heading_lower = heading.lower()

            # Classify by the heading only, so body text that merely
            # mentions "general guidelines" cannot misroute a section
            if 'general' in heading_lower and 'guideline' in heading_lower:
                target = general_guidelines
            elif 'priority' in heading_lower and 'action' in heading_lower:
                target = priority_actions
            else:
                continue

            for line in body.split('\n'):
                item = bullet_text(line)
                if item:
                    target.append(item)

        # If parsing didn't extract enough data, fall back to rule-based
        if len(general_guidelines) < 3:
            logger.warning("LLM output parsing incomplete, using rule-based fallback")
            return self.synthesize_recommendations([], risks, building_type)

        # Ensure we have priority actions
        if len(priority_actions) < 3:
            priority_actions = self._generate_priority_actions(risks, building_type)

        return Recommendations(
            general_guidelines=general_guidelines[:7],  # Limit to 7
            seismic_recommendations=self._extract_seismic_recommendations([], risks),
            volcanic_recommendations=self._extract_volcanic_recommendations([], risks),
            hydrometeorological_recommendations=self._extract_hydrometeorological_recommendations([], risks),
            priority_actions=priority_actions[:8],  # Limit to 8
            building_codes=self._extract_building_codes([])
        )
    except Exception as e:
        logger.error(f"Failed to parse LLM recommendations: {str(e)}")
        # Fall back to rule-based
        return self.synthesize_recommendations([], risks, building_type)
def _generate_fallback_recommendations(
self,
risks: RiskData,
building_type: BuildingType
) -> Recommendations:
"""Generate basic fallback recommendations when all else fails"""
return self.synthesize_recommendations([], risks, building_type)
# Blaxel agent entry point
async def main(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Main entry point for Blaxel agent.

    Args:
        input_data: Dictionary containing 'risks' and 'building_type'.
            'risks' may be a RiskData instance or a plain dict (for example
            decoded JSON), which is converted to RiskData first.

    Returns:
        {'success': True, 'recommendations': ...} on success, otherwise
        {'success': False, 'error': <message>}
    """
    # Validate before building the agent so bad requests fail cheaply
    risks = input_data.get('risks')
    building_type = input_data.get('building_type')
    if not risks or not building_type:
        return {
            'success': False,
            'error': 'Missing required parameters: risks and building_type'
        }

    agent = ResearchAgent()

    try:
        # The agent reads risks via attribute access, which a raw dict does
        # not support. NOTE(review): assumes RiskData accepts its fields as
        # keyword arguments (e.g. a pydantic model) — confirm against models.
        if isinstance(risks, dict):
            risks = RiskData(**risks)

        recommendations = await agent.get_construction_recommendations(risks, building_type)
    except Exception as e:
        return {
            'success': False,
            'error': str(e)
        }

    return {
        'success': True,
        'recommendations': recommendations
    }
if __name__ == "__main__":
    import enum
    import json
    import sys

    def _to_jsonable(obj):
        """Convert objects json cannot encode natively (models, enums)."""
        if isinstance(obj, enum.Enum):
            return obj.value
        # Pydantic-style models expose model_dump() (v2) or dict() (v1)
        for method_name in ("model_dump", "dict"):
            method = getattr(obj, method_name, None)
            if callable(method):
                return method()
        if hasattr(obj, "__dict__"):
            return vars(obj)
        raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

    # For local testing: pass the input payload as a JSON string argument
    if len(sys.argv) > 1:
        input_data = json.loads(sys.argv[1])
        result = asyncio.run(main(input_data))
        # The result can hold a Recommendations object, which json.dumps
        # cannot encode without a default handler
        print(json.dumps(result, indent=2, default=_to_jsonable))