Skip to content

Commit e72c8da

Browse files
committed
Added bigquery support and bumped package version
1 parent 207fddc commit e72c8da

File tree

3 files changed

+73
-27
lines changed

3 files changed

+73
-27
lines changed

monitor/alerts.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ def __init__(self, alert_id, database_name, schema_name, table_name, detected_at
5252
super().__init__(alert_id)
5353
self.table_name = '.'.join([database_name, schema_name, table_name]).lower()
5454
self.detected_at = convert_utc_time_to_local_time(detected_at).strftime('%Y-%m-%d %H:%M:%S')
55-
self.description = description[0].upper() + description[1:].lower()
55+
self.description = description[0].upper() + description[1:].lower() if description else ''
5656
self.change_type = ' '.join([word[0].upper() + word[1:] for word in sub_type.split('_')])
5757

5858
def to_slack_message(self) -> dict:
@@ -114,7 +114,7 @@ def __init__(self, alert_id, database_name, schema_name, table_name, detected_at
114114
super().__init__(alert_id)
115115
self.table_name = '.'.join([database_name, schema_name, table_name]).lower()
116116
self.detected_at = convert_utc_time_to_local_time(detected_at).strftime('%Y-%m-%d %H:%M:%S')
117-
self.description = description[0].upper() + description[1:].lower()
117+
self.description = description if description else ''
118118
self.anomaly_type = ' '.join([word[0].upper() + word[1:] for word in sub_type.split('_')])
119119

120120
def to_slack_message(self) -> dict:

monitor/data_monitoring.py

Lines changed: 70 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import os
22
from typing import Any
3-
import snowflake.connector.errors
43
from exceptions.exceptions import ConfigError
54
from monitor.alerts import Alert
65
from monitor.dbt_runner import DbtRunner
76
from config.config import Config
8-
from utils.dbt import get_snowflake_client
7+
from utils.dbt import get_snowflake_client, get_bigquery_client
98
from utils.log import get_logger
9+
from google.cloud import bigquery
1010

1111
logger = get_logger(__name__)
1212
FILE_DIR = os.path.dirname(__file__)
@@ -20,20 +20,6 @@ class DataMonitoring(object):
2020
DBT_PROJECT_MODULES_PATH = os.path.join(DBT_PROJECT_PATH, 'dbt_modules', DBT_PACKAGE_NAME)
2121
DBT_PROJECT_PACKAGES_PATH = os.path.join(DBT_PROJECT_PATH, 'dbt_packages', DBT_PACKAGE_NAME)
2222

23-
SELECT_NEW_ALERTS_QUERY = """
24-
SELECT alert_id, detected_at, database_name, schema_name, table_name, column_name, alert_type, sub_type,
25-
alert_description
26-
FROM ALERTS
27-
WHERE alert_sent = FALSE;
28-
"""
29-
30-
UPDATE_SENT_ALERTS = """
31-
UPDATE ALERTS set alert_sent = TRUE
32-
WHERE alert_id IN (%s);
33-
"""
34-
35-
COUNT_ROWS_QUERY = None
36-
3723
def __init__(self, config: Config, db_connection: Any) -> None:
3824
self.config = config
3925
self.dbt_runner = DbtRunner(self.DBT_PROJECT_PATH, self.config.profiles_dir)
@@ -45,6 +31,9 @@ def create_data_monitoring(config: Config) -> 'DataMonitoring':
4531
if config.platform == 'snowflake':
4632
snowflake_conn = get_snowflake_client(config.credentials, server_side_binding=False)
4733
return SnowflakeDataMonitoring(config, snowflake_conn)
34+
elif config.platform == 'bigquery':
35+
bigquery_client = get_bigquery_client(config.credentials)
36+
return BigQueryDataMonitoring(config, bigquery_client)
4837
else:
4938
raise ConfigError("Unsupported platform")
5039

@@ -55,16 +44,10 @@ def _run_query(self, query: str, params: tuple = None) -> list:
5544
pass
5645

5746
def _update_sent_alerts(self, alert_ids) -> None:
58-
results = self._run_query(self.UPDATE_SENT_ALERTS, (alert_ids,))
59-
logger.debug(f"Updated sent alerts -\n{str(results)}")
47+
pass
6048

6149
def _query_alerts(self) -> list:
62-
alert_rows = self._run_query(self.SELECT_NEW_ALERTS_QUERY)
63-
self.execution_properties['alert_rows'] = len(alert_rows)
64-
alerts = []
65-
for alert_row in alert_rows:
66-
alerts.append(Alert.create_alert_from_row(alert_row))
67-
return alerts
50+
pass
6851

6952
def _send_to_slack(self, alerts: [Alert]) -> None:
7053
slack_webhook = self.config.slack_notification_webhook
@@ -150,6 +133,18 @@ def properties(self):
150133

151134

152135
class SnowflakeDataMonitoring(DataMonitoring):
136+
SELECT_NEW_ALERTS_QUERY = """
137+
SELECT alert_id, detected_at, database_name, schema_name, table_name, column_name, alert_type, sub_type,
138+
alert_description
139+
FROM ALERTS
140+
WHERE alert_sent = FALSE;
141+
"""
142+
143+
UPDATE_SENT_ALERTS = """
144+
UPDATE ALERTS set alert_sent = TRUE
145+
WHERE alert_id IN (%s);
146+
"""
147+
153148
def __init__(self, config: 'Config', db_connection: Any):
154149
super().__init__(config, db_connection)
155150

@@ -163,5 +158,56 @@ def _run_query(self, query: str, params: tuple = None) -> list:
163158
results = cursor.fetchall()
164159
return results
165160

161+
def _update_sent_alerts(self, alert_ids) -> None:
162+
results = self._run_query(self.UPDATE_SENT_ALERTS, (alert_ids,))
163+
logger.debug(f"Updated sent alerts -\n{str(results)}")
164+
165+
def _query_alerts(self) -> list:
166+
alert_rows = self._run_query(self.SELECT_NEW_ALERTS_QUERY)
167+
self.execution_properties['alert_rows'] = len(alert_rows)
168+
alerts = []
169+
for alert_row in alert_rows:
170+
alerts.append(Alert.create_alert_from_row(alert_row))
171+
return alerts
172+
166173

174+
class BigQueryDataMonitoring(DataMonitoring):
175+
SELECT_NEW_ALERTS_QUERY = """
176+
SELECT alert_id, detected_at, database_name, schema_name, table_name, column_name, alert_type, sub_type,
177+
alert_description
178+
FROM {dataset}.alerts
179+
WHERE alert_sent = FALSE;
180+
"""
181+
182+
UPDATE_SENT_ALERTS = """
183+
UPDATE {dataset}.alerts set alert_sent = TRUE
184+
WHERE alert_id IN UNNEST(@alert_ids);
185+
"""
186+
187+
def __init__(self, config: 'Config', db_connection: Any):
188+
super().__init__(config, db_connection)
189+
190+
def _run_query(self, query: str, params: list = None) -> list:
191+
if params is not None:
192+
job_config = bigquery.QueryJobConfig(
193+
query_parameters=params
194+
)
195+
job = self.db_connection.query(query, job_config=job_config)
196+
else:
197+
job = self.db_connection.query(query)
198+
199+
return list(job.result())
200+
201+
def _update_sent_alerts(self, alert_ids) -> None:
202+
params = [bigquery.ArrayQueryParameter("alert_ids", "STRING", alert_ids)]
203+
results = self._run_query(self.UPDATE_SENT_ALERTS.format(dataset=self.config.credentials.schema), params)
204+
logger.debug(f"Updated sent alerts -\n{str(results)}")
205+
206+
def _query_alerts(self) -> list:
207+
alert_rows = self._run_query(self.SELECT_NEW_ALERTS_QUERY.format(dataset=self.config.credentials.schema))
208+
self.execution_properties['alert_rows'] = len(alert_rows)
209+
alerts = []
210+
for alert_row in alert_rows:
211+
alerts.append(Alert.create_alert_from_row(alert_row))
212+
return alerts
167213

monitor/dbt_project/packages.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
packages:
22
- package: elementary-data/elementary
3-
version: 0.3.1
3+
version: 0.3.3

0 commit comments

Comments
 (0)