diff --git a/contessa/destination.py b/contessa/destination.py new file mode 100644 index 0000000..e00f498 --- /dev/null +++ b/contessa/destination.py @@ -0,0 +1,87 @@ +import logging +from typing import List, Text, Type + +from datadog.dogstatsd.base import DEFAULT_HOST, DEFAULT_PORT + +from contessa.db import Connector +from contessa.models import DQBase, QualityCheck +from datadog import DogStatsd + + +class Destination: + def __init__(self): + pass + + def ensure_destination(self, dq_class: Type[DQBase]) -> None: + pass + + def persist(self, items: List[DQBase]) -> None: + pass + + +class DBDestination(Destination): + def __init__(self, conn_uri_or_engine: Text): + super().__init__() + self.conn_uri_or_engine = conn_uri_or_engine + self.conn = Connector(conn_uri_or_engine) + + def ensure_destination(self, quality_check_class: Type[DQBase]) -> None: + self.conn.ensure_table(quality_check_class.__table__) + + def persist(self, items: List[DQBase]) -> None: + self.conn.upsert(items) + + +class StatsDDestination(Destination): + def __init__( + self, + statsd_prefix: Text, + statsd_host: Text = DEFAULT_HOST, + statsd_port: int = DEFAULT_PORT, + ): + super().__init__() + self.statsd = DogStatsd(host=statsd_host, port=statsd_port) + self.statsd_prefix = statsd_prefix.rstrip(".") + + def ensure_destination(self, quality_check_class: Type[DQBase]) -> None: + pass + + def persist(self, items: List[DQBase]) -> None: + for item in items: + if isinstance(item, QualityCheck): + self.persist_quality_check(item) + else: + logging.warning( + f"Not persisted. Reason: no handler for {type(item)} type. Skipping..." 
+ ) + + def persist_quality_check(self, item: QualityCheck) -> None: + tags = [ + f"rule_name:{item.rule_name}", + f"rule_type:{item.rule_type}", + f"attribute:{item.attribute}", + ] + + self.statsd.increment( + f"{self.statsd_prefix}.total_records", item.total_records, tags=tags, + ) + + self.statsd.increment( + f"{self.statsd_prefix}.failed_records", item.failed, tags=tags, + ) + + self.statsd.increment( + f"{self.statsd_prefix}.passed_records", item.passed, tags=tags, + ) + + self.statsd.gauge( + f"{self.statsd_prefix}.failed_records_percentage", + item.failed_percentage, + tags=tags, + ) + + self.statsd.gauge( + f"{self.statsd_prefix}.passed_records_percentage", + item.passed_percentage, + tags=tags, + ) diff --git a/contessa/executor.py b/contessa/executor.py index 05e989c..a3be6f5 100644 --- a/contessa/executor.py +++ b/contessa/executor.py @@ -4,6 +4,7 @@ from typing import Dict import pandas as pd +from pybigquery.sqlalchemy_bigquery import BigQueryDialect from contessa.db import Connector from contessa.models import Table @@ -79,8 +80,13 @@ def compose_where_time_filter(self, rule): past = ( self.context["task_ts"] - timedelta(days=each["days"]) ).strftime("%Y-%m-%d %H:%M:%S UTC") + timestamp_transformer = ( + "::timestamptz" + if not isinstance(self.conn.engine.dialect, BigQueryDialect) + else "" + ) result.append( - f"""{each["column"]} BETWEEN '{past}'::timestamptz AND '{present}'::timestamptz""" + f"""{each["column"]} BETWEEN '{past}'{timestamp_transformer} AND '{present}'{timestamp_transformer}""" ) return " AND ".join(result) diff --git a/contessa/runner.py b/contessa/runner.py index b958f0b..cc4e27c 100644 --- a/contessa/runner.py +++ b/contessa/runner.py @@ -3,6 +3,7 @@ from datetime import datetime +from contessa.destination import Destination, DBDestination from contessa.base_rules import Rule from contessa.db import Connector from contessa.executor import get_executor, refresh_executors @@ -27,26 +28,38 @@ def run( check_table: Dict, 
result_table: Dict, # todo - docs for quality name, maybe defaults.. context: Optional[Dict] = None, + destinations: Optional[List[Destination]] = None, ): check_table = Table(**check_table) result_table = ResultTable(**result_table, model_cls=self.model_cls) context = self.get_context(check_table, context) + destinations = ( + destinations + if destinations is not None + else [DBDestination(self.conn_uri_or_engine)] + ) normalized_rules = self.normalize_rules(raw_rules) refresh_executors(check_table, self.conn, context) quality_check_class = self.get_quality_check_class(result_table) - self.conn.ensure_table(quality_check_class.__table__) + + _ = [ + destination.ensure_destination(quality_check_class) + for destination in destinations + ] rules = self.build_rules(normalized_rules) objs = self.do_quality_checks(quality_check_class, rules, context) - self.conn.upsert(objs) + _ = [destination.persist(objs) for destination in destinations] @staticmethod def get_context(check_table: Table, context: Optional[Dict] = None) -> Dict: """ Construct context to pass to executors. User context overrides defaults. """ + if context is None: + context = {} ctx_defaults = { "table_fullname": check_table.fullname, "task_ts": datetime.now(), # todo - is now() ok ?
diff --git a/requirements.in b/requirements.in index b405a95..057d8ae 100644 --- a/requirements.in +++ b/requirements.in @@ -1,8 +1,9 @@ +datadog sqlalchemy>=1.2 psycopg2-binary>=2.7 pandas Jinja2 -pybigquery +pybigquery==0.4.13 alembic click packaging \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index b5290f6..e8458de 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ +# +# This file is autogenerated by pip-compile # To update, run: # # pip-compile --output-file=requirements.txt requirements.in @@ -7,6 +9,8 @@ cachetools==3.1.1 # via google-auth certifi==2019.9.11 # via requests chardet==3.0.4 # via requests click==7.0 +datadog==0.34.0 +decorator==4.4.1 # via datadog future==0.18.1 # via pybigquery google-api-core==1.14.2 # via google-cloud-core google-auth==1.6.3 # via google-api-core @@ -25,16 +29,16 @@ protobuf==3.10.0 # via google-api-core, google-cloud-bigquery, googleap psycopg2-binary==2.8.3 pyasn1-modules==0.2.6 # via google-auth pyasn1==0.4.7 # via pyasn1-modules, rsa -pybigquery==0.4.11 +pybigquery==0.4.13 pyparsing==2.4.5 # via packaging -python-dateutil==2.8.0 # via alembic, pandas +python-dateutil==2.8.0 # via pandas python-editor==1.0.4 # via alembic pytz==2019.2 # via google-api-core, pandas -requests==2.22.0 # via google-api-core +requests==2.22.0 # via datadog, google-api-core rsa==4.0 # via google-auth six==1.12.0 # via google-api-core, google-auth, google-resumable-media, packaging, protobuf, python-dateutil sqlalchemy==1.3.6 urllib3==1.25.6 # via requests # The following packages are considered to be unsafe in a requirements file: -# setuptools==41.6.0 # via google-api-core, protobuf +# setuptools==45.2.0 # via google-api-core, protobuf