sbv Claude committed on
Commit
2d6c64f
·
1 Parent(s): b3590f6

Add real-time progress updates via Server-Sent Events

Browse files

Users now see step-by-step progress during document upload:
⏳ Uploading document...
📄 Converting document to markdown...
🌍 Detecting document language...
✓ Language: Portuguese
✂️ Creating smart chunks...
🤖 Loading Portuguese model (first time, ~30-60s)...
🧠 Generating embeddings...
💾 Storing in vector database...
✅ Ready! Ask your questions below.

Backend changes:
- Add SSE endpoint /api/progress/{session_id} for streaming progress
- Upload endpoint returns immediately and processes in background
- send_progress() helper sends updates through asyncio queue
- Progress messages added throughout document processing flow

Frontend changes:
- Connect to EventSource for real-time progress updates
- Display progress messages as they arrive
- Auto-close connection when processing complete

Benefits:
- Users know exactly what's happening (no more "is it stuck?")
- Clear indication when models are downloading
- Professional UX with real-time feedback

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

Files changed (2) hide show
  1. backend/main.py +93 -21
  2. frontend/script.js +41 -16
backend/main.py CHANGED
@@ -1,12 +1,15 @@
1
  from fastapi import FastAPI, UploadFile, File, Form, HTTPException
2
  from fastapi.middleware.cors import CORSMiddleware
3
  from fastapi.staticfiles import StaticFiles
4
- from fastapi.responses import JSONResponse
5
  import tempfile
6
  import os
7
  from typing import Optional
8
  import uuid
9
  import warnings
 
 
 
10
 
11
  # Suppress ChromaDB telemetry warnings
12
  warnings.filterwarnings("ignore", message=".*capture.*takes 1 positional argument.*")
