Skip to content

Commit 5d4dd69

Browse files
Implement column-level PII protection for sample collection
- Add disable_samples_on_pii_columns and pii_column_tags config variables
- Create get_pii_columns_from_parent_model helper macro for column PII detection
- Modify query_test_result_rows to dynamically exclude PII columns from SELECT
- Add comprehensive integration tests for column-level PII protection
- Maintain backward compatibility with existing table-level PII detection
- Follow existing patterns for configuration, tag handling, and testing

Addresses Linear issue ELE-4850

Co-Authored-By: Yosef Arbiv <[email protected]>
1 parent dc8fba5 commit 5d4dd69

File tree

4 files changed

+204
-3
lines changed

4 files changed

+204
-3
lines changed
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
import json
2+
3+
import pytest
4+
from dbt_project import DbtProject
5+
6+
# Name of the column that the tests below tag with "pii".
SENSITIVE_COLUMN = "email"
7+
# Name of the column the not_null test is applied to in every test below.
SAFE_COLUMN = "order_count"
8+
9+
# Selects the stored result rows of the most recent elementary test result whose
# table_name matches `test_id` (case-insensitively). The template is rendered
# with str.format(), so the doubled braces come out as `{{ ref(...) }}`.
SAMPLES_QUERY = """
10+
with latest_elementary_test_result as (
11+
select id
12+
from {{{{ ref("elementary_test_results") }}}}
13+
where lower(table_name) = lower('{test_id}')
14+
order by created_at desc
15+
limit 1
16+
)
17+
18+
select result_row
19+
from {{{{ ref("test_result_rows") }}}}
20+
where elementary_test_results_id in (select * from latest_elementary_test_result)
21+
"""
22+
23+
# Passed as the `test_sample_row_count` var and used as the expected number of samples.
TEST_SAMPLE_ROW_COUNT = 5
24+
25+
26+
@pytest.mark.skip_targets(["clickhouse"])
def test_column_pii_sampling_enabled(test_id: str, dbt_project: DbtProject):
    """PII-tagged columns are left out of samples when column-level protection is on."""
    rows = [
        {SENSITIVE_COLUMN: f"user{idx}@example.com", SAFE_COLUMN: None}
        for idx in range(10)
    ]
    column_specs = [
        {"name": SENSITIVE_COLUMN, "config": {"tags": ["pii"]}},
        {"name": SAFE_COLUMN},
    ]
    run_vars = {
        "enable_elementary_test_materialization": True,
        "test_sample_row_count": TEST_SAMPLE_ROW_COUNT,
        "disable_samples_on_pii_columns": True,
        "pii_column_tags": ["pii"],
    }

    # Every SAFE_COLUMN value is null, so the not_null test is expected to fail.
    outcome = dbt_project.test(
        test_id,
        "not_null",
        test_args={"column_name": SAFE_COLUMN},
        data=rows,
        columns=column_specs,
        test_vars=run_vars,
    )
    assert outcome["status"] == "fail"

    stored_rows = dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id))
    samples = [json.loads(stored["result_row"]) for stored in stored_rows]

    assert len(samples) == TEST_SAMPLE_ROW_COUNT
    for sample in samples:
        assert SENSITIVE_COLUMN not in sample
        assert SAFE_COLUMN in sample
60+
61+
62+
@pytest.mark.skip_targets(["clickhouse"])
def test_column_pii_sampling_disabled(test_id: str, dbt_project: DbtProject):
    """Samples keep every column when column-level PII protection is off."""
    rows = [
        {SENSITIVE_COLUMN: f"user{idx}@example.com", SAFE_COLUMN: None}
        for idx in range(10)
    ]
    column_specs = [
        {"name": SENSITIVE_COLUMN, "config": {"tags": ["pii"]}},
        {"name": SAFE_COLUMN},
    ]
    run_vars = {
        "enable_elementary_test_materialization": True,
        "test_sample_row_count": TEST_SAMPLE_ROW_COUNT,
        "disable_samples_on_pii_columns": False,
    }

    # Every SAFE_COLUMN value is null, so the not_null test is expected to fail.
    outcome = dbt_project.test(
        test_id,
        "not_null",
        test_args={"column_name": SAFE_COLUMN},
        data=rows,
        columns=column_specs,
        test_vars=run_vars,
    )
    assert outcome["status"] == "fail"

    stored_rows = dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id))
    samples = [json.loads(stored["result_row"]) for stored in stored_rows]

    assert len(samples) == TEST_SAMPLE_ROW_COUNT
    for sample in samples:
        # The column is tagged "pii", but protection is disabled, so it must still be sampled.
        assert SENSITIVE_COLUMN in sample
        assert SAFE_COLUMN in sample
