Skip to content

Commit 0918a78

Browse files
author
Michael Myaskovsky
committed
partial fixes in tests for fabric
1 parent 05081d4 commit 0918a78

File tree

11 files changed

+530
-33
lines changed

11 files changed

+530
-33
lines changed

integration_tests/tests/dbt_project.py

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def get_dbt_runner(target: str, project_dir: str) -> BaseDbtRunner:
4242
class DbtProject:
4343
def __init__(self, target: str, project_dir: str):
4444
self.dbt_runner = get_dbt_runner(target, project_dir)
45-
45+
self.target = target
4646
self.project_dir_path = Path(project_dir)
4747
self.models_dir_path = self.project_dir_path / "models"
4848
self.tmp_models_dir_path = self.models_dir_path / "tmp"
@@ -57,23 +57,32 @@ def run_query(self, prerendered_query: str):
5757
)
5858
return results
5959

60-
@staticmethod
6160
def read_table_query(
61+
self,
6262
table_name: str,
6363
where: Optional[str] = None,
6464
group_by: Optional[str] = None,
6565
order_by: Optional[str] = None,
6666
limit: Optional[int] = None,
67-
column_names: Optional[List[str]] = None,
67+
column_names: Optional[List[str]] = None
6868
):
69-
return f"""
70-
SELECT {', '.join(column_names) if column_names else '*'}
71-
FROM {{{{ ref('{table_name}') }}}}
72-
{f"WHERE {where}" if where else ""}
73-
{f"GROUP BY {group_by}" if group_by else ""}
74-
{f"ORDER BY {order_by}" if order_by else ""}
75-
{f"LIMIT {limit}" if limit else ""}
76-
"""
69+
if self.target == 'fabric':
70+
return f"""
71+
SELECT {f'TOP {limit}' if limit else ''} {', '.join(column_names) if column_names else '*'}
72+
FROM {{{{ ref('{table_name}') }}}}
73+
{f"WHERE {where}" if where else ""}
74+
{f"GROUP BY {group_by}" if group_by else ""}
75+
{f"ORDER BY {order_by}" if order_by else ""}
76+
"""
77+
else:
78+
return f"""
79+
SELECT {', '.join(column_names) if column_names else '*'}
80+
FROM {{{{ ref('{table_name}') }}}}
81+
{f"WHERE {where}" if where else ""}
82+
{f"GROUP BY {group_by}" if group_by else ""}
83+
{f"ORDER BY {order_by}" if order_by else ""}
84+
{f"LIMIT {limit}" if limit else ""}
85+
"""
7786

