Skip to content

Commit ca0c288

Browse files
committed
dbt-fusion fixes
1 parent b173ab8 commit ca0c288

File tree

5 files changed

+55
-45
lines changed

5 files changed

+55
-45
lines changed

macros/edr/dbt_artifacts/upload_dbt_artifacts.sql

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,15 @@
5151
order by metadata_hash
5252
{% endset %}
5353
{% set artifacts_hashes_results = elementary.run_query(stored_artifacts_query) %}
54-
{% set artifact_agate_hashes = artifacts_hashes_results.group_by("artifacts_model") %}
55-
{% set artifacts_hashes = {} %}
56-
{% for artifacts_model, metadata_hashes in artifact_agate_hashes.items() %}
57-
{% do artifacts_hashes.update({artifacts_model: metadata_hashes.columns["metadata_hash"]}) %}
54+
{% set artifacts_hashes = elementary.agate_to_dicts(artifacts_hashes_results) %}
55+
56+
{% set artifacts_hashes_per_model = {} %}
57+
{% for artifacts_hashes_row in artifacts_hashes %}
58+
{% do artifacts_hashes_per_model.setdefault(artifacts_hashes_row['artifacts_model'], []) %}
59+
{% do artifacts_hashes_per_model[artifacts_hashes_row['artifacts_model']].append(artifacts_hashes_row['metadata_hash']) %}
5860
{% endfor %}
59-
{% do return(artifacts_hashes) %}
61+
62+
{% do return(artifacts_hashes_per_model) %}
6063
{% endmacro %}
6164

6265
{% macro get_artifacts_hashes_for_model(model_name) %}