95+
96+
97+
@pytest.mark.skip_targets(["clickhouse"])
def test_column_pii_sampling_all_columns_pii(test_id: str, dbt_project: DbtProject):
    """Test behavior when all columns are tagged as PII.

    With no non-PII column left to select, each sample is expected to hold only
    the `_no_non_pii_columns` placeholder.
    """
    # SAFE_COLUMN must be null so that the not_null test FAILS: samples are only
    # collected for failing tests (the sample query is skipped when the test
    # passed), so a passing test would leave nothing to assert on.
    data = [
        {SENSITIVE_COLUMN: f"user{i}@example.com", SAFE_COLUMN: None}
        for i in range(10)
    ]

    test_result = dbt_project.test(
        test_id,
        "not_null",
        test_args=dict(column_name=SAFE_COLUMN),
        data=data,
        columns=[
            {"name": SENSITIVE_COLUMN, "config": {"tags": ["pii"]}},
            {"name": SAFE_COLUMN, "config": {"tags": ["pii"]}},
        ],
        test_vars={
            "enable_elementary_test_materialization": True,
            "test_sample_row_count": TEST_SAMPLE_ROW_COUNT,
            "disable_samples_on_pii_columns": True,
            "pii_column_tags": ["pii"],
        },
    )
    assert test_result["status"] == "fail"

    samples = [
        json.loads(row["result_row"])
        for row in dbt_project.run_query(SAMPLES_QUERY.format(test_id=test_id))
    ]

    assert len(samples) == TEST_SAMPLE_ROW_COUNT
    for sample in samples:
        assert "_no_non_pii_columns" in sample
        assert sample["_no_non_pii_columns"] == 1
        assert SENSITIVE_COLUMN not in sample
        assert SAFE_COLUMN not in sample

macros/edr/materializations/test/test.sql

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@
5151
{% macro handle_dbt_test(flattened_test, materialization_macro) %}
5252
{% set result = materialization_macro() %}
5353
{% set result_rows = elementary.query_test_result_rows(sample_limit=elementary.get_config_var('test_sample_row_count'),
54-
ignore_passed_tests=true) %}
54+
ignore_passed_tests=true,
55+
flattened_test=flattened_test) %}
5556
{% set elementary_test_results_row = elementary.get_dbt_test_result_row(flattened_test, result_rows) %}
5657
{% do elementary.cache_elementary_test_results_rows([elementary_test_results_row]) %}
5758
{% do return(result) %}
@@ -103,19 +104,43 @@
103104
{% do return(new_sql) %}
104105
{% endmacro %}
105106

