visual-search-api / src /models.py
AdarshDRC's picture
fix: improved the retrieval using face search
572243e
# src/models.py — Enterprise Lens V4
# ════════════════════════════════════════════════════════════════════
# Face Lane : InsightFace SCRFD-10GF + ArcFace-R100 (buffalo_l)
# + AdaFace IR-50 (MS1MV2, opt-in via ENABLE_ADAFACE=1) fused → 1024-D vector
# (ArcFace half is zero-padded to 1024-D when AdaFace is off)
# • det_size=(1280,1280) — catches small/group faces
# • Quality gate: det_score ≥ 0.35, face_px ≥ 20
# • Multi-scale: runs detection at 3 scales plus a flipped pass, merges via IoU dedup
# • Stores one 1024-D vector PER face
# • Each vector carries base64 face-crop thumbnail
# • face_quality_score + face_width_px in metadata
#
# Object Lane: SigLIP + DINOv2 fused 1536-D (unchanged from V3)
# ════════════════════════════════════════════════════════════════════
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
import asyncio
import base64
import functools
import hashlib
import io
import threading
import traceback
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image
from transformers import AutoImageProcessor, AutoModel, AutoProcessor
from ultralytics import YOLO
# ── InsightFace ───────────────────────────────────────────────────
try:
import insightface
from insightface.app import FaceAnalysis
INSIGHTFACE_AVAILABLE = True
except ImportError:
INSIGHTFACE_AVAILABLE = False
print("⚠️ insightface not installed — face lane disabled")
print(" Run: pip install insightface onnxruntime-silicon (mac)")
print(" pip install insightface onnxruntime (linux/win)")
# ── AdaFace ──────────────────────────────────────────────────────
# Disabled by default — enable by setting ENABLE_ADAFACE=1 env var
# (the env var is read in AIModelManager._load_adaface).
# When disabled: ArcFace(512) + zeros(512) = 1024-D (fully functional).
ADAFACE_WEIGHTS_AVAILABLE = False # hard-coded; not reassigned from ENABLE_ADAFACE in the code visible here
# ── Constants ─────────────────────────────────────────────────────
YOLO_PERSON_CLASS_ID = 0 # YOLO class id skipped in the object lane when faces were found
MIN_FACE_SIZE = 20 # min face width AND height in px; lowered: 40 missed small faces in group photos
MAX_FACES_PER_IMAGE = 12 # cap on accepted faces per image; slightly higher for group photos
MAX_CROPS = 6 # max YOLO object crops per image (the full image is embedded in addition)
MAX_IMAGE_SIZE = 640 # object lane longest edge
DET_SIZE_PRIMARY = (1280, 1280) # V4: 1280 for small-face detection; also used for the flipped pass
DET_SIZE_SECONDARY = (640, 640) # fallback / 2nd scale — not referenced by the code visible here
FACE_CROP_THUMB_SIZE = 112 # square face thumbnail edge (px) for Pinecone metadata
FACE_CROP_QUALITY = 80 # JPEG quality for thumbnails
FACE_QUALITY_GATE = 0.35 # min det_score; lowered from 0.60 — accepts sunglasses, angles, smiles
# Multi-scale pyramid — tried in order, results merged with IoU dedup
DET_SCALES = [(1280, 1280), (960, 960), (640, 640)]
IOU_DEDUP_THRESHOLD = 0.45 # suppress duplicate detections across scales
FACE_DIM = 512 # ArcFace embedding dimension
ADAFACE_DIM = 512 # AdaFace embedding dimension
FUSED_FACE_DIM = 1024 # ArcFace + AdaFace concatenated
# ════════════════════════════════════════════════════════════════
# Utility functions
# ════════════════════════════════════════════════════════════════
def _resize_pil(img: Image.Image, max_side: int = MAX_IMAGE_SIZE) -> Image.Image:
    """Downscale *img* so its longest edge is at most *max_side*.

    Images that already fit are returned unchanged (same object). The aspect
    ratio is preserved; each target dimension is clamped to at least 1 px so
    that very elongated images (e.g. 10000x3) cannot truncate to a zero-sized
    target, which ``Image.resize`` rejects.
    """
    w, h = img.size
    longest = max(w, h)
    if longest <= max_side:
        return img
    scale = max_side / longest
    new_w = max(1, int(w * scale))
    new_h = max(1, int(h * scale))
    return img.resize((new_w, new_h), Image.LANCZOS)
