42Cummer committed on
Commit
0170ac5
·
verified ·
1 Parent(s): cdcab2c

Upload 9 files

Browse files
Files changed (10) hide show
  1. .gitattributes +1 -0
  2. Dockerfile +29 -0
  3. README.md +2 -11
  4. api/bus_cache.py +142 -0
  5. api/update_static.py +59 -0
  6. api/utils.py +30 -0
  7. requirements.txt +7 -0
  8. src/app.py +321 -0
  9. src/db_manager.py +145 -0
  10. src/ttc_gtfs.duckdb +3 -0
.gitattributes CHANGED
@@ -33,3 +33,4 @@ saved_model/**/* filter=lfs diff=lfs merge=lfs -text
33
  *.zip filter=lfs diff=lfs merge=lfs -text
34
  *.zst filter=lfs diff=lfs merge=lfs -text
35
  *tfevents* filter=lfs diff=lfs merge=lfs -text
 
 
33
  *.zip filter=lfs diff=lfs merge=lfs -text
34
  *.zst filter=lfs diff=lfs merge=lfs -text
35
  *tfevents* filter=lfs diff=lfs merge=lfs -text
36
+ src/ttc_gtfs.duckdb filter=lfs diff=lfs merge=lfs -text
Dockerfile ADDED
@@ -0,0 +1,29 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
# Slim base keeps the final image small
FROM python:3.11-slim

# Runtime configuration: unbuffered logs, no .pyc files, default port
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PORT=7860

# Non-root user with UID 1000 (required by Hugging Face Spaces)
RUN useradd -m -u 1000 user
USER user
ENV HOME=/home/user \
    PATH=/home/user/.local/bin:$PATH

WORKDIR $HOME/app

# Install dependencies first so this layer is cached across code-only changes
COPY --chown=user requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code and the bundled database
# (ttc_gtfs.duckdb must be present in the build context root)
COPY --chown=user . .

# Hugging Face Spaces expects the app to listen on 7860
EXPOSE 7860

# Launch the FastAPI app with uvicorn
CMD ["uvicorn", "src.app:app", "--host", "0.0.0.0", "--port", "7860"]
README.md CHANGED
@@ -1,11 +1,2 @@
1
- ---
2
- title: WMB2Backened
3
- emoji: 📚
4
- colorFrom: green
5
- colorTo: pink
6
- sdk: docker
7
- pinned: false
8
- short_description: Backend for WheresMyBus2
9
- ---
10
-
11
- Check out the configuration reference at https://huggingface.co/docs/hub/spaces-config-reference
 
1
+ # WMB2Backend
2
+ Updated backend for WheresMyBus2.0
 
 
 
 
 
 
 
 
 
api/bus_cache.py ADDED
@@ -0,0 +1,142 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import time
import httpx  # type: ignore
import asyncio  # type: ignore
from google.transit import gtfs_realtime_pb2  # type: ignore
from dotenv import load_dotenv  # type: ignore
import os  # type: ignore

load_dotenv()

# Feed URLs are supplied via the environment (.env locally, Space secrets in prod).
TTC_VEHICLES_URL = os.getenv("GTFS_RT_URL")
TTC_TRIPS_URL = os.getenv("GTFS_DELAY_URL")
# Support both GTFS_ALERT_URL and GTFS_ALERTS_URL for compatibility
TTC_ALERTS_URL = os.getenv("GTFS_ALERTS_URL") or os.getenv("GTFS_ALERT_URL")

# Fail fast at import time if any feed URL is missing.
for _url, _message in (
    (TTC_VEHICLES_URL, "GTFS_RT_URL is not set"),
    (TTC_TRIPS_URL, "GTFS_DELAY_URL is not set"),
    (TTC_ALERTS_URL, "GTFS_ALERTS_URL or GTFS_ALERT_URL is not set"),
):
    if not _url:
        raise ValueError(_message)

