diff --git a/.github/workflows/test-all-warehouses.yml b/.github/workflows/test-all-warehouses.yml index a01b0c8d0..a45f1d50d 100644 --- a/.github/workflows/test-all-warehouses.yml +++ b/.github/workflows/test-all-warehouses.yml @@ -49,6 +49,7 @@ jobs: athena, trino, clickhouse, + dremio, ] include: # If we're not running on a specific dbt version, then always add postgres on 1.8.0 diff --git a/.github/workflows/test-warehouse.yml b/.github/workflows/test-warehouse.yml index c37a46bd7..21fd675c9 100644 --- a/.github/workflows/test-warehouse.yml +++ b/.github/workflows/test-warehouse.yml @@ -17,6 +17,7 @@ on: - athena - trino - clickhouse + - dremio elementary-ref: type: string required: false @@ -88,6 +89,11 @@ jobs: working-directory: ${{ env.TESTS_DIR }} run: docker compose up -d clickhouse + - name: Start Dremio + if: inputs.warehouse-type == 'dremio' + working-directory: ${{ env.TESTS_DIR }} + run: docker compose -f docker-compose-dremio.yml up -d + - name: Setup Python uses: actions/setup-python@v4 with: diff --git a/.gitignore b/.gitignore index d8c7c72f2..15e8d8196 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ target/ dbt_packages/ +dbt_internal_packages/ logs/ scripts/ diff --git a/integration_tests/dbt_project/macros/generate_schema_name.sql b/integration_tests/dbt_project/macros/generate_schema_name.sql index 3f3a20251..3e207791f 100644 --- a/integration_tests/dbt_project/macros/generate_schema_name.sql +++ b/integration_tests/dbt_project/macros/generate_schema_name.sql @@ -1,9 +1,5 @@ {% macro generate_schema_name(custom_schema_name, node) -%} - {%- set schema_name = target.schema -%} - {% if custom_schema_name %} - {% set schema_name = "{}_{}".format(schema_name, custom_schema_name) %} - {% endif %} - + {% set schema_name = get_default_schema_name(custom_schema_name, node) %} {% set schema_name_suffix_by_var = var('schema_name_suffix', '') %} {% if schema_name_suffix_by_var %} {% set schema_name = schema_name + schema_name_suffix_by_var %} @@ -11,3 +7,26 @@ {% do return(schema_name) %} {%- endmacro %} + +{% macro get_default_schema_name(custom_schema_name, node) -%} + {% do return(adapter.dispatch('get_default_schema_name', 'elementary_tests')(custom_schema_name, node)) %} +{% endmacro %} + +{% macro default__get_default_schema_name(custom_schema_name, node) -%} + {%- set schema_name = target.schema -%} + {% if custom_schema_name %} + {% set schema_name = "{}_{}".format(schema_name, custom_schema_name) %} + {% endif %} + {% do return(schema_name) %} +{%- endmacro %} + +{% macro dremio__get_default_schema_name(custom_schema_name, node) -%} + {%- set default_schema = target.schema if not is_datalake_node(node) else target.root_path -%} + {%- if not custom_schema_name -%} + {% do return(default_schema) %} + {%- elif default_schema == 'no_schema' -%} + {% do return(custom_schema_name) %} + {%- else -%} + {% do return("{}_{}".format(default_schema, custom_schema_name)) %} + {%- endif -%} +{%- endmacro %} diff --git a/integration_tests/dbt_project/models/one.sql b/integration_tests/dbt_project/models/one.sql index 30955995b..49d24888f 100644 --- a/integration_tests/dbt_project/models/one.sql +++ b/integration_tests/dbt_project/models/one.sql @@ -6,4 +6,4 @@ ) }} -SELECT 1 AS one +SELECT 1 AS {{ elementary.escape_reserved_keywords('one') }} diff --git a/integration_tests/docker-compose-dremio.yml b/integration_tests/docker-compose-dremio.yml new file mode 100644 index 000000000..4ea82c5e9 --- /dev/null +++ b/integration_tests/docker-compose-dremio.yml @@ -0,0 +1,85 @@ +services: + # Nessie Catalog Server Using In-Memory Store + nessie: + image: projectnessie/nessie:latest + container_name: catalog + networks: + - dremio-lakehouse + ports: + - 19120:19120 + + # Minio Storage Server + minio: + image: minio/minio:latest + container_name: storage + environment: + - MINIO_ROOT_USER=admin + - MINIO_ROOT_PASSWORD=password + - MINIO_DOMAIN=storage + - MINIO_REGION_NAME=us-east-1 + - MINIO_REGION=us-east-1 + networks: + - dremio-lakehouse + ports: + - 9001:9001 + - 9000:9000 + command: ["server", "/data", "--console-address", ":9001"] + volumes: + - minio_data:/data + + minio-setup: + image: minio/mc + container_name: minio-setup + depends_on: + - minio + entrypoint: > + /bin/sh -c " + until (echo > /dev/tcp/minio/9000) >/dev/null 2>&1; do + echo 'Waiting for MinIO...'; + sleep 2; + done; + mc alias set myminio http://minio:9000 admin password; + mc mb myminio/datalake; + mc ls myminio; + " + networks: + - dremio-lakehouse + + # Dremio + dremio: + image: dremio/dremio-oss:latest + platform: linux/amd64 + ports: + - 9047:9047 + - 31010:31010 + - 32010:32010 + - 45678:45678 + container_name: dremio + environment: + - DREMIO_JAVA_SERVER_EXTRA_OPTS=-Dpaths.dist=file:///opt/dremio/data/dist -Ddebug.addDefaultUser=true + - SERVICES_COORDINATOR_ENABLED=true + - SERVICES_EXECUTOR_ENABLED=true + networks: + - dremio-lakehouse + volumes: + - dremio_data:/opt/dremio/data:rw + # Workaround for permission issues in podman + user: "0" + + dremio-setup: + image: alpine:latest + container_name: dremio-setup + depends_on: + - dremio + volumes: + - ./docker/dremio/dremio-setup.sh:/dremio-setup.sh + command: sh /dremio-setup.sh + networks: + - dremio-lakehouse + +networks: + dremio-lakehouse: + +volumes: + dremio_data: + minio_data: diff --git a/integration_tests/docker/dremio/dremio-setup.sh b/integration_tests/docker/dremio/dremio-setup.sh new file mode 100644 index 000000000..7dec29605 --- /dev/null +++ b/integration_tests/docker/dremio/dremio-setup.sh @@ -0,0 +1,33 @@ +#!/bin/sh + +# Install required tools +apk add --no-cache curl jq + +# Wait for Dremio to be ready +until curl -s http://dremio:9047; do + echo "Waiting for Dremio..." + sleep 5 +done + +echo "Dremio is up. Proceeding with configuration..." + +# Log in to Dremio to get the auth token +AUTH_TOKEN=$(curl -s -X POST "http://dremio:9047/apiv2/login" \ + -H "Content-Type: application/json" \ + --data "{\"userName\":\"dremio\", \"password\":\"dremio123\"}" | jq -r .token) + +# Check if AUTH_TOKEN is not empty +if [ -z "$AUTH_TOKEN" ]; then + echo "Failed to obtain Dremio auth token" + exit 1 +fi + +echo "Obtained Dremio auth token" + +# Create the S3 source in Dremio +curl -s -X PUT "http://dremio:9047/apiv2/source/S3Source" \ + -H "Content-Type: application/json" \ + -H "Authorization: _dremio$AUTH_TOKEN" \ + --data "{\"name\":\"S3Source\",\"config\":{\"credentialType\":\"ACCESS_KEY\",\"accessKey\":\"admin\",\"accessSecret\":\"password\",\"secure\":false,\"externalBucketList\":[],\"enableAsync\":true,\"enableFileStatusCheck\":true,\"rootPath\":\"/\",\"defaultCtasFormat\":\"ICEBERG\",\"propertyList\":[{\"name\":\"fs.s3a.path.style.access\",\"value\":\"true\"},{\"name\":\"fs.s3a.endpoint\",\"value\":\"minio:9000\"},{\"name\":\"dremio.s3.compat\",\"value\":\"true\"}],\"whitelistedBuckets\":[],\"isCachingEnabled\":false,\"maxCacheSpacePct\":100},\"type\":\"S3\",\"metadataPolicy\":{\"deleteUnavailableDatasets\":true,\"autoPromoteDatasets\":false,\"namesRefreshMillis\":3600000,\"datasetDefinitionRefreshAfterMillis\":3600000,\"datasetDefinitionExpireAfterMillis\":10800000,\"authTTLMillis\":86400000,\"updateMode\":\"PREFETCH_QUERIED\"}}" + +echo "S3 Source created in Dremio" \ No newline at end of file diff --git a/integration_tests/tests/data_seeder.py b/integration_tests/tests/data_seeder.py index a73dfbfcd..dfba10100 100644 --- a/integration_tests/tests/data_seeder.py +++ b/integration_tests/tests/data_seeder.py @@ -1,6 +1,7 @@ import csv +from contextlib import contextmanager from pathlib import Path -from typing import List +from typing import Generator, List from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner from logger import get_logger @@ -18,7 +19,8 @@ def __init__( self.dbt_project_path = dbt_project_path self.seeds_dir_path = seeds_dir_path - def seed(self, data: List[dict], table_name: str): + @contextmanager + def seed(self, data: List[dict], table_name: str) -> Generator[None, None, None]: seed_path = self.seeds_dir_path.joinpath(f"{table_name}.csv") try: with seed_path.open("w") as seed_file: @@ -28,5 +30,7 @@ def seed(self, data: List[dict], table_name: str): writer.writerows(data) seed_file.flush() self.dbt_runner.seed(select=str(relative_seed_path), full_refresh=True) + + yield finally: seed_path.unlink() diff --git a/integration_tests/tests/dbt_project.py b/integration_tests/tests/dbt_project.py index a871f0f53..fceeb8bac 100644 --- a/integration_tests/tests/dbt_project.py +++ b/integration_tests/tests/dbt_project.py @@ -3,10 +3,11 @@ from contextlib import contextmanager, nullcontext from pathlib import Path from tempfile import NamedTemporaryFile -from typing import Any, Dict, List, Literal, Optional, Union, overload +from typing import Any, Dict, Generator, List, Literal, Optional, Union, overload from uuid import uuid4 from data_seeder import DbtDataSeeder +from dbt_utils import get_database_and_schema_properties from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner from elementary.clients.dbt.factory import create_dbt_runner from logger import get_logger @@ -42,7 +43,7 @@ def get_dbt_runner(target: str, project_dir: str) -> BaseDbtRunner: class DbtProject: def __init__(self, target: str, project_dir: str): self.dbt_runner = get_dbt_runner(target, project_dir) - + self.target = target self.project_dir_path = Path(project_dir) self.models_dir_path = self.project_dir_path / "models" self.tmp_models_dir_path = self.models_dir_path / "tmp" @@ -187,12 +188,16 @@ def test( test_id, materialization ) else: + database_property, schema_property = get_database_and_schema_properties( + self.target + ) props_yaml = { "version": 2, "sources": [ { "name": "test_data", - "schema": f"{{{{ target.schema }}}}{SCHEMA_NAME_SUFFIX}", + "schema": f"{{{{ target.{schema_property} }}}}{SCHEMA_NAME_SUFFIX}", + "database": f"{{{{ target.{database_property} }}}}", "tables": [table_yaml], } ], @@ -230,9 +235,19 @@ def test( return [test_result] if multiple_results else test_result def seed(self, data: List[dict], table_name: str): - return DbtDataSeeder( + with DbtDataSeeder( + self.dbt_runner, self.project_dir_path, self.seeds_dir_path + ).seed(data, table_name): + return + + @contextmanager + def seed_context( + self, data: List[dict], table_name: str + ) -> Generator[None, None, None]: + with DbtDataSeeder( self.dbt_runner, self.project_dir_path, self.seeds_dir_path - ).seed(data, table_name) + ).seed(data, table_name): + yield @contextmanager def create_temp_model_for_existing_table( diff --git a/integration_tests/tests/dbt_utils.py b/integration_tests/tests/dbt_utils.py new file mode 100644 index 000000000..fbe0e4510 --- /dev/null +++ b/integration_tests/tests/dbt_utils.py @@ -0,0 +1,6 @@ +def get_database_and_schema_properties(target: str, is_view: bool = False): + if target == "dremio" and not is_view: + return "datalake", "root_path" + elif target == "clickhouse": + return "schema", "schema" + return "database", "schema" diff --git a/integration_tests/tests/env.py b/integration_tests/tests/env.py index 27b78641e..b0baa46db 100644 --- a/integration_tests/tests/env.py +++ b/integration_tests/tests/env.py @@ -3,10 +3,13 @@ class Environment: def __init__(self, target: str, project_dir: str): + self.target = target self.dbt_runner = dbt_project.get_dbt_runner(target, project_dir) def clear(self): - self.dbt_runner.run_operation("elementary_tests.clear_env") + # drop schema in dremio doesnt work, but we run the dremio tests with docker so its not really important to drop the schema + if self.target != "dremio": + self.dbt_runner.run_operation("elementary_tests.clear_env") def init(self): self.dbt_runner.run(selector="init") diff --git a/integration_tests/tests/test_anomalies_backfill_logic.py b/integration_tests/tests/test_anomalies_backfill_logic.py index 77a3ffb18..d9a4dea66 100644 --- a/integration_tests/tests/test_anomalies_backfill_logic.py +++ b/integration_tests/tests/test_anomalies_backfill_logic.py @@ -19,12 +19,12 @@ bucket_start, bucket_end, metric_value, - row_number() over (partition by id order by updated_at desc) as row_number + row_number() over (partition by id order by updated_at desc) as row_num from {{{{ ref("data_monitoring_metrics") }}}} where metric_name = 'row_count' and lower(full_table_name) like '%{test_id}' ) select bucket_start, bucket_end, metric_value from metrics_ordered - where row_number = 1 + where row_num = 1 """ # This returns data points used in the latest anomaly test diff --git a/integration_tests/tests/test_dbt_artifacts/test_artifacts.py b/integration_tests/tests/test_dbt_artifacts/test_artifacts.py index 4c94a31a2..d677c7a25 100644 --- a/integration_tests/tests/test_dbt_artifacts/test_artifacts.py +++ b/integration_tests/tests/test_dbt_artifacts/test_artifacts.py @@ -3,6 +3,7 @@ import pytest from dbt_flags import set_flags from dbt_project import DbtProject +from dbt_utils import get_database_and_schema_properties TEST_MODEL = "one" @@ -95,17 +96,25 @@ def test_metrics_anomaly_score(dbt_project: DbtProject): @pytest.mark.requires_dbt_version("1.8.0") def test_source_freshness_results(test_id: str, dbt_project: DbtProject): + database_property, schema_property = get_database_and_schema_properties( + dbt_project.target + ) + loaded_at_field = ( + '"UPDATE_TIME"::timestamp' + if dbt_project.target != "dremio" + else "TO_TIMESTAMP(SUBSTRING(UPDATE_TIME, 0, 23), 'YYYY-MM-DD HH24:MI:SS.FFF')" + ) source_config = { "version": 2, "sources": [ { "name": "test_source", - "database": "{{target.database if target.type != 'clickhouse' else target.schema}}", - "schema": "{{target.schema}}", + "database": f"{{{{ target.{database_property} }}}}", + "schema": f"{{{{ target.{schema_property} }}}}", "tables": [ { "name": test_id, - "loaded_at_field": '"UPDATE_TIME"::timestamp', + "loaded_at_field": loaded_at_field, "freshness": { "warn_after": { "count": 1, diff --git a/integration_tests/tests/test_dimension_anomalies.py b/integration_tests/tests/test_dimension_anomalies.py index 588a521f0..034921dbb 100644 --- a/integration_tests/tests/test_dimension_anomalies.py +++ b/integration_tests/tests/test_dimension_anomalies.py @@ -191,7 +191,7 @@ def test_dimension_anomalies_with_timestamp_exclude_final_results( test_args = { "timestamp_column": TIMESTAMP_COLUMN, "dimensions": ["superhero"], - "exclude_final_results": "value > 15", + "exclude_final_results": '{{ elementary.escape_reserved_keywords("value") }} > 15', } test_result = dbt_project.test(test_id, DBT_TEST_NAME, test_args, data=data) assert test_result["status"] == "fail" @@ -200,7 +200,7 @@ def test_dimension_anomalies_with_timestamp_exclude_final_results( test_args = { "timestamp_column": TIMESTAMP_COLUMN, "dimensions": ["superhero"], - "exclude_final_results": "average > 3", + "exclude_final_results": '{{ elementary.escape_reserved_keywords("average") }} > 3', } test_result = dbt_project.test(test_id, DBT_TEST_NAME, test_args, data=data) assert test_result["status"] == "fail" diff --git a/integration_tests/tests/test_disable_elementary.py b/integration_tests/tests/test_disable_elementary.py index 541411e37..7995a02af 100644 --- a/integration_tests/tests/test_disable_elementary.py +++ b/integration_tests/tests/test_disable_elementary.py @@ -1,6 +1,6 @@ from dbt_project import DbtProject -COLUMN_NAME = "value" +COLUMN_NAME = "some_column" def test_running_dbt_tests_without_elementary(test_id: str, dbt_project: DbtProject): diff --git a/integration_tests/tests/test_failed_row_count.py b/integration_tests/tests/test_failed_row_count.py index 66d109fb5..81943019b 100644 --- a/integration_tests/tests/test_failed_row_count.py +++ b/integration_tests/tests/test_failed_row_count.py @@ -1,7 +1,7 @@ import pytest from dbt_project import DbtProject -COLUMN_NAME = "value" +COLUMN_NAME = "some_column" # Failed row count currently not supported on ClickHouse diff --git a/integration_tests/tests/test_sampling.py b/integration_tests/tests/test_sampling.py index d91d29972..77391e664 100644 --- a/integration_tests/tests/test_sampling.py +++ b/integration_tests/tests/test_sampling.py @@ -3,7 +3,7 @@ import pytest from dbt_project import DbtProject -COLUMN_NAME = "value" +COLUMN_NAME = "some_column" SAMPLES_QUERY = """ diff --git a/integration_tests/tests/test_sampling_pii.py b/integration_tests/tests/test_sampling_pii.py index 681ec0f00..39fceaa7b 100644 --- a/integration_tests/tests/test_sampling_pii.py +++ b/integration_tests/tests/test_sampling_pii.py @@ -3,7 +3,7 @@ import pytest from dbt_project import DbtProject -COLUMN_NAME = "value" +COLUMN_NAME = "some_column" SAMPLES_QUERY = """ diff --git a/integration_tests/tests/test_schema_changes.py b/integration_tests/tests/test_schema_changes.py index 3a937bc0d..60d213521 100644 --- a/integration_tests/tests/test_schema_changes.py +++ b/integration_tests/tests/test_schema_changes.py @@ -22,7 +22,7 @@ ("name", "column_removed"), ] -STRING_JINJA = r"{{ 'STRING' if (target.type == 'bigquery' or target.type == 'databricks') else 'character varying' if target.type == 'redshift' else 'TEXT' }}" +STRING_JINJA = r"{{ 'STRING' if (target.type == 'bigquery' or target.type == 'databricks') else 'character varying' if (target.type == 'redshift' or target.type == 'dremio') else 'TEXT' }}" def assert_test_results(test_results: List[dict]): diff --git a/integration_tests/tests/test_string_monitors.py b/integration_tests/tests/test_string_monitors.py index 0df3337a3..6f7a17e5f 100644 --- a/integration_tests/tests/test_string_monitors.py +++ b/integration_tests/tests/test_string_monitors.py @@ -6,9 +6,9 @@ def test_missing_count(dbt_project: DbtProject, test_id: str): missing_values = [None, " ", "null", "NULL"] data = [{COLUMN_NAME: value} for value in ["a", "b", "c", " a "] + missing_values] - dbt_project.seed(data, test_id) - result = dbt_project.run_query( - f"select {{{{ elementary.missing_count('{COLUMN_NAME}') }}}} " - f"as missing_count from {{{{ generate_schema_name() }}}}.{test_id}" - )[0] + with dbt_project.seed_context(data, test_id): + result = dbt_project.run_query( + f"select {{{{ elementary.missing_count('{COLUMN_NAME}') }}}} " + f'as missing_count from {{{{ ref("{test_id}") }}}}' + )[0] assert result["missing_count"] == len(missing_values) diff --git a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql index b8b8847f5..4819ae772 100644 --- a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql +++ b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql @@ -28,6 +28,7 @@ {%- set bucket_seasonality_expr = elementary.const_as_text('no_seasonality') %} {%- endif %} {%- set detection_end = elementary.get_detection_end(test_configuration.detection_delay) %} + {%- set detection_end_expr = elementary.edr_cast_as_timestamp(elementary.edr_datetime_to_sql(detection_end)) %} {%- set min_bucket_start_expr = elementary.get_trunc_min_bucket_start_expr(detection_end, metric_properties, test_configuration.days_back) %} {# For timestamped tests, this will be the bucket start, and for non-timestamped tests it will be the @@ -39,9 +40,9 @@ with buckets as ( select edr_bucket_start, edr_bucket_end from ({{ elementary.complete_buckets_cte(metric_properties, min_bucket_start_expr, - elementary.edr_quote(detection_end)) }}) results - where edr_bucket_start >= {{ elementary.edr_cast_as_timestamp(min_bucket_start_expr) }} - and edr_bucket_end <= {{ elementary.edr_cast_as_timestamp(elementary.edr_quote(detection_end)) }} + detection_end_expr) }}) results + where edr_bucket_start >= {{ min_bucket_start_expr }} + and edr_bucket_end <= {{ detection_end_expr }} ), {% else %} with @@ -121,7 +122,7 @@ {{ metric_time_bucket_expr }} as metric_time_bucket, {{ elementary.edr_cast_as_date(elementary.edr_date_trunc('day', metric_time_bucket_expr))}} as metric_date, - row_number() over (partition by id order by updated_at desc) as row_number + row_number() over (partition by id order by updated_at desc) as row_num from union_metrics ), @@ -144,7 +145,7 @@ bucket_duration_hours, updated_at from grouped_metrics_duplicates - where row_number = 1 + where row_num = 1 ), time_window_aggregation as ( @@ -189,7 +190,8 @@ metric_name, case when training_stddev is null then null - when training_stddev = 0 then 0 + when training_set_size = 1 then null -- Single value case - no historical context for anomaly detection + when training_stddev = 0 then 0 -- Stationary data case - valid, all values are identical else (metric_value - training_avg) / (training_stddev) end as anomaly_score, {{ test_configuration.anomaly_sensitivity }} as anomaly_score_threshold, @@ -201,12 +203,12 @@ {% set limit_values = elementary.get_limit_metric_values(test_configuration) %} case - when training_stddev is null then null + when training_stddev is null or training_set_size = 1 then null when {{ limit_values.min_metric_value }} > 0 or metric_name in {{ elementary.to_sql_list(elementary.get_negative_value_supported_metrics()) }} then {{ limit_values.min_metric_value }} else 0 end as min_metric_value, case - when training_stddev is null then null + when training_stddev is null or training_set_size = 1 then null else {{ limit_values.max_metric_value }} end as max_metric_value, training_avg, diff --git a/macros/edr/data_monitoring/data_monitors_configuration/get_buckets_configuration.sql b/macros/edr/data_monitoring/data_monitors_configuration/get_buckets_configuration.sql index 09ceeb1d2..985a5d8d8 100644 --- a/macros/edr/data_monitoring/data_monitors_configuration/get_buckets_configuration.sql +++ b/macros/edr/data_monitoring/data_monitors_configuration/get_buckets_configuration.sql @@ -1,10 +1,10 @@ {% macro get_detection_end(detection_delay) %} {% if not detection_delay %} - {% do return(elementary.get_run_started_at()) %} + {% do return(elementary.get_run_started_at().replace(microsecond=0)) %} {% endif %} {%- set kwargs = {detection_delay.period+'s': detection_delay.count} %} - {%- set detection_end = elementary.get_run_started_at() - modules.datetime.timedelta(**kwargs) %} + {%- set detection_end = elementary.get_run_started_at().replace(microsecond=0) - modules.datetime.timedelta(**kwargs) %} {% do return(detection_end) %} {% endmacro %} @@ -23,9 +23,9 @@ {% macro get_metric_buckets_min_and_max(model_relation, backfill_days, days_back, detection_delay=none, metric_names=none, column_name=none, metric_properties=none, unit_test=false, unit_test_relation=none) %} {%- set detection_end = elementary.get_detection_end(detection_delay) %} - {%- set detection_end_expr = elementary.edr_cast_as_timestamp(elementary.edr_quote(detection_end)) %} + {%- set detection_end_expr = elementary.edr_cast_as_timestamp(elementary.edr_datetime_to_sql(detection_end)) %} {%- set trunc_min_bucket_start_expr = elementary.get_trunc_min_bucket_start_expr(detection_end, metric_properties, days_back) %} - {%- set backfill_bucket_start = elementary.edr_cast_as_timestamp(elementary.edr_quote(elementary.get_backfill_bucket_start(detection_end, backfill_days))) %} + {%- set backfill_bucket_start = elementary.edr_cast_as_timestamp(elementary.edr_datetime_to_sql(elementary.get_backfill_bucket_start(detection_end, backfill_days))) %} {%- set full_table_name = elementary.relation_to_full_name(model_relation) %} {%- set force_metrics_backfill = elementary.get_config_var('force_metrics_backfill') %} @@ -105,8 +105,8 @@ {%- set buckets = elementary.agate_to_dicts(elementary.run_query(incremental_bucket_times_query))[0] %} {% endif %} {%- if buckets %} - {%- set min_bucket_start = elementary.edr_quote(buckets.get('min_bucket_start')) %} - {%- set max_bucket_end = elementary.edr_quote(buckets.get('max_bucket_end')) %} + {%- set min_bucket_start = elementary.edr_datetime_to_sql(buckets.get('min_bucket_start')) %} + {%- set max_bucket_end = elementary.edr_datetime_to_sql(buckets.get('max_bucket_end')) %} {{ return([min_bucket_start, max_bucket_end]) }} {%- else %} {{ exceptions.raise_compiler_error("Failed to calc test buckets min and max") }} diff --git a/macros/edr/data_monitoring/monitors/column_numeric_monitors.sql b/macros/edr/data_monitoring/monitors/column_numeric_monitors.sql index 781007ef6..ad0c631fd 100644 --- a/macros/edr/data_monitoring/monitors/column_numeric_monitors.sql +++ b/macros/edr/data_monitoring/monitors/column_numeric_monitors.sql @@ -34,6 +34,13 @@ stddevPop(cast({{ column_name }} as {{ elementary.edr_type_float() }})) {%- endmacro %} +{% macro dremio__standard_deviation(column_name) -%} + -- Dremio's stddev in window functions can raise division by zero with single values + -- stddev_pop returns 0 for single values instead of raising an error + -- We'll handle the single-value case in the anomaly detection logic using training_set_size + stddev_pop(cast({{ column_name }} as {{ elementary.edr_type_float() }})) +{%- endmacro %} + {% macro variance(column_name) -%} {{ return(adapter.dispatch('variance', 'elementary')(column_name)) }} {%- endmacro %} diff --git a/macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql b/macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql index a7c21c7ae..cd74dc47b 100644 --- a/macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql +++ b/macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql @@ -61,8 +61,8 @@ {%- if 'null_count' in metric_types -%} {{ elementary.null_count(column) }} {%- else -%} null {% endif %} as null_count, {%- if 'null_percent' in metric_types -%} {{ elementary.null_percent(column) }} {%- else -%} null {% endif %} as null_percent, {%- if 'not_null_percent' in metric_types -%} {{ elementary.not_null_percent(column) }} {%- else -%} null {% endif %} as not_null_percent, - {%- if 'max' in metric_types -%} {{ elementary.max(column) }} {%- else -%} null {% endif %} as max, - {%- if 'min' in metric_types -%} {{ elementary.min(column) }} {%- else -%} null {% endif %} as min, + {%- if 'max' in metric_types -%} {{ elementary.max(column) }} {%- else -%} null {% endif %} as {{ elementary.escape_reserved_keywords('max') }}, + {%- if 'min' in metric_types -%} {{ elementary.min(column) }} {%- else -%} null {% endif %} as {{ elementary.escape_reserved_keywords('min') }}, {%- if 'average' in metric_types -%} {{ elementary.average(column) }} {%- else -%} null {% endif %} as average, {%- if 'zero_count' in metric_types -%} {{ elementary.zero_count(column) }} {%- else -%} null {% endif %} as zero_count, {%- if 'zero_percent' in metric_types -%} {{ elementary.zero_percent(column) }} {%- else -%} null {% endif %} as zero_percent, @@ -77,7 +77,7 @@ {%- if 'count_true' in metric_types -%} {{ elementary.count_true(column) }} {%- else -%} null {% endif %} as count_true, {%- if 'count_false' in metric_types -%} {{ elementary.count_false(column) }} {%- else -%} null {% endif %} as count_false, {%- if 'not_missing_percent' in metric_types -%} {{ elementary.not_missing_percent(column) }} {%- else -%} null {% endif %} as not_missing_percent, - {%- if 'sum' in metric_types -%} {{ elementary.sum(column) }} {%- else -%} null {% endif %} as sum + {%- if 'sum' in metric_types -%} {{ elementary.sum(column) }} {%- else -%} null {% endif %} as {{ elementary.escape_reserved_keywords('sum') }} from filtered_monitored_table {%- if timestamp_column %} left join buckets on (edr_bucket_start = start_bucket_in_data) @@ -112,10 +112,10 @@ {{ elementary.null_string() }} as dimension, {{ elementary.null_string() }} as dimension_value, {% endif %} - {{ elementary.edr_cast_as_float(metric_type) }} as metric_value, + {{ elementary.edr_cast_as_float(elementary.escape_reserved_keywords(metric_type)) }} as metric_value, {{ elementary.edr_cast_as_string(elementary.edr_quote(metric_name)) }} as metric_name, {{ elementary.edr_cast_as_string(elementary.edr_quote(metric_type)) }} as metric_type - from column_metrics where {{ metric_type }} is not null + from column_metrics where {{ elementary.escape_reserved_keywords(metric_type) }} is not null {% if not loop.last %} union all {% endif %} {%- endfor %} {%- else %} diff --git a/macros/edr/data_monitoring/monitors_query/dimension_monitoring_query.sql b/macros/edr/data_monitoring/monitors_query/dimension_monitoring_query.sql index 5e735ee6b..7fbe82673 100644 --- a/macros/edr/data_monitoring/monitors_query/dimension_monitoring_query.sql +++ b/macros/edr/data_monitoring/monitors_query/dimension_monitoring_query.sql @@ -39,8 +39,7 @@ select bucket_end, dimension_value, - metric_value, - row_number () over (partition by dimension_value order by bucket_end desc) as row_number + metric_value from {{ data_monitoring_metrics_relation }} where full_table_name = {{ full_table_name_str }} and metric_name = {{ elementary.edr_quote(metric_name) }} @@ -147,8 +146,7 @@ select bucket_end, dimension_value, - metric_value, - row_number () over (partition by dimension_value order by bucket_end desc) as row_number + metric_value from {{ data_monitoring_metrics_relation }} where full_table_name = {{ full_table_name_str }} and metric_name = {{ elementary.edr_quote(metric_name) }} diff --git a/macros/edr/data_monitoring/monitors_query/table_monitoring_query.sql b/macros/edr/data_monitoring/monitors_query/table_monitoring_query.sql index 67aca02c6..23ad73a55 100644 --- a/macros/edr/data_monitoring/monitors_query/table_monitoring_query.sql +++ b/macros/edr/data_monitoring/monitors_query/table_monitoring_query.sql @@ -76,17 +76,19 @@ {% endmacro %} {% macro get_timestamp_table_query(monitored_table, metric_properties, timestamp_column, table_metrics, min_bucket_start, max_bucket_end, full_table_name_str) %} + {%- set timestamp_col_expr = elementary.edr_cast_as_timestamp(elementary.escape_reserved_keywords(timestamp_column)) -%} + with partially_time_filtered_monitored_table as ( select - {{ elementary.edr_cast_as_timestamp(timestamp_column) }} as monitored_table_timestamp_column + {{ timestamp_col_expr }} as monitored_table_timestamp_column {%- if metric_properties.timestamp_column and metric_properties.event_timestamp_column %} - , {{ elementary.edr_cast_as_timestamp(metric_properties.event_timestamp_column) }} as monitored_table_event_timestamp_column + , {{ elementary.edr_cast_as_timestamp(elementary.escape_reserved_keywords(metric_properties.event_timestamp_column)) }} as monitored_table_event_timestamp_column {%- endif %} from {{ monitored_table }} -- Freshness metric calculated differences between consecutive buckets, thus the first diff -- is always null. Therefore we let few old buckets inside the query and filter them later, just for -- the first relevant diff not to be null - where {{ elementary.edr_cast_as_timestamp(timestamp_column) }} >= {{ elementary.edr_timeadd("day", -7, elementary.edr_cast_as_timestamp(min_bucket_start)) }} + where {{ timestamp_col_expr }} >= {{ elementary.edr_timeadd("day", -7, elementary.edr_cast_as_timestamp(min_bucket_start)) }} {% if metric_properties.where_expression %} and {{ metric_properties.where_expression }} {% endif %} ), monitored_table as ( @@ -269,7 +271,7 @@ bucket_freshness_ranked as ( select *, - row_number () over (partition by edr_bucket_end order by freshness is null, freshness desc) as row_number + row_number () over (partition by edr_bucket_end order by freshness is null, freshness desc) as row_num from bucket_all_freshness_metrics ) @@ -281,7 +283,7 @@ {{ elementary.edr_cast_as_string('update_timestamp') }} as source_value, freshness as metric_value from bucket_freshness_ranked - where row_number = 1 + where row_num = 1 {% endmacro %} {% macro event_freshness_metric_query(metric, metric_properties) %} diff --git a/macros/edr/materializations/test/failed_row_count.sql b/macros/edr/materializations/test/failed_row_count.sql index d20786460..f98b2621b 100644 --- a/macros/edr/materializations/test/failed_row_count.sql +++ b/macros/edr/materializations/test/failed_row_count.sql @@ -39,5 +39,5 @@ with results as ( {{ sql }} ) - select {{ failed_row_count_calc }} as count from results + select {{ failed_row_count_calc }} as {{ elementary.escape_reserved_keywords('count') }} from results {% endmacro %} diff --git a/macros/edr/system/system_utils/buckets_cte.sql b/macros/edr/system/system_utils/buckets_cte.sql index fe82b61e8..f81baa20b 100644 --- a/macros/edr/system/system_utils/buckets_cte.sql +++ b/macros/edr/system/system_utils/buckets_cte.sql @@ -144,3 +144,20 @@ {{ return(complete_buckets_cte) }} {% endmacro %} +{% macro dremio__complete_buckets_cte(time_bucket, bucket_end_expr, min_bucket_start_expr, max_bucket_end_expr) %} + {%- set complete_buckets_cte %} + with integers as ( + select (row_number() over (order by t1.val, t2.val, t3.val, t4.val)) - 1 as num + from (values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)) t1(val) + cross join (values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)) t2(val) + cross join (values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)) t3(val) + cross join (values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)) t4(val) + ) + select + {{ elementary.edr_timeadd(time_bucket.period, 'num * ' ~ time_bucket.count, min_bucket_start_expr) }} as edr_bucket_start, + {{ elementary.edr_timeadd(time_bucket.period, '(num + 1) * ' ~ time_bucket.count, min_bucket_start_expr) }} as edr_bucket_end + from integers + where {{ elementary.edr_timeadd(time_bucket.period, '(num + 1) * ' ~ time_bucket.count, min_bucket_start_expr) }} <= {{ max_bucket_end_expr }} + {%- endset %} + {{ return(complete_buckets_cte) }} +{% endmacro %} diff --git a/macros/edr/system/system_utils/empty_table.sql b/macros/edr/system/system_utils/empty_table.sql index 301bd09e6..07ccc3e4d 100644 --- a/macros/edr/system/system_utils/empty_table.sql +++ b/macros/edr/system/system_utils/empty_table.sql @@ -137,6 +137,8 @@ {%- set dummy_values = elementary.dummy_values() %} + {%- set column_name = elementary.escape_reserved_keywords(column_name) %} + {%- if data_type == 'boolean' %} cast ({{ dummy_values['boolean'] }} as {{ elementary.edr_type_bool()}}) as {{ column_name }} {%- elif data_type == 'timestamp' -%} diff --git a/macros/edr/system/system_utils/full_names.sql b/macros/edr/system/system_utils/full_names.sql index b3c5cf3b5..29c50105d 100644 --- a/macros/edr/system/system_utils/full_names.sql +++ b/macros/edr/system/system_utils/full_names.sql @@ -102,6 +102,20 @@ {% endmacro %} +{% macro dremio__full_name_split(part_name) %} + {%- if part_name == 'database_name' -%} + trim('"' from split_part(full_table_name,'.',1)) as {{ part_name }} + {%- elif part_name == 'schema_name' -%} + trim('"' from substr(full_table_name, length(split_part(full_table_name,'.',1)) + 2, + length(full_table_name) - length(split_part(full_table_name,'.',1)) - length(split_part(full_table_name,'.',length(full_table_name) - length(replace(full_table_name,'.','')) + 1)) - 2)) as {{ part_name }} + {%- elif part_name == 'table_name' -%} + trim('"' from split_part(full_table_name,'.',length(full_table_name) - length(replace(full_table_name,'.','')) + 1)) as {{ part_name }} + {%- else -%} + {{ return('') }} + {%- endif -%} +{% endmacro %} + + {% macro relation_to_full_name(relation) %} {%- if relation.is_cte %} {# Ephemeral models don't have db and schema #} diff --git a/macros/edr/system/system_utils/get_config_var.sql b/macros/edr/system/system_utils/get_config_var.sql index 310a00378..0884b0fcb 100644 --- a/macros/edr/system/system_utils/get_config_var.sql +++ b/macros/edr/system/system_utils/get_config_var.sql @@ -111,3 +111,9 @@ {% do default_config.update({'query_max_size': 250000}) %} {{- return(default_config) -}} {%- endmacro -%} + +{%- macro dremio__get_default_config() -%} + {% set default_config = elementary.default__get_default_config() %} + {% do default_config.update({'dbt_artifacts_chunk_size': 100}) %} + {{- return(default_config) -}} +{%- endmacro -%} diff --git a/macros/edr/tests/on_run_end/union_columns_snapshot_query.sql b/macros/edr/tests/on_run_end/union_columns_snapshot_query.sql index 225e7dc2b..c364744d4 100644 --- a/macros/edr/tests/on_run_end/union_columns_snapshot_query.sql +++ b/macros/edr/tests/on_run_end/union_columns_snapshot_query.sql @@ -9,7 +9,7 @@ ), columns_snapshot_with_duplicates as ( select *, - row_number() over (partition by column_state_id order by detected_at desc) as row_number + row_number() over (partition by column_state_id order by detected_at desc) as row_num from union_temp_columns_snapshot ) select @@ -21,7 +21,7 @@ is_new, detected_at from columns_snapshot_with_duplicates - where row_number = 1 + where row_num = 1 {%- endset %} {{ return(union_temp_query) }} {%- endif %} diff --git a/macros/edr/tests/on_run_end/union_metrics_query.sql b/macros/edr/tests/on_run_end/union_metrics_query.sql index 3ea0f32ef..541adf88d 100644 --- a/macros/edr/tests/on_run_end/union_metrics_query.sql +++ b/macros/edr/tests/on_run_end/union_metrics_query.sql @@ -9,7 +9,7 @@ ), metrics_with_duplicates as ( select *, - row_number() over (partition by id order by updated_at desc) as row_number + row_number() over (partition by id order by updated_at desc) as row_num from union_temps_metrics ) select @@ -28,7 +28,7 @@ dimension_value, metric_properties from metrics_with_duplicates - where row_number = 1 + where row_num = 1 {%- endset %} {{ return(union_temp_query) }} {%- endif %} diff --git a/macros/edr/tests/test_utils/clean_elementary_test_tables.sql b/macros/edr/tests/test_utils/clean_elementary_test_tables.sql index 870d81fac..e734e049f 100644 --- a/macros/edr/tests/test_utils/clean_elementary_test_tables.sql +++ b/macros/edr/tests/test_utils/clean_elementary_test_tables.sql @@ -48,11 +48,11 @@ {% endmacro %} {% macro trino__get_clean_elementary_test_tables_queries(test_table_relations) %} - {% set queries = [] %} - {% for test_relation in test_table_relations %} - {% do queries.append("DROP TABLE IF EXISTS {}".format(test_relation)) %} - {% endfor %} - {% do return(queries) %} + {% do return(elementary.get_transactionless_clean_elementary_test_tables_queries(test_table_relations)) %} +{% endmacro %} + +{% macro dremio__get_clean_elementary_test_tables_queries(test_table_relations) %} + {% do return(elementary.get_transactionless_clean_elementary_test_tables_queries(test_table_relations)) %} {% endmacro %} {% macro get_transaction_clean_elementary_test_tables_queries(test_table_relations) %} diff --git a/macros/edr/tests/test_utils/get_anomaly_query.sql b/macros/edr/tests/test_utils/get_anomaly_query.sql index 73a53e934..3ab1f7e62 100644 --- a/macros/edr/tests/test_utils/get_anomaly_query.sql +++ b/macros/edr/tests/test_utils/get_anomaly_query.sql @@ -62,7 +62,7 @@ case when final_results as ( select - metric_value as value, + metric_value as {{ elementary.escape_reserved_keywords('value') }}, training_avg as average, {# when there is an anomaly we would want to use the last value of the metric (lag), otherwise visually the expectations would look out of bounds #} case diff --git a/macros/utils/cross_db_utils/current_timestamp.sql b/macros/utils/cross_db_utils/current_timestamp.sql index 316d98e0c..4e0b76b87 100644 --- a/macros/utils/cross_db_utils/current_timestamp.sql +++ b/macros/utils/cross_db_utils/current_timestamp.sql @@ -69,3 +69,12 @@ {% macro trino__edr_current_timestamp_in_utc() -%} cast(current_timestamp at time zone 'UTC' as timestamp(6)) {%- endmacro -%} + +{% macro dremio__edr_current_timestamp() -%} + CURRENT_TIMESTAMP() +{%- endmacro -%} + +{% macro dremio__edr_current_timestamp_in_utc() -%} + -- Dremio CURRENT_TIMESTAMP() is always in UTC + CURRENT_TIMESTAMP() +{%- endmacro -%} diff --git a/macros/utils/cross_db_utils/datediff.sql b/macros/utils/cross_db_utils/datediff.sql index 3091bffac..fc098f0a0 100644 --- a/macros/utils/cross_db_utils/datediff.sql +++ b/macros/utils/cross_db_utils/datediff.sql @@ -156,3 +156,42 @@ {% endif %} {{ return(macro(elementary.edr_cast_as_timestamp(first_date), elementary.edr_cast_as_timestamp(second_date), date_part)) }} {% endmacro %} + +{% macro dremio__edr_datediff(first_date, second_date, date_part) %} + {%- set seconds_diff_expr -%} + cast(unix_timestamp(substr(cast(({{ second_date }}) as varchar), 1, 19)) - + unix_timestamp(substr(cast(({{ first_date }}) as varchar), 1, 19)) as integer) + {%- endset -%} + + {%- set first_date_ts = elementary.edr_cast_as_timestamp(first_date) -%} + {%- set second_date_ts = elementary.edr_cast_as_timestamp(second_date) -%} + + {# This macro is copied from dbt-dremio, but we replaced entirely the usage of TIMESTAMPDIFF + as for some reason it must be used with "select" - which creates issues. + So we're using an alternative implementation in these cases using the seconds diff expression above. + + See original implementation here - https://github.com/dremio/dbt-dremio/blob/22588446edabae1670d929e27501ae3060fdd0bc/dbt/include/dremio/macros/utils/date_spine.sql#L53 + #} + + {% if date_part == 'year' %} + (EXTRACT(YEAR FROM {{second_date_ts}}) - EXTRACT(YEAR FROM {{first_date_ts}})) + {% elif date_part == 'quarter' %} + ((EXTRACT(YEAR FROM {{second_date_ts}}) - EXTRACT(YEAR FROM {{first_date_ts}})) * 4 + CEIL(EXTRACT(MONTH FROM {{second_date_ts}}) / 3.0) - CEIL(EXTRACT(MONTH FROM {{first_date_ts}}) / 3.0)) + {% elif date_part == 'month' %} + ((EXTRACT(YEAR FROM {{second_date_ts}}) - EXTRACT(YEAR FROM {{first_date_ts}})) * 12 + (EXTRACT(MONTH FROM {{second_date_ts}}) - EXTRACT(MONTH FROM {{first_date_ts}}))) + {% elif date_part == 'weekday' %} + CAST(CAST({{second_date_ts}} AS DATE) - CAST({{first_date_ts}} AS DATE) AS INTEGER) + {% elif date_part == 'week' %} + ({{ seconds_diff_expr }} / (60 * 60 * 24 * 7)) + {% elif date_part == 'day' %} + ({{ seconds_diff_expr }} / (60 * 60 * 24)) + {% elif date_part == 'hour' %} + ({{ seconds_diff_expr }} / (60 * 60)) + {% elif date_part == 'minute' %} + ({{ seconds_diff_expr }} / 60) + {% elif date_part == 'second' %} + {{ seconds_diff_expr }} + {% else %} + {% do exceptions.raise_compiler_error('Unsupported date part: ' ~ date_part) %} + {% endif %} +{% endmacro %} diff --git a/macros/utils/cross_db_utils/datetime_to_sql.sql b/macros/utils/cross_db_utils/datetime_to_sql.sql new file mode 100644 index 000000000..3acf15c3e --- /dev/null +++ b/macros/utils/cross_db_utils/datetime_to_sql.sql @@ -0,0 +1,19 @@ +{% macro edr_datetime_to_sql(dt) %} + {% do return(adapter.dispatch("edr_datetime_to_sql", "elementary")(dt)) %} +{% endmacro %} + +{% macro default__edr_datetime_to_sql(dt) %} + {% do return(elementary.edr_quote(dt)) %} +{% endmacro %} + +{% macro dremio__edr_datetime_to_sql(dt) %} + {% if dt is string %} + {% if 'T' in dt %} + {% set dt = modules.datetime.datetime.fromisoformat(dt) %} + {% else %} + {% do return(elementary.edr_quote(dt)) %} + {% endif %} + {% endif %} + + {% do return(elementary.edr_quote(dt.strftime(elementary.get_time_format()))) %} +{% endmacro %} diff --git a/macros/utils/cross_db_utils/day_of_week.sql b/macros/utils/cross_db_utils/day_of_week.sql index 5c0e3d88f..630123f73 100644 --- a/macros/utils/cross_db_utils/day_of_week.sql +++ b/macros/utils/cross_db_utils/day_of_week.sql @@ -40,3 +40,7 @@ {% macro trino__edr_day_of_week_expression(date_expr) %} date_format({{ date_expr }}, '%W') {% endmacro %} + +{% macro dremio__edr_day_of_week_expression(date_expr) %} + TO_CHAR({{ date_expr }}, 'DAY') +{% endmacro %} diff --git a/macros/utils/cross_db_utils/generate_elementary_profile_args.sql b/macros/utils/cross_db_utils/generate_elementary_profile_args.sql index c59cdf9d9..f9caea426 100644 --- a/macros/utils/cross_db_utils/generate_elementary_profile_args.sql +++ b/macros/utils/cross_db_utils/generate_elementary_profile_args.sql @@ -181,6 +181,17 @@ {% do return(parameters) %} {% endmacro %} +{% macro dremio__generate_elementary_profile_args(method, elementary_database, elementary_schema) %} + {% do return([ + _parameter("project_id", target.cloud_project_id), + _parameter("host", target.cloud_host), + _parameter("object_storage", elementary_database), + _parameter("object_storage_path", elementary_schema), + _parameter("user", target.user), + _parameter("token", ""), + _parameter("threads", target.threads), + ]) %} +{% endmacro %} {% macro default__generate_elementary_profile_args(method, elementary_database, elementary_schema) %} Adapter "{{ target.type }}" is not supported on Elementary. diff --git a/macros/utils/cross_db_utils/get_user_creation_query.sql b/macros/utils/cross_db_utils/get_user_creation_query.sql index f192deff2..a3b6d5751 100644 --- a/macros/utils/cross_db_utils/get_user_creation_query.sql +++ b/macros/utils/cross_db_utils/get_user_creation_query.sql @@ -106,6 +106,14 @@ grant create table on {{ parameters["schema"] }}.* to {{ parameters["user"] }} {% endmacro %} +{% macro dremio__get_user_creation_query(parameters) %} +CREATE USER "{{ parameters["user"] }}"; + +GRANT USAGE ON PROJECT TO USER "{{ parameters["user"] }}"; +GRANT SELECT ON ALL DATASETS IN FOLDER {% for part in (parameters["object_storage"] ~ "." ~ parameters["object_storage_path"]).split(".") %}"{{ part }}"{% if not loop.last %}.{% endif %}{% endfor %} TO USER "{{ parameters["user"] }}"; +{% endmacro %} + + {# Databricks, BigQuery, Spark #} {% macro default__get_user_creation_query(parameters) %} {% do exceptions.raise_compiler_error('User creation not supported through sql using ' ~ target.type) %} diff --git a/macros/utils/cross_db_utils/timeadd.sql b/macros/utils/cross_db_utils/timeadd.sql index ca3a9260f..0f2419e86 100644 --- a/macros/utils/cross_db_utils/timeadd.sql +++ b/macros/utils/cross_db_utils/timeadd.sql @@ -33,3 +33,7 @@ {% macro trino__edr_timeadd(date_part, number, timestamp_expression) %} date_add('{{ date_part }}', {{ elementary.edr_cast_as_int(number) }}, {{ elementary.edr_cast_as_timestamp(timestamp_expression) }}) {% endmacro %} + +{% macro dremio__edr_timeadd(date_part, number, timestamp_expression) %} + timestampadd({{date_part}}, {{ elementary.edr_cast_as_int(number) }}, {{ elementary.edr_cast_as_timestamp(timestamp_expression) }}) +{% endmacro %} diff --git a/macros/utils/data_types/cast_column.sql b/macros/utils/data_types/cast_column.sql index 2a93629f0..af8980576 100644 --- a/macros/utils/data_types/cast_column.sql +++ b/macros/utils/data_types/cast_column.sql @@ -29,6 +29,10 @@ ) {%- endmacro -%} +{%- macro dremio__edr_cast_as_timestamp(timestamp_field) -%} + cast({{ timestamp_field }} as {{ elementary.edr_type_timestamp() }}) +{%- endmacro -%} + {%- macro edr_cast_as_float(column) -%} cast({{ column }} as {{ elementary.edr_type_float() }}) {%- endmacro -%} @@ -85,6 +89,10 @@ ) {%- endmacro -%} +{%- macro dremio__edr_cast_as_date(timestamp_field) -%} + cast({{ timestamp_field }} as {{ elementary.edr_type_date() }}) +{%- endmacro -%} + {%- macro const_as_text(string) -%} {{ return(adapter.dispatch('const_as_text', 'elementary')(string)) }} diff --git a/macros/utils/data_types/data_type.sql b/macros/utils/data_types/data_type.sql index f44d7455d..b18e88c27 100644 --- a/macros/utils/data_types/data_type.sql +++ b/macros/utils/data_types/data_type.sql @@ -156,3 +156,7 @@ {% macro trino__edr_type_timestamp() %} timestamp(6) {% endmacro %} + +{% macro dremio__edr_type_timestamp() %} + timestamp +{% endmacro %} diff --git a/macros/utils/graph/get_package_database_and_schema.sql b/macros/utils/graph/get_package_database_and_schema.sql index 99d9d2796..280d9d4fb 100644 --- a/macros/utils/graph/get_package_database_and_schema.sql +++ b/macros/utils/graph/get_package_database_and_schema.sql @@ -24,4 +24,17 @@ {% endif %} {% endif %} {{ return([none, none]) }} -{% endmacro %} \ No newline at end of file +{% endmacro %} + +{% macro dremio__get_package_database_and_schema(package_name='elementary') %} + {% if execute %} + {% set node_in_package = graph.nodes.values() + | selectattr("resource_type", "==", "model") + | selectattr("package_name", "==", package_name) + | selectattr("config.materialized", "!=", "view") | first %} + {% if node_in_package %} + {{ return([node_in_package.database, node_in_package.schema]) }} + {% endif %} + {% endif %} + {{ return([none, none]) }} +{% endmacro %} diff --git a/macros/utils/sql_utils/escape_reserved_keywords.sql b/macros/utils/sql_utils/escape_reserved_keywords.sql new file mode 100644 index 000000000..7347c6790 --- /dev/null +++ b/macros/utils/sql_utils/escape_reserved_keywords.sql @@ -0,0 +1,33 @@ +{% macro escape_reserved_keywords(keyword) %} + {% if elementary.is_reserved_keywords(keyword) %} + {% do return(elementary.escape_keywords(keyword)) %} + {% endif %} + {% do return(keyword) %} +{% endmacro %} + +{% macro is_reserved_keywords(keyword) %} + {% do return(adapter.dispatch('is_reserved_keywords', 'elementary')(keyword)) %} +{% endmacro %} + +{% macro default__is_reserved_keywords(keyword) %} + {% do return(false) %} +{% endmacro %} + +{% macro dremio__is_reserved_keywords(keyword) %} + {% set cleaned_keyword = (keyword | trim | lower) %} + {% do return(cleaned_keyword in ['filter', 'sql', 'timestamp', 'value', 'one', 'min', 'max', 'sum', 'count']) %} +{% endmacro %} + +{% macro escape_keywords(keyword) %} + {% do return(adapter.dispatch('escape_keywords', 'elementary')(keyword)) %} +{% endmacro %} + +{% macro default__escape_keywords(keyword) %} + {% do return(keyword) %} +{% endmacro %} + +{% macro dremio__escape_keywords(keyword) %} + {% do return('"' ~ keyword ~ '"') %} +{% endmacro %} + + diff --git a/macros/utils/sql_utils/escape_select.sql b/macros/utils/sql_utils/escape_select.sql index 097b21585..f19ca0f47 100644 --- a/macros/utils/sql_utils/escape_select.sql +++ b/macros/utils/sql_utils/escape_select.sql @@ -9,3 +9,7 @@ {% macro redshift__escape_select(column_names) %} {% do return('\"' + column_names | join('\", \"') + '\"') %} {% endmacro %} + +{% macro dremio__escape_select(column_names) %} + {% do return('\"' + column_names | join('\", \"') + '\"') %} +{% endmacro %} \ No newline at end of file diff --git a/macros/utils/table_operations/delete_and_insert.sql b/macros/utils/table_operations/delete_and_insert.sql index 83cbc18de..996efbf28 100644 --- a/macros/utils/table_operations/delete_and_insert.sql +++ b/macros/utils/table_operations/delete_and_insert.sql @@ -132,6 +132,29 @@ {% do return(queries) %} {% endmacro %} +{% macro dremio__get_delete_and_insert_queries(relation, insert_relation, delete_relation, delete_column_key) %} + {% set queries = [] %} + + {% if delete_relation %} + {% set delete_query %} + delete from {{ relation }} + where + {{ delete_column_key }} is null + or {{ delete_column_key }} in (select {{ delete_column_key }} from {{ delete_relation }}); + {% endset %} + {% do queries.append(delete_query) %} + {% endif %} + + {% if insert_relation %} + {% set insert_query %} + insert into {{ relation }} select * from {{ insert_relation }}; + {% endset %} + {% do queries.append(insert_query) %} + {% endif %} + + {% do return(queries) %} +{% endmacro %} + {% macro trino__get_delete_and_insert_queries(relation, insert_relation, delete_relation, delete_column_key) %} {% set queries = [] %} diff --git a/macros/utils/table_operations/get_relation_max_length.sql b/macros/utils/table_operations/get_relation_max_length.sql index c9282ad63..3e08320a1 100644 --- a/macros/utils/table_operations/get_relation_max_length.sql +++ b/macros/utils/table_operations/get_relation_max_length.sql @@ -34,3 +34,7 @@ {% macro clickhouse__get_relation_max_name_length(temporary, relation, sql_query) %} {{ return(128) }} {% endmacro %} + +{% macro dremio__get_relation_max_name_length(temporary, relation, sql_query) %} + {{ return(128) }} +{% endmacro %} \ No newline at end of file diff --git a/macros/utils/table_operations/has_temp_table_support.sql b/macros/utils/table_operations/has_temp_table_support.sql index 3068e3959..bcd1d2de4 100644 --- a/macros/utils/table_operations/has_temp_table_support.sql +++ b/macros/utils/table_operations/has_temp_table_support.sql @@ -18,6 +18,10 @@ {% do return(false) %} {% endmacro %} +{% macro dremio__has_temp_table_support() %} + {% do return(false) %} +{% endmacro %} + {% macro clickhouse__has_temp_table_support() %} {% do return(false) %} {% endmacro %} diff --git a/macros/utils/table_operations/insert_rows.sql b/macros/utils/table_operations/insert_rows.sql index 19826d762..a21961a25 100644 --- a/macros/utils/table_operations/insert_rows.sql +++ b/macros/utils/table_operations/insert_rows.sql @@ -62,7 +62,7 @@ {% set base_insert_query %} insert into {{ table_relation }} ({%- for column in columns -%} - {{- column.name -}} {{- "," if not loop.last else "" -}} + {{- elementary.escape_reserved_keywords(column.name) -}} {{- "," if not loop.last else "" -}} {%- endfor -%}) values {% endset %} {% do elementary.end_duration_measure_context('base_query_calc') %} @@ -153,6 +153,10 @@ {{- return(string_value | replace("'", "''")) -}} {%- endmacro -%} +{%- macro dremio__escape_special_chars(string_value) -%} + {{- return(string_value | replace("\'", "''")) -}} +{%- endmacro -%} + {%- macro trino__escape_special_chars(string_value) -%} {{- return(string_value | replace("'", "''")) -}} {%- endmacro -%} @@ -162,7 +166,7 @@ {%- if value is number -%} {{- value -}} {%- elif value is string and data_type == 'timestamp' -%} - {{- elementary.edr_cast_as_timestamp(elementary.edr_quote(value)) -}} + {{- elementary.edr_cast_as_timestamp(elementary.edr_datetime_to_sql(value)) -}} {%- elif value is string -%} '{{- elementary.escape_special_chars(value) -}}' {%- elif value is mapping or value is sequence -%} diff --git a/macros/utils/table_operations/make_temp_relation.sql b/macros/utils/table_operations/make_temp_relation.sql index 9ee9a4627..5b652f9f8 100644 --- a/macros/utils/table_operations/make_temp_relation.sql +++ b/macros/utils/table_operations/make_temp_relation.sql @@ -22,6 +22,11 @@ {% do return(tmp_relation) %} {% endmacro %} +{% macro dremio__edr_make_temp_relation(base_relation, suffix) %} + {% set base_relation_with_type = base_relation.incorporate(type='table') %} + {% do return(dbt.make_temp_relation(base_relation_with_type, suffix)) %} +{% endmacro %} + --- VIEWS {% macro make_temp_view_relation(base_relation, suffix=none) %} {% if not suffix %} diff --git a/models/edr/alerts/alerts_dbt_source_freshness.sql b/models/edr/alerts/alerts_dbt_source_freshness.sql index afbe27dc2..bc891f49d 100644 --- a/models/edr/alerts/alerts_dbt_source_freshness.sql +++ b/models/edr/alerts/alerts_dbt_source_freshness.sql @@ -23,7 +23,7 @@ select results.error, results.warn_after, results.error_after, - results.filter, + results.{{ elementary.escape_reserved_keywords('filter') }}, sources.unique_id, sources.database_name, sources.schema_name, @@ -40,7 +40,7 @@ select -- https://docs.databricks.com/en/delta/update-schema.html#automatic-schema-evolution-for-delta-lake-merge results.error_after as freshness_error_after, results.warn_after as freshness_warn_after, - results.filter as freshness_filter + results.{{ elementary.escape_reserved_keywords('filter') }} as freshness_filter from results join sources on results.unique_id = sources.unique_id where {{ not elementary.get_config_var('disable_source_freshness_alerts') }} and lower(status) != 'pass' diff --git a/models/edr/data_monitoring/anomaly_detection/metrics_anomaly_score.sql b/models/edr/data_monitoring/anomaly_detection/metrics_anomaly_score.sql index 10e262548..f20fb48cc 100644 --- a/models/edr/data_monitoring/anomaly_detection/metrics_anomaly_score.sql +++ b/models/edr/data_monitoring/anomaly_detection/metrics_anomaly_score.sql @@ -46,7 +46,8 @@ metrics_anomaly_score as ( metric_name, case when training_stddev is null then null - when training_stddev = 0 then 0 + when training_set_size = 1 then null -- Single value case - no historical context for anomaly detection + when training_stddev = 0 then 0 -- Stationary data case - valid, all values are identical else (metric_value - training_avg) / (training_stddev) end as anomaly_score, metric_value as latest_metric_value, diff --git a/models/edr/run_results/snapshot_run_results.sql b/models/edr/run_results/snapshot_run_results.sql index e62e27e77..8c63b49f0 100644 --- a/models/edr/run_results/snapshot_run_results.sql +++ b/models/edr/run_results/snapshot_run_results.sql @@ -32,14 +32,14 @@ SELECT run_results.adapter_response, run_results.thread_id, run_results.group_name, - snapshots.database_name, - snapshots.schema_name, - coalesce(run_results.materialization, snapshots.materialization) as materialization, - snapshots.tags, - snapshots.package_name, - snapshots.path, - snapshots.original_path, - snapshots.owner, - snapshots.alias + model_snapshots.database_name, + model_snapshots.schema_name, + coalesce(run_results.materialization, model_snapshots.materialization) as materialization, + model_snapshots.tags, + model_snapshots.package_name, + model_snapshots.path, + model_snapshots.original_path, + model_snapshots.owner, + model_snapshots.alias FROM dbt_run_results run_results -JOIN dbt_snapshots snapshots ON run_results.unique_id = snapshots.unique_id +JOIN dbt_snapshots model_snapshots ON run_results.unique_id = model_snapshots.unique_id