Skip to content

Commit 37beadd

Browse files
authored
Merge pull request #67 from elementary-data/redshift
Redshift
2 parents 9d7a937 + ec32436 commit 37beadd

File tree

8 files changed

+79
-106
lines changed

8 files changed

+79
-106
lines changed

monitor/alerts.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import json
33
from exceptions.exceptions import InvalidAlertType
44
from utils.time import convert_utc_time_to_local_time
5+
from datetime import datetime
56

67

78
class Alert(object):
@@ -11,9 +12,10 @@ def __init__(self, alert_id) -> None:
1112
self.alert_id = alert_id
1213

1314
@staticmethod
14-
def create_alert_from_row(alert_row: list) -> 'Alert':
15+
def create_alert_from_row(alert_row: dict) -> 'Alert':
1516
alert_id, detected_at, database_name, schema_name, table_name, column_name, alert_type, sub_type, \
16-
alert_description = alert_row
17+
alert_description = alert_row.values()
18+
detected_at = datetime.fromisoformat(detected_at)
1719
if alert_type == 'schema_change':
1820
return SchemaChangeAlert(alert_id, database_name, schema_name, table_name, detected_at, sub_type,
1921
alert_description)

monitor/cli.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ def monitor(ctx, config_dir, profiles_dir, update_dbt_package, full_refresh_dbt_
7070
anonymous_tracking = AnonymousTracking(config)
7171
track_cli_start(anonymous_tracking, 'monitor', get_cli_properties(), ctx.command.name)
7272
try:
73-
data_monitoring = DataMonitoring.create_data_monitoring(config)
73+
data_monitoring = DataMonitoring(config)
7474
data_monitoring.run(update_dbt_package, full_refresh_dbt_package)
7575
track_cli_end(anonymous_tracking, 'monitor', data_monitoring.properties(), ctx.command.name)
7676
except Exception as exc:

monitor/data_monitoring.py

Lines changed: 23 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
import os
2-
from typing import Any
3-
from exceptions.exceptions import ConfigError
42
from monitor.alerts import Alert
53
from monitor.dbt_runner import DbtRunner
64
from config.config import Config
7-
from utils.dbt import get_snowflake_client, get_bigquery_client
85
from utils.log import get_logger
9-
from google.cloud import bigquery
6+
import json
7+
from alive_progress import alive_it
108

119
logger = get_logger(__name__)
1210
FILE_DIR = os.path.dirname(__file__)
@@ -20,40 +18,43 @@ class DataMonitoring(object):
2018
DBT_PROJECT_MODULES_PATH = os.path.join(DBT_PROJECT_PATH, 'dbt_modules', DBT_PACKAGE_NAME)
2119
DBT_PROJECT_PACKAGES_PATH = os.path.join(DBT_PROJECT_PATH, 'dbt_packages', DBT_PACKAGE_NAME)
2220

23-
def __init__(self, config: Config, db_connection: Any) -> None:
21+
def __init__(self, config: Config) -> None:
2422
self.config = config
2523
self.dbt_runner = DbtRunner(self.DBT_PROJECT_PATH, self.config.profiles_dir)
26-
self.db_connection = db_connection
2724
self.execution_properties = {}
2825

29-
@staticmethod
30-
def create_data_monitoring(config: Config) -> 'DataMonitoring':
31-
if config.platform == 'snowflake':
32-
snowflake_conn = get_snowflake_client(config.credentials, server_side_binding=False)
33-
return SnowflakeDataMonitoring(config, snowflake_conn)
34-
elif config.platform == 'bigquery':
35-
bigquery_client = get_bigquery_client(config.credentials)
36-
return BigQueryDataMonitoring(config, bigquery_client)
37-
else:
38-
raise ConfigError("Unsupported platform")
39-
4026
def _dbt_package_exists(self) -> bool:
4127
return os.path.exists(self.DBT_PROJECT_PACKAGES_PATH) or os.path.exists(self.DBT_PROJECT_MODULES_PATH)
4228

43-
def _run_query(self, query: str, params: tuple = None) -> list:
44-
pass
29+
@staticmethod
30+
def _split_list_to_chunks(items: list, chunk_size: int = 50) -> [list]:
31+
chunk_list = []
32+
for i in range(0, len(items), chunk_size):
33+
chunk_list.append(items[i: i + chunk_size])
34+
return chunk_list
4535

4636
def _update_sent_alerts(self, alert_ids) -> None:
47-
pass
37+
alert_ids_chunks = self._split_list_to_chunks(alert_ids)
38+
for alert_ids_chunk in alert_ids_chunks:
39+
self.dbt_runner.run_operation(macro_name='update_sent_alerts',
40+
macro_args={'alert_ids': alert_ids_chunk},
41+
json_logs=False)
4842

4943
def _query_alerts(self) -> list:
    """Fetch unsent alert rows via the 'get_new_alerts' dbt macro and build Alert objects.

    Each element returned by run_operation is a JSON-encoded alert row; the raw
    row count is recorded in execution_properties for tracking.
    """
    json_alert_rows = self.dbt_runner.run_operation(macro_name='get_new_alerts')
    self.execution_properties['alert_rows'] = len(json_alert_rows)
    return [Alert.create_alert_from_row(json.loads(json_row)) for json_row in json_alert_rows]
5151

5252
def _send_to_slack(self, alerts: [Alert]) -> None:
5353
slack_webhook = self.config.slack_notification_webhook
5454
if slack_webhook is not None:
5555
sent_alerts = []
56-
for alert in alerts:
56+
alerts_with_progress_bar = alive_it(alerts, title="Sending alerts")
57+
for alert in alerts_with_progress_bar:
5758
alert.send_to_slack(slack_webhook, self.config.is_slack_workflow)
5859
sent_alerts.append(alert.id)
5960

@@ -132,82 +133,4 @@ def properties(self):
132133
return data_monitoring_properties
133134

134135

135-
class SnowflakeDataMonitoring(DataMonitoring):
136-
SELECT_NEW_ALERTS_QUERY = """
137-
SELECT alert_id, detected_at, database_name, schema_name, table_name, column_name, alert_type, sub_type,
138-
alert_description
139-
FROM ALERTS
140-
WHERE alert_sent = FALSE;
141-
"""
142-
143-
UPDATE_SENT_ALERTS = """
144-
UPDATE ALERTS set alert_sent = TRUE
145-
WHERE alert_id IN (%s);
146-
"""
147-
148-
def __init__(self, config: 'Config', db_connection: Any):
149-
super().__init__(config, db_connection)
150-
151-
def _run_query(self, query: str, params: tuple = None) -> list:
152-
with self.db_connection.cursor() as cursor:
153-
if params is not None:
154-
cursor.execute(query, params)
155-
else:
156-
cursor.execute(query)
157-
158-
results = cursor.fetchall()
159-
return results
160-
161-
def _update_sent_alerts(self, alert_ids) -> None:
162-
results = self._run_query(self.UPDATE_SENT_ALERTS, (alert_ids,))
163-
logger.debug(f"Updated sent alerts -\n{str(results)}")
164-
165-
def _query_alerts(self) -> list:
166-
alert_rows = self._run_query(self.SELECT_NEW_ALERTS_QUERY)
167-
self.execution_properties['alert_rows'] = len(alert_rows)
168-
alerts = []
169-
for alert_row in alert_rows:
170-
alerts.append(Alert.create_alert_from_row(alert_row))
171-
return alerts
172-
173-
174-
class BigQueryDataMonitoring(DataMonitoring):
175-
SELECT_NEW_ALERTS_QUERY = """
176-
SELECT alert_id, detected_at, database_name, schema_name, table_name, column_name, alert_type, sub_type,
177-
alert_description
178-
FROM {dataset}.alerts
179-
WHERE alert_sent = FALSE;
180-
"""
181-
182-
UPDATE_SENT_ALERTS = """
183-
UPDATE {dataset}.alerts set alert_sent = TRUE
184-
WHERE alert_id IN UNNEST(@alert_ids);
185-
"""
186-
187-
def __init__(self, config: 'Config', db_connection: Any):
188-
super().__init__(config, db_connection)
189-
190-
def _run_query(self, query: str, params: list = None) -> list:
191-
if params is not None:
192-
job_config = bigquery.QueryJobConfig(
193-
query_parameters=params
194-
)
195-
job = self.db_connection.query(query, job_config=job_config)
196-
else:
197-
job = self.db_connection.query(query)
198-
199-
return list(job.result())
200-
201-
def _update_sent_alerts(self, alert_ids) -> None:
202-
params = [bigquery.ArrayQueryParameter("alert_ids", "STRING", alert_ids)]
203-
results = self._run_query(self.UPDATE_SENT_ALERTS.format(dataset=self.config.credentials.schema), params)
204-
logger.debug(f"Updated sent alerts -\n{str(results)}")
205-
206-
def _query_alerts(self) -> list:
207-
alert_rows = self._run_query(self.SELECT_NEW_ALERTS_QUERY.format(dataset=self.config.credentials.schema))
208-
self.execution_properties['alert_rows'] = len(alert_rows)
209-
alerts = []
210-
for alert_row in alert_rows:
211-
alerts.append(Alert.create_alert_from_row(alert_row))
212-
return alerts
213136

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{# Return a timestamp expression for the earliest detected_at an alert may have:
   midnight, days_back days before today. #}
{% macro get_alerts_time_limit(days_back=31) %}
    {% set current_day = dbt_utils.date_trunc('day', dbt_utils.current_timestamp()) %}
    {% set datetime_limit = dbt_utils.dateadd('day', -1 * days_back, current_day) %}
    {{ return(elementary.cast_as_timestamp(datetime_limit)) }}
{% endmacro %}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{% macro get_new_alerts() %}
    {# Hint the dependency on the alerts model so ref('alerts') resolves under run-operation. #}
    -- depends_on: {{ ref('alerts') }}
    {% set select_new_alerts_query %}
        SELECT alert_id, detected_at, database_name, schema_name, table_name, column_name, alert_type, sub_type,
               alert_description
        FROM {{ ref('alerts') }}
        WHERE alert_sent = FALSE and detected_at >= {{ get_alerts_time_limit() }}
    {% endset %}
    {% set results = run_query(select_new_alerts_query) %}
    {# Emit each row as a JSON log line; the monitor CLI parses these from the
       run-operation output. (Removed the unused 'current_date' and 'new_alerts'
       variables the original set but never read.) #}
    {% for result in results %}
        {% set new_alert_dict = {'alert_id': result[0],
                                 'detected_at': result[1].isoformat(),
                                 'database_name': result[2],
                                 'schema_name': result[3],
                                 'table_name': result[4],
                                 'column_name': result[5],
                                 'alert_type': result[6],
                                 'sub_type': result[7],
                                 'alert_description': result[8]} %}
        {% set new_alert_json = tojson(new_alert_dict) %}
        {% do elementary.edr_log(new_alert_json) %}
    {% endfor %}
{% endmacro %}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{% macro update_sent_alerts(alert_ids) %}
    {# Hint the dependency on the alerts model so ref('alerts') resolves under run-operation. #}
    -- depends_on: {{ ref('alerts') }}
    {# No-op on an empty id list to avoid emitting an UPDATE with an empty IN tuple. #}
    {% if alert_ids %}
        {% set update_sent_alerts_query %}
            UPDATE {{ ref('alerts') }} set alert_sent = TRUE
            WHERE alert_id IN {{ elementary.strings_list_to_tuple(alert_ids) }} and alert_sent = FALSE and
            detected_at >= {{ get_alerts_time_limit() }}
        {% endset %}
        {% set results = dbt_utils.get_query_results_as_dict(update_sent_alerts_query) %}
        {% do elementary.edr_log(results) %}
    {% endif %}
{% endmacro %}

monitor/dbt_project/packages.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
# dbt packages required by the internal monitoring dbt project.
packages:
  - package: dbt-labs/dbt_utils
    version: 0.7.6
  - package: elementary-data/elementary
    version: 0.3.11

monitor/dbt_runner.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,12 @@ def snapshot(self) -> bool:
5050
success, _ = self._run_command(['snapshot'])
5151
return success
5252

53-
def run_operation(self, macro_name, json_logs=True) -> list:
54-
success, command_output = self._run_command(['run-operation', macro_name], json_logs)
53+
def run_operation(self, macro_name: str, json_logs: bool = True, macro_args: dict = None) -> list:
54+
command_args = ['run-operation', macro_name]
55+
if macro_args is not None:
56+
json_args = json.dumps(macro_args)
57+
command_args.extend(['--args', json_args])
58+
success, command_output = self._run_command(command_args, json_logs)
5559
run_operation_results = []
5660
if json_logs:
5761
json_messages = command_output.splitlines()

0 commit comments

Comments
 (0)