macros/edr/dbt_artifacts/upload_dbt_models.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868
{% set flatten_model_metadata_dict = {
6969
'unique_id': node_dict.get('unique_id'),
7070
'alias': node_dict.get('alias'),
71-
'checksum': checksum_dict.get('checksum'),
71+
'checksum': checksum_dict.get('checksum') if checksum_dict is mapping else none,
7272
'materialization': config_dict.get('materialized'),
7373
'tags': elementary.filter_none_and_sort(tags),
7474
'meta': meta_dict,

macros/edr/dbt_artifacts/upload_dbt_tests.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@
188188
{% set test_type = 'generic' %}
189189
{%- if test_namespace == 'dbt_expectations' -%}
190190
{% set test_type = 'expectation' %}
191-
{%- elif 'tests/generic' in test_path or 'macros/' in test_path -%}
191+
{%- elif 'tests/generic' in test_path or 'macros/' in test_path or 'generic_tests/' in test_path -%}
192192
{% set test_type = 'generic' %}
193193
{%- elif 'tests/' in test_path -%}
194194
{% set test_type = 'singular' %}

macros/edr/tests/on_run_end/handle_tests_results.sql

Lines changed: 40 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -28,34 +28,38 @@
2828
{% macro get_result_enriched_elementary_test_results(cached_elementary_test_results, cached_elementary_test_failed_row_counts, render_result_rows=false) %}
2929
{% set elementary_test_results = [] %}
3030

31-
{% for result in results | selectattr('node.resource_type', '==', 'test') %}
32-
{% set result = elementary.get_run_result_dict(result) %}
33-
{% set elementary_test_results_rows = cached_elementary_test_results.get(result.node.unique_id) %}
34-
{% set elementary_test_failed_row_count = cached_elementary_test_failed_row_counts.get(result.node.unique_id) %}
35-
36-
{# Materializing the test failed and therefore was not added to the cache. #}
37-
{% if not elementary_test_results_rows %}
38-
{% set flattened_test = elementary.flatten_test(result.node) %}
39-
{% set elementary_test_results_rows = [elementary.get_dbt_test_result_row(flattened_test)] %}
40-
{% endif %}
41-
42-
{% for elementary_test_results_row in elementary_test_results_rows %}
43-
{% set failures = elementary_test_results_row.get("failures", result.failures) %}
44-
45-
{# For Elementary anomaly tests, we actually save more than one result per test, in that case the dbt status will be "fail"
46-
even if one such result failed and the rest succeeded. To handle this, we make sure to mark the status as "pass" for these
47-
results if the number of failed rows is 0.
48-
We don't want to do this for every test though - because otherwise it can break configurations like warn_if=0 #}
49-
{% set status = "pass" if failures == 0 and elementary_test_results_row.get("test_type") == "anomaly_detection" else result.status %}
50-
51-
{% do elementary_test_results_row.update({'status': status, 'failures': failures, 'invocation_id': invocation_id,
52-
'failed_row_count': elementary_test_failed_row_count}) %}
53-
{% do elementary_test_results_row.setdefault('test_results_description', result.message) %}
54-
{% if render_result_rows %}
55-
{% do elementary_test_results_row.update({"result_rows": elementary.render_result_rows(elementary_test_results_row.result_rows)}) %}
31+
{% for result in results %}
32+
{% set node = elementary.get_node(result.unique_id) %}
33+
{% if node.resource_type == 'test' %}
34+
{% do print('result is' ~ result | tojson) %}
35+
{% set result = elementary.get_run_result_dict(result) %}
36+
{% set elementary_test_results_rows = cached_elementary_test_results.get(result.node.unique_id) %}
37+
{% set elementary_test_failed_row_count = cached_elementary_test_failed_row_counts.get(result.node.unique_id) %}
38+
39+
{# Materializing the test failed and therefore was not added to the cache. #}
40+
{% if not elementary_test_results_rows %}
41+
{% set flattened_test = elementary.flatten_test(node) %}
42+
{% set elementary_test_results_rows = [elementary.get_dbt_test_result_row(flattened_test)] %}
5643
{% endif %}
57-
{% do elementary_test_results.append(elementary_test_results_row) %}
58-
{% endfor %}
44+
45+
{% for elementary_test_results_row in elementary_test_results_rows %}
46+
{% set failures = elementary_test_results_row.get("failures", result.failures) %}
47+
48+
{# For Elementary anomaly tests, we actually save more than one result per test, in that case the dbt status will be "fail"
49+
even if one such result failed and the rest succeeded. To handle this, we make sure to mark the status as "pass" for these
50+
results if the number of failed rows is 0.
51+
We don't want to do this for every test though - because otherwise it can break configurations like warn_if=0 #}
52+
{% set status = "pass" if failures == 0 and elementary_test_results_row.get("test_type") == "anomaly_detection" else result.status %}
53+
54+
{% do elementary_test_results_row.update({'status': status, 'failures': failures, 'invocation_id': invocation_id,
55+
'failed_row_count': elementary_test_failed_row_count}) %}
56+
{% do elementary_test_results_row.setdefault('test_results_description', result.message) %}
57+
{% if render_result_rows %}
58+
{% do elementary_test_results_row.update({"result_rows": elementary.render_result_rows(elementary_test_results_row.result_rows)}) %}
59+
{% endif %}
60+
{% do elementary_test_results.append(elementary_test_results_row) %}
61+
{% endfor %}
62+
{% endif %}
5963
{% endfor %}
6064
6165
{% do return(elementary_test_results) %}
@@ -174,13 +178,15 @@
174178
{% macro pop_test_result_rows(elementary_test_results) %}
175179
{% set result_rows = [] %}
176180
{% for test_result in elementary_test_results %}
177-
{% for result_row in test_result.pop('result_rows', []) %}
178-
{% do result_rows.append({
179-
"elementary_test_results_id": test_result.id,
180-
"detected_at": test_result.detected_at,
181-
"result_row": result_row
182-
}) %}
183-
{% endfor %}
181+
{% if 'result_rows' in test_result %}
182+
{% for result_row in test_result.pop('result_rows') %}
183+
{% do result_rows.append({
184+
"elementary_test_results_id": test_result.id,
185+
"detected_at": test_result.detected_at,
186+
"result_row": result_row
187+
}) %}
188+
{% endfor %}
189+
{% endif %}
184190
{% endfor %}
185191
{% do return(result_rows) %}
186192
{% endmacro %}

macros/utils/graph/get_result_node.sql

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
{% macro get_result_node(identifier, package_name='elementary') %}
22
{% for result in results %}
3-
{% if result.node.identifier == identifier %}
3+
{% set node = elementary.get_node(result.unique_id) %}
4+
{% if node.identifier == identifier %}
45
{% if package_name %}
5-
{% if result.node.package_name == package_name %}
6-
{{ return(result.node) }}
6+
{% if node.package_name == package_name %}
7+
{{ return(node) }}
78
{% endif %}
89
{% else %}
9-
{{ return(result.node) }}
10+
{{ return(node) }}
1011
{% endif %}
1112
{% endif %}
1213
{% endfor %}

0 commit comments

Comments
 (0)