# Human-readable labels for the GTFS-RT Alert cause/effect enum codes we surface.
CAUSE_MAP = {1: "Weather", 2: "Holiday", 4: "Accident", 7: "Technical Problem", 11: "Police Activity"}
EFFECT_MAP = {1: "No Service", 3: "Significant Delays", 4: "Detour", 8: "Unknown Effect"}
24
+
25
class AsyncBusCache:
    """
    TTL cache over the three TTC GTFS-Realtime feeds (vehicle positions,
    trip updates, service alerts).

    ``get_data()`` returns a snapshot dict::

        {
            "vehicles":    [ {id, route, trip_id, lat, lon, ...}, ... ],
            "predictions": { trip_id: { stop_id: epoch_ts, ... }, ... },
            "alerts":      { route_id: [ {header, description, ...}, ... ], ... },
        }

    refreshed at most once per ``ttl`` seconds.
    """

    def __init__(self, ttl=20):
        # ttl: maximum snapshot age in seconds before a refresh is attempted
        self.ttl = ttl
        self._data = None        # last good snapshot (dict) or None
        self._last_updated = 0   # epoch seconds of the last successful refresh

    async def get_data(self):
        """Return the cached snapshot, refreshing it if stale or missing."""
        if self._data and (time.time() - self._last_updated) < self.ttl:
            return self._data
        return await self._refresh()

    async def _refresh(self):
        """Fetch and parse all three feeds; on failure keep serving stale data."""
        try:
            async with httpx.AsyncClient() as client:
                # 1. Fetch ALL feeds concurrently
                v_res, t_res, a_res = await asyncio.gather(
                    client.get(TTC_VEHICLES_URL, timeout=10),
                    client.get(TTC_TRIPS_URL, timeout=10),
                    client.get(TTC_ALERTS_URL, timeout=10)  # The alerts feed
                )

                # 2. Parse Predictions (store ALL future stops of each trip)
                t_feed = gtfs_realtime_pb2.FeedMessage()
                t_feed.ParseFromString(t_res.content)

                # Map: { "trip_id": { "stop_id_1": time, "stop_id_2": time, ... } }
                prediction_map = {}
                for entity in t_feed.entity:
                    if entity.HasField('trip_update'):
                        tu = entity.trip_update
                        trip_id = str(tu.trip.trip_id)
                        # Prefer departure time; fall back to arrival time
                        prediction_map[trip_id] = {
                            str(stu.stop_id): (stu.departure.time if stu.HasField('departure') else stu.arrival.time)
                            for stu in tu.stop_time_update
                        }

                # 3. Parse Vehicle Positions
                v_feed = gtfs_realtime_pb2.FeedMessage()
                v_feed.ParseFromString(v_res.content)

                processed_buses = []
                for entity in v_feed.entity:
                    if entity.HasField('vehicle'):
                        v = entity.vehicle
                        t_id = str(v.trip.trip_id)

                        # All predictions for this trip (may be empty)
                        trip_predictions = prediction_map.get(t_id, {})

                        # Earliest predicted stop doubles as "next stop"
                        # (kept for backward compatibility with older consumers)
                        next_stop_id = None
                        predicted_time = None
                        if trip_predictions:
                            sorted_stops = sorted(trip_predictions.items(), key=lambda x: x[1])
                            if sorted_stops:
                                next_stop_id, predicted_time = sorted_stops[0]

                        processed_buses.append({
                            "id": v.vehicle.id,
                            "route": v.trip.route_id,
                            "trip_id": t_id,
                            "lat": round(v.position.latitude, 6),
                            "lon": round(v.position.longitude, 6),
                            "occupancy": v.occupancy_status,
                            "next_stop_id": next_stop_id,
                            "predicted_time": predicted_time,
                            "predictions": trip_predictions  # full remaining itinerary
                        })

                # 4. Parse Alerts into { "route_id": [payload, ...] }
                a_feed = gtfs_realtime_pb2.FeedMessage()
                a_feed.ParseFromString(a_res.content)

                route_alerts = {}
                for entity in a_feed.entity:
                    if entity.HasField('alert'):
                        alert = entity.alert

                        # Extract English translations (defaults keep payload well-formed)
                        header = next((t.text for t in alert.header_text.translation if t.language == "en"), "No Header")
                        description = next((t.text for t in alert.description_text.translation if t.language == "en"), "")

                        # Map cause and effect codes to human-readable strings
                        cause_code = int(alert.cause) if alert.HasField('cause') else None
                        effect_code = int(alert.effect) if alert.HasField('effect') else None

                        alert_payload = {
                            "header": header,
                            "description": description,
                            "cause": CAUSE_MAP.get(cause_code, "Unknown") if cause_code is not None else "Unknown",
                            "effect": EFFECT_MAP.get(effect_code, "Unknown") if effect_code is not None else "Unknown",
                            # effect 1 == "No Service" is treated as high severity
                            "severity": "HIGH" if effect_code == 1 else "MEDIUM"
                        }

                        # Link the alert to every route it mentions
                        for ie in alert.informed_entity:
                            if ie.HasField('route_id'):
                                rid = str(ie.route_id)
                                route_alerts.setdefault(rid, []).append(alert_payload)

                self._data = {
                    "vehicles": processed_buses,
                    "predictions": prediction_map,
                    "alerts": route_alerts
                }
                self._last_updated = time.time()
                print(f"--- Cache Refreshed: {len(processed_buses)} buses, {len(route_alerts)} routes with alerts ---")
                return self._data

        except Exception as e:
            print(f"Async fetch failed: {e}")
            # BUGFIX: previously returned [] when no snapshot existed yet, which
            # crashed callers doing data.get(...) / data["alerts"]. Return an
            # empty snapshot with the same dict shape instead.
            if self._data:
                return self._data
            return {"vehicles": [], "predictions": {}, "alerts": {}}
