Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions integration_tests/tests/dbt_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def test(
materialization: str = "table", # Only relevant if as_model=True
test_vars: Optional[dict] = None,
elementary_enabled: bool = True,
model_config: Optional[Dict[str, Any]] = None,
*,
multiple_results: Literal[False] = False,
) -> Dict[str, Any]:
Expand All @@ -128,6 +129,7 @@ def test(
materialization: str = "table", # Only relevant if as_model=True
test_vars: Optional[dict] = None,
elementary_enabled: bool = True,
model_config: Optional[Dict[str, Any]] = None,
*,
multiple_results: Literal[True],
) -> List[Dict[str, Any]]:
Expand All @@ -146,6 +148,7 @@ def test(
materialization: str = "table", # Only relevant if as_model=True
test_vars: Optional[dict] = None,
elementary_enabled: bool = True,
model_config: Optional[Dict[str, Any]] = None,
*,
multiple_results: bool = False,
) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
Expand All @@ -161,6 +164,9 @@ def test(
test_args = test_args or {}
table_yaml: Dict[str, Any] = {"name": test_id}

if model_config:
table_yaml.update(model_config)

if columns:
table_yaml["columns"] = columns

Expand Down
110 changes: 110 additions & 0 deletions integration_tests/tests/test_sampling_pii.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import json

import pytest
from dbt_project import DbtProject

COLUMN_NAME = "value"


SAMPLES_QUERY = """
with latest_elementary_test_result as (
select id
from {{{{ ref("elementary_test_results") }}}}
where lower(table_name) = lower('{test_id}')
order by created_at desc
limit 1
)

select result_row
from {{{{ ref("test_result_rows") }}}}
where elementary_test_results_id in (select * from latest_elementary_test_result)
"""
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Verify the query handles edge cases properly.

The SAMPLES_QUERY looks correct but should handle potential edge cases:

  • What if multiple tests exist for the same table_name?
  • Consider adding ORDER BY for deterministic results

🏁 Script executed:

#!/bin/bash
# Verify the query logic and check for potential issues with multiple test results

# Search for similar query patterns in the codebase to ensure consistency
rg -A 10 -B 5 "elementary_test_results.*table_name.*order by.*created_at desc"

# Look for any existing edge case handling in similar queries
rg -A 5 "limit 1" --type sql

Length of output: 2695


Ensure deterministic ordering and handle no-results in SAMPLES_QUERY

The current CTE picks the latest test by created_at DESC LIMIT 1, but if two runs share the same timestamp you may get a non-deterministic row. Also, if no test results exist for the given test_id, the final query simply returns no rows.

Please update integration_tests/tests/test_sampling_pii.py (lines 9–21) to:

  • Add a secondary sort key (e.g. id DESC) to make the choice deterministic
  • Optionally guard against an empty CTE (for example, by raising an error or returning a default)

Suggested diff:

     with latest_elementary_test_result as (
         select id
         from {{ ref("elementary_test_results") }}
         where lower(table_name) = lower('{test_id}')
-        order by created_at desc
+        order by created_at desc, id desc
         limit 1
     )
 
     select result_row
     from {{ ref("test_result_rows") }}
     where elementary_test_results_id in (
         select * from latest_elementary_test_result
     )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
SAMPLES_QUERY = """
with latest_elementary_test_result as (
select id
from {{{{ ref("elementary_test_results") }}}}
where lower(table_name) = lower('{test_id}')
order by created_at desc
limit 1
)
select result_row
from {{{{ ref("test_result_rows") }}}}
where elementary_test_results_id in (select * from latest_elementary_test_result)
"""
SAMPLES_QUERY = """
with latest_elementary_test_result as (
select id
from {{{{ ref("elementary_test_results") }}}}
where lower(table_name) = lower('{test_id}')
order by created_at desc, id desc
limit 1
)
select result_row
from {{{{ ref("test_result_rows") }}}}
where elementary_test_results_id in (select * from latest_elementary_test_result)
"""
🤖 Prompt for AI Agents
In integration_tests/tests/test_sampling_pii.py around lines 9 to 21, the
SAMPLES_QUERY CTE orders by created_at descending but lacks a secondary sort
key, causing non-deterministic results if timestamps tie. Also, it does not
handle the case when no test results exist for the given test_id. Fix this by
adding a secondary order criterion such as id DESC to ensure deterministic
ordering, and add logic to handle an empty CTE result, for example by raising an
error or returning a default value to avoid silent failures.


TEST_SAMPLE_ROW_COUNT = 7


@pytest.mark.skip_targets(["clickhouse"])
def test_sampling_pii_disabled(test_id: str, dbt_project: DbtProject):
"""Test that PII-tagged tables don't upload samples even when tests fail"""
null_count = 50
data = [{COLUMN_NAME: None} for _ in range(null_count)]

test_result = dbt_project.test(
test_id,
"not_null",
dict(column_name=COLUMN_NAME),
data=data,
as_model=True,
model_config={"config": {"tags": ["pii"]}},
test_vars={
"enable_elementary_test_materialization": True,
"test_sample_row_count": TEST_SAMPLE_ROW_COUNT,
"disable_samples_on_pii_tables": True,
"pii_table_tags": ["pii", "sensitive"],
},
)
assert test_result["status"] == "fail"

samples = [
json.loads(row["result_row"])
for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id))
]
assert len(samples) == 0


