Skip to content

Commit ca841c0

Browse files
authored
Merge pull request #1868 from elementary-data/fix-workflows-regression
re-added support for workflows
2 parents 650619e + fc9fb80 commit ca841c0

File tree

3 files changed

+56
-14
lines changed

3 files changed

+56
-14
lines changed

elementary/clients/slack/client.py

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import json
22
from abc import ABC, abstractmethod
3-
from typing import Dict, List, Optional, Tuple
3+
from typing import Dict, List, Optional, Tuple, Union
44

5+
import requests
56
from ratelimit import limits, sleep_and_retry
67
from slack_sdk import WebClient, WebhookClient
78
from slack_sdk.errors import SlackApiError
@@ -41,16 +42,21 @@ def create_client(
4142
return SlackWebClient(token=config.slack_token, tracking=tracking)
4243
elif config.slack_webhook:
4344
logger.debug("Creating Slack client with webhook.")
44-
return SlackWebhookClient(webhook=config.slack_webhook, tracking=tracking)
45+
return SlackWebhookClient(
46+
webhook=config.slack_webhook,
47+
is_workflow=config.is_slack_workflow,
48+
tracking=tracking,
49+
)
4550
return None
4651

4752
@abstractmethod
4853
def _initial_client(self):
4954
raise NotImplementedError
5055

5156
def _initial_retry_handlers(self):
52-
rate_limit_handler = RateLimitErrorRetryHandler(max_retry_count=5)
53-
self.client.retry_handlers.append(rate_limit_handler)
57+
if isinstance(self.client, WebClient):
58+
rate_limit_handler = RateLimitErrorRetryHandler(max_retry_count=5)
59+
self.client.retry_handlers.append(rate_limit_handler)
5460

5561
@abstractmethod
5662
def send_message(self, **kwargs):
@@ -223,28 +229,43 @@ class SlackWebhookClient(SlackClient):
223229
def __init__(
224230
self,
225231
webhook: str,
232+
is_workflow: bool,
226233
tracking: Optional[Tracking] = None,
227234
):
228235
self.webhook = webhook
236+
self.is_workflow = is_workflow
229237
super().__init__(tracking)
230238

231239
def _initial_client(self):
240+
if self.is_workflow:
241+
return requests.Session()
232242
return WebhookClient(
233243
url=self.webhook, default_headers={"Content-type": "application/json"}
234244
)
235245

236246
@sleep_and_retry
237247
@limits(calls=1, period=ONE_SECOND)
238248
def send_message(self, message: SlackMessageSchema, **kwargs) -> bool:
239-
response: WebhookResponse = self.client.send(
240-
text=message.text, blocks=message.blocks, attachments=message.attachments
241-
)
249+
response: Union[requests.Response, WebhookResponse]
250+
if self.is_workflow:
251+
# For slack workflows, we need to send the message raw to the webhook
252+
response = self.client.post(self.webhook, data=message.text)
253+
else:
254+
response = self.client.send(
255+
text=message.text,
256+
blocks=message.blocks,
257+
attachments=message.attachments,
258+
)
242259
if response.status_code == OK_STATUS_CODE:
243260
return True
244-
245261
else:
262+
response_body = (
263+
response.text
264+
if isinstance(response, requests.Response)
265+
else response.body
266+
)
246267
logger.error(
247-
f"Could not post message to slack via webhook - {self.webhook}. Status code: {response.status_code}, Error: {response.body}"
268+
f"Could not post message to slack via webhook - {self.webhook}. Status code: {response.status_code}, Error: {response_body}"
248269
)
249270
return False
250271

elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
from elementary.monitor.alerts.test_alert import TestAlertModel
2626
from elementary.monitor.api.alerts.alert_filters import filter_alerts
2727
from elementary.monitor.api.alerts.alerts import AlertsAPI
28+
from elementary.monitor.data_monitoring.alerts.integrations.base_integration import (
29+
BaseIntegration,
30+
)
2831
from elementary.monitor.data_monitoring.alerts.integrations.integrations import (
2932
Integrations,
3033
)
@@ -55,7 +58,7 @@ def get_health_check_message() -> MessageBody:
5558

5659

5760
class DataMonitoringAlerts(DataMonitoring):
58-
alerts_integration: BaseMessagingIntegration
61+
alerts_integration: Union[BaseMessagingIntegration, BaseIntegration]
5962

6063
def __init__(
6164
self,
@@ -87,7 +90,7 @@ def __init__(
8790

8891
def _get_integration_client(
8992
self,
90-
) -> BaseMessagingIntegration:
93+
) -> Union[BaseMessagingIntegration, BaseIntegration]:
9194
return Integrations.get_integration(
9295
config=self.config,
9396
tracking=self.tracking,
@@ -284,6 +287,11 @@ def _send_message(
284287
return integration.send_message(destination=destination, body=body)
285288

286289
def _send_test_message(self) -> MessageSendResult:
290+
if isinstance(self.alerts_integration, BaseIntegration):
291+
raise ValueError(
292+
"Cannot send test message with a BaseIntegration of type "
293+
f"{type(self.alerts_integration)}"
294+
)
287295
test_message = get_health_check_message()
288296
return self._send_message(
289297
integration=self.alerts_integration, body=test_message, metadata={}
@@ -298,7 +306,9 @@ def _send_alert(
298306
GroupedByTableAlerts,
299307
AlertsGroup,
300308
],
301-
):
309+
) -> bool:
310+
if isinstance(self.alerts_integration, BaseIntegration):
311+
return self.alerts_integration.send_alert(alert)
302312
alert_message_builder = AlertMessageBuilder()
303313
alert_message_body = alert_message_builder.build(
304314
alert=alert,

elementary/monitor/data_monitoring/alerts/integrations/integrations.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Any, Optional, cast
1+
from typing import Any, Optional, Union, cast
22

33
from elementary.config.config import Config
44
from elementary.exceptions.exceptions import Error
@@ -15,6 +15,12 @@
1515
from elementary.messages.messaging_integrations.teams_webhook import (
1616
TeamsWebhookMessagingIntegration,
1717
)
18+
from elementary.monitor.data_monitoring.alerts.integrations.base_integration import (
19+
BaseIntegration,
20+
)
21+
from elementary.monitor.data_monitoring.alerts.integrations.slack.slack import (
22+
SlackIntegration,
23+
)
1824
from elementary.tracking.tracking_interface import Tracking
1925
from elementary.utils.log import get_logger
2026

@@ -35,8 +41,13 @@ class Integrations:
3541
def get_integration(
3642
config: Config,
3743
tracking: Optional[Tracking] = None,
38-
) -> BaseMessagingIntegration:
44+
) -> Union[BaseMessagingIntegration, BaseIntegration]:
3945
if config.has_slack:
46+
if config.is_slack_workflow:
47+
return SlackIntegration(
48+
config=config,
49+
tracking=tracking,
50+
)
4051
if config.slack_token:
4152
return SlackWebMessagingIntegration.from_token(
4253
config.slack_token, tracking

0 commit comments

Comments
 (0)