api/update_static.py ADDED
@@ -0,0 +1,59 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import requests  # type: ignore
import zipfile  # type: ignore
import io
import os  # type: ignore
import shutil  # type: ignore
from pathlib import Path  # type: ignore

# Toronto Open Data CKAN API Constants
CKAN_BASE_URL = "https://ckan0.cf.opendata.inter.prod-toronto.ca/api/3/action/package_show"
PACKAGE_ID = "merged-gtfs-ttc-routes-and-schedules"
STATIC_DIR = str(Path(__file__).parent.parent / "static")
DB_PATH = str(Path(__file__).parent.parent / "src" / "ttc_gtfs.duckdb")

def get_latest_gtfs_url():
    """Query the CKAN API to find the current download URL for the GTFS ZIP.

    Returns:
        The URL of the first ZIP-format resource in the package, or None
        if no such resource exists.

    Raises:
        requests.HTTPError: if the CKAN API responds with an error status.
    """
    params = {"id": PACKAGE_ID}
    # FIX: add a timeout and check the HTTP status — without these a dead
    # endpoint hangs forever and an HTML error page would be fed to .json().
    response = requests.get(CKAN_BASE_URL, params=params, timeout=30)
    response.raise_for_status()
    data = response.json()

    # Take the first resource whose declared format is ZIP
    for resource in data["result"]["resources"]:
        if resource["format"].lower() == "zip":
            return resource["url"]
    return None
25
+
26
def run_full_sync():
    """Download the latest GTFS ZIP, extract it to static/, and drop the old DB.

    Returns:
        True on success, False if no GTFS ZIP could be located via the API.
    """
    download_url = get_latest_gtfs_url()
    if not download_url:
        print("Could not find GTFS ZIP via API.")
        return False

    print(f"Found latest GTFS at: {download_url}")

    # 1. Clear old files so stale GTFS tables never mix with the new feed
    if os.path.exists(STATIC_DIR):
        print(f"Clearing existing static directory: {STATIC_DIR}")
        shutil.rmtree(STATIC_DIR)
    os.makedirs(STATIC_DIR)

    # 2. Download and Extract
    print("Downloading and extracting...")
    # FIX: bound the wait and fail loudly on an HTTP error instead of
    # trying to unzip an error page.
    r = requests.get(download_url, timeout=300)
    r.raise_for_status()
    print(f"Downloaded {len(r.content):,} bytes")
    with zipfile.ZipFile(io.BytesIO(r.content)) as z:
        file_list = z.namelist()
        print(f"Extracting {len(file_list)} files to {STATIC_DIR}...")
        z.extractall(STATIC_DIR)
    print(f"✓ Extracted {len(os.listdir(STATIC_DIR))} files")

    # 3. Force DB rebuild by deleting the old DuckDB file;
    #    init_db() recreates it lazily on the next API call.
    if os.path.exists(DB_PATH):
        print(f"Deleting old database: {DB_PATH}")
        os.remove(DB_PATH)

    print("✓ Sync complete. Database will rebuild on next API call.")
    return True

