# src/models.py — Enterprise Lens V4
# ════════════════════════════════════════════════════════════════════
# Face Lane  : InsightFace buffalo_l (SCRFD-10GF + ArcFace-R100)
#              + optional AdaFace IR-50, fused → FUSED_FACE_DIM (1024-D)
#   • Detection runs at every size in DET_SCALES plus one horizontally
#     flipped pass; results are merged with IoU de-duplication
#   • Quality gate: det_score ≥ FACE_QUALITY_GATE, face side ≥ MIN_FACE_SIZE
#   • One 1024-D vector PER face, each with a base64 face-crop thumbnail
#   • det_score / face_quality / face_width_px are returned as metadata
#
# Object Lane: SigLIP + DINOv2 fused 1536-D (unchanged from V3)
# ════════════════════════════════════════════════════════════════════

import os

# Must be set before any TensorFlow-backed import gets a chance to log.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

import asyncio
import base64
import functools
import hashlib
import io
import threading
import traceback
from typing import Optional

import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image
from transformers import AutoImageProcessor, AutoModel, AutoProcessor
from ultralytics import YOLO

# ── InsightFace ───────────────────────────────────────────────────
try:
    import insightface
    from insightface.app import FaceAnalysis
    INSIGHTFACE_AVAILABLE = True
except ImportError:
    INSIGHTFACE_AVAILABLE = False
    print("⚠️ insightface not installed — face lane disabled")
    print(" Run: pip install insightface onnxruntime-silicon (mac)")
    print(" pip install insightface onnxruntime (linux/win)")

# ── AdaFace ──────────────────────────────────────────────────────
# Disabled by default — enable by setting ENABLE_ADAFACE=1 env var.
# When disabled: ArcFace(512) + zeros(512) = 1024-D (fully functional).
ADAFACE_WEIGHTS_AVAILABLE = False  # controlled by ENABLE_ADAFACE env var

# ── Constants ─────────────────────────────────────────────────────
YOLO_PERSON_CLASS_ID = 0
MIN_FACE_SIZE = 20  # lowered: 40 missed small faces in group photos
MAX_FACES_PER_IMAGE = 12  # slightly higher cap for group photos
MAX_CROPS = 6  # max YOLO object crops per image
MAX_IMAGE_SIZE = 640  # object lane longest edge
DET_SIZE_PRIMARY = (1280, 1280)  # V4: 1280 for small-face detection
DET_SIZE_SECONDARY = (640, 640)  # fallback / 2nd scale
FACE_CROP_THUMB_SIZE = 112  # face thumbnail for Pinecone metadata
FACE_CROP_QUALITY = 80  # JPEG quality for thumbnails
FACE_QUALITY_GATE = 0.35  # lowered from 0.60 — accepts sunglasses, angles, smiles

# Multi-scale pyramid — tried in order, results merged with IoU dedup
DET_SCALES = [(1280, 1280), (960, 960), (640, 640)]
IOU_DEDUP_THRESHOLD = 0.45  # suppress duplicate detections across scales

FACE_DIM = 512  # ArcFace embedding dimension
ADAFACE_DIM = 512  # AdaFace embedding dimension
FUSED_FACE_DIM = 1024  # ArcFace + AdaFace concatenated


# ════════════════════════════════════════════════════════════════
# Utility functions
# ════════════════════════════════════════════════════════════════
def _resize_pil(img: Image.Image, max_side: int = MAX_IMAGE_SIZE) -> Image.Image:
    """Downscale ``img`` so its longest edge is at most ``max_side``.

    Images that already fit are returned unchanged (same object). Each
    output side is clamped to at least 1 px so extreme aspect ratios
    cannot produce a zero-sized resize request.
    """
    w, h = img.size
    longest = max(w, h)
    if longest <= max_side:
        return img
    scale = max_side / longest
    new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
    return img.resize(new_size, Image.LANCZOS)


