diff --git a/monai/deploy/operators/dicom_series_selector_operator.py b/monai/deploy/operators/dicom_series_selector_operator.py index 0a1a4545..9249b4d9 100644 --- a/monai/deploy/operators/dicom_series_selector_operator.py +++ b/monai/deploy/operators/dicom_series_selector_operator.py @@ -1,4 +1,4 @@ -# Copyright 2021-2023 MONAI Consortium +# Copyright 2021-2025 MONAI Consortium # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -15,6 +15,8 @@ from json import loads as json_loads from typing import List +import numpy as np + from monai.deploy.core import ConditionType, Fragment, Operator, OperatorSpec from monai.deploy.core.domain.dicom_series import DICOMSeries from monai.deploy.core.domain.dicom_series_selection import SelectedSeries, StudySelectedSeries @@ -30,23 +32,24 @@ class DICOMSeriesSelectorOperator(Operator): Named output: study_selected_series_list: A list of StudySelectedSeries objects. Downstream receiver optional. - This class can be considered a base class, and a derived class can override the 'filter' function to with - custom logics. + This class can be considered a base class, and a derived class can override the 'filter' function with + custom logic. In its default implementation, this class 1. selects a series or all matched series within the scope of a study in a list of studies 2. uses rules defined in JSON string, see below for details 3. supports DICOM Study and Series module attribute matching 4. supports multiple named selections, in the scope of each DICOM study - 5. outputs a list of SutdySelectedSeries objects, as well as a flat list of SelectedSeries (to be deprecated) + 5. outputs a list of StudySelectedSeries objects, as well as a flat list of SelectedSeries (to be deprecated) The selection rules are defined in JSON, 1. attribute "selections" value is a list of selections 2. each selection has a "name", and its "conditions" value is a list of matching criteria 3. each condition uses the implicit equal operator; in addition, the following are supported: - - regex and range matching for float and int types + - regex, relational, and range matching for float and int types - regex matching for str type - - union matching for set type + - inclusion and exclusion matching for set type + - image orientation check for the ImageOrientationPatient tag 4. DICOM attribute keywords are used, and only for those defined as DICOMStudy and DICOMSeries properties An example selection rules: @@ -76,6 +79,23 @@ class DICOMSeriesSelectorOperator(Operator): "ImageType": ["PRIMARY", "ORIGINAL", "AXIAL"], "SliceThickness": [3, 5] } + }, + { + "name": "CT Series 4", + "conditions": { + "StudyDescription": "(.*?)", + "Modality": "(?i)CT", + "ImageOrientationPatient": "Axial", + "SliceThickness": [2, ">"] + } + }, + { + "name": "CT Series 5", + "conditions": { + "StudyDescription": "(.*?)", + "Modality": "(?i)CT", + "ImageType": ["PRIMARY", "!SECONDARY"] + } } ] } @@ -101,8 +121,6 @@ def __init__( of DICOM images); Defaults to False for no sorting. """ - # rules: Text = "", all_matched: bool = False, - # Delay loading the rules as JSON string till compute time. self._rules_json_str = rules if rules and rules.strip() else None self._all_matched = all_matched # all_matched @@ -128,12 +146,12 @@ def compute(self, op_input, op_output, context): selection_rules, dicom_study_list, self._all_matched, self._sort_by_sop_instance_count ) - # log Series Description and Series Instance UID of the first selected DICOM Series (i.e. the one to be used for inference) + # Log Series Description and Series Instance UID of the first selected DICOM Series (i.e. the one to be used for inference) if study_selected_series and len(study_selected_series) > 0: inference_study = study_selected_series[0] if inference_study.selected_series and len(inference_study.selected_series) > 0: inference_series = inference_study.selected_series[0].series - logging.info("Series Selection finalized.") + logging.info("Series Selection finalized") logging.info( f"Series Description of selected DICOM Series for inference: {inference_series.SeriesDescription}" ) @@ -151,9 +169,10 @@ def filter( If rules object is None, all series will be returned with series instance UID as the selection name. Supported matching logic: - Float + Int: exact matching, range matching (if a list with two numerical elements is provided), and regex matching + Float + Int: exact matching, relational matching, range matching, and regex matching String: matches case insensitive, if fails then tries RegEx search - String array (set): matches as subset, case insensitive + String array (set): inclusive and exclusive (via !) matching as subsets, case insensitive + ImageOrientationPatient tag: image orientation (Axial, Coronal, Sagittal) matching Args: selection_rules (object): JSON object containing the matching rules. @@ -244,6 +263,9 @@ def _select_series( Returns: List of DICOMSeries. At most one element if all_matched is False. + + Raises: + NotImplementedError: If the value_to_match type is not supported for matching or unsupported PatientPosition value. """ assert isinstance(attributes, dict), '"attributes" must be a dict.' @@ -261,13 +283,13 @@ def _select_series( matched = True # Simple matching on attribute value for key, value_to_match in attributes.items(): - logging.info(f"On attribute: {key!r} to match value: {value_to_match!r}") + logging.info(f" On attribute: {key!r} to match value: {value_to_match!r}") # Ignore None if not value_to_match: continue # Try getting the attribute value from Study and current Series prop dict attr_value = series_attr.get(key, None) - logging.info(f" Series attribute {key} value: {attr_value}") + logging.info(f" Series attribute {key} value: {attr_value}") # If not found, try the best at the native instance level for string VR # This is mainly for attributes like ImageType @@ -280,29 +302,31 @@ def _select_series( else: attr_value = elem.value # element's value + logging.info(f" Instance level attribute {key} value: {attr_value}") series_attr.update({key: attr_value}) except Exception: - logging.info(f" Attribute {key} not at instance level either.") + logging.info(f" Attribute {key} not at instance level either") if not attr_value: + logging.info(f" Missing attribute: {key!r}") matched = False - elif isinstance(attr_value, float) or isinstance(attr_value, int): - # range matching - if isinstance(value_to_match, list) and len(value_to_match) == 2: - lower_bound, upper_bound = map(float, value_to_match) - matched = lower_bound <= attr_value <= upper_bound - # RegEx matching - elif isinstance(value_to_match, str): - matched = bool(re.fullmatch(value_to_match, str(attr_value))) - # exact matching - else: - matched = value_to_match == attr_value + # Image orientation check + elif key == "ImageOrientationPatient": + patient_position = series_attr.get("PatientPosition") + if patient_position is None: + raise NotImplementedError( + "PatientPosition tag absent; value required for image orientation calculation" + ) + if patient_position not in ("HFP", "HFS", "HFDL", "HFDR", "FFP", "FFS", "FFDL", "FFDR"): + raise NotImplementedError(f"No support for PatientPosition value {patient_position}") + matched = self._match_image_orientation(value_to_match, attr_value) + elif isinstance(attr_value, (float, int)): + matched = self._match_numeric_condition(value_to_match, attr_value) elif isinstance(attr_value, str): matched = attr_value.casefold() == (value_to_match.casefold()) if not matched: # For str, also try RegEx search to check for a match anywhere in the string # unless the user constrains it in the expression. - logging.info("Series attribute string value did not match. Try regEx.") if re.search(value_to_match, attr_value, re.IGNORECASE): matched = True elif isinstance(attr_value, list): @@ -310,14 +334,26 @@ def _select_series( meta_data_list = str(attr_value).lower() if isinstance(value_to_match, list): value_set = {str(element).lower() for element in value_to_match} - matched = all(val in meta_data_list for val in value_set) + # split inclusion and exclusion matches using ! indicator + include_terms = {v for v in value_set if not v.startswith("!")} + exclude_terms = {v[1:] for v in value_set if v.startswith("!")} + matched = all(term in meta_data_list for term in include_terms) and all( + term not in meta_data_list for term in exclude_terms + ) elif isinstance(value_to_match, (str, numbers.Number)): - matched = str(value_to_match).lower() in meta_data_list + v = str(value_to_match).lower() + # ! indicates exclusion match + if v.startswith("!"): + matched = v[1:] not in meta_data_list + else: + matched = v in meta_data_list else: - raise NotImplementedError(f"Not support for matching on this type: {type(value_to_match)}") + raise NotImplementedError( + f"No support for matching condition {value_to_match} (type: {type(value_to_match)})" + ) if not matched: - logging.info("This series does not match the selection conditions.") + logging.info("This series does not match the selection conditions") break if matched: @@ -327,9 +363,8 @@ def _select_series( if not all_matched: return found_series - # if sorting indicated and multiple series found + # If sorting indicated and multiple series found, sort series in descending SOP instance count if sort_by_sop_instance_count and len(found_series) > 1: - # sort series in descending SOP instance count logging.info( "Multiple series matched the selection criteria; choosing series with the highest number of DICOM images." ) @@ -337,6 +372,145 @@ def _select_series( return found_series + def _match_numeric_condition(self, value_to_match, attr_value): + """ + Helper method to match numeric conditions, supporting relational, inclusive range, regex, and exact match checks. + + Supported formats: + - [val, ">"]: match if attr_value > val + - [val, ">="]: match if attr_value >= val + - [val, "<"]: match if attr_value < val + - [val, "<="]: match if attr_value <= val + - [val, "!="]: match if attr_value != val + - [min_val, max_val]: inclusive range check + - "regex": regular expression match + - number: exact match + + Args: + value_to_match (Union[list, str, int, float]): The condition to match against. + attr_value (Union[int, float]): The attribute value from the series. + + Returns: + bool: True if the attribute value matches the condition, else False. + + Raises: + NotImplementedError: If the value_to_match condition is not supported for numeric matching. + """ + + if isinstance(value_to_match, list): + # Relational operator check: >, >=, <, <=, != + if len(value_to_match) == 2 and isinstance(value_to_match[1], str): + val = float(value_to_match[0]) + op = value_to_match[1] + if op == ">": + return attr_value > val + elif op == ">=": + return attr_value >= val + elif op == "<": + return attr_value < val + elif op == "<=": + return attr_value <= val + elif op == "!=": + return attr_value != val + else: + raise NotImplementedError( + f"Unsupported relational operator {op!r} in numeric condition. Must be one of: '>', '>=', '<', '<=', '!='" + ) + + # Inclusive range check + elif len(value_to_match) == 2 and all(isinstance(v, (int, float)) for v in value_to_match): + return value_to_match[0] <= attr_value <= value_to_match[1] + + else: + raise NotImplementedError(f"No support for numeric matching condition {value_to_match}") + + # Regular expression match + elif isinstance(value_to_match, str): + return bool(re.fullmatch(value_to_match, str(attr_value))) + + # Exact numeric match + elif isinstance(value_to_match, (int, float)): + return value_to_match == attr_value + + else: + raise NotImplementedError(f"No support for numeric matching on this type: {type(value_to_match)}") + + def _match_image_orientation(self, value_to_match, attr_value): + """ + Helper method to calculate and match the image orientation using the ImageOrientationPatient tag. + The following PatientPosition values are supported and have been tested: + - "HFP" + - "HFS" + - "HFDL" + - "HFDR" + - "FFP" + - "FFS" + - "FFDL" + - "FFDR" + + Supported image orientation inputs for matching (case-insensitive): + - "Axial" + - "Coronal" + - "Sagittal" + + Args: + value_to_match (str): The image orientation condition to match against. + attr_value (List[str]): Raw ImageOrientationPatient tag value from the series. + + Returns: + bool: True if the computed orientation matches the expected orientation, else False. + + Raises: + ValueError: If the expected orientation is invalid or the normal vector cannot be computed. + """ + + # Validate image orientation to match input + value_to_match = value_to_match.strip().lower().capitalize() + allowed_orientations = {"Axial", "Coronal", "Sagittal"} + if value_to_match not in allowed_orientations: + raise ValueError(f"Invalid orientation string {value_to_match!r}. Must be one of: {allowed_orientations}") + + # Format ImageOrientationPatient tag value as an array and grab row and column cosines + iop_str = attr_value[0].strip("[]") + iop = [float(x.strip()) for x in iop_str.split(",")] + row_cosines = np.array(iop[:3], dtype=np.float64) + col_cosines = np.array(iop[3:], dtype=np.float64) + + # Validate DICOM constraints (normal row and column cosines + should be orthogonal) + # Throw warnings if tolerance exceeded + tolerance = 1e-4 + row_norm = np.linalg.norm(row_cosines) + col_norm = np.linalg.norm(col_cosines) + dot_product = np.dot(row_cosines, col_cosines) + + if abs(row_norm - 1.0) > tolerance: + logging.warn(f"Row direction cosine normal is {row_norm}, deviates from 1 by more than {tolerance}") + if abs(col_norm - 1.0) > tolerance: + logging.warn(f"Column direction cosine normal is {col_norm}, deviates from 1 by more than {tolerance}") + if abs(dot_product) > tolerance: + logging.warn(f"Row and Column cosines are not orthogonal: dot product = {dot_product}") + + # Normalize row and column vectors + row_cosines /= np.linalg.norm(row_cosines) + col_cosines /= np.linalg.norm(col_cosines) + + # Compute and validate slice normal + normal = np.cross(row_cosines, col_cosines) + if np.linalg.norm(normal) == 0: + raise ValueError("Invalid normal vector computed from IOP") + + # Normalize the slice normal + normal /= np.linalg.norm(normal) + + # Identify the dominant image orientation + axis_labels = ["Sagittal", "Coronal", "Axial"] + major_axis = np.argmax(np.abs(normal)) + computed_orientation = axis_labels[major_axis] + + logging.info(f" Computed orientation from ImageOrientationPatient value: {computed_orientation}") + + return bool(computed_orientation == value_to_match) + @staticmethod def _get_instance_properties(obj: object): if not obj: @@ -368,9 +542,9 @@ def test(): study_list = loader.load_data_to_studies(data_path) sample_selection_rule = json_loads(Sample_Rules_Text) print(f"Selection rules in JSON:\n{sample_selection_rule}") - study_selected_seriee_list = selector.filter(sample_selection_rule, study_list) + study_selected_series_list = selector.filter(sample_selection_rule, study_list) - for sss_obj in study_selected_seriee_list: + for sss_obj in study_selected_series_list: _print_instance_properties(sss_obj, pre_fix="", print_val=False) study = sss_obj.study pre_fix = " " @@ -429,6 +603,23 @@ def test(): "ImageType": ["PRIMARY", "ORIGINAL", "AXIAL"], "SliceThickness": [3, 5] } + }, + { + "name": "CT Series 4", + "conditions": { + "StudyDescription": "(.*?)", + "Modality": "(?i)MR", + "ImageOrientationPatient": "Axial", + "SliceThickness": [2, ">"] + } + }, + { + "name": "CT Series 5", + "conditions": { + "StudyDescription": "(.*?)", + "Modality": "(?i)CT", + "ImageType": ["PRIMARY", "!SECONDARY"] + } } ] }