diff --git a/.vscode/launch.json b/.vscode/launch.json index 6a095aa39..2e1e2e299 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -8,6 +8,29 @@ "module": "elementary.cli.cli", "console": "integratedTerminal", "args": "${command:pickArgs}" + }, + { + "name": "pytest: Current File", + "type": "debugpy", + "request": "launch", + "module": "pytest", + "args": ["-vvv", "-s", "${file}"], + "console": "integratedTerminal" + }, + { + "name": "pytest: Selector", + "type": "debugpy", + "request": "launch", + "module": "pytest", + "args": ["-vvv", "-s", "${file}::${input:selector}"], + "console": "integratedTerminal" + } + ], + "inputs": [ + { + "id": "selector", + "type": "promptString", + "description": "Selector" } ] } diff --git a/elementary/monitor/api/alerts/alert_filters.py b/elementary/monitor/api/alerts/alert_filters.py index 871a8a16c..779079579 100644 --- a/elementary/monitor/api/alerts/alert_filters.py +++ b/elementary/monitor/api/alerts/alert_filters.py @@ -1,11 +1,9 @@ -from functools import reduce -from typing import List +from typing import List, Optional from elementary.monitor.data_monitoring.schema import ( - FilterSchema, FiltersSchema, - ResourceTypeFilterSchema, - StatusFilterSchema, + ResourceType, + Status, ) from elementary.monitor.fetchers.alerts.schema.pending_alerts import ( AlertTypes, @@ -16,6 +14,61 @@ logger = get_logger(__name__) +def get_string_ends(input_string: str, splitter: str) -> List[str]: + parts = input_string.split(splitter) + result = [] + + for i in range(len(parts)): + result.append(splitter.join(parts[i:])) + + return result + + +def _get_alert_node_name(alert: PendingAlertSchema) -> Optional[str]: + alert_node_name = None + alert_type = AlertTypes(alert.type) + if alert_type is AlertTypes.TEST: + alert_node_name = alert.data.test_name # type: ignore[union-attr] + elif alert_type is AlertTypes.MODEL or alert_type is AlertTypes.SOURCE_FRESHNESS: + alert_node_name = alert.data.model_unique_id + else: + raise ValueError(f"Unexpected alert type: {alert_type}") + return alert_node_name + + +def apply_filters_schema_on_alert( + alert: PendingAlertSchema, filters_schema: FiltersSchema +) -> bool: + tags = alert.data.tags or [] + models = ( + [ + alert.data.model_unique_id, + *get_string_ends(alert.data.model_unique_id, "."), + ] + if alert.data.model_unique_id + else [] + ) + owners = alert.data.unified_owners or [] + status = Status(alert.data.status) + resource_type = ResourceType(alert.data.resource_type) + + alert_node_name = _get_alert_node_name(alert) + node_names = ( + [alert_node_name, *get_string_ends(alert_node_name, ".")] + if alert_node_name + else [] + ) + + return filters_schema.apply( + tags=tags, + models=models, + owners=owners, + statuses=[status], + resource_types=[resource_type], + node_names=node_names, + ) + + def filter_alerts( alerts: List[PendingAlertSchema], alerts_filter: FiltersSchema = FiltersSchema(), @@ -29,188 +82,6 @@ def filter_alerts( logger.warning("Invalid filter for alerts: %s", alerts_filter.selector) return [] # type: ignore[return-value] - # If the filter is empty, we want to return all of the alerts - filtered_alerts = alerts - filtered_alerts = _filter_alerts_by_tags(filtered_alerts, alerts_filter.tags) - filtered_alerts = _filter_alerts_by_models(filtered_alerts, alerts_filter.models) - filtered_alerts = _filter_alerts_by_owners(filtered_alerts, alerts_filter.owners) - filtered_alerts = _filter_alerts_by_statuses( - filtered_alerts, alerts_filter.statuses - ) - filtered_alerts = _filter_alerts_by_resource_types( - filtered_alerts, alerts_filter.resource_types - ) - if alerts_filter.node_names: - filtered_alerts = _filter_alerts_by_node_names( - filtered_alerts, alerts_filter.node_names - ) - - return filtered_alerts - - -def _find_common_alerts( - first_alerts: List[PendingAlertSchema], - second_alerts: List[PendingAlertSchema], -) -> List[PendingAlertSchema]: - first_alert_ids = [alert.id for alert in first_alerts] - second_alert_ids = [alert.id for alert in second_alerts] - common_alert_ids = list(set(first_alert_ids) & set(second_alert_ids)) - - common_alerts = [] - # To handle dedupping common alerts - alert_ids_already_handled = [] - - for alert in [*first_alerts, *second_alerts]: - if alert.id in common_alert_ids and alert.id not in alert_ids_already_handled: - common_alerts.append(alert) - alert_ids_already_handled.append(alert.id) - return common_alerts - - -def _filter_alerts_by_tags( - alerts: List[PendingAlertSchema], - tags_filters: List[FilterSchema], -) -> List[PendingAlertSchema]: - if not tags_filters: - return [*alerts] - - grouped_filtered_alerts_by_tags = [] - - # OR filter for each tags_filter's values - for tags_filter in tags_filters: - filtered_alerts_by_tags = [] - for alert in alerts: - if any(tag in (alert.data.tags or []) for tag in tags_filter.values): - filtered_alerts_by_tags.append(alert) - grouped_filtered_alerts_by_tags.append(filtered_alerts_by_tags) - - # AND filter between all tags_filters - return reduce(_find_common_alerts, grouped_filtered_alerts_by_tags) - - -def _filter_alerts_by_owners( - alerts: List[PendingAlertSchema], - owners_filters: List[FilterSchema], -) -> List[PendingAlertSchema]: - if not owners_filters: - return [*alerts] - - grouped_filtered_alerts_by_owners = [] - - # OR filter for each owners_filter's values - for owners_filter in owners_filters: - filtered_alerts_by_owners = [] - for alert in alerts: - if any( - owner in alert.data.unified_owners for owner in owners_filter.values - ): - filtered_alerts_by_owners.append(alert) - grouped_filtered_alerts_by_owners.append(filtered_alerts_by_owners) - - # AND filter between all owners_filters - return reduce(_find_common_alerts, grouped_filtered_alerts_by_owners) - - -def _filter_alerts_by_models( - alerts: List[PendingAlertSchema], - models_filters: List[FilterSchema], -) -> List[PendingAlertSchema]: - if not models_filters: - return [*alerts] - - grouped_filtered_alerts_by_models = [] - - # OR filter for each models_filter's values - for models_filter in models_filters: - filtered_alerts_by_models = [] - for alert in alerts: - if any( - ( - alert.data.model_unique_id - and alert.data.model_unique_id.endswith(model) - ) - for model in models_filter.values - ): - filtered_alerts_by_models.append(alert) - grouped_filtered_alerts_by_models.append(filtered_alerts_by_models) - - # AND filter between all models_filters - return reduce(_find_common_alerts, grouped_filtered_alerts_by_models) - - -def _filter_alerts_by_node_names( - alerts: List[PendingAlertSchema], - node_names_filters: List[str], -) -> List[PendingAlertSchema]: - if not node_names_filters: - return [*alerts] - - filtered_alerts = [] - for alert in alerts: - alert_node_name = None - alert_type = AlertTypes(alert.type) - if alert_type is AlertTypes.TEST: - alert_node_name = alert.data.test_name # type: ignore[union-attr] - elif ( - alert_type is AlertTypes.MODEL or alert_type is AlertTypes.SOURCE_FRESHNESS - ): - alert_node_name = alert.data.model_unique_id - else: - # Shouldn't happen - raise Exception(f"Unexpected alert type: {type(alert)}") - - if alert_node_name: - for node_name in node_names_filters: - if alert_node_name.endswith(node_name) or node_name.endswith( - alert_node_name - ): - filtered_alerts.append(alert) - break - return filtered_alerts # type: ignore[return-value] - - -def _filter_alerts_by_statuses( - alerts: List[PendingAlertSchema], - statuses_filters: List[StatusFilterSchema], -) -> List[PendingAlertSchema]: - if not statuses_filters: - return [*alerts] - - grouped_filtered_alerts_by_statuses = [] - - # OR filter for each statuses_filter's values - for statuses_filter in statuses_filters: - filtered_alerts_by_statuses = [] - for alert in alerts: - if any(status == alert.data.status for status in statuses_filter.values): - filtered_alerts_by_statuses.append(alert) - grouped_filtered_alerts_by_statuses.append(filtered_alerts_by_statuses) - - # AND filter between all statuses_filters - return reduce(_find_common_alerts, grouped_filtered_alerts_by_statuses) - - -def _filter_alerts_by_resource_types( - alerts: List[PendingAlertSchema], - resource_types_filters: List[ResourceTypeFilterSchema], -) -> List[PendingAlertSchema]: - if not resource_types_filters: - return [*alerts] - - grouped_filtered_alerts_by_resource_types = [] - - # OR filter for each resource_types_filter's values - for resource_types_filter in resource_types_filters: - filtered_alerts_by_resource_types = [] - for alert in alerts: - if any( - resource_type == alert.data.resource_type.value - for resource_type in resource_types_filter.values - ): - filtered_alerts_by_resource_types.append(alert) - grouped_filtered_alerts_by_resource_types.append( - filtered_alerts_by_resource_types - ) - - # AND filter between all resource_types_filters - return reduce(_find_common_alerts, grouped_filtered_alerts_by_resource_types) + return [ + alert for alert in alerts if apply_filters_schema_on_alert(alert, alerts_filter) + ] diff --git a/elementary/monitor/data_monitoring/schema.py b/elementary/monitor/data_monitoring/schema.py index 47d8a9e84..06d2f61bc 100644 --- a/elementary/monitor/data_monitoring/schema.py +++ b/elementary/monitor/data_monitoring/schema.py @@ -1,7 +1,7 @@ import re from datetime import datetime from enum import Enum -from typing import Any, List, Optional, Pattern, Tuple +from typing import Any, Generic, List, Optional, Pattern, Tuple, TypeVar from elementary.utils.log import get_logger from elementary.utils.pydantic_shim import BaseModel, Field, validator @@ -14,7 +14,7 @@ class InvalidSelectorError(Exception): pass -class Status(Enum): +class Status(str, Enum): WARN = "warn" FAIL = "fail" SKIPPED = "skipped" @@ -22,38 +22,80 @@ class Status(Enum): RUNTIME_ERROR = "runtime error" -class ResourceType(Enum): +class ResourceType(str, Enum): TEST = "test" MODEL = "model" SOURCE_FRESHNESS = "source_freshness" -class SupportedFilterTypes(Enum): +class FilterType(str, Enum): IS = "is" + IS_NOT = "is_not" + CONTAINS = "contains" -class FilterSchema(BaseModel): +def apply_filter(filter_type: FilterType, value: Any, filter_value: Any) -> bool: + if filter_type == FilterType.IS: + return value == filter_value + elif filter_type == FilterType.IS_NOT: + return value != filter_value + elif filter_type == FilterType.CONTAINS: + return str(filter_value).lower() in str(value).lower() + raise ValueError(f"Unsupported filter type: {filter_type}") + + +ValueT = TypeVar("ValueT") + + +ANY_OPERATORS = [FilterType.IS, FilterType.CONTAINS] +ALL_OPERATORS = [FilterType.IS_NOT] + + +class FilterSchema(BaseModel, Generic[ValueT]): # The relation between values is OR. - values: List[Any] - type: SupportedFilterTypes = SupportedFilterTypes.IS + values: List[ValueT] + type: FilterType = FilterType.IS class Config: # Make sure that serializing Enum return values use_enum_values = True + def _apply_filter_type(self, value: ValueT, filter_value: ValueT) -> bool: + return apply_filter(self.type, value, filter_value) + + def apply_filter_on_value(self, value: ValueT) -> bool: + if self.type in ANY_OPERATORS: + return any( + self._apply_filter_type(value, filter_value) + for filter_value in self.values + ) + elif self.type in ALL_OPERATORS: + return all( + self._apply_filter_type(value, filter_value) + for filter_value in self.values + ) + raise ValueError(f"Unsupported filter type: {self.type}") + + def apply_filter_on_values(self, values: List[ValueT]) -> bool: + if self.type in ANY_OPERATORS: + return any(self.apply_filter_on_value(value) for value in values) + elif self.type in ALL_OPERATORS: + return all(self.apply_filter_on_value(value) for value in values) + raise ValueError(f"Unsupported filter type: {self.type}") -class StatusFilterSchema(FilterSchema): + +class StatusFilterSchema(FilterSchema[Status]): values: List[Status] -class ResourceTypeFilterSchema(FilterSchema): +class ResourceTypeFilterSchema(FilterSchema[ResourceType]): values: List[ResourceType] def _get_default_statuses_filter() -> List[StatusFilterSchema]: return [ StatusFilterSchema( - type=SupportedFilterTypes.IS, + type=FilterType.IS, values=[Status.FAIL, Status.ERROR, Status.RUNTIME_ERROR, Status.WARN], ) ] @@ -73,7 +115,7 @@ class FiltersSchema(BaseModel): resource_types: List[ResourceTypeFilterSchema] = Field(default_factory=list) @validator("invocation_time", pre=True) - def format_invocation_time(cls, invocation_time): + def format_invocation_time(cls, invocation_time) -> Optional[str]: if invocation_time: try: invocation_datetime = convert_local_time_to_timezone( @@ -87,7 +129,7 @@ def format_invocation_time(cls, invocation_time): raise return None - def validate_report_selector(self): + def validate_report_selector(self) -> None: # If we start supporting multiple selectors we need to change this logic if not self.selector: return @@ -205,6 +247,45 @@ def to_selector_filter_schema(self) -> "SelectorFilterSchema": resource_types=resource_types, ) + def apply( + self, + tags: List[str], + models: List[str], + owners: List[str], + statuses: List[Status], + resource_types: List[ResourceType], + node_names: List[str], + ) -> bool: + return ( + all( + filter_schema.apply_filter_on_values(tags) + for filter_schema in self.tags + ) + and all( + filter_schema.apply_filter_on_values(models) + for filter_schema in self.models + ) + and all( + filter_schema.apply_filter_on_values(owners) + for filter_schema in self.owners + ) + and all( + filter_schema.apply_filter_on_values(statuses) + for filter_schema in self.statuses + ) + and all( + filter_schema.apply_filter_on_values(resource_types) + for filter_schema in self.resource_types + ) + and ( + FilterSchema( + values=self.node_names, type=FilterType.IS + ).apply_filter_on_values(node_names) + if self.node_names + else True + ) + ) + class SelectorFilterSchema(BaseModel): selector: Optional[str] = None diff --git a/elementary/monitor/fetchers/alerts/schema/pending_alerts.py b/elementary/monitor/fetchers/alerts/schema/pending_alerts.py index 677149990..7dbf1affc 100644 --- a/elementary/monitor/fetchers/alerts/schema/pending_alerts.py +++ b/elementary/monitor/fetchers/alerts/schema/pending_alerts.py @@ -68,6 +68,18 @@ def parse_data(cls, values: dict) -> dict: new_values = {**values} alert_type = AlertTypes(values.get("type")) + data = values.get("data") + + if ( + alert_type is AlertTypes.TEST + and isinstance(data, TestAlertDataSchema) + or alert_type is AlertTypes.MODEL + and isinstance(data, ModelAlertDataSchema) + or alert_type is AlertTypes.SOURCE_FRESHNESS + and isinstance(data, SourceFreshnessAlertDataSchema) + ): + return values + raw_data = try_load_json(values.get("data")) data = None diff --git a/tests/unit/monitor/api/alerts/test_alert_filters.py b/tests/unit/monitor/api/alerts/test_alert_filters.py index 572b81f17..c53ba7fea 100644 --- a/tests/unit/monitor/api/alerts/test_alert_filters.py +++ b/tests/unit/monitor/api/alerts/test_alert_filters.py @@ -1,23 +1,22 @@ -from elementary.monitor.api.alerts.alert_filters import ( - _filter_alerts_by_models, - _filter_alerts_by_node_names, - _filter_alerts_by_owners, - _filter_alerts_by_resource_types, - _filter_alerts_by_statuses, - _filter_alerts_by_tags, - _find_common_alerts, - filter_alerts, -) +from datetime import datetime + +from elementary.monitor.api.alerts.alert_filters import filter_alerts from elementary.monitor.data_monitoring.schema import ( FilterSchema, FiltersSchema, + FilterType, ResourceType, ResourceTypeFilterSchema, Status, StatusFilterSchema, - SupportedFilterTypes, +) +from elementary.monitor.fetchers.alerts.schema.alert_data import ( + ModelAlertDataSchema, + SourceFreshnessAlertDataSchema, + TestAlertDataSchema, ) from elementary.monitor.fetchers.alerts.schema.pending_alerts import ( + AlertStatus, AlertTypes, PendingAlertSchema, ) @@ -29,129 +28,125 @@ def initial_alerts(): id="test_alert_1", alert_class_id="test_id_1", type=AlertTypes.TEST, - detected_at="2022-10-10 10:00:00", - created_at="2022-10-10 10:00:00", - updated_at="2022-10-10 10:00:00", - status="pending", - data=dict( + detected_at=datetime(2022, 10, 10, 10, 0, 0), + created_at=datetime(2022, 10, 10, 10, 0, 0), + updated_at=datetime(2022, 10, 10, 10, 0, 0), + status=AlertStatus.PENDING, + data=TestAlertDataSchema( id="1", alert_class_id="test_id_1", model_unique_id="elementary.model_id_1", test_unique_id="test_id_1", test_name="test_1", - test_created_at="2022-10-10 10:10:10", - tags='["one", "two"]', + tags=["one", "two"], model_meta=dict(owner='["jeff", "john"]'), status="fail", elementary_unique_id="elementary.model_id_1.test_id_1.9cf2f5f6ad.None.generic", - detected_at="2022-10-10 10:00:00", + detected_at=datetime(2022, 10, 10, 10, 0, 0), database_name="test_db", schema_name="test_schema", table_name="table", - suppression_status="pending", test_type="dbt_test", test_sub_type="generic", test_results_description="a mock alert", test_results_query="select * from table", test_short_name="test_1", severity="ERROR", + resource_type=ResourceType.TEST, ), ), PendingAlertSchema( id="test_alert_2", alert_class_id="test_id_2", type=AlertTypes.TEST, - detected_at="2022-10-10 10:00:00", - created_at="2022-10-10 10:00:00", - updated_at="2022-10-10 10:00:00", - status="pending", - data=dict( + detected_at=datetime(2022, 10, 10, 10, 0, 0), + created_at=datetime(2022, 10, 10, 10, 0, 0), + updated_at=datetime(2022, 10, 10, 10, 0, 0), + status=AlertStatus.PENDING, + data=TestAlertDataSchema( id="2", alert_class_id="test_id_2", model_unique_id="elementary.model_id_1", test_unique_id="test_id_2", test_name="test_2", - test_created_at="2022-10-10 09:10:10", - tags='["three"]', + tags=["three"], model_meta=dict(owner='["jeff", "john"]'), status="fail", elementary_unique_id="elementary.model_id_1.test_id_2.9cf2f5f6ad.None.generic", - detected_at="2022-10-10 10:00:00", + detected_at=datetime(2022, 10, 10, 10, 0, 0), database_name="test_db", schema_name="test_schema", table_name="table", - suppression_status="pending", test_type="dbt_test", test_sub_type="generic", test_results_description="a mock alert", test_results_query="select * from table", test_short_name="test_2", severity="ERROR", + resource_type=ResourceType.TEST, ), ), PendingAlertSchema( id="test_alert_3", alert_class_id="test_id_3", type=AlertTypes.TEST, - detected_at="2022-10-10 10:00:00", - created_at="2022-10-10 10:00:00", - updated_at="2022-10-10 10:00:00", - status="pending", - data=dict( + detected_at=datetime(2022, 10, 10, 10, 0, 0), + created_at=datetime(2022, 10, 10, 10, 0, 0), + updated_at=datetime(2022, 10, 10, 10, 0, 0), + status=AlertStatus.PENDING, + data=TestAlertDataSchema( id="3", alert_class_id="test_id_3", model_unique_id="elementary.model_id_2", test_unique_id="test_id_3", test_name="test_3", - test_created_at="2022-10-10 10:10:10", # invalid tag - tags="one", + tags="one", # type: ignore[arg-type] model_meta=dict(owner='["john"]'), status="fail", elementary_unique_id="elementary.model_id_1.test_id_3.9cf2f5f6ad.None.generic", - detected_at="2022-10-10 10:00:00", + detected_at=datetime(2022, 10, 10, 10, 0, 0), database_name="test_db", schema_name="test_schema", table_name="table", - suppression_status="pending", test_type="dbt_test", test_sub_type="generic", test_results_description="a mock alert", test_results_query="select * from table", test_short_name="test_3", severity="ERROR", + resource_type=ResourceType.TEST, ), ), PendingAlertSchema( id="test_alert_4", alert_class_id="test_id_4", type=AlertTypes.TEST, - detected_at="2022-10-10 10:00:00", - created_at="2022-10-10 10:00:00", - updated_at="2022-10-10 10:00:00", - status="pending", - data=dict( + detected_at=datetime(2022, 10, 10, 10, 0, 0), + created_at=datetime(2022, 10, 10, 10, 0, 0), + updated_at=datetime(2022, 10, 10, 10, 0, 0), + status=AlertStatus.PENDING, + data=TestAlertDataSchema( id="4", alert_class_id="test_id_4", model_unique_id="elementary.model_id_2", test_unique_id="test_id_4", test_name="test_4", - test_created_at="2022-10-10 09:10:10", - tags='["three", "four"]', + tags=["three", "four"], model_meta=dict(owner='["jeff"]'), status="warn", elementary_unique_id="elementary.model_id_1.test_id_4.9cf2f5f6ad.None.generic", - detected_at="2022-10-10 10:00:00", + detected_at=datetime(2022, 10, 10, 10, 0, 0), database_name="test_db", schema_name="test_schema", table_name="table", - suppression_status="pending", test_type="dbt_test", test_sub_type="generic", test_results_description="a mock alert", test_results_query="select * from table", test_short_name="test_4", severity="ERROR", + resource_type=ResourceType.TEST, ), ), ] @@ -160,11 +155,11 @@ def initial_alerts(): id="model_alert_1", alert_class_id="elementary.model_id_1", type=AlertTypes.MODEL, - detected_at="2022-10-10 10:00:00", - created_at="2022-10-10 10:00:00", - updated_at="2022-10-10 10:00:00", - status="pending", - data=dict( + detected_at=datetime(2022, 10, 10, 10, 0, 0), + created_at=datetime(2022, 10, 10, 10, 0, 0), + updated_at=datetime(2022, 10, 10, 10, 0, 0), + status=AlertStatus.PENDING, + data=ModelAlertDataSchema( id="1", alert_class_id="elementary.model_id_1", model_unique_id="elementary.model_id_1", @@ -174,25 +169,24 @@ def initial_alerts(): materialization="table", message="", full_refresh=False, - detected_at="2022-10-10 10:00:00", - alert_suppression_interval=0, - tags='["one", "two"]', + detected_at=datetime(2022, 10, 10, 10, 0, 0), + tags=["one", "two"], model_meta=dict(owner='["jeff", "john"]'), status="error", database_name="test_db", schema_name="test_schema", - suppression_status="pending", + resource_type=ResourceType.MODEL, ), ), PendingAlertSchema( id="model_alert_2", alert_class_id="elementary.model_id_1", type=AlertTypes.MODEL, - detected_at="2022-10-10 10:00:00", - created_at="2022-10-10 10:00:00", - updated_at="2022-10-10 10:00:00", - status="pending", - data=dict( + detected_at=datetime(2022, 10, 10, 10, 0, 0), + created_at=datetime(2022, 10, 10, 10, 0, 0), + updated_at=datetime(2022, 10, 10, 10, 0, 0), + status=AlertStatus.PENDING, + data=ModelAlertDataSchema( id="2", alert_class_id="elementary.model_id_1", model_unique_id="elementary.model_id_1", @@ -202,25 +196,24 @@ def initial_alerts(): materialization="table", message="", full_refresh=False, - detected_at="2022-10-10 09:00:00", - alert_suppression_interval=3, - tags='["three"]', + detected_at=datetime(2022, 10, 10, 9, 0, 0), + tags=["three"], model_meta=dict(owner='["john"]'), status="error", database_name="test_db", schema_name="test_schema", - suppression_status="pending", + resource_type=ResourceType.MODEL, ), ), PendingAlertSchema( id="model_alert_3", alert_class_id="elementary.model_id_2", type=AlertTypes.MODEL, - detected_at="2022-10-10 08:00:00", - created_at="2022-10-10 08:00:00", - updated_at="2022-10-10 08:00:00", - status="pending", - data=dict( + detected_at=datetime(2022, 10, 10, 8, 0, 0), + created_at=datetime(2022, 10, 10, 8, 0, 0), + updated_at=datetime(2022, 10, 10, 8, 0, 0), + status=AlertStatus.PENDING, + data=ModelAlertDataSchema( id="3", alert_class_id="elementary.model_id_2", model_unique_id="elementary.model_id_2", @@ -230,14 +223,13 @@ def initial_alerts(): materialization="table", message="", full_refresh=False, - detected_at="2022-10-10 08:00:00", - alert_suppression_interval=1, - tags='["three", "four"]', + detected_at=datetime(2022, 10, 10, 8, 0, 0), + tags=["three", "four"], model_meta=dict(owner='["jeff"]'), status="skipped", database_name="test_db", schema_name="test_schema", - suppression_status="pending", + resource_type=ResourceType.MODEL, ), ), ] @@ -246,30 +238,24 @@ def initial_alerts(): id="freshness_alert_1", alert_class_id="elementary.model_id_1", type=AlertTypes.SOURCE_FRESHNESS, - detected_at="2022-10-10 08:00:00", - created_at="2022-10-10 08:00:00", - updated_at="2022-10-10 08:00:00", - status="pending", - data=dict( + detected_at=datetime(2022, 10, 10, 8, 0, 0), + created_at=datetime(2022, 10, 10, 8, 0, 0), + updated_at=datetime(2022, 10, 10, 8, 0, 0), + status=AlertStatus.PENDING, + data=SourceFreshnessAlertDataSchema( id="1", source_freshness_execution_id="1", alert_class_id="elementary.model_id_1", model_unique_id="elementary.model_id_1", - alias="modely", path="my/path", - original_path="", - materialization="table", - message="", - full_refresh=False, - detected_at="2022-10-10 10:00:00", - alert_suppression_interval=0, - tags='["one", "two"]', + detected_at=datetime(2022, 10, 10, 10, 0, 0), + tags=["one", "two"], model_meta=dict(owner='["jeff", "john"]'), original_status="error", status="fail", - snapshotted_at="2023-08-15T12:26:06.884065+00:00", - max_loaded_at="1969-12-31T00:00:00+00:00", - max_loaded_at_time_ago_in_s=1692188766.884065, + snapshotted_at=datetime(2023, 8, 15, 12, 26, 6, 884065), + max_loaded_at=datetime(1969, 12, 31, 0, 0, 0), + max_loaded_at_time_ago_in_s=1692188766, source_name="elementary_integration_tests", identifier="any_type_column_anomalies_validation", error_after='{"count": null, "period": null}', @@ -278,37 +264,31 @@ def initial_alerts(): error="problemz", database_name="test_db", schema_name="test_schema", - suppression_status="pending", + resource_type=ResourceType.SOURCE_FRESHNESS, ), ), PendingAlertSchema( id="freshness_alert_2", alert_class_id="elementary.model_id_2", type=AlertTypes.SOURCE_FRESHNESS, - detected_at="2022-10-10 08:00:00", - created_at="2022-10-10 08:00:00", - updated_at="2022-10-10 08:00:00", - status="pending", - data=dict( + detected_at=datetime(2022, 10, 10, 8, 0, 0), + created_at=datetime(2022, 10, 10, 8, 0, 0), + updated_at=datetime(2022, 10, 10, 8, 0, 0), + status=AlertStatus.PENDING, + data=SourceFreshnessAlertDataSchema( id="2", source_freshness_execution_id="2", alert_class_id="elementary.model_id_2", model_unique_id="elementary.model_id_2", - alias="modely", path="my/path", - original_path="", - materialization="table", - message="", - full_refresh=False, - detected_at="2022-10-10 10:00:00", - alert_suppression_interval=0, - tags='["one", "two"]', + detected_at=datetime(2022, 10, 10, 10, 0, 0), + tags=["one", "two"], model_meta=dict(owner='["jeff", "john"]'), - status="warn", original_status="warn", - snapshotted_at="2023-08-15T12:26:06.884065+00:00", - max_loaded_at="1969-12-31T00:00:00+00:00", - max_loaded_at_time_ago_in_s=1692188766.884065, + status="warn", + snapshotted_at=datetime(2023, 8, 15, 12, 26, 6, 884065), + max_loaded_at=datetime(1969, 12, 31, 0, 0, 0), + max_loaded_at_time_ago_in_s=1692188766, source_name="elementary_integration_tests", identifier="any_type_column_anomalies_validation", error_after='{"count": null, "period": null}', @@ -317,37 +297,31 @@ def initial_alerts(): error="problemz", database_name="test_db", schema_name="test_schema", - suppression_status="pending", + resource_type=ResourceType.SOURCE_FRESHNESS, ), ), PendingAlertSchema( id="freshness_alert_3", alert_class_id="elementary.model_id_3", type=AlertTypes.SOURCE_FRESHNESS, - detected_at="2022-10-10 08:00:00", - created_at="2022-10-10 08:00:00", - updated_at="2022-10-10 08:00:00", - status="pending", - data=dict( + detected_at=datetime(2022, 10, 10, 8, 0, 0), + created_at=datetime(2022, 10, 10, 8, 0, 0), + updated_at=datetime(2022, 10, 10, 8, 0, 0), + status=AlertStatus.PENDING, + data=SourceFreshnessAlertDataSchema( id="3", source_freshness_execution_id="3", alert_class_id="elementary.model_id_3", model_unique_id="elementary.model_id_3", - alias="modely", path="my/path", - original_path="", - materialization="table", - message="", - full_refresh=False, - detected_at="2022-10-10 10:00:00", - alert_suppression_interval=0, - tags='["one", "two"]', + detected_at=datetime(2022, 10, 10, 10, 0, 0), + tags=["one", "two"], model_meta=dict(owner='["jeff", "john"]'), original_status="runtime error", status="error", - snapshotted_at="2023-08-15T12:26:06.884065+00:00", - max_loaded_at="1969-12-31T00:00:00+00:00", - max_loaded_at_time_ago_in_s=1692188766.884065, + snapshotted_at=datetime(2023, 8, 15, 12, 26, 6, 884065), + max_loaded_at=datetime(1969, 12, 31, 0, 0, 0), + max_loaded_at_time_ago_in_s=1692188766, source_name="elementary_integration_tests", identifier="any_type_column_anomalies_validation", error_after='{"count": null, "period": null}', @@ -356,38 +330,21 @@ def initial_alerts(): error="problemz", database_name="test_db", schema_name="test_schema", - suppression_status="pending", + resource_type=ResourceType.SOURCE_FRESHNESS, ), ), ] return test_alerts, model_alerts, source_freshness_alerts -def test_find_common_alerts(): - test_alerts, model_alerts, _ = initial_alerts() - - common_alerts = _find_common_alerts(test_alerts, model_alerts) - assert len(common_alerts) == 0 - - common_alerts = _find_common_alerts( - [test_alerts[0], test_alerts[1], test_alerts[2]], - [test_alerts[0], test_alerts[2], test_alerts[3]], - ) - assert len(common_alerts) == 2 - assert sorted([alert.id for alert in common_alerts]) == [ - "test_alert_1", - "test_alert_3", - ] - - def test_filter_alerts_by_tags(): test_alerts, model_alerts, _ = initial_alerts() filter = FiltersSchema( - tags=[FilterSchema(values=["one"], type=SupportedFilterTypes.IS)] + tags=[FilterSchema(values=["one"], type=FilterType.IS)], statuses=[] ) - filter_test_alerts = _filter_alerts_by_tags(test_alerts, filter.tags) - filter_model_alerts = _filter_alerts_by_tags(model_alerts, filter.tags) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) assert len(filter_test_alerts) == 2 assert filter_test_alerts[0].id == "test_alert_1" assert filter_test_alerts[1].id == "test_alert_3" @@ -395,10 +352,22 @@ def test_filter_alerts_by_tags(): assert filter_model_alerts[0].id == "model_alert_1" filter = FiltersSchema( - tags=[FilterSchema(values=["three"], type=SupportedFilterTypes.IS)] + tags=[FilterSchema(values=["one"], type=FilterType.IS_NOT)], statuses=[] + ) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) + assert len(filter_test_alerts) == 2 + assert filter_test_alerts[0].id == "test_alert_2" + assert filter_test_alerts[1].id == "test_alert_4" + assert len(filter_model_alerts) == 2 + assert filter_model_alerts[0].id == "model_alert_2" + assert filter_model_alerts[1].id == "model_alert_3" + + filter = FiltersSchema( + tags=[FilterSchema(values=["three"], type=FilterType.IS)], statuses=[] ) - filter_test_alerts = _filter_alerts_by_tags(test_alerts, filter.tags) - filter_model_alerts = _filter_alerts_by_tags(model_alerts, filter.tags) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) assert len(filter_test_alerts) == 2 assert filter_test_alerts[0].id == "test_alert_2" assert filter_test_alerts[1].id == "test_alert_4" @@ -407,23 +376,52 @@ def test_filter_alerts_by_tags(): assert filter_model_alerts[1].id == "model_alert_3" filter = FiltersSchema( - tags=[FilterSchema(values=["four"], type=SupportedFilterTypes.IS)] + tags=[FilterSchema(values=["three"], type=FilterType.IS_NOT)], statuses=[] + ) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) + assert len(filter_test_alerts) == 2 + assert filter_test_alerts[0].id == "test_alert_1" + assert filter_test_alerts[1].id == "test_alert_3" + assert len(filter_model_alerts) == 1 + assert filter_model_alerts[0].id == "model_alert_1" + + filter = FiltersSchema( + tags=[FilterSchema(values=["four"], type=FilterType.IS)], statuses=[] ) - filter_test_alerts = _filter_alerts_by_tags(test_alerts, filter.tags) - filter_model_alerts = _filter_alerts_by_tags(model_alerts, filter.tags) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) assert len(filter_test_alerts) == 1 assert filter_test_alerts[0].id == "test_alert_4" assert len(filter_model_alerts) == 1 assert filter_model_alerts[0].id == "model_alert_3" + filter = FiltersSchema( + tags=[FilterSchema(values=["four"], type=FilterType.IS_NOT)], statuses=[] + ) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) + assert len(filter_test_alerts) == 3 + assert sorted([alert.id for alert in filter_test_alerts]) == [ + "test_alert_1", + "test_alert_2", + "test_alert_3", + ] + assert len(filter_model_alerts) == 2 + assert sorted([alert.id for alert in filter_model_alerts]) == [ + "model_alert_1", + "model_alert_2", + ] + filter = FiltersSchema( tags=[ - FilterSchema(values=["one"], type=SupportedFilterTypes.IS), - FilterSchema(values=["two"], type=SupportedFilterTypes.IS), - ] + FilterSchema(values=["one"], type=FilterType.IS), + FilterSchema(values=["two"], type=FilterType.IS), + ], + statuses=[], ) - filter_test_alerts = _filter_alerts_by_tags(test_alerts, filter.tags) - filter_model_alerts = _filter_alerts_by_tags(model_alerts, filter.tags) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) assert len(filter_test_alerts) == 1 assert filter_test_alerts[0].id == "test_alert_1" assert len(filter_model_alerts) == 1 @@ -431,22 +429,58 @@ def test_filter_alerts_by_tags(): filter = FiltersSchema( tags=[ - FilterSchema(values=["one"], type=SupportedFilterTypes.IS), - FilterSchema(values=["four"], type=SupportedFilterTypes.IS), - ] + FilterSchema(values=["one"], type=FilterType.IS_NOT), + FilterSchema(values=["two"], type=FilterType.IS_NOT), + ], + statuses=[], + ) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) + assert len(filter_test_alerts) == 2 + assert sorted([alert.id for alert in filter_test_alerts]) == [ + "test_alert_2", + "test_alert_4", + ] + assert len(filter_model_alerts) == 2 + assert sorted([alert.id for alert in filter_model_alerts]) == [ + "model_alert_2", + "model_alert_3", + ] + + filter = FiltersSchema( + tags=[ + FilterSchema(values=["one"], type=FilterType.IS), + FilterSchema(values=["four"], type=FilterType.IS), + ], + statuses=[], ) - filter_test_alerts = _filter_alerts_by_tags(test_alerts, filter.tags) - filter_model_alerts = _filter_alerts_by_tags(model_alerts, filter.tags) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) assert len(filter_test_alerts) == 0 assert len(filter_model_alerts) == 0 filter = FiltersSchema( tags=[ - FilterSchema(values=["one", "four"], type=SupportedFilterTypes.IS), - ] + FilterSchema(values=["one"], type=FilterType.IS_NOT), + FilterSchema(values=["four"], type=FilterType.IS_NOT), + ], + statuses=[], + ) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) + assert len(filter_test_alerts) == 1 + assert filter_test_alerts[0].id == "test_alert_2" + assert len(filter_model_alerts) == 1 + assert filter_model_alerts[0].id == "model_alert_2" + + filter = FiltersSchema( + tags=[ + FilterSchema(values=["one", "four"], type=FilterType.IS), + ], + statuses=[], ) - filter_test_alerts = _filter_alerts_by_tags(test_alerts, filter.tags) - filter_model_alerts = _filter_alerts_by_tags(model_alerts, filter.tags) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) assert len(filter_test_alerts) == 3 assert sorted([alert.id for alert in filter_test_alerts]) == [ "test_alert_1", @@ -459,15 +493,62 @@ def test_filter_alerts_by_tags(): "model_alert_3", ] + filter = FiltersSchema( + tags=[ + FilterSchema(values=["one", "four"], type=FilterType.IS_NOT), + ], + statuses=[], + ) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) + assert len(filter_test_alerts) == 1 + assert filter_test_alerts[0].id == "test_alert_2" + assert len(filter_model_alerts) == 1 + assert filter_model_alerts[0].id == "model_alert_2" + + filter = FiltersSchema( + tags=[ + FilterSchema(values=["one"], type=FilterType.IS), + FilterSchema(values=["three"], type=FilterType.IS_NOT), + ], + statuses=[], + ) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) + assert len(filter_test_alerts) == 2 + assert sorted([alert.id for alert in filter_test_alerts]) == [ + "test_alert_1", + "test_alert_3", + ] + assert len(filter_model_alerts) == 1 + assert filter_model_alerts[0].id == "model_alert_1" + + filter = FiltersSchema( + tags=[ + FilterSchema(values=["one", "two"], type=FilterType.IS), + FilterSchema(values=["three", "four"], type=FilterType.IS_NOT), + ], + statuses=[], + ) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) + assert len(filter_test_alerts) == 2 + assert sorted([alert.id for alert in filter_test_alerts]) == [ + "test_alert_1", + "test_alert_3", + ] + assert len(filter_model_alerts) == 1 + assert filter_model_alerts[0].id == "model_alert_1" + def test_filter_alerts_by_owners(): test_alerts, model_alerts, _ = initial_alerts() filter = FiltersSchema( - owners=[FilterSchema(values=["jeff"], type=SupportedFilterTypes.IS)] + owners=[FilterSchema(values=["jeff"], type=FilterType.IS)], statuses=[] ) - filter_test_alerts = _filter_alerts_by_owners(test_alerts, filter.owners) - filter_model_alerts = _filter_alerts_by_owners(model_alerts, filter.owners) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) assert len(filter_test_alerts) == 3 assert filter_test_alerts[0].id == "test_alert_1" assert filter_test_alerts[1].id == "test_alert_2" @@ -477,10 +558,20 @@ def test_filter_alerts_by_owners(): assert filter_model_alerts[1].id == "model_alert_3" filter = FiltersSchema( - owners=[FilterSchema(values=["john"], type=SupportedFilterTypes.IS)] + owners=[FilterSchema(values=["jeff"], type=FilterType.IS_NOT)], statuses=[] + ) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) + assert len(filter_test_alerts) == 1 + assert filter_test_alerts[0].id == "test_alert_3" + assert len(filter_model_alerts) == 1 + assert filter_model_alerts[0].id == "model_alert_2" + + filter = FiltersSchema( + owners=[FilterSchema(values=["john"], type=FilterType.IS)], statuses=[] ) - filter_test_alerts = _filter_alerts_by_owners(test_alerts, filter.owners) - filter_model_alerts = _filter_alerts_by_owners(model_alerts, filter.owners) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) assert len(filter_test_alerts) == 3 assert filter_test_alerts[0].id == "test_alert_1" assert filter_test_alerts[1].id == "test_alert_2" @@ -489,15 +580,62 @@ def test_filter_alerts_by_owners(): assert filter_model_alerts[0].id == "model_alert_1" assert filter_model_alerts[1].id == "model_alert_2" + filter = FiltersSchema( + owners=[FilterSchema(values=["john"], type=FilterType.IS_NOT)], statuses=[] + ) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) + assert len(filter_test_alerts) == 1 + assert filter_test_alerts[0].id == "test_alert_4" + assert len(filter_model_alerts) == 1 + assert filter_model_alerts[0].id == "model_alert_3" + + filter = FiltersSchema( + owners=[ + FilterSchema(values=["jeff"], type=FilterType.IS), + FilterSchema(values=["john"], type=FilterType.IS_NOT), + ], + statuses=[], + ) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) + assert len(filter_test_alerts) == 1 + assert filter_test_alerts[0].id == "test_alert_4" + assert len(filter_model_alerts) == 1 + assert filter_model_alerts[0].id == "model_alert_3" + + filter = FiltersSchema( + owners=[ + FilterSchema(values=["jeff", "john"], type=FilterType.IS), + FilterSchema(values=["fake"], type=FilterType.IS_NOT), + ], + statuses=[], + ) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) + assert len(filter_test_alerts) == 4 + assert sorted([alert.id for alert in filter_test_alerts]) == [ + "test_alert_1", + "test_alert_2", + "test_alert_3", + "test_alert_4", + ] + assert len(filter_model_alerts) == 3 + assert sorted([alert.id for alert in filter_model_alerts]) == [ + "model_alert_1", + "model_alert_2", + "model_alert_3", + ] + def test_filter_alerts_by_model(): test_alerts, model_alerts, _ = initial_alerts() filter = FiltersSchema( - models=[FilterSchema(values=["model_id_1"], type=SupportedFilterTypes.IS)] + models=[FilterSchema(values=["model_id_1"], type=FilterType.IS)], statuses=[] ) - filter_test_alerts = _filter_alerts_by_models(test_alerts, filter.models) - filter_model_alerts = _filter_alerts_by_models(model_alerts, filter.models) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) assert len(filter_test_alerts) == 2 assert filter_test_alerts[0].id == "test_alert_1" assert filter_test_alerts[1].id == "test_alert_2" @@ -506,10 +644,11 @@ def test_filter_alerts_by_model(): assert filter_model_alerts[1].id == "model_alert_2" filter = FiltersSchema( - models=[FilterSchema(values=["model_id_2"], type=SupportedFilterTypes.IS)] + models=[FilterSchema(values=["model_id_1"], type=FilterType.IS_NOT)], + statuses=[], ) - filter_test_alerts = _filter_alerts_by_models(test_alerts, filter.models) - filter_model_alerts = _filter_alerts_by_models(model_alerts, filter.models) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) assert len(filter_test_alerts) == 2 assert filter_test_alerts[0].id == "test_alert_3" assert filter_test_alerts[1].id == "test_alert_4" @@ -517,14 +656,35 @@ def test_filter_alerts_by_model(): assert filter_model_alerts[0].id == "model_alert_3" filter = FiltersSchema( - models=[ - FilterSchema( - values=["model_id_1", "model_id_2"], type=SupportedFilterTypes.IS - ) - ] + models=[FilterSchema(values=["model_id_2"], type=FilterType.IS)], statuses=[] + ) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) + assert len(filter_test_alerts) == 2 + assert filter_test_alerts[0].id == "test_alert_3" + assert filter_test_alerts[1].id == "test_alert_4" + assert len(filter_model_alerts) == 1 + assert filter_model_alerts[0].id == "model_alert_3" + + filter = FiltersSchema( + models=[FilterSchema(values=["model_id_2"], type=FilterType.IS_NOT)], + statuses=[], + ) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) + assert len(filter_test_alerts) == 2 + assert filter_test_alerts[0].id == "test_alert_1" + assert filter_test_alerts[1].id == "test_alert_2" + assert len(filter_model_alerts) == 2 + assert filter_model_alerts[0].id == "model_alert_1" + assert filter_model_alerts[1].id == "model_alert_2" + + filter = FiltersSchema( + models=[FilterSchema(values=["model_id_1", "model_id_2"], type=FilterType.IS)], + statuses=[], ) - filter_test_alerts = _filter_alerts_by_models(test_alerts, filter.models) - filter_model_alerts = _filter_alerts_by_models(model_alerts, filter.models) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) assert len(filter_test_alerts) == 4 assert filter_test_alerts[0].id == "test_alert_1" assert filter_test_alerts[1].id == "test_alert_2" @@ -537,12 +697,13 @@ def test_filter_alerts_by_model(): filter = FiltersSchema( models=[ - FilterSchema(values=["model_id_1"], type=SupportedFilterTypes.IS), - FilterSchema(values=["model_id_2"], type=SupportedFilterTypes.IS), - ] + FilterSchema(values=["model_id_1"], type=FilterType.IS), + FilterSchema(values=["model_id_2"], type=FilterType.IS), + ], + statuses=[], ) - filter_test_alerts = _filter_alerts_by_models(test_alerts, filter.models) - filter_model_alerts = _filter_alerts_by_models(model_alerts, filter.models) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) assert len(filter_test_alerts) == 0 assert len(filter_model_alerts) == 0 @@ -550,25 +711,25 @@ def test_filter_alerts_by_model(): def test_filter_alerts_by_node_names(): test_alerts, model_alerts, _ = initial_alerts() - filter = FiltersSchema(node_names=["test_3", "model_id_1"]) - filter_test_alerts = _filter_alerts_by_node_names(test_alerts, filter.node_names) - filter_model_alerts = _filter_alerts_by_node_names(model_alerts, filter.node_names) + filter = FiltersSchema(node_names=["test_3", "model_id_1"], statuses=[]) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) assert len(filter_test_alerts) == 1 assert filter_test_alerts[0].id == "test_alert_3" assert len(filter_model_alerts) == 2 assert filter_model_alerts[0].id == "model_alert_1" assert filter_model_alerts[1].id == "model_alert_2" - filter = FiltersSchema(node_names=["model_id_2"]) - filter_test_alerts = _filter_alerts_by_node_names(test_alerts, filter.node_names) - filter_model_alerts = _filter_alerts_by_node_names(model_alerts, filter.node_names) + filter = FiltersSchema(node_names=["model_id_2"], statuses=[]) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) assert len(filter_test_alerts) == 0 assert len(filter_model_alerts) == 1 assert filter_model_alerts[0].id == "model_alert_3" - filter = FiltersSchema(node_names=["model_id_3"]) - filter_test_alerts = _filter_alerts_by_node_names(test_alerts, filter.node_names) - filter_model_alerts = _filter_alerts_by_node_names(model_alerts, filter.node_names) + filter = FiltersSchema(node_names=["model_id_3"], statuses=[]) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) assert len(filter_test_alerts) == 0 assert len(filter_model_alerts) == 0 @@ -581,15 +742,11 @@ def test_filter_alerts_by_statuses(): ) = initial_alerts() filter = FiltersSchema( - statuses=[ - StatusFilterSchema(values=[Status.WARN], type=SupportedFilterTypes.IS) - ] - ) - filter_test_alerts = _filter_alerts_by_statuses(test_alerts, filter.statuses) - filter_model_alerts = _filter_alerts_by_statuses(model_alerts, filter.statuses) - filter_source_freshness_alerts = _filter_alerts_by_statuses( - source_freshness_alerts, filter.statuses + statuses=[StatusFilterSchema(values=[Status.WARN], type=FilterType.IS)], ) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) + filter_source_freshness_alerts = filter_alerts(source_freshness_alerts, filter) assert len(filter_test_alerts) == 1 assert filter_test_alerts[0].id == "test_alert_4" assert len(filter_model_alerts) == 0 @@ -598,12 +755,12 @@ def test_filter_alerts_by_statuses(): filter = FiltersSchema( statuses=[ StatusFilterSchema( - values=[Status.ERROR, Status.SKIPPED], type=SupportedFilterTypes.IS + values=[Status.ERROR, Status.SKIPPED], type=FilterType.IS ) ] ) - filter_test_alerts = _filter_alerts_by_statuses(test_alerts, filter.statuses) - filter_model_alerts = _filter_alerts_by_statuses(model_alerts, filter.statuses) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) assert len(filter_test_alerts) == 0 assert len(filter_model_alerts) == 3 @@ -611,19 +768,30 @@ def test_filter_alerts_by_statuses(): statuses=[ StatusFilterSchema( values=[Status.FAIL, Status.WARN, Status.RUNTIME_ERROR], - type=SupportedFilterTypes.IS, + type=FilterType.IS, ) ] ) - filter_test_alerts = _filter_alerts_by_statuses(test_alerts, filter.statuses) - filter_model_alerts = _filter_alerts_by_statuses(model_alerts, filter.statuses) - filter_source_freshness_alerts = _filter_alerts_by_statuses( - source_freshness_alerts, filter.statuses - ) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) + filter_source_freshness_alerts = filter_alerts(source_freshness_alerts, filter) assert len(filter_test_alerts) == 4 assert len(filter_model_alerts) == 0 assert len(filter_source_freshness_alerts) == 2 + filter = FiltersSchema( + statuses=[ + StatusFilterSchema( + values=[Status.FAIL, Status.WARN, Status.ERROR], + type=FilterType.IS_NOT, + ) + ] + ) + filter_test_alerts = filter_alerts(test_alerts, filter) + filter_model_alerts = filter_alerts(model_alerts, filter) + assert len(filter_test_alerts) == 0 + assert len(filter_model_alerts) == 1 + def test_filter_alerts_by_resource_types(): test_alerts, model_alerts, _ = initial_alerts() @@ -631,26 +799,29 @@ def test_filter_alerts_by_resource_types(): filter = FiltersSchema( resource_types=[ - ResourceTypeFilterSchema( - values=[ResourceType.TEST], type=SupportedFilterTypes.IS - ) - ] - ) - filter_test_alerts = _filter_alerts_by_resource_types( - all_alerts, filter.resource_types + ResourceTypeFilterSchema(values=[ResourceType.TEST], type=FilterType.IS) + ], + statuses=[], ) + filter_test_alerts = filter_alerts(all_alerts, filter) assert filter_test_alerts == test_alerts filter = FiltersSchema( resource_types=[ - ResourceTypeFilterSchema( - values=[ResourceType.MODEL], type=SupportedFilterTypes.IS - ) - ] + ResourceTypeFilterSchema(values=[ResourceType.MODEL], type=FilterType.IS) + ], + statuses=[], ) - filter_test_alerts = _filter_alerts_by_resource_types( - all_alerts, filter.resource_types + filter_test_alerts = filter_alerts(all_alerts, filter) + assert filter_test_alerts == model_alerts + + filter = FiltersSchema( + resource_types=[ + ResourceTypeFilterSchema(values=[ResourceType.TEST], type=FilterType.IS_NOT) + ], + statuses=[], ) + filter_test_alerts = filter_alerts(all_alerts, filter) assert filter_test_alerts == model_alerts @@ -682,8 +853,8 @@ def test_multi_filters(): test_alerts, _, _ = initial_alerts() filter = FiltersSchema( - tags=[FilterSchema(values=["one", "three"], type=SupportedFilterTypes.IS)], - owners=[FilterSchema(values=["jeff"], type=SupportedFilterTypes.IS)], + tags=[FilterSchema(values=["one", "three"], type=FilterType.IS)], + owners=[FilterSchema(values=["jeff"], type=FilterType.IS)], ) filter_test_alerts = filter_alerts(test_alerts, filter) assert len(filter_test_alerts) == 3 @@ -694,19 +865,19 @@ def test_multi_filters(): ] filter = FiltersSchema( - tags=[FilterSchema(values=["one", "three"], type=SupportedFilterTypes.IS)], - owners=[FilterSchema(values=["fake"], type=SupportedFilterTypes.IS)], + tags=[FilterSchema(values=["one", "three"], type=FilterType.IS)], + owners=[FilterSchema(values=["fake"], type=FilterType.IS)], ) filter_test_alerts = filter_alerts(test_alerts, filter) assert len(filter_test_alerts) == 0 filter = FiltersSchema( - tags=[FilterSchema(values=["one", "three"], type=SupportedFilterTypes.IS)], - owners=[FilterSchema(values=["jeff"], type=SupportedFilterTypes.IS)], + tags=[FilterSchema(values=["one", "three"], type=FilterType.IS)], + owners=[FilterSchema(values=["jeff"], type=FilterType.IS)], statuses=[ StatusFilterSchema( values=[Status.WARN], - type=SupportedFilterTypes.IS, + type=FilterType.IS, ) ], ) @@ -715,12 +886,12 @@ def test_multi_filters(): assert filter_test_alerts[0].id == "test_alert_4" filter = FiltersSchema( - tags=[FilterSchema(values=["one", "three"], type=SupportedFilterTypes.IS)], - owners=[FilterSchema(values=["jeff"], type=SupportedFilterTypes.IS)], + tags=[FilterSchema(values=["one", "three"], type=FilterType.IS)], + owners=[FilterSchema(values=["jeff"], type=FilterType.IS)], statuses=[ StatusFilterSchema( values=[Status.FAIL], - type=SupportedFilterTypes.IS, + type=FilterType.IS, ) ], ) diff --git a/tests/unit/monitor/data_monitoring/test_filter_schema.py b/tests/unit/monitor/data_monitoring/test_filter_schema.py new file mode 100644 index 000000000..34b7de270 --- /dev/null +++ b/tests/unit/monitor/data_monitoring/test_filter_schema.py @@ -0,0 +1,73 @@ +import pytest + +from elementary.monitor.data_monitoring.schema import FilterSchema, FilterType + + +def test_filter_schema_is_operator(): + filter_schema = FilterSchema(values=["test1", "test2"], type=FilterType.IS) + + # Should match when value is in the filter values + assert filter_schema.apply_filter_on_value("test1") is True + assert filter_schema.apply_filter_on_value("test2") is True + + # Should not match when value is not in filter values + assert filter_schema.apply_filter_on_value("test3") is False + + +def test_filter_schema_is_not_operator(): + filter_schema = FilterSchema(values=["test1", "test2"], type=FilterType.IS_NOT) + + # Should not match when value is in the filter values + assert filter_schema.apply_filter_on_value("test1") is False + assert filter_schema.apply_filter_on_value("test2") is False + + # Should match when value is not in filter values + assert filter_schema.apply_filter_on_value("test3") is True + + +def test_filter_schema_apply_filter_on_values_is_operator(): + filter_schema = FilterSchema(values=["test1", "test2"], type=FilterType.IS) + + # Should match when any value matches (ANY_OPERATORS) + assert filter_schema.apply_filter_on_values(["test1", "test3"]) is True + assert filter_schema.apply_filter_on_values(["test3", "test4"]) is False + + +def test_filter_schema_apply_filter_on_values_is_not_operator(): + filter_schema = FilterSchema(values=["test1", "test2"], type=FilterType.IS_NOT) + + # Should match all values for IS_NOT (ALL_OPERATORS) + assert filter_schema.apply_filter_on_values(["test3", "test4"]) is True + assert filter_schema.apply_filter_on_values(["test1", "test3"]) is False + + +def test_filter_schema_invalid_filter_type(): + with pytest.raises(ValueError): + FilterSchema(values=["test1"], type="invalid") # type: ignore[arg-type] + + +def test_filter_schema_contains_operator(): + filter_schema = FilterSchema(values=["test"], type=FilterType.CONTAINS) + + # Should match when value contains the filter value + assert filter_schema.apply_filter_on_value("test123") is True + assert filter_schema.apply_filter_on_value("123test") is True + assert filter_schema.apply_filter_on_value("123test456") is True + + # Should match case-insensitive + assert filter_schema.apply_filter_on_value("TEST123") is True + assert filter_schema.apply_filter_on_value("123TEST") is True + + # Should not match when value doesn't contain filter value + assert filter_schema.apply_filter_on_value("123") is False + + +def test_filter_schema_apply_filter_on_values_contains_operator(): + filter_schema = FilterSchema(values=["test1", "test2"], type=FilterType.CONTAINS) + + # Should match when any value contains any filter value + assert filter_schema.apply_filter_on_values(["abc_test1_def", "xyz"]) is True + assert filter_schema.apply_filter_on_values(["abc", "xyz_test2"]) is True + + # Should not match when no values contain any filter values + assert filter_schema.apply_filter_on_values(["abc", "xyz"]) is False