Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 51 additions & 41 deletions elementary/monitor/data_monitoring/schema.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import re
from datetime import datetime
from enum import Enum
from functools import cached_property
from typing import Any, Generic, Iterable, List, Optional, Pattern, Set, Tuple, TypeVar

from elementary.utils.log import get_logger
Expand Down Expand Up @@ -49,23 +50,12 @@ def normalized_status(self) -> List[Status]:
return [Status(status) for status in self.statuses if status in list(Status)]


def apply_filter(filter_type: FilterType, value: Any, filter_value: Any) -> bool:
if filter_type == FilterType.IS:
return value == filter_value
elif filter_type == FilterType.IS_NOT:
return value != filter_value
elif filter_type == FilterType.CONTAINS:
return str(filter_value).lower() in str(value).lower()
elif filter_type == FilterType.NOT_CONTAINS:
return str(filter_value).lower() not in str(value).lower()
raise ValueError(f"Unsupported filter type: {filter_type}")


ValueT = TypeVar("ValueT")


ANY_OPERATORS = [FilterType.IS, FilterType.CONTAINS]
ALL_OPERATORS = [FilterType.IS_NOT, FilterType.NOT_CONTAINS]
NEGATIVE_OPERATORS = [FilterType.IS_NOT, FilterType.NOT_CONTAINS]


class FilterSchema(BaseModel, Generic[ValueT]):
Expand All @@ -77,42 +67,62 @@ class Config:
# Make sure that serializing Enum return values
use_enum_values = True

def _apply_filter_type(self, value: ValueT, filter_value: ValueT) -> bool:
return apply_filter(self.type, value, filter_value)
@staticmethod
def normalize_value(value: Any) -> str:
if isinstance(value, Enum):
return str(value.value).lower()
return str(value).lower()

def apply_filter_on_value(self, value: ValueT) -> bool:
if self.type in ANY_OPERATORS:
return any(
self._apply_filter_type(value, filter_value)
for filter_value in self.values
@staticmethod
def normalize_values(values: Iterable[ValueT]) -> Set[str]:
return {FilterSchema.normalize_value(value) for value in values}

@cached_property
def _normalized_values(self) -> Set[str]:
return FilterSchema.normalize_values(self.values)

def get_matching_normalized_values(self, values: Set[str]) -> Set[str]:
if self.type == FilterType.IS:
return values.intersection(self._normalized_values)
elif self.type == FilterType.IS_NOT:
matching_values = values.difference(self._normalized_values)
if len(matching_values) != len(values):
return set()
return matching_values
if self.type == FilterType.CONTAINS:
return set(
value
for value in values
if any(
filter_value in str(value).lower()
for filter_value in self._normalized_values
)
)
elif self.type in ALL_OPERATORS:
return all(
self._apply_filter_type(value, filter_value)
for filter_value in self.values
if self.type == FilterType.NOT_CONTAINS:
matching_values = set(
value
for value in values
if not any(
filter_value in str(value).lower()
for filter_value in self._normalized_values
)
)
if len(matching_values) != len(values):
return set()
return matching_values
raise ValueError(f"Unsupported filter type: {self.type}")

def apply_filter_on_values(self, values: List[ValueT]) -> bool:
if self.type in ANY_OPERATORS:
return any(self.apply_filter_on_value(value) for value in values)
elif self.type in ALL_OPERATORS:
return all(self.apply_filter_on_value(value) for value in values)
raise ValueError(f"Unsupported filter type: {self.type}")
def get_matching_values(self, values: Iterable[ValueT]) -> Set[str]:
values_set = FilterSchema.normalize_values(values)
return self.get_matching_normalized_values(values_set)

def get_matching_values(self, values: Iterable[ValueT]) -> Set[ValueT]:
values_list = set(values)
matching_values = set(
value for value in values_list if self.apply_filter_on_value(value)
)
if self.type in ANY_OPERATORS:
return matching_values
elif self.type in ALL_OPERATORS:
if len(matching_values) != len(values_list):
return set()
return matching_values
def apply_filter_on_values(self, values: List[ValueT]) -> bool:
if self.type in NEGATIVE_OPERATORS and not values:
return True
return bool(self.get_matching_values(values))

raise ValueError(f"Unsupported filter type: {self.type}")
def apply_filter_on_value(self, value: ValueT) -> bool:
return self.apply_filter_on_values([value])


class StatusFilterSchema(FilterSchema[Status]):
Expand Down