def _img_hash(image_path: str) -> str:
    """Return the MD5 hex digest of the first 64 KiB of the file.

    NOTE(review): only the leading 65536 bytes are hashed, so two files
    that share that prefix produce the same digest. The digest is used
    as the in-memory cache key in ``AIModelManager.process_image``.
    """
    h = hashlib.md5()
    with open(image_path, "rb") as f:
        h.update(f.read(65536))
    return h.hexdigest()


def _padded_crop(
    img_bgr: np.ndarray,
    x1: int,
    y1: int,
    x2: int,
    y2: int,
    pad_frac: float,
) -> np.ndarray:
    """Slice the box out of ``img_bgr``, grown by ``pad_frac`` of its size
    on every side and clipped to the image bounds. May be empty."""
    img_h, img_w = img_bgr.shape[:2]
    pad_x = int((x2 - x1) * pad_frac)
    pad_y = int((y2 - y1) * pad_frac)
    left = max(0, x1 - pad_x)
    top = max(0, y1 - pad_y)
    right = min(img_w, x2 + pad_x)
    bottom = min(img_h, y2 + pad_y)
    return img_bgr[top:bottom, left:right]


def _crop_to_b64(
    img_bgr: np.ndarray,
    x1: int,
    y1: int,
    x2: int,
    y2: int,
    thumb_size: int = FACE_CROP_THUMB_SIZE,
) -> str:
    """Crop face from BGR image with 20% padding, return base64 JPEG thumbnail.

    Returns an empty string when the padded crop is empty.
    """
    crop = _padded_crop(img_bgr, x1, y1, x2, y2, 0.20)
    if crop.size == 0:
        return ""
    rgb = np.ascontiguousarray(crop[:, :, ::-1])  # BGR → RGB
    pil = Image.fromarray(rgb).resize((thumb_size, thumb_size), Image.LANCZOS)
    buf = io.BytesIO()
    pil.save(buf, format="JPEG", quality=FACE_CROP_QUALITY)
    return base64.b64encode(buf.getvalue()).decode()


def _face_crop_for_adaface(
    img_bgr: np.ndarray,
    x1: int,
    y1: int,
    x2: int,
    y2: int,
) -> Optional[np.ndarray]:
    """Crop (10% padding) and normalise a face for the AdaFace model.

    Returns a float32 array of shape (3, 112, 112), RGB channel order,
    scaled to [-1, 1] — or ``None`` when the padded crop is empty.
    """
    crop = _padded_crop(img_bgr, x1, y1, x2, y2, 0.10)
    if crop.size == 0:
        return None
    rgb = crop[:, :, ::-1].copy()  # BGR → RGB
    pil = Image.fromarray(rgb).resize((112, 112), Image.LANCZOS)
    arr = np.array(pil, dtype=np.float32) / 255.0
    arr = (arr - 0.5) / 0.5  # normalise [-1, 1]
    return arr.transpose(2, 0, 1)  # HWC → CHW


def _clahe_enhance(bgr: np.ndarray) -> np.ndarray:
    """CLAHE on luminance — improves detection on dark/washed/low-contrast photos."""
    lab = cv2.cvtColor(bgr, cv2.COLOR_BGR2LAB)
    lum, chan_a, chan_b = cv2.split(lab)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    lum_eq = clahe.apply(lum)
    return cv2.cvtColor(cv2.merge([lum_eq, chan_a, chan_b]), cv2.COLOR_LAB2BGR)


def _iou(box_a: list, box_b: list) -> float:
    """IoU between two [x1,y1,x2,y2] boxes (0.0 when they do not overlap)."""
    xa = max(box_a[0], box_b[0])
    ya = max(box_a[1], box_b[1])
    xb = min(box_a[2], box_b[2])
    yb = min(box_a[3], box_b[3])
    inter = max(0, xb - xa) * max(0, yb - ya)
    if inter == 0:
        return 0.0
    area_a = (box_a[2] - box_a[0]) * (box_a[3] - box_a[1])
    area_b = (box_b[2] - box_b[0]) * (box_b[3] - box_b[1])
    union = area_a + area_b - inter
    if union <= 0:
        return 0.0
    return inter / union


