Spaces:
Runtime error
Runtime error
| import os, io, base64, urllib.request, ssl, time, json, pathlib | |
| from typing import Optional, List | |
| import numpy as np, cv2 | |
| from ultralytics import YOLO | |
| import easyocr | |
| from PIL import Image | |
| import pillow_heif | |
| from fastapi import FastAPI, HTTPException, File, UploadFile, Form | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, Field | |
| os.environ.setdefault("YOLO_CONFIG_DIR", "/tmp/Ultralytics") | |
| os.environ.setdefault("HF_HOME", "/tmp/.cache/huggingface") | |
| os.makedirs(os.environ["YOLO_CONFIG_DIR"], exist_ok=True) | |
| os.environ.setdefault("OMP_NUM_THREADS", "2") | |
| os.environ.setdefault("OPENBLAS_NUM_THREADS", "2") | |
| os.environ.setdefault("MKL_NUM_THREADS", "2") | |
| import torch | |
| torch.set_num_threads(2) | |
| TMP_DIR = "/tmp" | |
| paths = [ | |
| f"{TMP_DIR}/Ultralytics", | |
| f"{TMP_DIR}/.EasyOCR", | |
| f"{TMP_DIR}/.EasyOCR/user_network", | |
| f"{TMP_DIR}/mplconfig", | |
| ] | |
| for p in paths: | |
| os.makedirs(p, exist_ok=True) | |
| from huggingface_hub import hf_hub_download | |
| # --- PESOS COMPATIBLES ULTRALYTICS (YOLOv11) --- | |
| REPO_ID = "morsetechlab/yolov11-license-plate-detection" | |
| FILENAME = "license-plate-finetune-v1n.pt" # o v1s/v1m/v1l/v1x | |
| WEIGHTS = hf_hub_download(repo_id=REPO_ID, filename=FILENAME) | |
| yolo = YOLO(WEIGHTS) | |
| # EasyOCR con GPU si está disponible | |
| reader = easyocr.Reader( | |
| ['en'], | |
| gpu=torch.cuda.is_available(), | |
| model_storage_directory="/tmp/.EasyOCR", | |
| ) | |
| ALLOW = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" | |
| def clamp(v, lo, hi): | |
| return max(lo, min(hi, v)) | |
| def expand_box(xyxy, w, h, pad_ratio=0.10): | |
| x1, y1, x2, y2 = [int(v) for v in xyxy] | |
| bw, bh = x2 - x1, y2 - y1 | |
| px, py = int(bw * pad_ratio), int(bh * pad_ratio) | |
| nx1 = clamp(x1 - px, 0, w - 1) | |
| ny1 = clamp(y1 - py, 0, h - 1) | |
| nx2 = clamp(x2 + px, 0, w - 1) | |
| ny2 = clamp(y2 + py, 0, h - 1) | |
| return nx1, ny1, nx2, ny2 | |
| def ensure_min_size(img_bgr, target_long=320): | |
| h, w = img_bgr.shape[:2] | |
| m = max(h, w) | |
| if m < target_long: | |
| scale = target_long / float(m) | |
| nh, nw = int(round(h * scale)), int(round(w * scale)) | |
| img_bgr = cv2.resize(img_bgr, (nw, nh), interpolation=cv2.INTER_CUBIC) | |
| return img_bgr | |
| def preproc_adaptive(plate_bgr): | |
| img = ensure_min_size(plate_bgr) # asegura tamaño | |
| gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
| gray = cv2.bilateralFilter(gray, 7, 50, 50) | |
| th = cv2.adaptiveThreshold( | |
| gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 31, 5 | |
| ) | |
| # opcional: cerrar huecos finos | |
| k = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3)) | |
| th = cv2.morphologyEx(th, cv2.MORPH_CLOSE, k, iterations=1) | |
| return th | |
| def preproc_clahe_otsu(plate_bgr): | |
| img = ensure_min_size(plate_bgr) | |
| gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
| clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) | |
| eq = clahe.apply(gray) | |
| _, th = cv2.threshold(eq, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) | |
| k = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3)) | |
| th = cv2.morphologyEx(th, cv2.MORPH_CLOSE, k, iterations=1) | |
| return th | |
| def read_easy(img, allow=ALLOW): | |
| out = reader.readtext(img, detail=1, allowlist=allow) | |
| cands = [] | |
| for _, text, score in out: | |
| t = "".join(c for c in (text or "").upper() if c in allow) | |
| if len(t) >= 4: | |
| cands.append((t, float(score))) | |
| if not cands: | |
| return "", 0.0 | |
| cands.sort(key=lambda x: (x[1], len(x[0])), reverse=True) | |
| return cands[0] | |
| def preprocess_for_ocr(plate_bgr): | |
| img = plate_bgr.copy() | |
| h, w = img.shape[:2] | |
| if max(h, w) < 160: | |
| img = cv2.resize(img, (w*2, h*2), interpolation=cv2.INTER_CUBIC) | |
| gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
| gray = cv2.bilateralFilter(gray, 7, 50, 50) | |
| th = cv2.adaptiveThreshold(gray,255,cv2.ADAPTIVE_THRESH_GAUSSIAN_C, | |
| cv2.THRESH_BINARY, 31, 5) | |
| return th | |
| def ocr_plate(plate_bgr): | |
| # 1) adaptativa | |
| t, s = read_easy(preproc_adaptive(plate_bgr)) | |
| if t: | |
| return t, s | |
| # 2) CLAHE + Otsu (fallback) | |
| return read_easy(preproc_clahe_otsu(plate_bgr)) | |
| def draw_box_text(img, xyxy, text, color=(0, 255, 0)): | |
| x1, y1, x2, y2 = [int(v) for v in xyxy] | |
| cv2.rectangle(img, (x1,y1), (x2,y2), color, 2) | |
| if text: | |
| tsize = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0] | |
| cv2.rectangle(img, (x1, y1 - tsize[1] - 6), (x1 + tsize[0] + 4, y1), color, -1) | |
| cv2.putText(img, text, (x1 + 2, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,0,0), 2, cv2.LINE_AA) | |
| def detect_plates_bgr(bgr, conf=0.25, iou=0.45): | |
| # 512 es buen sweet spot en CPU | |
| res = yolo.predict(bgr, conf=conf, iou=iou, imgsz=512, max_det=1, verbose=False)[0] | |
| boxes = res.boxes.xyxy.cpu().numpy() if res.boxes is not None else np.empty((0,4)) | |
| confs = res.boxes.conf.cpu().numpy() if res.boxes is not None else np.empty((0,)) | |
| return boxes, confs | |
| def run_on_image_bgr(bgr, conf=0.25, iou=0.45, with_ocr=True, annotate=True, max_plates=1): | |
| h, w = bgr.shape[:2] | |
| vis = bgr.copy() | |
| t0 = time.time() | |
| boxes, confs = detect_plates_bgr(bgr, conf, iou) | |
| idx = np.argsort(-confs)[:max_plates] | |
| boxes, confs = boxes[idx], confs[idx] | |
| detections = [] | |
| for xyxy, c in zip(boxes, confs): | |
| x1, y1, x2, y2 = expand_box(xyxy, w, h, pad_ratio=0.10) | |
| crop = bgr[y1:y2, x1:x2] | |
| txt, s = ("", 0.0) | |
| # 👇 no gastes OCR si la caja es floja | |
| if with_ocr and crop.size and float(c) >= 0.55: | |
| txt, s = ocr_plate(crop) | |
| if annotate: | |
| label = f"{txt or 'plate'} {c:.2f}" | |
| draw_box_text(vis, (x1, y1, x2, y2), label) | |
| detections.append({"box_xyxy":[x1,y1,x2,y2],"det_conf":float(c),"ocr_text":txt,"ocr_conf":float(s)}) | |
| dt_ms = int((time.time() - t0) * 1000) | |
| return vis, detections, (w, h), dt_ms | |
| def bgr_to_jpeg_base64(bgr): | |
| ok, buf = cv2.imencode(".jpg", bgr, [int(cv2.IMWRITE_JPEG_QUALITY), 90]) | |
| if not ok: | |
| return None | |
| return base64.b64encode(buf.tobytes()).decode("ascii") | |
| def pil_to_bgr(pil_img: Image.Image) -> np.ndarray: | |
| if pil_img.mode not in ("RGB", "RGBA"): | |
| pil_img = pil_img.convert("RGB") | |
| arr = np.array(pil_img) | |
| if arr.ndim == 2: | |
| arr = np.stack([arr]*3, axis=-1) | |
| if arr.shape[2] == 4: | |
| arr = arr[:, :, :3] | |
| return cv2.cvtColor(arr, cv2.COLOR_RGB2BGR) | |
| def decode_bytes_to_bgr(data: bytes, content_type: str = "") -> np.ndarray: | |
| # 1) OpenCV rápido | |
| arr = np.frombuffer(data, np.uint8) | |
| bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR) | |
| if bgr is not None: | |
| return bgr | |
| # 2) Fallback PIL (con HEIC soportado por pillow-heif) | |
| try: | |
| with Image.open(io.BytesIO(data)) as im: | |
| return pil_to_bgr(im) | |
| except Exception as e: | |
| raise ValueError(f"No pude decodificar la imagen ({content_type}): {e}") | |
| def load_image_from_url(url: str) -> np.ndarray: | |
| ssl._create_default_https_context = ssl._create_unverified_context | |
| req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) | |
| with urllib.request.urlopen(req, timeout=20) as r: | |
| data = r.read() | |
| return decode_bytes_to_bgr(data, content_type=r.headers.get("Content-Type","")) | |
| def load_image_from_b64(b64_or_data_url: str) -> np.ndarray: | |
| s = b64_or_data_url | |
| if s.startswith("data:"): | |
| s = s.split(",", 1)[1] | |
| raw = base64.b64decode(s) | |
| return decode_bytes_to_bgr(raw, content_type="base64") | |
| # --- FastAPI --- | |
| app = FastAPI(title="Plates API (HF Space)") | |
| ALLOWED = [ | |
| "http://localhost:5173", "http://127.0.0.1:5173", | |
| "https://www.omar-cruz.com", "https://omar-cruz.com", | |
| ] | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=ALLOWED, | |
| allow_origin_regex=r"^https?://([a-z0-9-]+\.)*hf\.space$", | |
| allow_credentials=False, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| class Detection(BaseModel): | |
| box_xyxy: List[int] | |
| det_conf: float | |
| ocr_text: str = "" | |
| ocr_conf: float = 0.0 | |
| class DetectResponse(BaseModel): | |
| detections: List[Detection] | |
| count: int | |
| width: int | |
| height: int | |
| time_ms: int | |
| annotated_image_b64: Optional[str] = None | |
| class DetectRequest(BaseModel): | |
| image_url: Optional[str] = None | |
| image_b64: Optional[str] = None | |
| conf: float = Field(0.25, ge=0.05, le=0.95) | |
| iou: float = Field(0.45, ge=0.1, le=0.9) | |
| ocr: bool = True | |
| return_image: bool = False | |
| def health(): | |
| return { | |
| "status": "ok", | |
| "service": "plates-api", | |
| "model": os.path.basename(WEIGHTS), | |
| "ocr_gpu": torch.cuda.is_available(), | |
| "allow_origins": ALLOWED, | |
| } | |
| def detect(req: DetectRequest): | |
| try: | |
| if not req.image_url and not req.image_b64: | |
| raise HTTPException(400, "Proporciona 'image_url' o 'image_b64'.") | |
| bgr = load_image_from_url(req.image_url) if req.image_url else load_image_from_b64(req.image_b64) | |
| vis, dets, (w, h), dt_ms = run_on_image_bgr( | |
| bgr, conf=req.conf, iou=req.iou, with_ocr=req.ocr, annotate=req.return_image | |
| ) | |
| b64 = bgr_to_jpeg_base64(vis) if req.return_image else None | |
| return DetectResponse( | |
| detections=dets, count=len(dets), width=w, height=h, time_ms=dt_ms, | |
| annotated_image_b64=b64 | |
| ) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| raise HTTPException(500, f"Error procesando la imagen: {e}") | |
| async def detect_upload( | |
| image: UploadFile = File(...), | |
| conf: float = Form(0.25), | |
| iou: float = Form(0.45), | |
| ocr: bool = Form(True), | |
| return_image: bool = Form(False), | |
| ): | |
| try: | |
| data = await image.read() | |
| if not data: | |
| raise HTTPException(400, "Archivo vacío.") | |
| bgr = decode_bytes_to_bgr(data, content_type=image.content_type or image.filename) | |
| if bgr is None: | |
| # si llega aquí es que ni cv2 ni PIL pudieron | |
| raise HTTPException(415, f"Formato no soportado: {image.content_type or image.filename}") | |
| vis, dets, (w, h), dt_ms = run_on_image_bgr( | |
| bgr, conf=conf, iou=iou, with_ocr=ocr, annotate=return_image | |
| ) | |
| b64 = bgr_to_jpeg_base64(vis) if return_image else None | |
| return DetectResponse( | |
| detections=dets, count=len(dets), width=w, height=h, time_ms=dt_ms, | |
| annotated_image_b64=b64 | |
| ) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| raise HTTPException(500, f"Error procesando la imagen: {e}") | |
| def _warmup(): | |
| import numpy as np, cv2 | |
| dummy = np.zeros((512, 512, 3), dtype=np.uint8) | |
| try: _ = yolo.predict(dummy, conf=0.25, iou=0.45, imgsz=512, verbose=False) | |
| except Exception as e: print("Warmup YOLO:", e) | |
| try: _ = reader.readtext(cv2.cvtColor(dummy, cv2.COLOR_BGR2GRAY), detail=0) | |
| except Exception as e: print("Warmup EasyOCR:", e) |