Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
6931918
handling column names that are reserved keywords in dremio
GuyEshdat Aug 4, 2025
ecfa917
added macros for dremio support
GuyEshdat Aug 4, 2025
fd066cc
renaming because snapshots is a reserved keyword in dremio
GuyEshdat Aug 4, 2025
a8a4374
making sure that dremio temp table relation has type=table because ot…
GuyEshdat Aug 4, 2025
c002b47
Ele 4877 dremio package elementary tests (#836)
haritamar Aug 4, 2025
90fe8db
another dremio reserved keyword
GuyEshdat Aug 5, 2025
6d4eda7
another dremio reserved keyword
GuyEshdat Aug 5, 2025
95948c3
removed the select part of the datediff function, it caused errors
GuyEshdat Aug 5, 2025
0412bae
fixed comment
GuyEshdat Aug 5, 2025
3e251cc
macro for generating schema names of dremio in tests, needed because …
GuyEshdat Aug 5, 2025
11976a2
first attempt at adding dremio to e2e tests
GuyEshdat Aug 5, 2025
9a63dba
change schema separator in dremio integration tests
GuyEshdat Aug 5, 2025
1b63275
temporary limiting tests to one test only, for faster debugging
GuyEshdat Aug 5, 2025
a9017b4
not clearing the env if the target is dremio
GuyEshdat Aug 5, 2025
c56ed1e
not clearing the env if the target is dremio
GuyEshdat Aug 5, 2025
da8e5f2
removed the specific test filter in the test-warehouse workflow
GuyEshdat Aug 5, 2025
b523e08
more dremio reserved keywords
GuyEshdat Aug 6, 2025
79d8ae1
setting dremio max relation name
GuyEshdat Aug 6, 2025
c4e6494
fixing dremio sources in e2e tests
GuyEshdat Aug 6, 2025
9d81741
changed column name in test because its previous name was a reserved …
GuyEshdat Aug 6, 2025
489a724
escaping reserved keywords in column names when inserting rows
GuyEshdat Aug 6, 2025
d346398
changed column name in tests because value is a reserved keyword in d…
GuyEshdat Aug 6, 2025
bce218f
handling db and schema of dremio in e2e tests
GuyEshdat Aug 6, 2025
83489c2
replaced datediff dremio implementation so we won't use timestampdiff
haritamar Aug 6, 2025
4f4603e
use edr_datetime_to_sql in a couple other places
haritamar Aug 6, 2025
b6fb5fc
pre-commit fixes
haritamar Aug 6, 2025
7d6bdbf
more escaping
haritamar Aug 6, 2025
cec4698
various fixes
haritamar Aug 6, 2025
c5c689c
bugfix for anomaly scores logic due to usage of stddev_pop
haritamar Aug 8, 2025
65c39fa
bugfix to test_dimension_anomalies_with_timestamp_exclude_final_results
haritamar Aug 8, 2025
73c88ed
bugfix to event freshness test
haritamar Aug 8, 2025
e616d80
fix test_missing_count
haritamar Aug 8, 2025
cf41fca
schema tests now work in Dremio too
haritamar Aug 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/test-warehouse.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ on:
- athena
- trino
- clickhouse
- dremio
elementary-ref:
type: string
required: false
Expand Down Expand Up @@ -88,6 +89,11 @@ jobs:
working-directory: ${{ env.TESTS_DIR }}
run: docker compose up -d clickhouse

- name: Start Dremio
if: inputs.warehouse-type == 'dremio'
working-directory: ${{ env.TESTS_DIR }}
run: docker compose -f docker-compose-dremio.yml up -d

- name: Setup Python
uses: actions/setup-python@v4
with:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
target/
dbt_packages/
dbt_internal_packages/
logs/
scripts/

Expand Down
36 changes: 31 additions & 5 deletions integration_tests/dbt_project/macros/generate_schema_name.sql
Original file line number Diff line number Diff line change
@@ -1,13 +1,39 @@
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set schema_name = target.schema -%}
{% if custom_schema_name %}
{% set schema_name = "{}_{}".format(schema_name, custom_schema_name) %}
{% endif %}

