From 7db778cbb117ed88576b9e35f5420390b769d0a8 Mon Sep 17 00:00:00 2001 From: Ashwin Vaidya Date: Wed, 13 Nov 2024 11:30:24 +0100 Subject: [PATCH 1/5] refator result types Signed-off-by: Ashwin Vaidya --- .../python/model_api/models/result_types.py | 246 ------------------ .../model_api/models/result_types/__init__.py | 8 + .../models/result_types/anomaly_result.py | 42 +++ .../model_api/models/result_types/base.py | 13 + .../models/result_types/classification.py | 34 +++ .../models/result_types/detection.py | 40 +++ .../models/result_types/keypoint_detection.py | 19 ++ .../models/result_types/segmentation.py | 126 +++++++++ .../model_api/models/result_types/utils.py | 12 + .../models/result_types/visual_prompting.py | 89 +++++++ .../python/model_api/visualizer/__init__.py | 8 + .../python/model_api/visualizer/primitives.py | 10 + .../model_api/visualizer/visualize_mixin.py | 13 + .../python/model_api/visualizer/visualizer.py | 14 + model_api/python/pyproject.toml | 1 + 15 files changed, 429 insertions(+), 246 deletions(-) delete mode 100644 model_api/python/model_api/models/result_types.py create mode 100644 model_api/python/model_api/models/result_types/__init__.py create mode 100644 model_api/python/model_api/models/result_types/anomaly_result.py create mode 100644 model_api/python/model_api/models/result_types/base.py create mode 100644 model_api/python/model_api/models/result_types/classification.py create mode 100644 model_api/python/model_api/models/result_types/detection.py create mode 100644 model_api/python/model_api/models/result_types/keypoint_detection.py create mode 100644 model_api/python/model_api/models/result_types/segmentation.py create mode 100644 model_api/python/model_api/models/result_types/utils.py create mode 100644 model_api/python/model_api/models/result_types/visual_prompting.py create mode 100644 model_api/python/model_api/visualizer/__init__.py create mode 100644 model_api/python/model_api/visualizer/primitives.py create mode 100644 model_api/python/model_api/visualizer/visualize_mixin.py create mode 100644 model_api/python/model_api/visualizer/visualizer.py diff --git a/model_api/python/model_api/models/result_types.py b/model_api/python/model_api/models/result_types.py deleted file mode 100644 index d51c505c..00000000 --- a/model_api/python/model_api/models/result_types.py +++ /dev/null @@ -1,246 +0,0 @@ -# -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 -# - -from __future__ import annotations # TODO: remove when Python3.9 support is dropped - -from typing import NamedTuple - -import cv2 as cv -import numpy as np - - -class AnomalyResult(NamedTuple): - """Results for anomaly models.""" - - anomaly_map: np.ndarray | None = None - pred_boxes: np.ndarray | None = None - pred_label: str | None = None - pred_mask: np.ndarray | None = None - pred_score: float | None = None - - def _compute_min_max(self, tensor: np.ndarray) -> tuple[np.ndarray, np.ndarray]: - """Computes min and max values of the tensor.""" - return tensor.min(), tensor.max() - - def __str__(self) -> str: - assert self.anomaly_map is not None - assert self.pred_mask is not None - anomaly_map_min, anomaly_map_max = self._compute_min_max(self.anomaly_map) - pred_mask_min, pred_mask_max = self._compute_min_max(self.pred_mask) - return ( - f"anomaly_map min:{anomaly_map_min} max:{anomaly_map_max};" - f"pred_score:{np.round(self.pred_score, 1) if self.pred_score else 0.0};" - f"pred_label:{self.pred_label};" - f"pred_mask min:{pred_mask_min} max:{pred_mask_max};" - ) - - -class ClassificationResult(NamedTuple): - """Results for classification models.""" - - top_labels: list[tuple[int, str, float]] | None = None - saliency_map: np.ndarray | None = None - feature_vector: np.ndarray | None = None - raw_scores: np.ndarray | None = None - - def __str__(self) -> str: - assert self.top_labels is not None - labels = ", ".join(f"{idx} ({label}): {confidence:.3f}" for idx, label, confidence in self.top_labels) - return ( - f"{labels}, {_array_shape_to_str(self.saliency_map)}, {_array_shape_to_str(self.feature_vector)}, " - f"{_array_shape_to_str(self.raw_scores)}" - ) - - -class Detection: - def __init__(self, xmin, ymin, xmax, ymax, score, id, str_label=None) -> None: - self.xmin: int = xmin - self.ymin: int = ymin - self.xmax: int = xmax - self.ymax: int = ymax - self.score: float = score - self.id: int = int(id) - self.str_label: str | None = str_label - - def __str__(self): - return f"{self.xmin}, {self.ymin}, {self.xmax}, {self.ymax}, {self.id} ({self.str_label}): {self.score:.3f}" - - -class DetectionResult(NamedTuple): - """Result for detection model.""" - - objects: list[Detection] | None = None - saliency_map: np.ndarray | None = None - feature_vector: np.ndarray | None = None - - def __str__(self): - assert self.objects is not None - obj_str = "; ".join(str(obj) for obj in self.objects) - if obj_str: - obj_str += "; " - return f"{obj_str}{_array_shape_to_str(self.saliency_map)}; {_array_shape_to_str(self.feature_vector)}" - - -class SegmentedObject(Detection): - def __init__(self, xmin, ymin, xmax, ymax, score, id, str_label, mask): - super().__init__(xmin, ymin, xmax, ymax, score, id, str_label) - self.mask = mask - - def __str__(self): - return f"{super().__str__()}, {(self.mask > 0.5).sum()}" - - -class SegmentedObjectWithRects(SegmentedObject): - def __init__(self, segmented_object, rotated_rect): - super().__init__( - segmented_object.xmin, - segmented_object.ymin, - segmented_object.xmax, - segmented_object.ymax, - segmented_object.score, - segmented_object.id, - segmented_object.str_label, - segmented_object.mask, - ) - self.rotated_rect = rotated_rect - - def __str__(self): - res = super().__str__() - rect = self.rotated_rect - res += f", RotatedRect: {rect[0][0]:.3f} {rect[0][1]:.3f} {rect[1][0]:.3f} {rect[1][1]:.3f} {rect[2]:.3f}" - return res - - -class InstanceSegmentationResult(NamedTuple): - segmentedObjects: list[SegmentedObject | SegmentedObjectWithRects] - # Contain per class saliency_maps and "feature_vector" model output if feature_vector exists - saliency_map: list[np.ndarray] - feature_vector: np.ndarray - - def __str__(self): - obj_str = "; ".join(str(obj) for obj in self.segmentedObjects) - filled = 0 - for cls_map in self.saliency_map: - if cls_map.size: - filled += 1 - prefix = f"{obj_str}; " if len(obj_str) else "" - return prefix + f"{filled}; [{','.join(str(i) for i in self.feature_vector.shape)}]" - - -class VisualPromptingResult(NamedTuple): - upscaled_masks: list[np.ndarray] | None = None - processed_mask: list[np.ndarray] | None = None - low_res_masks: list[np.ndarray] | None = None - iou_predictions: list[np.ndarray] | None = None - scores: list[np.ndarray] | None = None - labels: list[np.ndarray] | None = None - hard_predictions: list[np.ndarray] | None = None - soft_predictions: list[np.ndarray] | None = None - best_iou: list[float] | None = None - - def _compute_min_max(self, tensor: np.ndarray) -> tuple[np.ndarray, np.ndarray]: - return tensor.min(), tensor.max() - - def __str__(self) -> str: - assert self.hard_predictions is not None - assert self.upscaled_masks is not None - upscaled_masks_min, upscaled_masks_max = self._compute_min_max( - self.upscaled_masks[0], - ) - - return ( - f"upscaled_masks min:{upscaled_masks_min:.3f} max:{upscaled_masks_max:.3f};" - f"hard_predictions shape:{self.hard_predictions[0].shape};" - ) - - -class PredictedMask(NamedTuple): - mask: list[np.ndarray] - points: list[np.ndarray] | np.ndarray - scores: list[float] | np.ndarray - - def __str__(self) -> str: - obj_str = "" - obj_str += f"mask sum: {np.sum(sum(self.mask))}; " - - if isinstance(self.points, list): - for i, point in enumerate(self.points): - obj_str += "[" - obj_str += ", ".join(str(round(c, 2)) for c in point) - obj_str += "] " - obj_str += "iou: " + f"{float(self.scores[i]):.3f} " - else: - for i in range(self.points.shape[0]): - point = self.points[i] - obj_str += "[" - obj_str += ", ".join(str(round(c, 2)) for c in point) - obj_str += "] " - obj_str += "iou: " + f"{float(self.scores[i]):.3f} " - - return obj_str.strip() - - -class ZSLVisualPromptingResult(NamedTuple): - data: dict[int, PredictedMask] - - def __str__(self) -> str: - return ", ".join(str(self.data[k]) for k in self.data) - - def get_mask(self, label: int) -> PredictedMask: - """Returns a mask belonging to a given label""" - return self.data[label] - - -class DetectedKeypoints(NamedTuple): - keypoints: np.ndarray - scores: np.ndarray - - def __str__(self): - return ( - f"keypoints: {self.keypoints.shape}, " - f"keypoints_x_sum: {np.sum(self.keypoints[:, :1]):.3f}, " - f"scores: {self.scores.shape}" - ) - - -class Contour(NamedTuple): - label: str - probability: float - shape: list[tuple[int, int]] - - def __str__(self): - return f"{self.label}: {self.probability:.3f}, {len(self.shape)}" - - -class ImageResultWithSoftPrediction(NamedTuple): - resultImage: np.ndarray - soft_prediction: np.ndarray - # Contain per class saliency_maps and "feature_vector" model output if feature_vector exists - saliency_map: np.ndarray # Requires return_soft_prediction==True - feature_vector: np.ndarray - - def __str__(self): - outHist = cv.calcHist( - [self.resultImage.astype(np.uint8)], - channels=None, - mask=None, - histSize=[256], - ranges=[0, 255], - ) - hist = "" - for i, count in enumerate(outHist): - if count > 0: - hist += f"{i}: {count[0] / self.resultImage.size:.3f}, " - return ( - f"{hist}{_array_shape_to_str(self.soft_prediction)}, " - f"{_array_shape_to_str(self.saliency_map)}, " - f"{_array_shape_to_str(self.feature_vector)}" - ) - - -def _array_shape_to_str(array: np.ndarray | None) -> str: - if array is not None: - return f"[{','.join(str(i) for i in array.shape)}]" - return "[]" diff --git a/model_api/python/model_api/models/result_types/__init__.py b/model_api/python/model_api/models/result_types/__init__.py new file mode 100644 index 00000000..5ec07379 --- /dev/null +++ b/model_api/python/model_api/models/result_types/__init__.py @@ -0,0 +1,8 @@ +"""Result types.""" + +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from .anomaly_result import AnomalyResult + +__all__ = ["AnomalyResult"] diff --git a/model_api/python/model_api/models/result_types/anomaly_result.py b/model_api/python/model_api/models/result_types/anomaly_result.py new file mode 100644 index 00000000..4bbcbd32 --- /dev/null +++ b/model_api/python/model_api/models/result_types/anomaly_result.py @@ -0,0 +1,42 @@ +"""Anomaly result type.""" + +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import numpy as np + +from model_api.models.result_types.base import Result + + +class AnomalyResult(Result): + """Results for anomaly models.""" + + def __init__( + self, + anomaly_map: np.ndarray | None = None, + pred_boxes: np.ndarray | None = None, + pred_label: str | None = None, + pred_mask: np.ndarray | None = None, + pred_score: float | None = None, + ) -> None: + self.anomaly_map = anomaly_map + self.pred_boxes = pred_boxes + self.pred_label = pred_label + self.pred_mask = pred_mask + self.pred_score = pred_score + + def _compute_min_max(self, tensor: np.ndarray) -> tuple[np.ndarray, np.ndarray]: + """Computes min and max values of the tensor.""" + return tensor.min(), tensor.max() + + def __str__(self) -> str: + assert self.anomaly_map is not None + assert self.pred_mask is not None + anomaly_map_min, anomaly_map_max = self._compute_min_max(self.anomaly_map) + pred_mask_min, pred_mask_max = self._compute_min_max(self.pred_mask) + return ( + f"anomaly_map min:{anomaly_map_min} max:{anomaly_map_max};" + f"pred_score:{np.round(self.pred_score, 1) if self.pred_score else 0.0};" + f"pred_label:{self.pred_label};" + f"pred_mask min:{pred_mask_min} max:{pred_mask_max};" + ) diff --git a/model_api/python/model_api/models/result_types/base.py b/model_api/python/model_api/models/result_types/base.py new file mode 100644 index 00000000..f65f748b --- /dev/null +++ b/model_api/python/model_api/models/result_types/base.py @@ -0,0 +1,13 @@ +"""Base result type.""" + +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from abc import ABC +from typing import NamedTuple + +from model_api.visualizer.visualize_mixin import VisualizeMixin + + +class Result(VisualizeMixin, ABC): + """Base result type.""" diff --git a/model_api/python/model_api/models/result_types/classification.py b/model_api/python/model_api/models/result_types/classification.py new file mode 100644 index 00000000..5ef44155 --- /dev/null +++ b/model_api/python/model_api/models/result_types/classification.py @@ -0,0 +1,34 @@ +"""Classification result type.""" + +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import numpy as np + +from model_api.models.result_types.base import Result + +from .utils import array_shape_to_str + + +class ClassificationResult(Result): + """Results for classification models.""" + + def __init__( + self, + top_labels: list[tuple[int, str, float]] | None = None, + saliency_map: np.ndarray | None = None, + feature_vector: np.ndarray | None = None, + raw_scores: np.ndarray | None = None, + ) -> None: + self.top_labels = top_labels + self.saliency_map = saliency_map + self.feature_vector = feature_vector + self.raw_scores = raw_scores + + def __str__(self) -> str: + assert self.top_labels is not None + labels = ", ".join(f"{idx} ({label}): {confidence:.3f}" for idx, label, confidence in self.top_labels) + return ( + f"{labels}, {array_shape_to_str(self.saliency_map)}, {array_shape_to_str(self.feature_vector)}, " + f"{array_shape_to_str(self.raw_scores)}" + ) diff --git a/model_api/python/model_api/models/result_types/detection.py b/model_api/python/model_api/models/result_types/detection.py new file mode 100644 index 00000000..6cbbb676 --- /dev/null +++ b/model_api/python/model_api/models/result_types/detection.py @@ -0,0 +1,40 @@ +"""Detection result type.""" + +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + + +import numpy as np + +from model_api.models.result_types.base import Result + +from .utils import array_shape_to_str + + +class Detection: + def __init__(self, xmin, ymin, xmax, ymax, score, id, str_label=None) -> None: + self.xmin: int = xmin + self.ymin: int = ymin + self.xmax: int = xmax + self.ymax: int = ymax + self.score: float = score + self.id: int = int(id) + self.str_label: str | None = str_label + + def __str__(self): + return f"{self.xmin}, {self.ymin}, {self.xmax}, {self.ymax}, {self.id} ({self.str_label}): {self.score:.3f}" + + +class DetectionResult(Detection, Result): + """Result for detection model.""" + + objects: list[Detection] | None = None + saliency_map: np.ndarray | None = None + feature_vector: np.ndarray | None = None + + def __str__(self): + assert self.objects is not None + obj_str = "; ".join(str(obj) for obj in self.objects) + if obj_str: + obj_str += "; " + return f"{obj_str}{array_shape_to_str(self.saliency_map)}; {array_shape_to_str(self.feature_vector)}" diff --git a/model_api/python/model_api/models/result_types/keypoint_detection.py b/model_api/python/model_api/models/result_types/keypoint_detection.py new file mode 100644 index 00000000..68def7a7 --- /dev/null +++ b/model_api/python/model_api/models/result_types/keypoint_detection.py @@ -0,0 +1,19 @@ +"""Keypoint detection result type.""" + +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import numpy as np + + +class DetectedKeypoints: + def __init__(self, keypoints: np.ndarray, scores: np.ndarray) -> None: + self.keypoints = keypoints + self.scores = scores + + def __str__(self): + return ( + f"keypoints: {self.keypoints.shape}, " + f"keypoints_x_sum: {np.sum(self.keypoints[:, :1]):.3f}, " + f"scores: {self.scores.shape}" + ) diff --git a/model_api/python/model_api/models/result_types/segmentation.py b/model_api/python/model_api/models/result_types/segmentation.py new file mode 100644 index 00000000..58d9a02b --- /dev/null +++ b/model_api/python/model_api/models/result_types/segmentation.py @@ -0,0 +1,126 @@ +"""Segmentation result type.""" + +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import cv +import numpy as np +from cv2.typing import RotatedRect + +from model_api.python.model_api.models.result_types.utils import array_shape_to_str + + +class SegmentedObject: + def __init__( + self, + xmin: int, + ymin: int, + xmax: int, + ymax: int, + score: float, + id: int, + mask: np.ndarray, + str_label: str | None = None, + ) -> None: + self.xmin = xmin + self.ymin = ymin + self.xmax = xmax + self.ymax = ymax + self.score = score + self.id = id + self.str_label = str_label + self.mask = mask + + def __str__(self): + return f"{super().__str__()}, {(self.mask > 0.5).sum()}" + + +class SegmentedObjectWithRects(SegmentedObject): + def __init__(self, segmented_object: SegmentedObject, rotated_rect: RotatedRect) -> None: + super().__init__( + segmented_object.xmin, + segmented_object.ymin, + segmented_object.xmax, + segmented_object.ymax, + segmented_object.score, + segmented_object.id, + segmented_object.str_label, + segmented_object.mask, + ) + self.rotated_rect = rotated_rect + + def __str__(self): + res = super().__str__() + rect = self.rotated_rect + res += f", RotatedRect: {rect[0][0]:.3f} {rect[0][1]:.3f} {rect[1][0]:.3f} {rect[1][1]:.3f} {rect[2]:.3f}" + return res + + +class InstanceSegmentationResult: + def __init__( + self, + segmentedObjects: list[SegmentedObject | SegmentedObjectWithRects], + saliency_map: list[np.ndarray], + feature_vector: np.ndarray, + ) -> None: + self.segmentedObjects = segmentedObjects + # Contain per class saliency_maps and "feature_vector" model output if feature_vector exists + self.saliency_map = saliency_map + self.feature_vector = feature_vector + + def __str__(self): + obj_str = "; ".join(str(obj) for obj in self.segmentedObjects) + filled = 0 + for cls_map in self.saliency_map: + if cls_map.size: + filled += 1 + prefix = f"{obj_str}; " if len(obj_str) else "" + return prefix + f"{filled}; [{','.join(str(i) for i in self.feature_vector.shape)}]" + + +class Contour: + def __init__( + self, + label: str, + probability: float, + shape: list[tuple[int, int]], + ) -> None: + self.label = label + self.probability = probability + self.shape = shape + + def __str__(self): + return f"{self.label}: {self.probability:.3f}, {len(self.shape)}" + + +class ImageResultWithSoftPrediction: + def __init__( + self, + resultImage: np.ndarray, + soft_prediction: np.ndarray, + saliency_map: np.ndarray, + feature_vector: np.ndarray, + ) -> None: + self.resultImage = resultImage + self.soft_prediction = soft_prediction + # Contain per class saliency_maps and "feature_vector" model output if feature_vector exists + self.saliency_map = saliency_map # Requires return_soft_prediction==True + self.feature_vector = feature_vector + + def __str__(self): + outHist = cv.calcHist( + [self.resultImage.astype(np.uint8)], + channels=None, + mask=None, + histSize=[256], + ranges=[0, 255], + ) + hist = "" + for i, count in enumerate(outHist): + if count > 0: + hist += f"{i}: {count[0] / self.resultImage.size:.3f}, " + return ( + f"{hist}{array_shape_to_str(self.soft_prediction)}, " + f"{array_shape_to_str(self.saliency_map)}, " + f"{array_shape_to_str(self.feature_vector)}" + ) diff --git a/model_api/python/model_api/models/result_types/utils.py b/model_api/python/model_api/models/result_types/utils.py new file mode 100644 index 00000000..d7ca89d1 --- /dev/null +++ b/model_api/python/model_api/models/result_types/utils.py @@ -0,0 +1,12 @@ +"""Utilities for working with result types.""" + +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import numpy as np + + +def array_shape_to_str(array: np.ndarray | None) -> str: + if array is not None: + return f"[{','.join(str(i) for i in array.shape)}]" + return "[]" diff --git a/model_api/python/model_api/models/result_types/visual_prompting.py b/model_api/python/model_api/models/result_types/visual_prompting.py new file mode 100644 index 00000000..9d628ee6 --- /dev/null +++ b/model_api/python/model_api/models/result_types/visual_prompting.py @@ -0,0 +1,89 @@ +"""Visual Prompting result type.""" + +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import numpy as np + + +class VisualPromptingResult: + def __init__( + self, + upscaled_masks: list[np.ndarray] | None = None, + processed_mask: list[np.ndarray] | None = None, + low_res_masks: list[np.ndarray] | None = None, + iou_predictions: list[np.ndarray] | None = None, + scores: list[np.ndarray] | None = None, + labels: list[np.ndarray] | None = None, + hard_predictions: list[np.ndarray] | None = None, + soft_predictions: list[np.ndarray] | None = None, + best_iou: list[float] | None = None, + ) -> None: + self.upscaled_masks = upscaled_masks + self.processed_mask = processed_mask + self.low_res_masks = low_res_masks + self.iou_predictions = iou_predictions + self.scores = scores + self.labels = labels + self.hard_predictions = hard_predictions + self.soft_predictions = soft_predictions + self.best_iou = best_iou + + def _compute_min_max(self, tensor: np.ndarray) -> tuple[np.ndarray, np.ndarray]: + return tensor.min(), tensor.max() + + def __str__(self) -> str: + assert self.hard_predictions is not None + assert self.upscaled_masks is not None + upscaled_masks_min, upscaled_masks_max = self._compute_min_max( + self.upscaled_masks[0], + ) + + return ( + f"upscaled_masks min:{upscaled_masks_min:.3f} max:{upscaled_masks_max:.3f};" + f"hard_predictions shape:{self.hard_predictions[0].shape};" + ) + + +class PredictedMask: + def __init__( + self, + mask: list[np.ndarray], + points: list[np.ndarray] | np.ndarray, + scores: list[float] | np.ndarray, + ) -> None: + self.mask = mask + self.points = points + self.scores = scores + + def __str__(self) -> str: + obj_str = "" + obj_str += f"mask sum: {np.sum(sum(self.mask))}; " + + if isinstance(self.points, list): + for i, point in enumerate(self.points): + obj_str += "[" + obj_str += ", ".join(str(round(c, 2)) for c in point) + obj_str += "] " + obj_str += "iou: " + f"{float(self.scores[i]):.3f} " + else: + for i in range(self.points.shape[0]): + point = self.points[i] + obj_str += "[" + obj_str += ", ".join(str(round(c, 2)) for c in point) + obj_str += "] " + obj_str += "iou: " + f"{float(self.scores[i]):.3f} " + + return obj_str.strip() + + +class ZSLVisualPromptingResult: + def __init__(self, data: dict[int, PredictedMask]) -> None: + self.data: dict[int, PredictedMask] + + def __str__(self) -> str: + return ", ".join(str(self.data[k]) for k in self.data) + + def get_mask(self, label: int) -> PredictedMask: + """Returns a mask belonging to a given label""" + return self.data[label] diff --git a/model_api/python/model_api/visualizer/__init__.py b/model_api/python/model_api/visualizer/__init__.py new file mode 100644 index 00000000..87b312bc --- /dev/null +++ b/model_api/python/model_api/visualizer/__init__.py @@ -0,0 +1,8 @@ +"""Visualizer.""" + +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from .visualizer import Visualizer + +__all__ = ["Visualizer"] diff --git a/model_api/python/model_api/visualizer/primitives.py b/model_api/python/model_api/visualizer/primitives.py new file mode 100644 index 00000000..3de07d3c --- /dev/null +++ b/model_api/python/model_api/visualizer/primitives.py @@ -0,0 +1,10 @@ +"""Base class for primitives.""" + +from abc import ABC + +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + + +class Primitive(ABC): + """Primitive class.""" diff --git a/model_api/python/model_api/visualizer/visualize_mixin.py b/model_api/python/model_api/visualizer/visualize_mixin.py new file mode 100644 index 00000000..651b659b --- /dev/null +++ b/model_api/python/model_api/visualizer/visualize_mixin.py @@ -0,0 +1,13 @@ +"""Mixin for visualization.""" + +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from abc import ABC + + +class VisualizeMixin(ABC): + """Mixin for visualization.""" + + def get_labels(self): + """Get labels.""" diff --git a/model_api/python/model_api/visualizer/visualizer.py b/model_api/python/model_api/visualizer/visualizer.py new file mode 100644 index 00000000..164b8603 --- /dev/null +++ b/model_api/python/model_api/visualizer/visualizer.py @@ -0,0 +1,14 @@ +"""Visualizer.""" + +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from PIL import Image + +from model_api.visualizer.visualize_mixin import VisualizeMixin + + +class Visualizer: + + def show(self, image: Image, result: VisualizeMixin) -> None: + pass diff --git a/model_api/python/pyproject.toml b/model_api/python/pyproject.toml index c011e57f..ed2ba565 100644 --- a/model_api/python/pyproject.toml +++ b/model_api/python/pyproject.toml @@ -30,6 +30,7 @@ dependencies = [ "openvino>=2024.0", "openvino-dev>=2024.0", "omz_tools @ git+https://github.com/openvinotoolkit/open_model_zoo.git@master#egg=omz_tools&subdirectory=tools/model_tools", + "pillow", ] [project.optional-dependencies] From 7f20029fa092885d97af7686aa52359357a3f17c Mon Sep 17 00:00:00 2001 From: Ashwin Vaidya Date: Wed, 13 Nov 2024 16:56:04 +0100 Subject: [PATCH 2/5] add initial visualizer Signed-off-by: Ashwin Vaidya --- .../python/model_api/models/result_types.py | 337 ++++++++++++++++++ .../model_api/models/result_types/__init__.py | 8 - .../models/result_types/anomaly_result.py | 42 --- .../model_api/models/result_types/base.py | 13 - .../models/result_types/classification.py | 34 -- .../models/result_types/detection.py | 40 --- .../models/result_types/keypoint_detection.py | 19 - .../models/result_types/segmentation.py | 126 ------- .../model_api/models/result_types/utils.py | 12 - .../models/result_types/visual_prompting.py | 89 ----- .../python/model_api/visualizer/primitives.py | 128 ++++++- .../model_api/visualizer/visualize_mixin.py | 73 +++- .../python/model_api/visualizer/visualizer.py | 123 ++++++- 13 files changed, 655 insertions(+), 389 deletions(-) create mode 100644 model_api/python/model_api/models/result_types.py delete mode 100644 model_api/python/model_api/models/result_types/__init__.py delete mode 100644 model_api/python/model_api/models/result_types/anomaly_result.py delete mode 100644 model_api/python/model_api/models/result_types/base.py delete mode 100644 model_api/python/model_api/models/result_types/classification.py delete mode 100644 model_api/python/model_api/models/result_types/detection.py delete mode 100644 model_api/python/model_api/models/result_types/keypoint_detection.py delete mode 100644 model_api/python/model_api/models/result_types/segmentation.py delete mode 100644 model_api/python/model_api/models/result_types/utils.py delete mode 100644 model_api/python/model_api/models/result_types/visual_prompting.py diff --git a/model_api/python/model_api/models/result_types.py b/model_api/python/model_api/models/result_types.py new file mode 100644 index 00000000..3c848712 --- /dev/null +++ b/model_api/python/model_api/models/result_types.py @@ -0,0 +1,337 @@ +"""Result types.""" + +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from abc import ABC +from typing import NamedTuple + +import cv2 +import numpy as np + +from model_api.visualizer.primitives import BoundingBoxes, Label, Overlay, Polygon +from model_api.visualizer.visualize_mixin import VisualizeMixin + + +class Result(VisualizeMixin, ABC): + """Base result type.""" + + +class AnomalyResult(Result): + """Results for anomaly models.""" + + def __init__( + self, + anomaly_map: np.ndarray | None = None, + pred_boxes: np.ndarray | None = None, + pred_label: str | None = None, + pred_mask: np.ndarray | None = None, + pred_score: float | None = None, + ) -> None: + self.anomaly_map = anomaly_map + self.pred_boxes = pred_boxes + self.pred_label = pred_label + self.pred_mask = pred_mask + self.pred_score = pred_score + + def _compute_min_max(self, tensor: np.ndarray) -> tuple[np.ndarray, np.ndarray]: + """Computes min and max values of the tensor.""" + return tensor.min(), tensor.max() + + def __str__(self) -> str: + assert self.anomaly_map is not None + assert self.pred_mask is not None + anomaly_map_min, anomaly_map_max = self._compute_min_max(self.anomaly_map) + pred_mask_min, pred_mask_max = self._compute_min_max(self.pred_mask) + return ( + f"anomaly_map min:{anomaly_map_min} max:{anomaly_map_max};" + f"pred_score:{np.round(self.pred_score, 1) if self.pred_score else 0.0};" + f"pred_label:{self.pred_label};" + f"pred_mask min:{pred_mask_min} max:{pred_mask_max};" + ) + + def _register_primitives(self) -> None: + """Converts the result to primitives.""" + anomaly_map = cv2.applyColorMap(self.anomaly_map, cv2.COLORMAP_JET) + self._add_primitive(Overlay(anomaly_map)) + for box in self.pred_boxes: + self._add_primitive(BoundingBoxes(*box)) + self._add_primitive(Label(self.pred_label, bg_color="red" if self.pred_label == "Anomaly" else "green")) + self._add_primitive(Label(f"Score: {self.pred_score}")) + self._add_primitive(Polygon(mask=self.pred_mask)) + + +class ClassificationResult(Result): + """Results for classification models.""" + + def __init__( + self, + top_labels: list[tuple[int, str, float]] | None = None, + saliency_map: np.ndarray | None = None, + feature_vector: np.ndarray | None = None, + raw_scores: np.ndarray | None = None, + ) -> None: + self.top_labels = top_labels + self.saliency_map = saliency_map + self.feature_vector = feature_vector + self.raw_scores = raw_scores + + def __str__(self) -> str: + assert self.top_labels is not None + labels = ", ".join(f"{idx} ({label}): {confidence:.3f}" for idx, label, confidence in self.top_labels) + return ( + f"{labels}, {_array_shape_to_str(self.saliency_map)}, {_array_shape_to_str(self.feature_vector)}, " + f"{_array_shape_to_str(self.raw_scores)}" + ) + + def _register_primitives(self) -> None: + pass + + +class Detection: + def __init__(self, xmin, ymin, xmax, ymax, score, id, str_label=None) -> None: + self.xmin: int = xmin + self.ymin: int = ymin + self.xmax: int = xmax + self.ymax: int = ymax + self.score: float = score + self.id: int = int(id) + self.str_label: str | None = str_label + + def __str__(self): + return f"{self.xmin}, {self.ymin}, {self.xmax}, {self.ymax}, {self.id} ({self.str_label}): {self.score:.3f}" + + +class DetectionResult(Detection, Result): + """Result for detection model.""" + + objects: list[Detection] | None = None + saliency_map: np.ndarray | None = None + feature_vector: np.ndarray | None = None + + def __str__(self): + assert self.objects is not None + obj_str = "; ".join(str(obj) for obj in self.objects) + if obj_str: + obj_str += "; " + return f"{obj_str}{_array_shape_to_str(self.saliency_map)}; {_array_shape_to_str(self.feature_vector)}" + + +class DetectedKeypoints: + def __init__(self, keypoints: np.ndarray, scores: np.ndarray) -> None: + self.keypoints = keypoints + self.scores = scores + + def __str__(self): + return ( + f"keypoints: {self.keypoints.shape}, " + f"keypoints_x_sum: {np.sum(self.keypoints[:, :1]):.3f}, " + f"scores: {self.scores.shape}" + ) + + +class SegmentedObject: + def __init__( + self, + xmin: int, + ymin: int, + xmax: int, + ymax: int, + score: float, + id: int, + mask: np.ndarray, + str_label: str | None = None, + ) -> None: + self.xmin = xmin + self.ymin = ymin + self.xmax = xmax + self.ymax = ymax + self.score = score + self.id = id + self.str_label = str_label + self.mask = mask + + def __str__(self): + return f"{super().__str__()}, {(self.mask > 0.5).sum()}" + + +class SegmentedObjectWithRects(SegmentedObject): + def __init__(self, segmented_object: SegmentedObject, rotated_rect: RotatedRect) -> None: + super().__init__( + segmented_object.xmin, + segmented_object.ymin, + segmented_object.xmax, + segmented_object.ymax, + segmented_object.score, + segmented_object.id, + segmented_object.str_label, + segmented_object.mask, + ) + self.rotated_rect = rotated_rect + + def __str__(self): + res = super().__str__() + rect = self.rotated_rect + res += f", RotatedRect: {rect[0][0]:.3f} {rect[0][1]:.3f} {rect[1][0]:.3f} {rect[1][1]:.3f} {rect[2]:.3f}" + return res + + +class InstanceSegmentationResult: + def __init__( + self, + segmentedObjects: list[SegmentedObject | SegmentedObjectWithRects], + saliency_map: list[np.ndarray], + feature_vector: np.ndarray, + ) -> None: + self.segmentedObjects = segmentedObjects + # Contain per class saliency_maps and "feature_vector" model output if feature_vector exists + self.saliency_map = saliency_map + self.feature_vector = feature_vector + + def __str__(self): + obj_str = "; ".join(str(obj) for obj in self.segmentedObjects) + filled = 0 + for cls_map in self.saliency_map: + if cls_map.size: + filled += 1 + prefix = f"{obj_str}; " if len(obj_str) else "" + return prefix + f"{filled}; [{','.join(str(i) for i in self.feature_vector.shape)}]" + + +class Contour: + def __init__( + self, + label: str, + probability: float, + shape: list[tuple[int, int]], + ) -> None: + self.label = label + self.probability = probability + self.shape = shape + + def __str__(self): + return f"{self.label}: {self.probability:.3f}, {len(self.shape)}" + + +class ImageResultWithSoftPrediction: + def __init__( + self, + resultImage: np.ndarray, + soft_prediction: np.ndarray, + saliency_map: np.ndarray, + feature_vector: np.ndarray, + ) -> None: + self.resultImage = resultImage + self.soft_prediction = soft_prediction + # Contain per class saliency_maps and "feature_vector" model output if feature_vector exists + self.saliency_map = saliency_map # Requires return_soft_prediction==True + self.feature_vector = feature_vector + + def __str__(self): + outHist = cv2.calcHist( + [self.resultImage.astype(np.uint8)], + channels=None, + mask=None, + histSize=[256], + ranges=[0, 255], + ) + hist = "" + for i, count in enumerate(outHist): + if count > 0: + hist += f"{i}: {count[0] / self.resultImage.size:.3f}, " + return ( + f"{hist}{_array_shape_to_str(self.soft_prediction)}, " + f"{_array_shape_to_str(self.saliency_map)}, " + f"{_array_shape_to_str(self.feature_vector)}" + ) + + +class VisualPromptingResult: + def __init__( + self, + upscaled_masks: list[np.ndarray] | None = None, + processed_mask: list[np.ndarray] | None = None, + low_res_masks: list[np.ndarray] | None = None, + iou_predictions: list[np.ndarray] | None = None, + scores: list[np.ndarray] | None = None, + labels: list[np.ndarray] | None = None, + hard_predictions: list[np.ndarray] | None = None, + soft_predictions: list[np.ndarray] | None = None, + best_iou: list[float] | None = None, + ) -> None: + self.upscaled_masks = upscaled_masks + self.processed_mask = processed_mask + self.low_res_masks = low_res_masks + self.iou_predictions = iou_predictions + self.scores = scores + self.labels = labels + self.hard_predictions = hard_predictions + self.soft_predictions = soft_predictions + self.best_iou = best_iou + + def _compute_min_max(self, tensor: np.ndarray) -> tuple[np.ndarray, np.ndarray]: + return tensor.min(), tensor.max() + + def __str__(self) -> str: + assert self.hard_predictions is not None + assert self.upscaled_masks is not None + upscaled_masks_min, upscaled_masks_max = self._compute_min_max( + self.upscaled_masks[0], + ) + + return ( + f"upscaled_masks min:{upscaled_masks_min:.3f} max:{upscaled_masks_max:.3f};" + f"hard_predictions shape:{self.hard_predictions[0].shape};" + ) + + +class PredictedMask: + def __init__( + self, + mask: list[np.ndarray], + points: list[np.ndarray] | np.ndarray, + scores: list[float] | np.ndarray, + ) -> None: + self.mask = mask + self.points = points + self.scores = scores + + def __str__(self) -> str: + obj_str = "" + obj_str += f"mask sum: {np.sum(sum(self.mask))}; " + + if isinstance(self.points, list): + for i, point in enumerate(self.points): + obj_str += "[" + obj_str += ", ".join(str(round(c, 2)) for c in point) + obj_str += "] " + obj_str += "iou: " + f"{float(self.scores[i]):.3f} " + else: + for i in range(self.points.shape[0]): + point = self.points[i] + obj_str += "[" + obj_str += ", ".join(str(round(c, 2)) for c in point) + obj_str += "] " + obj_str += "iou: " + f"{float(self.scores[i]):.3f} " + + return obj_str.strip() + + +class ZSLVisualPromptingResult: + def __init__(self, data: dict[int, PredictedMask]) -> None: + self.data: dict[int, PredictedMask] + + def __str__(self) -> str: + return ", ".join(str(self.data[k]) for k in self.data) + + def get_mask(self, label: int) -> PredictedMask: + """Returns a mask belonging to a given label""" + return self.data[label] + + +def _array_shape_to_str(array: np.ndarray | None) -> str: + if array is not None: + return f"[{','.join(str(i) for i in array.shape)}]" + return "[]" diff --git a/model_api/python/model_api/models/result_types/__init__.py b/model_api/python/model_api/models/result_types/__init__.py deleted file mode 100644 index 5ec07379..00000000 --- a/model_api/python/model_api/models/result_types/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -"""Result types.""" - -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -from .anomaly_result import AnomalyResult - -__all__ = ["AnomalyResult"] diff --git a/model_api/python/model_api/models/result_types/anomaly_result.py b/model_api/python/model_api/models/result_types/anomaly_result.py deleted file mode 100644 index 4bbcbd32..00000000 --- a/model_api/python/model_api/models/result_types/anomaly_result.py +++ /dev/null @@ -1,42 +0,0 @@ -"""Anomaly result type.""" - -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import numpy as np - -from model_api.models.result_types.base import Result - - -class AnomalyResult(Result): - """Results for anomaly models.""" - - def __init__( - self, - anomaly_map: np.ndarray | None = None, - pred_boxes: np.ndarray | None = None, - pred_label: str | None = None, - pred_mask: np.ndarray | None = None, - pred_score: float | None = None, - ) -> None: - self.anomaly_map = anomaly_map - self.pred_boxes = pred_boxes - self.pred_label = pred_label - self.pred_mask = pred_mask - self.pred_score = pred_score - - def _compute_min_max(self, tensor: np.ndarray) -> tuple[np.ndarray, np.ndarray]: - """Computes min and max values of the tensor.""" - return tensor.min(), tensor.max() - - def __str__(self) -> str: - assert self.anomaly_map is not None - assert self.pred_mask is not None - anomaly_map_min, anomaly_map_max = self._compute_min_max(self.anomaly_map) - pred_mask_min, pred_mask_max = self._compute_min_max(self.pred_mask) - return ( - f"anomaly_map min:{anomaly_map_min} max:{anomaly_map_max};" - f"pred_score:{np.round(self.pred_score, 1) if self.pred_score else 0.0};" - f"pred_label:{self.pred_label};" - f"pred_mask min:{pred_mask_min} max:{pred_mask_max};" - ) diff --git a/model_api/python/model_api/models/result_types/base.py b/model_api/python/model_api/models/result_types/base.py deleted file mode 100644 index f65f748b..00000000 --- a/model_api/python/model_api/models/result_types/base.py +++ /dev/null @@ -1,13 +0,0 @@ -"""Base result type.""" - -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -from abc import ABC -from typing import NamedTuple - -from model_api.visualizer.visualize_mixin import VisualizeMixin - - -class Result(VisualizeMixin, ABC): - """Base result type.""" diff --git a/model_api/python/model_api/models/result_types/classification.py b/model_api/python/model_api/models/result_types/classification.py deleted file mode 100644 index 5ef44155..00000000 --- a/model_api/python/model_api/models/result_types/classification.py +++ /dev/null @@ -1,34 +0,0 @@ -"""Classification result type.""" - -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import numpy as np - -from model_api.models.result_types.base import Result - -from .utils import array_shape_to_str - - -class ClassificationResult(Result): - """Results for classification models.""" - - def __init__( - self, - top_labels: list[tuple[int, str, float]] | None = None, - saliency_map: np.ndarray | None = None, - feature_vector: np.ndarray | None = None, - raw_scores: np.ndarray | None = None, - ) -> None: - self.top_labels = top_labels - self.saliency_map = saliency_map - self.feature_vector = feature_vector - self.raw_scores = raw_scores - - def __str__(self) -> str: - assert self.top_labels is not None - labels = ", ".join(f"{idx} ({label}): {confidence:.3f}" for idx, label, confidence in self.top_labels) - return ( - f"{labels}, {array_shape_to_str(self.saliency_map)}, {array_shape_to_str(self.feature_vector)}, " - f"{array_shape_to_str(self.raw_scores)}" - ) diff --git a/model_api/python/model_api/models/result_types/detection.py b/model_api/python/model_api/models/result_types/detection.py deleted file mode 100644 index 6cbbb676..00000000 --- a/model_api/python/model_api/models/result_types/detection.py +++ /dev/null @@ -1,40 +0,0 @@ -"""Detection result type.""" - -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - - -import numpy as np - -from model_api.models.result_types.base import Result - -from .utils import array_shape_to_str - - -class Detection: - def __init__(self, xmin, ymin, xmax, ymax, score, id, str_label=None) -> None: - self.xmin: int = xmin - self.ymin: int = ymin - self.xmax: int = xmax - self.ymax: int = ymax - self.score: float = score - self.id: int = int(id) - self.str_label: str | None = str_label - - def __str__(self): - return f"{self.xmin}, {self.ymin}, {self.xmax}, {self.ymax}, {self.id} ({self.str_label}): {self.score:.3f}" - - -class DetectionResult(Detection, Result): - """Result for detection model.""" - - objects: list[Detection] | None = None - saliency_map: np.ndarray | None = None - feature_vector: np.ndarray | None = None - - def __str__(self): - assert self.objects is not None - obj_str = "; ".join(str(obj) for obj in self.objects) - if obj_str: - obj_str += "; " - return f"{obj_str}{array_shape_to_str(self.saliency_map)}; {array_shape_to_str(self.feature_vector)}" diff --git a/model_api/python/model_api/models/result_types/keypoint_detection.py b/model_api/python/model_api/models/result_types/keypoint_detection.py deleted file mode 100644 index 68def7a7..00000000 --- a/model_api/python/model_api/models/result_types/keypoint_detection.py +++ /dev/null @@ -1,19 +0,0 @@ -"""Keypoint detection result type.""" - -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import numpy as np - - -class DetectedKeypoints: - def __init__(self, keypoints: np.ndarray, scores: np.ndarray) -> None: - self.keypoints = keypoints - self.scores = scores - - def __str__(self): - return ( - f"keypoints: {self.keypoints.shape}, " - f"keypoints_x_sum: {np.sum(self.keypoints[:, :1]):.3f}, " - f"scores: {self.scores.shape}" - ) diff --git a/model_api/python/model_api/models/result_types/segmentation.py b/model_api/python/model_api/models/result_types/segmentation.py deleted file mode 100644 index 58d9a02b..00000000 --- a/model_api/python/model_api/models/result_types/segmentation.py +++ /dev/null @@ -1,126 +0,0 @@ -"""Segmentation result type.""" - -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import cv -import numpy as np -from cv2.typing import RotatedRect - -from model_api.python.model_api.models.result_types.utils import array_shape_to_str - - -class SegmentedObject: - def __init__( - self, - xmin: int, - ymin: int, - xmax: int, - ymax: int, - score: float, - id: int, - mask: np.ndarray, - str_label: str | None = None, - ) -> None: - self.xmin = xmin - self.ymin = ymin - self.xmax = xmax - self.ymax = ymax - self.score = score - self.id = id - self.str_label = str_label - self.mask = mask - - def __str__(self): - return f"{super().__str__()}, {(self.mask > 0.5).sum()}" - - -class SegmentedObjectWithRects(SegmentedObject): - def __init__(self, segmented_object: SegmentedObject, rotated_rect: RotatedRect) -> None: - super().__init__( - segmented_object.xmin, - segmented_object.ymin, - segmented_object.xmax, - segmented_object.ymax, - segmented_object.score, - segmented_object.id, - segmented_object.str_label, - segmented_object.mask, - ) - self.rotated_rect = rotated_rect - - def __str__(self): - res = super().__str__() - rect = self.rotated_rect - res += f", RotatedRect: {rect[0][0]:.3f} {rect[0][1]:.3f} {rect[1][0]:.3f} {rect[1][1]:.3f} {rect[2]:.3f}" - return res - - -class InstanceSegmentationResult: - def __init__( - self, - segmentedObjects: list[SegmentedObject | SegmentedObjectWithRects], - saliency_map: list[np.ndarray], - feature_vector: np.ndarray, - ) -> None: - self.segmentedObjects = segmentedObjects - # Contain per class saliency_maps and "feature_vector" model output if feature_vector exists - self.saliency_map = saliency_map - self.feature_vector = feature_vector - - def __str__(self): - obj_str = "; ".join(str(obj) for obj in self.segmentedObjects) - filled = 0 - for cls_map in self.saliency_map: - if cls_map.size: - filled += 1 - prefix = f"{obj_str}; " if len(obj_str) else "" - return prefix + f"{filled}; [{','.join(str(i) for i in self.feature_vector.shape)}]" - - -class Contour: - def __init__( - self, - label: str, - probability: float, - shape: list[tuple[int, int]], - ) -> None: - self.label = label - self.probability = probability - self.shape = shape - - def __str__(self): - return f"{self.label}: {self.probability:.3f}, {len(self.shape)}" - - -class ImageResultWithSoftPrediction: - def __init__( - self, - resultImage: np.ndarray, - soft_prediction: np.ndarray, - saliency_map: np.ndarray, - feature_vector: np.ndarray, - ) -> None: - self.resultImage = resultImage - self.soft_prediction = soft_prediction - # Contain per class saliency_maps and "feature_vector" model output if feature_vector exists - self.saliency_map = saliency_map # Requires return_soft_prediction==True - self.feature_vector = feature_vector - - def __str__(self): - outHist = cv.calcHist( - [self.resultImage.astype(np.uint8)], - channels=None, - mask=None, - histSize=[256], - ranges=[0, 255], - ) - hist = "" - for i, count in enumerate(outHist): - if count > 0: - hist += f"{i}: {count[0] / self.resultImage.size:.3f}, " - return ( - f"{hist}{array_shape_to_str(self.soft_prediction)}, " - f"{array_shape_to_str(self.saliency_map)}, " - f"{array_shape_to_str(self.feature_vector)}" - ) diff --git a/model_api/python/model_api/models/result_types/utils.py b/model_api/python/model_api/models/result_types/utils.py deleted file mode 100644 index d7ca89d1..00000000 --- a/model_api/python/model_api/models/result_types/utils.py +++ /dev/null @@ -1,12 +0,0 @@ -"""Utilities for working with result types.""" - -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import numpy as np - - -def array_shape_to_str(array: np.ndarray | None) -> str: - if array is not None: - return f"[{','.join(str(i) for i in array.shape)}]" - return "[]" diff --git a/model_api/python/model_api/models/result_types/visual_prompting.py b/model_api/python/model_api/models/result_types/visual_prompting.py deleted file mode 100644 index 9d628ee6..00000000 --- a/model_api/python/model_api/models/result_types/visual_prompting.py +++ /dev/null @@ -1,89 +0,0 @@ -"""Visual Prompting result type.""" - -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import numpy as np - - -class VisualPromptingResult: - def __init__( - self, - upscaled_masks: list[np.ndarray] | None = None, - processed_mask: list[np.ndarray] | None = None, - low_res_masks: list[np.ndarray] | None = None, - iou_predictions: list[np.ndarray] | None = None, - scores: list[np.ndarray] | None = None, - labels: list[np.ndarray] | None = None, - hard_predictions: list[np.ndarray] | None = None, - soft_predictions: list[np.ndarray] | None = None, - best_iou: list[float] | None = None, - ) -> None: - self.upscaled_masks = upscaled_masks - self.processed_mask = processed_mask - self.low_res_masks = low_res_masks - self.iou_predictions = iou_predictions - self.scores = scores - self.labels = labels - self.hard_predictions = hard_predictions - self.soft_predictions = soft_predictions - self.best_iou = best_iou - - def _compute_min_max(self, tensor: np.ndarray) -> tuple[np.ndarray, np.ndarray]: - return tensor.min(), tensor.max() - - def __str__(self) -> str: - assert self.hard_predictions is not None - assert self.upscaled_masks is not None - upscaled_masks_min, upscaled_masks_max = self._compute_min_max( - self.upscaled_masks[0], - ) - - return ( - f"upscaled_masks min:{upscaled_masks_min:.3f} max:{upscaled_masks_max:.3f};" - f"hard_predictions shape:{self.hard_predictions[0].shape};" - ) - - -class PredictedMask: - def __init__( - self, - mask: list[np.ndarray], - points: list[np.ndarray] | np.ndarray, - scores: list[float] | np.ndarray, - ) -> None: - self.mask = mask - self.points = points - self.scores = scores - - def __str__(self) -> str: - obj_str = "" - obj_str += f"mask sum: {np.sum(sum(self.mask))}; " - - if isinstance(self.points, list): - for i, point in enumerate(self.points): - obj_str += "[" - obj_str += ", ".join(str(round(c, 2)) for c in point) - obj_str += "] " - obj_str += "iou: " + f"{float(self.scores[i]):.3f} " - else: - for i in range(self.points.shape[0]): - point = self.points[i] - obj_str += "[" - obj_str += ", ".join(str(round(c, 2)) for c in point) - obj_str += "] " - obj_str += "iou: " + f"{float(self.scores[i]):.3f} " - - return obj_str.strip() - - -class ZSLVisualPromptingResult: - def __init__(self, data: dict[int, PredictedMask]) -> None: - self.data: dict[int, PredictedMask] - - def __str__(self) -> str: - return ", ".join(str(self.data[k]) for k in self.data) - - def get_mask(self, label: int) -> PredictedMask: - """Returns a mask belonging to a given label""" - return self.data[label] diff --git a/model_api/python/model_api/visualizer/primitives.py b/model_api/python/model_api/visualizer/primitives.py index 3de07d3c..1abc4718 100644 --- a/model_api/python/model_api/visualizer/primitives.py +++ b/model_api/python/model_api/visualizer/primitives.py @@ -1,10 +1,134 @@ """Base class for primitives.""" -from abc import ABC - # Copyright (C) 2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + +from abc import ABC, abstractmethod +from io import BytesIO + +import cv2 +import numpy as np +from PIL import Image, ImageDraw, ImageFont + class Primitive(ABC): """Primitive class.""" + + @abstractmethod + def compute(self, **kwargs) -> Image: + pass + + +class Label(Primitive): + """Label primitive.""" + + def __init__( + self, + label: str, + fg_color: str | tuple[int, int, int] = "black", + bg_color: str | tuple[int, int, int] = "yellow", + font_path: str | None | BytesIO = None, + size: int = 16, + ) -> None: + self.label = label + self.fg_color = fg_color + self.bg_color = bg_color + self.font = ImageFont.load_default(size=size) if font_path is None else ImageFont.truetype(font_path, size) + + def compute(self, image: Image, overlay_on_image: bool = True, buffer_y: int = 5) -> Image: + """Generate label image. + + If overlay_on_image is True, the label will be drawn on top of the image. + Else only the label will be drawn. This is useful for collecting labels so that they can be drawn on the same + image. + """ + dummy_image = Image.new("RGB", (1, 1)) + draw = ImageDraw.Draw(dummy_image) + textbox = draw.textbbox((0, 0), self.label, font=self.font) + label_image = Image.new("RGB", (textbox[2] - textbox[0], textbox[3] + buffer_y - textbox[1]), self.bg_color) + draw = ImageDraw.Draw(label_image) + draw.text((0, 0), self.label, font=self.font, fill=self.fg_color) + if overlay_on_image: + image.paste(label_image, (0, 0)) + return image + return label_image + + @classmethod + def overlay_labels(cls, image: Image, label_images: list[Image], buffer: int = 5) -> Image: + """Overlay multiple label images on top of the image. + + Paste the labels in a row but wrap the labels if they exceed the image width. + """ + offset_x = 0 + offset_y = 0 + for label_image in label_images: + image.paste(label_image, (offset_x, offset_y)) + offset_x += label_image.width + buffer + if offset_x + label_image.width > image.width: + offset_x = 0 + offset_y += label_image.height + return image + + +class Polygon(Primitive): + """Polygon primitive.""" + + def __init__( + self, + points: list[tuple[int, int]] | None = None, + mask: np.ndarray | None = None, + color: str | tuple[int, int, int] = "blue", + ) -> None: + self.points = self._get_points(points, mask) + self.color = color + + def _get_points(self, points: list[tuple[int, int]] | None, mask: np.ndarray | None) -> list[tuple[int, int]]: + if points is not None: + return points + return self._get_points_from_mask(mask) + + def _get_points_from_mask(self, mask: np.ndarray) -> list[tuple[int, int]]: + contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + _points = contours[0].squeeze().tolist() + return [tuple(point) for point in _points] + + def compute(self, image: Image) -> Image: + draw = ImageDraw.Draw(image) + draw.polygon(self.points, fill=self.color) + return image + + +class Overlay(Primitive): + """Overlay an image. + + Useful for XAI and Anomaly Maps. + """ + + def __init__(self, image: Image | np.ndarray, opacity: float = 0.4) -> None: + self.image = self._to_image(image) + self.opacity = opacity + + def _to_image(self, image: Image | np.ndarray) -> Image: + if isinstance(image, Image.Image): + return image + return Image.fromarray(image) + + def compute(self, image: Image) -> Image: + _image = self.image.resize(image.size) + return Image.blend(image, _image, self.opacity) + + +class BoundingBoxes(Primitive): + def __init__(self, x1: int, y1: int, x2: int, y2: int, color: str | tuple[int, int, int] = "blue") -> None: + self.x1 = x1 + self.y1 = y1 + self.x2 = x2 + self.y2 = y2 + self.color = color + + def compute(self, image: Image) -> Image: + draw = ImageDraw.Draw(image) + draw.rectangle([self.x1, self.y1, self.x2, self.y2], fill=None, outline=self.color, width=2) + return image diff --git a/model_api/python/model_api/visualizer/visualize_mixin.py b/model_api/python/model_api/visualizer/visualize_mixin.py index 651b659b..34d38b4f 100644 --- a/model_api/python/model_api/visualizer/visualize_mixin.py +++ b/model_api/python/model_api/visualizer/visualize_mixin.py @@ -3,11 +3,80 @@ # Copyright (C) 2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 -from abc import ABC +from abc import ABC, abstractmethod + +from .primitives import BoundingBoxes, Label, Overlay, Polygon, Primitive class VisualizeMixin(ABC): """Mixin for visualization.""" - def get_labels(self): + _labels: list[Label] = [] + _polygons: list[Polygon] = [] + _overlays: list[Overlay] = [] + _bounding_boxes: list[BoundingBoxes] = [] + _registered_primitives: bool = False + + @abstractmethod + def _register_primitives(self) -> None: + """Convert result entities to primitives.""" + + def _add_primitive(self, primitive: Primitive) -> None: + """Add primitive.""" + if isinstance(primitive, Label): + self._labels.append(primitive) + elif isinstance(primitive, Polygon): + self._polygons.append(primitive) + elif isinstance(primitive, Overlay): + self._overlays.append(primitive) + elif isinstance(primitive, BoundingBoxes): + self._bounding_boxes.append(primitive) + + @property + def has_labels(self) -> bool: + """Check if there are labels.""" + self._register_primitives_if_needed() + return bool(self._labels) + + @property + def has_bounding_boxes(self) -> bool: + """Check if there are bounding boxes.""" + self._register_primitives_if_needed() + return bool(self._bounding_boxes) + + @property + def has_polygons(self) -> bool: + """Check if there are polygons.""" + self._register_primitives_if_needed() + return bool(self._polygons) + + @property + def has_overlays(self) -> bool: + """Check if there are overlays.""" + self._register_primitives_if_needed() + return bool(self._overlays) + + def get_labels(self) -> list[Label]: """Get labels.""" + self._register_primitives_if_needed() + return self._labels + + def get_polygons(self) -> list[Polygon]: + """Get polygons.""" + self._register_primitives_if_needed() + return self._polygons + + def get_overlays(self) -> list[Overlay]: + """Get overlays.""" + self._register_primitives_if_needed() + return self._overlays + + def get_bounding_boxes(self) -> list[BoundingBoxes]: + """Get bounding boxes.""" + self._register_primitives_if_needed() + return self._bounding_boxes + + def _register_primitives_if_needed(self): + if not self._registered_primitives: + self._register_primitives() + self._registered_primitives = True diff --git a/model_api/python/model_api/visualizer/visualizer.py b/model_api/python/model_api/visualizer/visualizer.py index 164b8603..fdd2992f 100644 --- a/model_api/python/model_api/visualizer/visualizer.py +++ b/model_api/python/model_api/visualizer/visualizer.py @@ -3,12 +3,131 @@ # Copyright (C) 2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + +from enum import Enum + from PIL import Image +from model_api.visualizer.primitives import Label from model_api.visualizer.visualize_mixin import VisualizeMixin -class Visualizer: +class VisualizationType(Enum): + """Visualization type.""" + + FULL = "full" + SIMPLE = "simple" + - def show(self, image: Image, result: VisualizeMixin) -> None: +class Visualizer: + def __init__(self) -> None: + # TODO: add transforms for the source image so that it has the same crop, and size as the model. pass + + def show( + self, + image: Image, + result: VisualizeMixin, + visualization_type: VisualizationType | str = VisualizationType.SIMPLE, + ) -> None: + visualization_type = VisualizationType(visualization_type) + result: Image = self._generate(image, result, visualization_type) + result.show() + + def save( + self, + image: Image, + result: VisualizeMixin, + path: str, + visualization_type: VisualizationType | str = VisualizationType.SIMPLE, + ) -> None: + visualization_type = VisualizationType(visualization_type) + result: Image = self._generate(image, result, visualization_type) + result.save(path) + + def _generate(self, image: Image, result: VisualizeMixin, visualization_type: VisualizationType) -> Image: + result: Image + if visualization_type == VisualizationType.SIMPLE: + result = self._generate_simple(image, result) + else: + result = self._generate_full(image, result) + return result + + def _generate_simple(self, image: Image, result: VisualizeMixin) -> Image: + """Return a single image with stacked visualizations.""" + # 1. Use Overlay + if result.has_overlays: + overlays = result.get_overlays() + for overlay in overlays: + image = overlay.compute(image) + + elif result.has_polygons: # 2. else use polygons + polygons = result.get_polygons() + for polygon in polygons: + image = polygon.compute(image) + + elif result.has_bounding_boxes: # 3. else use bounding boxes + bounding_boxes = result.get_bounding_boxes() + for bounding_box in bounding_boxes: + image = bounding_box.compute(image) + + # Finally add labels + if result.has_labels: + labels = result.get_labels() + label_images = [] + for label in labels: + label_images.append(label.compute(image, overlay_on_image=False)) + image = Label.overlay_labels(image, label_images) + + return image + + def _generate_full(self, image: Image, result: VisualizeMixin) -> Image: + """Return a single image with visualizations side by side.""" + images: list[Image] = [image] + + if result.has_overlays: + overlays = result.get_overlays() + _image = image.copy() + for overlay in overlays: + _image = overlay.compute(_image) + images.append(_image) + if result.has_polygons: + polygons = result.get_polygons() + _image = image.copy() + for polygon in polygons: + _image = polygon.compute(_image) + images.append(_image) + if result.has_bounding_boxes: + bounding_boxes = result.get_bounding_boxes() + _image = image.copy() + for bounding_box in bounding_boxes: + _image = bounding_box.compute(_image) + images.append(_image) + if result.has_labels: + labels = result.get_labels() + for label in labels: + images.append(label.compute(image.copy(), overlay_on_image=True)) + return self._stitch(*images) + + def _stitch(self, *images: Image) -> Image: + """Stitch images together. + + Args: + images (Image): Images to stitch. + + Returns: + Image: Stitched image. + """ + new_image = Image.new( + "RGB", + ( + sum(image.width for image in images), + max(image.height for image in images), + ), + ) + x_offset = 0 + for image in images: + new_image.paste(image, (x_offset, 0)) + x_offset += image.width + return new_image From ed0d5bc60a4e067e6d7b65eafe813a63848785ff Mon Sep 17 00:00:00 2001 From: Ashwin Vaidya Date: Thu, 14 Nov 2024 11:59:29 +0100 Subject: [PATCH 3/5] bug-fix + add classification Signed-off-by: Ashwin Vaidya --- model_api/python/model_api/models/result_types.py | 6 +++++- .../python/model_api/visualizer/visualize_mixin.py | 11 ++++++----- model_api/python/model_api/visualizer/visualizer.py | 13 +++++++------ 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/model_api/python/model_api/models/result_types.py b/model_api/python/model_api/models/result_types.py index 3c848712..8eda2144 100644 --- a/model_api/python/model_api/models/result_types.py +++ b/model_api/python/model_api/models/result_types.py @@ -30,6 +30,7 @@ def __init__( pred_mask: np.ndarray | None = None, pred_score: float | None = None, ) -> None: + super().__init__() self.anomaly_map = anomaly_map self.pred_boxes = pred_boxes self.pred_label = pred_label @@ -73,6 +74,7 @@ def __init__( feature_vector: np.ndarray | None = None, raw_scores: np.ndarray | None = None, ) -> None: + super().__init__() self.top_labels = top_labels self.saliency_map = saliency_map self.feature_vector = feature_vector @@ -87,7 +89,9 @@ def __str__(self) -> str: ) def _register_primitives(self) -> None: - pass + # TODO add saliency map + for idx, label, confidence in self.top_labels: + self._add_primitive(Label(f"Rank: {idx}, {label}: {confidence:.3f}")) class Detection: diff --git a/model_api/python/model_api/visualizer/visualize_mixin.py b/model_api/python/model_api/visualizer/visualize_mixin.py index 34d38b4f..8e56a5a1 100644 --- a/model_api/python/model_api/visualizer/visualize_mixin.py +++ b/model_api/python/model_api/visualizer/visualize_mixin.py @@ -11,11 +11,12 @@ class VisualizeMixin(ABC): """Mixin for visualization.""" - _labels: list[Label] = [] - _polygons: list[Polygon] = [] - _overlays: list[Overlay] = [] - _bounding_boxes: list[BoundingBoxes] = [] - _registered_primitives: bool = False + def __init__(self) -> None: + self._labels = [] + self._polygons = [] + self._overlays = [] + self._bounding_boxes = [] + self._registered_primitives = False @abstractmethod def _register_primitives(self) -> None: diff --git a/model_api/python/model_api/visualizer/visualizer.py b/model_api/python/model_api/visualizer/visualizer.py index fdd2992f..1db44bb9 100644 --- a/model_api/python/model_api/visualizer/visualizer.py +++ b/model_api/python/model_api/visualizer/visualizer.py @@ -57,30 +57,31 @@ def _generate(self, image: Image, result: VisualizeMixin, visualization_type: Vi def _generate_simple(self, image: Image, result: VisualizeMixin) -> Image: """Return a single image with stacked visualizations.""" # 1. Use Overlay + _image = image.copy() if result.has_overlays: overlays = result.get_overlays() for overlay in overlays: - image = overlay.compute(image) + image = overlay.compute(_image) elif result.has_polygons: # 2. else use polygons polygons = result.get_polygons() for polygon in polygons: - image = polygon.compute(image) + image = polygon.compute(_image) elif result.has_bounding_boxes: # 3. else use bounding boxes bounding_boxes = result.get_bounding_boxes() for bounding_box in bounding_boxes: - image = bounding_box.compute(image) + image = bounding_box.compute(_image) # Finally add labels if result.has_labels: labels = result.get_labels() label_images = [] for label in labels: - label_images.append(label.compute(image, overlay_on_image=False)) - image = Label.overlay_labels(image, label_images) + label_images.append(label.compute(_image, overlay_on_image=False)) + _image = Label.overlay_labels(_image, label_images) - return image + return _image def _generate_full(self, image: Image, result: VisualizeMixin) -> Image: """Return a single image with visualizations side by side.""" From 2d0849ea17a7f79a75f2acd8366c81aa715be59d Mon Sep 17 00:00:00 2001 From: Ashwin Vaidya Date: Mon, 18 Nov 2024 12:53:47 +0100 Subject: [PATCH 4/5] restore visualization changes Signed-off-by: Ashwin Vaidya --- .../model_api/models/result_types/anomaly.py | 19 ++++++++++++++++++- .../model_api/models/result_types/base.py | 12 ++++++++++++ .../models/result_types/classification.py | 10 +++++++++- .../python/model_api/visualizer/visualizer.py | 13 ++++++++----- 4 files changed, 47 insertions(+), 7 deletions(-) create mode 100644 model_api/python/model_api/models/result_types/base.py diff --git a/model_api/python/model_api/models/result_types/anomaly.py b/model_api/python/model_api/models/result_types/anomaly.py index 61f5384c..d657f8f1 100644 --- a/model_api/python/model_api/models/result_types/anomaly.py +++ b/model_api/python/model_api/models/result_types/anomaly.py @@ -5,10 +5,15 @@ from __future__ import annotations +import cv2 import numpy as np +from model_api.visualizer.primitives import BoundingBoxes, Label, Overlay, Polygon -class AnomalyResult: +from .base import Result + + +class AnomalyResult(Result): """Results for anomaly models.""" def __init__( @@ -19,6 +24,7 @@ def __init__( pred_mask: np.ndarray | None = None, pred_score: float | None = None, ) -> None: + super().__init__() self.anomaly_map = anomaly_map self.pred_boxes = pred_boxes self.pred_label = pred_label @@ -40,3 +46,14 @@ def __str__(self) -> str: f"pred_label:{self.pred_label};" f"pred_mask min:{pred_mask_min} max:{pred_mask_max};" ) + + def _register_primitives(self) -> None: + """Converts the result to primitives.""" + anomaly_map = cv2.applyColorMap(self.anomaly_map, cv2.COLORMAP_JET) + self._add_primitive(Overlay(anomaly_map)) + for box in self.pred_boxes: + self._add_primitive(BoundingBoxes(*box)) + if self.pred_label is not None: + self._add_primitive(Label(self.pred_label, bg_color="red" if self.pred_label == "Anomaly" else "green")) + self._add_primitive(Label(f"Score: {self.pred_score}")) + self._add_primitive(Polygon(mask=self.pred_mask)) diff --git a/model_api/python/model_api/models/result_types/base.py b/model_api/python/model_api/models/result_types/base.py new file mode 100644 index 00000000..0f9d0e1e --- /dev/null +++ b/model_api/python/model_api/models/result_types/base.py @@ -0,0 +1,12 @@ +"""Base result type""" + +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from abc import ABC + +from model_api.visualizer.visualize_mixin import VisualizeMixin + + +class Result(VisualizeMixin, ABC): + """Base result type.""" diff --git a/model_api/python/model_api/models/result_types/classification.py b/model_api/python/model_api/models/result_types/classification.py index 37fa3c33..eb1e3e1c 100644 --- a/model_api/python/model_api/models/result_types/classification.py +++ b/model_api/python/model_api/models/result_types/classification.py @@ -7,13 +7,16 @@ from typing import TYPE_CHECKING +from model_api.visualizer.primitives import Label + +from .base import Result from .utils import array_shape_to_str if TYPE_CHECKING: import numpy as np -class ClassificationResult: +class ClassificationResult(Result): """Results for classification models.""" def __init__( @@ -35,3 +38,8 @@ def __str__(self) -> str: f"{labels}, {array_shape_to_str(self.saliency_map)}, {array_shape_to_str(self.feature_vector)}, " f"{array_shape_to_str(self.raw_scores)}" ) + + def _register_primitives(self) -> None: + # TODO add saliency map + for idx, label, confidence in self.top_labels: + self._add_primitive(Label(f"Rank: {idx}, {label}: {confidence:.3f}")) diff --git a/model_api/python/model_api/visualizer/visualizer.py b/model_api/python/model_api/visualizer/visualizer.py index 1db44bb9..ba27ef3c 100644 --- a/model_api/python/model_api/visualizer/visualizer.py +++ b/model_api/python/model_api/visualizer/visualizer.py @@ -6,11 +6,14 @@ from __future__ import annotations from enum import Enum +from typing import TYPE_CHECKING from PIL import Image from model_api.visualizer.primitives import Label -from model_api.visualizer.visualize_mixin import VisualizeMixin + +if TYPE_CHECKING: + from model_api.visualizer.visualize_mixin import VisualizeMixin class VisualizationType(Enum): @@ -47,12 +50,12 @@ def save( result.save(path) def _generate(self, image: Image, result: VisualizeMixin, visualization_type: VisualizationType) -> Image: - result: Image + _result: Image if visualization_type == VisualizationType.SIMPLE: - result = self._generate_simple(image, result) + _result = self._generate_simple(image, result) else: - result = self._generate_full(image, result) - return result + _result = self._generate_full(image, result) + return _result def _generate_simple(self, image: Image, result: VisualizeMixin) -> Image: """Return a single image with stacked visualizations.""" From 67c3a60f22179c81569f4f81709a679544676b33 Mon Sep 17 00:00:00 2001 From: Ashwin Vaidya Date: Wed, 20 Nov 2024 16:41:23 +0100 Subject: [PATCH 5/5] Add PoC 2 Signed-off-by: Ashwin Vaidya --- .../model_api/models/result_types/anomaly.py | 9 ++ .../python/model_api/visualizer/layout.py | 85 +++++++++++++ .../model_api/visualizer/visualize_mixin.py | 72 +++++------ .../python/model_api/visualizer/visualizer.py | 114 ++---------------- 4 files changed, 136 insertions(+), 144 deletions(-) create mode 100644 model_api/python/model_api/visualizer/layout.py diff --git a/model_api/python/model_api/models/result_types/anomaly.py b/model_api/python/model_api/models/result_types/anomaly.py index d657f8f1..524eb3cd 100644 --- a/model_api/python/model_api/models/result_types/anomaly.py +++ b/model_api/python/model_api/models/result_types/anomaly.py @@ -8,6 +8,7 @@ import cv2 import numpy as np +from model_api.visualizer.layout import Flatten, Layout from model_api.visualizer.primitives import BoundingBoxes, Label, Overlay, Polygon from .base import Result @@ -57,3 +58,11 @@ def _register_primitives(self) -> None: self._add_primitive(Label(self.pred_label, bg_color="red" if self.pred_label == "Anomaly" else "green")) self._add_primitive(Label(f"Score: {self.pred_score}")) self._add_primitive(Polygon(mask=self.pred_mask)) + + @property + def default_layout(self) -> Layout: + return Flatten( + Overlay, + Polygon, + Label, + ) diff --git a/model_api/python/model_api/visualizer/layout.py b/model_api/python/model_api/visualizer/layout.py new file mode 100644 index 00000000..5a931f14 --- /dev/null +++ b/model_api/python/model_api/visualizer/layout.py @@ -0,0 +1,85 @@ +"""Visualization Layout""" + +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + +from abc import ABC +from typing import TYPE_CHECKING, Type + +from PIL import Image + +if TYPE_CHECKING: + from model_api.visualizer.primitives import Primitive + + from .visualize_mixin import VisualizeMixin + + +class Layout(ABC): + """Base class for layouts.""" + + def _compute_on_primitive(self, primitive: Primitive, image: Image, result: VisualizeMixin) -> Image | None: + if result.has_primitive(primitive): + primitives = result.get_primitive(primitive) + for primitive in primitives: + image = primitive.compute(image) + return image + return None + + +class HStack(Layout): + """Horizontal stack layout.""" + + def __init__(self, *args: Layout | Type[Primitive]) -> None: + self.children = args + + def __call__(self, image: Image, result: VisualizeMixin) -> Image: + images: list[Image] = [] + for child in self.children: + if isinstance(child, Layout): + images.append(child(image, result)) + else: + _image = image.copy() + _image = self._compute_on_primitive(child, _image, result) + if _image is not None: + images.append(_image) + return self._stitch(*images) + + def _stitch(self, *images: Image) -> Image: + """Stitch images together. + + Args: + images (Image): Images to stitch. + + Returns: + Image: Stitched image. + """ + new_image = Image.new( + "RGB", + ( + sum(image.width for image in images), + max(image.height for image in images), + ), + ) + x_offset = 0 + for image in images: + new_image.paste(image, (x_offset, 0)) + x_offset += image.width + return new_image + + +class VStack(Layout): + """Vertical stack layout.""" + + +class Flatten(Layout): + """Put all primitives on top of each other""" + + def __init__(self, *args: Type[Primitive]) -> None: + self.children = args + + def __call__(self, image: Image, result: VisualizeMixin) -> Image: + _image: Image = image.copy() + for child in self.children: + _image = self._compute_on_primitive(child, _image, result) + return _image diff --git a/model_api/python/model_api/visualizer/visualize_mixin.py b/model_api/python/model_api/visualizer/visualize_mixin.py index 8e56a5a1..844c23a5 100644 --- a/model_api/python/model_api/visualizer/visualize_mixin.py +++ b/model_api/python/model_api/visualizer/visualize_mixin.py @@ -4,7 +4,9 @@ # SPDX-License-Identifier: Apache-2.0 from abc import ABC, abstractmethod +from typing import Type +from .layout import Layout from .primitives import BoundingBoxes, Label, Overlay, Polygon, Primitive @@ -22,6 +24,11 @@ def __init__(self) -> None: def _register_primitives(self) -> None: """Convert result entities to primitives.""" + @property + @abstractmethod + def default_layout(self) -> Layout: + """Default layout.""" + def _add_primitive(self, primitive: Primitive) -> None: """Add primitive.""" if isinstance(primitive, Label): @@ -33,49 +40,32 @@ def _add_primitive(self, primitive: Primitive) -> None: elif isinstance(primitive, BoundingBoxes): self._bounding_boxes.append(primitive) - @property - def has_labels(self) -> bool: - """Check if there are labels.""" - self._register_primitives_if_needed() - return bool(self._labels) - - @property - def has_bounding_boxes(self) -> bool: - """Check if there are bounding boxes.""" - self._register_primitives_if_needed() - return bool(self._bounding_boxes) - - @property - def has_polygons(self) -> bool: - """Check if there are polygons.""" - self._register_primitives_if_needed() - return bool(self._polygons) - - @property - def has_overlays(self) -> bool: - """Check if there are overlays.""" - self._register_primitives_if_needed() - return bool(self._overlays) - - def get_labels(self) -> list[Label]: - """Get labels.""" + def has_primitive(self, primitive: Type[Primitive]) -> bool: + """Check if the primitive type is registered.""" self._register_primitives_if_needed() - return self._labels - - def get_polygons(self) -> list[Polygon]: - """Get polygons.""" - self._register_primitives_if_needed() - return self._polygons - - def get_overlays(self) -> list[Overlay]: - """Get overlays.""" - self._register_primitives_if_needed() - return self._overlays - - def get_bounding_boxes(self) -> list[BoundingBoxes]: - """Get bounding boxes.""" + if primitive == Label: + return bool(self._labels) + if primitive == Polygon: + return bool(self._polygons) + if primitive == Overlay: + return bool(self._overlays) + if primitive == BoundingBoxes: + return bool(self._bounding_boxes) + return False + + def get_primitive(self, primitive: Type[Primitive]) -> Primitive: + """Get primitive.""" self._register_primitives_if_needed() - return self._bounding_boxes + if primitive == Label: + return self._labels + if primitive == Polygon: + return self._polygons + if primitive == Overlay: + return self._overlays + if primitive == BoundingBoxes: + return self._bounding_boxes + msg = f"Primitive {primitive} not found" + raise ValueError(msg) def _register_primitives_if_needed(self): if not self._registered_primitives: diff --git a/model_api/python/model_api/visualizer/visualizer.py b/model_api/python/model_api/visualizer/visualizer.py index ba27ef3c..bf8984db 100644 --- a/model_api/python/model_api/visualizer/visualizer.py +++ b/model_api/python/model_api/visualizer/visualizer.py @@ -8,34 +8,26 @@ from enum import Enum from typing import TYPE_CHECKING -from PIL import Image - from model_api.visualizer.primitives import Label if TYPE_CHECKING: - from model_api.visualizer.visualize_mixin import VisualizeMixin + from PIL import Image + from model_api.visualizer.visualize_mixin import VisualizeMixin -class VisualizationType(Enum): - """Visualization type.""" - - FULL = "full" - SIMPLE = "simple" + from .layout import Layout class Visualizer: - def __init__(self) -> None: - # TODO: add transforms for the source image so that it has the same crop, and size as the model. - pass + def __init__(self, layout: Layout | None = None) -> None: + self.layout = layout def show( self, image: Image, result: VisualizeMixin, - visualization_type: VisualizationType | str = VisualizationType.SIMPLE, ) -> None: - visualization_type = VisualizationType(visualization_type) - result: Image = self._generate(image, result, visualization_type) + result: Image = self._generate(image, result) result.show() def save( @@ -43,95 +35,11 @@ def save( image: Image, result: VisualizeMixin, path: str, - visualization_type: VisualizationType | str = VisualizationType.SIMPLE, ) -> None: - visualization_type = VisualizationType(visualization_type) - result: Image = self._generate(image, result, visualization_type) + result: Image = self._generate(image, result) result.save(path) - def _generate(self, image: Image, result: VisualizeMixin, visualization_type: VisualizationType) -> Image: - _result: Image - if visualization_type == VisualizationType.SIMPLE: - _result = self._generate_simple(image, result) - else: - _result = self._generate_full(image, result) - return _result - - def _generate_simple(self, image: Image, result: VisualizeMixin) -> Image: - """Return a single image with stacked visualizations.""" - # 1. Use Overlay - _image = image.copy() - if result.has_overlays: - overlays = result.get_overlays() - for overlay in overlays: - image = overlay.compute(_image) - - elif result.has_polygons: # 2. else use polygons - polygons = result.get_polygons() - for polygon in polygons: - image = polygon.compute(_image) - - elif result.has_bounding_boxes: # 3. else use bounding boxes - bounding_boxes = result.get_bounding_boxes() - for bounding_box in bounding_boxes: - image = bounding_box.compute(_image) - - # Finally add labels - if result.has_labels: - labels = result.get_labels() - label_images = [] - for label in labels: - label_images.append(label.compute(_image, overlay_on_image=False)) - _image = Label.overlay_labels(_image, label_images) - - return _image - - def _generate_full(self, image: Image, result: VisualizeMixin) -> Image: - """Return a single image with visualizations side by side.""" - images: list[Image] = [image] - - if result.has_overlays: - overlays = result.get_overlays() - _image = image.copy() - for overlay in overlays: - _image = overlay.compute(_image) - images.append(_image) - if result.has_polygons: - polygons = result.get_polygons() - _image = image.copy() - for polygon in polygons: - _image = polygon.compute(_image) - images.append(_image) - if result.has_bounding_boxes: - bounding_boxes = result.get_bounding_boxes() - _image = image.copy() - for bounding_box in bounding_boxes: - _image = bounding_box.compute(_image) - images.append(_image) - if result.has_labels: - labels = result.get_labels() - for label in labels: - images.append(label.compute(image.copy(), overlay_on_image=True)) - return self._stitch(*images) - - def _stitch(self, *images: Image) -> Image: - """Stitch images together. - - Args: - images (Image): Images to stitch. - - Returns: - Image: Stitched image. - """ - new_image = Image.new( - "RGB", - ( - sum(image.width for image in images), - max(image.height for image in images), - ), - ) - x_offset = 0 - for image in images: - new_image.paste(image, (x_offset, 0)) - x_offset += image.width - return new_image + def _generate(self, image: Image, result: VisualizeMixin) -> Image: + if self.layout is not None: + return self.layout(image, result) + return result.default_layout(image, result)