@pytest.mark.skip_targets(["clickhouse"])
def test_sampling_non_pii_enabled(test_id: str, dbt_project: DbtProject):
"""Test that non-PII tables still collect samples normally"""
null_count = 50
data = [{COLUMN_NAME: None} for _ in range(null_count)]

test_result = dbt_project.test(
test_id,
"not_null",
dict(column_name=COLUMN_NAME),
data=data,
as_model=True,
model_config={"config": {"tags": ["normal"]}},
test_vars={
"enable_elementary_test_materialization": True,
"test_sample_row_count": TEST_SAMPLE_ROW_COUNT,
"disable_samples_on_pii_tables": True,
"pii_table_tags": ["pii", "sensitive"],
},
)
assert test_result["status"] == "fail"

samples = [
json.loads(row["result_row"])
for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id))
]
assert len(samples) == TEST_SAMPLE_ROW_COUNT


@pytest.mark.skip_targets(["clickhouse"])
def test_sampling_pii_feature_disabled(test_id: str, dbt_project: DbtProject):
"""Test that when PII feature is disabled, PII tables still collect samples"""
null_count = 50
data = [{COLUMN_NAME: None} for _ in range(null_count)]

test_result = dbt_project.test(
test_id,
"not_null",
dict(column_name=COLUMN_NAME),
data=data,
as_model=True,
model_config={"config": {"tags": ["pii"]}},
test_vars={
"enable_elementary_test_materialization": True,
"test_sample_row_count": TEST_SAMPLE_ROW_COUNT,
"disable_samples_on_pii_tables": False,
"pii_table_tags": ["pii", "sensitive"],
},
)
assert test_result["status"] == "fail"

samples = [
json.loads(row["result_row"])
for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id))
]
assert len(samples) == TEST_SAMPLE_ROW_COUNT
6 changes: 5 additions & 1 deletion macros/edr/materializations/test/test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@

{% macro handle_dbt_test(flattened_test, materialization_macro) %}
{% set result = materialization_macro() %}
{% set result_rows = elementary.query_test_result_rows(sample_limit=elementary.get_config_var('test_sample_row_count'),
{% set sample_limit = elementary.get_config_var('test_sample_row_count') %}
{% if elementary.is_pii_table(flattened_test) %}
{% set sample_limit = 0 %}
{% endif %}
{% set result_rows = elementary.query_test_result_rows(sample_limit=sample_limit,
ignore_passed_tests=true) %}
{% set elementary_test_results_row = elementary.get_dbt_test_result_row(flattened_test, result_rows) %}
{% do elementary.cache_elementary_test_results_rows([elementary_test_results_row]) %}
Expand Down
4 changes: 3 additions & 1 deletion macros/edr/system/system_utils/get_config_var.sql
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@
},
'include_other_warehouse_specific_columns': false,
'fail_on_zero': false,
'anomaly_exclude_metrics': none
'anomaly_exclude_metrics': none,
'disable_samples_on_pii_tables': false,
'pii_table_tags': ['pii']
} %}
{{- return(default_config) -}}
{%- endmacro -%}
Expand Down
14 changes: 14 additions & 0 deletions macros/edr/system/system_utils/is_pii_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{% macro is_pii_table(flattened_test) %}
{% set disable_samples_on_pii_tables = elementary.get_config_var('disable_samples_on_pii_tables') %}
{% if not disable_samples_on_pii_tables %}
{% do return(false) %}
{% endif %}

{% set pii_table_tags = elementary.get_config_var('pii_table_tags') %}
{% set model_tags = elementary.insensitive_get_dict_value(flattened_test, 'model_tags', []) %}

{% set intersection = elementary.lists_intersection(model_tags, pii_table_tags) %}
{% set is_pii = intersection | length > 0 %}

{% do return(is_pii) %}
{% endmacro %}
Loading