import os
from datetime import datetime
import ee
import json
import numpy as np
import geemap.foliumap as gee_folium
import leafmap.foliumap as leaf_folium
import gradio as gr
import pandas as pd
import geopandas as gpd
import plotly.express as px
import branca.colormap as cm
from shapely.ops import transform
import pyproj
from io import BytesIO
import requests
import kml2geojson
import folium
import xml.etree.ElementTree as ET
from fastapi import FastAPI, HTTPException, Response
from urllib.parse import unquote
from pydantic import BaseModel, HttpUrl
app = FastAPI()
# --- Helper Functions ---
def one_time_setup():
"""Initializes the Earth Engine API."""
try:
# Attempt to initialize with default credentials
ee.Initialize()
except Exception:
try:
# Fallback to service account credentials if default init fails
credentials_path = os.path.expanduser("~/.config/earthengine/credentials.json")
ee_credentials = os.environ.get("EE")
if ee_credentials:
os.makedirs(os.path.dirname(credentials_path), exist_ok=True)
with open(credentials_path, "w") as f:
f.write(ee_credentials)
credentials = ee.ServiceAccountCredentials('ujjwal@ee-ujjwaliitd.iam.gserviceaccount.com', credentials_path)
ee.Initialize(credentials, project='ee-ujjwaliitd')
except Exception as inner_e:
# If the fallback also fails, print the error
print(f"Earth Engine initialization failed: {inner_e}")
def _process_spatial_data(data_bytes: BytesIO) -> gpd.GeoDataFrame:
"""Core function to process bytes of a KML or GeoJSON file."""
# Read the first few bytes to determine file type without consuming the stream
start_of_file = data_bytes.read(100)
data_bytes.seek(0) # Reset stream position
# Check if the file is KML (XML-based)
if start_of_file.strip().lower().startswith(b' gpd.GeoDataFrame:
"""Downloads and reads a KML/GeoJSON from a URL."""
if not url or not url.strip():
return None
# Handle Google Drive URLs
if "drive.google.com" in url:
if "/file/d/" in url:
file_id = url.split('/d/')[1].split('/')[0]
elif "open?id=" in url:
file_id = url.split('open?id=')[1].split('&')[0]
else:
raise ValueError("Unsupported Google Drive URL format. Please provide a direct link or a shareable link with 'open?id=' or '/file/d/'.")
download_url = f"https://drive.google.com/uc?export=download&id={file_id}"
else:
download_url = url
try:
response = requests.get(download_url, timeout=30)
response.raise_for_status()
data_bytes = BytesIO(response.content)
return _process_spatial_data(data_bytes)
except requests.exceptions.RequestException as e:
raise ValueError(f"Failed to download file from URL: {e}")
def find_best_epsg(geometry) -> int:
"""Finds the most suitable EPSG code for a given geometry based on its centroid."""
if geometry.geom_type == "Polygon":
centroid = geometry.centroid
else:
raise ValueError("Geometry is not a Polygon.")
common_epsg_codes = [
7761, # Gujarat
7774, # Rajasthan
7766, # MadhyaPradesh
7767, # Maharastra
7755, # India
# Add other relevant state/country EPSG codes here
]
for epsg in common_epsg_codes:
try:
crs = pyproj.CRS.from_epsg(epsg)
area_of_use = crs.area_of_use.bounds
if (area_of_use[0] <= centroid.x <= area_of_use[2]) and \
(area_of_use[1] <= centroid.y <= area_of_use[3]):
return epsg
except pyproj.exceptions.CRSError:
continue
return 4326 # Default to WGS84 if no suitable projection is found
def shape_3d_to_2d(shape):
"""Converts a 3D geometry to 2D."""
if shape.has_z:
return transform(lambda x, y, z: (x, y), shape)
return shape
def preprocess_gdf(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""Preprocesses a GeoDataFrame by converting geometries to 2D and fixing invalid ones."""
gdf["geometry"] = gdf["geometry"].apply(shape_3d_to_2d)
gdf["geometry"] = gdf.buffer(0)
return gdf
def to_best_crs(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""Converts a GeoDataFrame to the most suitable CRS."""
if not gdf.empty and gdf["geometry"].iloc[0] is not None:
best_epsg_code = find_best_epsg(gdf.to_crs(epsg=4326)["geometry"].iloc[0])
return gdf.to_crs(epsg=best_epsg_code)
return gdf
def is_valid_polygon(geometry_gdf):
"""Checks if the geometry in a GeoDataFrame is a valid, non-empty Polygon."""
if geometry_gdf.empty:
return False
geometry = geometry_gdf.geometry.item()
return (geometry.type == 'Polygon') and (not geometry.is_empty)
def add_geometry_to_map(m, geometry_gdf, buffer_geometry_gdf, opacity=0.3):
"""Adds geometry and its buffer to a folium map."""
if buffer_geometry_gdf is not None and not buffer_geometry_gdf.empty:
folium.GeoJson(
buffer_geometry_gdf.to_crs(epsg=4326),
name="Geometry Buffer",
style_function=lambda x: {"color": "red", "fillOpacity": opacity, "fillColor": "red"}
).add_to(m)
if geometry_gdf is not None and not geometry_gdf.empty:
folium.GeoJson(
geometry_gdf.to_crs(epsg=4326),
name="Geometry",
style_function=lambda x: {"color": "blue", "fillOpacity": opacity, "fillColor": "blue"}
).add_to(m)
return m
def get_wayback_data():
"""Fetches and parses Wayback imagery data from ArcGIS."""
try:
url = "https://wayback.maptiles.arcgis.com/arcgis/rest/services/World_Imagery/MapServer/WMTS/1.0.0/WMTSCapabilities.xml"
response = requests.get(url)
response.raise_for_status() # Ensure request was successful
# Parse XML
root = ET.fromstring(response.content)
ns = {
"wmts": "https://www.opengis.net/wmts/1.0",
"ows": "https://www.opengis.net/ows/1.1",
"xlink": "https://www.w3.org/1999/xlink",
}
# Use a robust XPath to find all 'Layer' elements anywhere in the document.
# This is less brittle than specifying the full path.
layers = root.findall(".//wmts:Contents/wmts:Layer", ns)
layer_data = []
for layer in layers:
title = layer.find("ows:Title", ns)
identifier = layer.find("ows:Identifier", ns)
resource = layer.find("wmts:ResourceURL", ns) # Tile URL template
title_text = title.text if title is not None else "N/A"
identifier_text = identifier.text if identifier is not None else "N/A"
url_template = resource.get("template") if resource is not None else "N/A"
layer_data.append({"Title": title_text, "ResourceURL_Template": url_template})
wayback_df = pd.DataFrame(layer_data)
wayback_df["date"] = pd.to_datetime(wayback_df["Title"].str.extract(r"(\d{4}-\d{2}-\d{2})").squeeze(), errors="coerce")
wayback_df.set_index("date", inplace=True)
return wayback_df.sort_index(ascending=False)
except Exception as e:
print(f"Could not fetch or parse Wayback data: {e}")
return pd.DataFrame()
def get_dem_slope_maps(ee_geometry, map_bounds, wayback_url, wayback_title, zoom=12,):
"""Creates DEM and Slope maps from SRTM data, using wayback tiles as a basemap if available."""
print(wayback_url, wayback_title)
# --- DEM Map ---
dem_map = gee_folium.Map(zoom_start=zoom)
if wayback_url:
dem_map.add_tile_layer(url=wayback_url, name=wayback_title, attribution="Esri")
dem_map_html = "
No DEM data available for this area.
"
try:
dem_layer = ee.Image("USGS/SRTMGL1_003").resample("bilinear").reproject(crs="EPSG:4326", scale=30).clip(ee_geometry)
stats = dem_layer.reduceRegion(reducer=ee.Reducer.minMax(), geometry=ee_geometry, scale=30, maxPixels=1e9).getInfo()
print(stats)
if stats and stats.get('elevation_min') is not None:
min_val, max_val = stats['elevation_min'], stats['elevation_max']
vis_params = {"min": min_val, "max": max_val, "palette": ['#0000FF', '#00FF00', '#FFFF00', '#FF0000']}
dem_map.addLayer(dem_layer, vis_params, "Elevation")
dem_map.add_colorbar(vis_params=vis_params, label="Elevation (m)")
dem_map.addLayerControl()
dem_map.fit_bounds(map_bounds, padding=(10, 10))
dem_map_html = dem_map._repr_html_()
except Exception as e:
print(f"Error creating DEM map: {e}")
dem_map_html = f"Error creating DEM map: {e}
"
# ---Slope Map --- #
slope_map = gee_folium.Map(zoom_start=zoom)
if wayback_url:
slope_map.add_tile_layer(url=wayback_url, name=wayback_title, attribution="Esri")
slope_map_html = "No Slope data available for this area.
"
try:
dem_for_slope = ee.Image("USGS/SRTMGL1_003")
# Calculate slope. The result is an image with slope values in degrees.
slope_layer = ee.Terrain.slope(dem_for_slope).clip(ee_geometry)
stats = slope_layer.reduceRegion(reducer=ee.Reducer.minMax(), geometry=ee_geometry, scale=30, maxPixels=1e9).getInfo()
print(stats)
if stats and stats.get('slope_min') is not None:
min_val, max_val = stats['slope_min'], stats['slope_max']
vis_params = {"min": min_val, "max": max_val, "palette": ['#0000FF', '#00FF00', '#FFFF00', '#FF0000']}
slope_map.addLayer(slope_layer, vis_params, "Slope")
slope_map.add_colorbar(vis_params=vis_params, label="Slope (degrees)")
slope_map.addLayerControl()
slope_map.fit_bounds(map_bounds, padding=(10, 10))
slope_map_html = slope_map._repr_html_()
except Exception as e:
print(f"Error creating Slope map: {e}")
slope_map_html = f"Error creating Slope map: {e}
"
return dem_map_html, slope_map_html
def add_indices(image, nir_band, red_band, blue_band, green_band, swir_band, swir2_band, evi_vars):
"""Calculates and adds multiple vegetation indices to an Earth Engine image."""
nir = image.select(nir_band).divide(10000)
red = image.select(red_band).divide(10000)
blue = image.select(blue_band).divide(10000)
green = image.select(green_band).divide(10000)
swir = image.select(swir_band).divide(10000)
swir2 = image.select(swir2_band).divide(10000)
# Previously existing indices
ndvi = image.normalizedDifference([nir_band, red_band]).rename('NDVI')
evi = image.expression(
'G * ((NIR - RED) / (NIR + C1 * RED - C2 * BLUE + L))', {
'NIR': nir, 'RED': red, 'BLUE': blue,
'G': evi_vars['G'], 'C1': evi_vars['C1'], 'C2': evi_vars['C2'], 'L': evi_vars['L']
}).rename('EVI')
evi2 = image.expression(
'G * (NIR - RED) / (NIR + L + C * RED)', {
'NIR': nir, 'RED': red,
'G': evi_vars['G'], 'L': evi_vars['L'], 'C': evi_vars['C']
}).rename('EVI2')
try:
table = ee.FeatureCollection('projects/in793-aq-nb-24330048/assets/cleanedVDI').select(
["B2", "B4", "B8", "cVDI"], ["Blue", "Red", "NIR", 'cVDI'])
classifier = ee.Classifier.smileRandomForest(50).train(
features=table, classProperty='cVDI', inputProperties=['Blue', 'Red', 'NIR'])
rf = image.classify(classifier).multiply(ee.Number(0.2)).add(ee.Number(0.1)).rename('RandomForest')
except Exception as e:
print(f"Random Forest calculation failed: {e}")
rf = ee.Image.constant(0).rename('RandomForest')
ci = image.expression(
'(-3.98 * (BLUE/NIR) + 12.54 * (GREEN/NIR) - 5.49 * (RED/NIR) - 0.19) / ' +
'(-21.87 * (BLUE/NIR) + 12.4 * (GREEN/NIR) + 19.98 * (RED/NIR) + 1) * 2.29', {
'NIR': nir, 'RED': red, 'BLUE': blue, 'GREEN': green
}).clamp(0, 1).rename('CI')
gujvdi = image.expression(
'0.5 * (NIR - RED) / (NIR + 6 * RED - 8.25 * BLUE - 0.01)', {
'NIR': nir, 'RED': red, 'BLUE': blue
}).rename('GujVDI')
mndwi = image.normalizedDifference([green_band, swir_band]).rename('MNDWI')
# Newly added indices
savi = image.expression('(1 + L) * (NIR - RED) / (NIR + RED + L)', {
'NIR': nir, 'RED': red, 'L': 0.5
}).rename('SAVI')
mvi = image.expression('(NIR - (GREEN + SWIR)) / (NIR + (GREEN + SWIR))', {
'NIR': nir, 'GREEN': green, 'SWIR': swir
}).rename('MVI')
nbr = image.normalizedDifference([nir_band, swir2_band]).rename('NBR')
gci = image.expression('(NIR - GREEN) / GREEN', {
'NIR': nir, 'GREEN': green
}).rename('GCI')
return image.addBands([ndvi, evi, evi2, rf, ci, gujvdi, mndwi, savi, mvi, nbr, gci])
# --- Gradio App Logic ---
# Initialize GEE and fetch wayback data once at the start
one_time_setup()
WAYBACK_DF = get_wayback_data()
def process_and_display(file_obj, url_str, buffer_m, progress=gr.Progress()):
"""Main function to process the uploaded file or URL and generate initial outputs."""
if file_obj is None and not (url_str and url_str.strip()):
return None, "Please upload a file or provide a URL.", None, None, None, None, None
progress(0, desc="Reading and processing geometry...")
try:
input_gdf = get_gdf_from_file(file_obj) if file_obj is not None else get_gdf_from_url(url_str)
input_gdf = preprocess_gdf(input_gdf)
geometry_gdf = next((input_gdf.iloc[[i]] for i in range(len(input_gdf)) if is_valid_polygon(input_gdf.iloc[[i]])), None)
if geometry_gdf is None:
return None, "No valid polygon found in the provided file.", None, None, None, None, None
geometry_gdf = to_best_crs(geometry_gdf)
outer_geometry_gdf = geometry_gdf.copy()
outer_geometry_gdf["geometry"] = outer_geometry_gdf["geometry"].buffer(buffer_m)
buffer_geometry_gdf = gpd.GeoDataFrame(
geometry=[outer_geometry_gdf.unary_union.difference(geometry_gdf.unary_union)],
crs=geometry_gdf.crs
)
except Exception as e:
return None, f"Error processing file: {e}", None, None, None, None, None
progress(0.5, desc="Generating maps and stats...")
m = folium.Map()
wayback_url = None
wayback_title = "Esri Satellite"
if not WAYBACK_DF.empty:
latest_item = WAYBACK_DF.iloc[0]
wayback_title = f"Esri Wayback ({latest_item.name.strftime('%Y-%m-%d')})"
wayback_url = (
latest_item["ResourceURL_Template"]
.replace("{TileMatrixSet}", "GoogleMapsCompatible")
.replace("{TileMatrix}", "{z}")
.replace("{TileRow}", "{y}")
.replace("{TileCol}", "{x}")
)
folium.TileLayer(tiles=wayback_url, attr="Esri", name=wayback_title).add_to(m)
m = add_geometry_to_map(m, geometry_gdf, buffer_geometry_gdf, opacity=0.3)
bounds = geometry_gdf.to_crs(epsg=4326).total_bounds
map_bounds = [[bounds[1], bounds[0]], [bounds[3], bounds[2]]]
m.fit_bounds(map_bounds, padding=(10, 10))
folium.LayerControl().add_to(m)
ee_geometry = ee.Geometry(json.loads(geometry_gdf.to_crs(4326).to_json())['features'][0]['geometry'])
dem_html, slope_html = get_dem_slope_maps(ee_geometry, map_bounds, wayback_url=wayback_url, wayback_title=wayback_title)
stats_df = pd.DataFrame({
"Area (ha)": [f"{geometry_gdf.area.item() / 10000:.2f}"],
"Perimeter (m)": [f"{geometry_gdf.length.item():.2f}"],
"Centroid (Lat, Lon)": [f"({geometry_gdf.to_crs(4326).centroid.y.iloc[0]:.6f}, {geometry_gdf.to_crs(4326).centroid.x.iloc[0]:.6f})"]
})
geometry_json = geometry_gdf.to_json()
buffer_geometry_json = buffer_geometry_gdf.to_json()
progress(1, desc="Done!")
return m._repr_html_(), None, stats_df, dem_html, slope_html, geometry_json, buffer_geometry_json
@app.get("/api/geometry")
def calculate_geometry_metrics(file_url: str):
"""
Accepts a URL to a KML/GeoJSON file, calculates the area and
perimeter of the first valid polygon, and returns the results
in a CSV format compatible with Google Sheets' IMPORTDATA.
"""
try:
decoded_url = unquote(file_url)
input_gdf = get_gdf_from_url(decoded_url)
if input_gdf is None or input_gdf.empty:
raise ValueError("Could not read geometry from the provided URL.")
input_gdf = preprocess_gdf(input_gdf)
geometry_gdf = next((input_gdf.iloc[[i]] for i in range(len(input_gdf)) if is_valid_polygon(input_gdf.iloc[[i]])), None)
if geometry_gdf is None:
raise ValueError("No valid polygon found in the provided file.")
projected_gdf = to_best_crs(geometry_gdf)
area_hectares = projected_gdf.area.item() / 10000
perimeter_meters = projected_gdf.length.item()
centroid_gdf = projected_gdf.to_crs(epsg=4326)
centroid_point = centroid_gdf.centroid.item()
data_row = (
f"area_hectares, {round(area_hectares, 4)},"
f"perimeter_meters, {round(perimeter_meters, 4)},"
f"latitude, {round(centroid_point.y, 4)},"
f"longitude, {round(centroid_point.x, 4)}"
)
csv_output = f"{data_row}"
return Response(content=csv_output, media_type="text/csv")
except ValueError as e:
# Handle specific errors with a 400 Bad Request
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
# Handle any other unexpected errors with a 500
print(f"An unexpected error occurred in /api/geometry: {e}")
raise HTTPException(status_code=500, detail="An unexpected server error occurred.")
def calculate_indices(
geometry_json, buffer_geometry_json, veg_indices, evi_vars, date_range,
min_year, max_year, progress=gr.Progress()
):
"""Calculates vegetation indices based on user inputs."""
one_time_setup()
if not all([geometry_json, buffer_geometry_json, veg_indices]):
return "Please process a file and select at least one index first.", None, None, None
try:
geometry_gdf = gpd.read_file(geometry_json)
buffer_geometry_gdf = gpd.read_file(buffer_geometry_json)
ee_geometry = ee.Geometry(json.loads(geometry_gdf.to_crs(4326).to_json())['features'][0]['geometry'])
buffer_ee_geometry = ee.Geometry(json.loads(buffer_geometry_gdf.to_crs(4326).to_json())['features'][0]['geometry'])
start_day, start_month = date_range[0].day, date_range[0].month
end_day, end_month = date_range[1].day, date_range[1].month
dates = [
(f"{year}-{start_month:02d}-{start_day:02d}", f"{year}-{end_month:02d}-{end_day:02d}")
for year in range(min_year, max_year + 1)
]
collection = (
ee.ImageCollection("COPERNICUS/S2_SR_HARMONIZED")
.select(
["B2", "B3", "B4", "B8", "B11", "B12", "MSK_CLDPRB"],
["Blue", "Green", "Red", "NIR", "SWIR", "SWIR2", "MSK_CLDPRB"]
)
.map(lambda img: add_indices(img, 'NIR', 'Red', 'Blue', 'Green', 'SWIR', 'SWIR2', evi_vars))
)
result_rows = []
for i, (start_date, end_date) in enumerate(dates):
progress((i + 1) / len(dates), desc=f"Processing {start_date} to {end_date}")
filtered_collection = collection.filterDate(start_date, end_date).filterBounds(ee_geometry)
if filtered_collection.size().getInfo() == 0:
continue
year_val = int(start_date.split('-')[0])
row = {'Year': year_val, 'Date Range': f"{start_date}_to_{end_date}"}
for veg_index in veg_indices:
mosaic = filtered_collection.qualityMosaic(veg_index)
mean_val = mosaic.reduceRegion(reducer=ee.Reducer.mean(), geometry=ee_geometry, scale=10, maxPixels=1e9).get(veg_index).getInfo()
buffer_mean_val = mosaic.reduceRegion(reducer=ee.Reducer.mean(), geometry=buffer_ee_geometry, scale=10, maxPixels=1e9).get(veg_index).getInfo()
row[veg_index] = mean_val
row[f"{veg_index}_buffer"] = buffer_mean_val
row[f"{veg_index}_ratio"] = (mean_val / buffer_mean_val) if buffer_mean_val and buffer_mean_val != 0 else np.nan
result_rows.append(row)
if not result_rows:
return "No satellite imagery found for the selected dates.", None, None, None
result_df = pd.DataFrame(result_rows)
result_df = result_df.round(3)
plots = []
for veg_index in veg_indices:
plot_cols = [veg_index, f"{veg_index}_buffer", f"{veg_index}_ratio"]
existing_plot_cols = [col for col in plot_cols if col in result_df.columns]
plot_df = result_df[['Year'] + existing_plot_cols].dropna()
if not plot_df.empty:
fig = px.line(plot_df, x='Year', y=existing_plot_cols, markers=True, title=f"{veg_index} Time Series")
fig.update_layout(xaxis_title="Year", yaxis_title="Index Value")
fig.update_xaxes(dtick=1)
plots.append(fig)
return None, result_df, plots, "Calculation complete."
except Exception as e:
import traceback
traceback.print_exc()
return f"An error occurred during calculation: {e}", None, None, None
def get_histogram(index_name, image, geometry, bins):
"""Calculates the histogram for an image within a given geometry using GEE."""
try:
# Request histogram data from Earth Engine
hist_info = image.reduceRegion(
reducer=ee.Reducer.fixedHistogram(min=bins[0], max=bins[-1], steps=len(bins)-1),
geometry=geometry,
scale=10, # Scale in meters appropriate for Sentinel-2
maxPixels=1e9
).get(index_name).getInfo()
# Extract histogram counts
if hist_info:
histogram = [item[1] for item in hist_info]
return np.array(histogram), bins
else:
# Return empty histogram if no data
return np.array([0] * (len(bins) - 1)), bins
except Exception as e:
print(f"Could not compute histogram for {index_name}: {e}")
return np.array([0] * (len(bins) - 1)), bins
def generate_comparison_maps(geometry_json, selected_index, selected_years, evi_vars, date_start_str, date_end_str, progress=gr.Progress()):
"""Generates side-by-side maps for a selected index and two selected years with a custom HTML legend."""
if not geometry_json or not selected_index or not selected_years:
return "Please process a file and select an index and years first.", "", ""
if len(selected_years) != 2:
return "Please select exactly two years to compare.", "", ""
one_time_setup()
geometry_gdf = gpd.read_file(geometry_json).to_crs(4326)
ee_geometry = ee.Geometry(json.loads(geometry_gdf.to_json())['features'][0]['geometry'])
bounds = geometry_gdf.total_bounds
map_bounds = [[bounds[1], bounds[0]], [bounds[3], bounds[2]]]
start_month, start_day = map(int, date_start_str.split('-'))
end_month, end_day = map(int, date_end_str.split('-'))
maps_html = []
for i, year in enumerate(selected_years):
progress((i + 1) / 2, desc=f"Generating map for {year}")
start_date = f"{year}-{start_month:02d}-{start_day:02d}"
end_date = f"{year}-{end_month:02d}-{end_day:02d}"
wayback_url = None
wayback_title = "Default Satellite"
if not WAYBACK_DF.empty:
try:
target_date = datetime(int(year), start_month, 15)
nearest_idx = WAYBACK_DF.index.get_indexer([target_date], method='nearest')[0]
wayback_item = WAYBACK_DF.iloc[nearest_idx]
wayback_title = f"Esri Wayback ({wayback_item.name.strftime('%Y-%m-%d')})"
wayback_url = (
wayback_item["ResourceURL_Template"]
.replace("{TileMatrixSet}", "GoogleMapsCompatible")
.replace("{TileMatrix}", "{z}")
.replace("{TileRow}", "{y}")
.replace("{TileCol}", "{x}")
)
except Exception as e:
print(f"Could not find a suitable Wayback basemap for {year}: {e}")
wayback_url = None
wayback_title = "Default Satellite"
collection = (
ee.ImageCollection("COPERNICUS/S2_SR_HARMONIZED")
.filterDate(start_date, end_date)
.filterBounds(ee_geometry)
.select(
["B2", "B3", "B4", "B8", "B11", "B12", "MSK_CLDPRB"],
["Blue", "Green", "Red", "NIR", "SWIR", "SWIR2", "MSK_CLDPRB"]
)
.map(lambda img: add_indices(img, 'NIR', 'Red', 'Blue', 'Green', 'SWIR', 'SWIR2', evi_vars))
)
if collection.size().getInfo() == 0:
maps_html.append(f"No data found for {year}.
")
continue
mosaic = collection.qualityMosaic(selected_index)
m = gee_folium.Map(zoom_start=14)
if wayback_url:
m.add_tile_layer(wayback_url, name=wayback_title, attribution="Esri")
else:
m.add_basemap("SATELLITE")
if selected_index in ["NDVI", "RandomForest", "GujVDI", "CI", "EVI", "EVI2"]:
bins = [0, 0.2, 0.4, 0.6, 0.8, 1]
histogram, _ = get_histogram(selected_index, mosaic.select(selected_index), ee_geometry, bins)
total_pix = np.sum(histogram)
formatted_histogram = ["0.00"] * len(histogram)
if total_pix > 0:
formatted_histogram = [f"{h*100/total_pix:.2f}" for h in histogram]
# Define visualization parameters for the classified layer
ind_vis_params = {
"min": 0, "max": 1,
"palette": ["#FF0000", "#FFFF00", "#FFA500", "#00FE00", "#00A400"],
}
m.addLayer(mosaic.select(selected_index).clip(ee_geometry), ind_vis_params, f"{selected_index} Classified Layer ({year})")
legend_items = {
f"0-0.2: Open/Sparse Vegetation ({formatted_histogram[0]}%)": "#FF0000",
f"0.2-0.4: Low Vegetation ({formatted_histogram[1]}%)": "#FFFF00",
f"0.4-0.6: Moderate Vegetation ({formatted_histogram[2]}%)": "#FFA500",
f"0.6-0.8: Dense Vegetation ({formatted_histogram[3]}%)": "#00FE00",
f"0.8-1: Very Dense Vegetation ({formatted_histogram[4]}%)": "#00A400",
}
legend_html = f'''
{selected_index} Classification ({year})
'''
for text, color in legend_items.items():
legend_html += f'- {text}
'
legend_html += "
"
# Add the raw HTML to the map
m.get_root().html.add_child(folium.Element(legend_html))
else: # Fallback for indices without a color bar
vis_params = {"min": 0.0, "max": 1.0, "palette": ['FFFFFF', 'CE7E45', 'DF923D', 'F1B555', 'FCD163', '99B718', '74A901', '66A000', '529400', '3E8601', '207401', '056201', '004C00', '023B01', '012E01', '011D01', '011301']}
clipped_image = mosaic.select(selected_index).clip(ee_geometry)
m.addLayer(clipped_image, vis_params, f"{selected_index} {year}")
m.add_colorbar(vis_params=vis_params, label=f"{selected_index} Value")
folium.GeoJson(geometry_gdf, name="Geometry", style_function=lambda x: {"color": "yellow", "fillOpacity": 0, "weight": 2.5}).add_to(m)
m.fit_bounds(map_bounds, padding=(10, 10))
m.addLayerControl()
maps_html.append(m._repr_html_())
while len(maps_html) < 2:
maps_html.append("")
return f"Comparison generated for {selected_years[0]} and {selected_years[1]}.", maps_html[0], maps_html[1]
# --- Gradio Interface ---
theme = gr.themes.Soft(primary_hue="teal", secondary_hue="orange").set(
background_fill_primary="white"
)
with gr.Blocks(theme=theme, title="Kamlan: KML Analyzer") as demo:
# Hidden state to store data between steps
geometry_data = gr.State()
buffer_geometry_data = gr.State()
timeseries_df_state = gr.State()
gr.HTML("""
Kamlan: KML Analyzer
""")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("## 1. Provide Input Geometry")
gr.Markdown("Use either file upload OR a URL.")
file_input = gr.File(label="Upload KML/GeoJSON File", file_types=[".kml", ".geojson"])
url_input = gr.Textbox(label="Or Provide File URL", placeholder="e.g., https://.../my_file.kml")
buffer_input = gr.Number(label="Buffer (meters)", value=50)
process_button = gr.Button("Process Input", variant="primary")
with gr.Accordion("Advanced Settings", open=False):
gr.Markdown("### Select Vegetation Indices")
all_veg_indices = ["GujVDI", "NDVI", "EVI", "EVI2", "RandomForest", "CI", "MNDWI", "SAVI", "MVI", "NBR", "GCI"]
veg_indices_checkboxes = gr.CheckboxGroup(all_veg_indices, label="Indices", value=["NDVI"])
gr.Markdown("### EVI/EVI2 Parameters")
with gr.Row():
evi_g = gr.Number(label="G", value=2.5)
evi_c1 = gr.Number(label="C1", value=6.0)
evi_c2 = gr.Number(label="C2", value=7.5)
with gr.Row():
evi_l = gr.Number(label="L", value=1.0)
evi_c = gr.Number(label="C", value=2.4)
gr.Markdown("### Date Range")
today = datetime.now()
date_start_input = gr.Textbox(label="Start Date (MM-DD)", value="11-15")
date_end_input = gr.Textbox(label="End Date (MM-DD)", value="12-15")
with gr.Row():
min_year_input = gr.Number(label="Start Year", value=2019, precision=0)
max_year_input = gr.Number(label="End Year", value=today.year, precision=0)
calculate_button = gr.Button("Calculate Vegetation Indices", variant="primary")
with gr.Column(scale=2):
gr.Markdown("## 2. Results")
info_box = gr.Textbox(label="Status", interactive=False, placeholder="Status messages will appear here...")
map_output = gr.HTML(label="Map View")
stats_output = gr.DataFrame(label="Geometry Metrics")
gr.Markdown("### Digital Elevation Model (DEM) and Slope")
with gr.Row():
dem_map_output = gr.HTML(label="DEM Map")
slope_map_output = gr.HTML(label="Slope Map")
with gr.Tabs():
with gr.TabItem("Time Series Plot"):
plot_output = gr.Plot(label="Time Series Plot")
with gr.TabItem("Time Series Data"):
timeseries_table = gr.DataFrame(label="Time Series Data")
gr.Markdown("---") # Visual separator
gr.Markdown("## 3. Year-over-Year Index Comparison")
with gr.Row():
comparison_index_select = gr.Radio(all_veg_indices, label="Select Index for Comparison", value="NDVI")
comparison_years_select = gr.CheckboxGroup(label="Select Two Years to Compare", choices=[])
compare_button = gr.Button("Generate Comparison Maps", variant="secondary")
with gr.Row():
map_year_1_output = gr.HTML(label="Comparison Map 1")
map_year_2_output = gr.HTML(label="Comparison Map 2")
# --- Event Handlers ---
def process_on_load(request: gr.Request):
"""Checks for a 'file_url' query parameter when the app loads."""
return request.query_params.get("file_url", "")
demo.load(process_on_load, None, url_input)
process_button.click(
fn=process_and_display,
inputs=[file_input, url_input, buffer_input],
outputs=[map_output, info_box, stats_output, dem_map_output, slope_map_output, geometry_data, buffer_geometry_data]
)
def calculate_wrapper(geometry_json, buffer_json, veg_indices,
g, c1, c2, l, c, start_date_str, end_date_str,
min_year, max_year, progress=gr.Progress()):
"""Wrapper to parse inputs and handle outputs for the main calculation function."""
try:
evi_vars = {'G': g, 'C1': c1, 'C2': c2, 'L': l, 'C': c}
start_month, start_day = map(int, start_date_str.split('-'))
end_month, end_day = map(int, end_date_str.split('-'))
date_range = (datetime(2000, start_month, start_day), datetime(2000, end_month, end_day))
error_msg, df, plots, success_msg = calculate_indices(
geometry_json, buffer_json, veg_indices,
evi_vars, date_range, int(min_year), int(max_year), progress
)
status_message = error_msg or success_msg
first_plot = plots[0] if plots else None
df_display = df.round(3) if df is not None else None
available_years = []
if df is not None and 'Year' in df.columns:
available_years = sorted(df['Year'].unique().tolist())
return status_message, df_display, df, first_plot, gr.update(choices=available_years, value=[])
except Exception as e:
return f"An error occurred in the wrapper: {e}", None, None, None, gr.update(choices=[], value=[])
calculate_button.click(
fn=calculate_wrapper,
inputs=[
geometry_data, buffer_geometry_data, veg_indices_checkboxes,
evi_g, evi_c1, evi_c2, evi_l, evi_c,
date_start_input, date_end_input,
min_year_input, max_year_input
],
outputs=[info_box, timeseries_table, timeseries_df_state, plot_output, comparison_years_select]
)
def comparison_wrapper(geometry_json, selected_index, selected_years, g, c1, c2, l, c, start_date_str, end_date_str, progress=gr.Progress()):
"""Wrapper for the comparison map generation."""
try:
evi_vars = {'G': g, 'C1': c1, 'C2': c2, 'L': l, 'C': c}
status, map1, map2 = generate_comparison_maps(
geometry_json, selected_index, selected_years, evi_vars,
start_date_str, end_date_str, progress
)
return status, map1, map2
except Exception as e:
return f"Error during comparison: {e}", "", ""
compare_button.click(
fn=comparison_wrapper,
inputs=[
geometry_data, comparison_index_select, comparison_years_select,
evi_g, evi_c1, evi_c2, evi_l, evi_c,
date_start_input, date_end_input
],
outputs=[info_box, map_year_1_output, map_year_2_output]
)
gr.HTML("""
""")
def compute_index_histogram(file_url: str, index_name: str, start_date: str, end_date: str):
"""Computes histogram percentage for a vegetation index within a polygon and date range."""
one_time_setup()
decoded_url = unquote(file_url)
gdf = get_gdf_from_url(decoded_url)
gdf = preprocess_gdf(gdf)
geometry_gdf = next((gdf.iloc[[i]] for i in range(len(gdf)) if is_valid_polygon(gdf.iloc[[i]])), None)
if geometry_gdf is None:
raise HTTPException(status_code=400, detail="No valid polygon found.")
ee_geometry = ee.Geometry(json.loads(geometry_gdf.to_crs(4326).to_json())['features'][0]['geometry'])
evi_vars = {'G': 2.5, 'C1': 6.0, 'C2': 7.5, 'L': 1.0, 'C': 2.4}
collection = (
ee.ImageCollection("COPERNICUS/S2_SR_HARMONIZED")
.filterDate(start_date, end_date)
.select(
["B2", "B3", "B4", "B8", "B11", "B12", "MSK_CLDPRB"],
["Blue", "Green", "Red", "NIR", "SWIR", "SWIR2", "MSK_CLDPRB"]
)
.map(lambda img: add_indices(img, 'NIR', 'Red', 'Blue', 'Green', 'SWIR', 'SWIR2', evi_vars))
.filterBounds(ee_geometry)
)
if collection.size().getInfo() == 0:
raise HTTPException(status_code=404, detail="No imagery found for the polygon in given date range.")
# Use median composite to avoid cloud spikes
# mosaic = collection.median()
mosaic = collection.qualityMosaic(index_name)
mean_val = mosaic.reduceRegion(
reducer=ee.Reducer.mean(),
geometry=ee_geometry,
scale=10,
maxPixels=1e9
).get(index_name).getInfo()
if mean_val is None:
raise HTTPException(status_code=500, detail=f"Failed to compute mean for {index_name}.")
bins = [0, 0.2, 0.4, 0.6, 0.8, 1.0]
histogram, _ = get_histogram(index_name, mosaic.select(index_name), ee_geometry, bins)
total = histogram.sum()
# result = {f'mean': mean_val}
# for i in range(len(bins) - 1):
# label = f"{bins[i]}-{bins[i+1]}"
# pct = (histogram[i] / total * 100) if total > 0 else 0
# result[label] = round(pct, 2)
output_parts = [f'mean, {round(mean_val, 4)}']
for i in range(len(bins) - 1):
label = f"{bins[i]}-{bins[i+1]}"
pct = (histogram[i] / total * 100) if total > 0 else 0
# Append each key-value pair string to the list
output_parts.append(f'{label}, {round(pct, 2)}')
# Join all parts into a single comma-separated string
final_string = ", ".join(output_parts)
# Return the string as a plain text response
return Response(content=final_string, media_type="text/csv")
# Register endpoints for multiple indices with date range
for idx in ["NDVI", "EVI", "EVI2", "RandomForest", "CI", "GujVDI", "MNDWI", "SAVI", "MVI", "NBR", "GCI"]:
endpoint_path = f"/api/{idx}"
async def index_hist_endpoint(file_url: str, start_date: str, end_date: str, index_name=idx):
return compute_index_histogram(file_url, index_name, start_date, end_date)
app.get(endpoint_path)(index_hist_endpoint)
# Mount the Gradio app onto the FastAPI app
app = gr.mount_gradio_app(app, demo, path="/")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)