@@ -46,6 +49,9 @@ app.add_middleware(
46
  # In-memory storage for sessions
47
  sessions = {}
48
 
 
 
 
49
  # Language code mapping (Docling uses ISO 639-1, langdetect returns similar codes)
50
  SUPPORTED_LANGUAGES = {
51
  'en': 'English',
@@ -134,6 +140,15 @@ def translate_text(text: str, source_lang: str, target_lang: str) -> str:
134
  return text # Return original text if translation fails
135
 
136
 
 
 
 
 
 
 
 
 
 
137
  class DocumentProcessor:
138
  """Handles document processing and RAG functionality"""
139
 
@@ -160,6 +175,8 @@ class DocumentProcessor:
160
  def process_document(self, file_path: str, file_extension: str) -> dict:
161
  """Process document with Docling"""
162
  try:
 
 
163
  # Check PDF page limit
164
  if file_extension == "pdf":
165
  within_limit, num_pages = self.check_pdf_pages(file_path)
@@ -177,6 +194,8 @@ class DocumentProcessor:
177
  self.docling_document = result.document
178
  markdown_content = self.docling_document.export_to_markdown()
179
 
 
 
180
  # Extract document language from Docling metadata
181
  doc_lang = None
182
  if hasattr(result.document, 'lang') and result.document.lang:
@@ -198,6 +217,8 @@ class DocumentProcessor:
198
  print(f"Language from Docling: {doc_lang}")
199
 
200
  self.document_language = doc_lang
 
 
201
 
202
  # Estimate pages for non-PDF formats
203
  if file_extension != "pdf":
@@ -210,9 +231,11 @@ class DocumentProcessor:
210
 
211
  self.document_content = markdown_content
212
 
 
213
  # Chunk using HybridChunker with document structure
214
  chunks = self._chunk_with_hybrid_chunker()
215
 
 
216
  # Create embeddings and store in ChromaDB
217
  self._create_vector_store(chunks)
218
 
@@ -275,6 +298,8 @@ class DocumentProcessor:
275
 
276
  # Lazy-load model if not already loaded
277
  if self.document_language not in embedding_models:
 
 
278
  load_language_model(self.document_language)
279
 
280
  # Get the appropriate embedding model for document language
@@ -286,6 +311,7 @@ class DocumentProcessor:
286
  # Generate embeddings
287
  embeddings = embedding_model.encode(chunks).tolist()
288
 
 
289
  # Add to ChromaDB
290
  ids = [f"chunk_{i}" for i in range(len(chunks))]
291
  self.collection.add(
@@ -294,6 +320,8 @@ class DocumentProcessor:
294
  ids=ids
295
  )
296
 
 
 
297
  except Exception as e:
298
  raise HTTPException(status_code=500, detail=f"Error creating vector store: {str(e)}")
299
 
@@ -395,6 +423,41 @@ async def model_status():
395
  }
396
 
397
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
398
  @app.post("/upload")
399
  async def upload_document(
400
  file: UploadFile = File(...),
@@ -425,34 +488,43 @@ async def upload_document(
425
  # Create session
426
  session_id = str(uuid.uuid4())
427
 
 
 
 
428
  # Save uploaded file temporarily
429
  with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_extension}") as tmp_file:
430
  content = await file.read()
431
  tmp_file.write(content)
432
  tmp_file_path = tmp_file.name
433
 
434
- try:
435
- # Process document
436
- processor = DocumentProcessor(session_id)
437
- result = processor.process_document(tmp_file_path, file_extension)
438
-
439
- # Store session
440
- sessions[session_id] = {
441
- "processor": processor,
442
- "api_key": api_key,
443
- "filename": file.filename
444
- }
 
 
 
 
 
 
 
445
 
446
- return {
447
- **result,
448
- "session_id": session_id,
449
- "filename": file.filename
450
- }
451
 
452
- finally:
453
- # Clean up temp file
454
- if os.path.exists(tmp_file_path):
455
- os.unlink(tmp_file_path)
 
 
456
 
457
 
458
  @app.post("/ask")
 
1
  from fastapi import FastAPI, UploadFile, File, Form, HTTPException
2
  from fastapi.middleware.cors import CORSMiddleware
3
  from fastapi.staticfiles import StaticFiles
4
+ from fastapi.responses import JSONResponse, StreamingResponse
5
  import tempfile
6
  import os
7
  from typing import Optional
8
  import uuid
9
  import warnings
10
+ import asyncio
11
+ import json
12
+ from collections import defaultdict
13
 
14
  # Suppress ChromaDB telemetry warnings
15
  warnings.filterwarnings("ignore", message=".*capture.*takes 1 positional argument.*")
 
49
  # In-memory storage for sessions
50
  sessions = {}
51
 
52
+ # Progress tracking for real-time updates
53
+ progress_queues = defaultdict(asyncio.Queue)
54
+
55
  # Language code mapping (Docling uses ISO 639-1, langdetect returns similar codes)
56
  SUPPORTED_LANGUAGES = {
57
  'en': 'English',
 
140
  return text # Return original text if translation fails
141
 
142
 
143
def send_progress(session_id: str, message: str, step: str = "") -> None:
    """Queue a progress update for *session_id*'s SSE stream.

    The payload (``{"message", "step"}``) is pushed onto the session's
    queue in ``progress_queues`` (a defaultdict, so the queue is created
    on first use).  Best-effort: any failure is logged and never
    propagates to the caller.
    """
    payload = {"message": message, "step": step}
    try:
        progress_queues[session_id].put_nowait(payload)
    except Exception as exc:
        print(f"Error sending progress: {exc}")
150
+
151
+
152
  class DocumentProcessor:
153
  """Handles document processing and RAG functionality"""
154
 
 
175
  def process_document(self, file_path: str, file_extension: str) -> dict:
176
  """Process document with Docling"""
177
  try:
178
+ send_progress(self.session_id, "πŸ“„ Converting document to markdown...", "converting")
179
+
180
  # Check PDF page limit
181
  if file_extension == "pdf":
182
  within_limit, num_pages = self.check_pdf_pages(file_path)
 
194
  self.docling_document = result.document
195
  markdown_content = self.docling_document.export_to_markdown()
196
 
197
+ send_progress(self.session_id, "🌍 Detecting document language...", "detecting_language")
198
+
199
  # Extract document language from Docling metadata
200
  doc_lang = None
201
  if hasattr(result.document, 'lang') and result.document.lang:
 
217
  print(f"Language from Docling: {doc_lang}")
218
 
219
  self.document_language = doc_lang
220
+ lang_name = SUPPORTED_LANGUAGES.get(doc_lang, doc_lang.upper())
221
+ send_progress(self.session_id, f"βœ“ Language: {lang_name}", "language_detected")
222
 
223
  # Estimate pages for non-PDF formats
224
  if file_extension != "pdf":
 
231
 
232
  self.document_content = markdown_content
233
 
234
+ send_progress(self.session_id, "βœ‚οΈ Creating smart chunks...", "chunking")
235
  # Chunk using HybridChunker with document structure
236
  chunks = self._chunk_with_hybrid_chunker()
237
 
238
+ send_progress(self.session_id, "🧠 Generating embeddings...", "embedding")
239
  # Create embeddings and store in ChromaDB
240
  self._create_vector_store(chunks)
241
 
 
298
 
299
  # Lazy-load model if not already loaded
300
  if self.document_language not in embedding_models:
301
+ lang_name = SUPPORTED_LANGUAGES.get(self.document_language, self.document_language.upper())
302
+ send_progress(self.session_id, f"πŸ€– Loading {lang_name} model (first time, ~30-60s)...", "loading_model")
303
  load_language_model(self.document_language)
304
 
305
  # Get the appropriate embedding model for document language
 
311
  # Generate embeddings
312
  embeddings = embedding_model.encode(chunks).tolist()
313
 
314
+ send_progress(self.session_id, "πŸ’Ύ Storing in vector database...", "storing")
315
  # Add to ChromaDB
316
  ids = [f"chunk_{i}" for i in range(len(chunks))]
317
  self.collection.add(
 
320
  ids=ids
321
  )
322
 
323
+ send_progress(self.session_id, "βœ… Ready! Ask your questions below.", "complete")
324
+
325
  except Exception as e:
326
  raise HTTPException(status_code=500, detail=f"Error creating vector store: {str(e)}")
327
 
 
423
  }
424
 
425
 
426
@app.get("/api/progress/{session_id}")
async def progress_stream(session_id: str):
    """Server-Sent Events endpoint for real-time progress updates.

    Streams the JSON payloads queued by send_progress() for this session,
    one ``data:`` frame per update.  Emits an SSE comment as a keepalive
    when no update arrives within 120 s, and closes the stream on the
    terminal steps ('complete' or 'error').
    """
    async def event_generator():
        queue = progress_queues[session_id]
        try:
            while True:
                try:
                    # Wait for the next progress message; the timeout lets
                    # us keep the connection alive through proxies.
                    progress = await asyncio.wait_for(queue.get(), timeout=120.0)
                    # Send SSE-formatted message.
                    yield f"data: {json.dumps(progress)}\n\n"
                    # Stop on terminal steps.  'error' must also end the
                    # stream: the background task sends step="error" on
                    # failure and nothing afterwards, so breaking only on
                    # 'complete' would leave the client connected forever.
                    if progress.get("step") in ("complete", "error"):
                        break
                except asyncio.TimeoutError:
                    # SSE comment line — ignored by EventSource, but keeps
                    # the connection from being dropped as idle.
                    yield ": keepalive\n\n"
        except Exception as e:
            print(f"Progress stream error: {e}")
        finally:
            # Drop the queue so abandoned sessions don't leak memory.
            progress_queues.pop(session_id, None)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
459
+
460
+
461
  @app.post("/upload")
462
  async def upload_document(
463
  file: UploadFile = File(...),
 
488
  # Create session
489
  session_id = str(uuid.uuid4())
490
 
491
+ # Send initial progress message
492
+ send_progress(session_id, "⏳ Uploading document...", "uploading")
493
+
494
  # Save uploaded file temporarily
495
  with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_extension}") as tmp_file:
496
  content = await file.read()
497
  tmp_file.write(content)
498
  tmp_file_path = tmp_file.name
499
 
500
+ # Process document in background
501
+ async def process_in_background():
502
+ try:
503
+ processor = DocumentProcessor(session_id)
504
+ result = processor.process_document(tmp_file_path, file_extension)
505
+
506
+ # Store session
507
+ sessions[session_id] = {
508
+ "processor": processor,
509
+ "api_key": api_key,
510
+ "filename": file.filename
511
+ }
512
+ except Exception as e:
513
+ send_progress(session_id, f"❌ Error: {str(e)}", "error")
514
+ finally:
515
+ # Clean up temp file
516
+ if os.path.exists(tmp_file_path):
517
+ os.unlink(tmp_file_path)
518
 
519
+ # Start processing in background
520
+ asyncio.create_task(process_in_background())
 
 
 
521
 
522
+ # Return immediately with session_id so frontend can connect to SSE
523
+ return {
524
+ "session_id": session_id,
525
+ "filename": file.filename,
526
+ "status": "processing"
527
+ }
528
 
529
 
530
  @app.post("/ask")
frontend/script.js CHANGED
@@ -490,6 +490,8 @@ async function handleUpload() {
490
  return;
491
  }
492
 
 
 
493
  try {
494
  showLoading(getNestedTranslation('loading.processing'));
495
  hideStatus();
@@ -499,37 +501,60 @@ async function handleUpload() {
499
  formData.append('file', file);
500
  formData.append('api_key', apiKey);
501
 
502
- // Show model download message after 15 seconds
503
- const loadingTimeoutId = setTimeout(() => {
504
- showLoading(getNestedTranslation('loading.modelDownload'));
505
- }, 15000);
506
-
507
- // Upload (no timeout - CPU processing can take a while)
508
- const response = await fetch(`${API_BASE}/upload`, {
509
  method: 'POST',
510
  body: formData
511
  });
512
 
513
- clearTimeout(loadingTimeoutId);
514
-
515
  const data = await response.json();
516
 
517
  if (!response.ok) {
518
  throw new Error(data.detail || 'Upload failed');
519
  }
520
 
521
- // Success
522
- sessionId = data.session_id;
523
- documentNameEl.textContent = data.filename;
524
-
525
- // Show chat interface
526
- chatOverlay.classList.add('active');
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
527
 
528
  } catch (error) {
529
  console.error('Upload error:', error);
530
  showStatus(error.message, 'error');
531
- } finally {
532
  hideLoading();
 
533
  }
534
  }
535
 
 
490
  return;
491
  }
492
 
493
+ let eventSource = null;
494
+
495
  try {
496
  showLoading(getNestedTranslation('loading.processing'));
497
  hideStatus();
 
501
  formData.append('file', file);
502
  formData.append('api_key', apiKey);
503
 
504
+ // Start upload (returns immediately with session_id)
505
+ const uploadPromise = fetch(`${API_BASE}/upload`, {
 
 
 
 
 
506
  method: 'POST',
507
  body: formData
508
  });
509
 
510
+ // Get session_id from response to connect to progress stream
511
+ const response = await uploadPromise;
512
  const data = await response.json();
513
 
514
  if (!response.ok) {
515
  throw new Error(data.detail || 'Upload failed');
516
  }
517
 
518
+ // Connect to Server-Sent Events for progress updates
519
+ const tempSessionId = data.session_id;
520
+ eventSource = new EventSource(`${API_BASE}/api/progress/${tempSessionId}`);
521
+
522
+ eventSource.onmessage = (event) => {
523
+ try {
524
+ const progress = JSON.parse(event.data);
525
+ // Update loading message with progress
526
+ showLoading(progress.message);
527
+
528
+ // If complete, finalize
529
+ if (progress.step === 'complete') {
530
+ sessionId = tempSessionId;
531
+ documentNameEl.textContent = data.filename;
532
+ chatOverlay.classList.add('active');
533
+ hideLoading();
534
+ eventSource.close();
535
+ }
536
+ } catch (e) {
537
+ console.error('Progress parse error:', e);
538
+ }
539
+ };
540
+
541
+ eventSource.onerror = (error) => {
542
+ console.error('Progress stream error:', error);
543
+ eventSource.close();
544
+ // If error but response was OK, show success anyway
545
+ if (data.session_id) {
546
+ sessionId = data.session_id;
547
+ documentNameEl.textContent = data.filename;
548
+ chatOverlay.classList.add('active');
549
+ hideLoading();
550
+ }
551
+ };
552
 
553
  } catch (error) {
554
  console.error('Upload error:', error);
555
  showStatus(error.message, 'error');
 
556
  hideLoading();
557
+ if (eventSource) eventSource.close();
558
  }
559
  }
560