{% set schema_name = get_default_schema_name(custom_schema_name, node) %}
{% set schema_name_suffix_by_var = var('schema_name_suffix', '') %}
{% if schema_name_suffix_by_var %}
{% set schema_name = schema_name + schema_name_suffix_by_var %}
{% endif %}

{% do return(schema_name) %}
{%- endmacro %}

{# Dispatcher: resolves to <adapter>__get_default_schema_name if the active
   adapter (e.g. dremio) defines one, otherwise to the default implementation. #}
{% macro get_default_schema_name(custom_schema_name, node) -%}
{% do return(adapter.dispatch('get_default_schema_name', 'elementary_tests')(custom_schema_name, node)) %}
{% endmacro %}

{# Default behavior: target schema, suffixed with the node's custom schema
   name when one is configured (i.e. "<target_schema>_<custom_schema>"). #}
{% macro default__get_default_schema_name(custom_schema_name, node) -%}
{%- set schema_name = target.schema -%}
{% if custom_schema_name %}
{% set schema_name = "{}_{}".format(schema_name, custom_schema_name) %}
{% endif %}
{% do return(schema_name) %}
{%- endmacro %}

{# Dremio implementation: nodes materialized in the datalake are namespaced
   under target.root_path instead of target.schema. #}
{% macro dremio__get_default_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema if not is_datalake_node(node)
else target.root_path -%}
{%- if custom_schema_name is none -%}

{% do return(default_schema) %}

{#- NOTE(review): 'no_schema' appears to be Dremio's placeholder value when no
    default schema is configured on the target — confirm against the adapter.
    In that case the custom schema name is used as-is, with no prefix. -#}
{%- elif default_schema == 'no_schema' -%}

{% do return(custom_schema_name) %}

{%- else -%}

{% do return("{}_{}".format(default_schema, custom_schema_name)) %}

{%- endif -%}
{%- endmacro %}
2 changes: 1 addition & 1 deletion integration_tests/dbt_project/models/one.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@
)
}}

SELECT 1 AS one
SELECT 1 AS {{ elementary.escape_reserved_keywords('one') }}
85 changes: 85 additions & 0 deletions integration_tests/docker-compose-dremio.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Local Dremio lakehouse stack for the e2e tests:
# Nessie (catalog) + MinIO (S3-compatible storage) + Dremio (query engine),
# plus two one-shot setup containers that provision the bucket and the source.
services:
  # Nessie Catalog Server Using In-Memory Store
  nessie:
    image: projectnessie/nessie:latest
    container_name: catalog
    networks:
      - dremio-lakehouse
    ports:
      - 19120:19120

  # Minio Storage Server
  minio:
    image: minio/minio:latest
    container_name: storage
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=storage
      - MINIO_REGION_NAME=us-east-1
      - MINIO_REGION=us-east-1
    networks:
      - dremio-lakehouse
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]
    volumes:
      - minio_data:/data

  # One-shot job: waits for MinIO's TCP port, then creates the "datalake"
  # bucket that Dremio's S3 source points at.
  minio-setup:
    image: minio/mc
    container_name: minio-setup
    depends_on:
      - minio
    entrypoint: >
      /bin/sh -c "
      until (echo > /dev/tcp/minio/9000) >/dev/null 2>&1; do
        echo 'Waiting for MinIO...';
        sleep 2;
      done;
      mc alias set myminio http://minio:9000 admin password;
      mc mb myminio/datalake;
      mc ls myminio;
      "
    networks:
      - dremio-lakehouse

  # Dremio
  dremio:
    image: dremio/dremio-oss:latest
    platform: linux/amd64
    ports:
      - 9047:9047
      - 31010:31010
      - 32010:32010
      - 45678:45678
    container_name: dremio
    environment:
      # addDefaultUser creates a debug user at startup; presumably the
      # dremio/dremio123 credentials used by dremio-setup.sh — confirm.
      - DREMIO_JAVA_SERVER_EXTRA_OPTS=-Dpaths.dist=file:///opt/dremio/data/dist -Ddebug.addDefaultUser=true
      - SERVICES_COORDINATOR_ENABLED=true
      - SERVICES_EXECUTOR_ENABLED=true
    networks:
      - dremio-lakehouse
    volumes:
      - dremio_data:/opt/dremio/data:rw
    # Workaround for permission issues in podman
    user: "0"

  # One-shot job: runs the REST-API setup script against Dremio.
  dremio-setup:
    image: alpine:latest
    container_name: dremio-setup
    depends_on:
      - dremio
    volumes:
      - ./docker/dremio/dremio-setup.sh:/dremio-setup.sh
    command: sh /dremio-setup.sh
    networks:
      - dremio-lakehouse