if __name__ == "__main__":
    run_full_sync()
api/utils.py ADDED
@@ -0,0 +1,30 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from datetime import datetime, date, time, timedelta
2
+
3
def hms_to_seconds(hms_str):
    """Convert a GTFS 'HH:MM:SS' string to total seconds from midnight.

    GTFS allows hours past 23 for post-midnight service (e.g. '25:30:00').
    """
    hours, minutes, seconds = (int(part) for part in hms_str.split(':'))
    return hours * 3600 + minutes * 60 + seconds
7
+
8
def get_service_day_start_ts():
    """
    Return the Unix timestamp for 00:00:00 of the CURRENT service day.

    The TTC service day typically flips at 4:00 AM, so before 4 AM we are
    still on the previous calendar date's schedule.
    """
    now = datetime.now()
    service_date = date.today() - timedelta(days=1) if now.hour < 4 else date.today()
    # Midnight of that date, expressed as a Unix timestamp
    return int(datetime.combine(service_date, time.min).timestamp())
23
+
24
def translate_occupancy(status):
    """Map a GTFS occupancy_status enum value to a human-readable string.

    Unknown codes deliberately read as "Full" — when in doubt, assume the
    bus has no room.
    """
    labels = {
        0: "Empty",
        1: "Many Seats Available",
        2: "Few Seats Available",
        3: "No Seats Available",
        5: "Full",
        6: "Not In Service",
    }
    return labels.get(status, "Full")
requirements.txt ADDED
@@ -0,0 +1,7 @@
 
 
 
 
 
 
 
 
1
+ fastapi
2
+ uvicorn
3
+ duckdb
4
+ httpx
5
+ python-dotenv
6
+ gtfs-realtime-bindings
7
+ protobuf
src/app.py ADDED
@@ -0,0 +1,321 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
from datetime import datetime  # type: ignore
import sys  # type: ignore
from pathlib import Path

# Make the repo root (for api/) and src/ importable regardless of CWD
sys.path.insert(0, str(Path(__file__).parent.parent))
sys.path.insert(0, str(Path(__file__).parent))

from api.bus_cache import AsyncBusCache  # type: ignore
from api.utils import hms_to_seconds, get_service_day_start_ts, translate_occupancy  # type: ignore
from db_manager import init_db  # type: ignore
from dotenv import load_dotenv  # type: ignore
from fastapi import FastAPI, HTTPException  # type: ignore
from fastapi.middleware.cors import CORSMiddleware  # type: ignore

load_dotenv()

# Shared realtime snapshot cache (20 s TTL)
ttc_cache = AsyncBusCache(ttl=20)

# Global database connection, initialized once at import time
db = init_db()

app = FastAPI(title="WheresMyBus v2.0 API")

# CORS so the React frontend can call this API
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, use your actual React URL
    allow_methods=["*"],
    allow_headers=["*"],
)
33
+
34
@app.get("/")
async def health_check():
    """Liveness probe: confirms the service is up."""
    return "backend is running"
38
+
39
@app.get("/api/vehicles")
async def get_vehicles():
    """Return every vehicle in the latest realtime snapshot."""
    snapshot = await ttc_cache.get_data()
    fleet = snapshot.get("vehicles", [])
    return {
        "status": "success",
        "count": len(fleet),
        "vehicles": fleet,
    }
