#!/usr/bin/env python3
"""
BackgroundFX Pro — Core Functionality

All processing logic, utilities, background generators, and handlers.
Enhanced with file safety, robust logging, and runtime diagnostics.
"""
import atexit
import gc
import io
import json
import logging
import logging.handlers
import os
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import traceback
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

# Third-party imports keep their original relative order (torch before cv2):
# native-library load order can matter for these packages.
import requests
import torch
import numpy as np
from PIL import Image, ImageDraw, ImageFont
import cv2

# ==============================================================================
# PATHS & ENV
# ==============================================================================

# Repo root (…/app)
APP_ROOT = Path(__file__).resolve().parent
DATA_ROOT = APP_ROOT / "data"
TMP_ROOT = APP_ROOT / "tmp"
JOB_ROOT = TMP_ROOT / "backgroundfx_jobs"

for p in (
    DATA_ROOT,
    TMP_ROOT,
    JOB_ROOT,
    APP_ROOT / ".hf",
    APP_ROOT / ".torch",
    APP_ROOT / "checkpoints",
    APP_ROOT / "models",
    APP_ROOT / "utils",
):
    p.mkdir(parents=True, exist_ok=True)

# Cache dirs (stable on Spaces)
os.environ.setdefault("HF_HOME", str(APP_ROOT / ".hf"))
os.environ.setdefault("TORCH_HOME", str(APP_ROOT / ".torch"))

# Quiet BLAS/OpenMP spam (in case ui.py wasn't first).
# OMP_NUM_THREADS must be a positive integer: an unset, empty, non-numeric or
# zero value is overwritten (a plain setdefault would keep a bad value).
_omp_threads = os.environ.get("OMP_NUM_THREADS", "")
if not (_omp_threads.isdigit() and int(_omp_threads) > 0):
    os.environ["OMP_NUM_THREADS"] = "4"
os.environ.setdefault("OPENBLAS_NUM_THREADS", "1")
os.environ.setdefault("MKL_NUM_THREADS", "1")
os.environ.setdefault("NUMEXPR_NUM_THREADS", "1")
os.environ.setdefault("PYTHONFAULTHANDLER", "1")

# ==============================================================================
# LOGGING + DIAGNOSTICS (console + file + heartbeat)
# ==============================================================================

# Line-buffer logs so Space UI shows them promptly.
try:
    sys.stdout.reconfigure(line_buffering=True)
    sys.stderr.reconfigure(line_buffering=True)
except Exception:
    pass  # best effort: the host may have replaced the streams with objects lacking reconfigure()

LOG_FILE = DATA_ROOT / "run.log"
# The heartbeat below logs every few seconds, so cap the on-disk log size
# instead of letting run.log grow without bound.
LOG_MAX_BYTES = 10 * 1024 * 1024
LOG_BACKUP_COUNT = 3

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.handlers.RotatingFileHandler(
            LOG_FILE,
            maxBytes=LOG_MAX_BYTES,
            backupCount=LOG_BACKUP_COUNT,
            encoding="utf-8",
        ),
    ],
    force=True,
)
logger = logging.getLogger("bgfx")

# Faulthandler (native crashes -> stacks); SIGUSR1 dumps all thread stacks on demand.
try:
    import faulthandler
    import signal  # type: ignore

    faulthandler.enable(all_threads=True)
    if hasattr(signal, "SIGUSR1"):
        faulthandler.register(signal.SIGUSR1, file=sys.stderr, all_threads=True)
except Exception as e:
    logger.warning("faulthandler setup skipped: %s", e)


def _disk_stats(p: Path) -> str:
    """Return a one-line usage summary (in MB) for the filesystem holding *p*.

    Never raises: it is called from error paths, so any failure yields "disk(n/a)".
    """
    try:
        total, used, free = shutil.disk_usage(str(p))
    except Exception:
        return "disk(n/a)"
    mb = 1024 * 1024
    return f"disk(total={total // mb}MB, used={used // mb}MB, free={free // mb}MB)"


# cgroup v1 reports "no limit" as a huge page-aligned value close to 2**63.
_CGROUP_UNLIMITED_THRESHOLD = 1 << 62


