# Dexter Edep
# Fix risk assessment
# 75e9f67
"""
Data models for Disaster Risk Construction Planner
Pydantic models for FastAPI compatibility and Blaxel deployment
"""
from __future__ import annotations
from typing import Optional, List, Literal, Dict, Any
from pydantic import BaseModel, Field
from datetime import datetime
# Input Types
# Closed set of building-type identifiers accepted by the models below.
# The order of the members is kept as declared.
BuildingType = Literal[
    "residential_single_family",
    "residential_multi_family",
    "residential_high_rise",
    "commercial_office",
    "commercial_retail",
    "industrial_warehouse",
    "institutional_school",
    "institutional_hospital",
    "infrastructure_bridge",
    "mixed_use",
]
# Allowed overall risk ratings, declared from most to least severe.
RiskLevel = Literal[
    "CRITICAL",
    "HIGH",
    "MODERATE",
    "LOW",
]
# Base Models
class Coordinates(BaseModel):
    """A latitude/longitude pair.

    Both values are plain floats; this model applies no range validation.
    """

    latitude: float
    longitude: float
class LocationInfo(BaseModel):
    """A named location with its coordinates and administrative area.

    All three fields are required.
    """

    name: str
    coordinates: Coordinates
    administrative_area: str
# Risk Assessment Models
class HazardDetail(BaseModel):
    """Assessment details for one specific hazard.

    ``status`` and ``description`` are required. The remaining fields are
    optional and default to ``None``.
    """

    status: str
    description: str

    # Optional free-form strings; no format is enforced by this model.
    distance: Optional[str] = None
    direction: Optional[str] = None
    severity: Optional[str] = None
class SeismicHazards(BaseModel):
    """Seismic hazards, one required ``HazardDetail`` entry per hazard."""

    active_fault: HazardDetail
    ground_shaking: HazardDetail
    liquefaction: HazardDetail
    tsunami: HazardDetail
    earthquake_induced_landslide: HazardDetail
    fissure: HazardDetail
    ground_rupture: HazardDetail
class VolcanicHazards(BaseModel):
    """Volcanic hazards, one required ``HazardDetail`` entry per hazard."""

    # Volcano proximity by activity class.
    active_volcano: HazardDetail
    potentially_active_volcano: HazardDetail
    inactive_volcano: HazardDetail

    # Individual volcanic hazard phenomena.
    ashfall: HazardDetail
    pyroclastic_flow: HazardDetail
    lahar: HazardDetail
    lava: HazardDetail
    ballistic_projectile: HazardDetail
    base_surge: HazardDetail
    volcanic_tsunami: HazardDetail
class HydroHazards(BaseModel):
    """Hydrometeorological hazards, one required ``HazardDetail`` per hazard."""

    flood: HazardDetail
    rain_induced_landslide: HazardDetail
    storm_surge: HazardDetail
    severe_winds: HazardDetail
class HazardData(BaseModel):
    """Full hazard data of a risk assessment, grouped by hazard category.

    Each of the three category groups is required.
    """

    seismic: SeismicHazards
    volcanic: VolcanicHazards
    hydrometeorological: HydroHazards
class RiskSummary(BaseModel):
    """Headline summary of an overall risk assessment.

    The risk level and the three counters are required.
    ``critical_hazards`` defaults to an empty list and ``llm_analysis`` to
    ``None``.
    """

    overall_risk_level: RiskLevel
    total_hazards_assessed: int
    high_risk_count: int
    moderate_risk_count: int
    critical_hazards: List[str] = Field(default_factory=list)
    # LLM-generated analysis and recommendations, when available.
    llm_analysis: Optional[str] = None
class FacilityInfo(BaseModel):
    """Critical-facility information attached to a risk assessment.

    Every field accepts either a single mapping or a list of mappings.
    ``schools`` and ``hospitals`` default to a fresh empty dict, while
    ``road_networks`` defaults to a fresh empty list.
    """

    schools: Dict[str, Any] | List[Dict[str, Any]] = Field(default_factory=dict)
    hospitals: Dict[str, Any] | List[Dict[str, Any]] = Field(default_factory=dict)
    road_networks: Dict[str, Any] | List[Dict[str, Any]] = Field(default_factory=list)
class Metadata(BaseModel):
    """Metadata describing a data source response.

    All fields are required.
    """

    # Stored as a plain string; no timestamp format is enforced here.
    timestamp: str
    source: str
    cache_status: str
    # NOTE(review): the unit of ``ttl`` is not stated in this module
    # (presumably seconds) -- confirm against the producer.
    ttl: int
class RiskData(BaseModel):
    """Complete risk assessment payload.

    Combines the success flag, summary, location, hazard data, facility
    information and source metadata. Every field is required.
    """

    success: bool
    summary: RiskSummary
    location: LocationInfo
    hazards: HazardData
    facilities: FacilityInfo
    metadata: Metadata
# Construction Recommendations Models
class RecommendationDetail(BaseModel):
    """One construction recommendation tied to a hazard type.

    ``source_url`` is optional and defaults to ``None``; the other fields
    are required.
    """

    hazard_type: str
    recommendation: str
    rationale: str
    source_url: Optional[str] = None