48
+
49
@app.get("/api/routes")
async def get_all_routes():
    """
    Returns a complete list of TTC routes with their display names and colors.

    Numeric route numbers sort numerically; non-numeric names sink to the end.
    Missing colors fall back to TTC red (#FF0000) on white.
    """
    try:
        query = """
        SELECT
            route_id,
            route_short_name,
            route_long_name,
            COALESCE(route_color, 'FF0000') as route_color,
            COALESCE(route_text_color, 'FFFFFF') as route_text_color
        FROM routes
        ORDER BY
            CASE
                WHEN CAST(route_short_name AS VARCHAR) ~ '^[0-9]+$' THEN CAST(route_short_name AS INTEGER)
                ELSE 999
            END,
            route_short_name;
        """

        rows = db.execute(query).fetchall()

        # Shape each row into the payload the frontend expects
        route_list = []
        for route_id, short_name, long_name, color, text_color in rows:
            route_list.append({
                "id": route_id,
                "number": short_name,
                "name": long_name,
                "color": f"#{color}",
                "text_color": f"#{text_color}",
            })

        return {
            "status": "success",
            "count": len(route_list),
            "routes": route_list,
        }

    except Exception as e:
        # Surface the DB error to the client rather than a bare 500
        return {"status": "error", "message": str(e)}
95
+
96
@app.get("/api/routes/{route_id}")
async def get_route_view(route_id: str):
    """Live vehicles on one route, enriched with schedule-based delay."""
    snapshot = await ttc_cache.get_data()
    buses_on_route = [b for b in snapshot.get("vehicles", []) if b['route'] == route_id]

    if not buses_on_route:
        return {"route": route_id, "vehicles": []}

    # IMPORTANT: Cast Trip IDs to strings to ensure they match the DB
    trip_ids = [str(b['trip_id']) for b in buses_on_route]
    placeholders = ','.join(['?'] * len(trip_ids))

    # We use CAST(... AS VARCHAR) to force DuckDB to match strings to strings
    query = f"""
    SELECT
        CAST(st.trip_id AS VARCHAR),
        CAST(st.stop_id AS VARCHAR),
        st.arrival_time,
        t.trip_headsign
    FROM stop_times st
    JOIN trips t ON CAST(st.trip_id AS VARCHAR) = CAST(t.trip_id AS VARCHAR)
    WHERE CAST(st.trip_id AS VARCHAR) IN ({placeholders})
    """
    db_rows = db.execute(query, trip_ids).fetchall()

    if not db_rows:
        print(f"DEBUG: No matches in DB for Trip IDs: {trip_ids[:3]}")

    # (trip, stop) -> scheduled arrival; trip -> headsign (destination)
    schedule_map = {}
    name_map = {}
    for trip, stop, arrival, headsign in db_rows:
        schedule_map[(trip, stop)] = arrival
        name_map[trip] = headsign

    service_day_ts = get_service_day_start_ts()
    enriched = []

    for bus in buses_on_route:
        raw_delay_mins = 0  # default when no prediction exists

        pred_time = bus.get('predicted_time')
        stop_id = bus.get('next_stop_id')

        if pred_time and stop_id:
            sched_hms = schedule_map.get((str(bus['trip_id']), str(stop_id)))
            if sched_hms:
                # Delay = (realtime prediction - scheduled time) in minutes
                plan_ts = service_day_ts + hms_to_seconds(sched_hms)
                raw_delay_mins = round((pred_time - plan_ts) / 60)

        enriched.append({
            "number": bus['id'],
            "name": name_map.get(str(bus['trip_id']), "Not in Schedule"),  # destination
            "location": {"lat": bus['lat'], "lon": bus['lon']},
            "delay_mins": raw_delay_mins,  # integer: 5 = 5m late, -2 = 2m early
            "fullness": translate_occupancy(bus['occupancy']),
        })

    return {
        "route": route_id,
        "count": len(enriched),
        "vehicles": enriched,
    }
