diff --git a/elementary/clients/slack/client.py b/elementary/clients/slack/client.py index fb366d60b..f66a9b969 100644 --- a/elementary/clients/slack/client.py +++ b/elementary/clients/slack/client.py @@ -1,7 +1,8 @@ import json from abc import ABC, abstractmethod -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Union +import requests from ratelimit import limits, sleep_and_retry from slack_sdk import WebClient, WebhookClient from slack_sdk.errors import SlackApiError @@ -41,7 +42,11 @@ def create_client( return SlackWebClient(token=config.slack_token, tracking=tracking) elif config.slack_webhook: logger.debug("Creating Slack client with webhook.") - return SlackWebhookClient(webhook=config.slack_webhook, tracking=tracking) + return SlackWebhookClient( + webhook=config.slack_webhook, + is_workflow=config.is_slack_workflow, + tracking=tracking, + ) return None @abstractmethod @@ -49,8 +54,9 @@ def _initial_client(self): raise NotImplementedError def _initial_retry_handlers(self): - rate_limit_handler = RateLimitErrorRetryHandler(max_retry_count=5) - self.client.retry_handlers.append(rate_limit_handler) + if isinstance(self.client, WebClient): + rate_limit_handler = RateLimitErrorRetryHandler(max_retry_count=5) + self.client.retry_handlers.append(rate_limit_handler) @abstractmethod def send_message(self, **kwargs): @@ -223,12 +229,16 @@ class SlackWebhookClient(SlackClient): def __init__( self, webhook: str, + is_workflow: bool, tracking: Optional[Tracking] = None, ): self.webhook = webhook + self.is_workflow = is_workflow super().__init__(tracking) def _initial_client(self): + if self.is_workflow: + return requests.Session() return WebhookClient( url=self.webhook, default_headers={"Content-type": "application/json"} ) @@ -236,15 +246,26 @@ def _initial_client(self): @sleep_and_retry @limits(calls=1, period=ONE_SECOND) def send_message(self, message: SlackMessageSchema, **kwargs) -> bool: - response: WebhookResponse = self.client.send( - text=message.text, blocks=message.blocks, attachments=message.attachments - ) + response: Union[requests.Response, WebhookResponse] + if self.is_workflow: + # For slack workflows, we need to send the message raw to the webhook + response = self.client.post(self.webhook, data=message.text) + else: + response = self.client.send( + text=message.text, + blocks=message.blocks, + attachments=message.attachments, + ) if response.status_code == OK_STATUS_CODE: return True - else: + response_body = ( + response.text + if isinstance(response, requests.Response) + else response.body + ) logger.error( - f"Could not post message to slack via webhook - {self.webhook}. Status code: {response.status_code}, Error: {response.body}" + f"Could not post message to slack via webhook - {self.webhook}. Status code: {response.status_code}, Error: {response_body}" ) return False diff --git a/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py b/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py index f0cbbf9db..ce504c4b3 100644 --- a/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py +++ b/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py @@ -25,6 +25,9 @@ from elementary.monitor.alerts.test_alert import TestAlertModel from elementary.monitor.api.alerts.alert_filters import filter_alerts from elementary.monitor.api.alerts.alerts import AlertsAPI +from elementary.monitor.data_monitoring.alerts.integrations.base_integration import ( + BaseIntegration, +) from elementary.monitor.data_monitoring.alerts.integrations.integrations import ( Integrations, ) @@ -55,7 +58,7 @@ def get_health_check_message() -> MessageBody: class DataMonitoringAlerts(DataMonitoring): - alerts_integration: BaseMessagingIntegration + alerts_integration: Union[BaseMessagingIntegration, BaseIntegration] def __init__( self, @@ -87,7 +90,7 @@ def __init__( def _get_integration_client( self, - ) -> BaseMessagingIntegration: + ) -> Union[BaseMessagingIntegration, BaseIntegration]: return Integrations.get_integration( config=self.config, tracking=self.tracking, @@ -284,6 +287,11 @@ def _send_message( return integration.send_message(destination=destination, body=body) def _send_test_message(self) -> MessageSendResult: + if isinstance(self.alerts_integration, BaseIntegration): + raise ValueError( + "Cannot send test message with a BaseIntegration of type " + f"{type(self.alerts_integration)}" + ) test_message = get_health_check_message() return self._send_message( integration=self.alerts_integration, body=test_message, metadata={} @@ -298,7 +306,9 @@ def _send_alert( GroupedByTableAlerts, AlertsGroup, ], - ): + ) -> bool: + if isinstance(self.alerts_integration, BaseIntegration): + return self.alerts_integration.send_alert(alert) alert_message_builder = AlertMessageBuilder() alert_message_body = alert_message_builder.build( alert=alert, diff --git a/elementary/monitor/data_monitoring/alerts/integrations/integrations.py b/elementary/monitor/data_monitoring/alerts/integrations/integrations.py index 89b0519b2..7a8ed59db 100644 --- a/elementary/monitor/data_monitoring/alerts/integrations/integrations.py +++ b/elementary/monitor/data_monitoring/alerts/integrations/integrations.py @@ -1,4 +1,4 @@ -from typing import Any, Optional, cast +from typing import Any, Optional, Union, cast from elementary.config.config import Config from elementary.exceptions.exceptions import Error @@ -15,6 +15,12 @@ from elementary.messages.messaging_integrations.teams_webhook import ( TeamsWebhookMessagingIntegration, ) +from elementary.monitor.data_monitoring.alerts.integrations.base_integration import ( + BaseIntegration, +) +from elementary.monitor.data_monitoring.alerts.integrations.slack.slack import ( + SlackIntegration, +) from elementary.tracking.tracking_interface import Tracking from elementary.utils.log import get_logger @@ -35,8 +41,13 @@ class Integrations: def get_integration( config: Config, tracking: Optional[Tracking] = None, - ) -> BaseMessagingIntegration: + ) -> Union[BaseMessagingIntegration, BaseIntegration]: if config.has_slack: + if config.is_slack_workflow: + return SlackIntegration( + config=config, + tracking=tracking, + ) if config.slack_token: return SlackWebMessagingIntegration.from_token( config.slack_token, tracking