def _cgroup_limit_bytes() -> Optional[int]:
    """Return the container memory limit in bytes, or None if unknown/unlimited.

    Checks the cgroup v2 file first, then cgroup v1. Both the v2 literal "max"
    and the v1 near-2**63 sentinel are treated as "no limit".
    """
    for fp in ("/sys/fs/cgroup/memory.max", "/sys/fs/cgroup/memory/memory.limit_in_bytes"):
        try:
            raw = Path(fp).read_text().strip()
            limit = int(raw) if raw and raw != "max" else None
        except (OSError, ValueError):
            continue
        if limit is not None and limit < _CGROUP_UNLIMITED_THRESHOLD:
            return limit
    return None


def _rss_bytes() -> Optional[int]:
    """Return this process's resident set size in bytes from /proc, or None."""
    try:
        for line in Path("/proc/self/status").read_text().splitlines():
            if line.startswith("VmRSS:"):
                return int(line.split()[1]) * 1024  # /proc reports VmRSS in kB
    except (OSError, ValueError, IndexError):
        return None
    return None


HEARTBEAT_INTERVAL_SEC = 2.0
_HEARTBEAT_THREAD_NAME = "bgfx-heartbeat"


def _heartbeat(interval: float = HEARTBEAT_INTERVAL_SEC) -> None:
    """Log RSS, memory limit and disk usage every *interval* seconds, forever."""
    lim = _cgroup_limit_bytes()
    lim_mb = f"{lim // 2**20}" if lim else "n/a"  # the limit is read once, at thread start
    while True:
        rss = _rss_bytes()
        logger.info(
            "HEARTBEAT | rss=%s MB | limit=%s MB | %s",
            f"{rss // 2**20}" if rss else "n/a",
            lim_mb,
            _disk_stats(APP_ROOT),
        )
        time.sleep(interval)


# Start the heartbeat as a daemon thread exactly once per process, even if this
# module is imported again under a different module name.
try:
    if not any(t.name == _HEARTBEAT_THREAD_NAME for t in threading.enumerate()):
        threading.Thread(target=_heartbeat, name=_HEARTBEAT_THREAD_NAME, daemon=True).start()
except Exception as e:
    logger.warning("heartbeat skipped: %s", e)


@atexit.register
def _on_exit() -> None:
    """Log a final line on normal interpreter exit (absent after a hard kill)."""
    logger.info("PROCESS EXITING (atexit) — if you don't see this, it was a hard kill (OOM/SIGKILL)")
# ==============================================================================
# STARTUP VALIDATION
# ==============================================================================

def _check_roundtrip(path: Path, payload: str) -> None:
    """Write *payload* to *path* and confirm it reads back unchanged.

    An explicit check is used instead of ``assert`` so that it still runs
    under ``python -O``, which strips asserts.

    Raises:
        OSError: the file cannot be written or read.
        RuntimeError: the content read back differs from *payload*.
    """
    path.write_text(payload, encoding="utf-8")
    readback = path.read_text(encoding="utf-8")
    if readback != payload:
        raise RuntimeError(f"Read-back mismatch for {path}: {readback!r} != {payload!r}")


