Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from typing import Generator, List, Sequence, Tuple, Union
from typing import Union

from elementary.monitor.alerts.alerts_groups import AlertsGroup, GroupedByTableAlerts
from elementary.monitor.alerts.alerts_groups import GroupedByTableAlerts
from elementary.monitor.alerts.alerts_groups.base_alerts_group import BaseAlertsGroup
from elementary.monitor.alerts.model_alert import ModelAlertModel
from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel
Expand Down Expand Up @@ -109,84 +109,6 @@ def send_alert(
) -> bool:
raise NotImplementedError

@staticmethod
def _group_alerts(
alerts: Sequence[
Union[
TestAlertModel,
ModelAlertModel,
SourceFreshnessAlertModel,
GroupedByTableAlerts,
]
],
threshold: int,
) -> Sequence[
Union[
TestAlertModel,
ModelAlertModel,
SourceFreshnessAlertModel,
GroupedByTableAlerts,
AlertsGroup,
]
]:
# Deprecated: the grouping logic is now handled outside of the integration, and the integration only sends the alerts
if not alerts:
return []

flattened_alerts: List[
Union[TestAlertModel, ModelAlertModel, SourceFreshnessAlertModel]
] = []
env = None
for alert in alerts:
if isinstance(alert, BaseAlertsGroup):
flattened_alerts.extend(alert.alerts)
if env is None and alert.env is not None:
env = alert.env
else:
flattened_alerts.append(alert)

if len(flattened_alerts) >= threshold:
logger.info(f"Grouping {len(flattened_alerts)} alerts into one")
return [
AlertsGroup(alerts=flattened_alerts, env=env),
]
return alerts

def send_alerts(
self,
alerts: Sequence[
Union[
TestAlertModel,
ModelAlertModel,
SourceFreshnessAlertModel,
GroupedByTableAlerts,
]
],
group_alerts_threshold: int,
*args,
**kwargs,
) -> Generator[
Tuple[
Union[
TestAlertModel,
ModelAlertModel,
SourceFreshnessAlertModel,
],
bool,
],
None,
None,
]:
# Deprecated: the grouping logic is now handled outside of the integration, and the integration only sends the alerts
grouped_alerts = self._group_alerts(alerts, group_alerts_threshold)
for alert in grouped_alerts:
if isinstance(alert, BaseAlertsGroup):
sent_successfully = self.send_alert(alert, *args, **kwargs)
for inner_alert in alert.alerts:
yield inner_alert, sent_successfully
else:
yield alert, self.send_alert(alert, *args, **kwargs)

@abstractmethod
def send_test_message(self, *args, **kwargs) -> bool:
raise NotImplementedError

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import pytest

from elementary.monitor.alerts.alerts_groups.alerts_group import AlertsGroup
from elementary.monitor.alerts.test_alert import TestAlertModel
from tests.mocks.data_monitoring.alerts.integrations.alerts_data_mock import (
AlertsDataMock,
)
Expand All @@ -20,77 +18,6 @@ def alerts_data_mock() -> AlertsDataMock:
return AlertsDataMock()


def test_group_alerts(base_integration_mock: BaseIntegrationMock):
assert (
base_integration_mock._group_alerts(alerts=[], threshold=1) == []
), "Empty list should return empty list"

alerts = [
TestAlertModel(
id="1",
test_unique_id="1",
elementary_unique_id="1",
test_name="1",
severity="WARN",
test_type="dbt_test",
test_sub_type="generic",
test_short_name="1",
alert_class_id="1",
)
]
grouped = base_integration_mock._group_alerts(alerts=alerts, threshold=2)
assert len(grouped) == 1, "Should return one group"
assert type(grouped[0]) is TestAlertModel, "Should be the same alert"
grouped = base_integration_mock._group_alerts(alerts=alerts, threshold=1)
assert len(grouped) == 1, "Should return one group"
assert type(grouped[0]) is AlertsGroup, "Group should contain alerts"

alerts = [
TestAlertModel(
id="1",
test_unique_id="1",
elementary_unique_id="1",
test_name="1",
severity="WARN",
test_type="dbt_test",
test_sub_type="generic",
test_short_name="1",
alert_class_id="1",
),
TestAlertModel(
id="2",
test_unique_id="2",
elementary_unique_id="2",
test_name="2",
severity="ERROR",
test_type="dbt_test",
test_sub_type="generic",
test_short_name="2",
alert_class_id="2",
),
TestAlertModel(
id="3",
test_unique_id="3",
elementary_unique_id="3",
test_name="3",
severity="ERROR",
test_type="dbt_test",
test_sub_type="generic",
test_short_name="3",
alert_class_id="3",
),
]
grouped = base_integration_mock._group_alerts(alerts=alerts, threshold=2)
assert len(grouped) == 1, "Should return one group"
assert type(grouped[0]) is AlertsGroup, "Group should contain alerts"
assert len(grouped[0].alerts) == 3, "Group should contain all alerts"
grouped = base_integration_mock._group_alerts(alerts=alerts, threshold=5)
assert len(grouped) == 3, "Should return three alerts"
assert type(grouped[0]) is TestAlertModel
assert type(grouped[1]) is TestAlertModel
assert type(grouped[2]) is TestAlertModel


def test_get_alert_template(
base_integration_mock: BaseIntegrationMock, alerts_data_mock: AlertsDataMock
):
Expand Down
Loading