Skip to content

Commit 1b034ca

Browse files
authored
Add exclude_detection_period_from_training flag to volume_anomalies test (#877)
1 parent 20b7637 commit 1b034ca

File tree

7 files changed

+163
-12
lines changed

7 files changed

+163
-12
lines changed

integration_tests/tests/test_anomaly_test_configuration.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ def get_value(key: str):
8888
"freshness_column": None, # Deprecated
8989
"dimensions": None, # should only be set at the test level,
9090
"exclude_final_results": get_value("exclude_final_results"),
91+
"exclude_detection_period_from_training": None,
9192
}
9293

9394

integration_tests/tests/test_volume_anomalies.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,3 +534,88 @@ def test_anomalyless_vol_anomalies_with_test_materialization(
534534
test_vars={"enable_elementary_test_materialization": True},
535535
)
536536
assert test_result["status"] == "pass"
537+
538+
539+
# Test for exclude_detection_period_from_training functionality
540+
# This test demonstrates the use case where:
541+
# 1. Detection period contains anomalous data that would normally be included in training
542+
# 2. With exclude_detection_period_from_training=False: anomaly is missed (test passes) because training includes the anomaly
543+
# 3. With exclude_detection_period_from_training=True: anomaly is detected (test fails) because training excludes the anomaly
544+
@pytest.mark.skip_targets(["clickhouse"])
545+
def test_exclude_detection_from_training(test_id: str, dbt_project: DbtProject):
546+
"""
547+
Test the exclude_detection_period_from_training flag functionality.
548+
549+
Scenario:
550+
- 30 days of normal data with variance (98, 100, 102 rows per day pattern)
551+
- 7 days of anomalous data (114 rows per day) in detection period
552+
- Without exclusion: anomaly gets included in training baseline, test passes (misses anomaly)
553+
- With exclusion: anomaly excluded from training, test fails (detects anomaly)
554+
"""
555+
utc_now = datetime.utcnow()
556+
557+
# Generate 30 days of normal data with variance (98, 100, 102 pattern)
558+
normal_pattern = [98, 100, 102]
559+
normal_data = []
560+
for i in range(30):
561+
date = utc_now - timedelta(days=37 - i)
562+
rows_per_day = normal_pattern[i % 3]
563+
normal_data.extend(
564+
[
565+
{TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)}
566+
for _ in range(rows_per_day)
567+
]
568+
)
569+
570+
# Generate 7 days of anomalous data (114 rows per day) - this will be in detection period
571+
anomalous_data = []
572+
for i in range(7):
573+
date = utc_now - timedelta(days=7 - i)
574+
anomalous_data.extend(
575+
[
576+
{TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)}
577+
for _ in range(114) # 14% increase from mean
578+
]
579+
)
580+
581+
all_data = normal_data + anomalous_data
582+
583+
# Test 1: WITHOUT exclusion (should pass - misses the anomaly because it's included in training)
584+
test_args_without_exclusion = {
585+
**DBT_TEST_ARGS,
586+
"training_period": {"period": "day", "count": 30},
587+
"detection_period": {"period": "day", "count": 7},
588+
"time_bucket": {"period": "day", "count": 1},
589+
"sensitivity": 5, # Higher sensitivity to allow anomaly to be absorbed
590+
# exclude_detection_period_from_training is not set (defaults to False/None)
591+
}
592+
593+
test_result_without_exclusion = dbt_project.test(
594+
test_id + "_without_exclusion",
595+
DBT_TEST_NAME,
596+
test_args_without_exclusion,
597+
data=all_data,
598+
)
599+
600+
# This should PASS because the anomaly is included in training, making it part of the baseline
601+
assert (
602+
test_result_without_exclusion["status"] == "pass"
603+
), "Test should pass when anomaly is included in training"
604+
605+
# Test 2: WITH exclusion (should fail - detects the anomaly because it's excluded from training)
606+
test_args_with_exclusion = {
607+
**test_args_without_exclusion,
608+
"exclude_detection_period_from_training": True,
609+
}
610+
611+
test_result_with_exclusion = dbt_project.test(
612+
test_id + "_with_exclusion",
613+
DBT_TEST_NAME,
614+
test_args_with_exclusion,
615+
data=all_data,
616+
)
617+
618+
# This should FAIL because the anomaly is excluded from training, so it's detected as anomalous
619+
assert (
620+
test_result_with_exclusion["status"] == "fail"
621+
), "Test should fail when anomaly is excluded from training"

macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,50 @@
1717

1818
{%- if test_configuration.seasonality == 'day_of_week' %}
1919
{%- set bucket_seasonality_expr = elementary.edr_day_of_week_expression('bucket_end') %}
20+
{%- set has_seasonality = true %}
2021

2122
{%- elif test_configuration.seasonality == 'hour_of_day' %}
2223
{%- set bucket_seasonality_expr = elementary.edr_hour_of_day_expression('bucket_end') %}
24+
{%- set has_seasonality = true %}
2325