def startup_probe():
    """Comprehensive startup probe - validates system readiness.

    Verifies that TMP_ROOT and JOB_ROOT are writable, logs Torch/CUDA status
    (a failure there is logged but not fatal), ensures the checkpoints/models/
    utils directories exist, and logs the thread/cache environment.

    Raises:
        RuntimeError: if any required probe fails (original error chained).
    """
    try:
        logger.info("🚀 BACKGROUNDFX PRO STARTUP PROBE")
        logger.info("📁 Working directory: %s", os.getcwd())
        logger.info("🐍 Python executable: %s", sys.executable)

        # Write probe (fail fast if not writable)
        probe_file = TMP_ROOT / "startup_probe.txt"
        _check_roundtrip(probe_file, "startup_test_ok")
        logger.info("✅ WRITE PROBE OK: %s | %s", probe_file, _disk_stats(APP_ROOT))
        probe_file.unlink(missing_ok=True)

        # GPU/Torch status (informational only)
        try:
            cuda_ok = torch.cuda.is_available()
            logger.info(
                "🔧 Torch=%s | cu=%s | cuda_available=%s",
                torch.__version__, getattr(torch.version, "cuda", None), cuda_ok,
            )
            if cuda_ok:
                gpu_count = torch.cuda.device_count()
                name = torch.cuda.get_device_name(0) if gpu_count else "Unknown"
                vram_gb = (
                    torch.cuda.get_device_properties(0).total_memory / (1024**3) if gpu_count else 0
                )
                logger.info("🔥 GPU Available: %s (%d device(s)) — VRAM %.1f GB", name, gpu_count, vram_gb)
            else:
                logger.warning("⚠️ No GPU available — using CPU")
        except Exception as e:
            logger.warning("⚠️ Torch check failed: %s", e)

        # Directory verification (and creation if missing)
        for d in ("checkpoints", "models", "utils"):
            dp = APP_ROOT / d
            dp.mkdir(parents=True, exist_ok=True)
            logger.info("✅ Directory %s: %s", d, dp)

        # Job dir isolation test; the test dir is removed even if the probe fails.
        test_job = JOB_ROOT / "startup_test_job"
        test_job.mkdir(parents=True, exist_ok=True)
        try:
            _check_roundtrip(test_job / "test.tmp", "job_isolation_test")
            logger.info("✅ Job isolation directory ready: %s", JOB_ROOT)
        finally:
            shutil.rmtree(test_job, ignore_errors=True)

        # Env summary
        logger.info(
            "🌍 Env: OMP_NUM_THREADS=%s | HF_HOME=%s | TORCH_HOME=%s",
            os.environ.get("OMP_NUM_THREADS", "unset"),
            os.environ.get("HF_HOME", "default"),
            os.environ.get("TORCH_HOME", "default"),
        )
        logger.info("🎯 Startup probe completed — system ready!")
    except Exception as e:
        logger.error("❌ STARTUP PROBE FAILED: %s", e)
        logger.error("📊 %s", _disk_stats(APP_ROOT))
        raise RuntimeError(f"Startup probe failed — system not ready: {e}") from e


# ==============================================================================
# FILE SAFETY UTILITIES
# ==============================================================================

def new_tmp_path(suffix: str) -> Path:
    """Return a fresh random path inside TMP_ROOT ending in *suffix* (not created)."""
    return TMP_ROOT / f"{uuid.uuid4().hex}{suffix}"


def atomic_write_bytes(dst: Path, data: bytes):
    """Atomically write *data* to *dst* so readers never observe a partial file.

    The temporary file is created in *dst*'s own directory: ``Path.replace`` is
    only atomic (and only works at all) when source and target share a
    filesystem, which a temp file under TMP_ROOT does not guarantee. Data is
    flushed and fsync'ed before the rename.

    Raises:
        OSError: on any write/rename failure (the temp file is removed first).
    """
    dst = Path(dst)
    tmp = dst.with_name(f".{dst.name}.{uuid.uuid4().hex}.part")
    try:
        with open(tmp, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())
        tmp.replace(dst)
        logger.debug("✅ Atomic write: %s", dst)
    except BaseException:
        tmp.unlink(missing_ok=True)
        raise


def safe_name(name: str, default="file") -> str:
    """Sanitize a filename so it can be joined safely onto a trusted directory.

    Every run of characters outside ``[A-Za-z0-9._-]`` becomes ``_``. Leading
    dots are stripped, so the result can never be ``.``/``..`` (directory
    traversal) or a hidden file. The result is capped at 120 characters,
    keeping a short extension intact. Falls back to *default* when nothing is left.
    """
    import re

    max_len = 120
    base = re.sub(r"[^A-Za-z0-9._-]+", "_", (name or default)).lstrip(".")
    if len(base) > max_len:
        stem, dot, ext = base.rpartition(".")
        if dot and stem and len(ext) <= 10:
            base = f"{stem[: max_len - len(ext) - 1]}.{ext}"
        else:
            base = base[:max_len]
    return base or default


def place_uploaded(in_path: str, sub="uploads", unique: bool = False) -> Path:
    """Copy an uploaded file into ``DATA_ROOT/<sub>`` under a sanitized name.

    Args:
        in_path: path of the uploaded file.
        sub: subdirectory of DATA_ROOT to place the copy in (created if missing).
        unique: if True, prefix the name with a random token so concurrent jobs
            uploading files with the same name cannot overwrite each other's copy.

    Returns:
        Path of the placed file.
    """
    target_dir = DATA_ROOT / sub
    target_dir.mkdir(exist_ok=True, parents=True)
    name = safe_name(Path(in_path).name)
    if unique:
        name = f"{uuid.uuid4().hex[:8]}_{name}"
    out = target_dir / name
    try:
        shutil.copy2(in_path, out)
    except shutil.SameFileError:
        pass  # the upload already lives at the destination
    logger.info("📁 Uploaded file placed: %s", out)
    return out


