diff --git a/dev-requirements.txt b/dev-requirements.txt index f41268a0d..299f10888 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -16,6 +16,5 @@ types-pytz types-jsonschema types-PyYAML types-setuptools -pandas-stubs types-retry types-decorator diff --git a/docs/_snippets/setup-teams-integration.mdx b/docs/_snippets/setup-teams-integration.mdx index 9780e3c97..477a82ff8 100644 --- a/docs/_snippets/setup-teams-integration.mdx +++ b/docs/_snippets/setup-teams-integration.mdx @@ -34,11 +34,13 @@ Call it `Elementary` (or whatever you prefer) and connect it to the workspace of -Now it is time to setup the webhook for this channel. +Now it's time to set up a webhook. You have two options for creating a webhook: - + -## Create a webhook +## Create a webhook using Connectors + +**Note:** Microsoft 365 Connectors are being deprecated. Consider using Power Automate Workflows (Option 2) for new integrations. Go to a channel in your Team and choose `Manage channel` @@ -77,7 +79,38 @@ Copy the URL of the webhook. -Lastly, pass the webhook to the CLI as a param or in the `config.yml` file: + + +## Create a webhook using Power Automate + +You can create a webhook using Power Automate in two ways: + +### Method 1: Directly from Teams (Recommended) + +1. Go to your Teams channel +2. Click the three dots (...) next to the channel name +3. Select `Workflows` +4. Choose the template "Post to channel when a webhook request is received" +5. Copy the webhook URL + +### Method 2: From Power Automate Website + +1. Go to [Power Automate](https://flow.microsoft.com) +2. Create a new instant cloud flow +3. Search for "When a HTTP request is received" as your trigger +4. In the flow, add a "Post adaptive card in a chat or channel" action +5. Configure the team and channel where you want to post +6. Save the flow and copy the HTTP POST URL + +**Important Notes:** + +- When using Power Automate Workflows, Elementary CLI cannot directly verify if messages were successfully delivered. You'll need to monitor your workflow runs in Power Automate to check for any delivery issues. +- Workflows can't post in private channels as a flow bot, but can post on behalf of a user +- Workflows can only be created in your default environment + + + +Lastly, pass the webhook URL (from either method) to the CLI as a param or in the `config.yml` file: diff --git a/docs/cloud/integrations/alerts/ms-teams.mdx b/docs/cloud/integrations/alerts/ms-teams.mdx index 9cfbbe15c..d7ea82311 100644 --- a/docs/cloud/integrations/alerts/ms-teams.mdx +++ b/docs/cloud/integrations/alerts/ms-teams.mdx @@ -3,6 +3,7 @@ title: "Microsoft Teams" --- Elementary's Microsoft Teams integration enables sending alerts when data issues happen. +The alerts are sent using Adaptive Cards format, which provides rich formatting and interactive capabilities. The alerts include rich context, and you can create [alert rules](/features/alerts-and-incidents/alert-rules) to distribute alerts to different channels and destinations. @@ -30,30 +31,29 @@ The alerts include rich context, and you can create [alert rules](/features/aler -3. For each MS Teams channel you connect to Elementary, you will need to create a Webhook. +3. For each MS Teams channel you connect to Elementary, you will need to create a Webhook. There are two ways to create a webhook: - - 1. Go to a channel in your Team and choose `Manage channel` + +1. Go to a channel in your Team and choose `Manage channel`
Teams manage channel
- 2. Click on `Edit` connectors.
Teams edit connectors
@@ -63,9 +63,9 @@ The alerts include rich context, and you can create [alert rules](/features/aler
Teams add incoming webhook
@@ -74,11 +74,11 @@ The alerts include rich context, and you can create [alert rules](/features/aler
- + Teams create webhook
@@ -86,17 +86,43 @@ width="400"
- + Teams copy URL webhook
+**Note:** Microsoft 365 Connectors (previously called Office 365 Connectors) are nearing deprecation, and the creation of new Microsoft 365 Connectors will soon be blocked. Consider using Power Automate Workflows instead.
+ + +You can create a webhook using Power Automate in two ways: + +### Method 1: Directly from Teams (Recommended) + +1. Go to your Teams channel +2. Click the three dots (...) next to the channel name +3. Select `Workflows` +4. Choose the template "Post to channel when a webhook request is received" +5. Copy the webhook URL + +### Method 2: From Power Automate Website + +1. Go to [Power Automate](https://flow.microsoft.com) +2. Create a new instant cloud flow +3. Search for "When a HTTP request is received" as your trigger +4. In the flow, add a "Post adaptive card in a chat or channel" action +5. Configure the team and channel where you want to post +6. Save the flow and copy the HTTP POST URL + +**Important Note:** When using Power Automate Workflows, Elementary CLI cannot directly verify if messages were successfully delivered. You'll need to monitor your workflow runs in Power Automate to check for any errors. + + + 4. Configure your Microsoft Teams webhooks, and give each one a name indicating it's connected channel: @@ -112,17 +138,18 @@ width="400" 5. Select a default channel for alerts, and set the suppression interval. -The default channel you select will automatically add a default [alert rule](/features/alerts-and-incidents/alert-rules) -to sends all failures to this channel. Alerts on warnings are not sent by default. To modify and add tules, navigate to `Alert Rules` page. + The default channel you select will automatically add a default [alert + rule](/features/alerts-and-incidents/alert-rules) to sends all failures to + this channel. Alerts on warnings are not sent by default. To modify and add + tules, navigate to `Alert Rules` page. -
Select channel and suppression interval
diff --git a/docs/oss/deployment-and-configuration/teams.mdx b/docs/oss/deployment-and-configuration/teams.mdx index a1b331751..85eff3cb9 100644 --- a/docs/oss/deployment-and-configuration/teams.mdx +++ b/docs/oss/deployment-and-configuration/teams.mdx @@ -3,17 +3,23 @@ title: "Teams setup for Elementary CLI" sidebarTitle: "Teams" --- -Elementary Teams integration includes sending [Teams alerts](/oss/guides/alerts/send-teams-alerts) on failures in dbt tests and models. +Elementary Teams integration includes sending [Teams alerts](/oss/guides/alerts/send-teams-alerts) on failures in dbt tests and models. The alerts are sent using Microsoft Teams Adaptive Cards format, which provides rich formatting and interactive capabilities. ## Integration options -There is one integration option for Microsoft Teams: a Webhook. This method let you receive alerts from Elementary, but lacks -some support that is available in the Slack integration solution. -Below is features support comparison table (with Slack), to help you select the integration method. +There are two ways to create a webhook for Microsoft Teams: -| Integration | Elementary alerts | Elementary report | Multiple channels | Slack workflows | -| ------------- | ----------------- | ----------------- | ----------------- | --------------- | -| Teams Webhook | ✅ | ❌ | ❌ | ❌ | +1. **Microsoft Teams Connectors (Legacy)**: The traditional way of creating webhooks, but this method is being deprecated by Microsoft. +2. **Power Automate Workflows (Recommended)**: The newer, more flexible way to create webhooks. Note that when using this method, Elementary CLI cannot directly verify if messages were delivered - you'll need to monitor your workflow runs in Power Automate. + +Below is a features support comparison table (with Slack), to help you select the integration method. + +| Integration | Elementary alerts | Elementary report | Multiple channels | +| ------------------------ | ----------------- | ----------------- | ----------------- | +| Teams Connector (Legacy) | ✅ | ❌ | ❌ | +| Power Automate Workflows | ✅ | ❌ | ❌ | + +**Note:** Microsoft 365 Connectors (previously called Office 365 Connectors) are nearing deprecation. We recommend using Power Automate Workflows for new integrations. ## Teams integration setup diff --git a/docs/oss/guides/alerts/send-teams-alerts.mdx b/docs/oss/guides/alerts/send-teams-alerts.mdx index cbc0c630b..fe2665983 100644 --- a/docs/oss/guides/alerts/send-teams-alerts.mdx +++ b/docs/oss/guides/alerts/send-teams-alerts.mdx @@ -9,6 +9,8 @@ title: "Setup Teams alerts" Before you can start using the alerts, make sure to [install the dbt package](/oss/quickstart/quickstart-cli-package), [configure a profile and install the CLI](/oss/quickstart/quickstart-cli). This is **required for the alerts to work.** +Elementary sends alerts using Microsoft Teams Adaptive Cards format, which provides rich formatting and interactive capabilities. You can create a webhook URL using either Microsoft Teams Connectors (legacy, being deprecated) or Power Automate Workflows (recommended). +
diff --git a/elementary/clients/teams/__init__.py b/elementary/clients/teams/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/elementary/clients/teams/client.py b/elementary/clients/teams/client.py deleted file mode 100644 index ad897deb3..000000000 --- a/elementary/clients/teams/client.py +++ /dev/null @@ -1,91 +0,0 @@ -from abc import ABC, abstractmethod -from typing import Optional - -from pymsteams import cardsection, connectorcard, potentialaction # type: ignore -from ratelimit import limits, sleep_and_retry -from requests import Response - -from elementary.config.config import Config -from elementary.tracking.tracking_interface import Tracking -from elementary.utils.log import get_logger - -logger = get_logger(__name__) - -OK_STATUS_CODE = 200 -ONE_SECOND = 1 - - -class TeamsClient(ABC): - def __init__( - self, - webhook: str, - tracking: Optional[Tracking] = None, - ): - self.webhook = webhook - self.tracking = tracking - self.client = self._initial_client() - - @staticmethod - def create_client( - config: Config, tracking: Optional[Tracking] = None - ) -> Optional["TeamsClient"]: - if not config.has_teams: - return None - if config.teams_webhook: - return TeamsWebhookClient(webhook=config.teams_webhook, tracking=tracking) - return None - - @abstractmethod - def _initial_client(self): - raise NotImplementedError - - @abstractmethod - def send_message(self, **kwargs): - raise NotImplementedError - - @abstractmethod - def title(self, title: str): - raise NotImplementedError - - @abstractmethod - def text(self, text: str): - raise NotImplementedError - - @abstractmethod - def addSection(self, section: cardsection): - raise NotImplementedError - - @abstractmethod - def addPotentialAction(self, action: potentialaction): - raise NotImplementedError - - -class TeamsWebhookClient(TeamsClient): - def _initial_client(self): - return connectorcard(self.webhook) - - @sleep_and_retry - @limits(calls=1, period=ONE_SECOND) - def send_message(self, **kwargs) -> bool: - self.client.send() - response: Response = self.client.last_http_response - - if response.status_code == OK_STATUS_CODE: - return True - else: - logger.error( - f"Could not post message to teams via webhook - {self.webhook}. Status code: {response.status_code}, Error: {response.text}" - ) - return False - - def title(self, title: str): - self.client.title(title) - - def text(self, text: str): - self.client.text(text) - - def addSection(self, section: cardsection): - self.client.addSection(section) - - def addPotentialAction(self, action: potentialaction): - self.client.addPotentialAction(action) diff --git a/elementary/messages/messaging_integrations/README.md b/elementary/messages/messaging_integrations/README.md index 4fad946bf..8d34c99fb 100644 --- a/elementary/messages/messaging_integrations/README.md +++ b/elementary/messages/messaging_integrations/README.md @@ -16,11 +16,82 @@ The core of the new messaging system is the `BaseMessagingIntegration` abstract ### Key Components -1. **MessageBody**: A platform-agnostic representation of a message +1. **MessageBody**: A platform-agnostic representation of a message, containing: + + - `blocks`: List of message blocks (headers, code, dividers, lines, facts, expandable sections) + - `color`: Optional color theme (red, yellow, green) + 2. **MessageSendResult**: Contains information about a sent message, including timestamp and platform-specific context 3. **DestinationType**: Generic type representing the destination for a message (e.g., webhook URL, channel) 4. **MessageContextType**: Generic type for platform-specific message context +## Contributing a New Integration + +### Prerequisites + +1. Understand Elementary's message blocks (see `message_body.py`) +2. Check if your platform's message format is already supported in `elementary/messages/formats/` +3. Review existing implementations (e.g., Teams with Adaptive Cards) for reference + +### Step 1: Message Format + +If your platform's message format is not yet supported: + +1. Create a new format module in `elementary/messages/formats/{format_name}.py` +2. Implement conversion of all Elementary message blocks: + ```python + # Required message blocks to support: + - HeaderBlock: Title/heading formatting + - CodeBlock: Code snippets with optional syntax highlighting + - DividerBlock: Visual separator + - LinesBlock: Plain text content + - FactListBlock: Key-value pairs + - ExpandableBlock: Collapsible sections + ``` +3. Add tests in `tests/unit/messages/formats/` + +See existing implementations for reference: + +- `adaptive_cards.py` - Microsoft Teams Adaptive Cards +- `block_kit.py` - Slack Block Kit + +### Step 2: Messaging Integration + +Once the message format is ready: + +1. Create `{platform_name}.py` in this directory + +2. Define your destination and context types: + + - **Destination**: Where to send the message (e.g., webhook URL, channel ID, user ID) + - **MessageContext**: Information needed to identify a sent message for replies (e.g., message ID, thread ID) + - Both should be Pydantic models with appropriate fields for your platform + +3. Implement the integration class: + + - Extend `BaseMessagingIntegration[YourDestination, YourContext]` + - Implement `send_message()`: Convert message format and send to platform + - Implement `supports_reply()`: Return True only if your platform supports replies + - Implement `reply_to_message()` if supported: Use message context to reply + +4. Add error handling: + + - Use exceptions from `exceptions.py` + - Handle platform-specific errors + - Provide clear error messages + +5. Add support in the integrations factory: + + - Update `elementary/monitor/data_monitoring/alerts/integrations/integrations.py` + - Add your integration to `get_integration()` method + - Add destination creation to `get_destination()` method + +6. Add configuration support: + - Add your platform's configuration to `Config` class + - Support both CLI arguments and config file input + - Make sure to get all required information from users to create your destination type + - See Teams implementation for reference (webhook URL configuration) + ## Migration Strategy The system currently supports both: @@ -49,7 +120,7 @@ To add a new messaging platform integration: ## Current Implementations -- **Teams**: Uses the new `BaseMessagingIntegration` system with webhook support +- **Teams**: Uses the new `BaseMessagingIntegration` system with webhook support and Adaptive Cards format - **Slack**: Currently uses the legacy `BaseIntegration` system (planned for migration) ## Future Improvements diff --git a/elementary/monitor/data_monitoring/alerts/integrations/README.md b/elementary/monitor/data_monitoring/alerts/integrations/README.md index c8aaa8ce7..08d3ae433 100644 --- a/elementary/monitor/data_monitoring/alerts/integrations/README.md +++ b/elementary/monitor/data_monitoring/alerts/integrations/README.md @@ -1,5 +1,9 @@ # Integration contribution guide +> **DEPRECATED**: This guide describes the legacy integration system. For new integrations, please refer to the [Messaging Integrations Guide](../../messages/messaging_integrations/README.md) which describes the new `BaseMessagingIntegration` system. + +The content below is kept for reference while existing integrations are migrated to the new system. + First of all, thank you for contributing a new integration to Elementary! We appreciate it :) This guide is meant to make this process easier for you and make the review process faster. diff --git a/elementary/monitor/data_monitoring/alerts/integrations/teams/__init__.py b/elementary/monitor/data_monitoring/alerts/integrations/teams/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/elementary/monitor/data_monitoring/alerts/integrations/teams/message_builder.py b/elementary/monitor/data_monitoring/alerts/integrations/teams/message_builder.py deleted file mode 100644 index 9a8238dd0..000000000 --- a/elementary/monitor/data_monitoring/alerts/integrations/teams/message_builder.py +++ /dev/null @@ -1,20 +0,0 @@ -from pymsteams import cardsection, potentialaction # type: ignore - -from elementary.clients.teams.client import TeamsClient - - -class TeamsAlertMessageBuilder: - def __init__(self, client: TeamsClient) -> None: - self.client = client - - def title(self, title: str): - self.client.title(title) - - def text(self, text: str): - self.client.text(text) - - def addSection(self, section: cardsection): - self.client.addSection(section) - - def addPotentialAction(self, action: potentialaction): - self.client.addPotentialAction(action) diff --git a/elementary/monitor/data_monitoring/alerts/integrations/teams/teams.py b/elementary/monitor/data_monitoring/alerts/integrations/teams/teams.py deleted file mode 100644 index fb0c329eb..000000000 --- a/elementary/monitor/data_monitoring/alerts/integrations/teams/teams.py +++ /dev/null @@ -1,707 +0,0 @@ -import json -from datetime import datetime, timedelta -from typing import Dict, List, Optional, Sequence, Union - -import pandas as pd -from pymsteams import cardsection, potentialaction # type: ignore - -from elementary.clients.teams.client import TeamsClient -from elementary.config.config import Config -from elementary.monitor.alerts.alerts_groups import AlertsGroup, GroupedByTableAlerts -from elementary.monitor.alerts.model_alert import ModelAlertModel -from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel -from elementary.monitor.alerts.test_alert import TestAlertModel -from elementary.monitor.data_monitoring.alerts.integrations.base_integration import ( - BaseIntegration, -) -from elementary.monitor.data_monitoring.alerts.integrations.teams.message_builder import ( - TeamsAlertMessageBuilder, -) -from elementary.monitor.data_monitoring.alerts.integrations.utils.report_link import ( - ReportLinkData, -) -from elementary.tracking.tracking_interface import Tracking -from elementary.utils.json_utils import ( - list_of_lists_of_strings_to_comma_delimited_unique_strings, -) -from elementary.utils.log import get_logger -from elementary.utils.strings import prettify_and_dedup_list - -logger = get_logger(__name__) - -TABLE_FIELD = "table" -COLUMN_FIELD = "column" -DESCRIPTION_FIELD = "description" -OWNERS_FIELD = "owners" -TAGS_FIELD = "tags" -SUBSCRIBERS_FIELD = "subscribers" -RESULT_MESSAGE_FIELD = "result_message" -TEST_PARAMS_FIELD = "test_parameters" -TEST_QUERY_FIELD = "test_query" -TEST_RESULTS_SAMPLE_FIELD = "test_results_sample" -DEFAULT_ALERT_FIELDS = [ - TABLE_FIELD, - COLUMN_FIELD, - DESCRIPTION_FIELD, - OWNERS_FIELD, - TAGS_FIELD, - SUBSCRIBERS_FIELD, - RESULT_MESSAGE_FIELD, - TEST_PARAMS_FIELD, - TEST_QUERY_FIELD, - TEST_RESULTS_SAMPLE_FIELD, -] - -STATUS_DISPLAYS: Dict[str, Dict] = { - "fail": {"display_name": "Failure"}, - "warn": {"display_name": "Warning"}, - "error": {"display_name": "Error"}, -} - - -class TeamsIntegration(BaseIntegration): - def __init__( - self, - config: Config, - tracking: Optional[Tracking] = None, - override_config_defaults=False, - *args, - **kwargs, - ) -> None: - self.config = config - self.tracking = tracking - self.override_config_defaults = override_config_defaults - super().__init__() - - # Enforce typing - self.client: TeamsClient - self.message_builder = TeamsAlertMessageBuilder(self.client) - - def _initial_client(self, *args, **kwargs) -> TeamsClient: - teams_client = TeamsClient.create_client( - config=self.config, tracking=self.tracking - ) - if not teams_client: - raise Exception("Could not create a Teams client") - return teams_client - - @staticmethod - def _get_alert_sub_title( - alert: Union[ - TestAlertModel, - ModelAlertModel, - SourceFreshnessAlertModel, - ], - ) -> str: - subtitle = "**" - subtitle += f"Status: {alert.status}" - if alert.suppression_interval: - subtitle += f" | Time: {alert.detected_at_str}" - subtitle += ( - f" | Suppression interval: {alert.suppression_interval} hours" - ) - else: - subtitle += f" | {alert.detected_at_str}" - subtitle += "**" - - return subtitle - - @staticmethod - def _get_potential_action(reportlink: ReportLinkData): - action = potentialaction(reportlink.text) - action.addOpenURI( - reportlink.text, - [{"os": "default", "uri": reportlink.url}], - ) - return action - - @staticmethod - def _get_section(title: str, text: str): - section = cardsection() - section.activityTitle(title) - section.activityText(text) - return section - - def _add_report_link_if_applicable( - self, - alert: Union[ - TestAlertModel, - ModelAlertModel, - SourceFreshnessAlertModel, - GroupedByTableAlerts, - ], - ): - report_link = alert.get_report_link() - if report_link: - action = self._get_potential_action(report_link) - self.message_builder.addPotentialAction(action) - - def _add_table_field_section_if_applicable(self, alert: TestAlertModel): - if TABLE_FIELD in (alert.alert_fields or DEFAULT_ALERT_FIELDS): - self.message_builder.addSection( - self._get_section("*Table*", f"_{alert.table_full_name}_") - ) - - def _add_column_field_section_if_applicable(self, alert: TestAlertModel): - if COLUMN_FIELD in (alert.alert_fields or DEFAULT_ALERT_FIELDS): - self.message_builder.addSection( - self._get_section("*Column*", f'_{alert.column_name or "No column"}_') - ) - - def _add_tags_field_section_if_applicable( - self, - alert: Union[ - TestAlertModel, - ModelAlertModel, - SourceFreshnessAlertModel, - ], - ): - if TAGS_FIELD in (alert.alert_fields or DEFAULT_ALERT_FIELDS): - tags = prettify_and_dedup_list(alert.tags or []) - self.message_builder.addSection( - self._get_section("*Tags*", f'_{tags or "No tags"}_') - ) - - def _add_owners_field_section_if_applicable( - self, - alert: Union[ - TestAlertModel, - ModelAlertModel, - SourceFreshnessAlertModel, - ], - ): - if OWNERS_FIELD in (alert.alert_fields or DEFAULT_ALERT_FIELDS): - owners = prettify_and_dedup_list(alert.owners or []) - self.message_builder.addSection( - self._get_section("*Owners*", f'_{owners or "No owners"}_') - ) - - def _add_subscribers_field_section_if_applicable( - self, - alert: Union[ - TestAlertModel, - ModelAlertModel, - SourceFreshnessAlertModel, - ], - ): - if SUBSCRIBERS_FIELD in (alert.alert_fields or DEFAULT_ALERT_FIELDS): - subscribers = prettify_and_dedup_list(alert.subscribers or []) - self.message_builder.addSection( - self._get_section( - "*Subscribers*", f'_{subscribers or "No subscribers"}_' - ) - ) - - def _add_description_field_section_if_applicable(self, alert: TestAlertModel): - if DESCRIPTION_FIELD in (alert.alert_fields or DEFAULT_ALERT_FIELDS): - self.message_builder.addSection( - self._get_section( - "*Description*", f'_{alert.test_description or "No description"}_' - ) - ) - - def _add_result_message_field_section_if_applicable( - self, - alert: Union[ - TestAlertModel, - ModelAlertModel, - ], - ): - message = None - if RESULT_MESSAGE_FIELD in (alert.alert_fields or DEFAULT_ALERT_FIELDS): - if isinstance(alert, ModelAlertModel): - if alert.message: - message = alert.message.strip() - elif isinstance(alert, TestAlertModel): - if alert.error_message: - message = alert.error_message.strip() - if not message: - message = "No result message" - self.message_builder.addSection( - self._get_section("*Result message*", f"_{message}_") - ) - - def _add_test_query_field_section_if_applicable(self, alert: TestAlertModel): - # This lacks logic to handle the case where the message is too long - if ( - TEST_QUERY_FIELD in (alert.alert_fields or DEFAULT_ALERT_FIELDS) - and alert.test_results_query - ): - self.message_builder.addSection( - self._get_section( - "*Test query*", f"```{alert.test_results_query.strip()}" - ) - ) - - def _add_test_params_field_section_if_applicable(self, alert: TestAlertModel): - if ( - TEST_PARAMS_FIELD in (alert.alert_fields or DEFAULT_ALERT_FIELDS) - and alert.test_params - ): - self.message_builder.addSection( - self._get_section("*Test parameters*", f"```{alert.test_params}```") - ) - - def _add_test_results_sample_field_section_if_applicable( - self, alert: TestAlertModel - ): - if TEST_RESULTS_SAMPLE_FIELD in ( - alert.alert_fields or DEFAULT_ALERT_FIELDS - ) and (alert.test_rows_sample or alert.test_type == "anomaly_detection"): - if alert.test_type == "anomaly_detection": - anomalous_value = alert.other - if alert.column_name: - message = f"*Column*: {alert.column_name} | *Anomalous Values*: {anomalous_value}" - else: - message = f"*Anomalous Values*: {anomalous_value}" - else: - df = pd.DataFrame(alert.test_rows_sample) - message = df.to_markdown(index=False) - self.message_builder.addSection( - self._get_section("*Test results sample*", f"{message}") - ) - - def _get_dbt_test_template(self, alert: TestAlertModel, *args, **kwargs): - title = f"{self._get_display_name(alert.status)}: {alert.summary}" - subtitle = self._get_alert_sub_title(alert) - - self._add_report_link_if_applicable(alert) - - self.message_builder.title(title) - self.message_builder.text(subtitle) - - self._add_table_field_section_if_applicable(alert) - self._add_column_field_section_if_applicable(alert) - self._add_tags_field_section_if_applicable(alert) - self._add_owners_field_section_if_applicable(alert) - self._add_subscribers_field_section_if_applicable(alert) - self._add_description_field_section_if_applicable(alert) - self._add_result_message_field_section_if_applicable(alert) - self._add_test_results_sample_field_section_if_applicable(alert) - self._add_test_query_field_section_if_applicable(alert) - self._add_test_params_field_section_if_applicable(alert) - - def _get_elementary_test_template(self, alert: TestAlertModel, *args, **kwargs): - if alert.test_type == "schema_change": - title = f"{alert.summary}" - else: - title = f"{self._get_display_name(alert.status)}: {alert.summary}" - - subtitle = self._get_alert_sub_title(alert) - - self._add_report_link_if_applicable(alert) - - self.message_builder.title(title) - self.message_builder.text(subtitle) - - self._add_table_field_section_if_applicable(alert) - self._add_column_field_section_if_applicable(alert) - self._add_tags_field_section_if_applicable(alert) - self._add_owners_field_section_if_applicable(alert) - self._add_subscribers_field_section_if_applicable(alert) - self._add_description_field_section_if_applicable(alert) - self._add_result_message_field_section_if_applicable(alert) - self._add_test_results_sample_field_section_if_applicable(alert) - self._add_test_params_field_section_if_applicable(alert) - - def _get_model_template(self, alert: ModelAlertModel, *args, **kwargs): - title = f"{self._get_display_name(alert.status)}: {alert.summary}" - subtitle = self._get_alert_sub_title(alert) - - self._add_report_link_if_applicable(alert) - - self.message_builder.title(title) - self.message_builder.text(subtitle) - self._add_tags_field_section_if_applicable(alert) - self._add_owners_field_section_if_applicable(alert) - self._add_subscribers_field_section_if_applicable(alert) - self._add_result_message_field_section_if_applicable(alert) - - if alert.materialization: - self.message_builder.addSection( - self._get_section( - "*Materialization*", f"`{str(alert.materialization)}`" - ) - ) - if alert.full_refresh: - self.message_builder.addSection( - self._get_section("*Full refresh*", f"`{alert.full_refresh}`") - ) - if alert.path: - self.message_builder.addSection( - self._get_section("*Path*", f"`{alert.path}`") - ) - - def _get_snapshot_template(self, alert: ModelAlertModel, *args, **kwargs): - title = f"{self._get_display_name(alert.status)}: {alert.summary}" - subtitle = self._get_alert_sub_title(alert) - - self._add_report_link_if_applicable(alert) - - self.message_builder.title(title) - self.message_builder.text(subtitle) - - self._add_tags_field_section_if_applicable(alert) - self._add_owners_field_section_if_applicable(alert) - self._add_subscribers_field_section_if_applicable(alert) - self._add_result_message_field_section_if_applicable(alert) - - if alert.original_path: - self.message_builder.addSection( - self._get_section("*Path*", f"`{alert.original_path}`") - ) - - def _get_source_freshness_template( - self, alert: SourceFreshnessAlertModel, *args, **kwargs - ): - title = f"{self._get_display_name(alert.status)}: {alert.summary}" - subtitle = self._get_alert_sub_title(alert) - - self._add_report_link_if_applicable(alert) - - self.message_builder.title(title) - self.message_builder.text(subtitle) - - self._add_tags_field_section_if_applicable(alert) - self._add_owners_field_section_if_applicable(alert) - self._add_subscribers_field_section_if_applicable(alert) - - if alert.freshness_description: - self.message_builder.addSection( - self._get_section( - "*Description*", - f'_{alert.freshness_description or "No description"}_', - ) - ) - - if alert.status == "runtime error": - self.message_builder.addSection( - self._get_section( - "*Result message*", - f"Failed to calculate the source freshness\n```{alert.error}```", - ) - ) - else: - self.message_builder.addSection( - self._get_section( - "*Result message*", f"```{alert.result_description}```" - ) - ) - - if alert.status != "runtime error": - self.message_builder.addSection( - self._get_section( - "*Time Elapsed*", - f"{timedelta(seconds=alert.max_loaded_at_time_ago_in_s) if alert.max_loaded_at_time_ago_in_s else 'N/A'}", - ) - ) - - if alert.status != "runtime error": - self.message_builder.addSection( - self._get_section("*Last Record At*", f"{alert.max_loaded_at}") - ) - - if alert.status != "runtime error": - self.message_builder.addSection( - self._get_section("*Sampled At*", f"{alert.snapshotted_at_str}") - ) - - if alert.error_after: - self.message_builder.addSection( - self._get_section("*Error after*", f"`{alert.error_after}`") - ) - - if alert.error_after: - self.message_builder.addSection( - self._get_section("*Warn after*", f"`{alert.warn_after}`") - ) - - if alert.error_after: - self.message_builder.addSection( - self._get_section("*Filter*", f"`{alert.filter}`") - ) - - if alert.path: - self.message_builder.addSection( - self._get_section("*Path*", f"`{alert.path}`") - ) - - def _get_group_by_table_template( - self, alert: GroupedByTableAlerts, *args, **kwargs - ): - alerts = alert.alerts - title = f"{self._get_display_name(alert.status)}: {alert.summary}" - subtitle = "" - - if alert.model_errors: - subtitle = ( - subtitle - + (" | " + f"😵 Model errors: {len(alert.model_errors)}") - if subtitle - else f"😵 Model errors: {len(alert.model_errors)}" - ) - if alert.test_failures: - subtitle = ( - subtitle - + (" | " + f"🔺 Test failures: {len(alert.test_failures)}") - if subtitle - else f"🔺 Test failures: {len(alert.test_failures)}" - ) - if alert.test_warnings: - subtitle = ( - subtitle - + (" | " + f"⚠ Test warnings: {len(alert.test_warnings)}") - if subtitle - else f"⚠ Test warnings: {len(alert.test_warnings)}" - ) - if alert.test_errors: - subtitle = ( - subtitle + (" | " + f"❗ Test errors: {len(alert.test_errors)}") - if subtitle - else f"❗ Test errors: {len(alert.test_errors)}" - ) - - self._add_report_link_if_applicable(alert) - - self.message_builder.title(title) - self.message_builder.text(subtitle) - - tags = list_of_lists_of_strings_to_comma_delimited_unique_strings( - [alert.tags or [] for alert in alerts] - ) - owners = list_of_lists_of_strings_to_comma_delimited_unique_strings( - [alert.owners or [] for alert in alerts] - ) - subscribers = list_of_lists_of_strings_to_comma_delimited_unique_strings( - [alert.subscribers or [] for alert in alerts] - ) - - self.message_builder.addSection( - self._get_section("*Tags*", f'_{tags if tags else "No tags"}_') - ) - self.message_builder.addSection( - self._get_section("*Owners*", f'_{owners if owners else "No owners"}_') - ) - self.message_builder.addSection( - self._get_section( - "*Subscribers*", f'_{subscribers if subscribers else "No subscribers"}_' - ) - ) - - if alert.model_errors: - section = cardsection() - section.activityTitle("*Model errors*") - section.activitySubtitle( - f"{self._get_model_error_block_header(alert.model_errors)}" - ) - section.activityText( - f"{self._get_model_error_block_body(alert.model_errors)}" - ) - self.message_builder.addSection(section) - - if alert.test_failures: - rows = [alert.concise_name for alert in alert.test_failures] - text = "
".join([f"🔺 {row}" for row in rows]) - self.message_builder.addSection( - self._get_section("*Test failures*", f"{text}") - ) - - if alert.test_warnings: - rows = [alert.concise_name for alert in alert.test_warnings] - text = "
".join([f"⚠ {row}" for row in rows]) - self.message_builder.addSection( - self._get_section("*Test warnings*", f"{text}") - ) - - if alert.test_errors: - rows = [alert.concise_name for alert in alert.test_errors] - text = "
".join([f"❗ {row}" for row in rows]) - self.message_builder.addSection( - self._get_section("*Test errors*", f"{text}") - ) - - def _get_sub_group_detailed_section( - self, - alerts: Sequence[ - Union[ - TestAlertModel, - ModelAlertModel, - SourceFreshnessAlertModel, - GroupedByTableAlerts, - ], - ], - sub_title: str, - bullet_icon: str, - ) -> cardsection: - formatted_sub_title = f"*{sub_title}*" - rows = [] - for alert in alerts: - row = f"{bullet_icon} {alert.summary}" - if report_link := alert.get_report_link(): - link = f'{report_link.text}' - row = f"{row} - {link}" - rows.append(row) - text = "
".join(rows) - return self._get_section(formatted_sub_title, text) - - def _get_alerts_group_template(self, alert: AlertsGroup, *args, **kwargs): # type: ignore[override] - title = f"{self._get_display_name(alert.status)}: {alert.summary}" - - subtitle = "" - if alert.model_errors: - subtitle = ( - subtitle - + (" | " + f"😵 Model errors: {len(alert.model_errors)}") - if subtitle - else f"😵 Model errors: {len(alert.model_errors)}" - ) - if alert.test_failures: - subtitle = ( - subtitle - + (" | " + f"🔺 Test failures: {len(alert.test_failures)}") - if subtitle - else f"🔺 Test failures: {len(alert.test_failures)}" - ) - if alert.test_warnings: - subtitle = ( - subtitle - + (" | " + f"⚠ Test warnings: {len(alert.test_warnings)}") - if subtitle - else f"⚠ Test warnings: {len(alert.test_warnings)}" - ) - if alert.test_errors: - subtitle = ( - subtitle + (" | " + f"❗ Test errors: {len(alert.test_errors)}") - if subtitle - else f"❗ Test errors: {len(alert.test_errors)}" - ) - - self.message_builder.title(title) - self.message_builder.text(subtitle) - - if alert.model_errors: - self.message_builder.addSection( - self._get_sub_group_detailed_section( - alerts=alert.model_errors, - sub_title="Model errors", - bullet_icon="😵", - ) - ) - - if alert.test_failures: - self.message_builder.addSection( - self._get_sub_group_detailed_section( - alerts=alert.test_failures, - sub_title="Test failures", - bullet_icon="🔺", - ) - ) - - if alert.test_warnings: - self.message_builder.addSection( - self._get_sub_group_detailed_section( - alerts=alert.test_warnings, - sub_title="Test warnings", - bullet_icon="⚠", - ) - ) - - if alert.test_errors: - self.message_builder.addSection( - self._get_sub_group_detailed_section( - alerts=alert.test_errors, - sub_title="Test errors", - bullet_icon="❗", - ) - ) - - def _get_fallback_template( - self, - alert: Union[ - TestAlertModel, - ModelAlertModel, - SourceFreshnessAlertModel, - GroupedByTableAlerts, - AlertsGroup, - ], - *args, - **kwargs, - ): - # Since the title can never be truncated and the text can be truncated by Teams, I think it is good to have a title + text in the fallback template - self.message_builder.title( - "Oops, we failed to format the alert ! -_-' Please share this with the Elementary team via or a issue." - ) - self.message_builder.text(f"```{json.dumps(alert.data, indent=2)}") - - def _get_test_message_template(self, *args, **kwargs): - self.message_builder.title("This is a test message generated by Elementary!") - self.message_builder.text( - f"Elementary monitor ran successfully on {datetime.now().strftime('%Y-%m-%d %H:%M')}" - ) - - def send_alert( - self, - alert: Union[ # type: ignore[override] - TestAlertModel, - ModelAlertModel, - SourceFreshnessAlertModel, - GroupedByTableAlerts, - AlertsGroup, - ], - *args, - **kwargs, - ) -> bool: - try: - logger.debug("Sending alert via Teams.") - self._get_alert_template(alert) - sent_successfully = self.client.send_message() - except Exception as e: - logger.error( - f"Unable to send alert via Teams: {e}\nSending fallback template." - ) - sent_successfully = False - - if not sent_successfully: - try: - self._get_fallback_template(alert) - fallback_sent_successfully = self.client.send_message() - except Exception as e: - logger.error(f"Unable to send alert fallback via Teams: {e}") - fallback_sent_successfully = False - sent_successfully = fallback_sent_successfully - # Resetting the client so that it does not cache the message of other alerts - self.client = self._initial_client() - self.message_builder = TeamsAlertMessageBuilder(self.client) - - return sent_successfully - - @staticmethod - def _get_display_name(alert_status: Optional[str]) -> str: - if alert_status is None: - return "Unknown" - return STATUS_DISPLAYS.get(alert_status, {}).get("display_name", alert_status) - - @staticmethod - def _get_model_error_block_header( - model_error_alerts: List[ModelAlertModel], - ) -> List: - if len(model_error_alerts) == 0: - return [] - result = [] - for model_error_alert in model_error_alerts: - if model_error_alert.message: - result.extend(["*Result message*"]) - return result - - @staticmethod - def _get_model_error_block_body(model_error_alerts: List[ModelAlertModel]) -> str: - if len(model_error_alerts) == 0: - return "" - for model_error_alert in model_error_alerts: - if model_error_alert.message: - return f"```{model_error_alert.message.strip()}```" - return "" - - def send_test_message(self, *args, **kwargs) -> bool: - self._get_test_message_template() - return self.client.send_message() diff --git a/pyproject.toml b/pyproject.toml index bc232e5db..8f01f9755 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,6 @@ networkx = ">=2.3,<3" packaging = ">=20.9" azure-storage-blob = ">=12.11.0" pymsteams = ">=0.2.2,<1.0.0" -pandas = ">=2.0.0" numpy = "<2.0.0" tabulate = ">= 0.9.0"