Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
target/
dbt_packages/
dbt_internal_packages/
logs/
scripts/

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
{%- set bucket_seasonality_expr = elementary.const_as_text('no_seasonality') %}
{%- endif %}
{%- set detection_end = elementary.get_detection_end(test_configuration.detection_delay) %}
{%- set detection_end_expr = elementary.edr_cast_as_timestamp(elementary.edr_datetime_to_sql(detection_end)) %}
{%- set min_bucket_start_expr = elementary.get_trunc_min_bucket_start_expr(detection_end, metric_properties, test_configuration.days_back) %}

{# For timestamped tests, this will be the bucket start, and for non-timestamped tests it will be the
Expand All @@ -39,9 +40,9 @@
with buckets as (
select edr_bucket_start, edr_bucket_end
from ({{ elementary.complete_buckets_cte(metric_properties, min_bucket_start_expr,
elementary.edr_quote(detection_end)) }}) results
where edr_bucket_start >= {{ elementary.edr_cast_as_timestamp(min_bucket_start_expr) }}
and edr_bucket_end <= {{ elementary.edr_cast_as_timestamp(elementary.edr_quote(detection_end)) }}
detection_end_expr) }}) results
where edr_bucket_start >= {{ min_bucket_start_expr }}
and edr_bucket_end <= {{ detection_end_expr }}
),
{% else %}
with
Expand Down Expand Up @@ -121,7 +122,7 @@
{{ metric_time_bucket_expr }} as metric_time_bucket,
{{ elementary.edr_cast_as_date(elementary.edr_date_trunc('day', metric_time_bucket_expr))}} as metric_date,

row_number() over (partition by id order by updated_at desc) as row_number
row_number() over (partition by id order by updated_at desc) as row_num
from union_metrics

),
Expand All @@ -144,7 +145,7 @@
bucket_duration_hours,
updated_at
from grouped_metrics_duplicates
where row_number = 1
where row_num = 1
),

