diff --git a/model_api/python/model_api/models/anomaly.py b/model_api/python/model_api/models/anomaly.py
index a8529d89..f971fb08 100644
--- a/model_api/python/model_api/models/anomaly.py
+++ b/model_api/python/model_api/models/anomaly.py
@@ -15,10 +15,9 @@
 import numpy as np
 
 from model_api.adapters.inference_adapter import InferenceAdapter
-
-from .image_model import ImageModel
-from .result_types import AnomalyResult
-from .types import ListValue, NumericalValue, StringValue
+from model_api.models.image_model import ImageModel
+from model_api.models.result_types import AnomalyResult
+from model_api.models.types import ListValue, NumericalValue, StringValue
 
 
 class AnomalyDetection(ImageModel):
diff --git a/model_api/python/model_api/models/classification.py b/model_api/python/model_api/models/classification.py
index a3f2eb80..0b62afbf 100644
--- a/model_api/python/model_api/models/classification.py
+++ b/model_api/python/model_api/models/classification.py
@@ -9,23 +9,24 @@
 import json
 from collections import defaultdict
 from pathlib import Path
+from typing import List, Tuple
 
 import numpy as np
+from numpy import float32
 from openvino.preprocess import PrePostProcessor
 from openvino.runtime import Model, Type
 from openvino.runtime import opset10 as opset
 
 from model_api.adapters.inference_adapter import InferenceAdapter
-
-from .image_model import ImageModel
-from .result_types import ClassificationResult
-from .types import BooleanValue, ListValue, NumericalValue, StringValue
+from model_api.models.image_model import ImageModel
+from model_api.models.result_types import ClassificationResult
+from model_api.models.types import BooleanValue, ListValue, NumericalValue, StringValue
 
 
 class ClassificationModel(ImageModel):
     __model__ = "Classification"
 
-    def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {}, preload: bool = False):
+    def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {}, preload: bool = False) -> None:
         super().__init__(inference_adapter, configuration, preload=False)
         self.topk: int
         self.labels: list[str]
@@ -102,7 +103,7 @@ def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {}
 
         if preload:
             self.load()
 
-    def _load_labels(self, labels_file):
+    def _load_labels(self, labels_file: str) -> list:
         with Path(labels_file).open() as f:
             labels = []
             for s in f:
@@ -113,7 +114,7 @@ def _load_labels(self, labels_file):
                 labels.append(s[(begin_idx + 1) : end_idx])
         return labels
 
-    def _verify_single_output(self):
+    def _verify_single_output(self) -> None:
         layer_name = next(iter(self.outputs))
         layer_shape = self.outputs[layer_name].shape
@@ -137,7 +138,7 @@ def _verify_single_output(self):
             )
 
     @classmethod
-    def parameters(cls):
+    def parameters(cls) -> dict:
         parameters = super().parameters()
         parameters.update(
             {
@@ -180,7 +181,8 @@ def parameters(cls):
         )
         return parameters
 
-    def postprocess(self, outputs, meta):
+    def postprocess(self, outputs: dict, meta: dict) -> ClassificationResult:
+        del meta  # unused
         if self.multilabel:
             result = self.get_multilabel_predictions(
                 outputs[self.out_layer_names[0]].squeeze(),
@@ -220,7 +222,7 @@ def get_saliency_maps(self, outputs: dict) -> np.ndarray:
                 reordered_saliency_maps[batch].append(saliency_maps[batch][idx])
         return np.array(reordered_saliency_maps)
 
-    def get_all_probs(self, logits: np.ndarray):
+    def get_all_probs(self, logits: np.ndarray) -> np.ndarray:
         if self.multilabel:
             probs = sigmoid_numpy(logits.reshape(-1))
         elif self.hierarchical:
@@ -266,10 +268,10 @@ def get_hierarchical_predictions(self, logits: np.ndarray):
                 predicted_labels.append(label_str)
                 predicted_scores.append(head_logits[i])
-        predictions = zip(predicted_labels, predicted_scores)
+        predictions = list(zip(predicted_labels, predicted_scores))
         return self.labels_resolver.resolve_labels(predictions)
 
-    def get_multilabel_predictions(self, logits: np.ndarray):
+    def get_multilabel_predictions(self, logits: np.ndarray) -> List[Tuple[int, str, float32]]:
         logits = sigmoid_numpy(logits)
         scores = []
         indices = []
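
Note: the switch from `zip(...)` to `list(zip(...))` in `get_hierarchical_predictions` matters because `zip` returns a one-shot iterator, while `resolve_labels` may need to walk the predictions more than once. A minimal standalone sketch of the failure mode (names are illustrative):

```python
labels = ["cat", "dog"]
scores = [0.9, 0.4]

pairs = zip(labels, scores)        # lazy, single-pass iterator
first = list(pairs)                # [("cat", 0.9), ("dog", 0.4)]
second = list(pairs)               # [] -- the iterator is already exhausted

pairs = list(zip(labels, scores))  # materialized once, safe to re-iterate
assert list(pairs) == list(pairs)
```
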
@@ -281,7 +283,7 @@ def get_multilabel_predictions(self, logits: np.ndarray):
 
         return list(zip(indices, labels, scores))
 
-    def get_multiclass_predictions(self, outputs):
+    def get_multiclass_predictions(self, outputs: dict) -> list[tuple[int, str, float]]:
         if self.embedded_topk:
             indicesTensor = outputs[self.out_layer_names[0]][0]
             scoresTensor = outputs[self.out_layer_names[1]][0]
@@ -293,7 +295,7 @@ def get_multiclass_predictions(self, outputs):
         return list(zip(indicesTensor, labels, scoresTensor))
 
 
-def addOrFindSoftmaxAndTopkOutputs(inference_adapter: InferenceAdapter, topk: int, output_raw_scores: bool):
+def addOrFindSoftmaxAndTopkOutputs(inference_adapter: InferenceAdapter, topk: int, output_raw_scores: bool) -> None:
     softmaxNode = None
     for i in range(len(inference_adapter.model.outputs)):
         output_node = inference_adapter.model.get_output_op(i).input(0).get_source_output().get_node()
@@ -345,17 +347,17 @@ def addOrFindSoftmaxAndTopkOutputs(inference_adapter: InferenceAdapter, topk: in
     inference_adapter.model = ppp.build()
 
 
-def sigmoid_numpy(x: np.ndarray):
+def sigmoid_numpy(x: np.ndarray) -> np.ndarray:
     return 1.0 / (1.0 + np.exp(-x))
 
 
-def softmax_numpy(x: np.ndarray, eps: float = 1e-9):
+def softmax_numpy(x: np.ndarray, eps: float = 1e-9) -> np.ndarray:
     x = np.exp(x - np.max(x))
     return x / (np.sum(x) + eps)
 
 
 class GreedyLabelsResolver:
-    def __init__(self, hierarchical_config) -> None:
+    def __init__(self, hierarchical_config: dict) -> None:
         self.label_to_idx = hierarchical_config["cls_heads_info"]["label_to_idx"]
         self.label_relations = hierarchical_config["label_tree_edges"]
         self.label_groups = hierarchical_config["cls_heads_info"]["all_groups"]
@@ -364,7 +366,7 @@ def __init__(self, hierarchical_config) -> None:
 
         for child, parent in self.label_relations:
             self.label_tree.add_edge(parent, child)
 
-    def resolve_labels(self, predictions):
+    def resolve_labels(self, predictions: list[tuple]) -> list:
         """Resolves hierarchical labels and exclusivity based on a list of ScoredLabels (labels with probability).
 
         The following two steps are taken:
         - select the most likely label from each label group
@@ -374,7 +376,7 @@ def resolve_labels(self, predictions):
 
             predictions: a list of tuples (label name, score)
         """
 
-        def get_predecessors(lbl, candidates):
+        def get_predecessors(lbl: str, candidates: list[str]) -> list:
             """Return all predecessors.
 
             Returns all the predecessors of the input label or an empty list if one of the predecessors is not a
@@ -422,12 +424,12 @@ def get_predecessors(lbl, candidates):
 
 
 class ProbabilisticLabelsResolver(GreedyLabelsResolver):
-    def __init__(self, hierarchical_config, warmup_cache=True) -> None:
+    def __init__(self, hierarchical_config: dict, warmup_cache: bool = True) -> None:
         super().__init__(hierarchical_config)
         if warmup_cache:
             self.label_tree.get_labels_in_topological_order()
 
-    def resolve_labels(self, predictions):
+    def resolve_labels(self, predictions: list[tuple[str, float]]) -> list[tuple[int, str, float]]:
         """Resolves hierarchical labels and exclusivity based on a list of ScoredLabels (labels with probability).
 
         The following two steps are taken:
@@ -446,8 +448,8 @@ def resolve_labels(self, predictions):
 
     def __resolve_labels_probabilistic(
         self,
-        label_to_probability,
-    ):
+        label_to_probability: dict[str, float],
+    ) -> list[tuple[int, str, float]]:
         """Resolves hierarchical labels and exclusivity based on a probabilistic label output.
 
         - selects the most likely (max) label from an exclusive group
@@ -546,24 +548,24 @@ class SimpleLabelsGraph:
     like adding edges, getting children and parents.
     """
 
-    def __init__(self, vertices):
+    def __init__(self, vertices: list[str]) -> None:
         self._v = vertices
-        self._adj = defaultdict(list)
-        self._topological_order_cache = None
-        self._parents_map = {}
+        self._adj: dict[str, list] = {v: [] for v in vertices}
+        self._topological_order_cache: list | None = None
+        self._parents_map: dict[str, str] = {}
 
-    def add_edge(self, parent, child):
+    def add_edge(self, parent: str, child: str) -> None:
         self._adj[parent].append(child)
         self._parents_map[child] = parent
         self.clear_topological_cache()
 
-    def get_children(self, label):
+    def get_children(self, label: str) -> list:
         return self._adj[label]
 
-    def get_parent(self, label):
+    def get_parent(self, label: str) -> str | None:
         return self._parents_map.get(label, None)
 
-    def get_ancestors(self, label):
+    def get_ancestors(self, label: str) -> list[str]:
         """Returns all the ancestors of the input label, including self."""
         predecessors = [label]
         last_parent = self.get_parent(label)
@@ -576,14 +578,14 @@ def get_ancestors(self, label):
 
         return predecessors
 
-    def get_labels_in_topological_order(self):
+    def get_labels_in_topological_order(self) -> list:
         if self._topological_order_cache is None:
             self._topological_order_cache = self.topological_sort()
 
         return self._topological_order_cache
 
-    def topological_sort(self):
-        in_degree = defaultdict(int)
+    def topological_sort(self) -> list:
+        in_degree: dict[str, int] = dict.fromkeys(self._v, 0)
 
         for node_adj in self._adj.values():
             for j in node_adj:
@@ -610,7 +612,7 @@ def topological_sort(self):
 
         return ordered
 
-    def clear_topological_cache(self):
+    def clear_topological_cache(self) -> None:
         self._topological_order_cache = None
 
 
@@ -619,7 +621,7 @@ def clear_topological_cache(self):
 _raw_scores_name = "raw_scores"
 
 
-def _get_non_xai_names(output_names):
+def _get_non_xai_names(output_names: list[str]) -> list[str]:
     return [
         output_name
         for output_name in output_names
@@ -627,7 +629,7 @@ def _get_non_xai_names(output_names):
     ]
 
 
-def _append_xai_names(outputs, output_names):
+def _append_xai_names(outputs: dict, output_names: list[str]) -> None:
     if _saliency_map_name in outputs:
         output_names.append(_saliency_map_name)
     if _feature_vector_name in outputs:
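
Note: in `SimpleLabelsGraph`, pre-seeding `_adj` with `{v: [] for v in vertices}` and `in_degree` with `dict.fromkeys(self._v, 0)` gives the checker concrete `dict` types and makes every vertex explicit up front, instead of relying on `defaultdict` to materialize entries on first access. A standalone sketch of the Kahn-style traversal these structures feed (illustrative labels, not the class itself):

```python
from collections import deque

vertices = ["object", "animal", "vehicle", "unrelated"]
edges = [("object", "animal"), ("object", "vehicle")]

adj: dict[str, list] = {v: [] for v in vertices}
in_degree: dict[str, int] = dict.fromkeys(vertices, 0)
for parent, child in edges:
    adj[parent].append(child)
    in_degree[child] += 1

# Kahn's algorithm: repeatedly emit vertices whose remaining in-degree is zero.
queue = deque(v for v, d in in_degree.items() if d == 0)
ordered = []
while queue:
    node = queue.popleft()
    ordered.append(node)
    for child in adj[node]:
        in_degree[child] -= 1
        if in_degree[child] == 0:
            queue.append(child)

assert ordered.index("object") < ordered.index("animal")
assert "unrelated" in ordered  # isolated vertices appear because the dicts were pre-seeded
```
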
diff --git a/model_api/python/model_api/models/image_model.py b/model_api/python/model_api/models/image_model.py
index f7ed1f9a..f6817c89 100644
--- a/model_api/python/model_api/models/image_model.py
+++ b/model_api/python/model_api/models/image_model.py
@@ -3,10 +3,11 @@
 # SPDX-License-Identifier: Apache-2.0
 #
 
-from model_api.adapters.utils import RESIZE_TYPES, InputTransform
+import numpy as np
 
-from .model import Model
-from .types import BooleanValue, ListValue, NumericalValue, StringValue
+from model_api.adapters.utils import RESIZE_TYPES, InputTransform
+from model_api.models.model import Model
+from model_api.models.types import BooleanValue, ListValue, NumericalValue, StringValue
 
 
 class ImageModel(Model):
@@ -89,7 +90,7 @@ def __init__(self, inference_adapter, configuration: dict = {}, preload=False):
             self.orig_height, self.orig_width = self.h, self.w
 
     @classmethod
-    def parameters(cls):
+    def parameters(cls) -> dict:
         parameters = super().parameters()
         parameters.update(
             {
@@ -169,7 +170,7 @@ def _get_inputs(self):
             )
         return image_blob_names, image_info_blob_names
 
-    def preprocess(self, inputs):
+    def preprocess(self, inputs: np.ndarray) -> list[dict]:
         """Data preprocess method
 
         It performs basic preprocessing of a single image:
@@ -194,10 +195,13 @@ def preprocess(self, inputs):
             }
         - the input metadata, which might be used in `postprocess` method
         """
-        return {self.image_blob_name: inputs[None]}, {
-            "original_shape": inputs.shape,
-            "resized_shape": (self.w, self.h, self.c),
-        }
+        return [
+            {self.image_blob_name: inputs[None]},
+            {
+                "original_shape": inputs.shape,
+                "resized_shape": (self.w, self.h, self.c),
+            },
+        ]
 
     def _change_layout(self, image):
         """Changes the input image layout to fit the layout of the model input layer.
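
Note: `ImageModel.preprocess` (and the overrides below) now returns a two-element list `[dict_inputs, meta]` rather than a tuple, so every subclass advertises the same `list[dict]` shape. Sequence unpacking at call sites is unchanged — a self-contained sketch with a stub standing in for the model:

```python
import numpy as np

def preprocess(inputs: np.ndarray) -> list[dict]:
    # Stub mirroring the ImageModel.preprocess contract: [dict_inputs, meta].
    return [
        {"image": inputs[None]},  # "image" stands in for self.image_blob_name
        {"original_shape": inputs.shape, "resized_shape": (224, 224, 3)},
    ]

dict_inputs, meta = preprocess(np.zeros((224, 224, 3), dtype=np.uint8))
assert meta["original_shape"] == (224, 224, 3)
assert dict_inputs["image"].shape == (1, 224, 224, 3)
```
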
diff --git a/model_api/python/model_api/models/instance_segmentation.py b/model_api/python/model_api/models/instance_segmentation.py
index ff39c55b..621c4d61 100644
--- a/model_api/python/model_api/models/instance_segmentation.py
+++ b/model_api/python/model_api/models/instance_segmentation.py
@@ -6,6 +6,8 @@
 import cv2
 import numpy as np
 
+from model_api.adapters.inference_adapter import InferenceAdapter
+
 from .image_model import ImageModel
 from .result_types import InstanceSegmentationResult, SegmentedObject
 from .types import BooleanValue, ListValue, NumericalValue, StringValue
@@ -15,17 +17,22 @@
 class MaskRCNNModel(ImageModel):
     __model__ = "MaskRCNN"
 
-    def __init__(self, inference_adapter, configuration: dict = {}, preload=False):
+    def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {}, preload: bool = False) -> None:
         super().__init__(inference_adapter, configuration, preload)
         self._check_io_number((1, 2), (3, 4, 5, 6, 8))
+
+        self.confidence_threshold: float
+        self.labels: list[str]
         self.path_to_labels: str
+        self.postprocess_semantic_masks: bool
+
         if self.path_to_labels:
             self.labels = load_labels(self.path_to_labels)
         self.is_segmentoly = len(self.inputs) == 2
         self.output_blob_name = self._get_outputs()
 
     @classmethod
-    def parameters(cls):
+    def parameters(cls) -> dict:
         parameters = super().parameters()
         parameters.update(
             {
@@ -45,7 +52,7 @@ def parameters(cls):
         )
         return parameters
 
-    def _get_outputs(self):  # noqa: C901 TODO: Fix this method to reduce complexity
+    def _get_outputs(self) -> dict:  # noqa: C901 TODO: Fix this method to reduce complexity
         if self.is_segmentoly:
             return self._get_segmentoly_outputs()
         filtered_names = []
@@ -84,7 +91,7 @@ def _get_outputs(self):  # noqa: C901 TODO: Fix this method to reduce complexity
             return outputs
         return self.raise_error(f"Unexpected outputs: {self.outputs}")
 
-    def _get_segmentoly_outputs(self):
+    def _get_segmentoly_outputs(self) -> dict:
         outputs = {}
         for layer_name in self.outputs:
             layer_shape = self.outputs[layer_name].shape
@@ -102,7 +109,7 @@ def _get_segmentoly_outputs(self):
             )
         return outputs
 
-    def preprocess(self, inputs):
+    def preprocess(self, inputs: np.ndarray) -> list[dict]:
         dict_inputs, meta = super().preprocess(inputs)
         input_image_size = meta["resized_shape"][:2]
         if self.is_segmentoly:
@@ -112,9 +119,9 @@ def preprocess(self, inputs):
                 dtype=np.float32,
             )
             dict_inputs[self.image_info_blob_names[0]] = input_image_info
-        return dict_inputs, meta
+        return [dict_inputs, meta]
 
-    def postprocess(self, outputs, meta):
+    def postprocess(self, outputs: dict, meta: dict) -> InstanceSegmentationResult:
         if (
             outputs[self.output_blob_name["labels"]].ndim == 2
             and outputs[self.output_blob_name["boxes"]].ndim == 3
@@ -174,7 +181,7 @@ def postprocess(self, outputs, meta):
         if has_feature_vector_name:
             if not self.labels:
                 self.raise_error("Can't get number of classes because labels are empty")
-            saliency_maps = [[] for _ in range(len(self.labels))]
+            saliency_maps: list = [[] for _ in range(len(self.labels))]
         else:
             saliency_maps = []
         for box, confidence, cls, raw_mask in zip(boxes, scores, labels, masks):
@@ -200,13 +207,17 @@ def postprocess(self, outputs, meta):
                 resized_mask = raw_cls_mask
             if confidence > self.confidence_threshold:
                 output_mask = resized_mask if self.postprocess_semantic_masks else raw_cls_mask
+                xmin, ymin, xmax, ymax = box.astype(int)
                 objects.append(
                     SegmentedObject(
-                        *box.astype(int),
-                        confidence,
-                        cls,
-                        str_label,
-                        output_mask,
+                        xmin,
+                        ymin,
+                        xmax,
+                        ymax,
+                        score=confidence,
+                        id=cls,
+                        str_label=str_label,
+                        mask=output_mask,
                     ),
                 )
             if has_feature_vector_name and confidence > self.confidence_threshold:
@@ -218,7 +229,7 @@ def postprocess(self, outputs, meta):
     )
 
 
-def _average_and_normalize(saliency_maps):
+def _average_and_normalize(saliency_maps: list) -> list:
     aggregated = []
     for per_object_maps in saliency_maps:
         if per_object_maps:
@@ -231,7 +242,7 @@ def _average_and_normalize(saliency_maps):
     return aggregated
 
 
-def _expand_box(box, scale):
+def _expand_box(box: np.ndarray, scale: float) -> np.ndarray:
     w_half = (box[2] - box[0]) * 0.5 * scale
     h_half = (box[3] - box[1]) * 0.5 * scale
     x_c = (box[2] + box[0]) * 0.5
@@ -244,7 +255,7 @@ def _expand_box(box, scale):
     return box_exp
 
 
-def _segm_postprocess(box, raw_cls_mask, im_h, im_w):
+def _segm_postprocess(box: np.ndarray, raw_cls_mask: np.ndarray, im_h: int, im_w: int) -> np.ndarray:
     # Add zero border to prevent upsampling artifacts on segment borders.
     raw_cls_mask = np.pad(raw_cls_mask, ((1, 1), (1, 1)), "constant", constant_values=0)
     extended_box = _expand_box(
@@ -270,7 +281,7 @@ def _segm_postprocess(box, raw_cls_mask, im_h, im_w):
 _feature_vector_name = "feature_vector"
 
 
-def _append_xai_names(outputs, output_names):
+def _append_xai_names(outputs: dict, output_names: dict) -> None:
     if _saliency_map_name in outputs:
         output_names["saliency_map"] = _saliency_map_name
     if _feature_vector_name in outputs:
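
Note: building `SegmentedObject` from named coordinates and keyword arguments (rather than `*box.astype(int)` plus positionals) pins each value to a field, so a future reordering of the result type cannot silently swap, say, `score` and `id`. A minimal illustration with a stand-in dataclass — the real `SegmentedObject` lives in `result_types`; the field names here just mirror the call above:

```python
from dataclasses import dataclass

import numpy as np

@dataclass
class SegmentedObjectStub:  # stand-in for model_api's SegmentedObject
    xmin: int
    ymin: int
    xmax: int
    ymax: int
    score: float
    id: int
    str_label: str
    mask: np.ndarray

box = np.array([10.4, 20.9, 110.2, 220.7])
xmin, ymin, xmax, ymax = box.astype(int)
obj = SegmentedObjectStub(
    xmin,
    ymin,
    xmax,
    ymax,
    score=0.87,
    id=3,
    str_label="person",
    mask=np.zeros((8, 8), dtype=np.uint8),
)
assert (obj.xmin, obj.ymax) == (10, 220)
```
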
diff --git a/model_api/python/model_api/models/sam_models.py b/model_api/python/model_api/models/sam_models.py
index 06eeaf5f..b6d67a72 100644
--- a/model_api/python/model_api/models/sam_models.py
+++ b/model_api/python/model_api/models/sam_models.py
@@ -51,11 +51,11 @@ def parameters(cls) -> dict[str, Any]:
     def preprocess(
         self,
         inputs: np.ndarray,
-    ) -> tuple[dict[str, np.ndarray], dict[str, Any]]:
+    ) -> list[dict]:
         """Update meta for image encoder."""
         dict_inputs, meta = super().preprocess(inputs)
         meta["resize_type"] = self.resize_type
-        return dict_inputs, meta
+        return [dict_inputs, meta]
 
     def postprocess(
         self,
@@ -123,7 +123,7 @@ def parameters(cls) -> dict[str, Any]:
     def _get_outputs(self) -> str:
         return "upscaled_masks"
 
-    def preprocess(self, inputs: dict[str, Any]) -> list[dict[str, Any]]:
+    def preprocess(self, inputs: dict[str, Any]) -> list[dict]:
         """Preprocess prompts."""
         processed_prompts: list[dict[str, Any]] = []
         for prompt_name in ["bboxes", "points"]:
diff --git a/model_api/python/model_api/models/segmentation.py b/model_api/python/model_api/models/segmentation.py
index 5056b95c..1129ad06 100644
--- a/model_api/python/model_api/models/segmentation.py
+++ b/model_api/python/model_api/models/segmentation.py
@@ -10,10 +10,11 @@
 import cv2
 import numpy as np
 
-from .image_model import ImageModel
-from .result_types import Contour, ImageResultWithSoftPrediction
-from .types import BooleanValue, ListValue, NumericalValue, StringValue
-from .utils import load_labels
+from model_api.adapters.inference_adapter import InferenceAdapter
+from model_api.models.image_model import ImageModel
+from model_api.models.result_types import Contour, ImageResultWithSoftPrediction
+from model_api.models.types import BooleanValue, ListValue, NumericalValue, StringValue
+from model_api.models.utils import load_labels
 
 
 def create_hard_prediction_from_soft_prediction(
@@ -51,16 +52,20 @@
 class SegmentationModel(ImageModel):
     __model__ = "Segmentation"
 
-    def __init__(self, inference_adapter, configuration: dict = {}, preload=False):
+    def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {}, preload: bool = False) -> None:
         super().__init__(inference_adapter, configuration, preload)
         self._check_io_number(1, (1, 2))
+        self.labels: list[str]
         self.path_to_labels: str
+        self.blur_strength: int
+        self.soft_threshold: float
+        self.return_soft_prediction: bool
         if self.path_to_labels:
             self.labels = load_labels(self.path_to_labels)
         self.output_blob_name = self._get_outputs()
 
-    def _get_outputs(self):
+    def _get_outputs(self) -> str:
         out_name = ""
         for name, output in self.outputs.items():
             if _feature_vector_name not in output.names:
@@ -86,7 +91,7 @@ def _get_outputs(self):
         return out_name
 
     @classmethod
-    def parameters(cls):
+    def parameters(cls) -> dict:
         parameters = super().parameters()
         parameters.update(
             {
@@ -115,7 +120,7 @@ def parameters(cls):
         )
         return parameters
 
-    def postprocess(self, outputs, meta):
+    def postprocess(self, outputs: dict, meta: dict) -> ImageResultWithSoftPrediction | cv2.Mat:
         input_image_height = meta["original_shape"][0]
         input_image_width = meta["original_shape"][1]
         predictions = outputs[self.output_blob_name].squeeze()
@@ -204,7 +209,7 @@ def get_contours(
 
 class SalientObjectDetectionModel(SegmentationModel):
     __model__ = "Salient_Object_Detection"
 
-    def postprocess(self, outputs, meta):
+    def postprocess(self, outputs: dict, meta: dict) -> cv2.Mat:
         input_image_height = meta["original_shape"][0]
         input_image_width = meta["original_shape"][1]
         result = outputs[self.output_blob_name].squeeze()
@@ -221,7 +226,7 @@ def postprocess(self, outputs, meta):
 
 _feature_vector_name = "feature_vector"
 
 
-def _get_activation_map(features: np.ndarray | Iterable | int | float):
+def _get_activation_map(features: np.ndarray | Iterable | int | float) -> np.ndarray:
     """Getter activation_map functions."""
     min_soft_score = np.min(features)
     max_soft_score = np.max(features)
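
Note: the attributes annotated in `SegmentationModel.__init__` (`blur_strength`, `soft_threshold`, `return_soft_prediction`) parameterize `create_hard_prediction_from_soft_prediction` above. A rough sketch of that soft-to-hard step, assuming it blurs the per-class scores and then thresholds them, as the names suggest — the exact kernel and background convention live in the function itself:

```python
import cv2
import numpy as np

soft = np.random.rand(128, 128, 3).astype(np.float32)  # HxWxC per-class soft scores
blur_strength, soft_threshold = 5, 0.5                  # illustrative values

blurred = cv2.blur(soft, (blur_strength, blur_strength))
hard = np.argmax(blurred, axis=-1)                      # winning class index per pixel
hard[blurred.max(axis=-1) < soft_threshold] = 0         # low-confidence pixels -> background (assumed convention)
```
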
diff --git a/model_api/python/model_api/models/utils.py b/model_api/python/model_api/models/utils.py
index 98d9e09d..9cc3fd74 100644
--- a/model_api/python/model_api/models/utils.py
+++ b/model_api/python/model_api/models/utils.py
@@ -10,7 +10,7 @@
 import cv2
 import numpy as np
 
-from .result_types import Contour, SegmentedObject, SegmentedObjectWithRects
+from model_api.models.result_types import Contour, SegmentedObject, SegmentedObjectWithRects
 
 
 def add_rotated_rects(segmented_objects):