diff --git a/.github/workflows/test-all-warehouses.yml b/.github/workflows/test-all-warehouses.yml index a45f1d50d..6ab47c726 100644 --- a/.github/workflows/test-all-warehouses.yml +++ b/.github/workflows/test-all-warehouses.yml @@ -37,7 +37,6 @@ jobs: matrix: dbt-version: ${{ inputs.dbt-version && fromJSON(format('["{0}"]', inputs.dbt-version)) || - ! contains(github.event_name, 'pull_request') && fromJSON('["1.8.0", "latest_official"]') || fromJSON('["latest_official"]') }} warehouse-type: [ @@ -57,6 +56,14 @@ jobs: warehouse-type: postgres - dbt-version: "${{ inputs.dbt-version || 'latest_pre' }}" warehouse-type: postgres + - dbt-version: "${{ inputs.dbt-version || 'fusion' }}" + warehouse-type: snowflake + - dbt-version: "${{ inputs.dbt-version || 'fusion' }}" + warehouse-type: bigquery + - dbt-version: "${{ inputs.dbt-version || 'fusion' }}" + warehouse-type: redshift + - dbt-version: "${{ inputs.dbt-version || 'fusion' }}" + warehouse-type: databricks_catalog uses: ./.github/workflows/test-warehouse.yml with: warehouse-type: ${{ matrix.warehouse-type }} diff --git a/.github/workflows/test-warehouse.yml b/.github/workflows/test-warehouse.yml index cbf3cd10b..65bed62cd 100644 --- a/.github/workflows/test-warehouse.yml +++ b/.github/workflows/test-warehouse.yml @@ -124,37 +124,44 @@ jobs: run: pip install databricks-sql-connector==2.9.3 - name: Install dbt + if: ${{ inputs.dbt-version != 'fusion' }} run: pip install${{ (inputs.dbt-version == 'latest_pre' && ' --pre') || '' }} "dbt-core${{ (!startsWith(inputs.dbt-version, 'latest') && format('=={0}', inputs.dbt-version)) || '' }}" "dbt-${{ (inputs.warehouse-type == 'databricks_catalog' && 'databricks') || (inputs.warehouse-type == 'spark' && 'spark[PyHive]') || (inputs.warehouse-type == 'athena' && 'athena-community') || inputs.warehouse-type }}${{ (!startsWith(inputs.dbt-version, 'latest') && format('~={0}', inputs.dbt-version)) || '' }}" + - name: Install dbt-fusion + if: inputs.dbt-version == 'fusion' + run: | + curl -fsSL https://public.cdn.getdbt.com/fs/install/install.sh | sh -s -- --update + - name: Install Elementary run: pip install "./elementary[${{ (inputs.warehouse-type == 'databricks_catalog' && 'databricks') || inputs.warehouse-type }}]" - - name: Install dependencies - working-directory: ${{ env.TESTS_DIR }} - run: | - dbt deps --project-dir dbt_project - pip install -r requirements.txt - - name: Write dbt profiles env: - PROFILES_YML: ${{ secrets.CI_PROFILES_YML }} + PROFILES_YML: ${{ (inputs.dbt-version == 'fusion' && secrets.CI_PROFILES_YML_FUSION) || secrets.CI_PROFILES_YML }} run: | mkdir -p ~/.dbt - DBT_VERSION=$(pip show dbt-core | grep -i version | awk '{print $2}' | sed 's/\.//g') + DBT_VERSION=$(echo "${{ inputs.dbt-version }}" | sed 's/\.//g') UNDERSCORED_REF_NAME=$(echo "${{ inputs.warehouse-type }}_dbt_${DBT_VERSION}_${BRANCH_NAME}" | awk '{print tolower($0)}' | head -c 40 | sed "s/[-\/]/_/g") echo "$PROFILES_YML" | base64 -d | sed "s//dbt_pkg_$UNDERSCORED_REF_NAME/g" > ~/.dbt/profiles.yml + - name: Install dependencies + working-directory: ${{ env.TESTS_DIR }} + run: | + ${{ (inputs.dbt-version == 'fusion' && '~/.local/bin/dbt') || 'dbt' }} deps --project-dir dbt_project + ln -sfn ${{ github.workspace }}/dbt-data-reliability dbt_project/dbt_packages/elementary + pip install -r requirements.txt + - name: Check DWH connection working-directory: ${{ env.TESTS_DIR }} run: | - dbt debug -t "${{ inputs.warehouse-type }}" + ${{ (inputs.dbt-version == 'fusion' && '~/.local/bin/dbt') || 'dbt' }} debug -t "${{ inputs.warehouse-type }}" - name: Test working-directory: "${{ env.TESTS_DIR }}/tests" - run: py.test -n8 -vvv --target "${{ inputs.warehouse-type }}" --junit-xml=test-results.xml --html=detailed_report_${{ inputs.warehouse-type }}_dbt_${{ inputs.dbt-version }}.html --self-contained-html --clear-on-end + run: py.test -n8 -vvv --target "${{ inputs.warehouse-type }}" --junit-xml=test-results.xml --html=detailed_report_${{ inputs.warehouse-type }}_dbt_${{ inputs.dbt-version }}.html --self-contained-html --clear-on-end ${{ (inputs.dbt-version == 'fusion' && '--runner-method fusion') || '' }} - name: Upload test results if: always() diff --git a/.gitignore b/.gitignore index 15e8d8196..5172d726c 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,6 @@ __pycache__/ # vscode .vscode/ +dbt_internal_packages/ + +/package-lock.yml diff --git a/integration_tests/dbt_project/.gitignore b/integration_tests/dbt_project/.gitignore index 3fa344fd4..e6328d622 100644 --- a/integration_tests/dbt_project/.gitignore +++ b/integration_tests/dbt_project/.gitignore @@ -1,2 +1,3 @@ data models/tmp +dbt_internal_packages/ diff --git a/integration_tests/dbt_project/dbt_project.yml b/integration_tests/dbt_project/dbt_project.yml index 34cfa5204..948f4fcc8 100644 --- a/integration_tests/dbt_project/dbt_project.yml +++ b/integration_tests/dbt_project/dbt_project.yml @@ -18,7 +18,6 @@ clean-targets: # directories to be removed by `dbt clean` vars: debug_logs: "{{ env_var('DBT_EDR_DEBUG', False) }}" - mute_ensure_materialization_override: true models: elementary_tests: diff --git a/integration_tests/dbt_project/macros/create_all_types_table.sql b/integration_tests/dbt_project/macros/create_all_types_table.sql index 89d541887..b04953686 100644 --- a/integration_tests/dbt_project/macros/create_all_types_table.sql +++ b/integration_tests/dbt_project/macros/create_all_types_table.sql @@ -31,9 +31,7 @@ CURRENT_TIME() as time_col, CURRENT_TIMESTAMP() as timestamp_col, {% endset %} - {% set create_table_query = dbt.create_table_as(false, relation, sql_query) %} - {% do elementary.edr_log(create_table_query) %} - {% do elementary.run_query(create_table_query) %} + {% do elementary.edr_create_table_as(false, relation, sql_query) %} {% endmacro %} {% macro snowflake__create_all_types_table() %} @@ -81,9 +79,7 @@ [1,2,3] as array_col, TO_GEOGRAPHY('POINT(-122.35 37.55)') as geography_col {% endset %} - {% set create_table_query = dbt.create_table_as(false, relation, sql_query) %} - {% do elementary.edr_log(create_table_query) %} - {% do elementary.run_query(create_table_query) %} + {% do elementary.edr_create_table_as(false, relation, sql_query) %} {% endmacro %} {% macro redshift__create_all_types_table() %} @@ -123,9 +119,7 @@ ST_GeogFromText('SRID=4324;POLYGON((0 0,0 1,1 1,10 10,1 0,0 0))') as geography_col, JSON_PARSE('{"data_type": "super"}') as super_col {% endset %} - {% set create_table_query = dbt.create_table_as(false, relation, sql_query) %} - {% do elementary.edr_log(create_table_query) %} - {% do elementary.run_query(create_table_query) %} + {% do elementary.edr_create_table_as(false, relation, sql_query) %} {% endmacro %} @@ -184,9 +178,7 @@ 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid as uuid_col, xmlcomment('text') as xml_col {% endset %} - {% set create_table_query = dbt.create_table_as(false, relation, sql_query) %} - {% do elementary.edr_log(create_table_query) %} - {% do elementary.run_query(create_table_query) %} + {% do elementary.edr_create_table_as(false, relation, sql_query) %} {% endmacro %} {% macro default__create_all_types_table() %} diff --git a/integration_tests/dbt_project/macros/materializations.sql b/integration_tests/dbt_project/macros/materializations.sql index 51ae1c98e..cb1c5a450 100644 --- a/integration_tests/dbt_project/macros/materializations.sql +++ b/integration_tests/dbt_project/macros/materializations.sql @@ -1,19 +1,19 @@ {% materialization test, default %} {% if var('enable_elementary_test_materialization', false) %} - {% do return(elementary.materialization_test_default.call_macro()) %} + {% do return(elementary.materialization_test_default()) %} {% else %} - {% do return(dbt.materialization_test_default.call_macro()) %} + {% do return(dbt.materialization_test_default()) %} {% endif %} {% endmaterialization %} {% materialization test, adapter="snowflake" %} {% if var('enable_elementary_test_materialization', false) %} - {% do return(elementary.materialization_test_snowflake.call_macro()) %} + {% do return(elementary.materialization_test_snowflake()) %} {% else %} {% if dbt.materialization_test_snowflake %} - {% do return(dbt.materialization_test_snowflake.call_macro()) %} + {% do return(dbt.materialization_test_snowflake()) %} {% else %} - {% do return(dbt.materialization_test_default.call_macro()) %} + {% do return(dbt.materialization_test_default()) %} {% endif %} {% endif %} {% endmaterialization %} diff --git a/integration_tests/dbt_project/macros/replace_empty_strings_with_nulls.sql b/integration_tests/dbt_project/macros/replace_empty_strings_with_nulls.sql new file mode 100644 index 000000000..c7c0f20cc --- /dev/null +++ b/integration_tests/dbt_project/macros/replace_empty_strings_with_nulls.sql @@ -0,0 +1,18 @@ +{% macro replace_empty_strings_with_nulls(table_name) %} + {% set relation = ref(table_name) %} + {% set columns = adapter.get_columns_in_relation(relation) %} + + {% for col in columns %} + {% set data_type = elementary.get_column_data_type(col) %} + {% set normalized_data_type = elementary.normalize_data_type(data_type) %} + + {% if normalized_data_type == "string" %} + {% set update_query %} + update {{ relation }} + set {{ col["name"] }} = NULL + where {{ col["name"] }} = '' + {% endset %} + {% do elementary.run_query(update_query) %} + {% endif %} + {% endfor %} +{% endmacro %} diff --git a/integration_tests/dbt_project/models/exposures.yml b/integration_tests/dbt_project/models/exposures.yml index 11afb2474..64a9a250c 100644 --- a/integration_tests/dbt_project/models/exposures.yml +++ b/integration_tests/dbt_project/models/exposures.yml @@ -15,11 +15,12 @@ exposures: owner: name: Callum McData email: data@jaffleshop.com - meta: - referenced_columns: - - column_name: id - data_type: numeric - node: ref('customers') + config: + meta: + referenced_columns: + - column_name: id + data_type: numeric + node: ref('customers') - name: orders label: Returned Orders @@ -35,8 +36,9 @@ exposures: owner: name: Callum McData email: data@jaffleshop.com - meta: - referenced_columns: - - column_name: "order_id" - data_type: "string" - - column_name: "ZOMG" + config: + meta: + referenced_columns: + - column_name: "order_id" + data_type: "string" + - column_name: "ZOMG" diff --git a/integration_tests/dbt_project/models/schema.yml b/integration_tests/dbt_project/models/schema.yml index e8a4aa7ca..87d61879e 100644 --- a/integration_tests/dbt_project/models/schema.yml +++ b/integration_tests/dbt_project/models/schema.yml @@ -5,7 +5,8 @@ models: description: This table has basic information about a customer, as well as some derived facts based on a customer's orders tests: - elementary.exposure_schema_validity: - tags: [exposure_customers] + config: + tags: [exposure_customers] columns: - name: id @@ -20,7 +21,8 @@ models: tests: - elementary.exposure_schema_validity: - tags: [exposure_orders] + config: + tags: [exposure_orders] columns: - name: order_id diff --git a/integration_tests/dbt_project/packages.yml b/integration_tests/dbt_project/packages.yml index b74c0ef5a..4d192b784 100644 --- a/integration_tests/dbt_project/packages.yml +++ b/integration_tests/dbt_project/packages.yml @@ -1,10 +1,4 @@ packages: - local: ../../ - package: dbt-labs/dbt_utils - version: | - {%- set minor_to_utils_range_map = { - "0": [">=0.8.0", "<0.9.0"], - "1": [">=0.8.0", "<0.9.0"], - "2": [">=0.8.0", "<1.0.0"], - } -%} - {{- minor_to_utils_range_map.get(dbt_version.split('.')[1], [">=0.8.0", "<2.0.0"]) -}} + version: [">=0.8.0", "<2.0.0"] diff --git a/integration_tests/deprecated_tests/dbt_project.yml b/integration_tests/deprecated_tests/dbt_project.yml index f87f86098..4c677dcfb 100644 --- a/integration_tests/deprecated_tests/dbt_project.yml +++ b/integration_tests/deprecated_tests/dbt_project.yml @@ -20,7 +20,6 @@ vars: days_back: 30 debug_logs: "{{ env_var('DBT_EDR_DEBUG', False) }}" custom_run_started_at: "{{ modules.datetime.datetime.utcfromtimestamp(0) }}" - mute_ensure_materialization_override: true seeds: +schema: test_seeds diff --git a/integration_tests/deprecated_tests/macros/system/materializations.sql b/integration_tests/deprecated_tests/macros/system/materializations.sql index 7d2297dc9..89de786c2 100644 --- a/integration_tests/deprecated_tests/macros/system/materializations.sql +++ b/integration_tests/deprecated_tests/macros/system/materializations.sql @@ -1,7 +1,7 @@ {% materialization test, default %} - {% do return(elementary.materialization_test_default.call_macro()) %} + {% do return(elementary.materialization_test_default()) %} {% endmaterialization %} {% materialization test, adapter="snowflake" %} - {% do return(elementary.materialization_test_snowflake.call_macro()) %} + {% do return(elementary.materialization_test_snowflake()) %} {% endmaterialization %} diff --git a/integration_tests/tests/conftest.py b/integration_tests/tests/conftest.py index 56a34ca43..257854ccf 100644 --- a/integration_tests/tests/conftest.py +++ b/integration_tests/tests/conftest.py @@ -1,15 +1,24 @@ import shutil from pathlib import Path from tempfile import mkdtemp +from typing import Optional import pytest +import yaml from dbt.version import __version__ as dbt_version from dbt_project import DbtProject +from elementary.clients.dbt.factory import RunnerMethod from env import Environment from logger import get_logger from packaging import version DBT_PROJECT_PATH = Path(__file__).parent.parent / "dbt_project" +DBT_FUSION_SUPPORTED_TARGETS = [ + "snowflake", + "bigquery", + "redshift", + "databricks_catalog", +] logger = get_logger(__name__) @@ -18,10 +27,11 @@ def pytest_addoption(parser): parser.addoption("--target", action="store", default="postgres") parser.addoption("--skip-init", action="store_true", default=False) parser.addoption("--clear-on-end", action="store_true", default=False) + parser.addoption("--runner-method", action="store", default=None) @pytest.fixture(scope="session") -def project_dir_copy(): +def project_dir_copy(runner_method: Optional[RunnerMethod]): dbt_project_copy_dir = mkdtemp(prefix="integration_tests_project_") try: shutil.copytree( @@ -30,16 +40,55 @@ def project_dir_copy(): dirs_exist_ok=True, symlinks=True, ) + _edit_packages_yml_to_include_absolute_elementary_package_path( + dbt_project_copy_dir + ) + _remove_python_models_for_dbt_fusion(dbt_project_copy_dir, runner_method) yield dbt_project_copy_dir finally: shutil.rmtree(dbt_project_copy_dir) +def _edit_packages_yml_to_include_absolute_elementary_package_path( + project_dir_copy: str, +): + logger.info( + f"Editing packages.yml to include absolute elementary package path for project {project_dir_copy}" + ) + + packages_yml_path = Path(project_dir_copy) / "packages.yml" + with packages_yml_path.open("r") as packages_yml_file: + packages_yml = yaml.safe_load(packages_yml_file) + + packages_yml["packages"][0]["local"] = str( + (DBT_PROJECT_PATH / packages_yml["packages"][0]["local"]).resolve() + ) + with packages_yml_path.open("w") as packages_yml_file: + yaml.dump(packages_yml, packages_yml_file) + + +def _remove_python_models_for_dbt_fusion( + project_dir_copy: str, runner_method: Optional[RunnerMethod] +): + if runner_method != RunnerMethod.FUSION: + return + + logger.info(f"Removing python tests for project {project_dir_copy}") + + # walk on the models dir and delete python files + for path in (Path(project_dir_copy) / "models").rglob("*.py"): + path.unlink() + + @pytest.fixture(scope="session", autouse=True) def init_tests_env( - target: str, skip_init: bool, clear_on_end: bool, project_dir_copy: str + target: str, + skip_init: bool, + clear_on_end: bool, + project_dir_copy: str, + runner_method: Optional[RunnerMethod], ): - env = Environment(target, project_dir_copy) + env = Environment(target, project_dir_copy, runner_method) if not skip_init: logger.info("Initializing test environment") env.clear() @@ -70,6 +119,13 @@ def only_on_targets(request, target: str): pytest.skip("Test unsupported for target: {}".format(target)) +@pytest.fixture(autouse=True) +def skip_for_dbt_fusion(request, runner_method: Optional[RunnerMethod]): + if request.node.get_closest_marker("skip_for_dbt_fusion"): + if runner_method == RunnerMethod.FUSION: + pytest.skip("Test unsupported for dbt fusion") + + @pytest.fixture(autouse=True) def requires_dbt_version(request): if request.node.get_closest_marker("requires_dbt_version"): @@ -85,8 +141,10 @@ def requires_dbt_version(request): @pytest.fixture -def dbt_project(target: str, project_dir_copy: str) -> DbtProject: - return DbtProject(target, project_dir_copy) +def dbt_project( + target: str, project_dir_copy: str, runner_method: Optional[RunnerMethod] +) -> DbtProject: + return DbtProject(target, project_dir_copy, runner_method) @pytest.fixture(scope="session") @@ -104,6 +162,20 @@ def clear_on_end(request) -> bool: return request.config.getoption("--clear-on-end") +@pytest.fixture(scope="session") +def runner_method(request, target: str) -> Optional[RunnerMethod]: + runner_method_str = request.config.getoption("--runner-method") + if runner_method_str: + runner_method = RunnerMethod(runner_method_str) + if ( + runner_method == RunnerMethod.FUSION + and target not in DBT_FUSION_SUPPORTED_TARGETS + ): + raise ValueError(f"Fusion runner is not supported for target: {target}") + return runner_method + return None + + @pytest.fixture def test_id(request) -> str: if request.cls: diff --git a/integration_tests/tests/dbt_project.py b/integration_tests/tests/dbt_project.py index 16340164c..ca45d2a09 100644 --- a/integration_tests/tests/dbt_project.py +++ b/integration_tests/tests/dbt_project.py @@ -9,7 +9,7 @@ from data_seeder import DbtDataSeeder from dbt_utils import get_database_and_schema_properties from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner -from elementary.clients.dbt.factory import create_dbt_runner +from elementary.clients.dbt.factory import RunnerMethod, create_dbt_runner from logger import get_logger from ruamel.yaml import YAML @@ -31,19 +31,29 @@ logger = get_logger(__name__) -def get_dbt_runner(target: str, project_dir: str) -> BaseDbtRunner: +def get_dbt_runner( + target: str, project_dir: str, runner_method: Optional[RunnerMethod] = None +) -> BaseDbtRunner: return create_dbt_runner( project_dir, target=target, vars=_DEFAULT_VARS.copy(), raise_on_failure=False, + runner_method=runner_method, ) class DbtProject: - def __init__(self, target: str, project_dir: str): - self.dbt_runner = get_dbt_runner(target, project_dir) + def __init__( + self, + target: str, + project_dir: str, + runner_method: Optional[RunnerMethod] = None, + ): + self.dbt_runner = get_dbt_runner(target, project_dir, runner_method) self.target = target + self.runner_method = runner_method + self.project_dir_path = Path(project_dir) self.models_dir_path = self.project_dir_path / "models" self.tmp_models_dir_path = self.models_dir_path / "tmp" @@ -111,6 +121,7 @@ def test( test_vars: Optional[dict] = None, elementary_enabled: bool = True, model_config: Optional[Dict[str, Any]] = None, + test_config: Optional[Dict[str, Any]] = None, *, multiple_results: Literal[False] = False, ) -> Dict[str, Any]: @@ -131,6 +142,7 @@ def test( test_vars: Optional[dict] = None, elementary_enabled: bool = True, model_config: Optional[Dict[str, Any]] = None, + test_config: Optional[Dict[str, Any]] = None, *, multiple_results: Literal[True], ) -> List[Dict[str, Any]]: @@ -151,6 +163,7 @@ def test( elementary_enabled: bool = True, model_config: Optional[Dict[str, Any]] = None, column_config: Optional[Dict[str, Any]] = None, + test_config: Optional[Dict[str, Any]] = None, *, multiple_results: bool = False, ) -> Union[Dict[str, Any], List[Dict[str, Any]]]: @@ -172,10 +185,17 @@ def test( if columns: table_yaml["columns"] = columns + test_yaml = {dbt_test_name: {"arguments": test_args}} + if test_config: + test_yaml[dbt_test_name]["config"] = test_config + if test_column is None: - table_yaml["tests"] = [{dbt_test_name: test_args}] + table_yaml["tests"] = [test_yaml] else: - column_def = {"name": test_column, "tests": [{dbt_test_name: test_args}]} + column_def = { + "name": test_column, + "tests": [test_yaml], + } if column_config: column_def["config"] = column_config table_yaml["columns"] = [column_def] @@ -240,7 +260,16 @@ def seed(self, data: List[dict], table_name: str): with DbtDataSeeder( self.dbt_runner, self.project_dir_path, self.seeds_dir_path ).seed(data, table_name): - return + self._fix_seed_if_needed(table_name) + + def _fix_seed_if_needed(self, table_name: str): + # Hack for BigQuery - seems like we get empty strings instead of nulls in seeds, so we + # fix them here + if self.runner_method == RunnerMethod.FUSION and self.target == "bigquery": + self.dbt_runner.run_operation( + "elementary_tests.replace_empty_strings_with_nulls", + macro_args={"table_name": table_name}, + ) @contextmanager def seed_context( @@ -298,5 +327,8 @@ def write_yaml(self, content: dict, name: Optional[str] = None): path = self.models_dir_path / name with open(path, "w") as f: YAML().dump(content, f) - yield path - path.unlink() + + try: + yield path + finally: + path.unlink() diff --git a/integration_tests/tests/env.py b/integration_tests/tests/env.py index b0baa46db..b5d108d25 100644 --- a/integration_tests/tests/env.py +++ b/integration_tests/tests/env.py @@ -1,10 +1,20 @@ +from typing import Optional + import dbt_project +from elementary.clients.dbt.factory import RunnerMethod class Environment: - def __init__(self, target: str, project_dir: str): + def __init__( + self, + target: str, + project_dir: str, + runner_method: Optional[RunnerMethod] = None, + ): self.target = target - self.dbt_runner = dbt_project.get_dbt_runner(target, project_dir) + self.dbt_runner = dbt_project.get_dbt_runner( + target, project_dir, runner_method=runner_method + ) def clear(self): # drop schema in dremio doesnt work, but we run the dremio tests with docker so its not really important to drop the schema diff --git a/integration_tests/tests/pytest.ini b/integration_tests/tests/pytest.ini index e7b9cd06f..512fdcf0b 100644 --- a/integration_tests/tests/pytest.ini +++ b/integration_tests/tests/pytest.ini @@ -3,3 +3,4 @@ markers = skip_targets(targets): skip test for the given targets only_on_targets(targets): skip test for non given targets requires_dbt_version(version): skip test if dbt version is not satisfied + skip_for_dbt_fusion: skip test for dbt fusion diff --git a/integration_tests/tests/test_all_columns_anomalies.py b/integration_tests/tests/test_all_columns_anomalies.py index 024dee64c..83c8fb38b 100644 --- a/integration_tests/tests/test_all_columns_anomalies.py +++ b/integration_tests/tests/test_all_columns_anomalies.py @@ -59,7 +59,7 @@ def test_anomalous_all_columns_anomalies(test_id: str, dbt_project: DbtProject): # Anomalies currently not supported on ClickHouse @pytest.mark.skip_targets(["clickhouse"]) -def test_all_columns_anomalies_with_where_expression( +def test_all_columns_anomalies_with_where_parameter( test_id: str, dbt_project: DbtProject ): utc_today = datetime.utcnow().date() @@ -92,9 +92,8 @@ def test_all_columns_anomalies_with_where_expression( ] ] - params = DBT_TEST_ARGS test_results = dbt_project.test( - test_id, DBT_TEST_NAME, params, data=data, multiple_results=True + test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data, multiple_results=True ) col_to_status = {res["column_name"].lower(): res["status"] for res in test_results} assert col_to_status == { @@ -103,23 +102,23 @@ def test_all_columns_anomalies_with_where_expression( "universe": "pass", } - params = dict(DBT_TEST_ARGS, where="universe = 'Marvel'") test_results = dbt_project.test( test_id, DBT_TEST_NAME, - params, + DBT_TEST_ARGS, multiple_results=True, test_vars={"force_metrics_backfill": True}, + test_config={"where": "universe = 'Marvel'"}, ) assert all([res["status"] == "pass" for res in test_results]) - params = dict(DBT_TEST_ARGS, where="universe = 'DC'") test_results = dbt_project.test( test_id, DBT_TEST_NAME, - params, + DBT_TEST_ARGS, multiple_results=True, test_vars={"force_metrics_backfill": True}, + test_config={"where": "universe = 'DC'"}, ) col_to_status = {res["column_name"].lower(): res["status"] for res in test_results} assert col_to_status == { diff --git a/integration_tests/tests/test_anomaly_test_configuration.py b/integration_tests/tests/test_anomaly_test_configuration.py index 094895f35..10cc72688 100644 --- a/integration_tests/tests/test_anomaly_test_configuration.py +++ b/integration_tests/tests/test_anomaly_test_configuration.py @@ -114,6 +114,7 @@ def get_value(key: str): expected_config=_get_expected_adapted_config("test"), ) @pytest.mark.skip_targets(["clickhouse"]) +@pytest.mark.skip_for_dbt_fusion def test_anomaly_test_configuration( dbt_project: DbtProject, vars_config: dict, diff --git a/integration_tests/tests/test_collect_metrics.py b/integration_tests/tests/test_collect_metrics.py index 467e57fd2..867a0735d 100644 --- a/integration_tests/tests/test_collect_metrics.py +++ b/integration_tests/tests/test_collect_metrics.py @@ -150,6 +150,7 @@ def test_collect_group_by_metrics(test_id: str, dbt_project: DbtProject): # Anomalies currently not supported on ClickHouse @pytest.mark.skip_targets(["clickhouse"]) +@pytest.mark.skip_for_dbt_fusion def test_collect_metrics_unique_metric_name(test_id: str, dbt_project: DbtProject): args = DBT_TEST_ARGS.copy() args["metrics"].append(args["metrics"][0]) diff --git a/integration_tests/tests/test_column_anomalies.py b/integration_tests/tests/test_column_anomalies.py index 913f42e04..709c49cf9 100644 --- a/integration_tests/tests/test_column_anomalies.py +++ b/integration_tests/tests/test_column_anomalies.py @@ -112,29 +112,28 @@ def test_column_anomalies_with_where_parameter(test_id: str, dbt_project: DbtPro ] ] - params = DBT_TEST_ARGS test_result = dbt_project.test( - test_id, DBT_TEST_NAME, params, data=data, test_column="superhero" + test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data, test_column="superhero" ) assert test_result["status"] == "fail" - params = dict(DBT_TEST_ARGS, where="universe = 'Marvel'") test_result = dbt_project.test( test_id, DBT_TEST_NAME, - params, + DBT_TEST_ARGS, test_column="superhero", test_vars={"force_metrics_backfill": True}, + test_config={"where": "universe = 'Marvel'"}, ) assert test_result["status"] == "pass" - params = dict(DBT_TEST_ARGS, where="universe = 'DC'") test_result = dbt_project.test( test_id, DBT_TEST_NAME, - params, + DBT_TEST_ARGS, test_column="superhero", test_vars={"force_metrics_backfill": True}, + test_config={"where": "universe = 'DC'"}, ) assert test_result["status"] == "fail" diff --git a/integration_tests/tests/test_column_pii_sampling.py b/integration_tests/tests/test_column_pii_sampling.py index dff983da6..fdabf37cc 100644 --- a/integration_tests/tests/test_column_pii_sampling.py +++ b/integration_tests/tests/test_column_pii_sampling.py @@ -370,7 +370,7 @@ def test_meta_tags_and_accepted_values(test_id: str, dbt_project: DbtProject): test_args=dict(column_name=SENSITIVE_COLUMN, values=["invalid@example.com"]), data=data, columns=[ - {"name": SENSITIVE_COLUMN, "meta": {"tags": ["pii"]}}, + {"name": SENSITIVE_COLUMN, "config": {"meta": {"tags": ["pii"]}}}, {"name": SAFE_COLUMN}, ], test_vars={ diff --git a/integration_tests/tests/test_dbt_artifacts/test_artifacts.py b/integration_tests/tests/test_dbt_artifacts/test_artifacts.py index 109e56284..44b048544 100644 --- a/integration_tests/tests/test_dbt_artifacts/test_artifacts.py +++ b/integration_tests/tests/test_dbt_artifacts/test_artifacts.py @@ -96,6 +96,7 @@ def test_metrics_anomaly_score(dbt_project: DbtProject): @pytest.mark.requires_dbt_version("1.8.0") +@pytest.mark.skip_for_dbt_fusion def test_source_freshness_results(test_id: str, dbt_project: DbtProject): database_property, schema_property = get_database_and_schema_properties( dbt_project.target @@ -115,11 +116,13 @@ def test_source_freshness_results(test_id: str, dbt_project: DbtProject): "tables": [ { "name": test_id, - "loaded_at_field": loaded_at_field, - "freshness": { - "warn_after": { - "count": 1, - "period": "hour", + "config": { + "loaded_at_field": loaded_at_field, + "freshness": { + "warn_after": { + "count": 1, + "period": "hour", + }, }, }, } diff --git a/integration_tests/tests/test_dbt_artifacts/test_columns.py b/integration_tests/tests/test_dbt_artifacts/test_columns.py index 41238bef4..e7b306e4d 100644 --- a/integration_tests/tests/test_dbt_artifacts/test_columns.py +++ b/integration_tests/tests/test_dbt_artifacts/test_columns.py @@ -59,4 +59,4 @@ def test_flatten_table_columns( )[0] ) flattened_column_names = [column["name"] for column in flattened_columns] - assert flattened_column_names == expected_columns + assert set(flattened_column_names) == set(expected_columns) diff --git a/integration_tests/tests/test_dbt_artifacts/test_groups.py b/integration_tests/tests/test_dbt_artifacts/test_groups.py index 811eeb39e..092e741e7 100644 --- a/integration_tests/tests/test_dbt_artifacts/test_groups.py +++ b/integration_tests/tests/test_dbt_artifacts/test_groups.py @@ -88,6 +88,7 @@ def assert_group_row_in_db_groups(dbt_project, group_name, owner_name, owner_ema ), f"Expected owner email '{owner_email}', got '{group_row.get('owner_email')}'" +@pytest.mark.skip_for_dbt_fusion def test_model_and_groups(dbt_project: DbtProject, tmp_path): """ Test that a model assigned to a group inherits the group attribute in the dbt_models artifact table. @@ -108,7 +109,9 @@ def test_model_and_groups(dbt_project: DbtProject, tmp_path): "models": [ { "name": model_name, - "group": group_name, + "config": { + "group": group_name, + }, "description": "A model assigned to a group for testing", } ], @@ -160,6 +163,7 @@ def test_model_and_groups(dbt_project: DbtProject, tmp_path): @pytest.mark.skip_targets(["dremio"]) +@pytest.mark.skip_for_dbt_fusion def test_two_groups(dbt_project: DbtProject, tmp_path): """ Test that two models assigned to two different groups inherit the correct group attribute in the dbt_models artifact table. @@ -181,12 +185,16 @@ def test_two_groups(dbt_project: DbtProject, tmp_path): "models": [ { "name": model_name_1, - "group": group_name_1, + "config": { + "group": group_name_1, + }, "description": "A model assigned to group 1 for testing", }, { "name": model_name_2, - "group": group_name_2, + "config": { + "group": group_name_2, + }, "description": "A model assigned to group 2 for testing", }, ], @@ -263,7 +271,9 @@ def test_test_group_attribute(dbt_project: DbtProject, tmp_path): "models": [ { "name": model_name, - "group": group_name, + "config": { + "group": group_name, + }, "description": "A model assigned to a group for testing", "columns": [{"name": "col", "tests": ["unique"]}], } @@ -334,7 +344,9 @@ def test_test_override_group(dbt_project: DbtProject, tmp_path): "models": [ { "name": model_name, - "group": test_group, + "config": { + "group": test_group, + }, "description": "A model assigned to a group for testing", "columns": [ { @@ -398,7 +410,9 @@ def test_seed_group_attribute(dbt_project: DbtProject, tmp_path): "seeds": [ { "name": seed_name, - "group": group_name, + "config": { + "group": group_name, + }, "description": "A seed assigned to a group for testing", } ], @@ -445,6 +459,7 @@ def test_seed_group_attribute(dbt_project: DbtProject, tmp_path): @pytest.mark.skip_targets(["dremio"]) +@pytest.mark.skip_for_dbt_fusion def test_snapshot_group_attribute(dbt_project: DbtProject, tmp_path): """ Test that a snapshot assigned to a group inherits the group attribute in the dbt_snapshots artifact table. @@ -469,7 +484,9 @@ def test_snapshot_group_attribute(dbt_project: DbtProject, tmp_path): "snapshots": [ { "name": snapshot_name, - "group": group_name, + "config": { + "group": group_name, + }, "description": "A snapshot assigned to a group for testing", } ], diff --git a/integration_tests/tests/test_dimension_anomalies.py b/integration_tests/tests/test_dimension_anomalies.py index cf62da0c5..69e9e8637 100644 --- a/integration_tests/tests/test_dimension_anomalies.py +++ b/integration_tests/tests/test_dimension_anomalies.py @@ -144,19 +144,24 @@ def test_dimensions_anomalies_with_where_parameter( for universe, superhero in [("DC", "Superman"), ("Marvel", "Spiderman")] ] - params = DBT_TEST_ARGS - test_result = dbt_project.test(test_id, DBT_TEST_NAME, params, data=data) + test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data) assert test_result["status"] == "fail" - params = dict(DBT_TEST_ARGS, where="universe = 'Marvel'") test_result = dbt_project.test( - test_id, DBT_TEST_NAME, params, test_vars={"force_metrics_backfill": True} + test_id, + DBT_TEST_NAME, + DBT_TEST_ARGS, + test_vars={"force_metrics_backfill": True}, + test_config={"where": "universe = 'Marvel'"}, ) assert test_result["status"] == "pass" - params = dict(params, where="universe = 'DC'") test_result = dbt_project.test( - test_id, DBT_TEST_NAME, params, test_vars={"force_metrics_backfill": True} + test_id, + DBT_TEST_NAME, + DBT_TEST_ARGS, + test_vars={"force_metrics_backfill": True}, + test_config={"where": "universe = 'DC'"}, ) assert test_result["status"] == "fail" diff --git a/integration_tests/tests/test_disable_samples_config.py b/integration_tests/tests/test_disable_samples_config.py index 849672a98..3ab2d97ec 100644 --- a/integration_tests/tests/test_disable_samples_config.py +++ b/integration_tests/tests/test_disable_samples_config.py @@ -30,13 +30,14 @@ def test_disable_samples_config_prevents_sampling( test_result = dbt_project.test( test_id, "not_null", - dict(column_name=COLUMN_NAME, meta={"disable_test_samples": True}), + dict(column_name=COLUMN_NAME), data=data, as_model=True, test_vars={ "enable_elementary_test_materialization": True, "test_sample_row_count": 5, }, + test_config={"meta": {"disable_test_samples": True}}, ) assert test_result["status"] == "fail" @@ -55,13 +56,14 @@ def test_disable_samples_false_allows_sampling(test_id: str, dbt_project: DbtPro test_result = dbt_project.test( test_id, "not_null", - dict(column_name=COLUMN_NAME, meta={"disable_test_samples": False}), + dict(column_name=COLUMN_NAME), data=data, as_model=True, test_vars={ "enable_elementary_test_materialization": True, "test_sample_row_count": 5, }, + test_config={"meta": {"disable_test_samples": False}}, ) assert test_result["status"] == "fail" @@ -85,10 +87,7 @@ def test_disable_samples_config_overrides_pii_tags( test_result = dbt_project.test( test_id, "not_null", - dict( - column_name=COLUMN_NAME, - meta={"disable_test_samples": True, "tags": ["pii"]}, - ), + dict(column_name=COLUMN_NAME), data=data, as_model=True, test_vars={ @@ -96,6 +95,7 @@ def test_disable_samples_config_overrides_pii_tags( "test_sample_row_count": 5, "disable_samples_on_pii_tags": True, }, + test_config={"meta": {"disable_test_samples": True, "tags": ["pii"]}}, ) assert test_result["status"] == "fail" @@ -116,7 +116,7 @@ def test_disable_samples_and_pii_interaction(test_id: str, dbt_project: DbtProje test_result = dbt_project.test( test_id, "not_null", - dict(column_name="col1", meta={"disable_test_samples": True}), + dict(column_name="col1"), data=data, as_model=True, test_vars={ @@ -125,6 +125,7 @@ def test_disable_samples_and_pii_interaction(test_id: str, dbt_project: DbtProje "disable_samples_on_pii_tags": True, "pii_tags": ["pii"], }, + test_config={"meta": {"disable_test_samples": True}}, ) assert test_result["status"] == "fail" @@ -144,13 +145,14 @@ def test_disable_samples_with_multiple_columns(test_id: str, dbt_project: DbtPro test_result = dbt_project.test( test_id, "not_null", - dict(column_name="col1", meta={"disable_test_samples": True}), + dict(column_name="col1"), data=data, as_model=True, test_vars={ "enable_elementary_test_materialization": True, "test_sample_row_count": 5, }, + test_config={"meta": {"disable_test_samples": True}}, ) assert test_result["status"] == "fail" diff --git a/integration_tests/tests/test_exposure_schema_validity.py b/integration_tests/tests/test_exposure_schema_validity.py index 73c39c1d9..8dc9f541c 100644 --- a/integration_tests/tests/test_exposure_schema_validity.py +++ b/integration_tests/tests/test_exposure_schema_validity.py @@ -9,6 +9,7 @@ def seed(dbt_project: DbtProject): assert seed_result is True +@pytest.mark.skip_for_dbt_fusion def test_exposure_schema_validity_existing_exposure_yml_invalid( test_id: str, dbt_project: DbtProject ): @@ -27,6 +28,7 @@ def test_exposure_schema_validity_existing_exposure_yml_invalid( assert test_result.success is False +@pytest.mark.skip_for_dbt_fusion def test_exposure_schema_validity_existing_exposure_yml_valid( test_id: str, dbt_project: DbtProject ): @@ -45,6 +47,7 @@ def test_exposure_schema_validity_existing_exposure_yml_valid( @pytest.mark.skip_targets(["spark"]) +@pytest.mark.skip_for_dbt_fusion def test_exposure_schema_validity_no_exposures(test_id: str, dbt_project: DbtProject): test_result = dbt_project.test(test_id, DBT_TEST_NAME) assert test_result["status"] == "pass" @@ -52,13 +55,12 @@ def test_exposure_schema_validity_no_exposures(test_id: str, dbt_project: DbtPro # Schema validity currently not supported on ClickHouse @pytest.mark.skip_targets(["spark", "clickhouse"]) +@pytest.mark.skip_for_dbt_fusion def test_exposure_schema_validity_correct_columns_and_types( test_id: str, dbt_project: DbtProject ): - explicit_target_for_bigquery = ( - "other" - if dbt_project.dbt_runner.target in ["bigquery", "snowflake", "dremio", ""] - else "string" + target_data_type = ( + "other" if dbt_project.dbt_runner.target == "dremio" else "string" ) DBT_TEST_ARGS = { "node": "models.exposures_test", @@ -69,7 +71,7 @@ def test_exposure_schema_validity_correct_columns_and_types( "referenced_columns": [ { "column_name": "order_id", - "data_type": explicit_target_for_bigquery, + "data_type": target_data_type, } ] }, @@ -86,6 +88,7 @@ def test_exposure_schema_validity_correct_columns_and_types( @pytest.mark.skip_targets(["spark"]) +@pytest.mark.skip_for_dbt_fusion def test_exposure_schema_validity_correct_columns_and_invalid_type( test_id: str, dbt_project: DbtProject ): @@ -118,6 +121,7 @@ def test_exposure_schema_validity_correct_columns_and_invalid_type( # Schema validity currently not supported on ClickHouse @pytest.mark.skip_targets(["spark", "clickhouse"]) +@pytest.mark.skip_for_dbt_fusion def test_exposure_schema_validity_invalid_type_name_present_in_error( test_id: str, dbt_project: DbtProject ): @@ -160,6 +164,7 @@ def test_exposure_schema_validity_invalid_type_name_present_in_error( @pytest.mark.skip_targets(["spark"]) +@pytest.mark.skip_for_dbt_fusion def test_exposure_schema_validity_correct_columns_and_missing_type( test_id: str, dbt_project: DbtProject ): @@ -183,6 +188,7 @@ def test_exposure_schema_validity_correct_columns_and_missing_type( @pytest.mark.skip_targets(["spark"]) +@pytest.mark.skip_for_dbt_fusion def test_exposure_schema_validity_missing_columns( test_id: str, dbt_project: DbtProject ): diff --git a/integration_tests/tests/test_failed_row_count.py b/integration_tests/tests/test_failed_row_count.py index 81943019b..8de6aec8b 100644 --- a/integration_tests/tests/test_failed_row_count.py +++ b/integration_tests/tests/test_failed_row_count.py @@ -6,6 +6,7 @@ # Failed row count currently not supported on ClickHouse @pytest.mark.skip_targets(["clickhouse"]) +@pytest.mark.skip_for_dbt_fusion def test_count_failed_row_count(test_id: str, dbt_project: DbtProject): null_count = 50 data = [{COLUMN_NAME: None} for _ in range(null_count)] @@ -23,6 +24,7 @@ def test_count_failed_row_count(test_id: str, dbt_project: DbtProject): ) # when the failed_row_count_calc is count(*), these should be equal +@pytest.mark.skip_for_dbt_fusion def test_sum_failed_row_count(test_id: str, dbt_project: DbtProject): non_unique_count = 50 data = [{COLUMN_NAME: 5} for _ in range(non_unique_count)] @@ -42,6 +44,7 @@ def test_sum_failed_row_count(test_id: str, dbt_project: DbtProject): # Failed row count currently not supported on ClickHouse @pytest.mark.skip_targets(["clickhouse"]) +@pytest.mark.skip_for_dbt_fusion def test_custom_failed_row_count(test_id: str, dbt_project: DbtProject): null_count = 50 overwrite_failed_row_count = 5 @@ -61,6 +64,7 @@ def test_custom_failed_row_count(test_id: str, dbt_project: DbtProject): assert test_result["failed_row_count"] == overwrite_failed_row_count +@pytest.mark.skip_for_dbt_fusion def test_warn_if_0(test_id: str, dbt_project: DbtProject): # Edge case that we want to verify diff --git a/integration_tests/tests/test_jsonschema.py b/integration_tests/tests/test_jsonschema.py index f8707de23..7e70acecc 100644 --- a/integration_tests/tests/test_jsonschema.py +++ b/integration_tests/tests/test_jsonschema.py @@ -14,6 +14,7 @@ @pytest.mark.requires_dbt_version("1.3.0") +@pytest.mark.skip_for_dbt_fusion class TestJsonschema: @pytest.mark.only_on_targets(SUPPORTED_TARGETS) def test_valid(self, test_id: str, dbt_project: DbtProject): diff --git a/integration_tests/tests/test_long_strings.py b/integration_tests/tests/test_long_strings.py index 6094ac292..37c0296f9 100644 --- a/integration_tests/tests/test_long_strings.py +++ b/integration_tests/tests/test_long_strings.py @@ -1,3 +1,4 @@ +import pytest from dbt_project import DbtProject SAFE_QUERY_SIZE = 10000 @@ -17,6 +18,7 @@ def read_run_result(dbt_project, test_id): )[0] +@pytest.mark.skip_for_dbt_fusion def test_query_size_exceed(test_id: str, dbt_project: DbtProject): dbt_project.dbt_runner.vars["disable_run_results"] = False max_query_size = int( @@ -35,6 +37,7 @@ def test_query_size_exceed(test_id: str, dbt_project: DbtProject): assert len(result["compiled_code"]) < max_query_size +@pytest.mark.skip_for_dbt_fusion def test_query_size_safe(test_id: str, dbt_project: DbtProject): dbt_project.dbt_runner.vars["disable_run_results"] = False query = generate_query(SAFE_QUERY_SIZE) diff --git a/integration_tests/tests/test_python.py b/integration_tests/tests/test_python.py index 1fbd8cbab..52fdb2276 100644 --- a/integration_tests/tests/test_python.py +++ b/integration_tests/tests/test_python.py @@ -10,6 +10,7 @@ @pytest.mark.requires_dbt_version("1.3.0") +@pytest.mark.skip_for_dbt_fusion class TestPython: @pytest.mark.only_on_targets(SUPPORTED_TARGETS) @Parametrization.autodetect_parameters() diff --git a/integration_tests/tests/test_volume_anomalies.py b/integration_tests/tests/test_volume_anomalies.py index fd9de2d88..7f01091fe 100644 --- a/integration_tests/tests/test_volume_anomalies.py +++ b/integration_tests/tests/test_volume_anomalies.py @@ -74,29 +74,28 @@ def test_volume_anomalies_with_where_parameter( for payback in ["karate", "ka-razy"] ] - params = DBT_TEST_ARGS test_result = dbt_project.test( - test_id, DBT_TEST_NAME, params, data=data, as_model=as_model + test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data, as_model=as_model ) assert test_result["status"] == "fail" - params = dict(DBT_TEST_ARGS, where="payback = 'karate'") test_result = dbt_project.test( test_id, DBT_TEST_NAME, - params, + DBT_TEST_ARGS, as_model=as_model, test_vars={"force_metrics_backfill": True}, + test_config={"where": "payback = 'karate'"}, ) assert test_result["status"] == "pass" - params = dict(DBT_TEST_ARGS, where="payback = 'ka-razy'") test_result = dbt_project.test( test_id, DBT_TEST_NAME, - params, + DBT_TEST_ARGS, as_model=as_model, test_vars={"force_metrics_backfill": True}, + test_config={"where": "payback = 'ka-razy'"}, ) assert test_result["status"] == "fail" diff --git a/macros/edr/dbt_artifacts/flatten_node.sql b/macros/edr/dbt_artifacts/flatten_node.sql new file mode 100644 index 000000000..dbb26c141 --- /dev/null +++ b/macros/edr/dbt_artifacts/flatten_node.sql @@ -0,0 +1,19 @@ +{% macro flatten_node(node_dict) %} + {% set resource_type = node_dict.get("resource_type") %} + {% set flatten_func = { + "model": elementary.flatten_model, + "snapshot": elementary.flatten_model, + "source": elementary.flatten_source, + "seed": elementary.flatten_seed, + "test": elementary.flatten_test, + "group": elementary.flatten_group, + "metric": elementary.flatten_metric, + "exposure": elementary.flatten_exposure + }.get(resource_type) %} + + {% if not flatten_func %} + {% do exceptions.raise_compiler_error("Unknown resource type: " ~ resource_type) %} + {% endif %} + + {% do return(flatten_func(node_dict)) %} +{% endmacro %} \ No newline at end of file diff --git a/macros/edr/dbt_artifacts/upload_dbt_artifacts.sql b/macros/edr/dbt_artifacts/upload_dbt_artifacts.sql index b8b3d2841..8be1d3008 100644 --- a/macros/edr/dbt_artifacts/upload_dbt_artifacts.sql +++ b/macros/edr/dbt_artifacts/upload_dbt_artifacts.sql @@ -51,12 +51,15 @@ order by metadata_hash {% endset %} {% set artifacts_hashes_results = elementary.run_query(stored_artifacts_query) %} - {% set artifact_agate_hashes = artifacts_hashes_results.group_by("artifacts_model") %} - {% set artifacts_hashes = {} %} - {% for artifacts_model, metadata_hashes in artifact_agate_hashes.items() %} - {% do artifacts_hashes.update({artifacts_model: metadata_hashes.columns["metadata_hash"]}) %} + {% set artifacts_hashes = elementary.agate_to_dicts(artifacts_hashes_results) %} + + {% set artifacts_hashes_per_model = {} %} + {% for artifacts_hashes_row in artifacts_hashes %} + {% do artifacts_hashes_per_model.setdefault(artifacts_hashes_row['artifacts_model'], []) %} + {% do artifacts_hashes_per_model[artifacts_hashes_row['artifacts_model']].append(artifacts_hashes_row['metadata_hash']) %} {% endfor %} - {% do return(artifacts_hashes) %} + + {% do return(artifacts_hashes_per_model) %} {% endmacro %} {% macro get_artifacts_hashes_for_model(model_name) %} diff --git a/macros/edr/dbt_artifacts/upload_dbt_invocation.sql b/macros/edr/dbt_artifacts/upload_dbt_invocation.sql index baae0d482..b65e48fab 100644 --- a/macros/edr/dbt_artifacts/upload_dbt_invocation.sql +++ b/macros/edr/dbt_artifacts/upload_dbt_invocation.sql @@ -87,8 +87,12 @@ {%- macro get_invocation_select_filter() -%} {% set config = elementary.get_runtime_config() %} - {%- if invocation_args_dict and invocation_args_dict.select -%} - {{- return(invocation_args_dict.select) -}} + {%- if invocation_args_dict -%} + {%- if invocation_args_dict.select -%} + {{- return(invocation_args_dict.select) -}} + {%- elif invocation_args_dict.SELECT -%} + {{- return(invocation_args_dict.SELECT) -}} + {%- endif -%} {%- elif config.args and config.args.select -%} {{- return(config.args.select) -}} {%- else -%} @@ -102,6 +106,16 @@ {% do return(invocation_args_dict.selector) %} {% elif invocation_args_dict and invocation_args_dict.selector_name %} {% do return(invocation_args_dict.selector_name) %} + {% elif invocation_args_dict and invocation_args_dict.INVOCATION_COMMAND %} + {% set match = modules.re.search( + "--selector(?:\s+|=)(\S+)", + invocation_args_dict.INVOCATION_COMMAND + ) %} + {% if match %} + {% do return(match.group(1)) %} + {% else %} + {% do return(none) %} + {% endif %} {% elif config.args and config.args.selector_name %} {% do return(config.args.selector_name) %} {% else %} @@ -118,6 +132,8 @@ {% else %} {% set invocation_vars = fromyaml(invocation_args_dict.vars) %} {% endif %} + {% elif invocation_args_dict and invocation_args_dict.VARS %} + {% set invocation_vars = invocation_args_dict.VARS %} {% elif config.cli_vars %} {% set invocation_vars = config.cli_vars %} {% endif %} @@ -128,7 +144,7 @@ {% set all_vars = {} %} {% set config = elementary.get_runtime_config() %} {%- if config.vars -%} - {% do all_vars.update(config.vars.to_dict()) %} + {% do all_vars.update(elementary.dbt_object_to_dict(config.vars)) %} {%- endif -%} {% do all_vars.update(elementary.get_invocation_vars()) %} {{- return(all_vars) -}} diff --git a/macros/edr/dbt_artifacts/upload_dbt_tests.sql b/macros/edr/dbt_artifacts/upload_dbt_tests.sql index 8b641d7c5..b1f313d2f 100644 --- a/macros/edr/dbt_artifacts/upload_dbt_tests.sql +++ b/macros/edr/dbt_artifacts/upload_dbt_tests.sql @@ -57,20 +57,23 @@ {% set config_meta_dict = elementary.safe_get_with_default(config_dict, 'meta', {}) %} {% set meta_dict = elementary.safe_get_with_default(node_dict, 'meta', {}) %} - {% do meta_dict.update(config_meta_dict) %} + + {% set unified_meta = {} %} + {% do unified_meta.update(config_meta_dict) %} + {% do unified_meta.update(meta_dict) %} {% set description = none %} {% if dbt_version >= '1.9.0' and node_dict.get('description') %} {% set description = node_dict.get('description') %} - {% elif meta_dict.get('description') %} - {% set description = meta_dict.pop('description') %} + {% elif unified_meta.get('description') %} + {% set description = unified_meta.pop('description') %} {% elif default_description %} {% set description = default_description %} {% endif %} {% set config_tags = elementary.safe_get_with_default(config_dict, 'tags', []) %} {% set global_tags = elementary.safe_get_with_default(node_dict, 'tags', []) %} - {% set meta_tags = elementary.safe_get_with_default(meta_dict, 'tags', []) %} + {% set meta_tags = elementary.safe_get_with_default(unified_meta, 'tags', []) %} {% set tags = elementary.union_lists(config_tags, global_tags) %} {% set tags = elementary.union_lists(tags, meta_tags) %} @@ -79,7 +82,7 @@ {% set test_models_owners = [] %} {% set test_models_tags = [] %} {% for test_model_node in test_model_nodes %} - {% set flatten_test_model_node = elementary.flatten_model(test_model_node) %} + {% set flatten_test_model_node = elementary.flatten_node(test_model_node) %} {% set test_model_owner = flatten_test_model_node.get('owner') %} {% if test_model_owner %} {% if test_model_owner is string %} @@ -130,6 +133,8 @@ {% endif %} {% endif %} + {% set group_name = config_dict.get("group") or node_dict.get("group") %} + {% set primary_test_model_database = none %} {% set primary_test_model_schema = none %} {%- if primary_test_model_id.data is not none -%} @@ -137,6 +142,7 @@ {%- if tested_model_node -%} {% set primary_test_model_database = tested_model_node.get('database') %} {% set primary_test_model_schema = tested_model_node.get('schema') %} + {% set group_name = group_name or tested_model_node.get('group') %} {%- endif -%} {%- endif -%} @@ -160,7 +166,7 @@ 'tags': elementary.filter_none_and_sort(tags), 'model_tags': elementary.filter_none_and_sort(test_models_tags), 'model_owners': elementary.filter_none_and_sort(test_models_owners), - 'meta': meta_dict, + 'meta': unified_meta, 'database_name': primary_test_model_database, 'schema_name': primary_test_model_schema, 'depends_on_macros': elementary.filter_none_and_sort(depends_on_dict.get('macros', [])), @@ -174,8 +180,8 @@ 'compiled_code': elementary.get_compiled_code(node_dict), 'path': node_dict.get('path'), 'generated_at': elementary.datetime_now_utc_as_string(), - 'quality_dimension': meta_dict.get('quality_dimension') or elementary.get_quality_dimension(test_original_name, test_namespace), - 'group_name': config_dict.get("group") or node_dict.get("group"), + 'quality_dimension': unified_meta.get('quality_dimension') or elementary.get_quality_dimension(test_original_name, test_namespace), + 'group_name': group_name, }%} {% do flatten_test_metadata_dict.update({"metadata_hash": elementary.get_artifact_metadata_hash(flatten_test_metadata_dict)}) %} {{ return(flatten_test_metadata_dict) }} @@ -185,7 +191,7 @@ {% set test_type = 'generic' %} {%- if test_namespace == 'dbt_expectations' -%} {% set test_type = 'expectation' %} - {%- elif 'tests/generic' in test_path or 'macros/' in test_path -%} + {%- elif 'tests/generic' in test_path or 'macros/' in test_path or 'generic_tests/' in test_path -%} {% set test_type = 'generic' %} {%- elif 'tests/' in test_path -%} {% set test_type = 'singular' %} diff --git a/macros/edr/materializations/test/test.sql b/macros/edr/materializations/test/test.sql index 6f52f67c4..ebc4c4dcf 100644 --- a/macros/edr/materializations/test/test.sql +++ b/macros/edr/materializations/test/test.sql @@ -169,9 +169,8 @@ {% endif %} {# Get the compiled test query #} - {% set test_query = elementary.get_compiled_code(flattened_test) %} - {% set test_query_lower = test_query.lower() %} - + {% set test_query_lower = sql.lower() %} + {# Check if query uses * (select all columns) #} {# Note: This is intentionally conservative and may over-censor in cases like "SELECT * FROM other_table" in CTEs, but it's better to be safe with PII data #} diff --git a/macros/edr/system/configuration/is_elementary_enabled.sql b/macros/edr/system/configuration/is_elementary_enabled.sql index 8c3aa66ec..a31e0cc0e 100644 --- a/macros/edr/system/configuration/is_elementary_enabled.sql +++ b/macros/edr/system/configuration/is_elementary_enabled.sql @@ -1,3 +1,4 @@ {% macro is_elementary_enabled() %} - {% do return("elementary" in graph) %} + {% set database_name = elementary.get_package_database_and_schema()[0] %} + {% do return(database_name is not none) %} {% endmacro %} diff --git a/macros/edr/system/hooks/on_run_end.sql b/macros/edr/system/hooks/on_run_end.sql index 04d488d17..d66f5349b 100644 --- a/macros/edr/system/hooks/on_run_end.sql +++ b/macros/edr/system/hooks/on_run_end.sql @@ -1,4 +1,8 @@ {% macro on_run_end() %} + {% if not elementary.is_elementary_enabled() %} + {% do return('') %} + {% endif %} + {%- if execute and not elementary.is_docs_command() %} {% set edr_cli_run = elementary.get_config_var('edr_cli_run') %} {% if not execute or edr_cli_run %} diff --git a/macros/edr/system/hooks/on_run_start.sql b/macros/edr/system/hooks/on_run_start.sql index 62d838905..3a32f69aa 100644 --- a/macros/edr/system/hooks/on_run_start.sql +++ b/macros/edr/system/hooks/on_run_start.sql @@ -1,11 +1,14 @@ {% macro on_run_start() %} + {% if not elementary.is_elementary_enabled() %} + {% do return('') %} + {% endif %} + {% set edr_cli_run = elementary.get_config_var('edr_cli_run') %} {% if not execute or edr_cli_run %} {{ return('') }} {% endif %} {% do elementary.recommend_dbt_core_artifacts_upgrade() %} - {% do elementary.ensure_materialize_override() %} {% set runtime_config = elementary.get_elementary_runtime_config(include_defaults=false) %} {% set elementary_database, elementary_schema = elementary.get_package_database_and_schema() %} {% set elementary_version = elementary.get_elementary_package_version() %} diff --git a/macros/edr/system/system_utils/get_config_var.sql b/macros/edr/system/system_utils/get_config_var.sql index 0884b0fcb..97da70f01 100644 --- a/macros/edr/system/system_utils/get_config_var.sql +++ b/macros/edr/system/system_utils/get_config_var.sql @@ -70,7 +70,6 @@ 'anomaly_direction': 'both', 'store_result_rows_in_own_table': true, 'mute_dbt_upgrade_recommendation': false, - 'mute_ensure_materialization_override': false, 'calculate_failed_count': true, 'tests_use_temp_tables': false, 'clean_elementary_temp_tables': true, diff --git a/macros/edr/system/system_utils/get_pii_columns_from_parent_model.sql b/macros/edr/system/system_utils/get_pii_columns_from_parent_model.sql index b60d374c1..dd8377838 100644 --- a/macros/edr/system/system_utils/get_pii_columns_from_parent_model.sql +++ b/macros/edr/system/system_utils/get_pii_columns_from_parent_model.sql @@ -3,6 +3,7 @@ column_node.get('tags', []), column_node.get('config', {}).get('tags', []), column_node.get('meta', {}).get('tags', []), + column_node.get('config', {}).get('meta', {}).get('tags', []), ] %} {% set all_column_tags = [] %} diff --git a/macros/edr/system/system_utils/get_test_model.sql b/macros/edr/system/system_utils/get_test_model.sql new file mode 100644 index 000000000..4a5743e02 --- /dev/null +++ b/macros/edr/system/system_utils/get_test_model.sql @@ -0,0 +1,6 @@ +{% macro get_test_model() %} + {# This macro is used to get the global "model" object for tests, + we created it since context["model"] doesn't return the same thing in dbt-fusion #} + + {% do return(model) %} +{% endmacro %} diff --git a/macros/edr/system/system_utils/is_dbt_fusion.sql b/macros/edr/system/system_utils/is_dbt_fusion.sql new file mode 100644 index 000000000..2a3c1e08b --- /dev/null +++ b/macros/edr/system/system_utils/is_dbt_fusion.sql @@ -0,0 +1,7 @@ +{% macro is_dbt_fusion() %} + {% if dbt_version.split(".")[0] | int > 1 %} + {% do return(true) %} + {% endif %} + + {% do return(false) %} +{% endmacro %} \ No newline at end of file diff --git a/macros/edr/tests/on_run_end/handle_tests_results.sql b/macros/edr/tests/on_run_end/handle_tests_results.sql index 98860c606..6b8e0eef2 100644 --- a/macros/edr/tests/on_run_end/handle_tests_results.sql +++ b/macros/edr/tests/on_run_end/handle_tests_results.sql @@ -25,37 +25,60 @@ {% do return('') %} {% endmacro %} -{% macro get_result_enriched_elementary_test_results(cached_elementary_test_results, cached_elementary_test_failed_row_counts, render_result_rows=false) %} - {% set elementary_test_results = [] %} - - {% for result in results | selectattr('node.resource_type', '==', 'test') %} - {% set result = elementary.get_run_result_dict(result) %} - {% set elementary_test_results_rows = cached_elementary_test_results.get(result.node.unique_id) %} - {% set elementary_test_failed_row_count = cached_elementary_test_failed_row_counts.get(result.node.unique_id) %} +{% macro get_normalized_test_status(result, elementary_test_results_row) %} + {% set failures = elementary_test_results_row.get("failures", result.failures) %} + {% set status = result.status %} + + {# For Elementary anomaly tests, we actually save more than one result per test, in that case the dbt status will be "fail" + even if one such result failed and the rest succeeded. To handle this, we make sure to mark the status as "pass" for these + results if the number of failed rows is 0. + We don't want to do this for every test though - because otherwise it can break configurations like warn_if=0 #} + {% if failures == 0 and elementary_test_results_row.get("test_type") == "anomaly_detection" %} + {% set status = "pass" %} + {% endif %} - {# Materializing the test failed and therefore was not added to the cache. #} - {% if not elementary_test_results_rows %} - {% set flattened_test = elementary.flatten_test(result.node) %} - {% set elementary_test_results_rows = [elementary.get_dbt_test_result_row(flattened_test)] %} + {% if elementary.is_dbt_fusion() %} + {% if status == 'error' %} + {# dbt-fusion currently doesn't distinguish between failure and error #} + {% set status = "fail" %} + {% elif status == 'success' %} + {# dbt-fusion seems to sometime return 'pass' and sometimes 'success', so we normalize to 'pass' #} + {% set status = "pass" %} {% endif %} + {% endif %} - {% for elementary_test_results_row in elementary_test_results_rows %} - {% set failures = elementary_test_results_row.get("failures", result.failures) %} + {% do return(status) %} +{% endmacro %} - {# For Elementary anomaly tests, we actually save more than one result per test, in that case the dbt status will be "fail" - even if one such result failed and the rest succeeded. To handle this, we make sure to mark the status as "pass" for these - results if the number of failed rows is 0. - We don't want to do this for every test though - because otherwise it can break configurations like warn_if=0 #} - {% set status = "pass" if failures == 0 and elementary_test_results_row.get("test_type") == "anomaly_detection" else result.status %} +{% macro get_result_enriched_elementary_test_results(cached_elementary_test_results, cached_elementary_test_failed_row_counts, render_result_rows=false) %} + {% set elementary_test_results = [] %} - {% do elementary_test_results_row.update({'status': status, 'failures': failures, 'invocation_id': invocation_id, - 'failed_row_count': elementary_test_failed_row_count}) %} - {% do elementary_test_results_row.setdefault('test_results_description', result.message) %} - {% if render_result_rows %} - {% do elementary_test_results_row.update({"result_rows": elementary.render_result_rows(elementary_test_results_row.result_rows)}) %} + {% for result in results %} + {% set result = elementary.get_run_result_dict(result) %} + {% set node = result.node or elementary.get_node(result.unique_id or result.node.unique_id) %} + {% if node.resource_type == 'test' %} + {% set elementary_test_results_rows = cached_elementary_test_results.get(node.unique_id) %} + {% set elementary_test_failed_row_count = cached_elementary_test_failed_row_counts.get(node.unique_id) %} + + {# Materializing the test failed and therefore was not added to the cache. #} + {% if not elementary_test_results_rows %} + {% set flattened_test = elementary.flatten_test(node) %} + {% set elementary_test_results_rows = [elementary.get_dbt_test_result_row(flattened_test)] %} {% endif %} - {% do elementary_test_results.append(elementary_test_results_row) %} - {% endfor %} + + {% for elementary_test_results_row in elementary_test_results_rows %} + {% set failures = elementary_test_results_row.get("failures", result.failures) %} + {% set status = elementary.get_normalized_test_status(result, elementary_test_results_row) %} + + {% do elementary_test_results_row.update({'status': status, 'failures': failures, 'invocation_id': invocation_id, + 'failed_row_count': elementary_test_failed_row_count}) %} + {% do elementary_test_results_row.setdefault('test_results_description', result.message) %} + {% if render_result_rows %} + {% do elementary_test_results_row.update({"result_rows": elementary.render_result_rows(elementary_test_results_row.result_rows)}) %} + {% endif %} + {% do elementary_test_results.append(elementary_test_results_row) %} + {% endfor %} + {% endif %} {% endfor %} {% do return(elementary_test_results) %} @@ -115,7 +138,7 @@ {% endset %} {{ elementary.file_log("Inserting metrics into {}.".format(target_relation)) }} - {%- do elementary.run_query(dbt.create_table_as(True, temp_relation, test_tables_union_query)) %} + {%- do elementary.edr_create_table_as(true, temp_relation, test_tables_union_query) %} {% do elementary.run_query(insert_query) %} {% if not elementary.has_temp_table_support() %} @@ -163,7 +186,7 @@ {% endset %} {{ elementary.file_log("Inserting schema columns snapshot into {}.".format(target_relation)) }} - {%- do elementary.run_query(dbt.create_table_as(True, temp_relation, test_tables_union_query)) %} + {%- do elementary.edr_create_table_as(true, temp_relation, test_tables_union_query) %} {% do elementary.run_query(insert_query) %} {% if not elementary.has_temp_table_support() %} @@ -174,13 +197,15 @@ {% macro pop_test_result_rows(elementary_test_results) %} {% set result_rows = [] %} {% for test_result in elementary_test_results %} - {% for result_row in test_result.pop('result_rows', []) %} - {% do result_rows.append({ - "elementary_test_results_id": test_result.id, - "detected_at": test_result.detected_at, - "result_row": result_row - }) %} - {% endfor %} + {% if 'result_rows' in test_result %} + {% for result_row in test_result.pop('result_rows') %} + {% do result_rows.append({ + "elementary_test_results_id": test_result.id, + "detected_at": test_result.detected_at, + "result_row": result_row + }) %} + {% endfor %} + {% endif %} {% endfor %} {% do return(result_rows) %} {% endmacro %} diff --git a/macros/edr/tests/on_run_start/ensure_materialize_override.sql b/macros/edr/tests/on_run_start/ensure_materialize_override.sql deleted file mode 100644 index cf088c73b..000000000 --- a/macros/edr/tests/on_run_start/ensure_materialize_override.sql +++ /dev/null @@ -1,30 +0,0 @@ -{% macro ensure_materialize_override() %} - {% if elementary.get_config_var("mute_ensure_materialization_override") %} - {% do return(none) %} - {% endif %} - - {% set runtime_config = elementary.get_runtime_config() %} - {% if runtime_config.args.require_explicit_package_overrides_for_builtin_materializations is false %} - {% do elementary.file_log("Materialization override is enabled.") %} - {% do return(none) %} - {% endif %} - - {% set major, minor, revision = dbt_version.split(".") %} - {% set major = major | int %} - {% set minor = minor | int %} - {% if major > 1 or major == 1 and minor >= 8 %} - {%- set msg %} -IMPORTANT - Starting from dbt 1.8, users must explicitly allow packages to override materializations. -Elementary requires this ability to support collection of samples and failed row count for dbt tests. -Please add the following flag to dbt_project.yml to allow it: - -flags: - require_explicit_package_overrides_for_builtin_materializations: false - -Notes - -* This is a temporary measure that will result in a deprecation warning, please ignore it for now. Elementary is working with the dbt-core team on a more permanent solution. -* This message can be muted by setting the 'mute_ensure_materialization_override' var to true. - {% endset %} - {% do log(msg, info=true) %} - {% endif %} -{% endmacro %} diff --git a/macros/edr/tests/test_ai_data_validation.sql b/macros/edr/tests/test_ai_data_validation.sql index a16c0b748..14852cc16 100644 --- a/macros/edr/tests/test_ai_data_validation.sql +++ b/macros/edr/tests/test_ai_data_validation.sql @@ -1,7 +1,7 @@ {% test ai_data_validation(model, column_name, expectation_prompt, llm_model_name=none, prompt_context='') %} {{ config(tags = ['elementary-tests']) }} {%- if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %} - {% set model_relation = elementary.get_model_relation_for_test(model, context["model"]) %} + {% set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %} {% if not model_relation %} {{ exceptions.raise_compiler_error("Unsupported model: " ~ model ~ " (this might happen if you override 'ref' or 'source')") }} {% endif %} diff --git a/macros/edr/tests/test_all_columns_anomalies.sql b/macros/edr/tests/test_all_columns_anomalies.sql index 2a4161651..37c78f3c9 100644 --- a/macros/edr/tests/test_all_columns_anomalies.sql +++ b/macros/edr/tests/test_all_columns_anomalies.sql @@ -1,7 +1,7 @@ {% test all_columns_anomalies(model, column_anomalies, exclude_prefix, exclude_regexp, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity,ignore_small_changes, fail_on_zero, detection_delay, anomaly_exclude_metrics, detection_period, training_period, dimensions) %} {{ config(tags = ['elementary-tests']) }} {%- if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %} - {% set model_relation = elementary.get_model_relation_for_test(model, context["model"]) %} + {% set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %} {% if not model_relation %} {{ exceptions.raise_compiler_error("Unsupported model: " ~ model ~ " (this might happen if you override 'ref' or 'source')") }} {% endif %} @@ -93,7 +93,7 @@ {{- elementary.test_log('end', full_table_name, 'all columns') }} - {% set flattened_test = elementary.flatten_test(context["model"]) %} + {% set flattened_test = elementary.flatten_test(elementary.get_test_model()) %} {% set anomaly_scores_sql = elementary.get_read_anomaly_scores_query() %} {% do elementary.store_metrics_table_in_cache() %} {% do elementary.store_anomaly_test_results(flattened_test, anomaly_scores_sql) %} diff --git a/macros/edr/tests/test_collect_metrics.sql b/macros/edr/tests/test_collect_metrics.sql index 12ceb4523..f0a213e02 100644 --- a/macros/edr/tests/test_collect_metrics.sql +++ b/macros/edr/tests/test_collect_metrics.sql @@ -26,7 +26,7 @@ {% set dimensions = [] %} {% endif %} - {% set model_relation = elementary.get_model_relation_for_test(model, context["model"]) %} + {% set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %} {% if not model_relation %} {% do exceptions.raise_compiler_error("Unsupported model: " ~ model ~ " (this might happen if you override 'ref' or 'source')") %} {% endif %} diff --git a/macros/edr/tests/test_column_anomalies.sql b/macros/edr/tests/test_column_anomalies.sql index e208abf13..94480d2c2 100644 --- a/macros/edr/tests/test_column_anomalies.sql +++ b/macros/edr/tests/test_column_anomalies.sql @@ -1,7 +1,7 @@ {% test column_anomalies(model, column_name, column_anomalies, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity,ignore_small_changes, fail_on_zero, detection_delay, anomaly_exclude_metrics, detection_period, training_period, dimensions) %} {{ config(tags = ['elementary-tests']) }} {%- if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %} - {% set model_relation = elementary.get_model_relation_for_test(model, context["model"]) %} + {% set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %} {% if not model_relation %} {{ exceptions.raise_compiler_error("Unsupported model: " ~ model ~ " (this might happen if you override 'ref' or 'source')") }} {% endif %} @@ -92,7 +92,7 @@ {% set anomaly_scores_test_table_relation = elementary.create_elementary_test_table(database_name, tests_schema_name, test_table_name, 'anomaly_scores', anomaly_scores_query) %} {{ elementary.test_log('end', full_table_name, column_name) }} - {% set flattened_test = elementary.flatten_test(context["model"]) %} + {% set flattened_test = elementary.flatten_test(elementary.get_test_model()) %} {% set anomaly_scores_sql = elementary.get_read_anomaly_scores_query() %} {% do elementary.store_metrics_table_in_cache() %} {% do elementary.store_anomaly_test_results(flattened_test, anomaly_scores_sql) %} diff --git a/macros/edr/tests/test_dimension_anomalies.sql b/macros/edr/tests/test_dimension_anomalies.sql index b11353e4e..6412973a2 100644 --- a/macros/edr/tests/test_dimension_anomalies.sql +++ b/macros/edr/tests/test_dimension_anomalies.sql @@ -1,7 +1,7 @@ {% test dimension_anomalies(model, dimensions, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity,ignore_small_changes, fail_on_zero, detection_delay, anomaly_exclude_metrics, detection_period, training_period, exclude_final_results) %} {{ config(tags = ['elementary-tests']) }} {%- if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %} - {% set model_relation = elementary.get_model_relation_for_test(model, context["model"]) %} + {% set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %} {% if not model_relation %} {{ exceptions.raise_compiler_error("Unsupported model: " ~ model ~ " (this might happen if you override 'ref' or 'source')") }} {% endif %} @@ -70,8 +70,9 @@ {% set anomaly_scores_test_table_relation = elementary.create_elementary_test_table(database_name, tests_schema_name, test_table_name, 'anomaly_scores', anomaly_scores_query) %} {{ elementary.test_log('end', full_table_name) }} - {% set flattened_test = elementary.flatten_test(context["model"]) %} + {% set flattened_test = elementary.flatten_test(elementary.get_test_model()) %} {% set anomalous_dimension_rows_sql = elementary.get_anomaly_query_for_dimension_anomalies(flattened_test) %} + {% do elementary.store_metrics_table_in_cache() %} {% do elementary.store_anomaly_test_results(flattened_test, anomalous_dimension_rows_sql) %} diff --git a/macros/edr/tests/test_event_freshness_anomalies.sql b/macros/edr/tests/test_event_freshness_anomalies.sql index 5856260cd..0f6b1af05 100644 --- a/macros/edr/tests/test_event_freshness_anomalies.sql +++ b/macros/edr/tests/test_event_freshness_anomalies.sql @@ -1,7 +1,7 @@ {% test event_freshness_anomalies(model, event_timestamp_column, update_timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, detection_delay, anomaly_exclude_metrics, detection_period, training_period) %} {{ config(tags = ['elementary-tests']) }} {% if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %} - {% set model_relation = elementary.get_model_relation_for_test(model, context["model"]) %} + {% set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %} {% if not model_relation %} {{ exceptions.raise_compiler_error("Unsupported model: " ~ model ~ " (this might happen if you override 'ref' or 'source')") }} {% endif %} diff --git a/macros/edr/tests/test_exposure_schema_validity.sql b/macros/edr/tests/test_exposure_schema_validity.sql index 476bdee88..2b5f66010 100644 --- a/macros/edr/tests/test_exposure_schema_validity.sql +++ b/macros/edr/tests/test_exposure_schema_validity.sql @@ -16,7 +16,7 @@ {%- set exposures = (exposures or graph.exposures).values() -%} {%- set columns = columns or adapter.get_columns_in_relation(model) -%} - {%- set model_relation = elementary.get_model_relation_for_test(model, context["model"]) -%} + {%- set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) -%} {%- set full_table_name = elementary.relation_to_full_name(model_relation) -%} {{- elementary.test_log('start', full_table_name, 'exposure validation') -}} diff --git a/macros/edr/tests/test_schema_changes.sql b/macros/edr/tests/test_schema_changes.sql index 3db6c4da5..cd8e1710c 100644 --- a/macros/edr/tests/test_schema_changes.sql +++ b/macros/edr/tests/test_schema_changes.sql @@ -1,7 +1,7 @@ {% test schema_changes(model) %} {{ config(tags = ['elementary-tests']) }} {%- if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %} - {% set model_relation = elementary.get_model_relation_for_test(model, context["model"]) %} + {% set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %} {% if not model_relation %} {{ exceptions.raise_compiler_error("Unsupported model: " ~ model ~ " (this might happen if you override 'ref' or 'source')") }} {% endif %} @@ -30,7 +30,7 @@ {{ elementary.debug_log('schema_changes_alert_query - \n' ~ schema_changes_alert_query) }} {% set alerts_temp_table_relation = elementary.create_elementary_test_table(database_name, tests_schema_name, test_table_name, 'schema_changes_alerts', schema_changes_alert_query) %} - {% set flattened_test = elementary.flatten_test(context["model"]) %} + {% set flattened_test = elementary.flatten_test(elementary.get_test_model()) %} {% set schema_changes_sql = 'select * from {}'.format(alerts_temp_table_relation) %} {% do elementary.store_schema_snapshot_tables_in_cache() %} {% do elementary.store_schema_test_results(flattened_test, schema_changes_sql) %} diff --git a/macros/edr/tests/test_schema_changes_from_baseline.sql b/macros/edr/tests/test_schema_changes_from_baseline.sql index 4fd012924..f979fb268 100644 --- a/macros/edr/tests/test_schema_changes_from_baseline.sql +++ b/macros/edr/tests/test_schema_changes_from_baseline.sql @@ -1,7 +1,7 @@ {% test schema_changes_from_baseline(model, fail_on_added=False, enforce_types=False) %} {{ config(tags = ['elementary-tests']) }} {%- if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %} - {% set model_relation = elementary.get_model_relation_for_test(model, context["model"]) %} + {% set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %} {% if not model_relation %} {{ exceptions.raise_compiler_error("Unsupported model: " ~ model ~ " (this might happen if you override 'ref' or 'source)") }} {% endif %} @@ -20,7 +20,7 @@ {% set full_table_name = elementary.relation_to_full_name(model_relation) %} {% set changes_from_baseline_query = elementary.get_column_changes_from_baseline_query(model_relation, full_table_name, baseline_table_relation, include_added=fail_on_added) %} - {% set flattened_test = elementary.flatten_test(context["model"]) %} + {% set flattened_test = elementary.flatten_test(elementary.get_test_model()) %} {% do elementary.store_schema_snapshot_tables_in_cache() %} {% do elementary.store_schema_test_results(flattened_test, changes_from_baseline_query) %} diff --git a/macros/edr/tests/test_table_anomalies.sql b/macros/edr/tests/test_table_anomalies.sql index c190f58d1..662bbc269 100644 --- a/macros/edr/tests/test_table_anomalies.sql +++ b/macros/edr/tests/test_table_anomalies.sql @@ -1,7 +1,7 @@ {% test table_anomalies(model, table_anomalies, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, mandatory_params=none, event_timestamp_column=none, freshness_column=none, sensitivity=none, ignore_small_changes={"spike_failure_percent_threshold": none, "drop_failure_percent_threshold": none}, fail_on_zero=false, detection_delay=none, anomaly_exclude_metrics=none, detection_period=none, training_period=none) %} {{ config(tags = ['elementary-tests']) }} {%- if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %} - {% set model_relation = elementary.get_model_relation_for_test(model, context["model"]) %} + {% set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %} {% if not model_relation %} {{ exceptions.raise_compiler_error("The test has unsupported configuration, please contact Elementary support") }} {% endif %} @@ -82,7 +82,7 @@ {% set anomaly_scores_test_table_relation = elementary.create_elementary_test_table(database_name, tests_schema_name, test_table_name, 'anomaly_scores', anomaly_scores_query) %} {{ elementary.test_log('end', full_table_name) }} - {% set flattened_test = elementary.flatten_test(context["model"]) %} + {% set flattened_test = elementary.flatten_test(elementary.get_test_model()) %} {% set anomaly_scores_sql = elementary.get_read_anomaly_scores_query() %} {% do elementary.store_metrics_table_in_cache() %} {% do elementary.store_anomaly_test_results(flattened_test, anomaly_scores_sql) %} diff --git a/macros/edr/tests/test_utils/create_elementary_test_table.sql b/macros/edr/tests/test_utils/create_elementary_test_table.sql index 57ecc70d1..0b060b3b0 100644 --- a/macros/edr/tests/test_utils/create_elementary_test_table.sql +++ b/macros/edr/tests/test_utils/create_elementary_test_table.sql @@ -1,11 +1,7 @@ {% macro create_elementary_test_table(database_name, schema_name, test_name, table_type, sql_query) %} {% if execute %} - {% set temp_table_name = elementary.table_name_with_suffix(test_name, "__" ~ table_type ~ elementary.get_timestamped_table_suffix()).replace("*", "") %} - - {% set default_identifier_quoting = api.Relation.get_default_quote_policy().get_part("identifier") %} - {% if not adapter.config.quoting.get("identifier", default_identifier_quoting) %} - {% set temp_table_name = adapter.quote(temp_table_name) %} - {% endif %} + {% set temp_table_name = elementary.table_name_with_suffix(test_name, "__" ~ table_type ~ elementary.get_timestamped_table_suffix()) %} + {% set temp_table_name = temp_table_name.replace("*", "").replace("-", "_").replace(".", "_") %} {{ elementary.debug_log(table_type ~ ' table: ' ~ database_name ~ '.' ~ schema_name ~ '.' ~ temp_table_name) }} diff --git a/macros/edr/tests/test_utils/get_model_relation_for_test.sql b/macros/edr/tests/test_utils/get_model_relation_for_test.sql index 97ed7c616..ed5f59d8c 100644 --- a/macros/edr/tests/test_utils/get_model_relation_for_test.sql +++ b/macros/edr/tests/test_utils/get_model_relation_for_test.sql @@ -4,6 +4,16 @@ {% do return(test_model) %} {% endif %} + {# If the test depends on a single model ID, try to get the relation from it #} + {% set depends_on_node_ids = test_node.get("depends_on", {}).get("nodes") %} + {% if depends_on_node_ids and depends_on_node_ids | length == 1 %} + {% set node = elementary.get_node(depends_on_node_ids[0]) %} + {% set relation = elementary.get_relation_from_node(node) %} + {% if relation %} + {% do return(relation) %} + {% endif %} + {% endif %} + {# Test model is a string, this might mean that a "where" parameter was passed to the test. In the heuristic below we rely on the fact that in this case the model jinja will have a very specific structure (see the "build_model_str" function in dbt-core) #} diff --git a/macros/edr/tests/test_utils/validate_unique_metric_names.sql b/macros/edr/tests/test_utils/validate_unique_metric_names.sql index 135e1fc06..13e90da41 100644 --- a/macros/edr/tests/test_utils/validate_unique_metric_names.sql +++ b/macros/edr/tests/test_utils/validate_unique_metric_names.sql @@ -10,7 +10,7 @@ {% do metric_names.append(metric.name) %} {% endfor %} - {% set test_node = context["model"] %} + {% set test_node = elementary.get_test_model() %} {% set parent_model_unique_ids = elementary.get_parent_model_unique_ids_from_test_node(test_node) %} {% for graph_node in graph.nodes.values() %} diff --git a/macros/utils/cross_db_utils/get_user_creation_query.sql b/macros/utils/cross_db_utils/get_user_creation_query.sql index f8fcbf03f..4154a7df7 100644 --- a/macros/utils/cross_db_utils/get_user_creation_query.sql +++ b/macros/utils/cross_db_utils/get_user_creation_query.sql @@ -155,16 +155,9 @@ GRANT VIEW REFLECTION ON {{ db_type }} "{{ db_name }}" TO USER "{{ parameters["u {% set db_name_to_type = {} %} {% for row in elementary.agate_to_dicts(elementary.run_query(dremio_databases_query)) %} - {% if row["database_name"] | lower not in configured_dbs %} - {% continue %} + {% if row["database_name"] | lower in configured_dbs and (row["database_name"] not in db_name_to_type or row["database_type"] == "CATALOG") %} + {% do db_name_to_type.update({row["database_name"]: row["database_type"]}) %} {% endif %} - - {# This condition guarantees that if there's at least one view in the DB we'll consider it as a catalog (see explanation above) #} - {% if row["database_type"] in db_name_to_type and row["database_type"] != "CATALOG" %} - {% continue %} - {% endif %} - - {% do db_name_to_type.update({row["database_name"]: row["database_type"]}) %} {% endfor %} {% do return(db_name_to_type) %} diff --git a/macros/utils/cross_db_utils/incremental_strategy.sql b/macros/utils/cross_db_utils/incremental_strategy.sql index c6ef526bd..5fc910390 100644 --- a/macros/utils/cross_db_utils/incremental_strategy.sql +++ b/macros/utils/cross_db_utils/incremental_strategy.sql @@ -10,6 +10,10 @@ {% do return("merge") %} {% endmacro %} +{%- macro redshift__get_default_incremental_strategy() %} + {% do return("merge") %} +{% endmacro %} + {% macro default__get_default_incremental_strategy() %} {% do return(none) %} {% endmacro %} diff --git a/macros/utils/data_types/normalize_data_type.sql b/macros/utils/data_types/normalize_data_type.sql index ebec23ed5..9e11ab164 100644 --- a/macros/utils/data_types/normalize_data_type.sql +++ b/macros/utils/data_types/normalize_data_type.sql @@ -6,16 +6,21 @@ {%- endif %} {%- if data_type is defined and data_type is not none %} - {%- if data_type in elementary.data_type_list('string') %} + {%- if elementary.is_data_type_of_normalized_type(data_type, 'string') %} {{ return('string') }} - {%- elif data_type in elementary.data_type_list('numeric') %} + {%- elif elementary.is_data_type_of_normalized_type(data_type, 'numeric') %} {{ return('numeric') }} - {%- elif data_type in elementary.data_type_list('timestamp') %} + {%- elif elementary.is_data_type_of_normalized_type(data_type, 'timestamp') %} {{ return('timestamp') }} - {%- elif data_type in elementary.data_type_list("boolean") %} + {%- elif elementary.is_data_type_of_normalized_type(data_type, "boolean") %} {{ return("boolean") }} {%- else %} {{ return('other') }} {% endif %} {%- endif %} {% endmacro %} + +{% macro is_data_type_of_normalized_type(data_type, normalized_type) %} + {% set data_type_list = elementary.data_type_list(normalized_type) | map('lower') %} + {% do return(data_type | lower in data_type_list) %} +{% endmacro %} diff --git a/macros/utils/graph/get_result_node.sql b/macros/utils/graph/get_result_node.sql index 237dc15de..8a751fdab 100644 --- a/macros/utils/graph/get_result_node.sql +++ b/macros/utils/graph/get_result_node.sql @@ -1,12 +1,13 @@ {% macro get_result_node(identifier, package_name='elementary') %} {% for result in results %} - {% if result.node.identifier == identifier %} + {% set node = elementary.get_node(result.unique_id) %} + {% if node.identifier == identifier %} {% if package_name %} - {% if result.node.package_name == package_name %} - {{ return(result.node) }} + {% if node.package_name == package_name %} + {{ return(node) }} {% endif %} {% else %} - {{ return(result.node) }} + {{ return(node) }} {% endif %} {% endif %} {% endfor %} diff --git a/macros/utils/graph/get_run_result_dict.sql b/macros/utils/graph/get_run_result_dict.sql index 0ec705161..f1ba2c758 100644 --- a/macros/utils/graph/get_run_result_dict.sql +++ b/macros/utils/graph/get_run_result_dict.sql @@ -3,13 +3,13 @@ {% set major = major | int %} {% set minor = minor | int %} {% if major < 1 or major == 1 and minor < 8 %} - {% do return(run_result.to_dict()) %} + {% do return(elementary.dbt_object_to_dict(run_result)) %} {% else %} {# There's a bug in dbt 1.8 which causes run_result.to_dict to fail on an exception #} {% set timing_dicts = [] %} {% if run_result.timing %} {% for item in run_result.timing %} - {% do timing_dicts.append(item.to_dict()) %} + {% do timing_dicts.append(elementary.dbt_object_to_dict(item)) %} {% endfor %} {% endif %} @@ -20,7 +20,7 @@ 'failures': run_result.failures, 'execution_time': run_result.execution_time, 'timing': timing_dicts, - 'node': run_result.node.to_dict() if run_result.node else None, + 'node': elementary.dbt_object_to_dict(run_result.node) if run_result.node else None, 'thread_id': run_result.thread_id }) %} {% endif %} diff --git a/macros/utils/run_queries/agate_to_dicts.sql b/macros/utils/run_queries/agate_to_dicts.sql index 6451b8769..0c4ac343f 100644 --- a/macros/utils/run_queries/agate_to_dicts.sql +++ b/macros/utils/run_queries/agate_to_dicts.sql @@ -1,14 +1,46 @@ {% macro agate_to_dicts(agate_table) %} - {% set column_types = agate_table.column_types %} + {% set rows = namespace(data=none) %} + {% if elementary.is_dbt_fusion() %} + {% set rows.data = agate_table %} + {% else %} + {% set rows.data = agate_table.rows %} + {% endif %} + {% set serializable_rows = [] %} - {% for agate_row in agate_table.rows %} + {% for agate_row in rows.data %} {% set serializable_row = {} %} {% for col_name, col_value in agate_row.items() %} - {% set serializable_col_value = column_types[loop.index0].jsonify(col_value) %} + {% set serializable_col_value = elementary.agate_val_serialize(col_value) %} {% set serializable_col_name = col_name | lower %} {% do serializable_row.update({serializable_col_name: serializable_col_value}) %} {% endfor %} {% do serializable_rows.append(serializable_row) %} {% endfor %} {{ return(serializable_rows) }} -{% endmacro %} \ No newline at end of file +{% endmacro %} + +{% macro agate_val_serialize(val) %} + {% if val.year is defined %} + {% do return(val.isoformat()) %} + {% endif %} + {% if elementary.edr_is_decimal(val) %} + {% do return(elementary.edr_serialize_decimal(val)) %} + {% endif %} + {% do return(val) %} +{% endmacro %} + +{% macro edr_is_decimal(val) %} + {# A hacky way to check if a value is of type Decimal, as there isn't a straightforward way to check that #} + {% do return(val is number and val.normalize is defined and val.normalize is not none) %} +{% endmacro %} + +{% macro edr_serialize_decimal(val) %} + {% set dec_tuple = val.normalize().as_tuple() %} + + {# A hacky way to standardize Decimals which are not JSON-serializable #} + {% if dec_tuple[2] == 0 %} + {% do return(val | string | int) %} + {% else %} + {% do return(val | string | float) %} + {% endif %} +{% endmacro %} diff --git a/macros/utils/run_queries/dbt_object_to_dict.sql b/macros/utils/run_queries/dbt_object_to_dict.sql new file mode 100644 index 000000000..1a2d248ed --- /dev/null +++ b/macros/utils/run_queries/dbt_object_to_dict.sql @@ -0,0 +1,7 @@ +{% macro dbt_object_to_dict(agate_table) %} + {% if elementary.is_dbt_fusion() %} + {% do return(agate_table) %} + {% endif %} + + {% do return(agate_table.to_dict()) %} +{% endmacro %} diff --git a/macros/utils/run_queries/run_query.sql b/macros/utils/run_queries/run_query.sql index 126935bf0..de9f84843 100644 --- a/macros/utils/run_queries/run_query.sql +++ b/macros/utils/run_queries/run_query.sql @@ -21,7 +21,13 @@ {% endmacro %} {% macro snowflake__format_query_with_metadata(query) %} - {# Snowflake removes leading comments #} + {#- Strip ; from last statement to prevent error in dbt-fusion -#} + {%- set query = query.strip() -%} + {%- if query.endswith(';') -%} + {%- set query = query[:-1] -%} + {%- endif -%} + + {# Snowflake removes leading comments, so comment is after the statement #} {{ query }} /* --ELEMENTARY-METADATA-- {{ elementary.get_elementary_query_metadata() | tojson }} --END-ELEMENTARY-METADATA-- */ {% endmacro %} diff --git a/macros/utils/table_operations/create_intermediate_relation.sql b/macros/utils/table_operations/create_intermediate_relation.sql index 7eb64a9df..7ff67eccb 100644 --- a/macros/utils/table_operations/create_intermediate_relation.sql +++ b/macros/utils/table_operations/create_intermediate_relation.sql @@ -1,5 +1,15 @@ {% macro create_intermediate_relation(base_relation, rows, temporary, like_columns=none) %} - {% set int_relation = elementary.edr_make_intermediate_relation(base_relation).incorporate(type='table') %} + {% set int_relation = elementary.edr_make_intermediate_relation(base_relation) %} + + {# It seems that in dbt-fusion we fail in case the database/schema are None and not passed explicitly + through the "path" param (happens in temp tables for some adapters). + So to be safe, we just pass all of them explicitly. #} + {% set int_relation = int_relation.incorporate( + type='table', + path={"database": int_relation.database, + "schema": int_relation.schema, + "table": int_relation.identifier} + ) %} {% if not elementary.has_temp_table_support() %} {% set temporary = false %} diff --git a/macros/utils/table_operations/create_or_replace.sql b/macros/utils/table_operations/create_or_replace.sql index 4c1399565..a25e877b5 100644 --- a/macros/utils/table_operations/create_or_replace.sql +++ b/macros/utils/table_operations/create_or_replace.sql @@ -4,39 +4,31 @@ {# Snowflake and Bigquery #} {% macro default__create_or_replace(temporary, relation, sql_query) %} - {% do elementary.run_query(dbt.create_table_as(temporary, relation, sql_query)) %} + {% do elementary.edr_create_table_as(temporary, relation, sql_query) %} {% endmacro %} {% macro redshift__create_or_replace(temporary, relation, sql_query) %} - {% do dbt.drop_relation_if_exists(relation) %} - {% do elementary.run_query(dbt.create_table_as(temporary, relation, sql_query)) %} - {% do adapter.commit() %} + {% do elementary.edr_create_table_as(temporary, relation, sql_query, drop_first=true, should_commit=true) %} {% endmacro %} {% macro postgres__create_or_replace(temporary, relation, sql_query) %} {% do elementary.run_query("BEGIN") %} - {% do dbt.drop_relation_if_exists(relation) %} - {% do elementary.run_query(dbt.create_table_as(temporary, relation, sql_query)) %} + {% do elementary.edr_create_table_as(temporary, relation, sql_query, drop_first=true) %} {% do elementary.run_query("COMMIT") %} {% endmacro %} {% macro spark__create_or_replace(temporary, relation, sql_query) %} - {% do dbt.drop_relation_if_exists(relation) %} - {% do elementary.run_query(dbt.create_table_as(temporary, relation, sql_query)) %} - {% do adapter.commit() %} + {% do elementary.edr_create_table_as(temporary, relation, sql_query, drop_first=true, should_commit=true) %} {% endmacro %} {% macro athena__create_or_replace(temporary, relation, sql_query) %} - {% do dbt.drop_relation_if_exists(relation) %} - {% do elementary.run_query(dbt.create_table_as(temporary, relation, sql_query)) %} + {% do elementary.edr_create_table_as(temporary, relation, sql_query, drop_first=true) %} {% endmacro %} {% macro trino__create_or_replace(temporary, relation, sql_query) %} - {% do dbt.drop_relation_if_exists(relation) %} - {% do elementary.run_query(dbt.create_table_as(temporary, relation, sql_query)) %} + {% do elementary.edr_create_table_as(temporary, relation, sql_query, drop_first=true) %} {% endmacro %} {% macro clickhouse__create_or_replace(temporary, relation, sql_query) %} - {% do dbt.drop_relation_if_exists(relation) %} - {% do elementary.run_query(dbt.create_table_as(temporary, relation, sql_query)) %} + {% do elementary.edr_create_table_as(temporary, relation, sql_query, drop_first=true) %} {% endmacro %} diff --git a/macros/utils/table_operations/create_table_as.sql b/macros/utils/table_operations/create_table_as.sql new file mode 100644 index 000000000..438119e00 --- /dev/null +++ b/macros/utils/table_operations/create_table_as.sql @@ -0,0 +1,69 @@ +{% macro edr_create_table_as(temporary, relation, sql_query, drop_first=false, should_commit=false) %} + {# This macro contains a simplified implementation that replaces our usage of + dbt.create_table_as and serves our needs. + This version also runs the query rather than return the SQL. + #} + + {% if drop_first %} + {% do dbt.drop_relation_if_exists(relation) %} + {% endif %} + + {% set create_query = elementary.edr_get_create_table_as_sql(temporary, relation, sql_query) %} + {% do elementary.run_query(create_query) %} + + {% if should_commit %} + {% do adapter.commit() %} + {% endif %} +{% endmacro %} + + +{% macro edr_get_create_table_as_sql(temporary, relation, sql_query) %} + {{ return(adapter.dispatch("edr_get_create_table_as_sql", "elementary")(temporary, relation, sql_query)) }} +{% endmacro %} + +{% macro default__edr_get_create_table_as_sql(temporary, relation, sql_query) %} + {{ dbt.get_create_table_as_sql(temporary, relation, sql_query) }} +{% endmacro %} + +{# Simplified versions for dbt-fusion supported adapters as the original dbt macro + no longer works outside of the scope of a model's materialization #} + +{% macro snowflake__edr_get_create_table_as_sql(temporary, relation, sql_query) %} + create or replace {% if temporary %} temporary {% endif %} table {{ relation }} + as {{ sql_query }} +{% endmacro %} + +{% macro bigquery__edr_get_create_table_as_sql(temporary, relation, sql_query) %} + create or replace table {{ relation }} + {% if temporary %} + options (expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 1 hour)) + {% endif %} + as {{ sql_query }} +{% endmacro %} + +{% macro postgres__edr_get_create_table_as_sql(temporary, relation, sql_query) %} + create {% if temporary %} temporary {% endif %} table {{ relation.include(database=(not temporary), schema=(not temporary)) }} + as {{ sql_query }} +{% endmacro %} + +{% macro databricks__edr_get_create_table_as_sql(temporary, relation, sql_query) %} + {% if temporary %} + {% if elementary.is_dbt_fusion() %} + {# + dbt fusion does not run Databricks statements in the same session, so we can't use temp + views. + (the view will be dropped later as has_temp_table_support returns False for Databricks) + + More details here - https://github.com/dbt-labs/dbt-fusion/blob/fa78a4099553a805af7629ac80be55e23e24bb4c/crates/dbt-loader/src/dbt_macro_assets/dbt-databricks/macros/relations/table/create.sql#L54 + #} + {% set relation_type = 'view' %} + {% else %} + {% set relation_type = 'temporary view' %} + {% endif %} + {% else %} + {% set relation_type = 'table' %} + {% endif %} + + create or replace {{ relation_type }} {{ relation }} + as {{ sql_query }} +{% endmacro %} diff --git a/macros/utils/table_operations/create_table_like.sql b/macros/utils/table_operations/create_table_like.sql index 97a96347b..8ee74edb0 100644 --- a/macros/utils/table_operations/create_table_like.sql +++ b/macros/utils/table_operations/create_table_like.sql @@ -11,5 +11,5 @@ FROM {{ like_relation }} WHERE 1 = 0 {% endset %} - {% do elementary.run_query(dbt.create_table_as(temporary, relation, empty_table_query)) %} + {% do elementary.edr_create_table_as(temporary, relation, empty_table_query) %} {% endmacro %} diff --git a/macros/utils/table_operations/create_temp_table.sql b/macros/utils/table_operations/create_temp_table.sql index 00de501d6..c1ad4bd61 100644 --- a/macros/utils/table_operations/create_temp_table.sql +++ b/macros/utils/table_operations/create_temp_table.sql @@ -8,12 +8,7 @@ identifier=table_name, type='table') -%} {% set temp_table_relation = elementary.make_temp_table_relation(temp_table_relation) %} - {% if temp_table_exists %} - {% do adapter.drop_relation(temp_table_relation) %} - {% do elementary.run_query(dbt.create_table_as(True, temp_table_relation, sql_query)) %} - {% else %} - {% do elementary.run_query(dbt.create_table_as(True, temp_table_relation, sql_query)) %} - {% endif %} + {% do elementary.edr_create_table_as(True, temp_table_relation, sql_query, drop_first=temp_table_exists) %} {{ return(temp_table_relation) }}{% endmacro %} {% macro snowflake__create_temp_table(database_name, schema_name, table_name, sql_query) %} diff --git a/macros/utils/table_operations/replace_table_data.sql b/macros/utils/table_operations/replace_table_data.sql index 6dcad9a06..1ed641769 100644 --- a/macros/utils/table_operations/replace_table_data.sql +++ b/macros/utils/table_operations/replace_table_data.sql @@ -5,7 +5,7 @@ {# Default (Bigquery & Snowflake) - upload data to a temp table, and then atomically replace the table with a new one #} {% macro default__replace_table_data(relation, rows) %} {% set intermediate_relation = elementary.create_intermediate_relation(relation, rows, temporary=True) %} - {% do elementary.run_query(dbt.create_table_as(False, relation, 'select * from {}'.format(intermediate_relation))) %} + {% do elementary.edr_create_table_as(False, relation, 'select * from {}'.format(intermediate_relation)) %} {% do adapter.drop_relation(intermediate_relation) %} {% endmacro %}