159
+
160
@app.get("/api/vehicles/{vehicle_id}")
async def get_vehicle_view(vehicle_id: str):
    """Detail view for a single active vehicle, with schedule-based delay."""
    # Pull the latest snapshot and locate this bus
    snapshot = await ttc_cache.get_data()
    fleet = snapshot.get("vehicles", [])
    bus = next((v for v in fleet if str(v['id']) == vehicle_id), None)

    if not bus:
        raise HTTPException(status_code=404, detail="Vehicle not active or not found")

    trip_id = str(bus['trip_id'])
    next_stop_id = bus.get('next_stop_id')
    predicted_time = bus.get('predicted_time')

    # Defaults if the trip can't be matched against the static schedule
    destination = "Not in Schedule"
    delay_mins = 0

    if next_stop_id:
        # CASTs keep string ids matching whatever types the CSV import inferred
        query = """
        SELECT
            t.trip_headsign,
            st.arrival_time
        FROM trips t
        JOIN stop_times st ON CAST(t.trip_id AS VARCHAR) = CAST(st.trip_id AS VARCHAR)
        WHERE CAST(t.trip_id AS VARCHAR) = ?
        AND CAST(st.stop_id AS VARCHAR) = ?
        LIMIT 1
        """
        row = db.execute(query, [trip_id, str(next_stop_id)]).fetchone()

        if row:
            destination, scheduled_hms = row
            # Delay = realtime prediction - (service day start + scheduled seconds)
            if predicted_time:
                plan_ts = get_service_day_start_ts() + hms_to_seconds(scheduled_hms)
                delay_mins = round((predicted_time - plan_ts) / 60)
    else:
        # No next stop known — at least resolve the destination from the trip
        query = """
        SELECT trip_headsign
        FROM trips
        WHERE CAST(trip_id AS VARCHAR) = ?
        LIMIT 1
        """
        row = db.execute(query, [trip_id]).fetchone()
        if row:
            destination = row[0]

    return {
        "vehicle_number": vehicle_id,
        "route_id": bus['route'],
        "name": destination,
        "location": {
            "lat": bus['lat'],
            "lon": bus['lon'],
        },
        "delay_mins": delay_mins,
        "fullness": translate_occupancy(bus['occupancy']),
        "trip_id": trip_id,
    }
227
+
228
@app.get("/api/stop/{stop_code}")
async def get_stop_view(stop_code: str):
    """Upcoming arrivals (next 2 hours) at a stop, looked up by public stop code."""
    # 1. Translate the pole number to the internal database stop_id
    stop_info = db.execute(
        "SELECT stop_id, stop_name FROM stops WHERE CAST(stop_code AS VARCHAR) = ? LIMIT 1",
        [str(stop_code)],
    ).fetchone()
    if not stop_info:
        return {"error": "Stop code not found"}

    target_id = str(stop_info[0])
    stop_name = stop_info[1]

    # 2. Pull the cache snapshot (vehicles, predictions, alerts)
    cached_data = await ttc_cache.get_data()
    predictions = cached_data.get("predictions", {})
    # trip_id -> live vehicle, for occupancy/vehicle-number enrichment
    vehicles = {str(v['trip_id']): v for v in cached_data.get("vehicles", [])}

    now = datetime.now().timestamp()
    two_hours_out = now + 7200
    arrivals = []

    # Static lookup reused for every matching trip (invariant, hoisted out of the loop)
    query = """
    SELECT t.trip_headsign, st.arrival_time, r.route_short_name
    FROM trips t
    JOIN stop_times st ON CAST(t.trip_id AS VARCHAR) = CAST(st.trip_id AS VARCHAR)
    JOIN routes r ON t.route_id = r.route_id
    WHERE CAST(t.trip_id AS VARCHAR) = ? AND CAST(st.stop_id AS VARCHAR) = ?
    LIMIT 1
    """

    # 3. Scan every trip's remaining itinerary for this stop
    for trip_id, itinerary in predictions.items():
        pred_time = itinerary.get(target_id)
        if pred_time is None:
            continue
        # Skip buses that already passed, or are more than 2 hours away
        if not (now <= pred_time <= two_hours_out):
            continue

        # 4. Handshake with DB for destination and scheduled time
        row = db.execute(query, [trip_id, target_id]).fetchone()
        if not row:
            continue

        bus = vehicles.get(trip_id)  # may be None if not yet on the road
        plan_ts = get_service_day_start_ts() + hms_to_seconds(row[1])

        arrivals.append({
            "route": row[2],
            "destination": row[0],
            "eta_mins": round((pred_time - now) / 60),
            "delay_mins": round((pred_time - plan_ts) / 60),
            "fullness": translate_occupancy(bus['occupancy']) if bus else "Unknown",
            "vehicle_id": bus['id'] if bus else "In Transit",
        })

    arrivals.sort(key=lambda x: x['eta_mins'])
    return {"stop_name": stop_name, "stop_code": stop_code, "arrivals": arrivals}