def tmp_video_path(ext=".mp4") -> Path:
    """Return a fresh temp path for a video file (see new_tmp_path)."""
    return new_tmp_path(ext)


def tmp_image_path(ext=".png") -> Path:
    """Return a fresh temp path for an image file (see new_tmp_path)."""
    return new_tmp_path(ext)


def run_safely(fn: Callable, *args, **kwargs):
    """Call ``fn(*args, **kwargs)``; on failure log traceback + context, then re-raise."""
    try:
        return fn(*args, **kwargs)
    except Exception:
        logger.exception("PROCESSING FAILED")
        logger.error("CWD=%s | DATA_ROOT=%s | TMP_ROOT=%s | %s",
                     os.getcwd(), DATA_ROOT, TMP_ROOT, _disk_stats(APP_ROOT))
        try:
            logger.error("Env: OMP_NUM_THREADS=%s | CUDA=%s | torch=%s | cu=%s",
                         os.environ.get("OMP_NUM_THREADS"),
                         os.environ.get("CUDA_VISIBLE_DEVICES", "default"),
                         torch.__version__, getattr(torch.version, "cuda", None))
        except Exception:
            pass  # diagnostics must never mask the original error
        raise


# ==============================================================================
# SYSTEM UTILITIES
# ==============================================================================

def get_device():
    """Return ``cuda`` when available, otherwise ``cpu``."""
    return torch.device("cuda" if torch.cuda.is_available() else "cpu")


def clear_gpu_memory():
    """Release cached GPU memory back to the driver (best effort, never raises).

    ``gc.collect()`` runs first so tensors kept alive only by reference cycles
    are freed *before* ``empty_cache()`` hands the unused blocks back.
    """
    try:
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.synchronize()
            torch.cuda.empty_cache()
            logger.info("🧹 GPU memory cleared")
    except Exception as e:
        logger.warning("GPU cleanup warning: %s", e)


def safe_file_operation(operation: Callable, *args, max_retries: int = 3, **kwargs):
    """Call ``operation(*args, **kwargs)``, retrying on any exception.

    Makes at least one attempt (``max_retries`` values below 1 are treated as 1),
    sleeping 0.1 s, 0.2 s, ... between attempts. The last exception is re-raised.
    """
    attempts = max(1, max_retries)
    for attempt in range(1, attempts + 1):
        try:
            return operation(*args, **kwargs)
        except Exception as e:
            if attempt == attempts:
                logger.error("File op failed after %d attempts: %s", attempts, e)
                raise
            time.sleep(0.1 * attempt)
            logger.warning("File op retry %d: %s", attempt, e)


# ==============================================================================
# BACKGROUND GENERATORS
# ==============================================================================

def _image_from_row_colors(channels, width: int) -> np.ndarray:
    """Build a (height, width, 3) uint8 image from three per-row float arrays.

    Values are truncated toward zero (like ``int()``) and clipped to 0..255;
    each row's colour is then repeated across the full width.
    """
    rows = np.clip(np.trunc(np.stack(channels, axis=1)), 0, 255).astype(np.uint8)
    return np.repeat(rows[:, None, :], width, axis=1)


# (keywords, top-row RGB, RGB change from top to bottom); first match wins.
_AI_BG_THEMES = (
    (("city", "urban", "futuristic", "cyberpunk"), (20, 30, 60), (80, 100, 120)),
    (("beach", "tropical", "ocean", "sea"), (135, 206, 235), (120, 49, 20)),
    (("forest", "jungle", "nature", "green"), (34, 139, 34), (105, 30, -15)),
    (("space", "galaxy", "stars", "cosmic"), (10, 0, 30), (50, 30, 100)),
    (("desert", "sand", "canyon"), (238, 203, 173), (17, 52, 82)),
)
_AI_BG_FALLBACK_COLORS = ((255, 182, 193), (255, 218, 185), (176, 224, 230))