106-
{% macro query_test_result_rows(sample_limit=none, ignore_passed_tests=false, flattened_test=none) %}
  {#
    Query the current test's compiled SQL and return its rows as a list of dicts.

    sample_limit:        maximum number of rows to fetch; none means no limit and
                         0 short-circuits to an empty list without running a query.
    ignore_passed_tests: when true, return an empty list if the test passed.
    flattened_test:      optional flattened test node. When given, columns that
                         get_pii_columns_from_parent_model reports for it are
                         left out of the SELECT list.
  #}
  {% if sample_limit == 0 %} {# performance: no need to run a sql query that we know returns an empty list #}
    {% do return([]) %}
  {% endif %}
  {% if ignore_passed_tests and elementary.did_test_pass() %}
    {% do elementary.debug_log("Skipping sample query because the test passed.") %}
    {% do return([]) %}
  {% endif %}

  {% set pii_columns = [] %}
  {% if flattened_test %}
    {% set pii_columns = elementary.get_pii_columns_from_parent_model(flattened_test) %}
  {% endif %}

  {% set select_clause = "*" %}
  {% if pii_columns %}
    {# Compare names case-insensitively: the names declared in the model's YAML and the
       names reported by the warehouse may differ in case, and a case-sensitive
       mismatch would let a PII column through. #}
    {% set pii_columns_lower = pii_columns | map("lower") | list %}

    {# A zero-row query is enough to learn the result set's column names. #}
    {% set query_to_get_columns %}
      with test_results as (
        {{ sql }}
      )
      select * from test_results limit 0
    {% endset %}
    {% set all_columns = elementary.run_query(query_to_get_columns).column_names %}

    {% set safe_columns = [] %}
    {% for column_name in all_columns %}
      {% if (column_name | lower) not in pii_columns_lower %}
        {% do safe_columns.append(column_name) %}
      {% endif %}
    {% endfor %}

    {% if safe_columns %}
      {% set select_clause = safe_columns | join(", ") %}
    {% else %}
      {# Every column is PII: keep one row per result with a placeholder column only. #}
      {% set select_clause = "1 as _no_non_pii_columns" %}
    {% endif %}
  {% endif %}

  {% set query %}
    with test_results as (
      {{ sql }}
    )
    select {{ select_clause }} from test_results {% if sample_limit is not none %} limit {{ sample_limit }} {% endif %}
  {% endset %}
  {% do return(elementary.agate_to_dicts(elementary.run_query(query))) %}
{% endmacro %}

macros/edr/system/system_utils/get_config_var.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@
5555
'disable_skipped_test_alerts': true,
5656
'dbt_artifacts_chunk_size': 5000,
5757
'test_sample_row_count': 5,
58+
'disable_samples_on_pii_columns': false,
59+
'pii_column_tags': ['pii', 'personal', 'sensitive'],
5860
'edr_cli_run': false,
5961
'max_int': 2147483647,
6062
'custom_run_started_at': none,
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
{% macro get_pii_columns_from_parent_model(flattened_test) %}
  {#
    Return the names of the columns of the test's parent model that carry at
    least one of the tags listed in the `pii_column_tags` config var.

    Returns an empty list when `disable_samples_on_pii_columns` is falsy, when
    the parent model or its columns cannot be resolved, or when no PII tags are
    configured. Tags are read from the column's `config.tags`, `tags` and
    `meta.tags`, and are compared case-insensitively.
  #}
  {% set pii_columns = [] %}

  {% if not elementary.get_config_var('disable_samples_on_pii_columns') %}
    {% do return(pii_columns) %}
  {% endif %}

  {% set parent_model_unique_id = elementary.insensitive_get_dict_value(flattened_test, 'parent_model_unique_id') %}
  {% if not parent_model_unique_id %}
    {% do return(pii_columns) %}
  {% endif %}

  {% set parent_model = elementary.get_node(parent_model_unique_id) %}
  {% if not parent_model %}
    {% do return(pii_columns) %}
  {% endif %}

  {% set column_nodes = parent_model.get("columns") %}
  {% if not column_nodes %}
    {% do return(pii_columns) %}
  {% endif %}

  {# The config var may be a single tag or a list of tags; normalize to a lower-cased list. #}
  {% set raw_pii_tags = elementary.get_config_var('pii_column_tags') %}
  {% if raw_pii_tags is string %}
    {% set raw_pii_tags = [raw_pii_tags] %}
  {% endif %}
  {% set pii_tags = (raw_pii_tags or []) | map('lower') | list %}
  {% if not pii_tags %}
    {% do return(pii_columns) %}
  {% endif %}

  {% for column_node in column_nodes.values() %}
    {# `or {}` also covers keys that are present but null. #}
    {% set config_dict = column_node.get('config') or {} %}
    {% set meta_dict = column_node.get('meta') or {} %}

    {# Each source may hold a list of tags, a single string tag, or nothing. Collect them
       into one flat list; concatenating the raw values would raise on mixed types and
       would turn tag matching into substring matching for plain strings. #}
    {% set column_tags = [] %}
    {% for raw_tags in [config_dict.get('tags'), column_node.get('tags'), meta_dict.get('tags')] %}
      {% if raw_tags is string %}
        {% do column_tags.append(raw_tags | lower) %}
      {% elif raw_tags %}
        {% for tag in raw_tags %}
          {% do column_tags.append(tag | lower) %}
        {% endfor %}
      {% endif %}
    {% endfor %}

    {% set column_name = column_node.get('name') %}
    {% if column_name and (pii_tags | select('in', column_tags) | list) %}
      {% do pii_columns.append(column_name) %}
    {% endif %}
  {% endfor %}

  {% do return(pii_columns) %}
{% endmacro %}

0 commit comments

Comments
 (0)