class BuildingCodeReference(BaseModel):
    """Reference to a building code: code name, section and requirement.

    All three fields are required strings.
    """

    code_name: str
    section: str
    requirement: str
class Recommendations(BaseModel):
    """Construction recommendations grouped by kind.

    Every field is a list that defaults to a fresh empty list, so the model
    can be built with no arguments.
    """

    general_guidelines: List[str] = Field(default_factory=list)

    # Per-category recommendation details.
    seismic_recommendations: List[RecommendationDetail] = Field(default_factory=list)
    volcanic_recommendations: List[RecommendationDetail] = Field(default_factory=list)
    hydrometeorological_recommendations: List[RecommendationDetail] = Field(default_factory=list)

    priority_actions: List[str] = Field(default_factory=list)
    building_codes: List[BuildingCodeReference] = Field(default_factory=list)
# Material Cost Models
class MaterialCost(BaseModel):
    """Cost information for a single material.

    Name, category, unit, unit price and currency are required.
    ``quantity_needed``, ``total_cost`` and ``source`` are optional and
    default to ``None``.
    """

    material_name: str
    category: str
    unit: str
    price_per_unit: float
    currency: str

    # Optional details; this model does not derive total_cost itself.
    quantity_needed: Optional[float] = None
    total_cost: Optional[float] = None
    source: Optional[str] = None
class CostEstimate(BaseModel):
    """Cost estimate expressed as a low/mid/high range in one currency.

    All fields are required; no ordering between the three amounts is
    enforced by this model.
    """

    low: float
    mid: float
    high: float
    currency: str
class CostData(BaseModel):
    """Complete cost analysis data.

    Every field has a default: an empty material list, no total estimate,
    and empty strings for ``market_conditions`` and ``last_updated``.
    """

    materials: List[MaterialCost] = Field(default_factory=list)
    total_estimate: Optional[CostEstimate] = None
    market_conditions: str = ""
    last_updated: str = ""
# Critical Facilities Models
class FacilityDetail(BaseModel):
    """Details of a single facility.

    All fields are required.
    """

    name: str
    type: str
    distance_meters: float
    travel_time_minutes: float
    directions: str
    coordinates: Coordinates
class RoadDetail(BaseModel):
    """Information about one road in the road network.

    All fields are required.
    """

    name: str
    # Only the two literal values below are accepted.
    type: Literal["primary", "secondary"]
    distance_meters: float
class FacilityData(BaseModel):
    """Complete facility location data.

    Every field is a list that defaults to a fresh empty list.
    """

    # Facility lists, one per facility kind.
    schools: List[FacilityDetail] = Field(default_factory=list)
    hospitals: List[FacilityDetail] = Field(default_factory=list)
    emergency_services: List[FacilityDetail] = Field(default_factory=list)
    utilities: List[FacilityDetail] = Field(default_factory=list)

    # Roads use their own detail model.
    road_networks: List[RoadDetail] = Field(default_factory=list)
# Final Output Models
class PlanMetadata(BaseModel):
    """Metadata of a generated construction plan.

    ``generated_at``, ``building_type``, ``location`` and ``coordinates``
    are required. ``building_area`` is optional and defaults to ``None``.
    """

    # Stored as a plain string; no timestamp format is enforced here.
    generated_at: str
    building_type: BuildingType
    # Explicit ``None`` default, matching every other Optional field in this
    # module. Without it, Pydantic v2 treats the field as required-but-nullable
    # and rejects payloads that omit the key entirely.
    building_area: Optional[float] = None
    location: LocationInfo
    coordinates: Coordinates
class ExecutiveSummary(BaseModel):
    """Executive summary section of a construction plan.

    ``overall_risk`` is a required free-form string; the three list fields
    default to fresh empty lists.
    """

    overall_risk: str
    critical_concerns: List[str] = Field(default_factory=list)
    key_recommendations: List[str] = Field(default_factory=list)
    building_specific_notes: List[str] = Field(default_factory=list)
class ExportFormats(BaseModel):
    """URLs of the exported plan, one per export format.

    Both URLs are optional and default to ``None``.
    """

    pdf_url: Optional[str] = None
    json_url: Optional[str] = None
class ConstructionPlan(BaseModel):
    """Top-level construction plan output.

    Bundles the plan metadata, executive summary, risk assessment,
    recommendations, material costs, critical facilities and export links.
    Every section is required.
    """

    metadata: PlanMetadata
    executive_summary: ExecutiveSummary
    risk_assessment: RiskData
    construction_recommendations: Recommendations
    material_costs: CostData
    critical_facilities: FacilityData
    export_formats: ExportFormats
# Error Handling Models
class ErrorDetail(BaseModel):
    """Description of a single error.

    ``code`` and ``message`` are required. ``details`` defaults to ``None``
    and ``retry_possible`` defaults to ``False``.
    """

    code: str
    message: str
    details: Optional[Dict[str, Any]] = None
    retry_possible: bool = False
class ErrorResponse(BaseModel):
    """Envelope returned when a request fails.

    ``success`` defaults to ``False``; ``error`` and ``partial_results``
    are optional and default to ``None``.
    """

    success: bool = False
    error: Optional[ErrorDetail] = None
    partial_results: Optional[Dict[str, Any]] = None