networks:
  dremio-lakehouse:

volumes:
  dremio_data:
  minio_data:
33 changes: 33 additions & 0 deletions integration_tests/docker/dremio/dremio-setup.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/bin/sh
# Bootstrap the Dremio e2e test instance: wait for Dremio to come up,
# authenticate against the REST API, and register the MinIO bucket as an
# S3-compatible source named "S3Source".

# Install required tools
apk add --no-cache curl jq

# Wait for Dremio to be ready (discard the response body so the HTML page
# is not dumped into the container log on every attempt)
until curl -s http://dremio:9047 > /dev/null; do
  echo "Waiting for Dremio..."
  sleep 5
done

echo "Dremio is up. Proceeding with configuration..."

# Log in to Dremio to get the auth token
AUTH_TOKEN=$(curl -s -X POST "http://dremio:9047/apiv2/login" \
  -H "Content-Type: application/json" \
  --data "{\"userName\":\"dremio\", \"password\":\"dremio123\"}" | jq -r .token)

# jq -r prints the literal string "null" when the response JSON has no
# .token field (e.g. login rejected), so check for that as well as empty.
if [ -z "$AUTH_TOKEN" ] || [ "$AUTH_TOKEN" = "null" ]; then
  echo "Failed to obtain Dremio auth token"
  exit 1
fi

echo "Obtained Dremio auth token"

# Create the S3 source in Dremio (MinIO endpoint, path-style access,
# Iceberg as the default CTAS format)
curl -s -X PUT "http://dremio:9047/apiv2/source/S3Source" \
  -H "Content-Type: application/json" \
  -H "Authorization: _dremio$AUTH_TOKEN" \
  --data "{\"name\":\"S3Source\",\"config\":{\"credentialType\":\"ACCESS_KEY\",\"accessKey\":\"admin\",\"accessSecret\":\"password\",\"secure\":false,\"externalBucketList\":[],\"enableAsync\":true,\"enableFileStatusCheck\":true,\"rootPath\":\"/\",\"defaultCtasFormat\":\"ICEBERG\",\"propertyList\":[{\"name\":\"fs.s3a.path.style.access\",\"value\":\"true\"},{\"name\":\"fs.s3a.endpoint\",\"value\":\"minio:9000\"},{\"name\":\"dremio.s3.compat\",\"value\":\"true\"}],\"whitelistedBuckets\":[],\"isCachingEnabled\":false,\"maxCacheSpacePct\":100},\"type\":\"S3\",\"metadataPolicy\":{\"deleteUnavailableDatasets\":true,\"autoPromoteDatasets\":false,\"namesRefreshMillis\":3600000,\"datasetDefinitionRefreshAfterMillis\":3600000,\"datasetDefinitionExpireAfterMillis\":10800000,\"authTTLMillis\":86400000,\"updateMode\":\"PREFETCH_QUERIED\"}}"