def _img_hash(image_path: str) -> str:
h = hashlib.md5()
with open(image_path, "rb") as f:
h.update(f.read(65536))
return h.hexdigest()
def _crop_to_b64(
    img_bgr: np.ndarray,
    x1: int, y1: int, x2: int, y2: int,
    thumb_size: int = FACE_CROP_THUMB_SIZE,
) -> str:
    """Return a base64 JPEG thumbnail of the box, padded by 20% per side.

    The padded box is clipped to the image bounds. An empty crop yields "".
    The thumbnail is a square of ``thumb_size`` pixels (aspect ratio is not
    preserved) encoded at FACE_CROP_QUALITY.
    """
    img_h, img_w = img_bgr.shape[:2]
    margin_x = int((x2 - x1) * 0.20)
    margin_y = int((y2 - y1) * 0.20)
    left = max(0, x1 - margin_x)
    top = max(0, y1 - margin_y)
    right = min(img_w, x2 + margin_x)
    bottom = min(img_h, y2 + margin_y)
    region = img_bgr[top:bottom, left:right]
    if region.size == 0:
        return ""
    # Reverse the channel axis: BGR → RGB for PIL.
    thumb = Image.fromarray(region[:, :, ::-1]).resize(
        (thumb_size, thumb_size), Image.LANCZOS)
    jpeg_buf = io.BytesIO()
    thumb.save(jpeg_buf, format="JPEG", quality=FACE_CROP_QUALITY)
    encoded = base64.b64encode(jpeg_buf.getvalue())
    return encoded.decode()
def _face_crop_for_adaface(
img_bgr: np.ndarray,
x1: int, y1: int, x2: int, y2: int,
) -> np.ndarray:
"""
Crop and normalise face for AdaFace IR-50 input.
Returns float32 numpy array (3, 112, 112) normalised to [-1, 1].
"""
H, W = img_bgr.shape[:2]
w, h = x2 - x1, y2 - y1
pad_x = int(w * 0.10)
pad_y = int(h * 0.10)
cx1 = max(0, x1 - pad_x)
cy1 = max(0, y1 - pad_y)
cx2 = min(W, x2 + pad_x)
cy2 = min(H, y2 + pad_y)
crop = img_bgr[cy1:cy2, cx1:cx2]
if crop.size == 0:
return None
rgb = crop[:, :, ::-1].copy() # BGR → RGB
pil = Image.fromarray(rgb).resize((112, 112), Image.LANCZOS)
arr = np.array(pil, dtype=np.float32) / 255.0
arr = (arr - 0.5) / 0.5 # normalise [-1, 1]
return arr.transpose(2, 0, 1) # HWC → CHW
def _clahe_enhance(bgr: np.ndarray) -> np.ndarray:
    """Equalise the L channel with CLAHE (clip 2.0, 8×8 tiles) and return BGR.

    Intended to help detection on dark, washed-out or low-contrast photos;
    the a/b colour channels are passed through untouched.
    """
    lum, chan_a, chan_b = cv2.split(cv2.cvtColor(bgr, cv2.COLOR_BGR2LAB))
    equaliser = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    merged = cv2.merge([equaliser.apply(lum), chan_a, chan_b])
    return cv2.cvtColor(merged, cv2.COLOR_LAB2BGR)