time_window_aggregation as (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{#
    Compute the "detection end" timestamp: the run start time, truncated to whole
    seconds, optionally shifted back by a detection delay.

    Args:
        detection_delay: falsy for "no delay"; otherwise an object exposing `period`
            and `count`. `period` is pluralised (period + 's') and used as the keyword
            of a `datetime.timedelta`, with `count` as its value.

    Returns:
        The value of `elementary.get_run_started_at()` with microseconds zeroed,
        minus the delay when one is configured.
#}
{% macro get_detection_end(detection_delay) %}
    {# Truncate once, up front, so both branches return a whole-second timestamp. #}
    {%- set run_started_at = elementary.get_run_started_at().replace(microsecond=0) %}
    {% if not detection_delay %}
        {% do return(run_started_at) %}
    {% endif %}

    {%- set kwargs = {detection_delay.period+'s': detection_delay.count} %}
    {%- set detection_end = run_started_at - modules.datetime.timedelta(**kwargs) %}
    {% do return(detection_end) %}
{% endmacro %}

Expand Down Expand Up @@ -105,8 +105,8 @@
{%- set buckets = elementary.agate_to_dicts(elementary.run_query(incremental_bucket_times_query))[0] %}
{% endif %}
{%- if buckets %}
{%- set min_bucket_start = elementary.edr_quote(buckets.get('min_bucket_start')) %}
{%- set max_bucket_end = elementary.edr_quote(buckets.get('max_bucket_end')) %}
{%- set min_bucket_start = elementary.edr_datetime_to_sql(buckets.get('min_bucket_start')) %}
{%- set max_bucket_end = elementary.edr_datetime_to_sql(buckets.get('max_bucket_end')) %}
{{ return([min_bucket_start, max_bucket_end]) }}
{%- else %}
{{ exceptions.raise_compiler_error("Failed to calc test buckets min and max") }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
stddevPop(cast({{ column_name }} as {{ elementary.edr_type_float() }}))
{%- endmacro %}

{# Dremio: population standard deviation (`stddev_pop`) of the column cast to the elementary float type. #}
{% macro dremio__standard_deviation(column_name) -%}
    {%- set float_type = elementary.edr_type_float() -%}
    stddev_pop(cast({{ column_name }} as {{ float_type }}))
{%- endmacro %}

{# Adapter dispatcher: resolves the `variance` implementation in the `elementary` namespace and returns its result for `column_name`. #}
{% macro variance(column_name) -%}
    {%- set variance_impl = adapter.dispatch('variance', 'elementary') -%}
    {%- do return(variance_impl(column_name)) -%}
{%- endmacro %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
bucket_end,
dimension_value,
metric_value,
row_number () over (partition by dimension_value order by bucket_end desc) as row_number
from {{ data_monitoring_metrics_relation }}
where full_table_name = {{ full_table_name_str }}
and metric_name = {{ elementary.edr_quote(metric_name) }}
Expand Down Expand Up @@ -148,7 +147,6 @@
bucket_end,
dimension_value,
metric_value,
row_number () over (partition by dimension_value order by bucket_end desc) as row_number
from {{ data_monitoring_metrics_relation }}
where full_table_name = {{ full_table_name_str }}
and metric_name = {{ elementary.edr_quote(metric_name) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@
bucket_freshness_ranked as (
select
*,
row_number () over (partition by edr_bucket_end order by freshness is null, freshness desc) as row_number
row_number () over (partition by edr_bucket_end order by freshness is null, freshness desc) as row_num
from bucket_all_freshness_metrics
)

Expand All @@ -281,7 +281,7 @@
{{ elementary.edr_cast_as_string('update_timestamp') }} as source_value,
freshness as metric_value
from bucket_freshness_ranked
where row_number = 1
where row_num = 1
{% endmacro %}

{% macro event_freshness_metric_query(metric, metric_properties) %}
Expand Down
17 changes: 17 additions & 0 deletions macros/edr/system/system_utils/buckets_cte.sql
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,20 @@
{{ return(complete_buckets_cte) }}
{% endmacro %}

{#
    Dremio implementation of the complete-buckets query.

    Builds an `integers` CTE holding num = 0..9999 (four cross-joined 10-row VALUES
    lists, numbered with row_number() - 1) and turns every integer into one bucket:
        edr_bucket_start = min_bucket_start_expr + num       * time_bucket.count periods
        edr_bucket_end   = min_bucket_start_expr + (num + 1) * time_bucket.count periods
    Only buckets whose end is <= max_bucket_end_expr are kept.

    Args:
        time_bucket: object exposing `period` and `count`.
        bucket_end_expr: not referenced by this implementation.
        min_bucket_start_expr: SQL expression for the start of the first bucket.
        max_bucket_end_expr: SQL expression for the latest allowed bucket end.

    Returns:
        The SQL text of the query (a string, not a relation).
#}
{% macro dremio__complete_buckets_cte(time_bucket, bucket_end_expr, min_bucket_start_expr, max_bucket_end_expr) %}
    {#- Render each bucket boundary once and reuse it in the select list and the filter. -#}
    {%- set bucket_start_sql = elementary.edr_timeadd(time_bucket.period, 'num * ' ~ time_bucket.count, min_bucket_start_expr) %}
    {%- set bucket_end_sql = elementary.edr_timeadd(time_bucket.period, '(num + 1) * ' ~ time_bucket.count, min_bucket_start_expr) %}
    {%- set buckets_query %}
        with integers as (
            select (row_number() over (order by t1.val, t2.val, t3.val, t4.val)) - 1 as num
            from (values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)) t1(val)
            cross join (values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)) t2(val)
            cross join (values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)) t3(val)
            cross join (values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)) t4(val)
        )
        select
            {{ bucket_start_sql }} as edr_bucket_start,
            {{ bucket_end_sql }} as edr_bucket_end
        from integers
        where {{ bucket_end_sql }} <= {{ max_bucket_end_expr }}
    {%- endset %}
    {{ return(buckets_query) }}
{% endmacro %}
4 changes: 2 additions & 2 deletions macros/edr/tests/on_run_end/union_columns_snapshot_query.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
),
columns_snapshot_with_duplicates as (
select *,
row_number() over (partition by column_state_id order by detected_at desc) as row_number
row_number() over (partition by column_state_id order by detected_at desc) as row_num
from union_temp_columns_snapshot
)
select
Expand All @@ -21,7 +21,7 @@
is_new,
detected_at
from columns_snapshot_with_duplicates
where row_number = 1
where row_num = 1
{%- endset %}
{{ return(union_temp_query) }}
{%- endif %}
Expand Down
4 changes: 2 additions & 2 deletions macros/edr/tests/on_run_end/union_metrics_query.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
),
metrics_with_duplicates as (
select *,
row_number() over (partition by id order by updated_at desc) as row_number
row_number() over (partition by id order by updated_at desc) as row_num
from union_temps_metrics
)
select
Expand All @@ -28,7 +28,7 @@
dimension_value,
metric_properties
from metrics_with_duplicates
where row_number = 1
where row_num = 1
{%- endset %}
{{ return(union_temp_query) }}
{%- endif %}
Expand Down
10 changes: 5 additions & 5 deletions macros/edr/tests/test_utils/clean_elementary_test_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@
{% endmacro %}

{#
    Trino: build the cleanup queries for the given elementary test table relations.

    Delegates to the shared transactionless helper, the same way the Dremio
    implementation does, instead of keeping a private copy of the loop.
    NOTE(review): the helper is defined outside this view; it is expected to yield the
    same `DROP TABLE IF EXISTS <relation>` statements the inline loop used to build - confirm.

    Args:
        test_table_relations: iterable of test table relations to clean up.

    Returns:
        Whatever `elementary.get_transactionless_clean_elementary_test_tables_queries` returns.
#}
{% macro trino__get_clean_elementary_test_tables_queries(test_table_relations) %}
    {% do return(elementary.get_transactionless_clean_elementary_test_tables_queries(test_table_relations)) %}
{% endmacro %}

{# Dremio: cleanup queries for the given test table relations, produced by the shared transactionless helper. #}
{% macro dremio__get_clean_elementary_test_tables_queries(test_table_relations) %}
    {% set cleanup_queries = elementary.get_transactionless_clean_elementary_test_tables_queries(test_table_relations) %}
    {% do return(cleanup_queries) %}
{% endmacro %}

{% macro get_transaction_clean_elementary_test_tables_queries(test_table_relations) %}
Expand Down
2 changes: 1 addition & 1 deletion macros/edr/tests/test_utils/get_anomaly_query.sql
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ case when

final_results as (
select
metric_value as value,
metric_value as {{ elementary.escape_reserved_keywords('value') }},
training_avg as average,
{# when there is an anomaly we would want to use the last value of the metric (lag), otherwise visually the expectations would look out of bounds #}
case
Expand Down
2 changes: 1 addition & 1 deletion macros/utils/cross_db_utils/current_timestamp.sql
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,4 @@
{# Dremio: SQL expression for the current timestamp in UTC. Takes no arguments and renders `CURRENT_TIMESTAMP()`. #}
{% macro dremio__edr_current_timestamp_in_utc() -%}
-- Dremio CURRENT_TIMESTAMP() is always in UTC
CURRENT_TIMESTAMP()
{%- endmacro -%}
25 changes: 25 additions & 0 deletions macros/utils/cross_db_utils/datediff.sql
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,28 @@
{% endif %}
{{ return(macro(elementary.edr_cast_as_timestamp(first_date), elementary.edr_cast_as_timestamp(second_date), date_part)) }}
{% endmacro %}

{#
    Dremio implementation of edr_datediff.

    Args:
        first_date: SQL expression for the earlier operand.
        second_date: SQL expression for the later operand.
        date_part: unit of the difference; compared case-insensitively against 'second'.

    Returns:
        A parenthesised SQL expression computing second_date - first_date in `date_part` units.

    Raises:
        A compiler error when neither `dbt.datediff` nor `dbt_utils.datediff` is available
        (only reached for date parts other than 'second').
#}
{% macro dremio__edr_datediff(first_date, second_date, date_part) %}
    {% if date_part.lower() == 'second' %}
        {# Seconds use a dedicated epoch subtraction. It helps when subtracting aggregate values,
           where the "select"-based statement produced further down is problematic.
           Both operands are cast to varchar and cut to their first 19 characters before
           being handed to unix_timestamp. #}
        {%- set seconds_diff_sql -%}
            (unix_timestamp(substr(cast(({{ second_date }}) as varchar), 1, 19)) -
            unix_timestamp(substr(cast(({{ first_date }}) as varchar), 1, 19)))
        {%- endset -%}
        {% do return(seconds_diff_sql) %}
    {% endif %}

    {% set datediff_macro = dbt.datediff or dbt_utils.datediff %}
    {% if not datediff_macro %}
        {{ exceptions.raise_compiler_error("Did not find a `datediff` macro.") }}
    {% endif %}

    {% set start_ts = elementary.edr_cast_as_timestamp(first_date) %}
    {% set end_ts = elementary.edr_cast_as_timestamp(second_date) %}
    {% set raw_sql = datediff_macro(start_ts, end_ts, date_part) %}

    {# Hack - dbt-dremio implements datediff as a select statement (which seems to be necessary).
       To embed it as an expression, strip it, drop any ';' and wrap it in parentheses. #}
    {% do return('(' ~ raw_sql.strip().replace(';', '') ~ ')') %}
{% endmacro %}
14 changes: 14 additions & 0 deletions macros/utils/cross_db_utils/datetime_to_sql.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{# Adapter dispatcher: resolves the `edr_datetime_to_sql` implementation in the `elementary` namespace and returns its result for `dt`. #}
{% macro edr_datetime_to_sql(dt) %}
    {% set adapter_impl = adapter.dispatch("edr_datetime_to_sql", "elementary") %}
    {% do return(adapter_impl(dt)) %}
{% endmacro %}

{# Default implementation: passes `dt` to `elementary.edr_quote` unchanged and returns the result. #}
{% macro default__edr_datetime_to_sql(dt) %}
    {% set quoted_dt = elementary.edr_quote(dt) %}
    {% do return(quoted_dt) %}
{% endmacro %}

{#
    Dremio implementation: render a datetime as a quoted literal in the elementary time format.

    Args:
        dt: a datetime object, or a string that `datetime.fromisoformat` can parse.

    Returns:
        `elementary.edr_quote` applied to `dt` formatted with `elementary.get_time_format()`.
#}
{% macro dremio__edr_datetime_to_sql(dt) %}
    {# Strings are parsed first so that both input kinds go through the same strftime call. #}
    {% set dt_value = modules.datetime.datetime.fromisoformat(dt) if dt is string else dt %}
    {% set formatted_dt = dt_value.strftime(elementary.get_time_format()) %}
    {% do return(elementary.edr_quote(formatted_dt)) %}
{% endmacro %}
8 changes: 8 additions & 0 deletions macros/utils/data_types/cast_column.sql
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
)
{%- endmacro -%}

{# Dremio: casts `timestamp_field` to the type rendered by `elementary.edr_type_timestamp()`. #}
{%- macro dremio__edr_cast_as_timestamp(timestamp_field) -%}
    {%- set timestamp_type = elementary.edr_type_timestamp() -%}
    cast({{ timestamp_field }} as {{ timestamp_type }})
{%- endmacro -%}

{# Casts `column` to the type rendered by `elementary.edr_type_float()`. #}
{%- macro edr_cast_as_float(column) -%}
    {%- set float_type = elementary.edr_type_float() -%}
    cast({{ column }} as {{ float_type }})
{%- endmacro -%}
Expand Down Expand Up @@ -85,6 +89,10 @@
)
{%- endmacro -%}

{# Dremio: casts `timestamp_field` to the type rendered by `elementary.edr_type_date()`. #}
{%- macro dremio__edr_cast_as_date(timestamp_field) -%}
    {%- set date_type = elementary.edr_type_date() -%}
    cast({{ timestamp_field }} as {{ date_type }})
{%- endmacro -%}


{%- macro const_as_text(string) -%}
{{ return(adapter.dispatch('const_as_text', 'elementary')(string)) }}
Expand Down
4 changes: 4 additions & 0 deletions macros/utils/data_types/data_type.sql
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,7 @@
{# Trino: timestamp type name used by elementary. Takes no arguments and renders the literal `timestamp(6)`. #}
{% macro trino__edr_type_timestamp() %}
timestamp(6)
{% endmacro %}

{# Dremio: timestamp type name used by elementary. Takes no arguments and renders the literal `timestamp`. #}
{% macro dremio__edr_type_timestamp() %}
timestamp
{% endmacro %}
15 changes: 14 additions & 1 deletion macros/utils/graph/get_package_database_and_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,17 @@
{% endif %}
{% endif %}
{{ return([none, none]) }}
{% endmacro %}
{% endmacro %}

{#
    Dremio: locate the database and schema of a package's models.

    Picks the first graph node that is a model of `package_name` and whose
    `config.materialized` is not "view", and reports that node's database and schema.

    Args:
        package_name: package to look up; defaults to 'elementary'.

    Returns:
        [database, schema] of the selected node, or [none, none] when not in execute
        mode or when no such node exists.
#}
{% macro dremio__get_package_database_and_schema(package_name='elementary') %}
    {# Outside of execute mode there is nothing to inspect. #}
    {% if not execute %}
        {{ return([none, none]) }}
    {% endif %}

    {% set package_models = graph.nodes.values()
        | selectattr("resource_type", "==", "model")
        | selectattr("package_name", "==", package_name) %}
    {% set non_view_model = package_models | selectattr("config.materialized", "!=", "view") | first %}

    {% if non_view_model %}
        {{ return([non_view_model.database, non_view_model.schema]) }}
    {% endif %}
    {{ return([none, none]) }}
{% endmacro %}
2 changes: 1 addition & 1 deletion macros/utils/sql_utils/escape_reserved_keywords.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
{% endmacro %}

{#
    Dremio: tell whether `keyword` must be escaped when used as an identifier.

    Args:
        keyword: identifier to check; compared as-is (case-sensitive) against the list.

    Returns:
        true when `keyword` is one of 'filter', 'sql', 'timestamp' or 'value'.
        'value' is in the list because the anomaly query aliases a column as
        escape_reserved_keywords('value').
#}
{% macro dremio__is_reserved_keywords(keyword) %}
    {% do return(keyword in ['filter', 'sql', 'timestamp', 'value']) %}
{% endmacro %}

{% macro escape_keywords(keyword) %}
Expand Down