diff --git a/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py b/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py index 856e83c51..df203d076 100644 --- a/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py +++ b/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py @@ -3,7 +3,7 @@ from datetime import datetime from typing import DefaultDict, Dict, List, Optional, Union -from alive_progress import alive_it +from alive_progress import alive_bar from elementary.config.config import Config from elementary.monitor.alerts.alerts_groups import GroupedByTableAlerts @@ -251,27 +251,29 @@ def _send_alerts( self.execution_properties["sent_alert_count"] = self.sent_alert_count return - alerts_with_progress_bar = alive_it(alerts, title="Sending alerts") sent_successfully_alerts = [] - for alert, sent_successfully in self.alerts_integration.send_alerts( - alerts_with_progress_bar, self.config.group_alerts_threshold - ): - if sent_successfully: - if isinstance(alert, BaseAlertsGroup): - sent_successfully_alerts.extend(alert.alerts) + + with alive_bar(len(alerts), title="Sending alerts") as bar: + for alert, sent_successfully in self.alerts_integration.send_alerts( + alerts, self.config.group_alerts_threshold + ): + bar() + if sent_successfully: + if isinstance(alert, BaseAlertsGroup): + sent_successfully_alerts.extend(alert.alerts) + else: + sent_successfully_alerts.append(alert) else: - sent_successfully_alerts.append(alert) - else: - if isinstance(alert, BaseAlertsGroup): - for inner_alert in alert.alerts: + if isinstance(alert, BaseAlertsGroup): + for inner_alert in alert.alerts: + logger.error( + f"Could not send the alert - {inner_alert.id}. Full alert: {json.dumps(inner_alert.data)}" + ) + else: logger.error( - f"Could not send the alert - {inner_alert.id}. Full alert: {json.dumps(inner_alert.data)}" + f"Could not send the alert - {alert.id}. Full alert: {json.dumps(alert.data)}" ) - else: - logger.error( - f"Could not send the alert - {alert.id}. Full alert: {json.dumps(alert.data)}" - ) - self.success = False + self.success = False # Now update as sent: self.sent_alert_count = len(sent_successfully_alerts) diff --git a/elementary/monitor/data_monitoring/alerts/integrations/base_integration.py b/elementary/monitor/data_monitoring/alerts/integrations/base_integration.py index 4f2015841..4c6970c3e 100644 --- a/elementary/monitor/data_monitoring/alerts/integrations/base_integration.py +++ b/elementary/monitor/data_monitoring/alerts/integrations/base_integration.py @@ -109,8 +109,8 @@ def send_alert( ) -> bool: raise NotImplementedError + @staticmethod def _group_alerts( - self, alerts: Sequence[ Union[ TestAlertModel, @@ -129,6 +129,9 @@ def _group_alerts( AlertsGroup, ] ]: + if not alerts: + return [] + flattened_alerts: List[ Union[TestAlertModel, ModelAlertModel, SourceFreshnessAlertModel] ] = [] diff --git a/tests/mocks/data_monitoring/alerts/integrations/test_group_alerts.py b/tests/mocks/data_monitoring/alerts/integrations/test_group_alerts.py new file mode 100644 index 000000000..ac87fd573 --- /dev/null +++ b/tests/mocks/data_monitoring/alerts/integrations/test_group_alerts.py @@ -0,0 +1,70 @@ +from elementary.monitor.alerts.alerts_groups.alerts_group import AlertsGroup +from elementary.monitor.alerts.test_alert import TestAlertModel +from elementary.monitor.data_monitoring.alerts.integrations.base_integration import ( + BaseIntegration, +) + + +def test_group_alerts(): + grouped_alerts = BaseIntegration._group_alerts(alerts=[], threshold=0) + assert len(grouped_alerts) == 0 + grouped_alerts = BaseIntegration._group_alerts(alerts=[], threshold=1) + assert len(grouped_alerts) == 0 + + alerts = [ + TestAlertModel( + id="1", + test_unique_id="1", + elementary_unique_id="1", + test_name="1", + severity="WARN", + test_type="dbt_test", + test_sub_type="generic", + test_short_name="1", + alert_class_id="1", + ) + ] + grouped_alerts = BaseIntegration._group_alerts(alerts=alerts, threshold=0) + assert len(grouped_alerts) == 1 + assert isinstance(grouped_alerts[0], AlertsGroup) + assert grouped_alerts[0].alerts == alerts + grouped_alerts = BaseIntegration._group_alerts(alerts=alerts, threshold=1) + assert len(grouped_alerts) == 1 + assert isinstance(grouped_alerts[0], AlertsGroup) + assert grouped_alerts[0].alerts == alerts + + alerts = [ + TestAlertModel( + id="1", + test_unique_id="1", + elementary_unique_id="1", + test_name="1", + severity="WARN", + test_type="dbt_test", + test_sub_type="generic", + test_short_name="1", + alert_class_id="1", + ), + TestAlertModel( + id="2", + test_unique_id="2", + elementary_unique_id="2", + test_name="2", + severity="WARN", + test_type="dbt_test", + test_sub_type="generic", + test_short_name="2", + alert_class_id="2", + ), + ] + grouped_alerts = BaseIntegration._group_alerts(alerts=alerts, threshold=0) + assert len(grouped_alerts) == 1 + assert isinstance(grouped_alerts[0], AlertsGroup) + assert grouped_alerts[0].alerts == alerts + grouped_alerts = BaseIntegration._group_alerts(alerts=alerts, threshold=2) + assert len(grouped_alerts) == 1 + assert isinstance(grouped_alerts[0], AlertsGroup) + assert grouped_alerts[0].alerts == alerts + grouped_alerts = BaseIntegration._group_alerts(alerts=alerts, threshold=10) + assert len(grouped_alerts) == 2 + assert grouped_alerts == alerts