def _iou(box_a: list, box_b: list) -> float:
"""IoU between two [x1,y1,x2,y2] boxes."""
xa = max(box_a[0], box_b[0]); ya = max(box_a[1], box_b[1])
xb = min(box_a[2], box_b[2]); yb = min(box_a[3], box_b[3])
inter = max(0, xb - xa) * max(0, yb - ya)
if inter == 0:
return 0.0
area_a = (box_a[2]-box_a[0]) * (box_a[3]-box_a[1])
area_b = (box_b[2]-box_b[0]) * (box_b[3]-box_b[1])
return inter / (area_a + area_b - inter)
def _dedup_faces(faces_list: list, iou_thresh: float = IOU_DEDUP_THRESHOLD) -> list:
    """Greedy IoU suppression over detections gathered from several passes.

    Faces are visited from highest to lowest ``det_score``; a face is dropped
    when its integer-truncated bbox overlaps an already-kept face with IoU
    strictly greater than ``iou_thresh``. Returns the kept faces in score order.
    """
    if not faces_list:
        return []
    ranked = sorted(faces_list, key=lambda f: float(f.det_score), reverse=True)
    survivors = []
    survivor_boxes = []  # int boxes of `survivors`, index-aligned
    for candidate in ranked:
        corners = candidate.bbox.astype(int)
        box = [corners[0], corners[1], corners[2], corners[3]]
        for kept_box in survivor_boxes:
            if _iou(box, kept_box) > iou_thresh:
                break
        else:
            survivors.append(candidate)
            survivor_boxes.append(box)
    return survivors
