Commit f27e8d7

Author: Michael Myaskovsky
Commit message: fixed null insertion
Parent: 1d9374c

12 files changed: +140 −41 lines


dbt_project.yml

Lines changed: 0 additions & 3 deletions
@@ -3,9 +3,6 @@ version: "0.18.1"
 
 require-dbt-version: [">=1.0.0", "<2.0.0"]
 
-flags:
-  require_explicit_package_overrides_for_builtin_materializations: false
-
 config-version: 2
 profile: "elementary"

macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql

Lines changed: 1 addition & 1 deletion
@@ -164,7 +164,7 @@
     bucket_duration_hours,
     updated_at,
     avg(metric_value) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_avg,
-    stddev(metric_value) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_stddev,
+    {{ elementary.standard_deviation('metric_value') }} over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_stddev,
     count(metric_value) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_set_size,
     last_value(bucket_end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) training_end,
     first_value(bucket_end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_start
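
This swap follows dbt's adapter.dispatch pattern used throughout the commit: an entry macro delegates to an adapter-specific `<adapter>__<name>` implementation when one exists, falling back to `default__<name>`. Replacing the hard-coded `stddev` lets ClickHouse supply its own spelling (ClickHouse names the sample standard deviation `stddevSamp`). The macro body is not part of this diff; a hedged sketch of what it plausibly looks like:

{# Hypothetical sketch — the actual elementary.standard_deviation macro is not shown in this commit. #}
{% macro standard_deviation(column_name) %}
    {{ return(adapter.dispatch('standard_deviation', 'elementary')(column_name)) }}
{% endmacro %}

{% macro default__standard_deviation(column_name) %}
    stddev({{ column_name }})
{% endmacro %}

{% macro clickhouse__standard_deviation(column_name) %}
    stddevSamp({{ column_name }})
{% endmacro %}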

macros/edr/data_monitoring/monitors_query/dimension_monitoring_query.sql

Lines changed: 3 additions & 4 deletions
@@ -64,7 +64,7 @@
     select edr_bucket_start, edr_bucket_end, dimension_value
     from training_set_dimensions left join buckets
     on (buckets.joiner = training_set_dimensions.joiner
-        {# This makes sure we don't create empty buckets for dimensions before their first appearance #}
+        {# This makes sure we dont create empty buckets for dimensions before their first appearance #}
         and edr_bucket_end >= dimension_min_bucket_end)
     where dimension_value is not null
 ),
@@ -202,13 +202,12 @@
         {{ elementary.null_timestamp() }} as bucket_start,
         bucket_end,
         {{ elementary.null_int() }} as bucket_duration_hours,
-        {{ elementary.const_as_string(dimensions_string) }} as dimension,
-        dimension_value,
+        {{ elementary.null_string() }} as dimension,
+        {{ elementary.null_string() }} as dimension_value,
         {{ elementary.dict_to_quoted_json(metric_properties) }} as metric_properties
     from row_count
 )
 {% endif %}
-
 select
     {{ elementary.generate_surrogate_key([
         'full_table_name',
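
The second hunk is the heart of the commit message: when the dimension monitoring query falls back to a bare row count, `dimension` and `dimension_value` are now emitted as typed NULLs instead of constant strings, so the rows insert cleanly into the (now Nullable) ClickHouse columns. The null helpers are not shown in this diff; assuming they simply cast NULL to the adapter's type, they would look roughly like:

{# Assumed shape of the helpers used above — illustrative, not the package's actual source. #}
{% macro null_string() %}
    cast(null as {{ elementary.edr_type_string() }})
{% endmacro %}

{% macro null_int() %}
    cast(null as {{ elementary.edr_type_int() }})
{% endmacro %}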

macros/edr/system/system_utils/empty_table.sql

Lines changed: 33 additions & 0 deletions
@@ -54,6 +54,10 @@
 {% endmacro %}
 
 {% macro empty_data_monitoring_metrics(with_created_at=true) %}
+    {{ return(adapter.dispatch('empty_data_monitoring_metrics', 'elementary')(with_created_at)) }}
+{% endmacro %}
+
+{% macro default__empty_data_monitoring_metrics(with_created_at=true) %}
     {% set columns = [('id','string'),
                       ('full_table_name','string'),
                       ('column_name','string'),
@@ -75,6 +79,27 @@
     {{ elementary.empty_table(columns) }}
 {% endmacro %}
 
+{% macro clickhouse__empty_data_monitoring_metrics(with_created_at=true) %}
+    {% set columns = [('id','string'),
+                      ('full_table_name','nullable(string)'),
+                      ('column_name','nullable(string)'),
+                      ('metric_name','nullable(string)'),
+                      ('metric_type','nullable(string)'),
+                      ('metric_value','nullable(float)'),
+                      ('source_value','nullable(string)'),
+                      ('bucket_start','timestamp'),
+                      ('bucket_end','timestamp'),
+                      ('bucket_duration_hours','nullable(int)'),
+                      ('updated_at','nullable(timestamp)'),
+                      ('dimension','nullable(string)'),
+                      ('dimension_value','nullable(string)'),
+                      ('metric_properties','string')]
+    %}
+    {% if with_created_at %}
+        {% do columns.append(('created_at','nullable(timestamp)')) %}
+    {% endif %}
+    {{ elementary.empty_table(columns) }}
+{% endmacro %}
 
 {% macro empty_schema_columns_snapshot() %}
     {{ elementary.empty_table([('column_state_id','string'),('full_column_name','string'),('full_table_name','string'),('column_name','string'),('data_type','string'),('is_new','boolean'),('detected_at','timestamp'),('created_at','timestamp')]) }}
@@ -124,6 +149,14 @@
     cast({{ dummy_values['float'] }} as {{ elementary.edr_type_float() }}) as {{ column_name }}
 {%- elif data_type == 'long_string' %}
     cast('{{ dummy_values['long_string'] }}' as {{ elementary.edr_type_long_string() }}) as {{ column_name }}
+{%- elif data_type == 'nullable(string)' %}
+    cast('{{ dummy_values['string'] }}' as Nullable({{ elementary.edr_type_string() }})) as {{ column_name }}
+{%- elif data_type == 'nullable(timestamp)' -%}
+    cast('{{ dummy_values['timestamp'] }}' as Nullable({{ elementary.edr_type_timestamp() }})) as {{ column_name }}
+{%- elif data_type == 'nullable(float)' -%}
+    cast({{ dummy_values['float'] }} as Nullable({{ elementary.edr_type_float() }})) as {{ column_name }}
+{%- elif data_type == 'nullable(int)' -%}
+    cast({{ dummy_values['int'] }} as Nullable({{ elementary.edr_type_int() }})) as {{ column_name }}
 {%- else %}
     cast('{{ dummy_values['string'] }}' as {{ elementary.edr_type_string() }}) as {{ column_name }}
 {%- endif %}
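
The dedicated ClickHouse variant exists because ClickHouse columns reject NULL values unless the type is wrapped in Nullable(T) — the likely source of the null-insertion failures this commit fixes. A minimal plain-SQL illustration (hypothetical table names, Memory engine for brevity):

-- Inserting NULL into a plain String column fails by default
-- (or is coerced to '' when input_format_null_as_default is enabled).
create table t_plain (dimension String) engine = Memory;

-- Wrapping the type in Nullable makes NULL a first-class value.
create table t_nullable (dimension Nullable(String)) engine = Memory;
insert into t_nullable values (NULL);  -- ok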

macros/edr/system/system_utils/get_config_var.sql

Lines changed: 6 additions & 0 deletions
@@ -75,6 +75,12 @@
     {{- return(default_config) -}}
 {%- endmacro -%}
 
+{%- macro clickhouse__get_default_config() -%}
+    {% set default_config = elementary.default__get_default_config() %}
+    {% do default_config.update({'query_max_size': 250000}) %}
+    {{- return(default_config) -}}
+{%- endmacro -%}
+
 {%- macro athena__get_default_config() -%}
     {% set default_config = elementary.default__get_default_config() %}
     {% do default_config.update({'query_max_size': 250000}) %}
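
The new override caps `query_max_size` at 250,000 characters for ClickHouse, mirroring the existing Athena override. Assuming elementary resolves these settings through dbt vars (which is what `get_config_var` implies), a project could presumably tighten the cap further in its own dbt_project.yml:

# Hypothetical project-level override of the config key set above.
vars:
  query_max_size: 100000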

macros/edr/tests/test_column_anomalies.sql

Lines changed: 0 additions & 1 deletion
@@ -80,7 +80,6 @@
                                                 dimensions) %}
     {{ elementary.debug_log('column_monitoring_query - \n' ~ column_monitoring_query) }}
     {% set temp_table_relation = elementary.create_elementary_test_table(database_name, tests_schema_name, test_table_name, 'metrics', column_monitoring_query) %}
-
     {#- calculate anomaly scores for metrics -#}
     {% set anomaly_scores_query = elementary.get_anomaly_scores_query(test_metrics_table_relation=temp_table_relation,
                                                                       model_relation=model_relation,

macros/edr/tests/test_dimension_anomalies.sql

Lines changed: 1 addition & 3 deletions
@@ -1,6 +1,6 @@
 {% test dimension_anomalies(model, dimensions, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity,ignore_small_changes, fail_on_zero, detection_delay, anomaly_exclude_metrics, detection_period, training_period, exclude_final_results) %}
     {{ config(tags = ['elementary-tests']) }}
-    {%- if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %}
+    {%- if execute and elementary.is_test_command()%}
     {% set model_relation = elementary.get_model_relation_for_test(model, context["model"]) %}
     {% if not model_relation %}
         {{ exceptions.raise_compiler_error("Unsupported model: " ~ model ~ " (this might happen if you override 'ref' or 'source')") }}
@@ -58,9 +58,7 @@
 
     {%- set dimension_monitoring_query = elementary.dimension_monitoring_query(model, model_relation, metric_properties.dimensions, min_bucket_start, max_bucket_end, metric_properties) %}
     {{ elementary.debug_log('dimension_monitoring_query - \n' ~ dimension_monitoring_query) }}
-
     {% set temp_table_relation = elementary.create_elementary_test_table(database_name, tests_schema_name, test_table_name, 'metrics', dimension_monitoring_query) %}
-
     {#- calculate anomaly scores for metrics -#}
     {% set anomaly_scores_query = elementary.get_anomaly_scores_query(test_metrics_table_relation=temp_table_relation,
                                                                       model_relation=model_relation,
model_relation=model_relation,

macros/edr/tests/test_table_anomalies.sql

Lines changed: 1 addition & 2 deletions
@@ -1,6 +1,6 @@
 {% test table_anomalies(model, table_anomalies, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, mandatory_params=none, event_timestamp_column=none, freshness_column=none, sensitivity=none, ignore_small_changes={"spike_failure_percent_threshold": none, "drop_failure_percent_threshold": none}, fail_on_zero=false, detection_delay=none, anomaly_exclude_metrics=none, detection_period=none, training_period=none) %}
     {{ config(tags = ['elementary-tests']) }}
-    {%- if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %}
+    {%- if execute and elementary.is_test_command() %}
     {% set model_relation = elementary.get_model_relation_for_test(model, context["model"]) %}
     {% if not model_relation %}
         {{ exceptions.raise_compiler_error("The test has unsupported configuration, please contact Elementary support") }}
@@ -71,7 +71,6 @@
                                                            metric_properties=metric_properties) %}
     {{ elementary.debug_log('table_monitoring_query - \n' ~ table_monitoring_query) }}
     {% set temp_table_relation = elementary.create_elementary_test_table(database_name, tests_schema_name, test_table_name, 'metrics', table_monitoring_query) %}
-
     {#- calculate anomaly scores for metrics -#}
     {% set anomaly_scores_query = elementary.get_anomaly_scores_query(temp_table_relation,
                                                                       model_relation,
model_relation,

macros/edr/tests/test_utils/get_anomaly_query.sql

Lines changed: 64 additions & 25 deletions
@@ -60,38 +60,77 @@ case when
     from anomaly_scores
 ),
 
-final_results as (
-    select
-        metric_value as value,
-        training_avg as average,
-        {# when there is an anomaly we would want to use the last value of the metric (lag), otherwise visually the expectations would look out of bounds #}
-        case
-            when is_anomalous = TRUE and '{{ test_configuration.anomaly_direction }}' = 'spike' then
-                lag(metric_value) over (partition by full_table_name, column_name, metric_name, dimension, dimension_value, bucket_seasonality order by bucket_end)
-            when is_anomalous = TRUE and '{{ test_configuration.anomaly_direction }}' != 'spike' then
-                lag(min_metric_value) over (partition by full_table_name, column_name, metric_name, dimension, dimension_value, bucket_seasonality order by bucket_end)
-            when '{{ test_configuration.anomaly_direction }}' = 'spike' then metric_value
-            else min_metric_value end as min_value,
-        case
-            when is_anomalous = TRUE and '{{ test_configuration.anomaly_direction }}' = 'drop' then
-                lag(metric_value) over (partition by full_table_name, column_name, metric_name, dimension, dimension_value, bucket_seasonality order by bucket_end)
-            when is_anomalous = TRUE and '{{ test_configuration.anomaly_direction }}' != 'drop' then
-                lag(max_metric_value) over (partition by full_table_name, column_name, metric_name, dimension, dimension_value, bucket_seasonality order by bucket_end)
-            when '{{ test_configuration.anomaly_direction }}' = 'drop' then metric_value
-            else max_metric_value end as max_value,
-        bucket_start as start_time,
-        bucket_end as end_time,
-        *
-    from anomaly_scores_with_is_anomalous
-    order by bucket_end, dimension_value
-)
+{{ elementary.get_final_results_query(test_configuration) }}
 
 select * from final_results
 where {{ test_configuration.exclude_final_results }}
 {%- endset -%}
 {{- return(anomaly_query) -}}
 {% endmacro %}
 
+{% macro get_final_results_query(test_configuration) %}
+    {{ return(adapter.dispatch('get_final_results_query', 'elementary')(test_configuration)) }}
+{% endmacro %}
+
+{% macro default__get_final_results_query(test_configuration) %}
+final_results as (
+    select
+        metric_value as value,
+        training_avg as average,
+        {# when there is an anomaly we would want to use the last value of the metric (lag), otherwise visually the expectations would look out of bounds #}
+        case
+            when is_anomalous = TRUE and '{{ test_configuration.anomaly_direction }}' = 'spike' then
+                lag(metric_value) over (partition by full_table_name, column_name, metric_name, dimension, dimension_value, bucket_seasonality order by bucket_end)
+            when is_anomalous = TRUE and '{{ test_configuration.anomaly_direction }}' != 'spike' then
+                lag(min_metric_value) over (partition by full_table_name, column_name, metric_name, dimension, dimension_value, bucket_seasonality order by bucket_end)
+            when '{{ test_configuration.anomaly_direction }}' = 'spike' then metric_value
+            else min_metric_value
+        end as min_value,
+        case
+            when is_anomalous = TRUE and '{{ test_configuration.anomaly_direction }}' = 'drop' then
+                lag(metric_value) over (partition by full_table_name, column_name, metric_name, dimension, dimension_value, bucket_seasonality order by bucket_end)
+            when is_anomalous = TRUE and '{{ test_configuration.anomaly_direction }}' != 'drop' then
+                lag(max_metric_value) over (partition by full_table_name, column_name, metric_name, dimension, dimension_value, bucket_seasonality order by bucket_end)
+            when '{{ test_configuration.anomaly_direction }}' = 'drop' then metric_value
+            else max_metric_value
+        end as max_value,
+        bucket_start as start_time,
+        bucket_end as end_time,
+        *
+    from anomaly_scores_with_is_anomalous
+    order by bucket_end, dimension_value
+)
+{% endmacro %}
+
+{% macro clickhouse__get_final_results_query(test_configuration) %}
+final_results as (
+    select
+        metric_value as value,
+        training_avg as average,
+        {# when there is an anomaly we would want to use the last value of the metric (lag), otherwise visually the expectations would look out of bounds #}
+        case
+            when is_anomalous = TRUE and '{{ test_configuration.anomaly_direction }}' = 'spike' then
+                lagInFrame(metric_value, 1) over (partition by full_table_name, column_name, metric_name, dimension, dimension_value, bucket_seasonality order by bucket_end)
+            when is_anomalous = TRUE and '{{ test_configuration.anomaly_direction }}' != 'spike' then
+                lagInFrame(min_metric_value, 1) over (partition by full_table_name, column_name, metric_name, dimension, dimension_value, bucket_seasonality order by bucket_end)
+            when '{{ test_configuration.anomaly_direction }}' = 'spike' then metric_value
+            else min_metric_value
+        end as min_value,
+        case
+            when is_anomalous = TRUE and '{{ test_configuration.anomaly_direction }}' = 'drop' then
+                lagInFrame(metric_value, 1) over (partition by full_table_name, column_name, metric_name, dimension, dimension_value, bucket_seasonality order by bucket_end)
+            when is_anomalous = TRUE and '{{ test_configuration.anomaly_direction }}' != 'drop' then
+                lagInFrame(max_metric_value, 1) over (partition by full_table_name, column_name, metric_name, dimension, dimension_value, bucket_seasonality order by bucket_end)
+            when '{{ test_configuration.anomaly_direction }}' = 'drop' then metric_value
+            else max_metric_value
+        end as max_value,
+        bucket_start as start_time,
+        bucket_end as end_time,
+        *
+    from anomaly_scores_with_is_anomalous
+    order by bucket_end, dimension_value
+)
+{% endmacro %}
 
 {%- macro set_directional_anomaly(anomaly_direction, anomaly_score, sensitivity) -%}
 {% if anomaly_direction | lower == 'spike' %}
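
The two implementations differ only in the window function: ClickHouse does not support the standard `lag()`, offering `lagInFrame(expr, offset)` instead, which reads the offset row within the window frame (equivalent to `lag` as long as the frame covers the preceding row). A minimal side-by-side, using a hypothetical metrics table:

-- Standard SQL (default adapters):
select metric_value,
       lag(metric_value) over (order by bucket_end) as previous_value
from metrics;

-- ClickHouse equivalent; note the explicit offset argument:
select metric_value,
       lagInFrame(metric_value, 1) over (order by bucket_end) as previous_value
from metrics;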

macros/utils/cross_db_utils/generate_surrogate_key.sql

Lines changed: 22 additions & 2 deletions
@@ -14,8 +14,11 @@
    limitations under the License.
 #}
 
-
 {%- macro generate_surrogate_key(fields) -%}
+    {{ return(adapter.dispatch('generate_surrogate_key', 'elementary')(fields)) }}
+{%- endmacro -%}
+
+{%- macro default__generate_surrogate_key(fields) -%}
     {% set concat_macro = dbt.concat or dbt_utils.concat %}
     {% set hash_macro = dbt.hash or dbt_utils.hash %}
 
@@ -30,4 +33,21 @@
     {%- endif -%}
     {%- endfor -%}
     {{ hash_macro(concat_macro(field_sqls)) }}
-{% endmacro %}
+{%- endmacro -%}
+
+{%- macro clickhouse__generate_surrogate_key(fields) -%}
+    {% set concat_macro = dbt.concat or dbt_utils.concat %}
+    {% set hash_macro = dbt.hash or dbt_utils.hash %}
+
+    {% set default_null_value = "" %}
+    {%- set field_sqls = [] -%}
+    {%- for field in fields -%}
+        {%- do field_sqls.append(
+            "coalesce(cast(" ~ field ~ " as Nullable(" ~ elementary.edr_type_string() ~ ")), '" ~ default_null_value ~ "')"
+        ) -%}
+        {%- if not loop.last %}
+            {%- do field_sqls.append("'-'") -%}
+        {%- endif -%}
+    {%- endfor -%}
+    {{ hash_macro(concat_macro(field_sqls)) }}
+{%- endmacro -%}
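
The ClickHouse variant casts each field to Nullable(String) before coalescing, since casting a NULL-bearing expression to plain String would fail on ClickHouse. For intuition, with fields = ['full_table_name', 'metric_name'] the macro would render roughly the following — the exact spelling depends on how the adapter implements dbt.concat and dbt.hash, so treat this as an approximation:

-- Approximate rendered SQL on ClickHouse (assuming md5/concat rendering):
md5(
    concat(
        coalesce(cast(full_table_name as Nullable(String)), ''),
        '-',
        coalesce(cast(metric_name as Nullable(String)), '')
    )
)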
