diff --git a/model_api/python/model_api/adapters/inference_adapter.py b/model_api/python/model_api/adapters/inference_adapter.py index 464092a0..6cf1a87c 100644 --- a/model_api/python/model_api/adapters/inference_adapter.py +++ b/model_api/python/model_api/adapters/inference_adapter.py @@ -83,7 +83,7 @@ def reshape_model(self, new_shape): """ @abstractmethod - def infer_sync(self, dict_data): + def infer_sync(self, dict_data) -> dict: """Performs the synchronous model inference. The infer is a blocking method. Args: @@ -121,6 +121,22 @@ def infer_async(self, dict_data, callback_data): - callback_data: the data for callback, that will be taken after the model inference is ended """ + @abstractmethod + def get_raw_result(self, infer_result) -> dict: + """Gets raw results from the internal inference framework representation as a dict. + + Args: + - infer_result: framework-specific result of inference from the model + + Returns: + - raw result (dict) - model raw output in the following format: + { + 'output_layer_name_1': raw_result_1, + 'output_layer_name_2': raw_result_2, + ... + } + """ + @abstractmethod def is_ready(self): """In case of asynchronous execution checks if one can submit input data @@ -153,7 +169,7 @@ def embed_preprocessing( layout, resize_mode: str, interpolation_mode, - target_shape: tuple[int], + target_shape: tuple[int, ...], pad_value, dtype: type = int, brg2rgb=False, diff --git a/model_api/python/model_api/adapters/onnx_adapter.py b/model_api/python/model_api/adapters/onnx_adapter.py index 3ddec73b..67fe5367 100644 --- a/model_api/python/model_api/adapters/onnx_adapter.py +++ b/model_api/python/model_api/adapters/onnx_adapter.py @@ -3,6 +3,8 @@ # SPDX-License-Identifier: Apache-2.0 # +from __future__ import annotations # TODO: remove when Python3.9 support is dropped + import sys from functools import partial, reduce @@ -122,6 +124,9 @@ def await_all(self): def await_any(self): pass + def get_raw_result(self, infer_result): + pass + def embed_preprocessing( self, layout, diff --git a/model_api/python/model_api/adapters/openvino_adapter.py b/model_api/python/model_api/adapters/openvino_adapter.py index 74593a04..df09b495 100644 --- a/model_api/python/model_api/adapters/openvino_adapter.py +++ b/model_api/python/model_api/adapters/openvino_adapter.py @@ -3,8 +3,16 @@ # SPDX-License-Identifier: Apache-2.0 # +from __future__ import annotations # TODO: remove when Python3.9 support is dropped + import logging as log from pathlib import Path +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from os import PathLike + + from numpy import ndarray try: import openvino.runtime as ov @@ -12,6 +20,7 @@ AsyncInferQueue, Core, Dimension, + OVAny, PartialShape, Type, get_version, @@ -35,7 +44,7 @@ ) -def create_core(): +def create_core() -> Core: if openvino_absent: msg = "The OpenVINO package is not installed" raise ImportError(msg) @@ -45,7 +54,7 @@ def create_core(): return Core() -def parse_devices(device_string): +def parse_devices(device_string: str) -> list[str]: colon_position = device_string.find(":") if colon_position != -1: device_type = device_string[:colon_position] @@ -57,7 +66,7 @@ def parse_devices(device_string): if parenthesis_position != -1: device = device[:parenthesis_position] return devices - return (device_string,) + return [device_string] def parse_value_per_device(devices: set[str], values_string: str) -> dict[str, int]: @@ -82,7 +91,7 @@ def parse_value_per_device(devices: set[str], values_string: str) -> dict[str, i def get_user_config( flags_d: str, flags_nstreams: str, - flags_nthreads: int, + flags_nthreads: int | None = None, ) -> dict[str, str]: config = {} @@ -111,17 +120,17 @@ class OpenvinoAdapter(InferenceAdapter): def __init__( self, - core, - model, - weights_path="", - model_parameters={}, - device="CPU", - plugin_config=None, - max_num_requests=0, - precision="FP16", - download_dir=None, - cache_dir=None, - ): + core: Core, + model: str, + weights_path: PathLike | None = None, + model_parameters: dict[str, Any] = {}, + device: str = "CPU", + plugin_config: dict[str, Any] | None = None, + max_num_requests: int = 0, + precision: str = "FP16", + download_dir: PathLike | None = None, + cache_dir: PathLike | None = None, + ) -> None: """precision, download_dir and cache_dir are ignored if model is a path to a file""" self.core = core self.model_path = model @@ -179,7 +188,7 @@ def __init__( msg = "Model must be bytes, a file or existing OMZ model name" raise RuntimeError(msg) - def load_model(self): + def load_model(self) -> None: self.compiled_model = self.core.compile_model( self.model, self.device, @@ -201,7 +210,7 @@ def load_model(self): ) self.log_runtime_settings() - def log_runtime_settings(self): + def log_runtime_settings(self) -> None: devices = set(parse_devices(self.device)) if "AUTO" not in devices: for device in devices: @@ -222,7 +231,7 @@ def log_runtime_settings(self): pass log.info(f"\tNumber of model infer requests: {len(self.async_queue)}") - def get_input_layers(self): + def get_input_layers(self) -> dict[str, Metadata]: inputs = {} for input in self.model.inputs: input_shape = get_input_shape(input) @@ -235,7 +244,11 @@ def get_input_layers(self): ) return self._get_meta_from_ngraph(inputs) - def get_layout_for_input(self, input, shape=None) -> str: + def get_layout_for_input( + self, + input: ov.Output, + shape: list[int] | tuple[int, int, int, int] | None = None, + ) -> str: input_layout = "" if self.model_parameters["input_layouts"]: input_layout = Layout.from_user_layouts( @@ -251,7 +264,7 @@ def get_layout_for_input(self, input, shape=None) -> str: ) return input_layout - def get_output_layers(self): + def get_output_layers(self) -> dict[str, Metadata]: outputs = {} for i, output in enumerate(self.model.outputs): output_shape = output.partial_shape.get_min_shape() if self.model.is_dynamic() else output.shape @@ -273,13 +286,13 @@ def reshape_model(self, new_shape): } self.model.reshape(new_shape) - def get_raw_result(self, request): + def get_raw_result(self, request: ov.InferRequest) -> dict[str, ndarray]: return {key: request.get_tensor(key).data for key in self.get_output_layers()} def copy_raw_result(self, request): return {key: request.get_tensor(key).data.copy() for key in self.get_output_layers()} - def infer_sync(self, dict_data): + def infer_sync(self, dict_data: dict[str, ndarray]) -> dict[str, ndarray]: self.infer_request = self.async_queue[self.async_queue.get_idle_request_id()] self.infer_request.infer(dict_data) return self.get_raw_result(self.infer_request) @@ -299,7 +312,7 @@ def await_all(self) -> None: def await_any(self) -> None: self.async_queue.get_idle_request_id() - def _get_meta_from_ngraph(self, layers_info): + def _get_meta_from_ngraph(self, layers_info: dict[str, Metadata]) -> dict[str, Metadata]: for node in self.model.get_ordered_ops(): layer_name = node.get_friendly_name() if layer_name not in layers_info: @@ -319,24 +332,24 @@ def operations_by_type(self, operation_type): ) return layers_info - def get_rt_info(self, path): + def get_rt_info(self, path: list[str]) -> OVAny: if self.is_onnx_file: return get_rt_info_from_dict(self.onnx_metadata, path) return self.model.get_rt_info(path) def embed_preprocessing( self, - layout, + layout: str, resize_mode: str, - interpolation_mode, - target_shape: tuple[int], - pad_value, + interpolation_mode: str, + target_shape: tuple[int, ...], + pad_value: int, dtype: type = int, - brg2rgb=False, - mean=None, - scale=None, - input_idx=0, - ): + brg2rgb: bool = False, + mean: list[Any] | None = None, + scale: list[Any] | None = None, + input_idx: int = 0, + ) -> None: ppp = PrePostProcessor(self.model) # Change the input type to the 8-bit image @@ -371,7 +384,7 @@ def embed_preprocessing( ppp.input(input_idx).tensor().set_shape(input_shape) ppp.input(input_idx).preprocess().custom( RESIZE_MODE_MAP[resize_mode]( - target_shape, + (target_shape[0], target_shape[1]), INTERPOLATION_MODE_MAP[interpolation_mode], pad_value, ), @@ -407,7 +420,7 @@ def get_model(self): return self.model -def get_input_shape(input_tensor): +def get_input_shape(input_tensor: ov.Output) -> list[int]: def string_to_tuple(string, casting_type=int): processed = string.replace(" ", "").replace("(", "").replace(")", "").split(",") processed = filter(lambda x: x, processed) @@ -428,4 +441,4 @@ def string_to_tuple(string, casting_type=int): else: shape_list.append(int(dim)) return shape_list - return string_to_tuple(preprocessed) + return list(string_to_tuple(preprocessed)) diff --git a/model_api/python/model_api/adapters/ovms_adapter.py b/model_api/python/model_api/adapters/ovms_adapter.py index 77d11623..42c63f1c 100644 --- a/model_api/python/model_api/adapters/ovms_adapter.py +++ b/model_api/python/model_api/adapters/ovms_adapter.py @@ -97,6 +97,9 @@ def await_all(self): def await_any(self): pass + def get_raw_result(self, infer_result): + pass + def embed_preprocessing( self, layout, diff --git a/model_api/python/model_api/adapters/utils.py b/model_api/python/model_api/adapters/utils.py index fe403595..2c6f9d14 100644 --- a/model_api/python/model_api/adapters/utils.py +++ b/model_api/python/model_api/adapters/utils.py @@ -6,23 +6,26 @@ from __future__ import annotations # TODO: remove when Python3.9 support is dropped import math -from collections.abc import Callable from functools import partial +from typing import TYPE_CHECKING, Any import cv2 import numpy as np from openvino import Model, OVAny, Type, layout_helpers -from openvino.runtime import Output +from openvino.runtime import Input, Node, Output from openvino.runtime import opset10 as opset from openvino.runtime.utils.decorators import custom_preprocess_function +if TYPE_CHECKING: + from collections.abc import Callable + class Layout: - def __init__(self, layout="") -> None: + def __init__(self, layout: str = "") -> None: self.layout = layout @staticmethod - def from_shape(shape): + def from_shape(shape: list[int] | tuple[int, ...]) -> str: """Create Layout from given shape""" if len(shape) == 2: return "NC" @@ -37,7 +40,7 @@ def from_shape(shape): raise RuntimeError(msg) @staticmethod - def from_openvino(input): + def from_openvino(input: Input): """Create Layout from openvino input""" return layout_helpers.get_layout(input).to_string().strip("[]").replace(",", "") @@ -72,7 +75,7 @@ def parse_layouts(layout_string: str) -> dict | None: return user_layouts -def resize_image_letterbox_graph(input: Output, size, interpolation, pad_value): +def resize_image_letterbox_graph(input: Output, size: tuple[int, int], interpolation: str, pad_value: int) -> Node: if not isinstance(pad_value, int): msg = "pad_value must be int" raise RuntimeError(msg) @@ -154,7 +157,7 @@ def resize_image_letterbox_graph(input: Output, size, interpolation, pad_value): ) -def crop_resize_graph(input: Output, size): +def crop_resize_graph(input: Output, size: tuple[int, int]) -> Node: h_axis = 1 w_axis = 2 desired_aspect_ratio = size[1] / size[0] # width / height @@ -285,11 +288,11 @@ def crop_resize_graph(input: Output, size): def resize_image_graph( input: Output, - size, - keep_aspect_ratio, - interpolation, - pad_value, -): + size: tuple[int, int], + keep_aspect_ratio: bool, + interpolation: str, + pad_value: int, +) -> Node: if not isinstance(pad_value, int): msg = "pad_value must be int" raise RuntimeError(msg) @@ -362,7 +365,7 @@ def resize_image_graph( ) -def resize_image(size, interpolation, pad_value): +def resize_image(size: tuple[int, int], interpolation: str, pad_value: int) -> Callable: return custom_preprocess_function( partial( resize_image_graph, @@ -374,7 +377,7 @@ def resize_image(size, interpolation, pad_value): ) -def resize_image_with_aspect(size, interpolation, pad_value): +def resize_image_with_aspect(size: tuple[int, int], interpolation: str, pad_value: int) -> Callable: return custom_preprocess_function( partial( resize_image_graph, @@ -386,11 +389,11 @@ def resize_image_with_aspect(size, interpolation, pad_value): ) -def crop_resize(size, interpolation, pad_value): +def crop_resize(size: tuple[int, int], interpolation: str, pad_value: int) -> Callable: return custom_preprocess_function(partial(crop_resize_graph, size=size)) -def resize_image_letterbox(size, interpolation, pad_value): +def resize_image_letterbox(size: tuple[int, int], interpolation: str, pad_value: int) -> Callable: return custom_preprocess_function( partial( resize_image_letterbox_graph, @@ -401,8 +404,8 @@ def resize_image_letterbox(size, interpolation, pad_value): ) -def load_parameters_from_onnx(onnx_model): - parameters = {} +def load_parameters_from_onnx(onnx_model: Any) -> dict[str, Any]: + parameters: dict[str, Any] = {} def insert_hierarchical(keys, val, root_dict): if len(keys) == 1: @@ -420,7 +423,7 @@ def insert_hierarchical(keys, val, root_dict): return parameters -def get_rt_info_from_dict(rt_info_dict, path): +def get_rt_info_from_dict(rt_info_dict: dict[str, Any], path: list[str]) -> OVAny: value = rt_info_dict try: value = rt_info_dict @@ -433,13 +436,13 @@ def get_rt_info_from_dict(rt_info_dict, path): def resize_image_ocv( - image, - size, - keep_aspect_ratio=False, - is_pad=False, - pad_value=0, - interpolation=cv2.INTER_LINEAR, -): + image: np.ndarray, + size: tuple[int, int], + keep_aspect_ratio: bool = False, + is_pad: bool = False, + pad_value: int = 0, + interpolation: int = cv2.INTER_LINEAR, +) -> np.ndarray: if keep_aspect_ratio: h, w = image.shape[:2] scale = min(size[1] / h, size[0] / w) @@ -457,7 +460,11 @@ def resize_image_ocv( return cv2.resize(image, size, interpolation=interpolation) -def resize_image_with_aspect_ocv(image, size, interpolation=cv2.INTER_LINEAR): +def resize_image_with_aspect_ocv( + image: np.ndarray, + size: tuple[int, int], + interpolation: int = cv2.INTER_LINEAR, +) -> np.ndarray: return resize_image_ocv( image, size, @@ -469,11 +476,11 @@ def resize_image_with_aspect_ocv(image, size, interpolation=cv2.INTER_LINEAR): def resize_image_letterbox_ocv( - image, - size, - interpolation=cv2.INTER_LINEAR, - pad_value=0, -): + image: np.ndarray, + size: tuple[int, int], + interpolation: int = cv2.INTER_LINEAR, + pad_value: int = 0, +) -> np.ndarray: ih, iw = image.shape[0:2] w, h = size scale = min(w / iw, h / ih) @@ -490,7 +497,7 @@ def resize_image_letterbox_ocv( ) -def crop_resize_ocv(image, size): +def crop_resize_ocv(image: np.ndarray, size: tuple[int, int]) -> np.ndarray: desired_aspect_ratio = size[1] / size[0] # width / height if desired_aspect_ratio == 1: if image.shape[0] > image.shape[1]: @@ -519,7 +526,7 @@ def crop_resize_ocv(image, size): } -INTERPOLATION_TYPES = { +INTERPOLATION_TYPES: dict[str, int] = { "LINEAR": cv2.INTER_LINEAR, "CUBIC": cv2.INTER_CUBIC, "NEAREST": cv2.INTER_NEAREST, @@ -530,9 +537,9 @@ def crop_resize_ocv(image, size): class InputTransform: def __init__( self, - reverse_input_channels=False, - mean_values=None, - scale_values=None, + reverse_input_channels: bool = False, + mean_values: list[float] | None = None, + scale_values: list[float] | None = None, ): self.reverse_input_channels = reverse_input_channels self.is_trivial = not (reverse_input_channels or mean_values or scale_values) diff --git a/model_api/python/model_api/models/anomaly.py b/model_api/python/model_api/models/anomaly.py index b5ad9427..8968af30 100644 --- a/model_api/python/model_api/models/anomaly.py +++ b/model_api/python/model_api/models/anomaly.py @@ -9,16 +9,17 @@ from __future__ import annotations -from typing import Any +from typing import TYPE_CHECKING, Any import cv2 import numpy as np -from model_api.adapters.inference_adapter import InferenceAdapter +from model_api.models.image_model import ImageModel +from model_api.models.result_types import AnomalyResult +from model_api.models.types import ListValue, NumericalValue, StringValue -from .image_model import ImageModel -from .result_types import AnomalyResult -from .types import ListValue, NumericalValue, StringValue +if TYPE_CHECKING: + from model_api.adapters.inference_adapter import InferenceAdapter class AnomalyDetection(ImageModel): diff --git a/model_api/python/model_api/models/classification.py b/model_api/python/model_api/models/classification.py index d238b614..9a291840 100644 --- a/model_api/python/model_api/models/classification.py +++ b/model_api/python/model_api/models/classification.py @@ -7,19 +7,22 @@ import copy import json -from collections import defaultdict from pathlib import Path +from typing import TYPE_CHECKING, List, Tuple import numpy as np +from numpy import float32 from openvino.preprocess import PrePostProcessor from openvino.runtime import Model, Type from openvino.runtime import opset10 as opset -from model_api.adapters.inference_adapter import InferenceAdapter +from model_api.models.image_model import ImageModel +from model_api.models.result_types import ClassificationResult +from model_api.models.types import BooleanValue, ListValue, NumericalValue, StringValue +from model_api.models.utils import softmax -from .image_model import ImageModel -from .result_types import ClassificationResult -from .types import BooleanValue, ListValue, NumericalValue, StringValue +if TYPE_CHECKING: + from model_api.adapters.inference_adapter import InferenceAdapter class ClassificationModel(ImageModel): @@ -46,7 +49,7 @@ class ClassificationModel(ImageModel): __model__ = "Classification" - def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {}, preload: bool = False): + def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {}, preload: bool = False) -> None: super().__init__(inference_adapter, configuration, preload=False) self.topk: int self.labels: list[str] @@ -123,7 +126,7 @@ def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {} if preload: self.load() - def _load_labels(self, labels_file): + def _load_labels(self, labels_file: str) -> list: with Path(labels_file).open() as f: labels = [] for s in f: @@ -134,7 +137,7 @@ def _load_labels(self, labels_file): labels.append(s[(begin_idx + 1) : end_idx]) return labels - def _verify_single_output(self): + def _verify_single_output(self) -> None: layer_name = next(iter(self.outputs)) layer_shape = self.outputs[layer_name].shape @@ -158,7 +161,7 @@ def _verify_single_output(self): ) @classmethod - def parameters(cls): + def parameters(cls) -> dict: parameters = super().parameters() parameters.update( { @@ -201,7 +204,8 @@ def parameters(cls): ) return parameters - def postprocess(self, outputs, meta): + def postprocess(self, outputs: dict, meta: dict) -> ClassificationResult: + del meta # unused if self.multilabel: result = self.get_multilabel_predictions( outputs[self.out_layer_names[0]].squeeze(), @@ -241,7 +245,7 @@ def get_saliency_maps(self, outputs: dict) -> np.ndarray: reordered_saliency_maps[batch].append(saliency_maps[batch][idx]) return np.array(reordered_saliency_maps) - def get_all_probs(self, logits: np.ndarray): + def get_all_probs(self, logits: np.ndarray) -> np.ndarray: if self.multilabel: probs = sigmoid_numpy(logits.reshape(-1)) elif self.hierarchical: @@ -250,7 +254,7 @@ def get_all_probs(self, logits: np.ndarray): cls_heads_info = self.hierarchical_info["cls_heads_info"] for i in range(cls_heads_info["num_multiclass_heads"]): logits_begin, logits_end = cls_heads_info["head_idx_to_logits_range"][str(i)] - probs[logits_begin:logits_end] = softmax_numpy( + probs[logits_begin:logits_end] = softmax( logits[logits_begin:logits_end], ) @@ -260,7 +264,7 @@ def get_all_probs(self, logits: np.ndarray): elif self.embedded_topk: probs = logits.reshape(-1) else: - probs = softmax_numpy(logits.reshape(-1)) + probs = softmax(logits.reshape(-1)) return probs def get_hierarchical_predictions(self, logits: np.ndarray): @@ -270,7 +274,7 @@ def get_hierarchical_predictions(self, logits: np.ndarray): for i in range(cls_heads_info["num_multiclass_heads"]): logits_begin, logits_end = cls_heads_info["head_idx_to_logits_range"][str(i)] head_logits = logits[logits_begin:logits_end] - head_logits = softmax_numpy(head_logits) + head_logits = softmax(head_logits) j = np.argmax(head_logits) label_str = cls_heads_info["all_groups"][i][j] predicted_labels.append(label_str) @@ -287,10 +291,10 @@ def get_hierarchical_predictions(self, logits: np.ndarray): predicted_labels.append(label_str) predicted_scores.append(head_logits[i]) - predictions = zip(predicted_labels, predicted_scores) + predictions = list(zip(predicted_labels, predicted_scores)) return self.labels_resolver.resolve_labels(predictions) - def get_multilabel_predictions(self, logits: np.ndarray): + def get_multilabel_predictions(self, logits: np.ndarray) -> List[Tuple[int, str, float32]]: logits = sigmoid_numpy(logits) scores = [] indices = [] @@ -302,19 +306,19 @@ def get_multilabel_predictions(self, logits: np.ndarray): return list(zip(indices, labels, scores)) - def get_multiclass_predictions(self, outputs): + def get_multiclass_predictions(self, outputs: dict) -> list[tuple[int, str, float]]: if self.embedded_topk: indicesTensor = outputs[self.out_layer_names[0]][0] scoresTensor = outputs[self.out_layer_names[1]][0] labels = [self.labels[i] if self.labels else "" for i in indicesTensor] else: - scoresTensor = softmax_numpy(outputs[self.out_layer_names[0]][0]) + scoresTensor = softmax(outputs[self.out_layer_names[0]][0]) indicesTensor = [np.argmax(scoresTensor)] labels = [self.labels[i] if self.labels else "" for i in indicesTensor] return list(zip(indicesTensor, labels, scoresTensor)) -def addOrFindSoftmaxAndTopkOutputs(inference_adapter: InferenceAdapter, topk: int, output_raw_scores: bool): +def addOrFindSoftmaxAndTopkOutputs(inference_adapter: InferenceAdapter, topk: int, output_raw_scores: bool) -> None: softmaxNode = None for i in range(len(inference_adapter.model.outputs)): output_node = inference_adapter.model.get_output_op(i).input(0).get_source_output().get_node() @@ -366,17 +370,12 @@ def addOrFindSoftmaxAndTopkOutputs(inference_adapter: InferenceAdapter, topk: in inference_adapter.model = ppp.build() -def sigmoid_numpy(x: np.ndarray): +def sigmoid_numpy(x: np.ndarray) -> np.ndarray: return 1.0 / (1.0 + np.exp(-x)) -def softmax_numpy(x: np.ndarray, eps: float = 1e-9): - x = np.exp(x - np.max(x)) - return x / (np.sum(x) + eps) - - class GreedyLabelsResolver: - def __init__(self, hierarchical_config) -> None: + def __init__(self, hierarchical_config: dict) -> None: self.label_to_idx = hierarchical_config["cls_heads_info"]["label_to_idx"] self.label_relations = hierarchical_config["label_tree_edges"] self.label_groups = hierarchical_config["cls_heads_info"]["all_groups"] @@ -385,7 +384,7 @@ def __init__(self, hierarchical_config) -> None: for child, parent in self.label_relations: self.label_tree.add_edge(parent, child) - def resolve_labels(self, predictions): + def resolve_labels(self, predictions: list[tuple]) -> list: """Resolves hierarchical labels and exclusivity based on a list of ScoredLabels (labels with probability). The following two steps are taken: - select the most likely label from each label group @@ -395,7 +394,7 @@ def resolve_labels(self, predictions): predictions: a list of tuples (label name, score) """ - def get_predecessors(lbl, candidates): + def get_predecessors(lbl: str, candidates: list[str]) -> list: """Return all predecessors. Returns all the predecessors of the input label or an empty list if one of the predecessors is not a @@ -443,12 +442,12 @@ def get_predecessors(lbl, candidates): class ProbabilisticLabelsResolver(GreedyLabelsResolver): - def __init__(self, hierarchical_config, warmup_cache=True) -> None: + def __init__(self, hierarchical_config: dict, warmup_cache: bool = True) -> None: super().__init__(hierarchical_config) if warmup_cache: self.label_tree.get_labels_in_topological_order() - def resolve_labels(self, predictions): + def resolve_labels(self, predictions: list[tuple[str, float]]) -> list[tuple[int, str, float]]: """Resolves hierarchical labels and exclusivity based on a list of ScoredLabels (labels with probability). The following two steps are taken: @@ -467,8 +466,8 @@ def resolve_labels(self, predictions): def __resolve_labels_probabilistic( self, - label_to_probability, - ): + label_to_probability: dict[str, float], + ) -> list[tuple[int, str, float]]: """Resolves hierarchical labels and exclusivity based on a probabilistic label output. - selects the most likely (max) label from an exclusive group @@ -567,24 +566,24 @@ class SimpleLabelsGraph: like adding edges, getting children and parents. """ - def __init__(self, vertices): + def __init__(self, vertices: list[str]) -> None: self._v = vertices - self._adj = defaultdict(list) - self._topological_order_cache = None - self._parents_map = {} + self._adj: dict[str, list] = {v: [] for v in vertices} + self._topological_order_cache: list | None = None + self._parents_map: dict[str, str] = {} - def add_edge(self, parent, child): + def add_edge(self, parent: str, child: str) -> None: self._adj[parent].append(child) self._parents_map[child] = parent self.clear_topological_cache() - def get_children(self, label): + def get_children(self, label: str) -> list: return self._adj[label] - def get_parent(self, label): + def get_parent(self, label: str) -> str | None: return self._parents_map.get(label, None) - def get_ancestors(self, label): + def get_ancestors(self, label: str) -> list[str]: """Returns all the ancestors of the input label, including self.""" predecessors = [label] last_parent = self.get_parent(label) @@ -597,14 +596,14 @@ def get_ancestors(self, label): return predecessors - def get_labels_in_topological_order(self): + def get_labels_in_topological_order(self) -> list: if self._topological_order_cache is None: self._topological_order_cache = self.topological_sort() return self._topological_order_cache - def topological_sort(self): - in_degree = defaultdict(int) + def topological_sort(self) -> list: + in_degree: dict[str, int] = dict.fromkeys(self._v, 0) for node_adj in self._adj.values(): for j in node_adj: @@ -631,7 +630,7 @@ def topological_sort(self): return ordered - def clear_topological_cache(self): + def clear_topological_cache(self) -> None: self._topological_order_cache = None @@ -640,7 +639,7 @@ def clear_topological_cache(self): _raw_scores_name = "raw_scores" -def _get_non_xai_names(output_names): +def _get_non_xai_names(output_names: list[str]) -> list[str]: return [ output_name for output_name in output_names @@ -648,7 +647,7 @@ def _get_non_xai_names(output_names): ] -def _append_xai_names(outputs, output_names): +def _append_xai_names(outputs: dict, output_names: list[str]) -> None: if _saliency_map_name in outputs: output_names.append(_saliency_map_name) if _feature_vector_name in outputs: diff --git a/model_api/python/model_api/models/image_model.py b/model_api/python/model_api/models/image_model.py index f7ed1f9a..31404d62 100644 --- a/model_api/python/model_api/models/image_model.py +++ b/model_api/python/model_api/models/image_model.py @@ -3,10 +3,18 @@ # SPDX-License-Identifier: Apache-2.0 # +from __future__ import annotations # TODO: remove when Python3.9 support is dropped + +from typing import TYPE_CHECKING, Any + from model_api.adapters.utils import RESIZE_TYPES, InputTransform +from model_api.models.model import Model +from model_api.models.types import BooleanValue, ListValue, NumericalValue, StringValue + +if TYPE_CHECKING: + import numpy as np -from .model import Model -from .types import BooleanValue, ListValue, NumericalValue, StringValue + from model_api.adapters.inference_adapter import InferenceAdapter class ImageModel(Model): @@ -32,7 +40,7 @@ class ImageModel(Model): __model__ = "ImageModel" - def __init__(self, inference_adapter, configuration: dict = {}, preload=False): + def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {}, preload: bool = False) -> None: """Image model constructor It extends the `Model` constructor. @@ -58,6 +66,7 @@ def __init__(self, inference_adapter, configuration: dict = {}, preload=False): self.scale_values: list self.reverse_input_channels: bool self.embedded_processing: bool + self.labels: list[str] self.nchw_layout = self.inputs[self.image_blob_name].layout == "NCHW" if self.nchw_layout: @@ -89,7 +98,7 @@ def __init__(self, inference_adapter, configuration: dict = {}, preload=False): self.orig_height, self.orig_width = self.h, self.w @classmethod - def parameters(cls): + def parameters(cls) -> dict[str, Any]: parameters = super().parameters() parameters.update( { @@ -136,14 +145,14 @@ def parameters(cls): ) return parameters - def get_label_name(self, label_id): + def get_label_name(self, label_id: int) -> str: if self.labels is None: return f"#{label_id}" if label_id >= len(self.labels): return f"#{label_id}" return self.labels[label_id] - def _get_inputs(self): + def _get_inputs(self) -> tuple[list[str], ...]: """Defines the model inputs for images and additional info. Raises: @@ -169,7 +178,7 @@ def _get_inputs(self): ) return image_blob_names, image_info_blob_names - def preprocess(self, inputs): + def preprocess(self, inputs: np.ndarray) -> list[dict]: """Data preprocess method It performs basic preprocessing of a single image: @@ -194,12 +203,15 @@ def preprocess(self, inputs): } - the input metadata, which might be used in `postprocess` method """ - return {self.image_blob_name: inputs[None]}, { - "original_shape": inputs.shape, - "resized_shape": (self.w, self.h, self.c), - } + return [ + {self.image_blob_name: inputs[None]}, + { + "original_shape": inputs.shape, + "resized_shape": (self.w, self.h, self.c), + }, + ] - def _change_layout(self, image): + def _change_layout(self, image: np.ndarray) -> np.ndarray: """Changes the input image layout to fit the layout of the model input layer. Args: diff --git a/model_api/python/model_api/models/instance_segmentation.py b/model_api/python/model_api/models/instance_segmentation.py index ff39c55b..621c4d61 100644 --- a/model_api/python/model_api/models/instance_segmentation.py +++ b/model_api/python/model_api/models/instance_segmentation.py @@ -6,6 +6,8 @@ import cv2 import numpy as np +from model_api.adapters.inference_adapter import InferenceAdapter + from .image_model import ImageModel from .result_types import InstanceSegmentationResult, SegmentedObject from .types import BooleanValue, ListValue, NumericalValue, StringValue @@ -15,17 +17,22 @@ class MaskRCNNModel(ImageModel): __model__ = "MaskRCNN" - def __init__(self, inference_adapter, configuration: dict = {}, preload=False): + def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {}, preload: bool = False) -> None: super().__init__(inference_adapter, configuration, preload) self._check_io_number((1, 2), (3, 4, 5, 6, 8)) + + self.confidence_threshold: float + self.labels: list[str] self.path_to_labels: str + self.postprocess_semantic_masks: bool + if self.path_to_labels: self.labels = load_labels(self.path_to_labels) self.is_segmentoly = len(self.inputs) == 2 self.output_blob_name = self._get_outputs() @classmethod - def parameters(cls): + def parameters(cls) -> dict: parameters = super().parameters() parameters.update( { @@ -45,7 +52,7 @@ def parameters(cls): ) return parameters - def _get_outputs(self): # noqa: C901 TODO: Fix this method to reduce complexity + def _get_outputs(self) -> dict: # noqa: C901 TODO: Fix this method to reduce complexity if self.is_segmentoly: return self._get_segmentoly_outputs() filtered_names = [] @@ -84,7 +91,7 @@ def _get_outputs(self): # noqa: C901 TODO: Fix this method to reduce complexity return outputs return self.raise_error(f"Unexpected outputs: {self.outputs}") - def _get_segmentoly_outputs(self): + def _get_segmentoly_outputs(self) -> dict: outputs = {} for layer_name in self.outputs: layer_shape = self.outputs[layer_name].shape @@ -102,7 +109,7 @@ def _get_segmentoly_outputs(self): ) return outputs - def preprocess(self, inputs): + def preprocess(self, inputs: np.ndarray) -> list[dict]: dict_inputs, meta = super().preprocess(inputs) input_image_size = meta["resized_shape"][:2] if self.is_segmentoly: @@ -112,9 +119,9 @@ def preprocess(self, inputs): dtype=np.float32, ) dict_inputs[self.image_info_blob_names[0]] = input_image_info - return dict_inputs, meta + return [dict_inputs, meta] - def postprocess(self, outputs, meta): + def postprocess(self, outputs: dict, meta: dict) -> InstanceSegmentationResult: if ( outputs[self.output_blob_name["labels"]].ndim == 2 and outputs[self.output_blob_name["boxes"]].ndim == 3 @@ -174,7 +181,7 @@ def postprocess(self, outputs, meta): if has_feature_vector_name: if not self.labels: self.raise_error("Can't get number of classes because labels are empty") - saliency_maps = [[] for _ in range(len(self.labels))] + saliency_maps: list = [[] for _ in range(len(self.labels))] else: saliency_maps = [] for box, confidence, cls, raw_mask in zip(boxes, scores, labels, masks): @@ -200,13 +207,17 @@ def postprocess(self, outputs, meta): resized_mask = raw_cls_mask if confidence > self.confidence_threshold: output_mask = resized_mask if self.postprocess_semantic_masks else raw_cls_mask + xmin, ymin, xmax, ymax = box.astype(int) objects.append( SegmentedObject( - *box.astype(int), - confidence, - cls, - str_label, - output_mask, + xmin, + ymin, + xmax, + ymax, + score=confidence, + id=cls, + str_label=str_label, + mask=output_mask, ), ) if has_feature_vector_name and confidence > self.confidence_threshold: @@ -218,7 +229,7 @@ def postprocess(self, outputs, meta): ) -def _average_and_normalize(saliency_maps): +def _average_and_normalize(saliency_maps: list) -> list: aggregated = [] for per_object_maps in saliency_maps: if per_object_maps: @@ -231,7 +242,7 @@ def _average_and_normalize(saliency_maps): return aggregated -def _expand_box(box, scale): +def _expand_box(box: np.ndarray, scale: float) -> np.ndarray: w_half = (box[2] - box[0]) * 0.5 * scale h_half = (box[3] - box[1]) * 0.5 * scale x_c = (box[2] + box[0]) * 0.5 @@ -244,7 +255,7 @@ def _expand_box(box, scale): return box_exp -def _segm_postprocess(box, raw_cls_mask, im_h, im_w): +def _segm_postprocess(box: np.ndarray, raw_cls_mask: np.ndarray, im_h: int, im_w: int) -> np.ndarray: # Add zero border to prevent upsampling artifacts on segment borders. raw_cls_mask = np.pad(raw_cls_mask, ((1, 1), (1, 1)), "constant", constant_values=0) extended_box = _expand_box( @@ -270,7 +281,7 @@ def _segm_postprocess(box, raw_cls_mask, im_h, im_w): _feature_vector_name = "feature_vector" -def _append_xai_names(outputs, output_names): +def _append_xai_names(outputs: dict, output_names: dict) -> None: if _saliency_map_name in outputs: output_names["saliency_map"] = _saliency_map_name if _feature_vector_name in outputs: diff --git a/model_api/python/model_api/models/model.py b/model_api/python/model_api/models/model.py index 9b065455..1eb7a606 100644 --- a/model_api/python/model_api/models/model.py +++ b/model_api/python/model_api/models/model.py @@ -3,11 +3,13 @@ # SPDX-License-Identifier: Apache-2.0 # +from __future__ import annotations # TODO: remove when Python3.9 support is dropped + import logging as log import re from contextlib import contextmanager +from typing import TYPE_CHECKING, Any, NoReturn, Type -from model_api.adapters.inference_adapter import InferenceAdapter from model_api.adapters.onnx_adapter import ONNXRuntimeAdapter from model_api.adapters.openvino_adapter import ( OpenvinoAdapter, @@ -16,11 +18,18 @@ ) from model_api.adapters.ovms_adapter import OVMSAdapter +if TYPE_CHECKING: + from os import PathLike + + from numpy import ndarray + + from model_api.adapters.inference_adapter import InferenceAdapter + class WrapperError(Exception): """The class for errors occurred in Model API wrappers""" - def __init__(self, wrapper_name, message): + def __init__(self, wrapper_name, message) -> None: super().__init__(f"{wrapper_name}: {message}") @@ -52,7 +61,7 @@ class Model: __model__: str = "Model" - def __init__(self, inference_adapter, configuration: dict = {}, preload=False): + def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {}, preload: bool = False) -> None: """Model constructor Args: @@ -98,7 +107,7 @@ def get_model(self): return model @classmethod - def get_model_class(cls, name): + def get_model_class(cls, name: str) -> Type: subclasses = [subclass for subclass in cls.get_subclasses() if subclass.__model__] if cls.__model__: subclasses.append(cls) @@ -113,21 +122,21 @@ def get_model_class(cls, name): @classmethod def create_model( cls, - model, - model_type=None, - configuration={}, - preload=True, - core=None, - weights_path="", - adaptor_parameters={}, - device="AUTO", - nstreams="1", - nthreads=None, - max_num_requests=0, - precision="FP16", - download_dir=None, - cache_dir=None, - ): + model: str, + model_type: Any | None = None, + configuration: dict[str, Any] = {}, + preload: bool = True, + core: Any | None = None, + weights_path: PathLike | None = None, + adaptor_parameters: dict[str, Any] = {}, + device: str = "AUTO", + nstreams: str = "1", + nthreads: int | None = None, + max_num_requests: int = 0, + precision: str = "FP16", + download_dir: PathLike | None = None, + cache_dir: PathLike | None = None, + ) -> Any: """Create an instance of the Model API model Args: @@ -152,9 +161,8 @@ def create_model( Returns: Model object """ - if isinstance(model, InferenceAdapter): - inference_adapter = model - elif isinstance(model, str) and re.compile( + inference_adapter: InferenceAdapter + if isinstance(model, str) and re.compile( r"(\w+\.*\-*)*\w+:\d+\/models\/[a-zA-Z0-9._-]+(\:\d+)*", ).fullmatch(model): inference_adapter = OVMSAdapter(model) @@ -182,7 +190,7 @@ def create_model( return Model(inference_adapter, configuration, preload) @classmethod - def get_subclasses(cls): + def get_subclasses(cls) -> list[Any]: all_subclasses = [] for subclass in cls.__subclasses__(): all_subclasses.append(subclass) @@ -196,7 +204,7 @@ def available_wrappers(cls): return [subclass.__model__ for subclass in available_classes if subclass.__model__] @classmethod - def parameters(cls): + def parameters(cls) -> dict[str, Any]: """Defines the description and type of configurable data parameters for the wrapper. See `types.py` to find available types of the data parameter. For each parameter @@ -214,7 +222,7 @@ def parameters(cls): """ return {} - def _load_config(self, config): + def _load_config(self, config: dict[str, Any]) -> None: """Reads the configuration and creates data attributes by setting the wrapper parameters with values from configuration. @@ -265,7 +273,7 @@ def _load_config(self, config): ) @classmethod - def raise_error(cls, message): + def raise_error(cls, message) -> NoReturn: """Raises the WrapperError. Args: @@ -292,7 +300,7 @@ def preprocess(self, inputs): """ raise NotImplementedError - def postprocess(self, outputs, meta): + def postprocess(self, outputs: dict[str, Any], meta: dict[str, Any]): """Interface for postprocess method. Args: @@ -309,7 +317,11 @@ def postprocess(self, outputs, meta): """ raise NotImplementedError - def _check_io_number(self, number_of_inputs, number_of_outputs): + def _check_io_number( + self, + number_of_inputs: int | tuple[int, ...], + number_of_outputs: int | tuple[int, ...], + ) -> None: """Checks whether the number of model inputs/outputs is supported. Args: @@ -321,47 +333,32 @@ def _check_io_number(self, number_of_inputs, number_of_outputs): Raises: WrapperError: if the model has unsupported number of inputs/outputs """ - if not isinstance(number_of_inputs, tuple): + if isinstance(number_of_inputs, int): if len(self.inputs) != number_of_inputs and number_of_inputs != -1: self.raise_error( - "Expected {} input blob{}, but {} found: {}".format( - number_of_inputs, - "s" if number_of_inputs != 1 else "", - len(self.inputs), - ", ".join(self.inputs), - ), + f"Expected {number_of_inputs} input blob {'s' if number_of_inputs != 1 else ''}, " + f"but {len(self.inputs)} found: {', '.join(self.inputs)}", ) elif len(self.inputs) not in number_of_inputs: self.raise_error( - "Expected {} or {} input blobs, but {} found: {}".format( - ", ".join(str(n) for n in number_of_inputs[:-1]), - int(number_of_inputs[-1]), - len(self.inputs), - ", ".join(self.inputs), - ), + f"Expected {', '.join(str(n) for n in number_of_inputs[:-1])} or " + f"{int(number_of_inputs[-1])} input blobs, but {len(self.inputs)} found: {', '.join(self.inputs)}", ) - if not isinstance(number_of_outputs, tuple): + if isinstance(number_of_outputs, int): if len(self.outputs) != number_of_outputs and number_of_outputs != -1: self.raise_error( - "Expected {} output blob{}, but {} found: {}".format( - number_of_outputs, - "s" if number_of_outputs != 1 else "", - len(self.outputs), - ", ".join(self.outputs), - ), + f"Expected {number_of_outputs} output blob {'s' if number_of_outputs != 1 else ''}, " + f"but {len(self.outputs)} found: {', '.join(self.outputs)}", ) elif len(self.outputs) not in number_of_outputs: self.raise_error( - "Expected {} or {} output blobs, but {} found: {}".format( - ", ".join(str(n) for n in number_of_outputs[:-1]), - int(number_of_outputs[-1]), - len(self.outputs), - ", ".join(self.outputs), - ), + f"Expected {', '.join(str(n) for n in number_of_outputs[:-1])} or " + f"{int(number_of_outputs[-1])} output blobs, " + f"but {len(self.outputs)} found: {', '.join(self.outputs)}", ) - def __call__(self, inputs): + def __call__(self, inputs: ndarray): """Applies preprocessing, synchronous inference, postprocessing routines while one call. Args: @@ -407,7 +404,7 @@ def batch_infer_callback(result, id): return [completed_results[i] for i in range(len(inputs))] - def load(self, force=False): + def load(self, force: bool = False) -> None: if not self.model_loaded or force: self.model_loaded = True self.inference_adapter.load_model() @@ -423,7 +420,7 @@ def reshape(self, new_shape): self.inputs = self.inference_adapter.get_input_layers() self.outputs = self.inference_adapter.get_output_layers() - def infer_sync(self, dict_data): + def infer_sync(self, dict_data: dict[str, ndarray]) -> dict[str, ndarray]: if not self.model_loaded: self.raise_error( "The model is not loaded to the device. Please, create the wrapper " diff --git a/model_api/python/model_api/models/sam_models.py b/model_api/python/model_api/models/sam_models.py index 06eeaf5f..d0d3f4fb 100644 --- a/model_api/python/model_api/models/sam_models.py +++ b/model_api/python/model_api/models/sam_models.py @@ -6,7 +6,7 @@ from __future__ import annotations # TODO: remove when Python3.9 support is dropped from copy import deepcopy -from typing import Any +from typing import TYPE_CHECKING, Any import numpy as np @@ -16,6 +16,9 @@ from .image_model import ImageModel from .segmentation import SegmentationModel +if TYPE_CHECKING: + from model_api.adapters.inference_adapter import InferenceAdapter + class SAMImageEncoder(ImageModel): """Image Encoder for SAM: https://arxiv.org/abs/2304.02643""" @@ -51,11 +54,11 @@ def parameters(cls) -> dict[str, Any]: def preprocess( self, inputs: np.ndarray, - ) -> tuple[dict[str, np.ndarray], dict[str, Any]]: + ) -> list[dict]: """Update meta for image encoder.""" dict_inputs, meta = super().preprocess(inputs) meta["resize_type"] = self.resize_type - return dict_inputs, meta + return [dict_inputs, meta] def postprocess( self, @@ -123,7 +126,7 @@ def parameters(cls) -> dict[str, Any]: def _get_outputs(self) -> str: return "upscaled_masks" - def preprocess(self, inputs: dict[str, Any]) -> list[dict[str, Any]]: + def preprocess(self, inputs: dict[str, Any]) -> list[dict]: """Preprocess prompts.""" processed_prompts: list[dict[str, Any]] = [] for prompt_name in ["bboxes", "points"]: @@ -189,8 +192,8 @@ def _get_preprocess_shape( def _check_io_number( self, - number_of_inputs: int | tuple[int], - number_of_outputs: int | tuple[int], + number_of_inputs: int | tuple[int, ...], + number_of_outputs: int | tuple[int, ...], ) -> None: pass diff --git a/model_api/python/model_api/models/segmentation.py b/model_api/python/model_api/models/segmentation.py index fdd88764..cbb5f7c8 100644 --- a/model_api/python/model_api/models/segmentation.py +++ b/model_api/python/model_api/models/segmentation.py @@ -5,15 +5,20 @@ from __future__ import annotations # TODO: remove when Python3.9 support is dropped -from collections.abc import Iterable +from typing import TYPE_CHECKING import cv2 import numpy as np -from .image_model import ImageModel -from .result_types import Contour, ImageResultWithSoftPrediction -from .types import BooleanValue, ListValue, NumericalValue, StringValue -from .utils import load_labels +from model_api.models.image_model import ImageModel +from model_api.models.result_types import Contour, ImageResultWithSoftPrediction +from model_api.models.types import BooleanValue, ListValue, NumericalValue, StringValue +from model_api.models.utils import load_labels + +if TYPE_CHECKING: + from collections.abc import Iterable + + from model_api.adapters.inference_adapter import InferenceAdapter def create_hard_prediction_from_soft_prediction( @@ -69,16 +74,20 @@ class SegmentationModel(ImageModel): __model__ = "Segmentation" - def __init__(self, inference_adapter, configuration: dict = {}, preload=False): + def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {}, preload: bool = False) -> None: super().__init__(inference_adapter, configuration, preload) self._check_io_number(1, (1, 2)) + self.labels: list[str] self.path_to_labels: str + self.blur_strength: int + self.soft_threshold: float + self.return_soft_prediction: bool if self.path_to_labels: self.labels = load_labels(self.path_to_labels) self.output_blob_name = self._get_outputs() - def _get_outputs(self): + def _get_outputs(self) -> str: out_name = "" for name, output in self.outputs.items(): if _feature_vector_name not in output.names: @@ -104,7 +113,7 @@ def _get_outputs(self): return out_name @classmethod - def parameters(cls): + def parameters(cls) -> dict: parameters = super().parameters() parameters.update( { @@ -133,7 +142,7 @@ def parameters(cls): ) return parameters - def postprocess(self, outputs, meta): + def postprocess(self, outputs: dict, meta: dict) -> ImageResultWithSoftPrediction | cv2.Mat: input_image_height = meta["original_shape"][0] input_image_width = meta["original_shape"][1] predictions = outputs[self.output_blob_name].squeeze() @@ -222,7 +231,7 @@ def get_contours( class SalientObjectDetectionModel(SegmentationModel): __model__ = "Salient_Object_Detection" - def postprocess(self, outputs, meta): + def postprocess(self, outputs: dict, meta: dict) -> cv2.Mat: input_image_height = meta["original_shape"][0] input_image_width = meta["original_shape"][1] result = outputs[self.output_blob_name].squeeze() @@ -239,7 +248,7 @@ def postprocess(self, outputs, meta): _feature_vector_name = "feature_vector" -def _get_activation_map(features: np.ndarray | Iterable | int | float): +def _get_activation_map(features: np.ndarray | Iterable | int | float) -> np.ndarray: """Getter activation_map functions.""" min_soft_score = np.min(features) max_soft_score = np.max(features) diff --git a/model_api/python/model_api/models/types.py b/model_api/python/model_api/models/types.py index 37013f2e..53d86b38 100644 --- a/model_api/python/model_api/models/types.py +++ b/model_api/python/model_api/models/types.py @@ -3,6 +3,10 @@ # SPDX-License-Identifier: Apache-2.0 # +from __future__ import annotations # TODO: remove when Python3.9 support is dropped + +from typing import Any + class ConfigurableValueError(ValueError): def __init__(self, message, prefix=None): @@ -13,8 +17,8 @@ def __init__(self, message, prefix=None): class BaseValue: def __init__( self, - description="No description available", - default_value=None, + description: str = "No description available", + default_value: Any | None = None, ) -> None: self.default_value = default_value self.description = description @@ -45,10 +49,10 @@ def __str__(self) -> str: class NumericalValue(BaseValue): def __init__( self, - value_type=float, - choices=(), - min=None, - max=None, + value_type: Any = float, + choices: tuple[Any, ...] = (), + min: Any | None = None, + max: Any | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -57,7 +61,7 @@ def __init__( self.max = max self.value_type = value_type - def from_str(self, value): + def from_str(self, value: str) -> Any: return self.value_type(value) def validate(self, value): @@ -102,10 +106,10 @@ def __str__(self) -> str: class StringValue(BaseValue): def __init__( self, - choices=(), - description="No description available", - default_value="", - ): + choices: tuple[Any, ...] = (), + description: str = "No description available", + default_value: str = "", + ) -> None: super().__init__(description, default_value) self.choices = choices for choice in self.choices: @@ -113,7 +117,7 @@ def __init__( msg = f"Incorrect option in choice list - {choice}." raise ValueError(msg) - def from_str(self, value): + def from_str(self, value: str) -> str: return value def validate(self, value): @@ -147,7 +151,7 @@ class BooleanValue(BaseValue): def __init__(self, **kwargs) -> None: super().__init__(**kwargs) - def from_str(self, value): + def from_str(self, value: str) -> bool: return value == "YES" or value == "True" def validate(self, value): @@ -166,14 +170,14 @@ def validate(self, value): class ListValue(BaseValue): def __init__( self, - value_type=None, - description="No description available", - default_value=[], + value_type: Any | None = None, + description: str = "No description available", + default_value: list[Any] = [], ) -> None: super().__init__(description, default_value) self.value_type = value_type - def from_str(self, value): + def from_str(self, value: str) -> list[Any]: try: floats = [float(i) for i in value.split()] try: @@ -224,7 +228,7 @@ class DictValue(BaseValue): def __init__(self, **kwargs) -> None: super().__init__(**kwargs) - def from_str(self, value): + def from_str(self, value: str): raise NotImplementedError def validate(self, value): diff --git a/model_api/python/model_api/models/utils.py b/model_api/python/model_api/models/utils.py index 98d9e09d..823a1ea6 100644 --- a/model_api/python/model_api/models/utils.py +++ b/model_api/python/model_api/models/utils.py @@ -10,10 +10,10 @@ import cv2 import numpy as np -from .result_types import Contour, SegmentedObject, SegmentedObjectWithRects +from model_api.models.result_types import Contour, Detection, SegmentedObject, SegmentedObjectWithRects -def add_rotated_rects(segmented_objects): +def add_rotated_rects(segmented_objects: list[SegmentedObject]) -> list[SegmentedObjectWithRects]: objects_with_rects = [] for segmented_object in segmented_objects: mask = segmented_object.mask.astype(np.uint8) @@ -32,7 +32,7 @@ def add_rotated_rects(segmented_objects): def get_contours( segmentedObjects: list[SegmentedObject | SegmentedObjectWithRects], -): +) -> list[Contour]: combined_contours = [] for obj in segmentedObjects: contours, _ = cv2.findContours( @@ -49,7 +49,7 @@ def get_contours( return combined_contours -def clip_detections(detections, size): +def clip_detections(detections: list[Detection], size: tuple[int, int]) -> list[Detection]: for detection in detections: detection.xmin = min(max(round(detection.xmin), 0), size[1]) detection.ymin = min(max(round(detection.ymin), 0), size[0]) @@ -94,7 +94,16 @@ def load_labels(label_file): return [x.strip() for x in f] -def nms(x1, y1, x2, y2, scores, thresh, include_boundaries=False, keep_top_k=0): +def nms( + x1: np.ndarray, + y1: np.ndarray, + x2: np.ndarray, + y2: np.ndarray, + scores: np.ndarray, + thresh: float, + include_boundaries: bool = False, + keep_top_k: int = 0, +) -> list[int]: b = 1 if include_boundaries else 0 areas = (x2 - x1 + b) * (y2 - y1 + b) order = scores.argsort()[::-1] @@ -130,9 +139,9 @@ def nms(x1, y1, x2, y2, scores, thresh, include_boundaries=False, keep_top_k=0): def multiclass_nms( - detections, - iou_threshold=0.45, - max_num=200, + detections: np.ndarray, + iou_threshold: float = 0.45, + max_num: int = 200, ): """Multi-class NMS. @@ -158,7 +167,7 @@ def multiclass_nms( offsets = labels.astype(boxes.dtype) * (max_coordinate + 1) boxes_for_nms = boxes + offsets[:, None] - keep = nms(*boxes_for_nms.T, scores, iou_threshold) + keep = nms(*boxes_for_nms.T, scores=scores, thresh=iou_threshold) # type: ignore[misc] if max_num > 0: keep = keep[:max_num] keep = np.array(keep) @@ -166,6 +175,6 @@ def multiclass_nms( return det, keep -def softmax(logits, axis=None, keepdims=False): +def softmax(logits: np.ndarray, eps: float = 1e-9, axis=None, keepdims: bool = False) -> np.ndarray: exp = np.exp(logits - np.max(logits)) - return exp / np.sum(exp, axis=axis, keepdims=keepdims) + return exp / (np.sum(exp, axis=axis, keepdims=keepdims) + eps) diff --git a/model_api/python/pyproject.toml b/model_api/python/pyproject.toml index c3628d22..c011e57f 100644 --- a/model_api/python/pyproject.toml +++ b/model_api/python/pyproject.toml @@ -121,7 +121,7 @@ lint.select = [ "SLF", # flake8-self (`SLF`) "SIM", # flake8-simplify (`SIM`) "TID", # flake8-tidy-imports (`TID`) - # "TCH", # flake8-type-checking (`TCH`) + "TCH", # flake8-type-checking (`TCH`) "INT", # flake8-gettext (`INT`) # "ARG", # flake8-unsused-arguments (`ARG`) "PTH", # flake8-use-pathlib (`PTH`)