# ════════════════════════════════════════════════════════════════
# AIModelManager — V4
# ════════════════════════════════════════════════════════════════
class AIModelManager:
    """Loads every model once and converts images into face / object vectors.

    Face lane   : InsightFace ``buffalo_l`` detection + ArcFace embedding,
                  optionally fused with AdaFace; always FUSED_FACE_DIM-D.
    Object lane : SigLIP + DINOv2 embeddings of the full image and YOLO crops,
                  concatenated and L2-normalised.

    ``process_image`` is designed to be called from executor threads (see
    ``process_image_async``), so detector state and the result cache are
    guarded by locks.
    """

    def __init__(self):
        """Pick a device (CUDA, then MPS, then CPU) and load all models onto it."""
        self.device = (
            "cuda" if torch.cuda.is_available()
            else ("mps" if torch.backends.mps.is_available() else "cpu")
        )
        print(f"🚀 Loading models onto: {self.device.upper()}...")

        # Locks are created first so every later code path can rely on them.
        self._face_lock = threading.Lock()    # serialises detector config + inference
        self._cache_lock = threading.Lock()   # guards _cache lookups / eviction
        self._cache = {}
        self._cache_maxsize = 128

        # ── Object Lane: SigLIP + DINOv2 (unchanged) ─────────────
        print("📦 Loading SigLIP...")
        self.siglip_processor = AutoProcessor.from_pretrained(
            "google/siglip-base-patch16-224", use_fast=True)
        self.siglip_model = AutoModel.from_pretrained(
            "google/siglip-base-patch16-224").to(self.device).eval()
        print("📦 Loading DINOv2...")
        self.dinov2_processor = AutoImageProcessor.from_pretrained("facebook/dinov2-base")
        self.dinov2_model = AutoModel.from_pretrained(
            "facebook/dinov2-base").to(self.device).eval()
        if self.device == "cuda":
            # fp16 on GPU only; inputs are cast to match in _to_device_inputs.
            self.siglip_model = self.siglip_model.half()
            self.dinov2_model = self.dinov2_model.half()

        # ── YOLO for object segmentation ─────────────────────────
        print("📦 Loading YOLO11n-seg...")
        self.yolo = YOLO("yolo11n-seg.pt")

        # ── Face Lane: InsightFace SCRFD + ArcFace-R100 ───────────
        # buffalo_l is used on every device, including CPU.
        self.face_app = None
        if INSIGHTFACE_AVAILABLE:
            try:
                print("📦 Loading InsightFace buffalo_l (SCRFD-10GF + ArcFace-R100)...")
                self.face_app = FaceAnalysis(
                    name="buffalo_l",
                    providers=(
                        ["CUDAExecutionProvider", "CPUExecutionProvider"]
                        if self.device == "cuda"
                        else ["CPUExecutionProvider"]
                    ),
                )
                self.face_app.prepare(
                    ctx_id=0 if self.device == "cuda" else -1,
                    det_size=DET_SIZE_PRIMARY,
                )
                # Warmup on a blank image so the first real request is not slowed by init.
                test_img = np.zeros((112, 112, 3), dtype=np.uint8)
                self.face_app.get(test_img)
                print("✅ InsightFace buffalo_l loaded — SCRFD+ArcFace face lane ACTIVE")
                print(f" det_size={DET_SIZE_PRIMARY} | quality_gate={FACE_QUALITY_GATE}")
            except Exception as e:
                # Face lane is optional: log and continue with objects only.
                print(f"❌ InsightFace init FAILED: {e}")
                print(traceback.format_exc())
                self.face_app = None
        else:
            print("❌ InsightFace NOT installed")

        # ── AdaFace IR-50 — optional second half of the face vector ──
        self.adaface_model = None
        self._load_adaface()

        # Banner. The original literals had lost their backslashes
        # ("u2705", "u26a0ufe0f", "u2014"); the intended characters are restored.
        adaface_status = (
            "FULL FUSION ✅" if self.adaface_model is not None
            else "ZERO-PADDED ⚠️ (AdaFace weights missing)"
        )
        print("")
        print("✅ Enterprise Lens V4 — Models Ready")
        print(f" Device : {self.device.upper()}")
        print(f" InsightFace : buffalo_l (SCRFD-10GF + ArcFace-R100)")
        print(f" AdaFace : {adaface_status}")
        print(f" Face vector dim : {FUSED_FACE_DIM} <- enterprise-faces MUST be {FUSED_FACE_DIM}-D")
        print(f" Object vector dim : 1536 <- enterprise-objects MUST be 1536-D")
        print(f" Quality gate : det_score >= {FACE_QUALITY_GATE}, face_px >= {MIN_FACE_SIZE}")
        print(f" Detection size : {DET_SIZE_PRIMARY}")
        print("")

    def _load_adaface(self):
        """
        Optionally load AdaFace IR-50 (MS1MV2) from the HuggingFace hub.

        Loading only happens when the ENABLE_ADAFACE env var equals "1".
        Otherwise, or on any failure, ``self.adaface_model`` stays None and
        face vectors are ArcFace(512) + zeros(512) = 1024-D.
        HF_TOKEN, when set, is forwarded to the hub downloads.
        """
        enable = os.getenv("ENABLE_ADAFACE", "0").strip() == "1"
        hf_token_present = bool(os.getenv("HF_TOKEN", "").strip())
        print(f" ENABLE_ADAFACE={os.getenv('ENABLE_ADAFACE', 'NOT SET')}")
        print(f" HF_TOKEN present={'YES' if hf_token_present else 'NO (not set or empty)'}")
        if not enable:
            print("⚠️ AdaFace disabled (ENABLE_ADAFACE != 1) — using ArcFace zero-padded 1024-D")
            self.adaface_model = None
            return

        import sys
        HF_TOKEN = os.getenv("HF_TOKEN", None)
        REPO_ID = "minchul/cvlface_adaface_ir50_ms1mv2"
        CACHE_PATH = os.path.expanduser("~/.cvlface_cache/minchul/cvlface_adaface_ir50_ms1mv2")
        try:
            from huggingface_hub import hf_hub_download
            print("📦 Loading AdaFace IR-50 MS1MV2...")
            os.makedirs(CACHE_PATH, exist_ok=True)
            # files.txt lists the extra source files the remote-code wrapper needs.
            hf_hub_download(repo_id=REPO_ID, filename="files.txt",
                            token=HF_TOKEN, local_dir=CACHE_PATH, local_dir_use_symlinks=False)
            with open(os.path.join(CACHE_PATH, "files.txt")) as f:
                extra = [x.strip() for x in f.read().split("\n") if x.strip()]
            for fname in extra + ["config.json", "wrapper.py", "model.safetensors"]:
                fpath = os.path.join(CACHE_PATH, fname)
                if not os.path.exists(fpath):
                    hf_hub_download(repo_id=REPO_ID, filename=fname,
                                    token=HF_TOKEN, local_dir=CACHE_PATH, local_dir_use_symlinks=False)
            # The model is loaded with cwd and sys.path pointing at the cache
            # dir; both are restored afterwards. NOTE: chdir is process-wide.
            cwd = os.getcwd()
            os.chdir(CACHE_PATH)
            sys.path.insert(0, CACHE_PATH)
            try:
                from transformers import AutoModel as _HF_AutoModel
                model = _HF_AutoModel.from_pretrained(
                    CACHE_PATH, trust_remote_code=True, token=HF_TOKEN)
            finally:
                os.chdir(cwd)
                if CACHE_PATH in sys.path:
                    sys.path.remove(CACHE_PATH)
            model = model.to(self.device).eval()
            # Smoke test: one dummy forward pass must yield ADAFACE_DIM features.
            with torch.no_grad():
                out = model(torch.zeros(1, 3, 112, 112).to(self.device))
            emb = out if isinstance(out, torch.Tensor) else out.embedding
            if emb.shape[-1] != ADAFACE_DIM:
                # Explicit raise instead of assert: asserts vanish under -O.
                raise ValueError(
                    f"unexpected AdaFace embedding dim {emb.shape[-1]} "
                    f"(expected {ADAFACE_DIM})")
            self.adaface_model = model
            print(f"✅ AdaFace IR-50 loaded — 1024-D FULL FUSION active")
        except Exception as e:
            print(f"⚠️ AdaFace load failed: {e} — falling back to zero-padded 1024-D")
            self.adaface_model = None

    # ── Object Lane: batched SigLIP + DINOv2 embedding ───────────
    def _to_device_inputs(self, batch) -> dict:
        """Move processor outputs to the device; cast float32 tensors to fp16 on CUDA."""
        moved = {k: v.to(self.device) for k, v in batch.items()}
        if self.device == "cuda":
            moved = {k: v.half() if v.dtype == torch.float32 else v
                     for k, v in moved.items()}
        return moved

    def _embed_crops_batch(self, crops: list) -> list:
        """Embed a list of PIL images → list of fused SigLIP+DINOv2 numpy vectors.

        Each model's embedding is L2-normalised, the two are concatenated and
        the result is L2-normalised again. Returns [] for an empty input.
        """
        if not crops:
            return []
        with torch.no_grad():
            # SigLIP
            sig_in = self._to_device_inputs(
                self.siglip_processor(images=crops, return_tensors="pt", padding=True))
            sig_out = self.siglip_model.get_image_features(**sig_in)
            # Handle all output types across transformers versions
            if hasattr(sig_out, "image_embeds"):
                sig_out = sig_out.image_embeds
            elif hasattr(sig_out, "pooler_output"):
                sig_out = sig_out.pooler_output
            elif hasattr(sig_out, "last_hidden_state"):
                sig_out = sig_out.last_hidden_state[:, 0, :]
            elif isinstance(sig_out, tuple):
                sig_out = sig_out[0]
            if not isinstance(sig_out, torch.Tensor):
                sig_out = sig_out[0]
            sig_vecs = F.normalize(sig_out.float(), p=2, dim=1).cpu()
            # DINOv2 — first token of the last hidden state
            dino_in = self._to_device_inputs(
                self.dinov2_processor(images=crops, return_tensors="pt"))
            dino_out = self.dinov2_model(**dino_in)
            dino_vecs = F.normalize(
                dino_out.last_hidden_state[:, 0, :].float(), p=2, dim=1).cpu()
            fused = F.normalize(torch.cat([sig_vecs, dino_vecs], dim=1), p=2, dim=1)
        return [fused[i].numpy() for i in range(len(crops))]

    # ── AdaFace embedding for a single face crop ─────────────────
    def _adaface_embed(self, face_arr_chw: np.ndarray) -> np.ndarray:
        """
        Run AdaFace on a preprocessed (3,112,112) float32 array.

        Input : CHW float32, normalised to [-1, 1]
        Output: L2-normalised float32 numpy embedding, or None when AdaFace
                is not loaded, the input is None, or inference fails.

        The input is cast to the model's own parameter dtype. The loader
        never converts the model to fp16, so unconditionally halving the
        input on CUDA would make every call fail with a dtype mismatch and
        silently fall back to zero padding.
        The model may return a tensor directly or an object with an
        ``.embedding`` attribute — both cases are handled.
        """
        if self.adaface_model is None or face_arr_chw is None:
            return None
        try:
            params = getattr(self.adaface_model, "parameters", None)
            first_param = next(params(), None) if callable(params) else None
            dtype = first_param.dtype if first_param is not None else torch.float32
            t = torch.from_numpy(face_arr_chw).unsqueeze(0)  # (1,3,112,112)
            t = t.to(device=self.device, dtype=dtype)
            with torch.no_grad():
                out = self.adaface_model(t)
            emb = out if isinstance(out, torch.Tensor) else out.embedding
            emb = F.normalize(emb.float(), p=2, dim=1)
            return emb[0].cpu().numpy()
        except Exception as e:
            print(f"⚠️ AdaFace inference error: {e}")
            return None

    # ── V4 Face detection + dual encoding ────────────────────────
    def _run_detector(self, image_bgr: np.ndarray, det_size: tuple) -> list:
        """Run InsightFace once with the detector's input_size set to *det_size*.

        Both the input_size mutation and the inference happen under
        ``_face_lock`` so concurrent callers cannot overwrite each other's
        detection size; the primary size is restored before the lock is released.
        """
        with self._face_lock:
            detector = self.face_app.det_model
            detector.input_size = det_size
            try:
                return self.face_app.get(image_bgr)
            finally:
                detector.input_size = DET_SIZE_PRIMARY

    def _collect_raw_faces(self, bgr_enhanced: np.ndarray) -> list:
        """Detect at every DET_SCALES entry plus one flipped pass; bboxes in source coords.

        A failing pass is logged and skipped so the remaining passes still run.
        """
        height, width = bgr_enhanced.shape[:2]
        found = []
        for scale in DET_SCALES:
            # One uniform ratio for both axes: clamping width and height
            # independently would squash non-square images, and embeddings
            # are computed from the resized pixels.
            ratio = min(1.0, scale[0] / width, scale[1] / height)
            if ratio >= 1.0:
                scale_w, scale_h = width, height
                bgr_scaled = bgr_enhanced
            else:
                scale_w = max(1, int(round(width * ratio)))
                scale_h = max(1, int(round(height * ratio)))
                bgr_scaled = cv2.resize(bgr_enhanced, (scale_w, scale_h))
            print(f"🔍 SCRFD detection at {scale_w}×{scale_h}...")
            try:
                faces_at_scale = self._run_detector(bgr_scaled, scale)
                # Scale bboxes back to original dimensions
                sx = width / scale_w
                sy = height / scale_h
                if sx != 1.0 or sy != 1.0:
                    for f in faces_at_scale:
                        f.bbox[0] *= sx
                        f.bbox[1] *= sy
                        f.bbox[2] *= sx
                        f.bbox[3] *= sy
                found.extend(faces_at_scale)
            except Exception as exc:
                print(f"⚠️ SCRFD pass at {scale_w}×{scale_h} failed: {exc}")

        # Horizontal flip pass — intended to catch profile/turned faces
        try:
            faces_flip = self._run_detector(cv2.flip(bgr_enhanced, 1), DET_SIZE_PRIMARY)
            # Mirror bboxes back to original orientation
            for f in faces_flip:
                x1, _, x2, _ = f.bbox
                f.bbox[0] = width - x2
                f.bbox[2] = width - x1
            found.extend(faces_flip)
        except Exception as exc:
            print(f"⚠️ SCRFD flipped pass failed: {exc}")
        return found

    def _detect_and_encode_faces(self, img_np: np.ndarray) -> list:
        """
        Detect faces and return one fused FUSED_FACE_DIM-D vector per accepted face.

        *img_np* is an H×W×3 RGB array; non-uint8 input is multiplied by 255
        and cast. Detection runs on a CLAHE-enhanced copy at several scales
        plus a flipped pass, then duplicates are removed by IoU.

        A face is accepted when its clipped box is at least MIN_FACE_SIZE px
        in both dimensions, det_score >= FACE_QUALITY_GATE, and it has an
        embedding; at most MAX_FACES_PER_IMAGE faces are accepted.

        Vector = L2-norm(concat(ArcFace, AdaFace)), with zeros in place of
        AdaFace when it is unavailable.

        Returns a list of dicts with keys:
          type, vector, vec_dim, face_idx, bbox ([x, y, w, h]),
          face_crop (base64 JPEG), det_score, face_quality, face_width_px
        Returns [] when InsightFace is not loaded or on any error.
        """
        if self.face_app is None:
            print("⚠️ face_app is None — InsightFace not loaded")
            return []
        try:
            # InsightFace expects BGR
            if img_np.dtype != np.uint8:
                img_np = (img_np * 255).astype(np.uint8)
            bgr = img_np[:, :, ::-1].copy() if img_np.shape[2] == 3 else img_np.copy()
            # Detection sees the enhanced image; crops below use the plain one.
            bgr_enhanced = _clahe_enhance(bgr)

            all_raw_faces = self._collect_raw_faces(bgr_enhanced)
            faces = _dedup_faces(all_raw_faces)
            print(f" Raw detections: {len(all_raw_faces)} → after dedup: {len(faces)}")

            results = []
            accepted = 0
            for idx, face in enumerate(faces):
                if accepted >= MAX_FACES_PER_IMAGE:
                    break
                # ── Bounding box, clipped to the image ─────────────
                x1, y1, x2, y2 = face.bbox.astype(int)
                x1 = max(0, x1)
                y1 = max(0, y1)
                x2 = min(bgr.shape[1], x2)
                y2 = min(bgr.shape[0], y2)
                w, h = x2 - x1, y2 - y1
                if w <= 0 or h <= 0:
                    continue
                # ── Quality gate 1: minimum size ──────────────────
                if w < MIN_FACE_SIZE or h < MIN_FACE_SIZE:
                    print(f" Face {idx}: SKIP — too small ({w}×{h}px)")
                    continue
                # ── Quality gate 2: detection confidence ──────────
                det_score = float(face.det_score) if hasattr(face, "det_score") else 1.0
                if det_score < FACE_QUALITY_GATE:
                    print(f" Face {idx}: SKIP — low det_score ({det_score:.3f})")
                    continue
                # ── ArcFace embedding (from InsightFace) ──────────
                if face.embedding is None:
                    continue
                arcface_vec = face.embedding.astype(np.float32)
                n = np.linalg.norm(arcface_vec)
                if n > 0:
                    arcface_vec = arcface_vec / n
                # ── AdaFace embedding (None when unavailable) ─────
                face_chw = _face_crop_for_adaface(bgr, x1, y1, x2, y2)
                adaface_vec = self._adaface_embed(face_chw)
                # ── Fuse: always FUSED_FACE_DIM so the index dimension
                #    never mismatches, regardless of AdaFace status. ──
                if adaface_vec is not None:
                    fused_raw = np.concatenate([arcface_vec, adaface_vec])
                else:
                    print(" ⚠️ AdaFace unavailable — padding to 1024-D")
                    fused_raw = np.concatenate([arcface_vec,
                                                np.zeros(ADAFACE_DIM, dtype=np.float32)])
                n2 = np.linalg.norm(fused_raw)
                final_vec = (fused_raw / n2) if n2 > 0 else fused_raw
                vec_dim = FUSED_FACE_DIM  # always 1024
                # ── Face crop thumbnail for UI ─────────────────────
                face_crop_b64 = _crop_to_b64(bgr, x1, y1, x2, y2)
                results.append({
                    "type": "face",
                    "vector": final_vec,
                    "vec_dim": vec_dim,
                    "face_idx": accepted,
                    "bbox": [int(x1), int(y1), int(w), int(h)],
                    "face_crop": face_crop_b64,
                    "det_score": det_score,
                    "face_quality": det_score,  # alias for metadata
                    "face_width_px": int(w),
                })
                accepted += 1
                print(f" Face {idx}: ACCEPTED — {w}×{h}px | "
                      f"det={det_score:.3f} | dim={vec_dim}")
            print(f"👤 {accepted} face(s) passed quality gate")
            return results
        except Exception as e:
            print(f"🟠 InsightFace error: {e}")
            print(traceback.format_exc()[-600:])
            return []

    # ── Main process_image ────────────────────────────────────────
    def process_image(
        self,
        image_path: str,
        is_query: bool = False,
        detect_faces: bool = True,
    ) -> list:
        """
        Full pipeline for one image file.

        Returns a list of vector dicts:
          Face:   {type, vector, vec_dim, face_idx, bbox, face_crop,
                   det_score, face_quality, face_width_px}
          Object: {type, vector}

        The face lane runs only when *detect_faces* is true and InsightFace
        is loaded. The object lane always runs: the full image plus up to
        MAX_CROPS YOLO crops are embedded; person-class crops are skipped
        when at least one face was accepted, to avoid double-counting people.

        Results are cached in memory by (file hash, detect_faces, is_query);
        *is_query* only affects the cache key. A cache hit returns the same
        list object, so callers must not mutate it.
        """
        cache_key = f"{_img_hash(image_path)}_{detect_faces}_{is_query}"
        with self._cache_lock:
            if cache_key in self._cache:
                print("⚡ Cache hit")
                return self._cache[cache_key]

        extracted = []
        # convert() returns a fully loaded copy, so the file can be closed here.
        with Image.open(image_path) as src:
            original_pil = src.convert("RGB")
        img_np = np.array(original_pil)  # RGB uint8
        faces_found = False

        # ── FACE LANE ─────────────────────────────────────────────
        # Multi-scale + CLAHE + flip are handled inside _detect_and_encode_faces.
        if detect_faces and self.face_app is not None:
            face_results = self._detect_and_encode_faces(img_np)
            if face_results:
                faces_found = True
                extracted.extend(face_results)

        # ── OBJECT LANE ───────────────────────────────────────────
        crops_pil = [_resize_pil(original_pil, MAX_IMAGE_SIZE)]  # full image first
        yolo_results = self.yolo(image_path, conf=0.5, verbose=False)
        for r in yolo_results:
            if r.masks is not None:
                for seg_idx, mask_xy in enumerate(r.masks.xy):
                    cls_id = int(r.boxes.cls[seg_idx].item())
                    if faces_found and cls_id == YOLO_PERSON_CLASS_ID:
                        continue
                    polygon = np.array(mask_xy, dtype=np.int32)
                    if len(polygon) < 3:
                        continue
                    x, y, w, h = cv2.boundingRect(polygon)
                    if w < 30 or h < 30:
                        continue
                    crops_pil.append(original_pil.crop((x, y, x + w, y + h)))
                    if len(crops_pil) >= MAX_CROPS + 1:
                        break
            elif r.boxes is not None:
                for box in r.boxes:
                    cls_id = int(box.cls.item())
                    if faces_found and cls_id == YOLO_PERSON_CLASS_ID:
                        continue
                    x1, y1, x2, y2 = box.xyxy[0].tolist()
                    if (x2 - x1) < 30 or (y2 - y1) < 30:
                        continue
                    crops_pil.append(original_pil.crop((x1, y1, x2, y2)))
                    if len(crops_pil) >= MAX_CROPS + 1:
                        break
        crops = [_resize_pil(c, MAX_IMAGE_SIZE) for c in crops_pil]
        print(f"🧠 Embedding {len(crops)} object crop(s)...")
        for vec in self._embed_crops_batch(crops):
            extracted.append({"type": "object", "vector": vec})

        # Cache — oldest-inserted entry is evicted first. The lock keeps two
        # executor threads from evicting the same key at the same time.
        with self._cache_lock:
            if cache_key not in self._cache:
                while self._cache and len(self._cache) >= self._cache_maxsize:
                    del self._cache[next(iter(self._cache))]
            self._cache[cache_key] = extracted
        return extracted

    async def process_image_async(
        self,
        image_path: str,
        is_query: bool = False,
        detect_faces: bool = True,
    ) -> list:
        """Run ``process_image`` in the loop's default executor and await its result."""
        # get_running_loop() is the supported call inside a coroutine.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            functools.partial(self.process_image, image_path, is_query, detect_faces),
        )