def generate_ai_background(prompt: str, width: int, height: int) -> Image.Image:
    """Generate AI-like background using prompt cues (procedural).

    The first theme whose keyword occurs in the lower-cased prompt selects a
    vertical gradient; otherwise a pastel colour (picked by ``len(prompt)``)
    darkens by up to 30% towards the bottom. Uniform noise in [-15, 15] is
    added. On any error a "sunset" gradient is returned instead.
    """
    try:
        logger.info("Generating AI background: '%s' (%dx%d)", prompt, width, height)
        prompt_lower = prompt.lower()
        frac = np.arange(height) / height  # i / height for each row i

        for keywords, base, span in _AI_BG_THEMES:
            if any(w in prompt_lower for w in keywords):
                channels = [base[c] + span[c] * frac for c in range(3)]
                break
        else:
            color = _AI_BG_FALLBACK_COLORS[len(prompt) % len(_AI_BG_FALLBACK_COLORS)]
            shade = 1 - frac * 0.3
            channels = [color[c] * shade for c in range(3)]

        img = _image_from_row_colors(channels, width)
        # randint's upper bound is exclusive: 16 gives symmetric noise in [-15, 15].
        noise = np.random.randint(-15, 16, (height, width, 3))
        img = np.clip(img.astype(np.int16) + noise, 0, 255).astype(np.uint8)
        return Image.fromarray(img)
    except Exception as e:
        logger.warning("AI background generation failed: %s — using fallback", e)
        return create_gradient_background("sunset", width, height)


_GRADIENTS = {
    "sunset": [(255, 165, 0), (128, 64, 128)],
    "ocean": [(0, 100, 255), (30, 144, 255)],
    "forest": [(34, 139, 34), (139, 69, 19)],
    "sky": [(135, 206, 235), (206, 235, 255)],
}


def create_gradient_background(gradient_type: str, width: int, height: int) -> Image.Image:
    """Return a top-to-bottom linear gradient for a named preset.

    Unknown *gradient_type* values produce a flat mid-gray (128) image.
    """
    stops = _GRADIENTS.get(gradient_type)
    if stops is None:
        return Image.fromarray(np.full((height, width, 3), 128, dtype=np.uint8))
    start, end = stops
    frac = np.arange(height) / height
    channels = [start[c] * (1 - frac) + end[c] * frac for c in range(3)]
    return Image.fromarray(_image_from_row_colors(channels, width))


def create_solid_background(color: str, width: int, height: int) -> Image.Image:
    """Return a solid RGB image for a named colour (case-insensitive); gray if unknown."""
    color_map = {
        "white": (255, 255, 255), "black": (0, 0, 0), "red": (255, 0, 0),
        "green": (0, 255, 0), "blue": (0, 0, 255), "yellow": (255, 255, 0),
        "purple": (128, 0, 128), "orange": (255, 165, 0), "pink": (255, 192, 203),
        "gray": (128, 128, 128),
    }
    rgb = color_map.get(color.lower(), (128, 128, 128))
    return Image.new("RGB", (width, height), rgb)


def download_unsplash_image(query: str, width: int, height: int) -> Image.Image:
    """Fetch an Unsplash Source photo for *query*, as RGB at exactly width x height.

    Any failure (network, HTTP status, decode) falls back to a gray solid image.
    NOTE(review): the source.unsplash.com endpoint has reportedly been retired;
    if so this always takes the fallback path — verify.
    """
    from urllib.parse import quote

    try:
        # Percent-encode the query so characters like '&', '#' or spaces cannot
        # break the URL; commas (keyword separators) are kept.
        url = f"https://source.unsplash.com/{width}x{height}/?{quote(query or '', safe=',')}"
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
        with Image.open(io.BytesIO(resp.content)) as img:
            # Convert before resizing: PIL resizes "P"/"1" images with NEAREST only.
            rgb = img.convert("RGB")
        if rgb.size != (width, height):
            rgb = rgb.resize((width, height), Image.Resampling.LANCZOS)
        return rgb
    except Exception as e:
        logger.warning("Unsplash download failed: %s", e)
        return create_solid_background("gray", width, height)


# ==============================================================================
# VIDEO UTILITIES
# ==============================================================================