286
+
287
@app.get("/api/alerts")
async def get_all_alerts():
    """
    Returns every active service alert for the entire TTC network.
    """
    data = await ttc_cache.get_data()
    # FIX: use .get with a default — data["alerts"] raised KeyError/TypeError
    # when a cache refresh failed and the fallback snapshot lacked the key.
    alerts = data.get("alerts", {}) if isinstance(data, dict) else {}
    return {
        "timestamp": datetime.now().timestamp(),
        "count": len(alerts),
        "alerts": alerts
    }
298
+
299
@app.get("/api/alerts/{route_id}")
async def get_alerts_for_route(route_id: str):
    """Return the active service alerts affecting a single route."""
    data = await ttc_cache.get_data()
    route_alerts = data.get("alerts", {}).get(route_id, [])

    # No alerts: keep the legacy string payload the frontend expects
    if not route_alerts:
        return {
            "route_id": route_id,
            "count": 0,
            "alerts": "No alerts",
        }

    return {
        "route_id": route_id,
        "count": len(route_alerts),
        "alerts": route_alerts,
    }
317
+
318
if __name__ == "__main__":
    # Local development entry point; in production the Dockerfile CMD runs uvicorn.
    import uvicorn  # type: ignore
    uvicorn.run(app, host="0.0.0.0", port=7860)
src/db_manager.py ADDED
@@ -0,0 +1,145 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import duckdb  # type: ignore
import os  # type: ignore
import sys  # type: ignore
from pathlib import Path  # type: ignore
from dotenv import load_dotenv  # type: ignore

# Make the repo root importable so the api.* modules resolve
sys.path.insert(0, str(Path(__file__).parent.parent))

from api.bus_cache import AsyncBusCache  # type: ignore

# The DuckDB file always lives next to this module; GTFS CSVs live in ../static
DB_PATH = str(Path(__file__).parent / "ttc_gtfs.duckdb")
STATIC_DIR = str(Path(__file__).parent.parent / "static")
15
+
16
def init_db():
    """
    Connect to DuckDB and import the GTFS-Static data from the static/ directory.

    Returns the open connection; the import only runs when the database has
    not been populated yet.
    """
    # Connect to DuckDB (creates the file if it doesn't exist)
    con = duckdb.connect(DB_PATH)

    # Already populated? stop_times is the marker table we check for.
    existing_tables = con.execute("SHOW TABLES").fetchall()
    if ('stop_times',) in existing_tables:
        print("--- Database already exists and is populated ---")
        return con

    print("--- Initializing DuckDB: Importing CSVs from /static ---")

    # Core GTFS files we need for the rework
    for filename in ["routes.txt", "trips.txt", "stops.txt", "stop_times.txt"]:
        csv_path = Path(STATIC_DIR) / filename
        table_name = filename.replace(".txt", "")

        if csv_path.exists():
            print(f"Loading {filename} into table '{table_name}'...")
            # read_csv_auto detects headers and column types; absolute path for DuckDB
            abs_csv_path = str(csv_path.resolve())
            con.execute(f"CREATE TABLE {table_name} AS SELECT * FROM read_csv_auto('{abs_csv_path}')")
        else:
            print(f"Error: {csv_path} not found! Please ensure it is in the static/ folder.")

    print("--- Database Import Complete ---")
    return con
