From 7356c0b3dcbaa769233ac9e004f770f28c648794 Mon Sep 17 00:00:00 2001 From: Dmitry Volodin Date: Mon, 12 May 2025 20:06:44 +0200 Subject: [PATCH] Added explicit merge key and the test, new test passes, clarifying others Signed-off-by: Dmitry Volodin --- .../incremental/strategies.sql | 35 ++++++---- .../adapter/incremental/fixtures.py | 64 +++++++++++++++++++ .../test_incremental_strategies.py | 21 ++++++ 3 files changed, 109 insertions(+), 11 deletions(-) diff --git a/dbt/include/databricks/macros/materializations/incremental/strategies.sql b/dbt/include/databricks/macros/materializations/incremental/strategies.sql index 1e1ff8097..82c19fa37 100644 --- a/dbt/include/databricks/macros/materializations/incremental/strategies.sql +++ b/dbt/include/databricks/macros/materializations/incremental/strategies.sql @@ -101,6 +101,15 @@ select {{source_cols_csv}} from {{ source_relation }} or not_matched_by_source_action_trimmed.startswith('update') ) %} + + {# + The `merge_actions_explicit` model config may hold a multi-line block of merge clauses. + When it is non-empty after trimming, it is emitted as-is right after the `on` predicates + and the matched / not matched / not matched by source settings are all ignored. 
+ #} + {%- set merge_actions_explicit = config.get('merge_actions_explicit', '') | trim(' \n\t') -%} + + {%- set merge_actions_explicit_is_set = merge_actions_explicit | length > 0 %} {% if unique_key %} @@ -131,28 +140,32 @@ select {{source_cols_csv}} from {{ source_relation }} {{ source }} as {{ source_alias }} on {{ predicates | join('\n and ') }} - {%- if not skip_matched_step %} + {%- if merge_actions_explicit_is_set %} + {{ merge_actions_explicit }} + {%- else %} + {%- if not skip_matched_step %} when matched - {%- if matched_condition %} + {%- if matched_condition %} and ({{ matched_condition }}) - {%- endif %} + {%- endif %} then update set {{ get_merge_update_set(update_columns, on_schema_change, source_columns, source_alias) }} - {%- endif %} - {%- if not skip_not_matched_step %} + {%- endif %} + {%- if not skip_not_matched_step %} when not matched - {%- if not_matched_condition %} + {%- if not_matched_condition %} and ({{ not_matched_condition }}) - {%- endif %} + {%- endif %} then insert {{ get_merge_insert(on_schema_change, source_columns, source_alias) }} - {%- endif %} - {%- if not_matched_by_source_action_is_set %} + {%- endif %} + {%- if not_matched_by_source_action_is_set %} when not matched by source - {%- if not_matched_by_source_condition %} + {%- if not_matched_by_source_condition %} and ({{ not_matched_by_source_condition }}) - {%- endif %} + {%- endif %} then {{ not_matched_by_source_action }} + {%- endif %} {%- endif %} {% endmacro %} diff --git a/tests/functional/adapter/incremental/fixtures.py b/tests/functional/adapter/incremental/fixtures.py index 18fcee40e..dae82cd70 100644 --- a/tests/functional/adapter/incremental/fixtures.py +++ b/tests/functional/adapter/incremental/fixtures.py @@ -283,6 +283,13 @@ 3,Dunkan,Aidaho,2 """ +merge_with_explicit_actions_expected = """id,first,second,updated_version +1,Jessica,Atreides,2 +2,Paul,Atreides,2 +4,Baron,Harkonnen,-1 +6,Emperor,Shaddam,1 +""" + base_model = """ {{ config( materialized = 
'incremental' @@ -523,6 +530,63 @@ {% endif %} """ +merge_with_explicit_actions_model = """ +{{ config( + materialized = 'incremental', + unique_key = 'id', + incremental_strategy='merge', + target_alias='t', + source_alias='s', + merge_actions_explicit=\"\"\" + when matched + and s.action = 'u' + and s.V > t.updated_version + then update set + t.first = s.first, + t.second = s.second, + t.updated_version = s.V + when matched + and s.action = 'd' + and s.V > t.updated_version + then delete + when not matched + then insert (id, first, second, updated_version) + values (s.id, s.first, s.second, s.V) + when not matched by source + and t.updated_version = 1 then delete + when not matched by source + then update set t.updated_version = -1 + \"\"\", +) }} + +{% if not is_incremental() %} + +-- data for first invocation of model + +select 1 as id, 'Vasya' as first, 'Pupkin' as second, 1 as updated_version -- planned to be updated +union all +select 2 as id, 'Paul' as first, 'Atreides' as second, 2 as updated_version -- planned to be kept +union all +select 3 as id, 'Dunkan' as first, 'Aidaho' as second, 1 as updated_version -- planned to be deleted +union all +select 4 as id, 'Baron' as first, 'Harkonnen' as second, 2 as updated_version -- should be updated +union all +select 5 as id, 'Raban' as first, '' as second, 1 as updated_version -- should be deleted + +{% else %} + +-- id, first, second, V, action + +select 1 as id, 'Jessica' as first, 'Atreides' as second, 2 as V, 'u' as action -- should update +union all +select 2 as id, 'Paul' as first, 'Atreides' as second, 1 as V, 'd' as action -- should skip, V<2 +union all +select 3 as id, 'Naknud' as first, 'Ohadia' as second, 2 as V, 'd' as action -- should delete +union all +select 6 as id, 'Emperor' as first, 'Shaddam' as second, 1 as V, 'i' as action -- should insert +{% endif %} +""" + simple_python_model = """ import pandas diff --git a/tests/functional/adapter/incremental/test_incremental_strategies.py 
b/tests/functional/adapter/incremental/test_incremental_strategies.py index 45a24362f..00dbce3a4 100644 --- a/tests/functional/adapter/incremental/test_incremental_strategies.py +++ b/tests/functional/adapter/incremental/test_incremental_strategies.py @@ -349,3 +349,24 @@ def test_merge(self, project): project.adapter, ["merge_schema_evolution", "merge_schema_evolution_expected"], ) + + +class TestMergeWithExplicitActions(IncrementalBase): + @pytest.fixture(scope="class") + def seeds(self): + return { + "merge_with_explicit_actions_expected.csv": fixtures.merge_with_explicit_actions_expected, # noqa: E501 + } + + @pytest.fixture(scope="class") + def models(self): + return { + "merge_with_explicit_actions_model.sql": fixtures.merge_with_explicit_actions_model, + } + + def test_merge(self, project): + self.seed_and_run_twice() + util.check_relations_equal( + project.adapter, + ["merge_with_explicit_actions_model", "merge_with_explicit_actions_expected"], + )