def get_video_info(video_path: str) -> Dict[str, Any]:
    """Return fps, frame_count, width, height and duration (s) of a video.

    On any failure, logs the error and returns defaults
    (30 fps, 0 frames, 1920x1080, duration 0).
    """
    cap = None
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError("Cannot open video file")
        fps = cap.get(cv2.CAP_PROP_FPS)
        frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        return {"fps": fps, "frame_count": frames, "width": w, "height": h,
                "duration": (frames / fps if fps > 0 else 0)}
    except Exception as e:
        logger.error("get_video_info failed: %s", e)
        return {"fps": 30.0, "frame_count": 0, "width": 1920, "height": 1080, "duration": 0}
    finally:
        if cap is not None:
            cap.release()  # release on every path, including errors


def extract_frame(video_path: str, frame_number: int) -> Optional[np.ndarray]:
    """Return frame *frame_number* (negative values clamp to 0) as an RGB array, or None."""
    cap = None
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            return None
        cap.set(cv2.CAP_PROP_POS_FRAMES, max(0, int(frame_number)))
        ok, frame = cap.read()
        return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) if ok else None
    except Exception as e:
        logger.error("extract_frame failed: %s", e)
        return None
    finally:
        if cap is not None:
            cap.release()


def ffmpeg_safe_call(inp: Path, out: Path, extra=(), timeout: float = 300):
    """Run ``ffmpeg -y -i <inp> <extra...> <out>`` (argument list, no shell).

    Raises:
        subprocess.CalledProcessError: ffmpeg exited non-zero.
        subprocess.TimeoutExpired: ffmpeg ran longer than *timeout* seconds.
    """
    cmd = ["ffmpeg", "-y", "-hide_banner", "-loglevel", "error", "-i", str(inp), *extra, str(out)]
    logger.info("FFMPEG %s", " ".join(cmd))
    subprocess.run(cmd, check=True, timeout=timeout)


# ==============================================================================
# PROGRESS TRACKING
# ==============================================================================

class ProgressTracker:
    """Thread-safe progress tracking for video processing.

    All reads/writes of the tracked fields happen under ``self.lock``.
    NOTE: the module-level ``progress_tracker`` is shared by every job, so call
    ``reset()`` when a job starts; otherwise elapsed/ETA are measured from the
    moment the tracker was created (process start-up).
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.current_step = ""
        self.progress = 0.0
        self.total_frames = 0
        self.processed_frames = 0
        self.start_time = time.time()

    def reset(self, step: str = "") -> None:
        """Clear progress/frame counts and restart the elapsed-time clock."""
        with self.lock:
            self.current_step = step
            self.progress = 0.0
            self.total_frames = 0
            self.processed_frames = 0
            self.start_time = time.time()

    def update(self, step: str, progress: Optional[float] = None):
        """Set the current step label and, if given, progress clamped to [0, 1]."""
        with self.lock:
            self.current_step = step
            if progress is not None:
                self.progress = max(0.0, min(1.0, progress))

    def update_frames(self, processed: int, total: Optional[int] = None):
        """Record processed/total frame counts and derive progress (clamped to [0, 1])."""
        with self.lock:
            self.processed_frames = processed
            if total is not None:
                self.total_frames = total
            if self.total_frames > 0:
                self.progress = max(0.0, min(1.0, self.processed_frames / self.total_frames))

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot dict; ETA stays 0 until progress exceeds 1%."""
        with self.lock:
            elapsed = time.time() - self.start_time
            eta = 0
            if self.progress > 0.01:
                eta = elapsed * (1.0 - self.progress) / self.progress
            return {
                "step": self.current_step,
                "progress": self.progress,
                "processed_frames": self.processed_frames,
                "total_frames": self.total_frames,
                "elapsed": elapsed,
                "eta": eta,
            }


# Global tracker
progress_tracker = ProgressTracker()


# ==============================================================================
# SAFE FILE OPS
# ==============================================================================

def create_job_directory() -> Path:
    """Create and return ``JOB_ROOT/job_<8 hex>_<unix time>``."""
    job_id = uuid.uuid4().hex[:8]
    job_dir = JOB_ROOT / f"job_{job_id}_{int(time.time())}"
    job_dir.mkdir(parents=True, exist_ok=True)
    logger.info("📁 Created job directory: %s", job_dir)
    return job_dir


