""" Risk Display Component Displays risk assessment results with color coding """ from models import RiskData, HazardDetail from typing import List def format_risk_display(risk_data) -> str: """ Format risk data for display with color coding and structured sections Args: risk_data: Risk assessment data (dict or RiskData object) Returns: Formatted HTML/Markdown string """ import logging logger = logging.getLogger(__name__) # Debug logging logger.info(f"format_risk_display received type: {type(risk_data)}") logger.info(f"risk_data is dict: {isinstance(risk_data, dict)}") if isinstance(risk_data, dict): logger.info(f"risk_data keys: {list(risk_data.keys())}") logger.info(f"risk_data.success: {risk_data.get('success')}") # Handle both dict and object formats if not risk_data: return "⚠️ Risk assessment data unavailable" success = risk_data.get('success', True) if isinstance(risk_data, dict) else getattr(risk_data, 'success', True) if not success: return "⚠️ Risk assessment data unavailable" # Color coding for risk levels risk_colors = { "CRITICAL": "#dc2626", # red "HIGH": "#ea580c", # orange "MODERATE": "#eab308", # yellow "LOW": "#16a34a" # green } # Extract data safely from dict or object summary = risk_data.get('summary', {}) if isinstance(risk_data, dict) else risk_data.summary location = risk_data.get('location', {}) if isinstance(risk_data, dict) else risk_data.location risk_level = summary.get('overall_risk_level', 'UNKNOWN') if isinstance(summary, dict) else summary.overall_risk_level risk_color = risk_colors.get(risk_level, "#6b7280") # Extract location data - handle both nested and flat structures if isinstance(location, dict): location_name = location.get('name', 'Unknown') # Check if coordinates are nested or at top level if 'coordinates' in location: coordinates = location.get('coordinates', {}) lat = coordinates.get('latitude', 0) if isinstance(coordinates, dict) else coordinates.latitude lon = coordinates.get('longitude', 0) if isinstance(coordinates, dict) else coordinates.longitude else: # Flat structure - lat/lon directly in location lat = location.get('latitude', 0) lon = location.get('longitude', 0) admin_area = location.get('administrative_area', location_name) else: location_name = location.name coordinates = location.coordinates lat = coordinates.latitude if hasattr(coordinates, 'latitude') else coordinates.get('latitude', 0) lon = coordinates.longitude if hasattr(coordinates, 'longitude') else coordinates.get('longitude', 0) admin_area = location.administrative_area # Extract summary stats total_hazards = summary.get('total_hazards_assessed', 0) if isinstance(summary, dict) else summary.total_hazards_assessed high_risk = summary.get('high_risk_count', 0) if isinstance(summary, dict) else summary.high_risk_count moderate_risk = summary.get('moderate_risk_count', 0) if isinstance(summary, dict) else summary.moderate_risk_count # Build HTML output html = f"""

🛡️ Risk Assessment Summary

Overall Risk Level: {risk_level}

Location: {location_name}

Coordinates: {lat:.4f}°N, {lon:.4f}°E

Administrative Area: {admin_area}

📊 Hazard Statistics

""" # Critical hazards section critical_hazards = summary.get('critical_hazards', []) if isinstance(summary, dict) else summary.critical_hazards if critical_hazards: html += """

⚠️ Critical Hazards

