Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ select {{source_cols_csv}} from {{ source_relation }}
or not_matched_by_source_action_trimmed.startswith('update')
)
%}

{#
Merge actions can be set explicitly by providing a multiline code block
in the model config. In that case all matched/not matched actions settings
are ignored.
#}
{%- set merge_actions_explicit = config.get('merge_actions_explicit', '') | trim(' \n\t') -%}

{%- set merge_actions_explicit_is_set = merge_actions_explicit | length > 0 %}


{% if unique_key %}
Expand Down Expand Up @@ -131,28 +140,32 @@ select {{source_cols_csv}} from {{ source_relation }}
{{ source }} as {{ source_alias }}
on
{{ predicates | join('\n and ') }}
{%- if not skip_matched_step %}
{%- if merge_actions_explicit_is_set %}
{{ merge_actions_explicit }}
{%- else %}
{%- if not skip_matched_step %}
when matched
{%- if matched_condition %}
{%- if matched_condition %}
and ({{ matched_condition }})
{%- endif %}
{%- endif %}
then update set
{{ get_merge_update_set(update_columns, on_schema_change, source_columns, source_alias) }}
{%- endif %}
{%- if not skip_not_matched_step %}
{%- endif %}
{%- if not skip_not_matched_step %}
when not matched
{%- if not_matched_condition %}
{%- if not_matched_condition %}
and ({{ not_matched_condition }})
{%- endif %}
{%- endif %}
then insert
{{ get_merge_insert(on_schema_change, source_columns, source_alias) }}
{%- endif %}
{%- if not_matched_by_source_action_is_set %}
{%- endif %}
{%- if not_matched_by_source_action_is_set %}
when not matched by source
{%- if not_matched_by_source_condition %}
{%- if not_matched_by_source_condition %}
and ({{ not_matched_by_source_condition }})
{%- endif %}
{%- endif %}
then {{ not_matched_by_source_action }}
{%- endif %}
{%- endif %}
{% endmacro %}

Expand Down
64 changes: 64 additions & 0 deletions tests/functional/adapter/incremental/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,13 @@
3,Dunkan,Aidaho,2
"""

merge_with_explicit_actions_expected = """id,first,second,updated_version
1,Jessica,Atreides,2
2,Paul,Atreides,2
4,Baron,Harkonnen,-1
6,Emperor,Shaddam,1
"""

base_model = """
{{ config(
materialized = 'incremental'
Expand Down Expand Up @@ -523,6 +530,63 @@
{% endif %}
"""

merge_with_explicit_actions_model = """
{{ config(
materialized = 'incremental',
unique_key = 'id',
incremental_strategy='merge',
target_alias='t',
source_alias='s',
merge_actions_explicit=\"\"\"
when matched
and s.action = 'u'
and s.V > t.updated_version
then update set
t.first = s.first,
t.second = s.second,
t.updated_version = s.V
when matched
and s.action = 'd'
and s.V > t.updated_version
then delete
when not matched
then insert (id, first, second, updated_version)
values (s.id, s.first, s.second, s.V)
when not matched by source
and t.updated_version = 1 then delete
when not matched by source
then update set t.updated_version = -1
\"\"\",
) }}

{% if not is_incremental() %}

-- data for first invocation of model

select 1 as id, 'Vasya' as first, 'Pupkin' as second, 1 as updated_version -- planned to be updated
union all
select 2 as id, 'Paul' as first, 'Atreides' as second, 2 as updated_version -- planned to be kept
union all
select 3 as id, 'Dunkan' as first, 'Aidaho' as second, 1 as updated_version -- planned to be deleted
union all
select 4 as id, 'Baron' as first, 'Harkonnen' as second, 2 as updated_version -- should be updated
union all
select 5 as id, 'Raban' as first, '' as second, 1 as updated_version -- should be deleted

{% else %}

-- id, first, second, V, action

select 1 as id, 'Jessica' as first, 'Atreides' as second, 2 as V, 'u' as action -- should update
union all
select 2 as id, 'Paul' as first, 'Atreides' as second, 1 as V, 'd' as action -- should skip, V<2
union all
select 3 as id, 'Naknud' as first, 'Ohadia' as second, 2 as V, 'd' as action -- should delete
union all
select 6 as id, 'Emperor' as first, 'Shaddam' as second, 1 as V, 'i' as action -- should insert
{% endif %}
"""

simple_python_model = """
import pandas

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,3 +349,24 @@ def test_merge(self, project):
project.adapter,
["merge_schema_evolution", "merge_schema_evolution_expected"],
)


class TestMergeWithExplicitActions(IncrementalBase):
@pytest.fixture(scope="class")
def seeds(self):
return {
"merge_with_explicit_actions_expected.csv": fixtures.merge_with_explicit_actions_expected, # noqa: E501
}

@pytest.fixture(scope="class")
def models(self):
return {
"merge_with_explicit_actions_model.sql": fixtures.merge_with_explicit_actions_model,
}

def test_merge(self, project):
self.seed_and_run_twice()
util.check_relations_equal(
project.adapter,
["merge_with_explicit_actions_model", "merge_with_explicit_actions_expected"],
)