11import re
22from datetime import datetime
33from enum import Enum
4+ from functools import cached_property
45from typing import Any , Generic , Iterable , List , Optional , Pattern , Set , Tuple , TypeVar
56
67from elementary .utils .log import get_logger
@@ -49,23 +50,12 @@ def normalized_status(self) -> List[Status]:
4950 return [Status (status ) for status in self .statuses if status in list (Status )]
5051
5152
52- def apply_filter (filter_type : FilterType , value : Any , filter_value : Any ) -> bool :
53- if filter_type == FilterType .IS :
54- return value == filter_value
55- elif filter_type == FilterType .IS_NOT :
56- return value != filter_value
57- elif filter_type == FilterType .CONTAINS :
58- return str (filter_value ).lower () in str (value ).lower ()
59- elif filter_type == FilterType .NOT_CONTAINS :
60- return str (filter_value ).lower () not in str (value ).lower ()
61- raise ValueError (f"Unsupported filter type: { filter_type } " )
62-
63-
6453ValueT = TypeVar ("ValueT" )
6554
6655
6756ANY_OPERATORS = [FilterType .IS , FilterType .CONTAINS ]
6857ALL_OPERATORS = [FilterType .IS_NOT , FilterType .NOT_CONTAINS ]
58+ NEGATIVE_OPERATORS = [FilterType .IS_NOT , FilterType .NOT_CONTAINS ]
6959
7060
7161class FilterSchema (BaseModel , Generic [ValueT ]):
@@ -77,42 +67,62 @@ class Config:
7767 # Make sure that serializing Enum return values
7868 use_enum_values = True
7969
80- def _apply_filter_type (self , value : ValueT , filter_value : ValueT ) -> bool :
81- return apply_filter (self .type , value , filter_value )
70+ @staticmethod
71+ def normalize_value (value : Any ) -> str :
72+ if isinstance (value , Enum ):
73+ return str (value .value ).lower ()
74+ return str (value ).lower ()
8275
83- def apply_filter_on_value (self , value : ValueT ) -> bool :
84- if self .type in ANY_OPERATORS :
85- return any (
86- self ._apply_filter_type (value , filter_value )
87- for filter_value in self .values
76+ @staticmethod
77+ def normalize_values (values : Iterable [ValueT ]) -> Set [str ]:
78+ return {FilterSchema .normalize_value (value ) for value in values }
79+
80+ @cached_property
81+ def _normalized_values (self ) -> Set [str ]:
82+ return FilterSchema .normalize_values (self .values )
83+
84+ def get_matching_normalized_values (self , values : Set [str ]) -> Set [str ]:
85+ if self .type == FilterType .IS :
86+ return values .intersection (self ._normalized_values )
87+ elif self .type == FilterType .IS_NOT :
88+ matching_values = values .difference (self ._normalized_values )
89+ if len (matching_values ) != len (values ):
90+ return set ()
91+ return matching_values
92+ if self .type == FilterType .CONTAINS :
93+ return set (
94+ value
95+ for value in values
96+ if any (
97+ filter_value in str (value ).lower ()
98+ for filter_value in self ._normalized_values
99+ )
88100 )
89- elif self .type in ALL_OPERATORS :
90- return all (
91- self ._apply_filter_type (value , filter_value )
92- for filter_value in self .values
101+ if self .type == FilterType .NOT_CONTAINS :
102+ matching_values = set (
103+ value
104+ for value in values
105+ if not any (
106+ filter_value in str (value ).lower ()
107+ for filter_value in self ._normalized_values
108+ )
93109 )
110+ if len (matching_values ) != len (values ):
111+ return set ()
112+ return matching_values
94113 raise ValueError (f"Unsupported filter type: { self .type } " )
95114
96- def apply_filter_on_values (self , values : List [ValueT ]) -> bool :
97- if self .type in ANY_OPERATORS :
98- return any (self .apply_filter_on_value (value ) for value in values )
99- elif self .type in ALL_OPERATORS :
100- return all (self .apply_filter_on_value (value ) for value in values )
101- raise ValueError (f"Unsupported filter type: { self .type } " )
115+ def get_matching_values (self , values : Iterable [ValueT ]) -> Set [str ]:
116+ values_set = FilterSchema .normalize_values (values )
117+ return self .get_matching_normalized_values (values_set )
102118
103- def get_matching_values (self , values : Iterable [ValueT ]) -> Set [ValueT ]:
104- values_list = set (values )
105- matching_values = set (
106- value for value in values_list if self .apply_filter_on_value (value )
107- )
108- if self .type in ANY_OPERATORS :
109- return matching_values
110- elif self .type in ALL_OPERATORS :
111- if len (matching_values ) != len (values_list ):
112- return set ()
113- return matching_values
119+ def apply_filter_on_values (self , values : List [ValueT ]) -> bool :
120+ if self .type in NEGATIVE_OPERATORS and not values :
121+ return True
122+ return bool (self .get_matching_values (values ))
114123
115- raise ValueError (f"Unsupported filter type: { self .type } " )
124+ def apply_filter_on_value (self , value : ValueT ) -> bool :
125+ return self .apply_filter_on_values ([value ])
116126
117127
118128class StatusFilterSchema (FilterSchema [Status ]):
0 commit comments