Spaces:
Build error
Build error
| from itertools import chain | |
| from random import randint | |
| import cv2 | |
| import matplotlib | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import pandas as pd | |
| from ultralytics import YOLO | |
| from PIL import Image | |
| matplotlib.use('Agg') | |
| def highlight_vial_body(frame, vial_location, cap_ratio=0.2): | |
| """ | |
| Highlights only the vial body in the frame by masking out background and cap. | |
| Args: | |
| frame (np.ndarray): Original BGR image. | |
| vial_location (tuple): (x, y, w, h) bounding box of vial. | |
| cap_ratio (float): Fraction (0-1) of vial height considered as cap. | |
| Returns: | |
| masked_frame (np.ndarray): Frame with background and cap masked out. | |
| """ | |
| overlay = frame.copy() | |
| x, y, x2, y2 = vial_location | |
| h = y2 - y | |
| # Define cap and body regions | |
| cap_height = int(h * cap_ratio) | |
| body_y_start = y + cap_height | |
| # Draw gray background mask | |
| cv2.rectangle(overlay, (0, 0), (frame.shape[1], frame.shape[0]), (128, 128, 128), thickness=-1) | |
| # Draw red translucent cap over the vial's cap region | |
| cv2.rectangle(overlay, (x, y), (x2, body_y_start), (0, 0, 255), thickness=-1) | |
| masked = cv2.addWeighted(overlay, 0.5, frame, 0.5, 0) | |
| return masked | |
| class HeinSightConfig: | |
| """Configuration for the HeinSight system.""" | |
| NUM_ROWS = -1 | |
| SAVE_PLOT_VIDEO = True | |
| LIQUID_CONTENT = ["Homo", "Hetero"] | |
| CAP_RATIO = 0.3 | |
| STATUS_RULE = 0.7 | |
| DEFAULT_VIAL_LOCATION = None | |
| DEFAULT_VIAL_HEIGHT = None | |
| class HeinSight: | |
| """ | |
| The core of the HeinSight system, responsible for computer vision and analysis. | |
| """ | |
| def __init__(self, vial_model_path: str, contents_model_path: str, config: HeinSightConfig = HeinSightConfig()): | |
| self.fig, self.axs = plt.subplots(2, 2, figsize=(8, 6), height_ratios=[2, 1], constrained_layout=True) | |
| self._set_axes() | |
| self.config = config | |
| self.vial_model = YOLO(vial_model_path) | |
| self.contents_model = YOLO(contents_model_path) | |
| self.color_palette = self._register_colors([self.vial_model, self.contents_model]) | |
| self.clear_cache() | |
| def _set_axes(self): | |
| """creating plot axes""" | |
| ax0, ax1, ax2, ax3 = self.axs.flat | |
| ax0.set_position([0.21, 0.45, 0.22, 0.43]) # [left, bottom, width, height] | |
| ax1.set_position([0.47, 0.45, 0.45, 0.43]) # [left, bottom, width, height] | |
| ax2.set_position([0.12, 0.12, 0.35, 0.27]) | |
| ax3.set_position([0.56, 0.12, 0.35, 0.27]) | |
| self.fig.canvas.draw_idle() | |
| def clear_cache(self): | |
| """Resets the state of the HeinSight system.""" | |
| self.vial_location = self.config.DEFAULT_VIAL_LOCATION.copy() if self.config.DEFAULT_VIAL_LOCATION else None | |
| self.cap_rows = 0 | |
| self.vial_heigh = self.config.DEFAULT_VIAL_HEIGHT | |
| self.vial_size = [] | |
| self.content_info = None | |
| self.x_time = [] | |
| self.turbidity_2d = [] | |
| self.average_colors = [] | |
| self.average_turbidity = [] | |
| self.output = [] | |
| self.stream_output = [] | |
| self.status = {} | |
| self.status_queue = [] | |
| self.output_dataframe = pd.DataFrame() | |
| self.output_frame = None | |
| self.turbidity = [] | |
| def _register_colors(model_list): | |
| """ | |
| register default colors for models | |
| :param model_list: YOLO models list | |
| """ | |
| name_color_dict = { | |
| "Empty": (19, 69, 139), # Brown | |
| "Residue": (0, 165, 255), # Orange | |
| "Hetero": (255, 0, 255), # purple | |
| "Homo": (0, 0, 255), # Red | |
| "Solid": (255, 0, 0), # Blue | |
| } | |
| names = set(chain.from_iterable(model.names.values() for model in model_list if model)) | |
| for name in names: | |
| if name not in name_color_dict: | |
| name_color_dict[name] = (randint(0, 255), randint(0, 255), randint(0, 255)) | |
| return name_color_dict | |
| def find_vial(self, frame): | |
| """ | |
| Detect the vial in video frame with YOLOv8 | |
| :param frame: raw input frame | |
| :return result: np.ndarray or None: Detected vial bounding box or None if no vial is found. | |
| """ | |
| # vial location is not defined, use vial model to detect | |
| if not self.vial_location: | |
| results = self.vial_model(frame, conf=0.2, max_det=1) | |
| boxes = results[0].boxes.data.cpu().numpy() | |
| if boxes.size > 0: | |
| self.vial_location = [int(x) for x in boxes[0, :4]] | |
| if self.vial_location: | |
| self.cap_rows = int((self.vial_location[3] - self.vial_location[1]) * self.config.CAP_RATIO) | |
| return self.vial_location is not None | |
| def crop_rectangle(self, image, vial_location): | |
| """ | |
| crop and resize the image | |
| :param image: raw image capture | |
| :param vial_location: | |
| :return: cropped and resized vial frame | |
| """ | |
| x1, y1, x2, y2 = vial_location | |
| y1 = int(self.config.CAP_RATIO * (y2 - y1)) + y1 | |
| cropped_image = image[y1:y2, x1:x2] | |
| return cropped_image | |
| def content_detection(self, vial_frame): | |
| """ | |
| Detect content in a vial frame. | |
| :param vial_frame: (np.ndarray) Cropped vial frame. | |
| :return tuple: Bounding boxes, liquid boxes, and detected class titles. | |
| """ | |
| results = self.contents_model(vial_frame, max_det=4, agnostic_nms=False, conf=0.25, iou=0.25, verbose=False) | |
| bboxes = results[0].boxes.data.cpu().numpy() | |
| pred_classes = bboxes[:, 5] | |
| title = " ".join([self.contents_model.names[int(x)] for x in pred_classes]) | |
| liquid_boxes = [bboxes[i][:4] for i, cls in enumerate(pred_classes) if | |
| self.contents_model.names[int(cls)] in self.config.LIQUID_CONTENT] | |
| return bboxes, sorted(liquid_boxes, key=lambda x: x[1], reverse=True), title | |
| def process_vial_frame(self, vial_frame, update_od: bool = False): | |
| """ | |
| process single vial frame, detect content, draw bounding box and calculate turbidity and color | |
| :param vial_frame: vial frame image | |
| :param update_od: update object detection, True: run YOLO for this frame, False: use previous YOLO results | |
| """ | |
| if update_od or self.content_info is None: | |
| self.content_info = self.content_detection(vial_frame) | |
| bboxes, liquid_boxes, title = self.content_info | |
| phase_data, raw_turbidity = self.calculate_value_color(vial_frame, liquid_boxes) | |
| frame_image = self.draw_bounding_boxes(vial_frame, bboxes, self.contents_model.names, text_right=False) | |
| if self.config.SAVE_PLOT_VIDEO: | |
| self.display_frame(raw_turbidity, frame_image, title) | |
| self.fig.canvas.draw() | |
| frame_image = np.array(self.fig.canvas.renderer.buffer_rgba()) | |
| frame_image = cv2.cvtColor(frame_image, cv2.COLOR_RGBA2BGR) | |
| return frame_image, bboxes, raw_turbidity, phase_data | |
| def calculate_value_color(self, vial_frame, liquid_boxes): | |
| """ | |
| Calculate the value and color for a given vial image and bounding boxes | |
| :param vial_frame: the vial image | |
| :param liquid_boxes: the liquid boxes (["Homo", "Hetero"]) | |
| :return: the output dict and raw turbidity per row | |
| """ | |
| height, _, _ = vial_frame.shape | |
| hsv_image = cv2.cvtColor(vial_frame, cv2.COLOR_BGR2HSV) | |
| output = { | |
| 'time': self.x_time[-1], | |
| 'color': np.mean(hsv_image[:, :, 0]), | |
| 'turbidity': np.mean(hsv_image[:, :, 2]) | |
| } | |
| raw_value = np.mean(hsv_image[:, :, 2], axis=1) | |
| for i, bbox in enumerate(liquid_boxes): | |
| _, top, _, bottom = map(int, bbox) | |
| roi = hsv_image[top:bottom, :] | |
| output[f'volume_{i + 1}'] = (bottom - top) / height | |
| output[f'color_{i + 1}'] = np.mean(roi[:, :, 0]) | |
| output[f'turbidity_{i + 1}'] = np.mean(roi[:, :, 2]) | |
| self.average_colors.append(output['color']) | |
| self.average_turbidity.append(output['turbidity']) | |
| return output, raw_value | |
| def _get_dynamic_font_params(img_height, base_height=200, base_font_scale=1, base_thickness=2): | |
| scale_factor = img_height / base_height | |
| font_scale = base_font_scale * scale_factor | |
| thickness = max(2, int(base_thickness * scale_factor)) | |
| return font_scale, thickness | |
| def draw_bounding_boxes(self, image, bboxes, class_names, thickness=None, text_right=False, on_raw=False): | |
| """Draws bounding boxes on the image.""" | |
| output_image = image.copy() | |
| height = image.shape[1] | |
| font_scale, text_thickness = self._get_dynamic_font_params(height) | |
| margin = 2 | |
| thickness = thickness or max(2, int(height / 200)) | |
| for rect in bboxes: | |
| x1, y1, x2, y2, _, class_id = map(int, rect) | |
| class_name = class_names[class_id] | |
| color = self.color_palette.get(class_name, (255, 255, 255)) | |
| if on_raw and self.vial_location: | |
| x1, y1 = x1 + self.vial_location[0], y1 + self.vial_location[1] + self.cap_rows | |
| x2, y2 = x2 + self.vial_location[0], y2 + self.vial_location[1] + self.cap_rows | |
| cv2.rectangle(output_image, (x1, y1), (x2, y2), color, thickness) | |
| (text_width, text_height), baseline = cv2.getTextSize(class_name, cv2.FONT_HERSHEY_SIMPLEX, font_scale, | |
| text_thickness) | |
| text_location = ( | |
| x2 - text_width - margin if text_right ^ (class_name == "Solid") else x1 + margin, | |
| y1 + text_height + margin | |
| ) | |
| cv2.putText(output_image, class_name, text_location, cv2.FONT_HERSHEY_SIMPLEX, font_scale, color, | |
| text_thickness) | |
| return output_image | |
| def display_frame(self, y_values, image, title=None): | |
| """ | |
| Display the image (top-left) and its turbidity values per row (top-right) | |
| turbidity over time (bottom-left) and color over time (bottom-right) | |
| :param y_values: the turbidity value per row | |
| :param image: vial image frame to display | |
| :param title: title of the image frame | |
| """ | |
| # init plot | |
| for ax in self.axs.flat: | |
| ax.clear() | |
| ax0, ax1, ax2, ax3 = self.axs.flat | |
| # top left - vial frame and bounding boxes | |
| image_copy = image.copy() | |
| image_copy = cv2.cvtColor(image_copy, cv2.COLOR_BGR2RGB) | |
| ax0.imshow(np.flipud(image_copy), origin='lower') | |
| if title: | |
| ax0.set_title(title) | |
| # use fill between to optimize the speed 154.9857677 -> 68.15193 | |
| x_values = np.arange(len(y_values)) | |
| ax1.fill_betweenx(x_values, 0, y_values[::-1], color='green', alpha=0.5) | |
| ax1.set_ylim(0, len(y_values)) | |
| ax1.set_xlim(0, 255) | |
| ax1.xaxis.set_label_position('top') | |
| ax1.set_xlabel('Turbidity per row') | |
| realtime_tick_label = None | |
| # bottom left - turbidity | |
| ax2.set_ylabel('Turbidity') | |
| ax2.set_xlabel('Time / min') | |
| ax2.plot(self.x_time, self.average_turbidity) | |
| ax2.set_xticks([self.x_time[0], self.x_time[-1]], realtime_tick_label) | |
| # bottom right - color | |
| ax3.set_ylabel('Color (hue)') | |
| ax3.set_xlabel('Time / min') | |
| ax3.plot(self.x_time, self.average_colors) | |
| ax3.set_xticks([self.x_time[0], self.x_time[-1]], realtime_tick_label) | |
| def image_demo(self, pil_image:Image.Image, cap_ratio=0): | |
| """ | |
| analyze a vial image using heinsight. detect vial and dectet chemical content inside the vial. | |
| Args: | |
| pil_image (Image.Image): The input text to search through | |
| cap_ratio (float): The ratio of the vial cap to the vial height | |
| Returns: | |
| output_image: output image with bounding boxes and detected chemical content | |
| output_dict: output dict with turbidity, volume and color values | |
| """ | |
| self.clear_cache() | |
| frame = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR) # PIL → OpenCV | |
| phase_data = {} | |
| self.config.CAP_RATIO = cap_ratio | |
| if self.find_vial(frame): | |
| vial_frame = self.crop_rectangle(frame, self.vial_location) | |
| x1, y1, x2, y2 = self.vial_location | |
| self.x_time.append(0) | |
| frame_image, bboxes, _, phase_data = self.process_vial_frame(vial_frame) | |
| boxes_on_vial = self.draw_bounding_boxes(vial_frame, bboxes, self.contents_model.names, on_raw=False) | |
| masked_frame = highlight_vial_body(frame, self.vial_location, cap_ratio=cap_ratio) | |
| masked_frame[y1 + self.cap_rows :y2, x1:x2] = boxes_on_vial | |
| # bboxes_on_raw = self.draw_bounding_boxes(masked_frame, bboxes, self.contents_model.names, on_raw=True) | |
| result = masked_frame | |
| else: | |
| result = frame | |
| result_rgb = cv2.cvtColor(result, cv2.COLOR_BGR2RGB) # OpenCV → RGB | |
| return Image.fromarray(result_rgb), phase_data | |