diff --git a/docs/source/python/models/detection_model.md b/docs/source/python/models/detection_model.md index fce04dca..980b58a8 100644 --- a/docs/source/python/models/detection_model.md +++ b/docs/source/python/models/detection_model.md @@ -12,15 +12,12 @@ A single input image of shape (H, W, 3) where H and W are the height and width o ### Outputs -Detection model outputs a list of detection objects (i.e `list[Detection]`) wrapped in `DetectionResult`, each object containing the following attributes: +Detection model outputs a `DetectionResult` object containing the following attributes: -- `score` (float) - Confidence score of the object. -- `id` (int) - Class label of the object. -- `str_label` (str) - String label of the object. -- `xmin` (int) - X-coordinate of the top-left corner of the bounding box. -- `ymin` (int) - Y-coordinate of the top-left corner of the bounding box. -- `xmax` (int) - X-coordinate of the bottom-right corner of the bounding box. -- `ymax` (int) - Y-coordinate of the bottom-right corner of the bounding box. +- `bboxes` (np.ndarray) - Bounding boxes of the detected objects, each in (x1, y1, x2, y2) format. +- `scores` (np.ndarray) - Confidence scores of the detected objects. +- `labels` (np.ndarray) - Class labels of the detected objects. +- `label_names` (list[str]) - List of class names of the detected objects. ## Example @@ -34,11 +31,14 @@ model = SSD.create_model("model.xml") # Forward pass predictions = model(image) -# Iterate over the segmented objects -for pred_obj in predictions.objects: - pred_score = pred_obj.score - label_id = pred_obj.id - bbox = [pred_obj.xmin, pred_obj.ymin, pred_obj.xmax, pred_obj.ymax] +# Iterate over the detection result +for box, score, label, label_name in zip( + predictions.bboxes, + predictions.scores, + predictions.labels, + predictions.label_names, +): + print(f"Box: {box}, Score: {score}, Label: {label}, Label Name: {label_name}") ``` ```{eval-rst} diff --git a/docs/source/python/models/instance_segmentation.md b/docs/source/python/models/instance_segmentation.md index 47d1f83d..5f50084b 100644 --- a/docs/source/python/models/instance_segmentation.md +++ b/docs/source/python/models/instance_segmentation.md @@ -12,16 +12,13 @@ A single input image of shape (H, W, 3) where H and W are the height and width o ### Outputs -Instance segmentation model outputs a list of segmented objects (i.e `list[SegmentedObject]`)wrapped in `InstanceSegmentationResult.segmentedObjects`, each containing the following attributes: +Instance segmentation model outputs an `InstanceSegmentationResult` object containing the following attributes: -- `mask` (numpy.ndarray) - A binary mask of the object. -- `score` (float) - Confidence score of the object. -- `id` (int) - Class label of the object. -- `str_label` (str) - String label of the object. -- `xmin` (int) - X-coordinate of the top-left corner of the bounding box. -- `ymin` (int) - Y-coordinate of the top-left corner of the bounding box. -- `xmax` (int) - X-coordinate of the bottom-right corner of the bounding box. -- `ymax` (int) - Y-coordinate of the bottom-right corner of the bounding box. +- `bboxes` (np.ndarray) - Bounding boxes of the detected objects, each in (x1, y1, x2, y2) format. +- `scores` (np.ndarray) - Confidence scores of the detected objects. +- `masks` (np.ndarray) - Segmentation masks of the detected objects. +- `labels` (np.ndarray) - Class labels of the detected objects. +- `label_names` (list[str]) - List of class names of the detected objects. 
## Example @@ -36,11 +33,17 @@ model = MaskRCNNModel.create_model("model.xml") predictions = model(image) # Iterate over the segmented objects -for pred_obj in predictions.segmentedObjects: - pred_mask = pred_obj.mask - pred_score = pred_obj.score - label_id = pred_obj.id - bbox = [pred_obj.xmin, pred_obj.ymin, pred_obj.xmax, pred_obj.ymax] +for box, score, mask, label, label_name in zip( + predictions.bboxes, + predictions.scores, + predictions.masks, + predictions.labels, + predictions.label_names, +): + print(f"Box: {box}, Score: {score}, Label: {label}, Label Name: {label_name}") + cv2.imshow("Mask", mask) + cv2.waitKey(0) + cv2.destroyAllWindows() ``` ```{eval-rst} diff --git a/model_api/python/model_api/models/__init__.py b/model_api/python/model_api/models/__init__.py index 364e7cac..70b7e33d 100644 --- a/model_api/python/model_api/models/__init__.py +++ b/model_api/python/model_api/models/__init__.py @@ -16,13 +16,11 @@ ClassificationResult, Contour, DetectedKeypoints, - Detection, DetectionResult, ImageResultWithSoftPrediction, InstanceSegmentationResult, PredictedMask, - SegmentedObject, - SegmentedObjectWithRects, + RotatedSegmentationResult, VisualPromptingResult, ZSLVisualPromptingResult, ) @@ -90,14 +88,12 @@ "SAMImageEncoder", "ClassificationResult", "Prompt", - "Detection", "DetectionResult", "DetectedKeypoints", "classification_models", "detection_models", "segmentation_models", - "SegmentedObject", - "SegmentedObjectWithRects", + "RotatedSegmentationResult", "add_rotated_rects", "get_contours", ] diff --git a/model_api/python/model_api/models/detection_model.py b/model_api/python/model_api/models/detection_model.py index b56d0074..4c22d83f 100644 --- a/model_api/python/model_api/models/detection_model.py +++ b/model_api/python/model_api/models/detection_model.py @@ -3,8 +3,10 @@ # SPDX-License-Identifier: Apache-2.0 # +import numpy as np + from .image_model import ImageModel -from .result_types import Detection +from .result_types import DetectionResult from .types import ListValue, NumericalValue, StringValue from .utils import load_labels @@ -65,18 +67,15 @@ def parameters(cls): return parameters - def _resize_detections(self, detections: list[Detection], meta): + def _resize_detections(self, detection_result: DetectionResult, meta: dict): """Resizes detection bounding boxes according to initial image shape. It implements image resizing depending on the set `resize_type`(see `ImageModel` for details). Next, it applies bounding boxes clipping. 
Args: - detections (List[Detection]): list of detections with coordinates in normalized form + detection_result (DetectionResult): detection result with coordinates in normalized form meta (dict): the input metadata obtained from `preprocess` method - - Returns: - - list of detections with resized and clipped coordinates to fit the initial image """ input_img_height, input_img_widht = meta["original_shape"][:2] inverted_scale_x = input_img_widht / self.w @@ -92,63 +91,35 @@ def _resize_detections(self, detections: list[Detection], meta): pad_left = (self.w - round(input_img_widht / inverted_scale_x)) // 2 pad_top = (self.h - round(input_img_height / inverted_scale_y)) // 2 - def _clamp_and_round(val, min_value, max_value): - return round(max(min_value, min(max_value, val))) + boxes = detection_result.bboxes + boxes[:, 0::2] = (boxes[:, 0::2] * self.w - pad_left) * inverted_scale_x + boxes[:, 1::2] = (boxes[:, 1::2] * self.h - pad_top) * inverted_scale_y + np.round(boxes, out=boxes) + boxes[:, 0::2] = np.clip(boxes[:, 0::2], 0, input_img_widht) + boxes[:, 1::2] = np.clip(boxes[:, 1::2], 0, input_img_height) + detection_result.bboxes = boxes.astype(np.int32) - for detection in detections: - detection.xmin = _clamp_and_round( - (detection.xmin * self.w - pad_left) * inverted_scale_x, - 0, - input_img_widht, - ) - detection.ymin = _clamp_and_round( - (detection.ymin * self.h - pad_top) * inverted_scale_y, - 0, - input_img_height, - ) - detection.xmax = _clamp_and_round( - (detection.xmax * self.w - pad_left) * inverted_scale_x, - 0, - input_img_widht, - ) - detection.ymax = _clamp_and_round( - (detection.ymax * self.h - pad_top) * inverted_scale_y, - 0, - input_img_height, - ) - - return detections - - def _filter_detections(self, detections: list[Detection], box_area_threshold=0.0): + def _filter_detections(self, detection_result: DetectionResult, box_area_threshold=0.0): """Filters detections by confidence threshold and box size threshold Args: - detections (List[Detection]): list of detections with coordinates in normalized form + detection_result (DetectionResult): DetectionResult object with coordinates in normalized form box_area_threshold (float): minimal area of the bounding to be considered Returns: - list of detections with confidence above the threshold """ - filtered_detections = [] - for detection in detections: - if ( - detection.score < self.confidence_threshold - or (detection.xmax - detection.xmin) * (detection.ymax - detection.ymin) < box_area_threshold - ): - continue - filtered_detections.append(detection) - - return filtered_detections - - def _add_label_names(self, detections: list[Detection]): + keep = (detection_result.get_obj_sizes() > box_area_threshold) & ( + detection_result.scores > self.confidence_threshold + ) + detection_result.bboxes = detection_result.bboxes[keep] + detection_result.labels = detection_result.labels[keep] + detection_result.scores = detection_result.scores[keep] + + def _add_label_names(self, detection_result: DetectionResult) -> None: """Adds labels names to detections if they are available Args: - detections (List[Detection]): list of detections with coordinates in normalized form - - Returns: - - list of detections with label strings + detection_result (DetectionResult): detection result to update with label names in place """ - for detection in detections: - detection.str_label = self.get_label_name(detection.id) - return detections + detection_result.label_names = [self.get_label_name(label_idx) for label_idx in detection_result.labels] 
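For illustration, here is a minimal, self-contained numpy sketch of the vectorized keep-mask that the new `DetectionModel._filter_detections` applies to a `DetectionResult` (plain arrays stand in for the result object, and the two threshold values below are made up for the example rather than taken from any model configuration):

```python
import numpy as np

# Toy detections: three (x1, y1, x2, y2) boxes with matching scores and labels.
bboxes = np.array([[10, 10, 50, 50], [0, 0, 2, 2], [30, 40, 80, 90]], dtype=np.int32)
scores = np.array([0.9, 0.8, 0.2])
labels = np.array([1, 0, 2], dtype=np.int32)

confidence_threshold = 0.5  # illustrative value only
box_area_threshold = 16.0   # illustrative value only

# Same logic as _filter_detections: keep boxes that are both large enough and confident enough.
areas = (bboxes[:, 2] - bboxes[:, 0]) * (bboxes[:, 3] - bboxes[:, 1])
keep = (areas > box_area_threshold) & (scores > confidence_threshold)

bboxes, labels, scores = bboxes[keep], labels[keep], scores[keep]
print(bboxes)  # only the first box survives both thresholds
```

The same boolean mask subsets `bboxes`, `labels`, and `scores` together, which is what replaces the per-object Python loop of the old `Detection`-based implementation.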
diff --git a/model_api/python/model_api/models/instance_segmentation.py b/model_api/python/model_api/models/instance_segmentation.py index 621c4d61..4d7078c0 100644 --- a/model_api/python/model_api/models/instance_segmentation.py +++ b/model_api/python/model_api/models/instance_segmentation.py @@ -9,7 +9,7 @@ from model_api.adapters.inference_adapter import InferenceAdapter from .image_model import ImageModel -from .result_types import InstanceSegmentationResult, SegmentedObject +from .result_types import InstanceSegmentationResult from .types import BooleanValue, ListValue, NumericalValue, StringValue from .utils import load_labels @@ -176,7 +176,6 @@ def postprocess(self, outputs: dict, meta: dict) -> InstanceSegmentationResult: out=boxes, ) - objects = [] has_feature_vector_name = _feature_vector_name in self.outputs if has_feature_vector_name: if not self.labels: @@ -184,19 +183,24 @@ def postprocess(self, outputs: dict, meta: dict) -> InstanceSegmentationResult: saliency_maps: list = [[] for _ in range(len(self.labels))] else: saliency_maps = [] - for box, confidence, cls, raw_mask in zip(boxes, scores, labels, masks): - x1, y1, x2, y2 = box - if (x2 - x1) * (y2 - y1) < 1 or (confidence <= self.confidence_threshold and not has_feature_vector_name): - continue - # Skip if label index is out of bounds - if self.labels and cls >= len(self.labels): - continue + # Apply confidence threshold, bounding box area filter and label index filter. + keep = (scores > self.confidence_threshold) & ((boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1]) > 1) + + if self.labels: + keep &= labels < len(self.labels) + + boxes = boxes[keep].astype(np.int32) + scores = scores[keep] + labels = labels[keep] + masks = masks[keep] - # Get label string - str_label = self.labels[cls] if self.labels else f"#{cls}" + resized_masks, label_names = [], [] + for box, label_idx, raw_mask in zip(boxes, labels, masks): + if self.labels: + label_names.append(self.labels[label_idx]) - raw_cls_mask = raw_mask[cls, ...] if self.is_segmentoly else raw_mask + raw_cls_mask = raw_mask[label_idx, ...] 
if self.is_segmentoly else raw_mask if self.postprocess_semantic_masks or has_feature_vector_name: resized_mask = _segm_postprocess( box, @@ -205,27 +209,21 @@ def postprocess(self, outputs: dict, meta: dict) -> InstanceSegmentationResult: ) else: resized_mask = raw_cls_mask - if confidence > self.confidence_threshold: - output_mask = resized_mask if self.postprocess_semantic_masks else raw_cls_mask - xmin, ymin, xmax, ymax = box.astype(int) - objects.append( - SegmentedObject( - xmin, - ymin, - xmax, - ymax, - score=confidence, - id=cls, - str_label=str_label, - mask=output_mask, - ), - ) - if has_feature_vector_name and confidence > self.confidence_threshold: - saliency_maps[cls - 1].append(resized_mask) + + output_mask = resized_mask if self.postprocess_semantic_masks else raw_cls_mask + resized_masks.append(output_mask) + if has_feature_vector_name: + saliency_maps[label_idx - 1].append(resized_mask) + + _masks = np.stack(resized_masks) if len(resized_masks) > 0 else np.empty((0, 16, 16), dtype=np.uint8) return InstanceSegmentationResult( - objects, - _average_and_normalize(saliency_maps), - outputs.get(_feature_vector_name, np.ndarray(0)), + bboxes=boxes, + labels=labels, + scores=scores, + masks=_masks, + label_names=label_names if label_names else None, + saliency_map=_average_and_normalize(saliency_maps), + feature_vector=outputs.get(_feature_vector_name, np.ndarray(0)), ) diff --git a/model_api/python/model_api/models/keypoint_detection.py b/model_api/python/model_api/models/keypoint_detection.py index 5ecaa718..da16778a 100644 --- a/model_api/python/model_api/models/keypoint_detection.py +++ b/model_api/python/model_api/models/keypoint_detection.py @@ -10,7 +10,7 @@ import numpy as np from .image_model import ImageModel -from .result_types import DetectedKeypoints, Detection +from .result_types import DetectedKeypoints, DetectionResult from .types import ListValue @@ -77,25 +77,27 @@ def __init__(self, base_model: KeypointDetectionModel) -> None: def predict( self, image: np.ndarray, - detections: list[Detection], + detection_result: DetectionResult, ) -> list[DetectedKeypoints]: """Predicts keypoints for the given image and detections. 
Args: image (np.ndarray): input full-size image - detections (list[Detection]): detections located within the given image + detection_result (detection_result): detections located within the given image Returns: list[DetectedKeypoints]: per detection keypoints in detection coordinates """ crops = [] - for det in detections: - crops.append(image[det.ymin : det.ymax, det.xmin : det.xmax]) + for box in detection_result.bboxes: + x1, y1, x2, y2 = box + crops.append(image[y1:y2, x1:x2]) crops_results = self.predict_crops(crops) - for i, det in enumerate(detections): + for i, box in enumerate(detection_result.bboxes): + x1, y1, x2, y2 = box crops_results[i] = DetectedKeypoints( - crops_results[i].keypoints + np.array([det.xmin, det.ymin]), + crops_results[i].keypoints + np.array([x1, y1]), crops_results[i].scores, ) diff --git a/model_api/python/model_api/models/result_types/__init__.py b/model_api/python/model_api/models/result_types/__init__.py index f863f712..2352d462 100644 --- a/model_api/python/model_api/models/result_types/__init__.py +++ b/model_api/python/model_api/models/result_types/__init__.py @@ -5,14 +5,13 @@ from .anomaly import AnomalyResult from .classification import ClassificationResult, Label -from .detection import Detection, DetectionResult +from .detection import DetectionResult from .keypoint import DetectedKeypoints from .segmentation import ( Contour, ImageResultWithSoftPrediction, InstanceSegmentationResult, - SegmentedObject, - SegmentedObjectWithRects, + RotatedSegmentationResult, ) from .visual_prompting import PredictedMask, VisualPromptingResult, ZSLVisualPromptingResult @@ -20,15 +19,13 @@ "AnomalyResult", "ClassificationResult", "Contour", - "Detection", "DetectionResult", "DetectedKeypoints", "Label", - "SegmentedObject", - "SegmentedObjectWithRects", "ImageResultWithSoftPrediction", "InstanceSegmentationResult", "PredictedMask", "VisualPromptingResult", "ZSLVisualPromptingResult", + "RotatedSegmentationResult", ] diff --git a/model_api/python/model_api/models/result_types/detection.py b/model_api/python/model_api/models/result_types/detection.py index 7c86ad32..49bf3f20 100644 --- a/model_api/python/model_api/models/result_types/detection.py +++ b/model_api/python/model_api/models/result_types/detection.py @@ -5,53 +5,127 @@ from __future__ import annotations -from typing import TYPE_CHECKING +import numpy as np from .utils import array_shape_to_str -if TYPE_CHECKING: - import numpy as np - - -class Detection: - def __init__( - self, - xmin: int, - ymin: int, - xmax: int, - ymax: int, - score: float, - id: int, - str_label: str | None = None, - ) -> None: - self.xmin = xmin - self.ymin = ymin - self.xmax = xmax - self.ymax = ymax - self.score = score - self.id = int(id) - self.str_label = str_label - - def __str__(self): - return f"{self.xmin}, {self.ymin}, {self.xmax}, {self.ymax}, {self.id} ({self.str_label}): {self.score:.3f}" - class DetectionResult: - """Result for detection model.""" + """Result for detection model. + + Args: + bboxes (np.ndarray): bounding boxes in dim (N, 4) where N is the number of boxes. + labels (np.ndarray): labels for each bounding box in dim (N,). + scores (np.ndarray| None, optional): confidence scores for each bounding box in dim (N,). + label_names (list[str] | None, optional): class names for each label. Defaults to None. + saliency_map (np.ndarray | None, optional): saliency map for XAI. Defaults to None. + feature_vector (np.ndarray | None, optional): feature vector for XAI. Defaults to None. 
+ """ def __init__( self, - objects: list[Detection] | None = None, + bboxes: np.ndarray, + labels: np.ndarray, + scores: np.ndarray | None = None, + label_names: list[str] | None = None, saliency_map: np.ndarray | None = None, feature_vector: np.ndarray | None = None, - ) -> None: - self.objects = objects - self.saliency_map = saliency_map - self.feature_vector = feature_vector - - def __str__(self): - assert self.objects is not None - obj_str = "; ".join(str(obj) for obj in self.objects) - if obj_str: - obj_str += "; " - return f"{obj_str}{array_shape_to_str(self.saliency_map)}; {array_shape_to_str(self.feature_vector)}" + ): + super().__init__() + self._bboxes = bboxes + self._labels = labels + self._scores = scores if scores is not None else np.zeros(len(bboxes)) + self._label_names = ["#"] * len(bboxes) if label_names is None else label_names + self._saliency_map = saliency_map + self._feature_vector = feature_vector + + def __len__(self) -> int: + return len(self.bboxes) + + def __str__(self) -> str: + repr_str = "" + for box, score, label, name in zip( + self.bboxes, + self.scores, + self.labels, + self.label_names, + ): + x1, y1, x2, y2 = box + repr_str += f"{x1}, {y1}, {x2}, {y2}, {label} ({name}): {score:.3f}; " + + repr_str += f"{array_shape_to_str(self.saliency_map)}; {array_shape_to_str(self.feature_vector)}" + return repr_str + + def get_obj_sizes(self) -> np.ndarray: + """Get object sizes. + + Returns: + np.ndarray: Object sizes in dim of (N,). + """ + return (self._bboxes[:, 2] - self._bboxes[:, 0]) * (self._bboxes[:, 3] - self._bboxes[:, 1]) + + @property + def bboxes(self) -> np.ndarray: + return self._bboxes + + @bboxes.setter + def bboxes(self, value): + if not isinstance(value, np.ndarray): + msg = "Bounding boxes must be numpy array." + raise ValueError(msg) + self._bboxes = value + + @property + def labels(self) -> np.ndarray: + return self._labels + + @labels.setter + def labels(self, value): + if not isinstance(value, np.ndarray): + msg = "Labels must be numpy array." + raise ValueError(msg) + self._labels = value + + @property + def scores(self) -> np.ndarray: + return self._scores + + @scores.setter + def scores(self, value): + if not isinstance(value, np.ndarray): + msg = "Scores must be numpy array." + raise ValueError(msg) + self._scores = value + + @property + def label_names(self) -> list[str]: + return self._label_names + + @label_names.setter + def label_names(self, value): + if not isinstance(value, list): + msg = "Label names must be list." + raise ValueError(msg) + self._label_names = value + + @property + def saliency_map(self): + return self._saliency_map + + @saliency_map.setter + def saliency_map(self, value: np.ndarray): + if not isinstance(value, np.ndarray): + msg = "Saliency map must be numpy array." + raise ValueError(msg) + self._saliency_map = value + + @property + def feature_vector(self) -> np.ndarray: + return self._feature_vector + + @feature_vector.setter + def feature_vector(self, value): + if not isinstance(value, np.ndarray): + msg = "Feature vector must be numpy array." 
+ raise ValueError(msg) + self._feature_vector = value diff --git a/model_api/python/model_api/models/result_types/segmentation.py b/model_api/python/model_api/models/result_types/segmentation.py index 68ed535e..b7c82e23 100644 --- a/model_api/python/model_api/models/result_types/segmentation.py +++ b/model_api/python/model_api/models/result_types/segmentation.py @@ -10,81 +10,141 @@ import cv2 import numpy as np +from .detection import DetectionResult from .utils import array_shape_to_str if TYPE_CHECKING: from cv2.typing import RotatedRect -class SegmentedObject: - def __init__( - self, - xmin: int, - ymin: int, - xmax: int, - ymax: int, - score: float, - id: int, - str_label: str, - mask: np.ndarray, - ) -> None: - self.xmin = xmin - self.ymin = ymin - self.xmax = xmax - self.ymax = ymax - self.score = score - self.id = id - self.str_label = str_label - self.mask = mask - - def __str__(self): - return ( - f"{self.xmin}, {self.ymin}, {self.xmax}, {self.ymax}, {self.id} ({self.str_label}): {self.score:.3f}" - f", {(self.mask > 0.5).sum()}" - ) - +class InstanceSegmentationResult(DetectionResult): + """Instance segmentation result type. -class SegmentedObjectWithRects(SegmentedObject): - def __init__(self, segmented_object: SegmentedObject, rotated_rect: RotatedRect) -> None: - super().__init__( - segmented_object.xmin, - segmented_object.ymin, - segmented_object.xmax, - segmented_object.ymax, - segmented_object.score, - segmented_object.id, - segmented_object.str_label, - segmented_object.mask, - ) - self.rotated_rect = rotated_rect + Args: + bboxes (np.ndarray): bounding boxes in dim (N, 4) where N is the number of boxes. + labels (np.ndarray): labels for each bounding box in dim (N,). + masks (np.ndarray): masks for each bounding box in dim (N, H, W). + scores (np.ndarray | None, optional): confidence scores for each bounding box in dim (N,). Defaults to None. + label_names (list[str] | None, optional): class names for each label. Defaults to None. + saliency_map (list[np.ndarray] | None, optional): saliency maps for XAI. Defaults to None. + feature_vector (np.ndarray | None, optional): feature vector for XAI. Defaults to None. + """ - def __str__(self): - res = super().__str__() - rect = self.rotated_rect - res += f", RotatedRect: {rect[0][0]:.3f} {rect[0][1]:.3f} {rect[1][0]:.3f} {rect[1][1]:.3f} {rect[2]:.3f}" - return res + def __init__( + self, + bboxes: np.ndarray, + labels: np.ndarray, + masks: np.ndarray, + scores: np.ndarray | None = None, + label_names: list[str] | None = None, + saliency_map: list[np.ndarray] | None = None, + feature_vector: np.ndarray | None = None, + ): + super().__init__(bboxes, labels, scores, label_names, saliency_map, feature_vector) + self._masks = masks + + def __str__(self) -> str: + repr_str = "" + for box, score, label, name, mask in zip( + self.bboxes, + self.scores, + self.labels, + self.label_names, + self.masks, + ): + x1, y1, x2, y2 = box + repr_str += f"{x1}, {y1}, {x2}, {y2}, {label} ({name}): {score:.3f}, {(mask > 0.5).sum()}; " + filled = 0 + for cls_map in self.saliency_map: + if cls_map.size: + filled += 1 + prefix = f"{repr_str}" if len(repr_str) else "" + return prefix + f"{filled}; {array_shape_to_str(self.feature_vector)}" + + @property + def masks(self) -> np.ndarray: + return self._masks + + @masks.setter + def masks(self, value): + if not isinstance(value, np.ndarray): + msg = "Masks must be numpy array." 
+ raise ValueError(msg) + self._masks = value + + @property + def saliency_map(self): + return self._saliency_map + + @saliency_map.setter + def saliency_map(self, value: list[np.ndarray]): + if not isinstance(value, list): + msg = "Saliency maps must be list." + raise ValueError(msg) + self._saliency_map = value + + +class RotatedSegmentationResult(InstanceSegmentationResult): + """Rotated instance segmentation result type. + + Args: + bboxes (np.ndarray): bounding boxes in dim (N, 4) where N is the number of boxes. + labels (np.ndarray): labels for each bounding box in dim (N,). + masks (np.ndarray): masks for each bounding box in dim (N, H, W). + rotated_rects (list[RotatedRect]): rotated rectangles for each bounding box. + scores (np.ndarray | None, optional): confidence scores for each bounding box in dim (N,). Defaults to None. + label_names (list[str] | None, optional): class names for each label. Defaults to None. + saliency_map (list[np.ndarray] | None, optional): saliency maps for XAI. Defaults to None. + feature_vector (np.ndarray | None, optional): feature vector for XAI. Defaults to None. + """ -class InstanceSegmentationResult: def __init__( self, - segmentedObjects: list[SegmentedObject | SegmentedObjectWithRects], - saliency_map: list[np.ndarray], - feature_vector: np.ndarray, + bboxes: np.ndarray, + labels: np.ndarray, + masks: np.ndarray, + rotated_rects: list[RotatedRect], + scores: np.ndarray | None = None, + label_names: list[str] | None = None, + saliency_map: list[np.ndarray] | None = None, + feature_vector: np.ndarray | None = None, ): - self.segmentedObjects = segmentedObjects - # Contain per class saliency_maps and "feature_vector" model output if feature_vector exists - self.saliency_map = saliency_map - self.feature_vector = feature_vector + super().__init__(bboxes, labels, masks, scores, label_names, saliency_map, feature_vector) + self.rotated_rects = rotated_rects + + def __str__(self) -> str: + repr_str = "" + for box, score, label, name, mask, rotated_rect in zip( + self.bboxes, + self.scores, + self.labels, + self.label_names, + self.masks, + self.rotated_rects, + ): + x1, y1, x2, y2 = box + (cx, cy), (w, h), angle = rotated_rect + repr_str += f"{x1}, {y1}, {x2}, {y2}, {label} ({name}): {score:.3f}, {(mask > 0.5).sum()}," + repr_str += f" RotatedRect: {cx:.3f} {cy:.3f} {w:.3f} {h:.3f} {angle:.3f}; " - def __str__(self): - obj_str = "; ".join(str(obj) for obj in self.segmentedObjects) filled = 0 for cls_map in self.saliency_map: if cls_map.size: filled += 1 - prefix = f"{obj_str}; " if len(obj_str) else "" - return prefix + f"{filled}; [{','.join(str(i) for i in self.feature_vector.shape)}]" + prefix = f"{repr_str}" if len(repr_str) else "" + return prefix + f"{filled}; {array_shape_to_str(self.feature_vector)}" + + @property + def rotated_rects(self) -> list[RotatedRect]: + return self._rotated_rects + + @rotated_rects.setter + def rotated_rects(self, value): + if not isinstance(value, list): + msg = "RotatedRects must be list." 
+ raise ValueError(msg) + self._rotated_rects = value class Contour: diff --git a/model_api/python/model_api/models/ssd.py b/model_api/python/model_api/models/ssd.py index ee6776c9..947fbac6 100644 --- a/model_api/python/model_api/models/ssd.py +++ b/model_api/python/model_api/models/ssd.py @@ -6,66 +6,11 @@ import numpy as np from .detection_model import DetectionModel -from .result_types import Detection, DetectionResult +from .result_types import DetectionResult - -class SSD(DetectionModel): - __model__ = "SSD" - - def __init__(self, inference_adapter, configuration: dict = {}, preload=False): - super().__init__(inference_adapter, configuration, preload) - self.image_info_blob_name = self.image_info_blob_names[0] if len(self.image_info_blob_names) == 1 else None - self.output_parser = self._get_output_parser(self.image_blob_name) - - def preprocess(self, inputs): - dict_inputs, meta = super().preprocess(inputs) - if self.image_info_blob_name: - dict_inputs[self.image_info_blob_name] = np.array([[self.h, self.w, 1]]) - return dict_inputs, meta - - def postprocess(self, outputs, meta): - detections = self._parse_outputs(outputs) - detections = self._resize_detections(detections, meta) - detections = self._filter_detections(detections, _bbox_area_threshold) - detections = self._add_label_names(detections) - return DetectionResult( - detections, - outputs.get(_saliency_map_name, np.ndarray(0)), - outputs.get(_feature_vector_name, np.ndarray(0)), - ) - - def _get_output_parser( - self, - image_blob_name, - bboxes="bboxes", - labels="labels", - scores="scores", - ): - try: - parser = SingleOutputParser(self.outputs) - self.logger.debug("\tUsing SSD model with single output parser") - return parser - except ValueError: - pass - - try: - parser = MultipleOutputParser(self.outputs, bboxes, scores, labels) - self.logger.debug("\tUsing SSD model with multiple output parser") - return parser - except ValueError: - pass - - try: - parser = BoxesLabelsParser(self.outputs, (self.w, self.h)) - self.logger.debug('\tUsing SSD model with "boxes-labels" output parser') - return parser - except ValueError: - pass - msg = "Unsupported model outputs" - raise ValueError(msg) - - def _parse_outputs(self, outputs): - return self.output_parser(outputs) +BBOX_AREA_THRESHOLD = 1.0 +SALIENCY_MAP_NAME = "saliency_map" +FEATURE_VECTOR_NAME = "feature_vector" def find_layer_by_name(name, layers): @@ -92,11 +37,30 @@ def __init__(self, all_outputs): msg = f"The last dimension of the output blob must be equal to 7, got {last_dim} instead." raise ValueError(msg) - def __call__(self, outputs): - return [ - Detection(xmin, ymin, xmax, ymax, score, label) - for _, label, score, xmin, ymin, xmax, ymax in outputs[self.output_name][0][0] - ] + def __call__(self, outputs) -> DetectionResult: + """Parse model outputs. + + Args: + outputs (dict): Model outputs wrapped in dict. + + Returns: + DetectionResult: Parsed model outputs. 
+ """ + bboxes = [] + scores = [] + labels = [] + for _, label, score, xmin, ymin, xmax, ymax in outputs[self.output_name][0][0]: + bboxes.append((xmin, ymin, xmax, ymax)) + scores.append(score) + labels.append(label) + bboxes = np.array(bboxes) + scores = np.array(scores) + labels = np.array(labels).astype(np.int32) + return DetectionResult( + bboxes=bboxes, + labels=labels, + scores=scores, + ) class MultipleOutputParser: @@ -111,11 +75,19 @@ def __init__( self.scores_layer = find_layer_by_name(scores_layer, layers) self.bboxes_layer = find_layer_by_name(bboxes_layer, layers) - def __call__(self, outputs): - bboxes = outputs[self.bboxes_layer][0] - scores = outputs[self.scores_layer][0] - labels = outputs[self.labels_layer][0] - return [Detection(*bbox, score, label) for label, score, bbox in zip(labels, scores, bboxes)] + def __call__(self, outputs) -> DetectionResult: + """Parse model outputs. + + Args: + outputs (dict): Model outputs wrapped in dict. + + Returns: + DetectionResult: Parsed model outputs. + """ + bboxes = np.array(outputs[self.bboxes_layer][0]) + scores = np.array(outputs[self.scores_layer][0]) + labels = np.array(outputs[self.labels_layer][0]) + return DetectionResult(bboxes=bboxes, labels=labels, scores=scores) class BoxesLabelsParser: @@ -144,7 +116,17 @@ def find_layer_bboxes_output(layers): raise ValueError(msg) return filter_outputs[0] - def __call__(self, outputs): + def __call__(self, outputs) -> DetectionResult: + """Parse model outputs. + + Note: The bounding boxes output layer is expected to be in the format [xmin, ymin, xmax, ymax, score]. + + Args: + outputs (dict): Model outputs wrapped in dict. + + Returns: + DetectionResult: Parsed model outputs. + """ bboxes = outputs[self.bboxes_layer] bboxes = bboxes.squeeze(0) scores = bboxes[:, 4] @@ -157,9 +139,65 @@ def __call__(self, outputs): labels = np.full(len(bboxes), self.default_label, dtype=bboxes.dtype) labels = labels.squeeze(0) - return [Detection(*bbox, score, label) for label, score, bbox in zip(labels, scores, bboxes)] + return DetectionResult( + bboxes=bboxes, + labels=labels, + scores=scores, + ) + + +class SSD(DetectionModel): + __model__ = "SSD" + + def __init__(self, inference_adapter, configuration: dict = {}, preload=False): + super().__init__(inference_adapter, configuration, preload) + self.image_info_blob_name = self.image_info_blob_names[0] if len(self.image_info_blob_names) == 1 else None + self.output_parser = self._get_output_parser(self.image_blob_name) + + def preprocess(self, inputs): + dict_inputs, meta = super().preprocess(inputs) + if self.image_info_blob_name: + dict_inputs[self.image_info_blob_name] = np.array([[self.h, self.w, 1]]) + return dict_inputs, meta + + def postprocess(self, outputs, meta) -> DetectionResult: + detections = self._parse_outputs(outputs) + self._resize_detections(detections, meta) + self._filter_detections(detections, BBOX_AREA_THRESHOLD) + self._add_label_names(detections) + detections.saliency_map = outputs.get(SALIENCY_MAP_NAME, np.ndarray(0)) + detections.feature_vector = outputs.get(FEATURE_VECTOR_NAME, np.ndarray(0)) + return detections + + def _get_output_parser( + self, + image_blob_name, + bboxes="bboxes", + labels="labels", + scores="scores", + ): + try: + parser = SingleOutputParser(self.outputs) + self.logger.debug("\tUsing SSD model with single output parser") + return parser + except ValueError: + pass + + try: + parser = MultipleOutputParser(self.outputs, bboxes, scores, labels) + self.logger.debug("\tUsing SSD model with multiple output parser") + return 
parser + except ValueError: + pass + try: + parser = BoxesLabelsParser(self.outputs, (self.w, self.h)) + self.logger.debug('\tUsing SSD model with "boxes-labels" output parser') + return parser + except ValueError: + pass + msg = "Unsupported model outputs" + raise ValueError(msg) -_bbox_area_threshold = 1.0 -_saliency_map_name = "saliency_map" -_feature_vector_name = "feature_vector" + def _parse_outputs(self, outputs): + return self.output_parser(outputs) diff --git a/model_api/python/model_api/models/utils.py b/model_api/python/model_api/models/utils.py index 339e232f..a04e761f 100644 --- a/model_api/python/model_api/models/utils.py +++ b/model_api/python/model_api/models/utils.py @@ -6,37 +6,49 @@ from __future__ import annotations # TODO: remove when Python3.9 support is dropped from pathlib import Path +from typing import TYPE_CHECKING import cv2 import numpy as np -from model_api.models.result_types import Contour, Detection, SegmentedObject, SegmentedObjectWithRects +from model_api.models.result_types import Contour, InstanceSegmentationResult, RotatedSegmentationResult +if TYPE_CHECKING: + from model_api.models.result_types.detection import DetectionResult -def add_rotated_rects(segmented_objects: list[SegmentedObject]) -> list[SegmentedObjectWithRects]: + +def add_rotated_rects(inst_seg_result: InstanceSegmentationResult) -> RotatedSegmentationResult: objects_with_rects = [] - for segmented_object in segmented_objects: - mask = segmented_object.mask.astype(np.uint8) + for mask in inst_seg_result.masks: + mask = mask.astype(np.uint8) contours, _ = cv2.findContours( mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE, ) - contour = np.vstack(contours) - objects_with_rects.append( - SegmentedObjectWithRects(segmented_object, cv2.minAreaRect(contour)), - ) - return objects_with_rects - - -def get_contours( - segmentedObjects: list[SegmentedObject | SegmentedObjectWithRects], -) -> list[Contour]: + objects_with_rects.append(cv2.minAreaRect(np.vstack(contours))) + return RotatedSegmentationResult( + bboxes=inst_seg_result.bboxes, + masks=inst_seg_result.masks, + scores=inst_seg_result.scores, + labels=inst_seg_result.labels, + label_names=inst_seg_result.label_names, + rotated_rects=objects_with_rects, + feature_vector=inst_seg_result.feature_vector, + saliency_map=inst_seg_result.saliency_map, + ) + + +def get_contours(seg_result: RotatedSegmentationResult | InstanceSegmentationResult) -> list[Contour]: combined_contours = [] - for obj in segmentedObjects: + for mask, score, label_name in zip( + seg_result.masks, + seg_result.scores, + seg_result.label_names, + ): contours, _ = cv2.findContours( - obj.mask, + mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE, ) @@ -45,17 +57,19 @@ def get_contours( if len(contours) != 1: msg = "findContours() must have returned only one contour" raise RuntimeError(msg) - combined_contours.append(Contour(label=str(obj.str_label), probability=obj.score, shape=contours[0])) + combined_contours.append(Contour(label=label_name, probability=score, shape=contours[0])) return combined_contours -def clip_detections(detections: list[Detection], size: tuple[int, int]) -> list[Detection]: - for detection in detections: - detection.xmin = min(max(round(detection.xmin), 0), size[1]) - detection.ymin = min(max(round(detection.ymin), 0), size[0]) - detection.xmax = min(max(round(detection.xmax), 0), size[1]) - detection.ymax = min(max(round(detection.ymax), 0), size[0]) - return detections +def clip_detections(detections: DetectionResult, size: tuple[int, int]): + """Clip bounding 
boxes to image size. + + Args: + detections (DetectionResult): detection results including boxes, labels and scores. + size (tuple[int, int]): image size of format (height, width). + """ + detections.bboxes[:, 0::2] = np.clip(detections.bboxes[:, 0::2], 0, size[1]) + detections.bboxes[:, 1::2] = np.clip(detections.bboxes[:, 1::2], 0, size[0]) class OutputTransform: diff --git a/model_api/python/model_api/models/yolo.py b/model_api/python/model_api/models/yolo.py index 2d950072..d72c2ae4 100644 --- a/model_api/python/model_api/models/yolo.py +++ b/model_api/python/model_api/models/yolo.py @@ -4,14 +4,13 @@ # from collections import namedtuple -from itertools import starmap import numpy as np from model_api.adapters.utils import INTERPOLATION_TYPES, resize_image_ocv from .detection_model import DetectionModel -from .result_types import Detection, DetectionResult +from .result_types import DetectionResult from .types import BooleanValue, ListValue, NumericalValue from .utils import clip_detections, multiclass_nms, nms @@ -190,14 +189,14 @@ def parameters(cls): parameters["confidence_threshold"].update_default_value(0.5) return parameters - def postprocess(self, outputs, meta): + def postprocess(self, outputs, meta) -> DetectionResult: detections = self._parse_outputs(outputs, meta) - detections = self._resize_detections(detections, meta) - return self._add_label_names(detections) + self._resize_detections(detections, meta) + self._add_label_names(detections) + return detections - def _parse_yolo_region(self, predictions, input_size, params): + def _parse_yolo_region(self, predictions, input_size, params) -> DetectionResult: # ------------------------------------------ Extracting layer parameters --------------------------------------- - objects = [] size_normalizer = input_size if params.use_input_size else params.sides predictions = permute_to_N_HWA_K( predictions, @@ -205,6 +204,7 @@ def _parse_yolo_region(self, predictions, input_size, params): params.output_layout, ) # ------------------------------------------- Parsing YOLO Region output --------------------------------------- + bboxes, labels, scores = [], [], [] for prediction in predictions: # Getting probabilities from raw outputs class_probabilities = self._get_probabilities(prediction, params.classes) @@ -232,18 +232,32 @@ def _parse_yolo_region(self, predictions, input_size, params): # Define class_label and cofidence label = class_idx[ind] confidence = class_probabilities[ind] - objects.append( - Detection( + + bboxes.append( + [ predicted_box.x - predicted_box.w / 2, predicted_box.y - predicted_box.h / 2, predicted_box.x + predicted_box.w / 2, predicted_box.y + predicted_box.h / 2, - confidence.item(), - label.item(), - ), + ], ) + scores.append(confidence.item()) + labels.append(label.item()) - return objects + if len(bboxes): + bboxes = np.stack(bboxes) + labels = np.array(labels) + scores = np.array(scores) + else: + bboxes = np.empty((0, 4), dtype=np.float32) + labels = np.empty((0,), dtype=np.int32) + scores = np.empty((0,), dtype=np.float32) + + return DetectionResult( + bboxes=bboxes, + labels=labels, + scores=scores, + ) @staticmethod def _get_probabilities(prediction, classes): @@ -280,7 +294,7 @@ def _get_absolute_det_box( return DetectionBox(x, y, width, height) @staticmethod - def _filter(detections, iou_threshold): + def _filter(detections: DetectionResult, iou_threshold: float) -> DetectionResult: def iou(box_1, box_2): width_of_overlap_area = min(box_1.xmax, box_2.xmax) - max( box_1.xmin, @@ -301,33 +315,59 @@ def 
iou(box_1, box_2): return 0 return area_of_overlap / area_of_union - detections = sorted(detections, key=lambda obj: obj.score, reverse=True) + indices = np.argsort(detections.scores)[::-1] + detections.bboxes = detections.bboxes[indices] + detections.scores = detections.scores[indices] + detections.labels = detections.labels[indices] + for i in range(len(detections)): - if detections[i].score == 0: + if detections.scores[i] == 0: continue for j in range(i + 1, len(detections)): # We perform IOU only on objects of same class - if detections[i].id != detections[j].id: + if detections.labels[i] != detections.labels[j]: continue - if iou(detections[i], detections[j]) > iou_threshold: - detections[j].score = 0 + if iou(detections.bboxes[i], detections.bboxes[j]) > iou_threshold: + detections.scores[j] = 0.0 - return [det for det in detections if det.score > 0] + keep = detections.scores > 0.0 + detections.bboxes = detections.bboxes[keep] + detections.scores = detections.scores[keep] + detections.labels = detections.labels[keep] + return detections - def _parse_outputs(self, outputs, meta): - detections = [] + def _parse_outputs(self, outputs, meta) -> DetectionResult: + bboxes, scores, labels = [], [], [] for layer_name in self.yolo_layer_params: out_blob = outputs[layer_name] layer_params = self.yolo_layer_params[layer_name] out_blob.shape = layer_params[0] - detections += self._parse_yolo_region( + detection_result = self._parse_yolo_region( out_blob, meta["resized_shape"], layer_params[1], ) + bboxes.extend(detection_result.bboxes) + scores.extend(detection_result.scores) + labels.extend(detection_result.labels) + + if len(bboxes): + bboxes = np.stack(bboxes) + labels = np.array(labels) + scores = np.array(scores) + else: + bboxes = np.empty((0, 4), dtype=np.float32) + labels = np.empty((0,), dtype=np.int32) + scores = np.empty((0,), dtype=np.float32) + + detection_result = DetectionResult( + bboxes=bboxes, + labels=labels, + scores=scores, + ) - return self._filter(detections, self.iou_threshold) + return self._filter(detection_result, self.iou_threshold) # type: ignore[attr-defined] class YoloV4(YOLO): @@ -525,7 +565,7 @@ def preprocess(self, inputs): dict_inputs = {self.image_blob_name: preprocessed_image} return dict_inputs, meta - def postprocess(self, outputs, meta): + def postprocess(self, outputs, meta) -> DetectionResult: output = outputs[self.output_blob_name][0] if np.size(self.expanded_strides) != 0 and np.size(self.grids) != 0: @@ -546,24 +586,18 @@ def postprocess(self, outputs, meta): x_maxs, y_maxs, scores, - self.iou_threshold, + self.iou_threshold, # type: ignore[attr-defined] include_boundaries=True, ) - detections = list( - starmap( - Detection, - zip( - x_mins[keep_nms], - y_mins[keep_nms], - x_maxs[keep_nms], - y_maxs[keep_nms], - scores[keep_nms], - j[keep_nms], - ), - ), + detections = DetectionResult( + bboxes=boxes[i][keep_nms], + scores=scores[keep_nms], + labels=j[keep_nms], ) - return clip_detections(detections, meta["original_shape"]) + + clip_detections(detections, meta["original_shape"]) + return detections def set_strides_grids(self): grids = [] @@ -675,11 +709,12 @@ def preprocess(self, inputs): return dict_inputs, meta - def postprocess(self, outputs, meta): + def postprocess(self, outputs, meta) -> DetectionResult: detections = self._parse_outputs(outputs) - return clip_detections(detections, meta["original_shape"]) + clip_detections(detections, meta["original_shape"]) + return detections - def _parse_outputs(self, outputs): + def _parse_outputs(self, 
outputs) -> DetectionResult: boxes = outputs[self.bboxes_blob_name][0] scores = outputs[self.scores_blob_name][0] indices = ( @@ -695,24 +730,33 @@ def _parse_outputs(self, outputs): out_classes.append(idx_[1]) out_scores.append(scores[tuple(idx_[1:])]) out_boxes.append(boxes[idx_[2]]) - transposed_boxes = np.array(out_boxes).T if out_boxes else ([], [], [], []) + + _boxes = np.stack(out_boxes) if out_boxes else np.empty((0, 4), dtype=np.float32) + x_mins = _boxes[:, 1] + y_mins = _boxes[:, 0] + x_maxs = _boxes[:, 3] + y_maxs = _boxes[:, 2] + _boxes = np.stack((x_mins, y_mins, x_maxs, y_maxs)).T mask = np.array(out_scores) > self.confidence_threshold if mask.size == 0: - return [] + return DetectionResult( + bboxes=np.empty((0, 4), dtype=np.float32), + labels=np.empty((0,), dtype=np.int32), + scores=np.empty((0,), dtype=np.float32), + ) - out_classes, out_scores, transposed_boxes = ( + _classes, _scores, _boxes = ( np.array(out_classes)[mask], np.array(out_scores)[mask], - transposed_boxes[:, mask], + _boxes[mask], ) - x_mins = transposed_boxes[1] - y_mins = transposed_boxes[0] - x_maxs = transposed_boxes[3] - y_maxs = transposed_boxes[2] - - return list(starmap(Detection, zip(x_mins, y_mins, x_maxs, y_maxs, out_scores, out_classes))) + return DetectionResult( + bboxes=_boxes, + labels=_classes, + scores=_scores, + ) class YOLOv5(DetectionModel): @@ -760,7 +804,7 @@ def parameters(cls): ) return parameters - def postprocess(self, outputs, meta): + def postprocess(self, outputs, meta) -> DetectionResult: if len(outputs) != 1: self.raise_error("expect 1 output") prediction = next(iter(outputs.values())) @@ -784,7 +828,7 @@ def postprocess(self, outputs, meta): dtype=np.float32, ) keep_top_k = 30000 - if self.agnostic_nms: + if self.agnostic_nms: # type: ignore[attr-defined] boxes = boxes[ nms( boxes[:, 2], @@ -792,12 +836,12 @@ def postprocess(self, outputs, meta): boxes[:, 4], boxes[:, 5], boxes[:, 1], - self.iou_threshold, + self.iou_threshold, # type: ignore[attr-defined] keep_top_k=keep_top_k, ) ] else: - boxes, _ = multiclass_nms(boxes, self.iou_threshold, keep_top_k) + boxes, _ = multiclass_nms(boxes, self.iou_threshold, keep_top_k) # type: ignore[attr-defined] inputImgWidth = meta["original_shape"][1] inputImgHeight = meta["original_shape"][0] invertedScaleX, invertedScaleY = ( @@ -823,17 +867,12 @@ def postprocess(self, outputs, meta): ) intid = boxes[:, 0].astype(np.int32) return DetectionResult( - [ - Detection( - *intboxes[i], - boxes[i, 1], - intid[i], - self.get_label_name(intid[i]), - ) - for i in range(len(boxes)) - ], - np.ndarray(0), - np.ndarray(0), + bboxes=intboxes, + scores=boxes[:, 1], + labels=intid, + label_names=[self.get_label_name(i) for i in intid], + saliency_map=np.ndarray(0), + feature_vector=np.ndarray(0), ) diff --git a/model_api/python/model_api/tilers/detection.py b/model_api/python/model_api/tilers/detection.py index 624ca845..5fb46b39 100644 --- a/model_api/python/model_api/tilers/detection.py +++ b/model_api/python/model_api/tilers/detection.py @@ -6,7 +6,7 @@ import cv2 as cv import numpy as np -from model_api.models import Detection, DetectionResult +from model_api.models import DetectionResult from model_api.models.types import NumericalValue from model_api.models.utils import multiclass_nms @@ -27,7 +27,7 @@ def parameters(cls): """Defines the description and type of configurable data parameters for the tiler. 
Returns: - - the dictionary with defined wrapper tiler parameters + - the dictionary with defined wrapper tiler parameters """ parameters = super().parameters() parameters.update( @@ -49,26 +49,21 @@ def parameters(cls): ) return parameters - def _postprocess_tile(self, predictions, coord): + def _postprocess_tile( + self, + predictions: DetectionResult, + coord: list[int], + ) -> dict: """Converts predictions to a format convenient for further merging. Args: - predictions: predictions from a detection model: a list of `Detection` objects - or one `DetectionResult` - coord: a list containing coordinates for the processed tile + predictions: predictions wrapped in DetectionResult from a detection model + coord: a list containing coordinates for the processed tile Returns: - a dict with postprocessed predictions in 6-items format: (label id, score, bbox) + a dict with postprocessed predictions in 6-items format: (label id, score, bbox) """ output_dict = {} - if hasattr(predictions, "objects"): - detections = _detection2array(predictions.objects) - elif hasattr(predictions, "segmentedObjects"): - detections = _detection2array(predictions.segmentedObjects) - else: - msg = "Unsupported model predictions format" - raise RuntimeError(msg) - output_dict["saliency_map"] = predictions.saliency_map output_dict["features"] = predictions.feature_vector @@ -77,22 +72,25 @@ def _postprocess_tile(self, predictions, coord): output_dict["features"] = np.copy(output_dict["features"]) offset_x, offset_y = coord[:2] - detections[:, 2:] += np.tile((offset_x, offset_y), 2) - output_dict["bboxes"] = detections + predictions.bboxes += np.tile((offset_x, offset_y), 2) + output_dict["bboxes"] = np.concatenate( + (predictions.labels[:, np.newaxis], predictions.scores[:, np.newaxis], predictions.bboxes), + -1, + ) output_dict["coords"] = coord return output_dict - def _merge_results(self, results, shape): + def _merge_results(self, results: list[dict], shape: tuple[int, int, int]) -> DetectionResult: """Merge results from all tiles. To merge detections, per-class NMS is applied. 
Args: - results: list of per-tile results - shape: original full-res image shape + results: list of per-tile results + shape: original full-res image shape Returns: - merged prediction + merged prediction """ detections_array = np.empty((0, 6), dtype=np.float32) feature_vectors = [] @@ -108,29 +106,29 @@ def _merge_results(self, results, shape): if np.prod(detections_array.shape): detections_array, _ = multiclass_nms( detections_array, - max_num=self.max_pred_number, - iou_threshold=self.iou_threshold, + max_num=self.max_pred_number, # type: ignore[attr-defined] + iou_threshold=self.iou_threshold, # type: ignore[attr-defined] ) merged_vector = np.mean(feature_vectors, axis=0) if feature_vectors else np.ndarray(0) saliency_map = self._merge_saliency_maps(saliency_maps, shape, tiles_coords) if saliency_maps else np.ndarray(0) - - detected_objects = [] - for i in range(detections_array.shape[0]): - label = int(detections_array[i][0]) - score = float(detections_array[i][1]) - bbox = list(detections_array[i][2:].astype(np.int32)) - detected_objects.append( - Detection(*bbox, score, label, self.model.labels[label]), - ) + label_names = [self.model.labels[int(label_idx)] for label_idx in detections_array[:, 0]] return DetectionResult( - detected_objects, - saliency_map, - merged_vector, + bboxes=detections_array[:, 2:].astype(np.int32), + labels=detections_array[:, 0].astype(np.int32), + scores=detections_array[:, 1], + label_names=label_names, + saliency_map=saliency_map, + feature_vector=merged_vector, ) - def _merge_saliency_maps(self, saliency_maps, shape, tiles_coords): + def _merge_saliency_maps( + self, + saliency_maps: list[np.ndarray], + shape: tuple[int, int, int], + tiles_coords: list[tuple[int, int, int, int]], + ) -> np.ndarray: """Merged saliency maps from each tile Args: @@ -159,11 +157,11 @@ def _merge_saliency_maps(self, saliency_maps, shape, tiles_coords): image_h, image_w, _ = shape ratio = ( - map_h / min(image_h, self.tile_size), + map_h / min(image_h, self.tile_size), # type: ignore[attr-defined] map_w / min( image_w, - self.tile_size, + self.tile_size, # type: ignore[attr-defined] ), ) @@ -171,7 +169,7 @@ def _merge_saliency_maps(self, saliency_maps, shape, tiles_coords): image_map_w = int(image_w * ratio[1]) merged_map = np.zeros((num_classes, image_map_h, image_map_w)) - start_idx = 1 if self.tile_with_full_img else 0 + start_idx = 1 if self.tile_with_full_img else 0 # type: ignore[attr-defined] for i, saliency_map in enumerate(saliency_maps[start_idx:], start_idx): for class_idx in range(num_classes): if len(saliency_map.shape) == 4: @@ -199,7 +197,7 @@ def _merge_saliency_maps(self, saliency_maps, shape, tiles_coords): merged_map[class_idx][y_1 + hi, x_1 + wi] = map_pixel for class_idx in range(num_classes): - if self.tile_with_full_img: + if self.tile_with_full_img: # type: ignore[attr-defined] image_map_cls = image_saliency_map[class_idx] image_map_cls = cv.resize(image_map_cls, (image_map_w, image_map_h)) merged_map[class_idx] += 0.5 * image_map_cls @@ -212,7 +210,7 @@ def _merge_saliency_maps(self, saliency_maps, shape, tiles_coords): return merged_map.astype(np.uint8) -def _non_linear_normalization(saliency_map): +def _non_linear_normalization(saliency_map) -> np.ndarray: """Use non-linear normalization y=x**1.5 for 2D saliency maps.""" min_soft_score = np.min(saliency_map) # make merged_map distribution positive to perform non-linear normalization y=x**1.5 @@ -222,28 +220,3 @@ def _non_linear_normalization(saliency_map): saliency_map = 255.0 / (max_soft_score + 
1e-12) * saliency_map return np.floor(saliency_map) - - -def _detection2array(detections): - """Convert list of OpenVINO Detection to a numpy array. - - Args: - detections (List): List of OpenVINO Detection containing score, id, xmin, ymin, xmax, ymax - - Returns: - np.ndarray: numpy array with [label, confidence, x1, y1, x2, y2] - """ - scores = np.empty((0, 1), dtype=np.float32) - labels = np.empty((0, 1), dtype=np.uint32) - boxes = np.empty((0, 4), dtype=np.float32) - for det in detections: - if (det.xmax - det.xmin) * (det.ymax - det.ymin) < 1.0: - continue - scores = np.append(scores, [[det.score]], axis=0) - labels = np.append(labels, [[det.id]], axis=0) - boxes = np.append( - boxes, - [[float(det.xmin), float(det.ymin), float(det.xmax), float(det.ymax)]], - axis=0, - ) - return np.concatenate((labels, scores, boxes), -1) diff --git a/model_api/python/model_api/tilers/instance_segmentation.py b/model_api/python/model_api/tilers/instance_segmentation.py index 5de1db14..23b95ce8 100644 --- a/model_api/python/model_api/tilers/instance_segmentation.py +++ b/model_api/python/model_api/tilers/instance_segmentation.py @@ -8,10 +8,7 @@ import cv2 as cv import numpy as np -from model_api.models import ( - InstanceSegmentationResult, - SegmentedObject, -) +from model_api.models import InstanceSegmentationResult from model_api.models.instance_segmentation import MaskRCNNModel, _segm_postprocess from model_api.models.utils import multiclass_nms @@ -68,7 +65,7 @@ def _filter_tiles(self, image, tile_coords, confidence_threshold=0.35): return tile_coords - def _postprocess_tile(self, predictions, coord): + def _postprocess_tile(self, predictions: InstanceSegmentationResult, coord) -> dict: # type: ignore[override] """Converts predictions to a format convenient for further merging. Args: @@ -80,21 +77,21 @@ def _postprocess_tile(self, predictions, coord): """ output_dict = super()._postprocess_tile(predictions, coord) output_dict["masks"] = [] - for segm_res in predictions.segmentedObjects: - output_dict["masks"].append(segm_res.mask) + for mask in predictions.masks: + output_dict["masks"].append(mask) return output_dict - def _merge_results(self, results, shape): + def _merge_results(self, results, shape) -> InstanceSegmentationResult: """Merge results from all tiles. To merge detections, per-class NMS is applied. 
Args: - results: list of per-tile results - shape: original full-res image shape + results: list of per-tile results + shape: original full-res image shape Returns: - merged prediction + merged prediction """ detections_array = np.empty((0, 6), dtype=np.float32) feature_vectors = [] @@ -114,28 +111,30 @@ def _merge_results(self, results, shape): if np.prod(detections_array.shape): detections_array, keep_idxs = multiclass_nms( detections_array, - max_num=self.max_pred_number, - iou_threshold=self.iou_threshold, + max_num=self.max_pred_number, # type: ignore[attr-defined] + iou_threshold=self.iou_threshold, # type: ignore[attr-defined] ) masks = [masks[keep_idx] for keep_idx in keep_idxs] merged_vector = np.mean(feature_vectors, axis=0) if feature_vectors else np.ndarray(0) saliency_map = self._merge_saliency_maps(saliency_maps, shape, tiles_coords) if saliency_maps else [] - detected_objects = [] - for i in range(detections_array.shape[0]): - label = int(detections_array[i][0]) - score = float(detections_array[i][1]) - bbox = list(detections_array[i][2:].astype(np.int32)) - masks[i] = _segm_postprocess(np.array(bbox), masks[i], *shape[:-1]) - detected_objects.append( - SegmentedObject(*bbox, score, label, self.model.labels[label], masks[i]), - ) + labels, scores, bboxes = np.hsplit(detections_array, [1, 2]) + labels = labels.astype(np.int32) + resized_masks, label_names = [], [] + for mask, box, label_idx in zip(masks, bboxes, labels): + label_names.append(self.model.labels[int(label_idx)]) + resized_masks.append(_segm_postprocess(box, mask, *shape[:-1])) + resized_masks = np.stack(resized_masks) if resized_masks else masks return InstanceSegmentationResult( - detected_objects, - saliency_map, - merged_vector, + bboxes=np.round(bboxes).astype(np.int32), + labels=labels.squeeze(), + scores=scores.squeeze(), + masks=resized_masks, + label_names=label_names if label_names else None, + saliency_map=saliency_map, + feature_vector=merged_vector, ) def _merge_saliency_maps(self, saliency_maps, shape, tiles_coords): diff --git a/model_api/python/pyproject.toml b/model_api/python/pyproject.toml index c011e57f..63591653 100644 --- a/model_api/python/pyproject.toml +++ b/model_api/python/pyproject.toml @@ -40,6 +40,7 @@ tests = [ "pre-commit", "httpx", "pytest", + "pytest-mock", "openvino-dev[onnx,pytorch,tensorflow2]", "ultralytics>=8.0.114,<=8.0.205", "onnx", diff --git a/tests/python/accuracy/test_YOLOv8.py b/tests/python/accuracy/test_YOLOv8.py index 8ff97b80..2b1ae77e 100644 --- a/tests/python/accuracy/test_YOLOv8.py +++ b/tests/python/accuracy/test_YOLOv8.py @@ -73,31 +73,19 @@ def test_alignment(impath, pt): im = cv2.imread(str(impath)) assert im is not None impl_preds = impl_wrapper(im) - pred_boxes = np.array( - [ - ( - impl_pred.xmin, - impl_pred.ymin, - impl_pred.xmax, - impl_pred.ymax, - impl_pred.score, - impl_pred.id, - ) - for impl_pred in impl_preds.objects - ], - dtype=np.float32, - ) + pred_boxes = impl_preds.bboxes.astype(np.float32) + pred_scores = impl_preds.scores.astype(np.float32) + pred_labels = impl_preds.labels ref_predictions = ref_wrapper.predict(im) assert 1 == len(ref_predictions) ref_boxes = ref_predictions[0].boxes.data.numpy() - if 0 == pred_boxes.size == ref_boxes.size: + if 0 == len(pred_boxes) == len(ref_boxes): return # np.isclose() doesn't work for empty arrays ref_boxes[:, :4] = np.round(ref_boxes[:, :4], out=ref_boxes[:, :4]) - assert np.isclose( - pred_boxes[:, :4], ref_boxes[:, :4], 0, 1 - ).all() # Allow one pixel deviation because image preprocessing is 
imbedded into the model - assert np.isclose(pred_boxes[:, 4], ref_boxes[:, 4], 0.0, 0.02).all() - assert (pred_boxes[:, 5] == ref_boxes[:, 5]).all() + # Allow one pixel deviation because image preprocessing is imbedded into the model + assert np.isclose(pred_boxes, ref_boxes[:, :4], 0, 1).all() + assert np.isclose(pred_scores, ref_boxes[:, 4], 0.0, 0.02).all() + assert (pred_labels == ref_boxes[:, 5]).all() with open(ref_dir / impath.with_suffix(".txt").name, "w") as file: print(impl_preds, end="", file=file) @@ -125,20 +113,16 @@ def evaluate(self, wrapper): ) for batch in dataloader: im = cv2.imread(batch["im_file"][0]) - pred = torch.tensor( - [ - ( - impl_pred.xmin / im.shape[1], - impl_pred.ymin / im.shape[0], - impl_pred.xmax / im.shape[1], - impl_pred.ymax / im.shape[0], - impl_pred.score, - impl_pred.id, - ) - for impl_pred in wrapper(im).objects - ], - dtype=torch.float32, - )[None] + result = wrapper(im) + bboxes = torch.from_numpy(result.bboxes) / torch.tile( + torch.tensor([im.shape[1], im.shape[0]], dtype=torch.float32), (1, 2) + ) + scores = torch.from_numpy(result.scores) + labels = torch.from_numpy(result.labels) + + pred = torch.cat( + [bboxes, scores[:, None], labels[:, None].float()], dim=1 + ).unsqueeze(0) if not pred.numel(): pred = pred.view(1, 0, 6) self.update_metrics(pred, batch) diff --git a/tests/python/accuracy/test_accuracy.py b/tests/python/accuracy/test_accuracy.py index 61b6800a..98af9f72 100644 --- a/tests/python/accuracy/test_accuracy.py +++ b/tests/python/accuracy/test_accuracy.py @@ -190,7 +190,7 @@ def test_image_models(data, dump, result, model_data): output_str = str(outputs) assert test_data["reference"][0] == output_str image_result = [output_str] - elif isinstance(outputs, DetectionResult): + elif type(outputs) is DetectionResult: assert 1 == len(test_data["reference"]) output_str = str(outputs) assert test_data["reference"][0] == output_str @@ -207,26 +207,14 @@ def test_image_models(data, dump, result, model_data): output_str = str(outputs) + contour_str assert test_data["reference"][0] == output_str image_result = [output_str] - elif isinstance(outputs, InstanceSegmentationResult): + elif type(outputs) is InstanceSegmentationResult: assert 1 == len(test_data["reference"]) - output_str = ( - str( - InstanceSegmentationResult( - add_rotated_rects(outputs.segmentedObjects), - outputs.saliency_map, - outputs.feature_vector, - ) - ) - + "; " - ) + output_str = str(add_rotated_rects(outputs)) + "; " try: # getContours() assumes each instance generates only one contour. # That doesn't hold for some models output_str += ( - "; ".join( - str(contour) - for contour in get_contours(outputs.segmentedObjects) - ) + "; ".join(str(contour) for contour in get_contours(outputs)) + "; " ) except RuntimeError:
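As a companion to the tiler changes above, the following numpy-only sketch shows the 6-column working layout (label, score, x1, y1, x2, y2) that the tilers accumulate before NMS, and how `np.hsplit(..., [1, 2])` recovers the separate arrays; the two rows below are invented purely for illustration:

```python
import numpy as np

# Merged per-tile detections in the tilers' internal layout: label, score, x1, y1, x2, y2.
detections_array = np.array(
    [
        [0, 0.90, 10, 10, 50, 50],
        [2, 0.75, 30, 40, 80, 90],
    ],
    dtype=np.float32,
)

# Split at column indices 1 and 2: label column, score column, remaining four box columns.
labels, scores, bboxes = np.hsplit(detections_array, [1, 2])

print(labels.astype(np.int32).squeeze())  # [0 2]
print(scores.squeeze())                   # confidence scores, one per row
print(np.round(bboxes).astype(np.int32))  # the (N, 4) boxes
```

This mirrors the split performed in the instance segmentation tiler's `_merge_results` before the arrays are handed to the new result types.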