2426
{%- elif test_configuration.seasonality == 'hour_of_week' %}
2527
{%- set bucket_seasonality_expr = elementary.edr_hour_of_week_expression('bucket_end') %}
28+
{%- set has_seasonality = true %}
2629

2730
{%- else %}
2831
{%- set bucket_seasonality_expr = elementary.const_as_text('no_seasonality') %}
32+
{%- set has_seasonality = false %}
2933
{%- endif %}
34+
35+
{# Build PARTITION BY clause for window functions dynamically to work around Redshift limitation.
36+
37+
Redshift doesn't allow constant expressions in PARTITION BY of window functions. When seasonality
38+
is not configured, bucket_seasonality becomes a constant ('no_seasonality'::text), which triggers
39+
the error "constant expressions are not supported in partition by clauses."
40+
41+
We build the partition keys dynamically, always including the core metric keys and only appending
42+
bucket_seasonality when it's computed from timestamps (has_seasonality = true). Partitioning by
43+
a constant has no effect anyway, so this preserves behavior while keeping Redshift happy. #}
44+
{%- set partition_by_keys = "metric_name, full_table_name, column_name, dimension, dimension_value" %}
45+
{%- if has_seasonality %}
46+
{%- set partition_by_keys = partition_by_keys ~ ", bucket_seasonality" %}
47+
{%- endif %}
48+
3049
{%- set detection_end = elementary.get_detection_end(test_configuration.detection_delay) %}
3150
{%- set detection_end_expr = elementary.edr_cast_as_timestamp(elementary.edr_datetime_to_sql(detection_end)) %}
3251
{%- set min_bucket_start_expr = elementary.get_trunc_min_bucket_start_expr(detection_end, metric_properties, test_configuration.days_back) %}
3352

53+
{# Calculate detection period start for exclusion logic.
54+
backfill_days defines the window of recent data to test for anomalies on each run.
55+
It defaults to 2 days (configurable via vars.backfill_days or test-level parameter).
56+
The detection period spans from (detection_end - backfill_days) to detection_end.
57+
When exclude_detection_period_from_training is enabled, metrics in this detection period
58+
are excluded from training statistics to prevent contamination from potentially anomalous data. #}
59+
{%- if test_configuration.exclude_detection_period_from_training %}
60+
{%- set detection_period_start = (detection_end - modules.datetime.timedelta(days=test_configuration.backfill_days)) %}
61+
{%- set detection_period_start_expr = elementary.edr_cast_as_timestamp(elementary.edr_datetime_to_sql(detection_period_start)) %}
62+
{%- endif %}
63+
3464
{# For timestamped tests, this will be the bucket start, and for non-timestamped tests it will be the
3565
bucket end (which is the actual time of the test) #}
3666
{%- set metric_time_bucket_expr = 'case when bucket_start is not null then bucket_start else bucket_end end' %}
@@ -142,6 +172,12 @@
142172
bucket_end,
143173
{{ bucket_seasonality_expr }} as bucket_seasonality,
144174
{{ test_configuration.anomaly_exclude_metrics or 'FALSE' }} as is_excluded,
175+
{# Flag detection period metrics for exclusion from training #}
176+
{% if test_configuration.exclude_detection_period_from_training %}
177+
bucket_end > {{ detection_period_start_expr }}
178+
{% else %}
179+
FALSE
180+
{% endif %} as should_exclude_from_training,
145181
bucket_duration_hours,
146182
updated_at
147183
from grouped_metrics_duplicates
@@ -164,14 +200,15 @@
164200
bucket_seasonality,
165201
bucket_duration_hours,
166202
updated_at,
167-
avg(metric_value) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_avg,
168-
{{ elementary.standard_deviation('metric_value') }} over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_stddev,
169-
count(metric_value) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_set_size,
170-
last_value(bucket_end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) training_end,
171-
first_value(bucket_end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_start
203+
should_exclude_from_training,
204+
avg(case when not should_exclude_from_training then metric_value end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_avg,
205+
{{ elementary.standard_deviation('case when not should_exclude_from_training then metric_value end') }} over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_stddev,
206+
count(case when not should_exclude_from_training then metric_value end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_set_size,
207+
last_value(case when not should_exclude_from_training then bucket_end end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) training_end,
208+
first_value(case when not should_exclude_from_training then bucket_end end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_start
172209
from grouped_metrics
173210
where not is_excluded
174-
{{ dbt_utils.group_by(13) }}
211+
{{ dbt_utils.group_by(14) }}
175212
),
176213

177214
anomaly_scores as (

macros/edr/tests/test_configuration/get_anomalies_test_configuration.sql

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
anomaly_exclude_metrics,
2424
detection_period,
2525
training_period,
26-
exclude_final_results) %}
26+
exclude_final_results,
27+
exclude_detection_period_from_training) %}
2728

2829
{%- set model_graph_node = elementary.get_model_graph_node(model_relation) %}
2930
{# Changes in these configs impact the metric id of the test. #}
@@ -53,6 +54,7 @@
5354

5455
{% set anomaly_exclude_metrics = elementary.get_test_argument('anomaly_exclude_metrics', anomaly_exclude_metrics, model_graph_node) %}
5556
{% set exclude_final_results = elementary.get_exclude_final_results(exclude_final_results) %}
57+
{% set exclude_detection_period_from_training = elementary.get_test_argument('exclude_detection_period_from_training', exclude_detection_period_from_training, model_graph_node) %}
5658

5759
{% set test_configuration =
5860
{'timestamp_column': metric_props.timestamp_column,
@@ -71,7 +73,8 @@
7173
'fail_on_zero': fail_on_zero,
7274
'detection_delay': detection_delay,
7375
'anomaly_exclude_metrics': anomaly_exclude_metrics,
74-
'exclude_final_results': exclude_final_results
76+
'exclude_final_results': exclude_final_results,
77+
'exclude_detection_period_from_training': exclude_detection_period_from_training
7578
} %}
7679
{%- set test_configuration = elementary.undefined_dict_keys_to_none(test_configuration) -%}
7780
{%- do elementary.validate_mandatory_configuration(test_configuration, mandatory_params) -%}

macros/edr/tests/test_table_anomalies.sql

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
{% test table_anomalies(model, table_anomalies, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, mandatory_params=none, event_timestamp_column=none, freshness_column=none, sensitivity=none, ignore_small_changes={"spike_failure_percent_threshold": none, "drop_failure_percent_threshold": none}, fail_on_zero=false, detection_delay=none, anomaly_exclude_metrics=none, detection_period=none, training_period=none) %}
1+
{% test table_anomalies(model, table_anomalies, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, mandatory_params=none, event_timestamp_column=none, freshness_column=none, sensitivity=none, ignore_small_changes={"spike_failure_percent_threshold": none, "drop_failure_percent_threshold": none}, fail_on_zero=false, detection_delay=none, anomaly_exclude_metrics=none, detection_period=none, training_period=none, exclude_detection_period_from_training=false) %}
22
{{ config(tags = ['elementary-tests']) }}
33
{%- if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %}
44
{% set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %}
@@ -37,7 +37,8 @@
3737
detection_delay=detection_delay,
3838
anomaly_exclude_metrics=anomaly_exclude_metrics,
3939
detection_period=detection_period,
40-
training_period=training_period) %}
40+
training_period=training_period,
41+
exclude_detection_period_from_training=exclude_detection_period_from_training) %}
4142

4243
{% if not test_configuration %}
4344
{{ exceptions.raise_compiler_error("Failed to create test configuration dict for test `{}`".format(test_table_name)) }}

macros/edr/tests/test_volume_anomalies.sql

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
{% test volume_anomalies(model, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, fail_on_zero, detection_delay, anomaly_exclude_metrics, detection_period, training_period) %}
1+
{% test volume_anomalies(model, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, fail_on_zero, detection_delay, anomaly_exclude_metrics, detection_period, training_period, exclude_detection_period_from_training=false) %}
22
{{ config(tags = ['elementary-tests']) }}
33

44
{{ elementary.test_table_anomalies(
@@ -20,7 +20,8 @@
2020
detection_delay=detection_delay,
2121
anomaly_exclude_metrics=anomaly_exclude_metrics,
2222
detection_period=detection_period,
23-
training_period=training_period
23+
training_period=training_period,
24+
exclude_detection_period_from_training=exclude_detection_period_from_training
2425
)
2526
}}
2627
{% endtest %}

macros/utils/cross_db_utils/multi_value_in.sql

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,26 @@
3232
from {{ target_table }}
3333
)
3434
{%- endmacro -%}
35+
36+
{%- macro redshift__edr_multi_value_in(source_cols, target_cols, target_table) -%}
37+
{# Redshift doesn't support multi-column IN subqueries (tuple IN) like:
38+
(col1, col2) IN (SELECT col1, col2 FROM table)
39+
40+
This limitation causes the error: "This type of IN/NOT IN query is not supported yet"
41+
42+
To work around this, we use CONCAT to combine multiple columns into a single scalar
43+
value on both sides of the IN comparison, similar to the BigQuery implementation.
44+
This maintains the same semantics while avoiding Redshift's tuple IN limitation. #}
45+
concat(
46+
{%- for val in source_cols -%}
47+
{{ elementary.edr_cast_as_string(val) -}}
48+
{%- if not loop.last %}, {% endif %}
49+
{%- endfor %}
50+
) in (
51+
select concat({%- for val in target_cols -%}
52+
{{ elementary.edr_cast_as_string(val) -}}
53+
{%- if not loop.last %}, {% endif %}
54+
{%- endfor %})
55+
from {{ target_table }}
56+
)
57+
{%- endmacro -%}

0 commit comments

Comments (0)