""" # Seismic hazards try: hazards = risk_data.get('hazards', {}) if isinstance(risk_data, dict) else risk_data.hazards seismic = hazards.get('seismic', {}) if isinstance(hazards, dict) else hazards.seismic # Handle both camelCase (MCP) and snake_case (processed) keys html += _format_hazard_section( "🌍 Seismic Hazards", [ ("Active Fault", seismic.get('activeFault') or seismic.get('active_fault', {}) if isinstance(seismic, dict) else seismic.active_fault), ("Ground Shaking", seismic.get('groundShaking') or seismic.get('ground_shaking', {}) if isinstance(seismic, dict) else seismic.ground_shaking), ("Liquefaction", seismic.get('liquefaction', {}) if isinstance(seismic, dict) else seismic.liquefaction), ("Tsunami", seismic.get('tsunami', {}) if isinstance(seismic, dict) else seismic.tsunami), ("Earthquake-Induced Landslide", seismic.get('eil') or seismic.get('earthquake_induced_landslide', {}) if isinstance(seismic, dict) else seismic.earthquake_induced_landslide), ("Fissure", seismic.get('fissure', {}) if isinstance(seismic, dict) else seismic.fissure), ("Ground Rupture", seismic.get('groundRupture') or seismic.get('ground_rupture', {}) if isinstance(seismic, dict) else seismic.ground_rupture), ] ) except Exception as e: logger.warning(f"Error formatting seismic hazards: {e}") html += "

Seismic hazard details unavailable

" # Volcanic hazards try: volcanic = hazards.get('volcanic', {}) if isinstance(hazards, dict) else hazards.volcanic html += _format_hazard_section( "🌋 Volcanic Hazards", [ ("Active Volcano", volcanic.get('activeVolcano') or volcanic.get('active_volcano', {}) if isinstance(volcanic, dict) else volcanic.active_volcano), ("Potentially Active Volcano", volcanic.get('potentiallyActiveVolcano') or volcanic.get('potentially_active_volcano', {}) if isinstance(volcanic, dict) else volcanic.potentially_active_volcano), ("Inactive Volcano", volcanic.get('inactiveVolcano') or volcanic.get('inactive_volcano', {}) if isinstance(volcanic, dict) else volcanic.inactive_volcano), ("Ashfall", volcanic.get('ashfall', {}) if isinstance(volcanic, dict) else volcanic.ashfall), ("Pyroclastic Flow", volcanic.get('pyroclasticFlow') or volcanic.get('pyroclastic_flow', {}) if isinstance(volcanic, dict) else volcanic.pyroclastic_flow), ("Lahar", volcanic.get('lahar', {}) if isinstance(volcanic, dict) else volcanic.lahar), ("Lava", volcanic.get('lava', {}) if isinstance(volcanic, dict) else volcanic.lava), ("Ballistic Projectile", volcanic.get('ballisticProjectile') or volcanic.get('ballistic_projectile', {}) if isinstance(volcanic, dict) else volcanic.ballistic_projectile), ("Base Surge", volcanic.get('baseSurge') or volcanic.get('base_surge', {}) if isinstance(volcanic, dict) else volcanic.base_surge), ("Volcanic Tsunami", volcanic.get('volcanicTsunami') or volcanic.get('volcanic_tsunami', {}) if isinstance(volcanic, dict) else volcanic.volcanic_tsunami), ] ) except Exception as e: logger.warning(f"Error formatting volcanic hazards: {e}") html += "

Volcanic hazard details unavailable

" # Hydrometeorological hazards try: hydro = hazards.get('hydrometeorological', {}) if isinstance(hazards, dict) else hazards.hydrometeorological html += _format_hazard_section( "🌊 Hydrometeorological Hazards", [ ("Flood", hydro.get('flood', {}) if isinstance(hydro, dict) else hydro.flood), ("Rain-Induced Landslide", hydro.get('ril') or hydro.get('rain_induced_landslide', {}) if isinstance(hydro, dict) else hydro.rain_induced_landslide), ("Storm Surge", hydro.get('stormSurge') or hydro.get('storm_surge', {}) if isinstance(hydro, dict) else hydro.storm_surge), ("Severe Winds", hydro.get('severeWinds') or hydro.get('severe_winds', {}) if isinstance(hydro, dict) else hydro.severe_winds), ] ) except Exception as e: logger.warning(f"Error formatting hydrometeorological hazards: {e}") html += "

Hydrometeorological hazard details unavailable

" # Metadata metadata = risk_data.get('metadata', {}) if isinstance(risk_data, dict) else getattr(risk_data, 'metadata', {}) if metadata: source = metadata.get('source', 'Unknown') if isinstance(metadata, dict) else getattr(metadata, 'source', 'Unknown') timestamp = metadata.get('timestamp', 'Unknown') if isinstance(metadata, dict) else getattr(metadata, 'timestamp', 'Unknown') cache_status = metadata.get('cache_status', 'Unknown') if isinstance(metadata, dict) else getattr(metadata, 'cache_status', 'Unknown') html += f"""

Data Source: {source}

Last Updated: {timestamp}

Cache Status: {cache_status}

""" html += """
""" return html def _format_hazard_section(title: str, hazards: List[tuple]) -> str: """Format a section of hazards""" html = f"""

{title}

""" for name, hazard in hazards: if hazard: # Handle both dict and object formats # MCP uses 'assessment' field, processed data uses 'status' if isinstance(hazard, dict): status = hazard.get('assessment') or hazard.get('status', 'Unknown') else: status = getattr(hazard, 'assessment', None) or getattr(hazard, 'status', 'Unknown') status_color = _get_status_color(status) details = _format_hazard_details(hazard) # Extract first line of status if it's multi-line status_display = status.split(';')[0] if ';' in status else status if len(status_display) > 50: status_display = status_display[:50] + "..." html += f""" """ html += """
Hazard Status Details
{name} {status_display} {details}
""" return html def _get_status_color(status: str) -> str: """Get color for hazard status""" status_lower = status.lower() if "high" in status_lower or "critical" in status_lower or "present" in status_lower or "prone" in status_lower: return "#dc2626" # red elif "moderate" in status_lower or "susceptible" in status_lower or "generally susceptible" in status_lower: return "#ea580c" # orange elif "low" in status_lower: return "#eab308" # yellow elif "safe" in status_lower or "n/a" in status_lower or "unavailable" in status_lower: return "#16a34a" # green else: return "#6b7280" # gray for unknown def _format_hazard_details(hazard) -> str: """Format hazard details into readable text (handles dict or object)""" details = [] # Handle both dict and object formats description = hazard.get('description') if isinstance(hazard, dict) else getattr(hazard, 'description', None) assessment = hazard.get('assessment') if isinstance(hazard, dict) else getattr(hazard, 'assessment', None) distance = hazard.get('distance') if isinstance(hazard, dict) else getattr(hazard, 'distance', None) direction = hazard.get('direction') if isinstance(hazard, dict) else getattr(hazard, 'direction', None) severity = hazard.get('severity') if isinstance(hazard, dict) else getattr(hazard, 'severity', None) units = hazard.get('units') if isinstance(hazard, dict) else getattr(hazard, 'units', None) # Use assessment if description is not available (MCP format) if not description and assessment: description = assessment if description: # Truncate long descriptions if len(description) > 200: description = description[:200] + "..." details.append(description) if distance and distance != 'N/A': distance_str = f"{distance} {units}" if units and units != 'N/A' else str(distance) details.append(f"Distance: {distance_str}") if direction and direction != 'N/A': details.append(f"Direction: {direction}") if severity: details.append(f"Severity: {severity}") return " | ".join(details) if details else "No additional details"