def atomic_file_write(filepath: Path, content: bytes):
    """Atomically write *content* to *filepath* (same contract as atomic_write_bytes).

    Delegates to atomic_write_bytes, which uses a unique sibling temp file and
    ``replace``. The previous fixed ``<name>.tmp`` + ``rename`` collided between
    concurrent writers and failed on Windows when the target already existed.
    """
    atomic_write_bytes(Path(filepath), content)


def safe_download(url: str, filepath: Path, max_size: int = 500 * 1024 * 1024):
    """Stream *url* to *filepath* via a sibling ``.download`` file, enforcing *max_size*.

    Raises:
        requests.RequestException: network/HTTP failure.
        ValueError: declared or actual size exceeds *max_size*, or the body is empty.
        OSError: local write/rename failure.
    The partial ``.download`` file is removed on any failure.
    """
    filepath = Path(filepath)
    temp_path = filepath.with_name(f"{filepath.name}.download")
    downloaded = 0
    try:
        # `with` closes the streamed response, returning its connection to the pool.
        with requests.get(url, stream=True, timeout=30) as r:
            r.raise_for_status()
            cl = r.headers.get("content-length")
            if cl and int(cl) > max_size:
                raise ValueError(f"File too large: {cl} bytes")
            with open(temp_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if not chunk:
                        continue
                    downloaded += len(chunk)
                    if downloaded > max_size:
                        raise ValueError(f"Download exceeded size limit: {downloaded} bytes")
                    f.write(chunk)
        if not temp_path.exists() or temp_path.stat().st_size == 0:
            raise ValueError("Download resulted in empty file")
        temp_path.replace(filepath)
        logger.info("✅ Downloaded: %s (%d bytes)", filepath, downloaded)
    except Exception as e:
        temp_path.unlink(missing_ok=True)
        logger.error("❌ Download failed: %s", e)
        raise


# ==============================================================================
# ENHANCED PIPELINE INTEGRATION
# ==============================================================================

def _verify_pipeline_output(result_path) -> None:
    """Require *result_path* to name a non-empty file; log a frame-count sanity check.

    Raises:
        RuntimeError: no path returned, the file is missing, or it is empty.
    """
    logger.info("DEBUG: pipeline returned %s (%s)", result_path, type(result_path))
    if not result_path:
        raise RuntimeError("Two-stage pipeline failed — no output produced")
    result_file = Path(result_path)
    exists = result_file.exists()
    logger.info("DEBUG: result exists=%s", exists)
    if not exists:
        raise RuntimeError("Two-stage pipeline failed — no output produced")

    size = result_file.stat().st_size
    logger.info("DEBUG: result size=%d bytes", size)
    if size == 0:
        raise RuntimeError("Pipeline produced empty output file")

    # Quick validity check (warnings only; never fails the job)
    cap = None
    try:
        cap = cv2.VideoCapture(str(result_file))
        if cap.isOpened():
            frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            logger.info("DEBUG: output frame_count=%d", frames)
        else:
            logger.warning("⚠️ Output may not be a valid video (cannot open)")
    except Exception as e:
        logger.warning("⚠️ Could not verify output video: %s", e)
    finally:
        if cap is not None:
            cap.release()


def process_video_pipeline(
    video_path: str,
    background_image: Optional[Image.Image],
    background_type: str,
    background_prompt: str,
    job_dir: Path,
    progress_callback: Optional[Callable] = None,
) -> str:
    """Process video using the two-stage pipeline with enhanced safety and monitoring.

    Args:
        video_path: input video; a uniquely named copy is placed under DATA_ROOT/videos.
        background_image: background to composite onto (required).
        background_type: only logged here.
        background_prompt: accepted for interface compatibility; unused here.
        job_dir: working directory handed to the pipeline as ``workdir``.
        progress_callback: optional callable receiving one status string per update.

    Returns:
        The output path exactly as returned by ``two_stage_pipeline.process_two_stage``.

    Raises:
        FileNotFoundError: *video_path* does not exist.
        ValueError: *background_image* is None.
        ImportError: ``two_stage_pipeline`` cannot be imported.
        RuntimeError: the pipeline produced no output or an empty file.
        Exceptions raised by the pipeline itself are logged and re-raised.
    """

    def _inner_process():
        logger.info("=" * 60)
        logger.info("=== ENHANCED TWO-STAGE PIPELINE (WITH SAFETY) ===")
        logger.info("=" * 60)

        src = Path(video_path)
        src_exists = src.exists()
        logger.info("DEBUG video_path=%s exists=%s size=%s bytes",
                    video_path, src_exists, (src.stat().st_size if src_exists else "N/A"))
        logger.info("DEBUG job_dir=%s writable=%s", job_dir, os.access(job_dir, os.W_OK))
        logger.info("DEBUG bg_image=%s bg_type=%s | %s",
                    (background_image.size if background_image else None),
                    background_type, _disk_stats(APP_ROOT))

        if not src_exists:
            raise FileNotFoundError(f"Video file not found: {video_path}")
        # Validate before copying a potentially large upload.
        if background_image is None:
            raise ValueError("Background image is required")

        # Copy into controlled area; unique name so concurrent jobs with the
        # same upload filename cannot overwrite each other's input.
        safe_video_path = place_uploaded(video_path, "videos", unique=True)
        logger.info("DEBUG safe_video_path=%s", safe_video_path)

        logger.info("DEBUG importing two-stage pipeline…")
        try:
            from two_stage_pipeline import process_two_stage as pipeline_process
        except ImportError as e:
            logger.error("Import two_stage_pipeline failed: %s", e)
            raise
        logger.info("✓ two-stage pipeline import OK")

        # Restart the shared tracker's clock so elapsed/ETA refer to this job.
        progress_tracker.reset()
        progress_tracker.update("Initializing enhanced two-stage pipeline…")
        current_stage = {"stage": "init", "start_time": time.time()}

        def safe_progress_callback(step: str, progress: Optional[float] = None):
            # Never lets a logging/callback error propagate into the pipeline.
            try:
                now = time.time()
                elapsed = now - current_stage["start_time"]

                if "Stage 1" in step and current_stage["stage"] != "stage1":
                    current_stage["stage"] = "stage1"
                    current_stage["start_time"] = now
                    logger.info("🔄 Entering Stage 1 (SAM2) | %s", _disk_stats(APP_ROOT))
                elif "Stage 2" in step and current_stage["stage"] != "stage2":
                    d1 = now - current_stage["start_time"]
                    current_stage["stage"] = "stage2"
                    current_stage["start_time"] = now
                    logger.info("🔄 Entering Stage 2 (Composition) — Stage 1 time %.1fs | %s",
                                d1, _disk_stats(APP_ROOT))
                elif "Done" in step and current_stage["stage"] != "complete":
                    d2 = now - current_stage["start_time"]
                    current_stage["stage"] = "complete"
                    logger.info("🔄 Pipeline complete — Stage 2 time %.1fs | %s",
                                d2, _disk_stats(APP_ROOT))

                logger.info("PROGRESS [%s] (%.1fs): %s (%s)",
                            current_stage["stage"].upper(), elapsed, step, progress)
                progress_tracker.update(step, progress)

                if progress_callback:
                    progress_callback(
                        f"Progress: {progress:.1%} - {step}" if progress is not None else step
                    )

                if current_stage["stage"] == "stage1" and elapsed > 15:
                    logger.warning("⚠️ Stage 1 running for %.1fs — monitoring memory", elapsed)
            except Exception as e:
                logger.error("Progress callback error: %s", e)

        logger.info("DEBUG: calling two-stage pipeline…")
        result_path = pipeline_process(
            video_path=str(safe_video_path),
            background_image=background_image,
            workdir=job_dir,
            progress=safe_progress_callback,
            use_matany=True,
        )
        _verify_pipeline_output(result_path)

        logger.info("=" * 60)
        logger.info("✅ ENHANCED TWO-STAGE PIPELINE COMPLETED: %s", result_path)
        logger.info("=" * 60)
        return result_path

    try:
        return run_safely(_inner_process)
    except Exception:
        logger.error("🧹 Error cleanup…")
        clear_gpu_memory()
        # Listing the job dir is diagnostic only; it must not mask the real error.
        try:
            state = list(job_dir.iterdir()) if job_dir.exists() else "does not exist"
        except OSError as list_err:
            state = f"unreadable ({list_err})"
        logger.error("Job dir state: %s", state)
        raise