diff --git a/soda/dbt/soda/cloud/dbt.py b/soda/dbt/soda/cloud/dbt.py
index 6f51da1c9..a9debdf25 100644
--- a/soda/dbt/soda/cloud/dbt.py
+++ b/soda/dbt/soda/cloud/dbt.py
@@ -2,25 +2,13 @@
 import json
 from collections import defaultdict
+from dataclasses import dataclass
 from functools import reduce
 from operator import or_
 from pathlib import Path
-from typing import Any, Iterator
+from typing import Any
 
 import requests
-from dbt.contracts.graph.compiled import (
-    CompiledGenericTestNode,
-    CompiledModelNode,
-    CompiledSeedNode,
-)
-from dbt.contracts.graph.parsed import (
-    ParsedGenericTestNode,
-    ParsedModelNode,
-    ParsedSeedNode,
-    ParsedSourceDefinition,
-)
-from dbt.contracts.results import RunResultOutput
-from dbt.node_types import NodeType
 from requests.structures import CaseInsensitiveDict
 from soda.cloud.dbt_config import DbtCloudConfig
 from soda.cloud.soda_cloud import SodaCloud
@@ -33,6 +21,48 @@
 from soda.sodacl.dbt_check_cfg import DbtCheckCfg
 
 
+@dataclass
+class DbtNodeType:
+    Model = "model"
+    Test = "test"  # renamed to 'data_test'; preserved as 'test' here for back-compat
+    Seed = "seed"
+    Source = "source"
+
+
+@dataclass
+class DbtTestStatus:
+    Skipped = "skipped"
+    Error = "error"
+    Fail = "fail"
+    Warn = "warn"
+    Pass = "pass"
+
+
+@dataclass
+class DbtDataNode:
+    name: str | None = None
+    database: str | None = None
+    schema: str | None = None
+    resource_type: str | None = None
+
+
+@dataclass
+class DbtTestNode:
+    name: str | None = None
+    unique_id: str | None = None
+    original_file_path: str | None = None
+    column_name: str | None = None
+    compiled_code: str | None = None
+    depends_on: list[str] | None = None
+
+
+@dataclass
+class DbtRunResult:
+    status: str | None = None
+    failures: int | None = None
+    unique_id: str | None = None
+
+
 class DbtCloud:
     def __init__(
         self,
@@ -51,32 +81,30 @@ def __init__(
         self.dbt_run_results = dbt_run_results
         self.dbt_cloud_run_id = dbt_cloud_run_id
         self.dbt_cloud_job_id = dbt_cloud_job_id
+        self.checks: list[DbtCheck] | None = None
 
     def ingest(self):
         return_code = 0
 
-        if self.dbt_artifacts or self.dbt_manifest or self.dbt_run_results:
+        if self.dbt_artifacts or (self.dbt_manifest and self.dbt_run_results):
            self.scan._logs.info("Ingesting local dbt artifacts.")
            if self.dbt_artifacts:
-                dbt_manifest = self.dbt_artifacts / "manifest.json"
-                dbt_run_results = self.dbt_artifacts / "run_results.json"
+                self.dbt_manifest = self.dbt_artifacts / "manifest.json"
+                self.dbt_run_results = self.dbt_artifacts / "run_results.json"
 
-            if not dbt_manifest or not dbt_manifest.is_file():
+            if not self.dbt_manifest or not self.dbt_manifest.is_file():
                 raise ValueError(
-                    f"dbt manifest ({dbt_manifest}) or artifacts ({self.dbt_artifacts}) "
+                    f"dbt manifest ({self.dbt_manifest}) or artifacts ({self.dbt_artifacts}) "
                     "should point to an existing path."
                 )
-            elif dbt_run_results is None or not dbt_run_results.is_file():
+            elif self.dbt_run_results is None or not self.dbt_run_results.is_file():
                 raise ValueError(
-                    f"dbt run results ({dbt_run_results}) or artifacts ({self.dbt_artifacts}) "
+                    f"dbt run results ({self.dbt_run_results}) or artifacts ({self.dbt_artifacts}) "
                     "should point to an existing path."
                )
 
-            manifest, run_results = self._load_dbt_artifacts(
-                dbt_manifest,
-                dbt_run_results,
-            )
+            manifest, run_results = self._load_dbt_artifacts(self.dbt_manifest, self.dbt_run_results)
         else:
             self.scan._logs.info("Getting dbt artifacts from dbt Cloud.")
@@ -101,19 +129,22 @@ def ingest(self):
            )
 
        check_results_iterator = self._map_dbt_test_results_iterator(manifest, run_results)
+        self.checks = check_results_iterator
 
        self.flush_test_results(
            check_results_iterator,
            self.scan._configuration.soda_cloud,
        )
-
        return return_code
 
    def flush_test_results(self, checks: list[Check], soda_cloud: SodaCloud) -> None:
        if len(checks) != 0:
            scan_results = self.build_scan_results(checks)
            scan_results["type"] = "sodaCoreInsertScanResults"
-            soda_cloud._execute_command(scan_results, command_name="send_scan_results")
+            if self.scan._is_local:
+                self.scan._logs.info("Skipping sending scan results to Soda Cloud, because scan is local")
+            else:
+                soda_cloud._execute_command(scan_results, command_name="send_scan_results")
 
    def build_scan_results(self, checks):
        check_dicts = [check.get_cloud_dict() for check in checks]
@@ -130,16 +161,13 @@ def build_scan_results(self, checks):
                "hasFailures": self.scan.has_check_fails(),
                "metrics": [{"identity": "dbt_metric", "metricName": "dbt_metric", "value": 0}],
                "checks": check_dicts,
-                "logs": [log.get_cloud_dict() for log in self.scan._logs.logs],
-                "sourceOwner": "soda-core",
+                "logs": self.scan._logs.get_cloud_dict(),
+                "sourceOwner": "soda-library",
            }
        )
 
-    def _load_dbt_artifacts(
-        self,
-        manifest_file: Path,
-        run_results_file: Path,
-    ) -> tuple[dict, dict]:
+    @staticmethod
+    def _load_dbt_artifacts(manifest_file: Path, run_results_file: Path) -> tuple[dict, dict]:
        with manifest_file.open("r") as file:
            manifest = json.load(file)
        with run_results_file.open("r") as file:
            run_results = json.load(file)
@@ -159,29 +187,21 @@ def _download_dbt_manifest_and_run_results(
 
        return manifest, run_results
 
-    def _map_dbt_test_results_iterator(
-        self, manifest: dict, run_results: dict
-    ) -> Iterator[tuple[Dataset, list[DbtCheck]]]:
-        model_nodes, seed_nodes, test_nodes, source_nodes = self._parse_manifest(manifest)
-        parsed_run_results = self._parse_run_results(run_results)
-
+    def _map_dbt_test_results_iterator(self, manifest: dict, run_results: dict) -> list[DbtCheck]:
+        model_nodes, seed_nodes, source_nodes, test_nodes = self._parse_manifest(manifest)
        model_seed_and_source_nodes = {**model_nodes, **seed_nodes, **source_nodes}
-        models_with_tests = self._create_nodes_to_tests_mapping(
-            model_seed_and_source_nodes, test_nodes, parsed_run_results
-        )
+        parsed_run_results = self._parse_run_results(run_results)
+
+        models_with_tests = self._create_nodes_to_tests_mapping(test_nodes, parsed_run_results)
        soda_checks = self._dbt_run_results_to_soda_checks(test_nodes, parsed_run_results)
+
        checks = []
        for unique_id, test_unique_ids in models_with_tests.items():
            node = model_seed_and_source_nodes[unique_id]
-            dataset = Dataset(
-                node.name if isinstance(node, ParsedSourceDefinition) else node.alias,
-                node.database,
-                node.schema,
-            )
-
+            dataset = Dataset(name=node.name, schema=node.schema, database=node.database)
            for test_unique_id in test_unique_ids:
-                check: DbtCheck = soda_checks[test_unique_id]
+                check = soda_checks[test_unique_id]
                check.dataset = dataset
                check.check_cfg.table_name = dataset.name
                checks.append(check)
@@ -191,15 +211,14 @@ def _map_dbt_test_results_iterator(
 
    def _dbt_run_results_to_soda_checks(
        self,
        test_nodes: dict[str, DbtTestNode] | None,
-        run_results: list[RunResultOutput],
-    ) -> dict[str, list[Check]]:
+        run_results: list[DbtRunResult],
+    ) -> dict[str, DbtCheck]:
        """Maps dbt run results to Soda Checks. Returns lists of Checks keyed by dbt run results."""
-        from dbt.contracts.results import TestStatus
-
-        assert (
-            test_nodes is not None
-        ), "No test nodes were retrieved from the manifest.json. This could be because no tests have been implemented in dbt yet or you never ran `dbt test`."
+        assert test_nodes is not None, (
+            "No test nodes found in manifest.json. This could be because no test was implemented "
+            "in dbt yet or you never ran `dbt test`."
+        )
 
        checks = {}
        for run_result in run_results:
@@ -214,19 +233,27 @@ def _dbt_run_results_to_soda_checks(
                    column_name=test_node.column_name,
                ),
                identity=test_node.unique_id,
-                expression=test_node.compiled_sql if hasattr(test_node, "compiled_sql") else None,
+                expression=test_node.compiled_code if hasattr(test_node, "compiled_code") else None,
            )
            check.data_source_scan = self.scan._get_or_create_data_source_scan(self.scan._data_source_name)
-            if run_result.status == TestStatus.Pass:
+
+            # get check outcome
+            if run_result.status == DbtTestStatus.Pass:
                check.outcome = CheckOutcome.PASS
-            elif run_result.status == TestStatus.Warn:
+            elif run_result.status == DbtTestStatus.Warn:
                check.outcome = CheckOutcome.WARN
-            else:
+            elif run_result.status == DbtTestStatus.Fail:
                check.outcome = CheckOutcome.FAIL
-                # check.add_outcome_reason(outcome_type="dbt", )
-                # values={"failures": run_result.failures}, - take this into diagnostics?
+            else:
+                # leave the outcome unknown if the test errored or was skipped
+                check.outcome = None
+
+            # try to get the check value
+            if run_result.failures is not None:
+                check.check_value = run_result.failures
 
            checks[run_result.unique_id] = check
+
        return checks
 
    def _download_dbt_artifact_from_cloud(
@@ -290,90 +317,109 @@ def _get_latest_run_id(self, api_token: str, account_id: str, job_id: str) -> str:
        return run_id
 
    def _parse_manifest(self, manifest: dict[str, Any]) -> tuple[
-        dict[str, ParsedModelNode | CompiledModelNode] | None,
-        dict[str, ParsedSeedNode | CompiledSeedNode] | None,
-        dict[str, ParsedGenericTestNode | CompiledGenericTestNode] | None,
-        dict[str, ParsedSourceDefinition] | None,
+        dict[str, DbtDataNode] | None,
+        dict[str, DbtDataNode] | None,
+        dict[str, DbtDataNode] | None,
+        dict[str, DbtTestNode] | None,
    ]:
-        """
-        Parse the manifest.
-
-        Only V6 manifest is supported.
+        test_nodes = {}
+        seed_nodes = {}
+        model_nodes = {}
+        source_nodes = {}
 
-        https://docs.getdbt.com/reference/artifacts/manifest-json
-        """
        if manifest.get("nodes") is not None:
-            model_nodes = {
-                node_name: CompiledModelNode(**node) if "compiled" in node.keys() else ParsedModelNode(**node)
-                for node_name, node in manifest["nodes"].items()
-                if node["resource_type"] == NodeType.Model
-            }
-            seed_nodes = {
-                node_name: CompiledSeedNode(**node) if "compiled" in node.keys() else ParsedSeedNode(**node)
-                for node_name, node in manifest["nodes"].items()
-                if node["resource_type"] == NodeType.Seed
-            }
-
-            test_nodes = {}
            for node_name, node in manifest["nodes"].items():
-                if node["resource_type"] == NodeType.Test:
-                    if "test_metadata" in node.keys():
-                        if "compiled" in node.keys():
-                            node = CompiledGenericTestNode(**node)
-                        else:
-                            node = ParsedGenericTestNode(**node)
-                        test_nodes[node_name] = node
-                    else:
-                        self.scan._logs.info(f"Ignoring unsupported test node '{node_name}'. Missing 'test_metadata'.")
+                if node["resource_type"] == DbtNodeType.Test:
+                    try:
+                        test_nodes[node_name] = DbtTestNode(
+                            name=node["name"],
+                            unique_id=node["unique_id"],
+                            original_file_path=node["original_file_path"],
+                            column_name=node.get("column_name", node.get("kwargs", {}).get("column_name")),
+                            depends_on=[
+                                node["attached_node"] if "attached_node" in node else node["depends_on"]["nodes"][-1]
+                            ],
+                            compiled_code=node.get("compiled_code", node.get("compiled_sql")),
+                        )
+                    except Exception as e:
+                        self.scan._logs.debug(f"Skipping test node {node_name} due to parsing error: {e}")
+                elif node["resource_type"] == DbtNodeType.Model:
+                    model_nodes[node_name] = DbtDataNode(
+                        name=node["name"],
+                        database=node["database"],
+                        schema=node["schema"],
+                        resource_type=node["resource_type"],
+                    )
+                elif node["resource_type"] == DbtNodeType.Seed:
+                    seed_nodes[node_name] = DbtDataNode(
+                        name=node["name"],
+                        database=node["database"],
+                        schema=node["schema"],
+                        resource_type=node["resource_type"],
+                    )
                else:
                    self.scan._logs.debug(f"Ignoring unsupported node type '{node['resource_type']}', {node_name}")
-
        else:
            model_nodes = None
            seed_nodes = None
            test_nodes = None
 
        if manifest.get("sources") is not None:
-            source_nodes: dict | None = {
-                source_name: ParsedSourceDefinition(**source)
-                for source_name, source in manifest["sources"].items()
-                if source["resource_type"] == NodeType.Source
-            }
+            for source_name, source in manifest["sources"].items():
+                if source["resource_type"] == DbtNodeType.Source:
+                    source_nodes[source_name] = DbtDataNode(
+                        name=source["name"],
+                        database=source["database"],
+                        schema=source["schema"],
+                        resource_type=source["resource_type"],
+                    )
        else:
            source_nodes = None
 
-        return model_nodes, seed_nodes, test_nodes, source_nodes
+        return model_nodes, seed_nodes, source_nodes, test_nodes
 
-    def _parse_run_results(self, run_results: dict[str, Any]) -> list[RunResultOutput]:
+    @staticmethod
+    def _parse_run_results(run_results: dict[str, Any]) -> list[DbtRunResult]:
        """
        Parse the run results.
-
-        Only V4 run results is supported.
-        https://docs.getdbt.com/reference/artifacts/run-results-json
        """
-        parsed_run_results = [RunResultOutput(**result) for result in run_results["results"]]
-
-        self._all_test_failures_are_not_none(parsed_run_results)
+        parsed_run_results = []
+        if run_results.get("results") is not None:
+            for result in run_results["results"]:
+                if result["status"] == DbtTestStatus.Fail:
+                    default_failures = 1
+                elif result["status"] == DbtTestStatus.Pass:
+                    default_failures = 0
+                else:
+                    default_failures = None
+
+                parsed_run_results.append(
+                    DbtRunResult(
+                        unique_id=result["unique_id"],
+                        status=result["status"],
+                        failures=result.get("failures", default_failures),
+                    )
+                )
+        # self._all_test_failures_are_not_none(parsed_run_results)
 
        return parsed_run_results
 
+    @staticmethod
    def _create_nodes_to_tests_mapping(
-        self,
-        model_nodes: dict[str, ParsedModelNode],
-        test_nodes: dict[str, CompiledGenericTestNode | ParsedGenericTestNode] | None,
-        run_results: list[RunResultOutput],
-    ) -> dict[str, set[ParsedModelNode]]:
-        assert (
-            test_nodes is not None
-        ), "No test nodes found in manifest.json. This could be because no test was implemented in dbt yet"
-
+        test_nodes: dict[str, DbtTestNode] | None,
+        run_results: list[DbtRunResult],
+    ) -> dict[str, set[str]]:
+        assert test_nodes is not None, (
+            "No test nodes found in manifest.json. This could be because no test was implemented "
+            "in dbt yet or you never ran `dbt test`."
+        )
        test_unique_ids = [
            run_result.unique_id for run_result in run_results if run_result.unique_id in test_nodes.keys()
        ]
 
        models_that_tests_depends_on = {
-            test_unique_id: set(test_nodes[test_unique_id].depends_on["nodes"]) for test_unique_id in test_unique_ids
+            test_unique_id: set(test_nodes[test_unique_id].depends_on) for test_unique_id in test_unique_ids
        }
 
        model_unique_ids = {}
@@ -385,16 +431,14 @@ def _create_nodes_to_tests_mapping(
 
        models_with_tests = defaultdict(set)
        for model_unique_id in model_unique_ids:
-            for (
-                test_unique_id,
-                model_unique_ids_of_test,
-            ) in models_that_tests_depends_on.items():
+            for test_unique_id, model_unique_ids_of_test in models_that_tests_depends_on.items():
                if model_unique_id in model_unique_ids_of_test:
                    models_with_tests[model_unique_id].add(test_unique_id)
 
        return models_with_tests
 
-    def _all_test_failures_are_not_none(self, run_results: list[RunResultOutput]) -> bool:
+    @staticmethod
+    def _all_test_failures_are_not_none(run_results: list[DbtRunResult]) -> bool:
        results_with_null_failures = []
        for run_result in run_results:
            if run_result.failures is None:
@@ -403,12 +447,13 @@ def _all_test_failures_are_not_none(self, run_results: list[RunResultOutput]) -> bool:
        if len(results_with_null_failures) == len(run_results):
            raise ValueError(
                "Could not find a valid test result in the run results. "
-                "This is often the case when ingesting from dbt Cloud where the last step in the "
+                "This sometimes happens when ingesting from dbt Cloud where the last step in the "
                "job was neither a `dbt build` or `dbt test`. For example, your run may have terminated with "
                "`dbt docs generate` \n"
-                "We are currently investigating this with the dbt Cloud team. \n"
-                "In the meantime, if your jobs do not end on the above mentioned commands, you could make sure to add at least a `dbt test` "
-                "step as your last step and make sure that 'generate documentation' is not turned on in your job definition."
+                "If your jobs do not end with the above-mentioned commands, make sure to add "
+                "at least a `dbt test` "
+                "step as your last step and make sure that 'generate documentation' is not turned on in your job "
+                "definition."
            )
        else:
            return True
diff --git a/soda/dbt/soda/execution/check/dbt_check.py b/soda/dbt/soda/execution/check/dbt_check.py
index 2afb3267f..c6e98f02a 100644
--- a/soda/dbt/soda/execution/check/dbt_check.py
+++ b/soda/dbt/soda/execution/check/dbt_check.py
@@ -14,7 +14,10 @@ def __init__(self, check_cfg: DbtCheckCfg, identity: str, expression: str | None
        self.cloud_check_type = "generic"
 
    def get_cloud_diagnostics_dict(self) -> dict:
-        return {"value": 0}
+        cloud_diagnostics = {
+            "value": self.check_value if hasattr(self, "check_value") else 0,
+        }
+        return cloud_diagnostics
 
    def evaluate(self, metrics: dict[str, Metric], historic_values: dict[str, object]):
        # Not much to evaluate since this is done by dbt
@@ -32,7 +35,7 @@ def get_cloud_dict(self):
            },
            "name": self.name,
            "type": self.cloud_check_type,
-            "definition": self.check_cfg.name,
+            "definition": self.expression if self.expression is not None else self.check_cfg.name,
            "location": {
                "filePath": self.check_cfg.file_path,
                "line": 0,
diff --git a/soda/dbt/soda/sodacl/dbt_check_cfg.py b/soda/dbt/soda/sodacl/dbt_check_cfg.py
index a3ecb0fd9..3f2072923 100644
--- a/soda/dbt/soda/sodacl/dbt_check_cfg.py
+++ b/soda/dbt/soda/sodacl/dbt_check_cfg.py
@@ -4,7 +4,7 @@
 
 
 class DbtCheckCfg(CheckCfg):
-    def __init__(self, name: str, file_path: str, column_name: str, table_name: str | None = None):
+    def __init__(self, name: str, file_path: str, column_name: str | None, table_name: str | None = None):
        self.name = name
        self.column_name = column_name
        self.file_path = file_path
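
---

Reviewer note (illustration only, not part of the patch): `_parse_manifest` now builds the lightweight dataclasses above instead of dbt's versioned contract classes, and it relies on fallback lookups (`column_name` vs. `kwargs`, `attached_node` vs. `depends_on.nodes`, `compiled_code` vs. `compiled_sql`) to cover both older and newer manifest schemas. A minimal standalone sketch of that fallback chain on a single test node; the sample manifest entry is hypothetical:

```python
# Mirrors the DbtTestNode parsing in this diff; runnable on its own.
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class DbtTestNode:
    name: str | None = None
    unique_id: str | None = None
    original_file_path: str | None = None
    column_name: str | None = None
    compiled_code: str | None = None
    depends_on: list[str] | None = None


def parse_test_node(node: dict) -> DbtTestNode:
    return DbtTestNode(
        name=node["name"],
        unique_id=node["unique_id"],
        original_file_path=node["original_file_path"],
        # newer manifests expose column_name at the top level; older ones keep it in kwargs
        column_name=node.get("column_name", node.get("kwargs", {}).get("column_name")),
        # newer manifests carry attached_node; older ones only depends_on.nodes
        depends_on=[node["attached_node"] if "attached_node" in node else node["depends_on"]["nodes"][-1]],
        # compiled_sql was renamed to compiled_code in newer dbt versions
        compiled_code=node.get("compiled_code", node.get("compiled_sql")),
    )


sample = {  # hypothetical manifest entry, trimmed to the keys consumed above
    "name": "not_null_orders_order_id",
    "unique_id": "test.jaffle_shop.not_null_orders_order_id",
    "original_file_path": "models/schema.yml",
    "kwargs": {"column_name": "order_id"},
    "depends_on": {"nodes": ["model.jaffle_shop.orders"]},
    "compiled_sql": "select order_id from orders where order_id is null",
}
print(parse_test_node(sample))  # column_name, depends_on, and compiled_code all come from the fallbacks
```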
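A second sketch, same caveat: the status handling added in `_dbt_run_results_to_soda_checks` and `_parse_run_results` maps `pass`/`warn`/`fail` to an outcome, leaves `error`/`skipped` unknown, and backfills `failures` with 1/0/None when `run_results.json` omits the key. `CheckOutcome` below is a stand-in enum, not Soda's actual class:

```python
# Table-driven equivalent of the if/elif chain added in this diff.
from __future__ import annotations

from enum import Enum


class CheckOutcome(Enum):  # stand-in for Soda's CheckOutcome
    PASS = "pass"
    WARN = "warn"
    FAIL = "fail"


def outcome_for(status: str) -> CheckOutcome | None:
    # 'error' and 'skipped' intentionally fall through to None (unknown outcome)
    return {
        "pass": CheckOutcome.PASS,
        "warn": CheckOutcome.WARN,
        "fail": CheckOutcome.FAIL,
    }.get(status)


def default_failures(status: str) -> int | None:
    # used only when run_results.json has no 'failures' entry for the result
    return {"fail": 1, "pass": 0}.get(status)


for status in ("pass", "warn", "fail", "error", "skipped"):
    print(f"{status:8} -> outcome={outcome_for(status)}, default_failures={default_failures(status)}")
```

The `None` outcome is what lets errored or skipped dbt tests surface without a misleading pass/fail verdict, and the 1/0 defaults mean a failing check reports a non-zero `value` in `get_cloud_diagnostics_dict` even when dbt omitted the failure count.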