def _dedup_faces(faces_list: list, iou_thresh: float = IOU_DEDUP_THRESHOLD) -> list:
    """Remove duplicate detections across scales/flips. Keep highest det_score.

    Faces are visited in descending ``det_score`` order; a face is dropped
    when its integer-truncated bbox overlaps an already kept face with
    IoU strictly greater than ``iou_thresh``.
    """
    if not faces_list:
        return []
    ranked = sorted(faces_list, key=lambda f: float(f.det_score), reverse=True)
    kept = []
    kept_boxes = []  # integer boxes of `kept`, cached to avoid recomputation
    for face in ranked:
        box = face.bbox.astype(int)[:4].tolist()
        if any(_iou(box, other) > iou_thresh for other in kept_boxes):
            continue
        kept.append(face)
        kept_boxes.append(box)
    return kept


# ════════════════════════════════════════════════════════════════
# AIModelManager — V4
# ════════════════════════════════════════════════════════════════
class AIModelManager:
    """Loads every model once and turns an image file into face/object vectors."""

    def __init__(self):
        """Load SigLIP, DINOv2, YOLO, InsightFace and (optionally) AdaFace.

        InsightFace failures are reported and leave ``self.face_app`` as
        ``None`` (face lane disabled) instead of raising.
        """
        self.device = (
            "cuda" if torch.cuda.is_available()
            else ("mps" if torch.backends.mps.is_available() else "cpu")
        )
        print(f"🚀 Loading models onto: {self.device.upper()}...")

        # Guards the shared InsightFace session and its mutable det_size.
        self._face_lock = threading.Lock()

        # ── Object Lane: SigLIP + DINOv2 (unchanged) ─────────────
        print("📦 Loading SigLIP...")
        self.siglip_processor = AutoProcessor.from_pretrained(
            "google/siglip-base-patch16-224", use_fast=True)
        self.siglip_model = AutoModel.from_pretrained(
            "google/siglip-base-patch16-224").to(self.device).eval()

        print("📦 Loading DINOv2...")
        self.dinov2_processor = AutoImageProcessor.from_pretrained("facebook/dinov2-base")
        self.dinov2_model = AutoModel.from_pretrained(
            "facebook/dinov2-base").to(self.device).eval()

        if self.device == "cuda":
            self.siglip_model = self.siglip_model.half()
            self.dinov2_model = self.dinov2_model.half()

        # ── YOLO for object segmentation ─────────────────────────
        print("📦 Loading YOLO11n-seg...")
        self.yolo = YOLO("yolo11n-seg.pt")

        # ── Face Lane: InsightFace SCRFD + ArcFace-R100 ───────────
        # V4: ALWAYS use buffalo_l (SCRFD-10GF + ArcFace-R100)
        # even on CPU — accuracy matters more than speed here.
        self.face_app = None
        if INSIGHTFACE_AVAILABLE:
            try:
                print("📦 Loading InsightFace buffalo_l (SCRFD-10GF + ArcFace-R100)...")
                self.face_app = FaceAnalysis(
                    name="buffalo_l",
                    providers=(
                        ["CUDAExecutionProvider", "CPUExecutionProvider"]
                        if self.device == "cuda"
                        else ["CPUExecutionProvider"]
                    ),
                )
                self.face_app.prepare(
                    ctx_id=0 if self.device == "cuda" else -1,
                    det_size=DET_SIZE_PRIMARY,  # 1280×1280 — key for small faces
                )
                # Warmup
                test_img = np.zeros((112, 112, 3), dtype=np.uint8)
                self.face_app.get(test_img)
                print("✅ InsightFace buffalo_l loaded — SCRFD+ArcFace face lane ACTIVE")
                print(f" det_size={DET_SIZE_PRIMARY} | quality_gate={FACE_QUALITY_GATE}")
            except Exception as e:
                print(f"❌ InsightFace init FAILED: {e}")
                print(traceback.format_exc())
                self.face_app = None
        else:
            print("❌ InsightFace NOT installed")

        # ── AdaFace IR-50 — optional second face embedding ────────
        # Fused with ArcFace → 1024-D face vector
        self.adaface_model = None
        self._load_adaface()

        # Small FIFO result cache keyed by image hash + call flags.
        self._cache = {}
        self._cache_maxsize = 128

        adaface_status = (
            "FULL FUSION ✅" if self.adaface_model
            else "ZERO-PADDED ⚠️ (AdaFace weights missing)"
        )
        print("")
        print("✅ Enterprise Lens V4 — Models Ready")
        print(f" Device : {self.device.upper()}")
        print(f" InsightFace : buffalo_l (SCRFD-10GF + ArcFace-R100)")
        print(f" AdaFace : {adaface_status}")
        print(f" Face vector dim : {FUSED_FACE_DIM} <- enterprise-faces MUST be {FUSED_FACE_DIM}-D")
        print(f" Object vector dim : 1536 <- enterprise-objects MUST be 1536-D")
        print(f" Quality gate : det_score >= {FACE_QUALITY_GATE}, face_px >= {MIN_FACE_SIZE}")
        print(f" Detection size : {DET_SIZE_PRIMARY}")
        print("")

    def _load_adaface(self):
        """Load AdaFace IR-50 (MS1MV2) when ``ENABLE_ADAFACE=1``.

        When the env var is not "1", or any step of the download/load
        fails, ``self.adaface_model`` stays ``None`` and face vectors are
        ArcFace(512) + zeros(512) = 1024-D.
        """
        enable = os.getenv("ENABLE_ADAFACE", "0").strip() == "1"
        hf_token_present = bool(os.getenv("HF_TOKEN", "").strip())
        print(f" ENABLE_ADAFACE={os.getenv('ENABLE_ADAFACE', 'NOT SET')}")
        print(f" HF_TOKEN present={'YES' if hf_token_present else 'NO (not set or empty)'}")

        if not enable:
            print("⚠️ AdaFace disabled (ENABLE_ADAFACE != 1) — using ArcFace zero-padded 1024-D")
            self.adaface_model = None
            return

        # Full loading code kept here for when AdaFace is re-enabled
        import sys

        HF_TOKEN = os.getenv("HF_TOKEN", None)
        REPO_ID = "minchul/cvlface_adaface_ir50_ms1mv2"
        CACHE_PATH = os.path.expanduser("~/.cvlface_cache/minchul/cvlface_adaface_ir50_ms1mv2")

        try:
            from huggingface_hub import hf_hub_download

            print("📦 Loading AdaFace IR-50 MS1MV2...")
            os.makedirs(CACHE_PATH, exist_ok=True)
            hf_hub_download(repo_id=REPO_ID, filename="files.txt",
                            token=HF_TOKEN, local_dir=CACHE_PATH,
                            local_dir_use_symlinks=False)
            with open(os.path.join(CACHE_PATH, "files.txt")) as f:
                extra = [x.strip() for x in f.read().split("\n") if x.strip()]

            for fname in extra + ["config.json", "wrapper.py", "model.safetensors"]:
                fpath = os.path.join(CACHE_PATH, fname)
                if not os.path.exists(fpath):
                    hf_hub_download(repo_id=REPO_ID, filename=fname,
                                    token=HF_TOKEN, local_dir=CACHE_PATH,
                                    local_dir_use_symlinks=False)

            # The repo's remote code is loaded from inside its own directory;
            # cwd and sys.path are always restored afterwards.
            cwd = os.getcwd()
            os.chdir(CACHE_PATH)
            sys.path.insert(0, CACHE_PATH)
            try:
                from transformers import AutoModel as _HF_AutoModel
                model = _HF_AutoModel.from_pretrained(
                    CACHE_PATH, trust_remote_code=True, token=HF_TOKEN)
            finally:
                os.chdir(cwd)
                if CACHE_PATH in sys.path:
                    sys.path.remove(CACHE_PATH)

            model = model.to(self.device).eval()
            with torch.no_grad():
                out = model(torch.zeros(1, 3, 112, 112).to(self.device))
            emb = out if isinstance(out, torch.Tensor) else out.embedding
            # Explicit check (not `assert`) so it still runs under `python -O`.
            if emb.shape[-1] != ADAFACE_DIM:
                raise ValueError(
                    f"unexpected AdaFace embedding dim {emb.shape[-1]} "
                    f"(expected {ADAFACE_DIM})")
            self.adaface_model = model
            print(f"✅ AdaFace IR-50 loaded — 1024-D FULL FUSION active")
        except Exception as e:
            print(f"⚠️ AdaFace load failed: {e} — falling back to zero-padded 1024-D")
            self.adaface_model = None

    # ── Object Lane: batched SigLIP + DINOv2 embedding ───────────
    def _embed_crops_batch(self, crops: list) -> list:
        """Embed a list of PIL images → list of fused numpy vectors.

        Each vector is L2-normalised SigLIP features concatenated with the
        L2-normalised DINOv2 first-token features, then L2-normalised again.
        Returns ``[]`` for an empty input.
        """
        if not crops:
            return []
        use_half = self.device == "cuda"

        def _to_device(batch: dict) -> dict:
            # Move tensors to the device; float32 inputs follow the
            # half-precision models on CUDA.
            moved = {k: v.to(self.device) for k, v in batch.items()}
            if use_half:
                moved = {k: v.half() if v.dtype == torch.float32 else v
                         for k, v in moved.items()}
            return moved

        with torch.no_grad():
            # SigLIP
            sig_in = _to_device(
                self.siglip_processor(images=crops, return_tensors="pt", padding=True))
            sig_out = self.siglip_model.get_image_features(**sig_in)
            # Handle all output types across transformers versions
            if hasattr(sig_out, "image_embeds"):
                sig_out = sig_out.image_embeds
            elif hasattr(sig_out, "pooler_output"):
                sig_out = sig_out.pooler_output
            elif hasattr(sig_out, "last_hidden_state"):
                sig_out = sig_out.last_hidden_state[:, 0, :]
            elif isinstance(sig_out, tuple):
                sig_out = sig_out[0]
            if not isinstance(sig_out, torch.Tensor):
                sig_out = sig_out[0]
            sig_vecs = F.normalize(sig_out.float(), p=2, dim=1).cpu()

            # DINOv2
            dino_in = _to_device(self.dinov2_processor(images=crops, return_tensors="pt"))
            dino_out = self.dinov2_model(**dino_in)
            dino_vecs = F.normalize(
                dino_out.last_hidden_state[:, 0, :].float(), p=2, dim=1).cpu()

            fused = F.normalize(torch.cat([sig_vecs, dino_vecs], dim=1), p=2, dim=1)
        return [row.numpy() for row in fused]

    # ── AdaFace embedding for a single face crop ─────────────────
    def _adaface_embed(self, face_arr_chw: np.ndarray) -> Optional[np.ndarray]:
        """Run the AdaFace model on a preprocessed (3,112,112) float32 array.

        Input : CHW float32, normalised to [-1, 1]
        Output: L2-normalised numpy embedding, or ``None`` when the model
                is not loaded, the input is ``None``, or inference fails.

        The model may return a tensor directly or an object with an
        ``.embedding`` attribute — both cases are handled.
        """
        model = self.adaface_model
        if model is None or face_arr_chw is None:
            return None
        try:
            # Match the model's own weight dtype. The loader never converts
            # AdaFace to half, so unconditionally casting the input to half
            # on CUDA raised a dtype mismatch that was swallowed below.
            first_param = next(model.parameters(), None)
            dtype = first_param.dtype if first_param is not None else torch.float32
            t = torch.from_numpy(face_arr_chw).unsqueeze(0)  # (1,3,112,112)
            t = t.to(device=self.device, dtype=dtype)
            with torch.no_grad():
                out = model(t)
            emb = out if isinstance(out, torch.Tensor) else out.embedding
            emb = F.normalize(emb.float(), p=2, dim=1)
            return emb[0].cpu().numpy()
        except Exception as e:
            print(f"⚠️ AdaFace inference error: {e}")
            return None

    def _run_detector(self, bgr_img: np.ndarray, det_size: tuple) -> list:
        """Run InsightFace on ``bgr_img`` with the detector set to ``det_size``.

        The detector's ``input_size`` is shared mutable state, so it is
        changed and restored to ``DET_SIZE_PRIMARY`` while holding
        ``_face_lock``; concurrent callers cannot observe each other's size.
        """
        with self._face_lock:
            det_model = self.face_app.det_model
            det_model.input_size = det_size
            try:
                return self.face_app.get(bgr_img)
            finally:
                det_model.input_size = DET_SIZE_PRIMARY

    def _collect_raw_faces(self, bgr_enhanced: np.ndarray) -> list:
        """Detect at every size in ``DET_SCALES`` plus one flipped pass.

        All returned bboxes are expressed in the coordinates of
        ``bgr_enhanced``. A failing pass is logged and skipped.
        """
        img_h, img_w = bgr_enhanced.shape[:2]
        raw_faces = []

        for scale in DET_SCALES:
            # Never upscale: each side is capped at the source size.
            scale_w = min(img_w, scale[0])
            scale_h = min(img_h, scale[1])
            if scale_w == img_w and scale_h == img_h:
                bgr_scaled = bgr_enhanced
            else:
                bgr_scaled = cv2.resize(bgr_enhanced, (scale_w, scale_h))
            print(f"🔍 SCRFD detection at {scale_w}×{scale_h}...")
            try:
                found = self._run_detector(bgr_scaled, scale)
                # Scale bboxes back to original dimensions
                sx = img_w / scale_w
                sy = img_h / scale_h
                if sx != 1.0 or sy != 1.0:
                    for f in found:
                        f.bbox[0] *= sx
                        f.bbox[1] *= sy
                        f.bbox[2] *= sx
                        f.bbox[3] *= sy
                raw_faces.extend(found)
            except Exception as e:
                # Best effort: one failed scale must not abort the others.
                print(f" ⚠️ detection at {scale_w}×{scale_h} failed: {e}")

        # Horizontal flip pass — catches profile/turned faces
        try:
            flipped = self._run_detector(cv2.flip(bgr_enhanced, 1), DET_SIZE_PRIMARY)
            # Mirror bboxes back to original orientation
            for f in flipped:
                x1, _, x2, _ = f.bbox
                f.bbox[0] = img_w - x2
                f.bbox[2] = img_w - x1
            raw_faces.extend(flipped)
        except Exception as e:
            print(f" ⚠️ flipped detection pass failed: {e}")

        return raw_faces

    # ── V4 Face detection + dual encoding ────────────────────────
    def _detect_and_encode_faces(self, img_np: np.ndarray) -> list:
        """Detect faces in an RGB image and return one fused vector per face.

        Pipeline: CLAHE enhancement → detection at every ``DET_SCALES``
        size plus a flipped pass → IoU de-duplication → per-face gates
        (side ≥ ``MIN_FACE_SIZE``, det_score ≥ ``FACE_QUALITY_GATE``) →
        ArcFace embedding + AdaFace embedding (or zeros) concatenated and
        L2-normalised to ``FUSED_FACE_DIM``. At most
        ``MAX_FACES_PER_IMAGE`` faces are returned.

        Returns a list of dicts with keys: type, vector, vec_dim,
        face_idx, bbox ([x, y, w, h]), face_crop (base64 JPEG),
        det_score, face_quality, face_width_px. Returns ``[]`` when
        InsightFace is not loaded or on any unexpected error.
        """
        if self.face_app is None:
            print("⚠️ face_app is None — InsightFace not loaded")
            return []

        try:
            # InsightFace expects BGR
            if img_np.dtype != np.uint8:
                img_np = (img_np * 255).astype(np.uint8)
            bgr = img_np[:, :, ::-1].copy() if img_np.shape[2] == 3 else img_np.copy()

            # CLAHE helps with dark/overexposed/low-contrast photos
            bgr_enhanced = _clahe_enhance(bgr)

            all_raw_faces = self._collect_raw_faces(bgr_enhanced)

            # Deduplicate across scales and flip
            faces = _dedup_faces(all_raw_faces)
            print(f" Raw detections: {len(all_raw_faces)} → after dedup: {len(faces)}")

            results = []
            accepted = 0
            img_h, img_w = bgr.shape[:2]

            for idx, face in enumerate(faces):
                if accepted >= MAX_FACES_PER_IMAGE:
                    break

                # ── Bounding box, clipped to the image ────────────
                x1, y1, x2, y2 = face.bbox.astype(int)
                x1 = max(0, x1)
                y1 = max(0, y1)
                x2 = min(img_w, x2)
                y2 = min(img_h, y2)
                w, h = x2 - x1, y2 - y1
                if w <= 0 or h <= 0:
                    continue

                # ── Quality gate 1: minimum size ──────────────────
                if w < MIN_FACE_SIZE or h < MIN_FACE_SIZE:
                    print(f" Face {idx}: SKIP — too small ({w}×{h}px)")
                    continue

                # ── Quality gate 2: detection confidence ──────────
                det_score = float(face.det_score) if hasattr(face, "det_score") else 1.0
                if det_score < FACE_QUALITY_GATE:
                    print(f" Face {idx}: SKIP — low det_score ({det_score:.3f})")
                    continue

                # ── ArcFace embedding (from InsightFace) ──────────
                if face.embedding is None:
                    continue
                arcface_vec = face.embedding.astype(np.float32)
                arc_norm = np.linalg.norm(arcface_vec)
                if arc_norm > 0:
                    arcface_vec = arcface_vec / arc_norm

                # ── AdaFace embedding ─────────────────────────────
                face_chw = _face_crop_for_adaface(bgr, x1, y1, x2, y2)
                adaface_vec = self._adaface_embed(face_chw)

                # ── Fuse: ArcFace + AdaFace → 1024-D ─────────────
                # ALWAYS output FUSED_FACE_DIM (1024) so Pinecone index
                # dimension never mismatches, regardless of AdaFace status.
                if adaface_vec is not None:
                    fused_raw = np.concatenate([arcface_vec, adaface_vec])
                else:
                    # AdaFace unavailable — pad with zeros to maintain 1024-D
                    print(" ⚠️ AdaFace unavailable — padding to 1024-D")
                    fused_raw = np.concatenate(
                        [arcface_vec, np.zeros(ADAFACE_DIM, dtype=np.float32)])

                fused_norm = np.linalg.norm(fused_raw)
                final_vec = (fused_raw / fused_norm) if fused_norm > 0 else fused_raw
                vec_dim = FUSED_FACE_DIM  # always 1024

                # ── Face crop thumbnail for UI ────────────────────
                face_crop_b64 = _crop_to_b64(bgr, x1, y1, x2, y2)

                results.append({
                    "type": "face",
                    "vector": final_vec,
                    "vec_dim": vec_dim,
                    "face_idx": accepted,
                    "bbox": [int(x1), int(y1), int(w), int(h)],
                    "face_crop": face_crop_b64,
                    "det_score": det_score,
                    "face_quality": det_score,  # alias for metadata
                    "face_width_px": int(w),
                })
                accepted += 1
                print(f" Face {idx}: ACCEPTED — {w}×{h}px | "
                      f"det={det_score:.3f} | dim={vec_dim}")

            print(f"👤 {accepted} face(s) passed quality gate")
            return results

        except Exception as e:
            print(f"🟠 InsightFace error: {e}")
            print(traceback.format_exc()[-600:])
            return []

    # ── Main process_image ────────────────────────────────────────
    def process_image(
        self,
        image_path: str,
        is_query: bool = False,
        detect_faces: bool = True,
    ) -> list:
        """Full pipeline for one image. Returns a list of vector dicts.

        Face:   {type, vector (1024-D), vec_dim, face_idx, bbox, face_crop,
                 det_score, face_quality, face_width_px}
        Object: {type, vector}

        The face lane runs only when ``detect_faces`` is true and
        InsightFace is loaded. The object lane always runs: the whole
        image plus up to ``MAX_CROPS`` YOLO crops are embedded; person
        crops are skipped when at least one face was accepted, to avoid
        double-counting people.

        Results are cached in memory, keyed by ``_img_hash(image_path)``
        and both flags; ``is_query`` only affects the cache key here. A
        cache hit returns the same list object as the original call.
        """
        cache_key = f"{_img_hash(image_path)}_{detect_faces}_{is_query}"
        if cache_key in self._cache:
            print("⚡ Cache hit")
            return self._cache[cache_key]

        extracted = []
        # convert() yields an independent image, so the file handle can be
        # released as soon as the pixels are decoded.
        with Image.open(image_path) as opened:
            original_pil = opened.convert("RGB")
        img_np = np.array(original_pil)  # RGB uint8
        faces_found = False

        # ════════════════════════════════════════════════════════
        # FACE LANE
        # Multi-scale + CLAHE + flip are all handled inside
        # _detect_and_encode_faces; it receives the full-resolution image.
        # ════════════════════════════════════════════════════════
        if detect_faces and self.face_app is not None:
            face_results = self._detect_and_encode_faces(img_np)
            if face_results:
                faces_found = True
                extracted.extend(face_results)

        # ════════════════════════════════════════════════════════
        # OBJECT LANE
        # Always runs — even when faces are found.
        # ════════════════════════════════════════════════════════
        crops_pil = [_resize_pil(original_pil, MAX_IMAGE_SIZE)]  # full image
        crop_limit = MAX_CROPS + 1  # +1 for the full-image entry above
        yolo_results = self.yolo(image_path, conf=0.5, verbose=False)

        for r in yolo_results:
            if r.masks is not None:
                for seg_idx, mask_xy in enumerate(r.masks.xy):
                    cls_id = int(r.boxes.cls[seg_idx].item())
                    if faces_found and cls_id == YOLO_PERSON_CLASS_ID:
                        continue
                    polygon = np.array(mask_xy, dtype=np.int32)
                    if len(polygon) < 3:
                        continue
                    x, y, w, h = cv2.boundingRect(polygon)
                    if w < 30 or h < 30:
                        continue
                    crops_pil.append(original_pil.crop((x, y, x + w, y + h)))
                    if len(crops_pil) >= crop_limit:
                        break
            elif r.boxes is not None:
                for box in r.boxes:
                    cls_id = int(box.cls.item())
                    if faces_found and cls_id == YOLO_PERSON_CLASS_ID:
                        continue
                    x1, y1, x2, y2 = box.xyxy[0].tolist()
                    if (x2 - x1) < 30 or (y2 - y1) < 30:
                        continue
                    crops_pil.append(original_pil.crop((x1, y1, x2, y2)))
                    if len(crops_pil) >= crop_limit:
                        break

        crops = [_resize_pil(c, MAX_IMAGE_SIZE) for c in crops_pil]
        print(f"🧠 Embedding {len(crops)} object crop(s)...")
        obj_vecs = self._embed_crops_batch(crops)
        extracted.extend({"type": "object", "vector": vec} for vec in obj_vecs)

        # Cache (FIFO eviction: dicts preserve insertion order)
        if len(self._cache) >= self._cache_maxsize:
            self._cache.pop(next(iter(self._cache)), None)
        self._cache[cache_key] = extracted
        return extracted

    async def process_image_async(
        self,
        image_path: str,
        is_query: bool = False,
        detect_faces: bool = True,
    ) -> list:
        """Run ``process_image`` in the default executor without blocking the loop."""
        # get_running_loop() is the supported way to obtain the loop from
        # inside a coroutine.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            functools.partial(self.process_image, image_path, is_query, detect_faces),
        )