Skip to content

Commit 044b0c0

Browse files
committed
Add support for new messaging integration in data monitoring alerts
- Updated `DataMonitoringAlerts` to support both legacy and new messaging integrations - Implemented dual-path alert sending logic for BaseIntegration and BaseMessagingIntegration - Added `_send_message()` method to handle new messaging integration message sending - Created `get_health_check_message()` for test message support in new integrations - Updated `Integrations` class to support gradual migration to new messaging system - Added destination resolution method for messaging integrations
1 parent 343e65d commit 044b0c0

File tree

2 files changed

+114
-20
lines changed

2 files changed

+114
-20
lines changed

elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py

Lines changed: 87 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,19 @@
66
from alive_progress import alive_bar
77

88
from elementary.config.config import Config
9+
from elementary.messages.block_builders import TextLineBlock
10+
from elementary.messages.blocks import HeaderBlock, LinesBlock
11+
from elementary.messages.message_body import MessageBody
12+
from elementary.messages.messaging_integrations.base_messaging_integration import (
13+
BaseMessagingIntegration,
14+
MessageSendResult,
15+
)
16+
from elementary.messages.messaging_integrations.exceptions import (
17+
MessagingIntegrationError,
18+
)
19+
from elementary.monitor.alerts.alert_messages.builder import AlertMessageBuilder
920
from elementary.monitor.alerts.alerts_groups import GroupedByTableAlerts
1021
from elementary.monitor.alerts.alerts_groups.alerts_group import AlertsGroup
11-
from elementary.monitor.alerts.alerts_groups.base_alerts_group import BaseAlertsGroup
1222
from elementary.monitor.alerts.grouping_type import GroupingType
1323
from elementary.monitor.alerts.model_alert import ModelAlertModel
1424
from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel
@@ -32,7 +42,26 @@
3242
logger = get_logger(__name__)
3343

3444