echo "S3 Source created in Dremio"
8 changes: 6 additions & 2 deletions integration_tests/tests/data_seeder.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import csv
from contextlib import contextmanager
from pathlib import Path
from typing import List
from typing import Generator, List

from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner
from logger import get_logger
Expand All @@ -18,7 +19,8 @@ def __init__(
self.dbt_project_path = dbt_project_path
self.seeds_dir_path = seeds_dir_path

def seed(self, data: List[dict], table_name: str):
@contextmanager
def seed(self, data: List[dict], table_name: str) -> Generator[None, None, None]:
seed_path = self.seeds_dir_path.joinpath(f"{table_name}.csv")
try:
with seed_path.open("w") as seed_file:
Expand All @@ -28,5 +30,7 @@ def seed(self, data: List[dict], table_name: str):
writer.writerows(data)
seed_file.flush()
self.dbt_runner.seed(select=str(relative_seed_path), full_refresh=True)

yield
finally:
seed_path.unlink()
25 changes: 20 additions & 5 deletions integration_tests/tests/dbt_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
from contextlib import contextmanager, nullcontext
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Dict, List, Literal, Optional, Union, overload
from typing import Any, Dict, Generator, List, Literal, Optional, Union, overload
from uuid import uuid4

from data_seeder import DbtDataSeeder
from dbt_utils import get_database_and_schema_properties
from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner
from elementary.clients.dbt.factory import create_dbt_runner
from logger import get_logger
Expand Down Expand Up @@ -42,7 +43,7 @@ def get_dbt_runner(target: str, project_dir: str) -> BaseDbtRunner:
class DbtProject:
def __init__(self, target: str, project_dir: str):
self.dbt_runner = get_dbt_runner(target, project_dir)

self.target = target
self.project_dir_path = Path(project_dir)
self.models_dir_path = self.project_dir_path / "models"
self.tmp_models_dir_path = self.models_dir_path / "tmp"
Expand Down Expand Up @@ -187,12 +188,16 @@ def test(
test_id, materialization
)
else:
database_property, schema_property = get_database_and_schema_properties(
self.target
)
props_yaml = {
"version": 2,
"sources": [
{
"name": "test_data",
"schema": f"{{{{ target.schema }}}}{SCHEMA_NAME_SUFFIX}",
"schema": f"{{{{ target.{schema_property} }}}}{SCHEMA_NAME_SUFFIX}",
"database": f"{{{{ target.{database_property} }}}}",
"tables": [table_yaml],
}
],
Expand Down Expand Up @@ -230,9 +235,19 @@ def test(
return [test_result] if multiple_results else test_result

def seed(self, data: List[dict], table_name: str):
return DbtDataSeeder(
with DbtDataSeeder(
self.dbt_runner, self.project_dir_path, self.seeds_dir_path
).seed(data, table_name):
return

@contextmanager
def seed_context(
self, data: List[dict], table_name: str
) -> Generator[None, None, None]:
with DbtDataSeeder(
self.dbt_runner, self.project_dir_path, self.seeds_dir_path
).seed(data, table_name)
).seed(data, table_name):
yield

@contextmanager
def create_temp_model_for_existing_table(
Expand Down
6 changes: 6 additions & 0 deletions integration_tests/tests/dbt_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
def get_database_and_schema_properties(target: str, is_view: bool = False):
    """Return the (database, schema) dbt target-property names for a warehouse.

    ClickHouse has no separate database concept, so both resolve to the
    schema property. Dremio tables live in the datalake (database comes from
    ``target.datalake``, schema from ``target.root_path``), while Dremio
    views — and every other target — use the standard properties.
    """
    if target == "clickhouse":
        return "schema", "schema"
    if target == "dremio" and not is_view:
        return "datalake", "root_path"
    return "database", "schema"
5 changes: 4 additions & 1 deletion integration_tests/tests/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@

class Environment:
def __init__(self, target: str, project_dir: str):
self.target = target
self.dbt_runner = dbt_project.get_dbt_runner(target, project_dir)

def clear(self):
self.dbt_runner.run_operation("elementary_tests.clear_env")
# drop schema in dremio doesnt work, but we run the dremio tests with docker so its not really important to drop the schema
if self.target != "dremio":
self.dbt_runner.run_operation("elementary_tests.clear_env")

def init(self):
self.dbt_runner.run(selector="init")
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/tests/test_anomalies_backfill_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
bucket_start,
bucket_end,
metric_value,
row_number() over (partition by id order by updated_at desc) as row_number
row_number() over (partition by id order by updated_at desc) as row_num
from {{{{ ref("data_monitoring_metrics") }}}}
where metric_name = 'row_count' and lower(full_table_name) like '%{test_id}'
)
select bucket_start, bucket_end, metric_value from metrics_ordered
where row_number = 1
where row_num = 1
"""

# This returns data points used in the latest anomaly test
Expand Down
15 changes: 12 additions & 3 deletions integration_tests/tests/test_dbt_artifacts/test_artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest
from dbt_flags import set_flags
from dbt_project import DbtProject
from dbt_utils import get_database_and_schema_properties

TEST_MODEL = "one"

Expand Down Expand Up @@ -95,17 +96,25 @@ def test_metrics_anomaly_score(dbt_project: DbtProject):

@pytest.mark.requires_dbt_version("1.8.0")
def test_source_freshness_results(test_id: str, dbt_project: DbtProject):
database_property, schema_property = get_database_and_schema_properties(
dbt_project.target
)
loaded_at_field = (
'"UPDATE_TIME"::timestamp'
if dbt_project.target != "dremio"
else "TO_TIMESTAMP(SUBSTRING(UPDATE_TIME, 0, 23), 'YYYY-MM-DD HH24:MI:SS.FFF')"
)
source_config = {
"version": 2,
"sources": [
{
"name": "test_source",
"database": "{{target.database if target.type != 'clickhouse' else target.schema}}",
"schema": "{{target.schema}}",
"database": f"{{{{ target.{database_property} }}}}",
"schema": f"{{{{ target.{schema_property} }}}}",
"tables": [
{
"name": test_id,
"loaded_at_field": '"UPDATE_TIME"::timestamp',
"loaded_at_field": loaded_at_field,
"freshness": {
"warn_after": {
"count": 1,
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/tests/test_dimension_anomalies.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def test_dimension_anomalies_with_timestamp_exclude_final_results(
test_args = {
"timestamp_column": TIMESTAMP_COLUMN,
"dimensions": ["superhero"],
"exclude_final_results": "value > 15",
"exclude_final_results": '{{ elementary.escape_reserved_keywords("value") }} > 15',
}
test_result = dbt_project.test(test_id, DBT_TEST_NAME, test_args, data=data)
assert test_result["status"] == "fail"
Expand All @@ -200,7 +200,7 @@ def test_dimension_anomalies_with_timestamp_exclude_final_results(
test_args = {
"timestamp_column": TIMESTAMP_COLUMN,
"dimensions": ["superhero"],
"exclude_final_results": "average > 3",
"exclude_final_results": '{{ elementary.escape_reserved_keywords("average") }} > 3',
}
test_result = dbt_project.test(test_id, DBT_TEST_NAME, test_args, data=data)
assert test_result["status"] == "fail"
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/tests/test_disable_elementary.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dbt_project import DbtProject

COLUMN_NAME = "value"
COLUMN_NAME = "some_column"


def test_running_dbt_tests_without_elementary(test_id: str, dbt_project: DbtProject):
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/tests/test_failed_row_count.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import pytest
from dbt_project import DbtProject

COLUMN_NAME = "value"
COLUMN_NAME = "some_column"


# Failed row count currently not supported on ClickHouse
Expand Down
Loading