49
+
50
async def test_data_integrity(con):
    """
    Runs a test join to confirm that a trip ID can be linked to a route name
    and stop list. Prefers a real trip_id from the live realtime feed via
    AsyncBusCache, falling back to any trip_id in the database.
    """
    print("--- Running Integrity Test ---")
    try:
        # Get trip_id from live API using AsyncBusCache
        cache = AsyncBusCache(ttl=20)
        vehicles = await cache.get_data()

        if not vehicles:
            print("No vehicles available from API, falling back to database trip_id")
            sample_trip = con.execute("SELECT trip_id FROM trips LIMIT 1").fetchone()[0]
        else:
            # Re-fetch the raw GTFS feed to extract a trip_id directly
            import httpx  # type: ignore
            from google.transit import gtfs_realtime_pb2  # type: ignore

            load_dotenv()
            gtfs_rt_url = os.getenv("GTFS_RT_URL")
            if not gtfs_rt_url:
                raise ValueError("GTFS_RT_URL is not set")

            async with httpx.AsyncClient() as client:
                response = await client.get(gtfs_rt_url, timeout=10)
                response.raise_for_status()

            feed = gtfs_realtime_pb2.FeedMessage()
            feed.ParseFromString(response.content)

            # Take the trip_id of the first vehicle entity that has one
            sample_trip = None
            for entity in feed.entity:
                if entity.HasField('vehicle') and entity.vehicle.trip.trip_id:
                    sample_trip = entity.vehicle.trip.trip_id
                    break

            if not sample_trip:
                print("No trip_id found in API response, falling back to database")
                sample_trip = con.execute("SELECT trip_id FROM trips LIMIT 1").fetchone()[0]
            else:
                print(f"Using trip_id from live API: {sample_trip}")

        # FIX: bind sample_trip as a query parameter instead of f-string
        # interpolation — it originates from an external feed (injection risk).
        count_query = """
        SELECT COUNT(*)
        FROM trips t
        JOIN stop_times st ON t.trip_id = st.trip_id
        WHERE t.trip_id = ?
        """
        total_count = con.execute(count_query, [sample_trip]).fetchone()[0]

        # Show at most the first 20 stops (min() replaces the redundant conditional)
        sample_size = min(20, total_count)

        # LIMIT is interpolated from an int we computed ourselves — safe.
        query = f"""
        SELECT
            r.route_short_name,
            t.trip_headsign,
            st.stop_sequence,
            s.stop_name
        FROM trips t
        JOIN routes r ON t.route_id = r.route_id
        JOIN stop_times st ON t.trip_id = st.trip_id
        JOIN stops s ON st.stop_id = s.stop_id
        WHERE t.trip_id = ?
        ORDER BY st.stop_sequence
        LIMIT {int(sample_size)};
        """
        results = con.execute(query, [sample_trip]).fetchall()

        print(f"\nSuccessfully joined data for Trip ID: {sample_trip}")
        print(f"Total stops in trip: {total_count}")
        if total_count > sample_size:
            print(f"Showing first {sample_size} stops (sample):\n")
        else:
            print(f"Showing all {total_count} stops:\n")

        print(f"{'Route':<8} {'Headsign':<30} {'Stop #':<8} {'Stop Name':<50}")
        print("-" * 100)
        for res in results:
            route = res[0] or "N/A"
            headsign = (res[1] or "N/A")[:28]   # truncate if too long
            stop_seq = res[2]
            stop_name = (res[3] or "N/A")[:48]  # truncate if too long
            print(f"{route:<8} {headsign:<30} {stop_seq:<8} {stop_name:<50}")

    except Exception as e:
        print(f"Integrity test failed: {e}")
141
+
142
if __name__ == "__main__":
    # Manual smoke test: build/open the DB, then run the integrity join.
    import asyncio  # type: ignore
    connection = init_db()
    asyncio.run(test_data_integrity(connection))
src/ttc_gtfs.duckdb ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ version https://git-lfs.github.com/spec/v1
2
+ oid sha256:5eec5ac01f4d0f0dcd888097054dc7d4e4b849ecbe68012c0c8eb54a4e941d8f
3
+ size 53751808