45+
def get_health_check_message() -> MessageBody:
46+
return MessageBody(
47+
blocks=[
48+
HeaderBlock(text="Elementary monitor ran successfully"),
49+
LinesBlock(
50+
lines=[
51+
TextLineBlock(
52+
text=f"Elementary monitor ran successfully on {datetime.now().strftime('%Y-%m-%d %H:%M')}"
53+
),
54+
]
55+
),
56+
]
57+
)
58+
59+
3560
class DataMonitoringAlerts(DataMonitoring):
61+
# The alerts_integration field now supports both the legacy BaseIntegration and the new BaseMessagingIntegration
62+
# This dual support allows for a gradual migration from the old integration system to the new messaging system
63+
alerts_integration: Union[BaseIntegration, BaseMessagingIntegration]
64+
3665
def __init__(
3766
self,
3867
config: Config,
@@ -61,7 +90,9 @@ def __init__(
6190
self.override_config_defaults = override_config
6291
self.alerts_integration = self._get_integration_client()
6392

64-
def _get_integration_client(self) -> BaseIntegration:
93+
def _get_integration_client(
94+
self,
95+
) -> Union[BaseIntegration, BaseMessagingIntegration]:
6596
return Integrations.get_integration(
6697
config=self.config,
6798
tracking=self.tracking,
@@ -183,7 +214,7 @@ def _format_alerts(
183214
ModelAlertModel,
184215
SourceFreshnessAlertModel,
185216
GroupedByTableAlerts,
186-
BaseAlertsGroup,
217+
AlertsGroup,
187218
]
188219
]:
189220
group_all_alerts = len(alerts) >= self.config.group_alerts_threshold # type: ignore[arg-type]
@@ -244,11 +275,57 @@ def _format_alerts(
244275
key=lambda alert: alert.detected_at or datetime.max,
245276
)
246277

247-
def _send_test_message(self):
248-
self.alerts_integration.send_test_message(
249-
channel_name=self.config.slack_channel_name
278+
def _send_message(
279+
self, integration: BaseMessagingIntegration, message_body: MessageBody
280+
) -> MessageSendResult:
281+
destination = Integrations.get_destination(
282+
integration=integration, config=self.config
283+
)
284+
return integration.send_message(
285+
destination=destination, message_body=message_body
250286
)
251287

288+
def _send_test_message(self):
289+
if isinstance(self.alerts_integration, BaseIntegration):
290+
self.alerts_integration.send_test_message(
291+
channel_name=self.config.slack_channel_name
292+
)
293+
else:
294+
test_message = get_health_check_message()
295+
return self._send_message(
296+
integration=self.alerts_integration, message_body=test_message
297+
)
298+
299+
def _send_alert(
300+
self,
301+
alert: Union[
302+
TestAlertModel,
303+
ModelAlertModel,
304+
SourceFreshnessAlertModel,
305+
GroupedByTableAlerts,
306+
AlertsGroup,
307+
],
308+
):
309+
# Support both legacy BaseIntegration and new BaseMessagingIntegration
310+
# BaseIntegration will be deprecated in favor of BaseMessagingIntegration
311+
if isinstance(self.alerts_integration, BaseIntegration):
312+
return self.alerts_integration.send_alert(alert)
313+
else:
314+
# New messaging integration path - converts alerts to message bodies
315+
alert_message_builder = AlertMessageBuilder()
316+
alert_message_body = alert_message_builder.build(
317+
alert=alert,
318+
)
319+
try:
320+
self._send_message(
321+
integration=self.alerts_integration,
322+
message_body=alert_message_body,
323+
)
324+
return True
325+
except MessagingIntegrationError:
326+
logger.error(f"Could not send the alert - {type(alert)}.")
327+
return False
328+
252329
def _send_alerts(
253330
self,
254331
alerts: List[
@@ -257,7 +334,7 @@ def _send_alerts(
257334
ModelAlertModel,
258335
SourceFreshnessAlertModel,
259336
GroupedByTableAlerts,
260-
BaseAlertsGroup,
337+
AlertsGroup,
261338
]
262339
],
263340
):
@@ -275,15 +352,15 @@ def _send_alerts(
275352

276353
with alive_bar(len(alerts), title="Sending alerts") as bar:
277354
for alert in alerts:
278-
sent_successfully = self.alerts_integration.send_alert(alert)
355+
sent_successfully = self._send_alert(alert)
279356
bar()
280357
if sent_successfully:
281-
if isinstance(alert, BaseAlertsGroup):
358+
if isinstance(alert, AlertsGroup):
282359
sent_successfully_alerts.extend(alert.alerts)
283360
else:
284361
sent_successfully_alerts.append(alert)
285362
else:
286-
if isinstance(alert, BaseAlertsGroup):
363+
if isinstance(alert, AlertsGroup):
287364
for inner_alert in alert.alerts:
288365
logger.error(
289366
f"Could not send the alert - {inner_alert.id}. Full alert: {json.dumps(inner_alert.data)}"

elementary/monitor/data_monitoring/alerts/integrations/integrations.py

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
1-
from typing import Optional
1+
from typing import Any, Optional, Union
22

33
from elementary.config.config import Config
44
from elementary.exceptions.exceptions import Error
5+
from elementary.messages.messaging_integrations.base_messaging_integration import (
6+
BaseMessagingIntegration,
7+
)
8+
from elementary.messages.messaging_integrations.teams_webhook import (
9+
ChannelWebhook,
10+
TeamsWebhookMessagingIntegration,
11+
)
512
from elementary.monitor.data_monitoring.alerts.integrations.base_integration import (
613
BaseIntegration,
714
)
815
from elementary.monitor.data_monitoring.alerts.integrations.slack.slack import (
916
SlackIntegration,
1017
)
11-
from elementary.monitor.data_monitoring.alerts.integrations.teams.teams import (
12-
TeamsIntegration,
13-
)
1418
from elementary.tracking.tracking_interface import Tracking
1519

1620

@@ -29,18 +33,31 @@ def get_integration(
2933
config: Config,
3034
tracking: Optional[Tracking] = None,
3135
override_config_defaults: bool = False,
32-
) -> BaseIntegration:
36+
) -> Union[BaseIntegration, BaseMessagingIntegration]:
37+
# Factory method that returns either a legacy BaseIntegration or new BaseMessagingIntegration
38+
# This allows for a gradual migration from the old integration system to the new messaging system
39+
# - Slack currently uses the legacy BaseIntegration
40+
# - Teams uses the new BaseMessagingIntegration
3341
if config.has_slack:
3442
return SlackIntegration(
3543
config=config,
3644
tracking=tracking,
3745
override_config_defaults=override_config_defaults,
3846
)
3947
elif config.has_teams:
40-
return TeamsIntegration(
41-
config=config,
42-
tracking=tracking,
43-
override_config_defaults=override_config_defaults,
44-
)
48+
return TeamsWebhookMessagingIntegration()
4549
else:
4650
raise UnsupportedAlertIntegrationError
51+
52+
@staticmethod
53+
def get_destination(integration: BaseMessagingIntegration, config: Config) -> Any:
54+
# Helper method to get the appropriate destination for BaseMessagingIntegration implementations
55+
# Each messaging integration type may have different destination requirements
56+
# Currently supports Teams webhook destinations
57+
if (
58+
isinstance(integration, TeamsWebhookMessagingIntegration)
59+
and config.has_teams
60+
and config.teams_webhook
61+
):
62+
return ChannelWebhook(webhook=config.teams_webhook)
63+
return None

0 commit comments

Comments
 (0)