diff --git a/elementary/messages/messaging_integrations/README.md b/elementary/messages/messaging_integrations/README.md new file mode 100644 index 000000000..4fad946bf --- /dev/null +++ b/elementary/messages/messaging_integrations/README.md @@ -0,0 +1,58 @@ +# Elementary Messaging Integration System + +## Overview + +The Elementary Messaging Integration system provides a flexible and extensible framework for sending alerts and messages to various messaging platforms (e.g., Slack, Teams). The system is designed to support a gradual migration from the legacy integration system to a more generic messaging-based approach. + +## Architecture + +### BaseMessagingIntegration + +The core of the new messaging system is the `BaseMessagingIntegration` abstract class. This class defines the contract that all messaging integrations must follow: + +- `send_message()`: Send a message to a specific destination +- `supports_reply()`: Check if the integration supports message threading/replies +- `reply_to_message()`: Reply to an existing message (if supported) + +### Key Components + +1. **MessageBody**: A platform-agnostic representation of a message +2. **MessageSendResult**: Contains information about a sent message, including timestamp and platform-specific context +3. **DestinationType**: Generic type representing the destination for a message (e.g., webhook URL, channel) +4. **MessageContextType**: Generic type for platform-specific message context + +## Migration Strategy + +The system currently supports both: + +- Legacy `BaseIntegration` implementations (e.g., Slack) +- New `BaseMessagingIntegration` implementations (e.g., Teams) + +This dual support allows for a gradual migration path where: + +1. New integrations are implemented using `BaseMessagingIntegration` +2. Existing integrations can be migrated one at a time +3. The legacy `BaseIntegration` will eventually be deprecated + +## Implementing a New Integration + +To add a new messaging platform integration: + +1. Create a new class that extends `BaseMessagingIntegration` +2. Implement the required abstract methods: + ```python + def send_message(self, destination: DestinationType, body: MessageBody) -> MessageSendResult + def supports_reply(self) -> bool + def reply_to_message(self, destination, message_context, message_body) -> MessageSendResult # if supported + ``` +3. Update the `Integrations` factory class to support the new integration + +## Current Implementations + +- **Teams**: Uses the new `BaseMessagingIntegration` system with webhook support +- **Slack**: Currently uses the legacy `BaseIntegration` system (planned for migration) + +## Future Improvements + +1. Complete migration of Slack to `BaseMessagingIntegration` +2. Add support for more messaging platforms diff --git a/elementary/messages/messaging_integrations/__init__.py b/elementary/messages/messaging_integrations/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/elementary/messages/messaging_integrations/base_messaging_integration.py b/elementary/messages/messaging_integrations/base_messaging_integration.py new file mode 100644 index 000000000..169f18433 --- /dev/null +++ b/elementary/messages/messaging_integrations/base_messaging_integration.py @@ -0,0 +1,49 @@ +from abc import ABC, abstractmethod +from datetime import datetime +from typing import Generic, Optional, TypeVar + +from pydantic import BaseModel + +from elementary.messages.message_body import MessageBody +from elementary.messages.messaging_integrations.exceptions import ( + MessageIntegrationReplyNotSupportedError, +) +from elementary.utils.log import get_logger + +logger = get_logger(__name__) + + +T = TypeVar("T") + + +class MessageSendResult(BaseModel, Generic[T]): + timestamp: datetime + message_context: Optional[T] = None + + +DestinationType = TypeVar("DestinationType") +MessageContextType = TypeVar("MessageContextType") + + +class BaseMessagingIntegration(ABC, Generic[DestinationType, MessageContextType]): + @abstractmethod + def send_message( + self, + destination: DestinationType, + body: MessageBody, + ) -> MessageSendResult[MessageContextType]: + raise NotImplementedError + + @abstractmethod + def supports_reply(self) -> bool: + raise NotImplementedError + + def reply_to_message( + self, + destination: DestinationType, + message_context: MessageContextType, + body: MessageBody, + ) -> MessageSendResult[MessageContextType]: + if not self.supports_reply(): + raise MessageIntegrationReplyNotSupportedError + raise NotImplementedError diff --git a/elementary/messages/messaging_integrations/exceptions.py b/elementary/messages/messaging_integrations/exceptions.py new file mode 100644 index 000000000..dc1272423 --- /dev/null +++ b/elementary/messages/messaging_integrations/exceptions.py @@ -0,0 +1,6 @@ +class MessagingIntegrationError(Exception): + pass + + +class MessageIntegrationReplyNotSupportedError(MessagingIntegrationError): + pass diff --git a/elementary/messages/messaging_integrations/teams_webhook.py b/elementary/messages/messaging_integrations/teams_webhook.py new file mode 100644 index 000000000..d1ced03b5 --- /dev/null +++ b/elementary/messages/messaging_integrations/teams_webhook.py @@ -0,0 +1,82 @@ +from datetime import datetime +from typing import Optional + +import requests +from pydantic import BaseModel + +from elementary.messages.formats.adaptive_cards import format_adaptive_card +from elementary.messages.message_body import MessageBody +from elementary.messages.messaging_integrations.base_messaging_integration import ( + BaseMessagingIntegration, + MessageSendResult, +) +from elementary.messages.messaging_integrations.exceptions import ( + MessageIntegrationReplyNotSupportedError, + MessagingIntegrationError, +) +from elementary.utils.log import get_logger + +logger = get_logger(__name__) + + +class ChannelWebhook(BaseModel): + webhook: str + channel: Optional[str] = None + + +def send_adaptive_card(webhook_url: str, card: dict) -> requests.Response: + """Sends an Adaptive Card to the specified webhook URL.""" + payload = { + "type": "message", + "attachments": [ + { + "contentType": "application/vnd.microsoft.card.adaptive", + "contentUrl": None, + "content": card, + } + ], + } + + response = requests.post( + webhook_url, + json=payload, + headers={"Content-Type": "application/json"}, + ) + response.raise_for_status() + if response.status_code == 202: + logger.debug("Got 202 response from Teams webhook, assuming success") + return response + + +class TeamsWebhookMessagingIntegration( + BaseMessagingIntegration[ChannelWebhook, ChannelWebhook] +): + def send_message( + self, + destination: ChannelWebhook, + body: MessageBody, + ) -> MessageSendResult[ChannelWebhook]: + card = format_adaptive_card(body) + try: + send_adaptive_card(destination.webhook, card) + return MessageSendResult( + message_context=destination, + timestamp=datetime.utcnow(), + ) + except requests.RequestException as e: + raise MessagingIntegrationError( + "Failed to send message to Teams webhook" + ) from e + + def supports_reply(self) -> bool: + return False + + def reply_to_message( + self, + destination: ChannelWebhook, + message_context: ChannelWebhook, + body: MessageBody, + ) -> MessageSendResult[ChannelWebhook]: + raise MessageIntegrationReplyNotSupportedError( + "Teams webhook message integration does not support replying to messages" + ) diff --git a/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py b/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py index eb5ccaeb4..496dba5a8 100644 --- a/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py +++ b/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py @@ -6,8 +6,19 @@ from alive_progress import alive_bar from elementary.config.config import Config +from elementary.messages.block_builders import TextLineBlock +from elementary.messages.blocks import HeaderBlock, LinesBlock +from elementary.messages.message_body import MessageBody +from elementary.messages.messaging_integrations.base_messaging_integration import ( + BaseMessagingIntegration, + MessageSendResult, +) +from elementary.messages.messaging_integrations.exceptions import ( + MessagingIntegrationError, +) +from elementary.monitor.alerts.alert_messages.builder import AlertMessageBuilder from elementary.monitor.alerts.alerts_groups import GroupedByTableAlerts -from elementary.monitor.alerts.alerts_groups.base_alerts_group import BaseAlertsGroup +from elementary.monitor.alerts.alerts_groups.alerts_group import AlertsGroup from elementary.monitor.alerts.grouping_type import GroupingType from elementary.monitor.alerts.model_alert import ModelAlertModel from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel @@ -31,7 +42,26 @@ logger = get_logger(__name__) +def get_health_check_message() -> MessageBody: + return MessageBody( + blocks=[ + HeaderBlock(text="Elementary monitor ran successfully"), + LinesBlock( + lines=[ + TextLineBlock( + text=f"Elementary monitor ran successfully on {datetime.now().strftime('%Y-%m-%d %H:%M')}" + ), + ] + ), + ] + ) + + class DataMonitoringAlerts(DataMonitoring): + # The alerts_integration field now supports both the legacy BaseIntegration and the new BaseMessagingIntegration + # This dual support allows for a gradual migration from the old integration system to the new messaging system + alerts_integration: Union[BaseIntegration, BaseMessagingIntegration] + def __init__( self, config: Config, @@ -60,7 +90,9 @@ def __init__( self.override_config_defaults = override_config self.alerts_integration = self._get_integration_client() - def _get_integration_client(self) -> BaseIntegration: + def _get_integration_client( + self, + ) -> Union[BaseIntegration, BaseMessagingIntegration]: return Integrations.get_integration( config=self.config, tracking=self.tracking, @@ -182,8 +214,10 @@ def _format_alerts( ModelAlertModel, SourceFreshnessAlertModel, GroupedByTableAlerts, + AlertsGroup, ] ]: + group_all_alerts = len(alerts) >= self.config.group_alerts_threshold # type: ignore[arg-type] formatted_alerts = [] grouped_by_table_alerts = [] model_ids_to_alerts_map = defaultdict(lambda: []) @@ -204,6 +238,10 @@ def _format_alerts( disable_samples=self.disable_samples, env=self.config.specified_env, ) + if group_all_alerts: + formatted_alerts.append(formatted_alert) + continue + try: grouping_type = GroupingType(group_alerts_by) if grouping_type == GroupingType.BY_TABLE: @@ -218,29 +256,76 @@ def _format_alerts( f"Failed to extract value as a group-by config: '{group_alerts_by}'. Allowed Values: {list(GroupingType.__members__.keys())} Ignoring it for now and default grouping strategy will be used" ) - for alerts_by_model in model_ids_to_alerts_map.values(): - grouped_by_table_alerts.append( - GroupedByTableAlerts( - alerts=alerts_by_model, - env=self.config.specified_env, + if group_all_alerts: + return [AlertsGroup(alerts=formatted_alerts, env=self.config.specified_env)] + + else: + for alerts_by_model in model_ids_to_alerts_map.values(): + grouped_by_table_alerts.append( + GroupedByTableAlerts( + alerts=alerts_by_model, env=self.config.specified_env + ) ) + + self.execution_properties["had_group_by_table"] = ( + len(grouped_by_table_alerts) > 0 ) + self.execution_properties["had_group_by_alert"] = len(formatted_alerts) > 0 - self.execution_properties["had_group_by_table"] = ( - len(grouped_by_table_alerts) > 0 - ) - self.execution_properties["had_group_by_alert"] = len(formatted_alerts) > 0 + all_alerts = formatted_alerts + grouped_by_table_alerts + return sorted( + all_alerts, + key=lambda alert: alert.detected_at or datetime.max, + ) - all_alerts = formatted_alerts + grouped_by_table_alerts - return sorted( - all_alerts, - key=lambda alert: alert.detected_at or datetime.max, + def _send_message( + self, integration: BaseMessagingIntegration, body: MessageBody + ) -> MessageSendResult: + destination = Integrations.get_destination( + integration=integration, config=self.config ) + return integration.send_message(destination=destination, body=body) def _send_test_message(self): - self.alerts_integration.send_test_message( - channel_name=self.config.slack_channel_name - ) + if isinstance(self.alerts_integration, BaseIntegration): + self.alerts_integration.send_test_message( + channel_name=self.config.slack_channel_name + ) + else: + test_message = get_health_check_message() + return self._send_message( + integration=self.alerts_integration, body=test_message + ) + + def _send_alert( + self, + alert: Union[ + TestAlertModel, + ModelAlertModel, + SourceFreshnessAlertModel, + GroupedByTableAlerts, + AlertsGroup, + ], + ): + # Support both legacy BaseIntegration and new BaseMessagingIntegration + # BaseIntegration will be deprecated in favor of BaseMessagingIntegration + if isinstance(self.alerts_integration, BaseIntegration): + return self.alerts_integration.send_alert(alert) + else: + # New messaging integration path - converts alerts to message bodies + alert_message_builder = AlertMessageBuilder() + alert_message_body = alert_message_builder.build( + alert=alert, + ) + try: + self._send_message( + integration=self.alerts_integration, + body=alert_message_body, + ) + return True + except MessagingIntegrationError: + logger.error(f"Could not send the alert - {type(alert)}.") + return False def _send_alerts( self, @@ -250,6 +335,7 @@ def _send_alerts( ModelAlertModel, SourceFreshnessAlertModel, GroupedByTableAlerts, + AlertsGroup, ] ], ): @@ -257,20 +343,25 @@ def _send_alerts( self.execution_properties["sent_alert_count"] = self.sent_alert_count return - sent_successfully_alerts = [] + sent_successfully_alerts: List[ + Union[ + TestAlertModel, + ModelAlertModel, + SourceFreshnessAlertModel, + ] + ] = [] with alive_bar(len(alerts), title="Sending alerts") as bar: - for alert, sent_successfully in self.alerts_integration.send_alerts( - alerts, self.config.group_alerts_threshold - ): + for alert in alerts: + sent_successfully = self._send_alert(alert) bar() if sent_successfully: - if isinstance(alert, BaseAlertsGroup): + if isinstance(alert, AlertsGroup): sent_successfully_alerts.extend(alert.alerts) else: sent_successfully_alerts.append(alert) else: - if isinstance(alert, BaseAlertsGroup): + if isinstance(alert, AlertsGroup): for inner_alert in alert.alerts: logger.error( f"Could not send the alert - {inner_alert.id}. Full alert: {json.dumps(inner_alert.data)}" diff --git a/elementary/monitor/data_monitoring/alerts/integrations/base_integration.py b/elementary/monitor/data_monitoring/alerts/integrations/base_integration.py index 1b076b4ec..e4781b5ab 100644 --- a/elementary/monitor/data_monitoring/alerts/integrations/base_integration.py +++ b/elementary/monitor/data_monitoring/alerts/integrations/base_integration.py @@ -129,6 +129,7 @@ def _group_alerts( AlertsGroup, ] ]: + # Deprecated: the grouping logic is now handled outside of the integration, and the integration only sends the alerts if not alerts: return [] @@ -176,6 +177,7 @@ def send_alerts( None, None, ]: + # Deprecated: the grouping logic is now handled outside of the integration, and the integration only sends the alerts grouped_alerts = self._group_alerts(alerts, group_alerts_threshold) for alert in grouped_alerts: if isinstance(alert, BaseAlertsGroup): diff --git a/elementary/monitor/data_monitoring/alerts/integrations/integrations.py b/elementary/monitor/data_monitoring/alerts/integrations/integrations.py index 6f1f976d4..8a37b5bdf 100644 --- a/elementary/monitor/data_monitoring/alerts/integrations/integrations.py +++ b/elementary/monitor/data_monitoring/alerts/integrations/integrations.py @@ -1,16 +1,20 @@ -from typing import Optional +from typing import Any, Optional, Union from elementary.config.config import Config from elementary.exceptions.exceptions import Error +from elementary.messages.messaging_integrations.base_messaging_integration import ( + BaseMessagingIntegration, +) +from elementary.messages.messaging_integrations.teams_webhook import ( + ChannelWebhook, + TeamsWebhookMessagingIntegration, +) from elementary.monitor.data_monitoring.alerts.integrations.base_integration import ( BaseIntegration, ) from elementary.monitor.data_monitoring.alerts.integrations.slack.slack import ( SlackIntegration, ) -from elementary.monitor.data_monitoring.alerts.integrations.teams.teams import ( - TeamsIntegration, -) from elementary.tracking.tracking_interface import Tracking @@ -29,7 +33,11 @@ def get_integration( config: Config, tracking: Optional[Tracking] = None, override_config_defaults: bool = False, - ) -> BaseIntegration: + ) -> Union[BaseIntegration, BaseMessagingIntegration]: + # Factory method that returns either a legacy BaseIntegration or new BaseMessagingIntegration + # This allows for a gradual migration from the old integration system to the new messaging system + # - Slack currently uses the legacy BaseIntegration + # - Teams uses the new BaseMessagingIntegration if config.has_slack: return SlackIntegration( config=config, @@ -37,10 +45,19 @@ def get_integration( override_config_defaults=override_config_defaults, ) elif config.has_teams: - return TeamsIntegration( - config=config, - tracking=tracking, - override_config_defaults=override_config_defaults, - ) + return TeamsWebhookMessagingIntegration() else: raise UnsupportedAlertIntegrationError + + @staticmethod + def get_destination(integration: BaseMessagingIntegration, config: Config) -> Any: + # Helper method to get the appropriate destination for BaseMessagingIntegration implementations + # Each messaging integration type may have different destination requirements + # Currently supports Teams webhook destinations + if ( + isinstance(integration, TeamsWebhookMessagingIntegration) + and config.has_teams + and config.teams_webhook + ): + return ChannelWebhook(webhook=config.teams_webhook) + return None