7887
def read_table(
7988
self,

macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
{% macro get_anomaly_scores_query(test_metrics_table_relation, model_relation, test_configuration, metric_names, column_name = none, columns_only = false, metric_properties = none, data_monitoring_metrics_table=none) %}
2+
{{ return(adapter.dispatch('get_anomaly_scores_query', 'elementary')(test_metrics_table_relation, model_relation, test_configuration, metric_names, column_name, columns_only, metric_properties, data_monitoring_metrics_table)) }}
3+
{% endmacro %}
4+
5+
{% macro default__get_anomaly_scores_query(test_metrics_table_relation, model_relation, test_configuration, metric_names, column_name = none, columns_only = false, metric_properties = none, data_monitoring_metrics_table=none) %}
26
{%- set model_graph_node = elementary.get_model_graph_node(model_relation) %}
37
{%- set full_table_name = elementary.model_node_to_full_name(model_graph_node) %}
48
{%- set test_execution_id = elementary.get_test_execution_id() %}
@@ -228,6 +232,196 @@
228232
{{ return(anomaly_scores_query) }}
229233
{% endmacro %}
230234

235+
{% macro fabric__get_anomaly_scores_query(test_metrics_table_relation, model_relation, test_configuration, metric_names, column_name = none, columns_only = false, metric_properties = none, data_monitoring_metrics_table=none) %}
236+
{%- set model_graph_node = elementary.get_model_graph_node(model_relation) %}
237+
{%- set full_table_name = elementary.model_node_to_full_name(model_graph_node) %}
238+
{%- set test_execution_id = elementary.get_test_execution_id() %}
239+
{%- set test_unique_id = elementary.get_test_unique_id() %}
240+
{%- if not data_monitoring_metrics_table %}
241+
{%- set data_monitoring_metrics_table = elementary.get_elementary_relation('data_monitoring_metrics') %}
242+
{%- endif %}
243+
244+
{%- if elementary.is_incremental_model(model_graph_node) %}
245+
{%- set latest_full_refresh = elementary.get_latest_full_refresh(model_graph_node) %}
246+
{%- else %}
247+
{%- set latest_full_refresh = none %}
248+
{%- endif %}
249+
250+
{%- if test_configuration.seasonality == 'day_of_week' %}
251+
{%- set bucket_seasonality_expr = elementary.edr_day_of_week_expression('bucket_end') %}
252+
{%- elif test_configuration.seasonality == 'hour_of_day' %}
253+
{%- set bucket_seasonality_expr = elementary.edr_hour_of_day_expression('bucket_end') %}
254+
{%- elif test_configuration.seasonality == 'hour_of_week' %}
255+
{%- set bucket_seasonality_expr = elementary.edr_hour_of_week_expression('bucket_end') %}
256+
{%- else %}
257+
{%- set bucket_seasonality_expr = elementary.const_as_text('no_seasonality') %}
258+
{%- endif %}
259+
{%- set detection_end = elementary.get_detection_end(test_configuration.detection_delay) %}
260+
{%- set min_bucket_start_expr = elementary.get_trunc_min_bucket_start_expr(detection_end, metric_properties, test_configuration.days_back) %}
261+
262+
{%- set metric_time_bucket_expr = 'case when bucket_start is not null then bucket_start else bucket_end end' %}
263+
264+
{%- set anomaly_scores_query %}
265+
select * from (
266+
select
267+
{{ elementary.generate_surrogate_key([
268+
'metric_id',
269+
elementary.const_as_string(test_execution_id)
270+
]) }} as id,
271+
metric_id,
272+
{{ elementary.const_as_string(test_execution_id) }} as test_execution_id,
273+
{{ elementary.const_as_string(test_unique_id) }} as test_unique_id,
274+
{{ elementary.current_timestamp_column() }} as detected_at,
275+
full_table_name,
276+
column_name,
277+
metric_name,
278+
case
279+
when training_stddev is null then null
280+
when training_stddev = 0 then 0
281+
else (metric_value - training_avg) / (training_stddev)
282+
end as anomaly_score,
283+
{{ test_configuration.anomaly_sensitivity }} as anomaly_score_threshold,
284+
source_value as anomalous_value,
285+
{{ elementary.edr_cast_as_timestamp('bucket_start') }} as bucket_start,
286+
{{ elementary.edr_cast_as_timestamp('bucket_end') }} as bucket_end,
287+
bucket_seasonality,
288+
metric_value,
289+
290+
{% set limit_values = elementary.get_limit_metric_values(test_configuration) %}
291+
case
292+
when training_stddev is null then null
293+
when {{ limit_values.min_metric_value }} > 0 or metric_name in {{ elementary.to_sql_list(elementary.get_negative_value_supported_metrics()) }} then {{ limit_values.min_metric_value }}
294+
else 0
295+
end as min_metric_value,
296+
case
297+
when training_stddev is null then null
298+
else {{ limit_values.max_metric_value }}
299+
end as max_metric_value,
300+
training_avg,
301+
training_stddev,
302+
training_set_size,
303+
{{ elementary.edr_cast_as_timestamp('training_start') }} as training_start,
304+
{{ elementary.edr_cast_as_timestamp('training_end') }} as training_end,
305+
dimension,
306+
dimension_value
307+
from (
308+
select
309+
metric_id,
310+
full_table_name,
311+
column_name,
312+
dimension,
313+
dimension_value,
314+
metric_name,
315+
metric_value,
316+
source_value,
317+
bucket_start,
318+
bucket_end,
319+
bucket_seasonality,
320+
bucket_duration_hours,
321+
updated_at,
322+
avg(metric_value) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_avg,
323+
stddev(metric_value) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_stddev,
324+
count(metric_value) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_set_size,
325+
last_value(bucket_end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) training_end,
326+
first_value(bucket_end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_start
327+
from (
328+
select
329+
id as metric_id,
330+
full_table_name,
331+
column_name,
332+
dimension,
333+
dimension_value,
334+
metric_name,
335+
metric_value,
336+
source_value,
337+
bucket_start,
338+
bucket_end,
339+
{{ bucket_seasonality_expr }} as bucket_seasonality,
340+
{{ test_configuration.anomaly_exclude_metrics or 'FALSE' }} as is_excluded,
341+
bucket_duration_hours,
342+
updated_at
343+
from (
344+
select
345+
id,
346+
full_table_name,
347+
column_name,
348+
metric_name,
349+
metric_value,
350+
source_value,
351+
bucket_start,
352+
bucket_end,
353+
bucket_duration_hours,
354+
updated_at,
355+
dimension,
356+
dimension_value,
357+
{{ metric_time_bucket_expr }} as metric_time_bucket,
358+
{{ elementary.edr_cast_as_date(elementary.edr_date_trunc('day', metric_time_bucket_expr))}} as metric_date,
359+
row_number() over (partition by id order by updated_at desc) as row_number
360+
from (
361+
select * from (
362+
select
363+
id,
364+
full_table_name,
365+
column_name,
366+
metric_name,
367+
metric_type,
368+
metric_value,
369+
source_value,
370+
bucket_start,
371+
bucket_end,
372+
bucket_duration_hours,
373+
updated_at,
374+
dimension,
375+
dimension_value,
376+
metric_properties
377+
from {{ data_monitoring_metrics_table }}
378+
where
379+
bucket_end > {{ min_bucket_start_expr }}
380+
{% if test_configuration.timestamp_column %}
381+
and exists (
382+
select 1 from (
383+
select edr_bucket_start, edr_bucket_end
384+
from ({{ elementary.complete_buckets_cte(metric_properties, min_bucket_start_expr,
385+
elementary.edr_quote(detection_end)) }}) results
386+
where edr_bucket_start >= {{ elementary.edr_cast_as_timestamp(min_bucket_start_expr) }}
387+
and edr_bucket_end <= {{ elementary.edr_cast_as_timestamp(elementary.edr_quote(detection_end)) }}
388+
) buckets
389+
where buckets.edr_bucket_start = cast(bucket_start as datetime2(2))
390+
and buckets.edr_bucket_end = cast(bucket_end as datetime2(2))
391+
)
392+
{% endif %}
393+
and metric_properties = {{ elementary.dict_to_quoted_json(metric_properties) }}
394+
{% if latest_full_refresh %}
395+
and updated_at > {{ elementary.edr_cast_as_timestamp(elementary.edr_quote(latest_full_refresh)) }}
396+
{% endif %}
397+
and upper(full_table_name) = upper('{{ full_table_name }}')
398+
and metric_name in {{ elementary.strings_list_to_tuple(metric_names) }}
399+
{%- if column_name %}
400+
and upper(column_name) = upper('{{ column_name }}')
401+
{%- endif %}
402+
{%- if columns_only %}
403+
and column_name is not null
404+
{%- endif %}
405+
{% if test_configuration.dimensions %}
406+
and dimension = {{ elementary.edr_quote(elementary.join_list(test_configuration.dimensions, '; ')) }}
407+
{% endif %}
408+
) data_monitoring_metrics
409+
union all
410+
select * from {{ test_metrics_table_relation }}
411+
) union_metrics
412+
) grouped_metrics_duplicates
413+
where row_number = 1
414+
) grouped_metrics
415+
where not is_excluded
416+
) time_window_aggregation
417+
where
418+
metric_value is not null
419+
and training_avg is not null
420+
) anomaly_scores
421+
{% endset %}
422+
{{ return(anomaly_scores_query) }}
423+
{% endmacro %}
424+
231425
{% macro get_negative_value_supported_metrics() %}
232426
{% do return(["min", "max", "average", "standard_deviation", "variance", "sum"]) %}
233427
{% endmacro %}

macros/edr/data_monitoring/data_monitors_configuration/get_buckets_configuration.sql

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
{{ return(trunc_min_bucket_start_expr) }}
1515
{% endmacro %}
1616

17-
{# This macro can't be used without truncating to full buckets #}
17+
{# This macro cant be used without truncating to full buckets #}
1818
{% macro get_backfill_bucket_start(detection_end, backfill_days) %}
1919
{% do return((detection_end - modules.datetime.timedelta(backfill_days)).strftime("%Y-%m-%d 00:00:00")) %}
2020
{% endmacro %}
@@ -40,7 +40,7 @@
4040
{%- endif %}
4141

4242
{%- set regular_bucket_times_query %}
43-
with bucket_times as (
43+
;with bucket_times as (
4444
select
4545
{{ trunc_min_bucket_start_expr }} as days_back_start
4646
, {{ detection_end_expr }} as detection_end
@@ -58,7 +58,7 @@
5858
{%- endset %}
5959

6060
{%- set incremental_bucket_times_query %}
61-
with all_buckets as (
61+
;with all_buckets as (
6262
select edr_bucket_start as bucket_start, edr_bucket_end as bucket_end
6363
from ({{ elementary.complete_buckets_cte(metric_properties, trunc_min_bucket_start_expr, detection_end_expr) }}) results
6464
where edr_bucket_start >= {{ trunc_min_bucket_start_expr }}

0 commit comments

Comments
 (0)