diff --git a/.changes/unreleased/Features-20241229-174752.yaml b/.changes/unreleased/Features-20241229-174752.yaml new file mode 100644 index 000000000..6d2a01ae8 --- /dev/null +++ b/.changes/unreleased/Features-20241229-174752.yaml @@ -0,0 +1,6 @@ +kind: Features +body: add multiple substrategies for insert_overwrite and microbatch +time: 2024-12-29T17:47:52.647374Z +custom: + Author: borjavb + Issue: "1409" diff --git a/dbt/include/bigquery/macros/materializations/incremental.sql b/dbt/include/bigquery/macros/materializations/incremental.sql index 25a83b0c6..5ed5dfbdb 100644 --- a/dbt/include/bigquery/macros/materializations/incremental.sql +++ b/dbt/include/bigquery/macros/materializations/incremental.sql @@ -17,6 +17,43 @@ {% do return(strategy) %} {% endmacro %} +{% macro dbt_bigquery_optimize_substrategy(partitions) %} + {#-- Set the optimal substrategy based on the nature of the overwrite --#} + {% if partitions is not none and partitions != [] %} {# static #} + {{ return("delete+insert") }} + {% else %} {# dynamic #} + {{ return("copy_partitions") }} + {% endif %} +{% endmacro %} + +{% macro dbt_bigquery_validate_incremental_substrategy(config, strategy, copy_partitions, partitions) %} + {# -- Find and validate the substrategy used for insert_overwrite and microbatch. + Legacy behaviour was to pass copy_partitions as part of the `partition_by` config, + so we keep honouring that option in this validation. + #} + {%- set incremental_substrategy = config.get('incremental_substrategy', 'copy_partitions' if copy_partitions else 'merge') -%} + + {% if strategy in ['insert_overwrite', 'microbatch'] %} + {% if incremental_substrategy not in ['merge', 'commit+delete+insert', 'delete+insert', 'copy_partitions', 'optimal'] %} + {% set invalid_substrategy_msg -%} + The 'incremental_substrategy' option must be one of 'merge' (default), 'commit+delete+insert', 'delete+insert', 'copy_partitions', or 'optimal' + {%- endset %} + {% do exceptions.raise_compiler_error(invalid_substrategy_msg) %} + {% endif %} + {% elif config.get('incremental_substrategy') is not none %} {#-- check the raw config value here: the default applied above is never none and must not trip this error --#} + {% set wrong_strategy_msg -%} + The 'incremental_substrategy' option requires the 'incremental_strategy' option to be set to 'insert_overwrite' or 'microbatch'.
+ {%- endset %} + {% do exceptions.raise_compiler_error(wrong_strategy_msg) %} + {% endif %} + + {% if incremental_substrategy == 'optimal' %} + {{ return(dbt_bigquery_optimize_substrategy(partitions)) }} + {% else %} + {{ return(incremental_substrategy) }} + {% endif %} +{% endmacro %} + {% macro source_sql_with_partition(partition_by, source_sql) %} {%- if partition_by.time_ingestion_partitioning %} @@ -43,19 +80,19 @@ {% endmacro %} {% macro bq_generate_incremental_build_sql( - strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, incremental_predicates + strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy, incremental_predicates ) %} {#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#} {% if strategy == 'insert_overwrite' %} {% set build_sql = bq_generate_incremental_insert_overwrite_build_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {% elif strategy == 'microbatch' %} {% set build_sql = bq_generate_microbatch_build_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {% else %} {# strategy == 'merge' #} @@ -87,6 +124,10 @@ {%- set partitions = config.get('partitions', none) -%} {%- set cluster_by = config.get('cluster_by', none) -%} + {#-- Validate early that the incremental substrategy is set correctly for insert_overwrite or microbatch --#} + {% set incremental_substrategy = dbt_bigquery_validate_incremental_substrategy(config, strategy, partition_by.copy_partitions, partitions) -%} + + {% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %} {% set incremental_predicates = config.get('predicates', default=none) or config.get('incremental_predicates', default=none) %} @@ -95,13 +136,8 @@ {{ run_hooks(pre_hooks) }} - {% if partition_by.copy_partitions is true and strategy not in ['insert_overwrite', 'microbatch'] %} {#-- We can't copy partitions with merge strategy --#} - {% set wrong_strategy_msg -%} - The 'copy_partitions' option requires the 'incremental_strategy' option to be set to 'insert_overwrite' or 'microbatch'.
- {%- endset %} - {% do exceptions.raise_compiler_error(wrong_strategy_msg) %} - {% elif existing_relation is none %} + {% if existing_relation is none %} {%- call statement('main', language=language) -%} {{ bq_create_table_as(partition_by, False, target_relation, compiled_code, language) }} {%- endcall -%} @@ -153,7 +189,7 @@ {% endif %} {% set build_sql = bq_generate_incremental_build_sql( - strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, partition_by.copy_partitions, incremental_predicates + strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy, incremental_predicates ) %} {%- call statement('main') -%} diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql index 3ba67931e..02c40300c 100644 --- a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql +++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql @@ -1,5 +1,5 @@ {% macro bq_generate_incremental_insert_overwrite_build_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {% if partition_by is none %} {% set missing_partition_msg -%} @@ -9,7 +9,7 @@ {% endif %} {% set build_sql = bq_insert_overwrite_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {{ return(build_sql) }} @@ -38,17 +38,17 @@ {% endmacro %} {% macro bq_insert_overwrite_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {% if partitions is not none and partitions != [] %} {# static #} - {{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions) }} + {{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy) }} {% else %} {# dynamic #} - {{ bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }} + {{ bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, incremental_substrategy) }} {% endif %} {% endmacro %} {% macro bq_static_insert_overwrite_sql( - tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions + tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {% set predicate -%} @@ -75,32 +75,41 @@ ) {%- endset -%} - {% if copy_partitions %} + {% if incremental_substrategy == 'copy_partitions' %} {% do bq_copy_partitions(tmp_relation, target_relation, partitions, 
partition_by) %} {% else %} - {#-- In case we're putting the model SQL _directly_ into the MERGE statement, - we need to prepend the MERGE statement with the user-configured sql_header, - which may be needed to resolve that model SQL (e.g. referencing a variable or UDF in the header) - in the "temporary table exists" case, we save the model SQL result as a temp table first, wherein the - sql_header is included by the create_table_as macro. - #} - -- 1. run the merge statement - {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }}; - - {%- if tmp_relation_exists -%} - -- 2. clean up the temp table - drop table if exists {{ tmp_relation }}; - {%- endif -%} + {#-- In case we're putting the model SQL _directly_ into the MERGE/insert+delete transaction, + we need to prepend the merge/transaction statement with the user-configured sql_header, + which may be needed to resolve that model SQL (e.g. referencing a variable or UDF in the header). + In the "temporary table exists" case, we save the model SQL result as a temp table first, wherein the + sql_header is included by the create_table_as macro. + #} + + {% if incremental_substrategy == 'delete+insert' %} + -- 1. run insert_overwrite with the delete+insert substrategy (without a transaction) + {{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }}; + {% elif incremental_substrategy == 'commit+delete+insert' %} + -- 1. run insert_overwrite with the commit+delete+insert substrategy (inside a transaction) + {{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists, transactional=True) }}; + {% else %} + -- 1. run insert_overwrite with the merge substrategy (default) + {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }}; + {% endif %} + + {%- if tmp_relation_exists -%} + -- 2.
clean up the temp table + drop table if exists {{ tmp_relation }}; + {%- endif -%} - {% endif %} + {% endif %} {% endmacro %} {% macro bq_dynamic_copy_partitions_insert_overwrite_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions + tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists ) %} {%- if tmp_relation_exists is false -%} - {# We run temp table creation in a separated script to move to partitions copy if it doesn't already exist #} + {# We run temp table creation in a separate script, then move on to the partition copy, if it does not already exist #} {%- call statement('create_tmp_relation_for_copy', language='sql') -%} {{ bq_create_table_as(partition_by, True, tmp_relation, sql, 'sql') }} @@ -117,9 +126,9 @@ drop table if exists {{ tmp_relation }} {% endmacro %} -{% macro bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) %} - {%- if copy_partitions is true %} - {{ bq_dynamic_copy_partitions_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }} +{% macro bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, incremental_substrategy) %} + {% if incremental_substrategy == 'copy_partitions' %} + {{ bq_dynamic_copy_partitions_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists) }} {% else -%} {% set predicate -%} {{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement) @@ -155,12 +164,58 @@ from {{ tmp_relation }} ); - -- 3. run the merge statement - {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }}; - + {% if incremental_substrategy == 'delete+insert' %} + -- 3. run insert_overwrite with the delete+insert substrategy (without a transaction) + {{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate]) }}; + {% elif incremental_substrategy == 'commit+delete+insert' %} + -- 3. run insert_overwrite with the commit+delete+insert substrategy (inside a transaction) + {{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], transactional=True) }}; + {% else %} + -- 3. run insert_overwrite with the merge substrategy (default) + {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }}; + {% endif %} -- 4.
clean up the temp table drop table if exists {{ tmp_relation }} {% endif %} {% endmacro %} + + + +{% macro bq_get_insert_overwrite_with_delete_and_insert_sql(target, source, dest_columns, predicates, include_sql_header=False, transactional=False) -%} + {%- set predicates = [] if predicates is none else [] + predicates -%} + {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} + {%- set sql_header = config.get('sql_header', none) -%} + + {{ sql_header if sql_header is not none and include_sql_header }} + + {% if transactional %} + -- Rely on a multi-statement transaction for atomicity: + -- if anything fails, nothing is committed, + -- and the DELETE + INSERT pair runs in isolation from concurrent readers + begin + begin transaction; + {% endif %} + -- DELETEs that remove entire partitions are free: https://cloud.google.com/bigquery/docs/using-dml-with-partitioned-tables#using_dml_delete_to_delete_partitions + delete from {{ target }} as DBT_INTERNAL_DEST + where true + {% if predicates %} and {{ predicates | join(' and ') }} {% endif %}; + + -- INSERT the data + insert into {{ target }} ({{ dest_cols_csv }}) + ( + select {{ dest_cols_csv }} + from {{ source }} + ) + {% if transactional %} + -- the leading ; terminates the insert above; the calling macro appends the final ; after 'end' + ; commit transaction; + + exception when error then + -- If things go south, roll back first, then re-raise: statements after 'raise' never run + rollback transaction; + raise using message = FORMAT("dbt error while commit+delete+insert: %s", @@error.message); + end + {% endif %} +{% endmacro %} diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql index d4c4b7453..21c52c4c6 100644 --- a/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql +++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql @@ -18,10 +18,10 @@ {% endmacro %} {% macro bq_generate_microbatch_build_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {% set build_sql = bq_insert_overwrite_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {{ return(build_sql) }} diff --git a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py index 21d5f15b6..72756f43b 100644 --- a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py +++ b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py @@ -647,3 +647,214 @@ }} select * from {{ ref('input_model') }} """ + +microbatch_model_no_unique_id_copy_partitions_sql = """ +{{ config( + materialized='incremental', + incremental_strategy='microbatch', + partition_by={ + 'field': 'event_time', + 'data_type': 'timestamp', + 'granularity': 'day', + 'copy_partitions': true + }, + event_time='event_time', + batch_size='day', + begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0) + ) +}} +select * from {{ ref('input_model') }} +""" + +overwrite_static_day_delete_and_insert_sub_strategy_sql = """ +{% set partitions_to_replace = [
"'2020-01-01'", + "'2020-01-02'", +] %} + +{{ + config( + materialized="incremental", + incremental_strategy="insert_overwrite", + incremental_substrategy='delete+insert', + cluster_by="id", + partition_by={ + "field": "date_time", + "data_type": "datetime", + "granularity": "day" + }, + partitions=partitions_to_replace, + on_schema_change="sync_all_columns" + ) +}} + + +with data as ( + + {% if not is_incremental() %} + + select 1 as id, cast('2020-01-01' as datetime) as date_time union all + select 2 as id, cast('2020-01-01' as datetime) as date_time union all + select 3 as id, cast('2020-01-01' as datetime) as date_time union all + select 4 as id, cast('2020-01-01' as datetime) as date_time + + {% else %} + + -- we want to overwrite the 4 records in the 2020-01-01 partition + -- with the 2 records below, but add two more in the 2020-01-02 partition + select 10 as id, cast('2020-01-01' as datetime) as date_time union all + select 20 as id, cast('2020-01-01' as datetime) as date_time union all + select 30 as id, cast('2020-01-02' as datetime) as date_time union all + select 40 as id, cast('2020-01-02' as datetime) as date_time + + {% endif %} + +) + +select * from data +""".lstrip() + +overwrite_static_day_commit_delete_and_insert_sub_strategy_sql = """ +{% set partitions_to_replace = [ + "'2020-01-01'", + "'2020-01-02'", +] %} + +{{ + config( + materialized="incremental", + incremental_strategy="insert_overwrite", + incremental_substrategy='commit+delete+insert', + cluster_by="id", + partition_by={ + "field": "date_time", + "data_type": "datetime", + "granularity": "day" + }, + partitions=partitions_to_replace, + on_schema_change="sync_all_columns" + ) +}} + + +with data as ( + + {% if not is_incremental() %} + + select 1 as id, cast('2020-01-01' as datetime) as date_time union all + select 2 as id, cast('2020-01-01' as datetime) as date_time union all + select 3 as id, cast('2020-01-01' as datetime) as date_time union all + select 4 as id, cast('2020-01-01' as datetime) as date_time + + {% else %} + + -- we want to overwrite the 4 records in the 2020-01-01 partition + -- with the 2 records below, but add two more in the 2020-01-02 partition + select 10 as id, cast('2020-01-01' as datetime) as date_time union all + select 20 as id, cast('2020-01-01' as datetime) as date_time union all + select 30 as id, cast('2020-01-02' as datetime) as date_time union all + select 40 as id, cast('2020-01-02' as datetime) as date_time + + {% endif %} + +) + +select * from data +""".lstrip() + +overwrite_static_day_merge_sub_strategy_sql = """ +{% set partitions_to_replace = [ + "'2020-01-01'", + "'2020-01-02'", +] %} + +{{ + config( + materialized="incremental", + incremental_strategy="insert_overwrite", + incremental_substrategy='merge', + cluster_by="id", + partition_by={ + "field": "date_time", + "data_type": "datetime", + "granularity": "day" + }, + partitions=partitions_to_replace, + on_schema_change="sync_all_columns" + ) +}} + + +with data as ( + + {% if not is_incremental() %} + + select 1 as id, cast('2020-01-01' as datetime) as date_time union all + select 2 as id, cast('2020-01-01' as datetime) as date_time union all + select 3 as id, cast('2020-01-01' as datetime) as date_time union all + select 4 as id, cast('2020-01-01' as datetime) as date_time + + {% else %} + + -- we want to overwrite the 4 records in the 2020-01-01 partition + -- with the 2 records below, but add two more in the 2020-01-02 partition + select 10 as id, cast('2020-01-01' as datetime) as date_time union all + select 20 as 
id, cast('2020-01-01' as datetime) as date_time union all + select 30 as id, cast('2020-01-02' as datetime) as date_time union all + select 40 as id, cast('2020-01-02' as datetime) as date_time + + {% endif %} + +) + +select * from data +""".lstrip() + + +overwrite_static_day_copy_partitions_sub_strategy_sql = """ +{% set partitions_to_replace = [ + "'2020-01-01'", + "'2020-01-02'", +] %} + +{{ + config( + materialized="incremental", + incremental_strategy="insert_overwrite", + incremental_substrategy='copy_partitions', + cluster_by="id", + partition_by={ + "field": "date_time", + "data_type": "datetime", + "granularity": "day" + }, + partitions=partitions_to_replace, + on_schema_change="sync_all_columns" + ) +}} + + +with data as ( + + {% if not is_incremental() %} + + select 1 as id, cast('2020-01-01' as datetime) as date_time union all + select 2 as id, cast('2020-01-01' as datetime) as date_time union all + select 3 as id, cast('2020-01-01' as datetime) as date_time union all + select 4 as id, cast('2020-01-01' as datetime) as date_time + + {% else %} + + -- we want to overwrite the 4 records in the 2020-01-01 partition + -- with the 2 records below, but add two more in the 2020-01-02 partition + select 10 as id, cast('2020-01-01' as datetime) as date_time union all + select 20 as id, cast('2020-01-01' as datetime) as date_time union all + select 30 as id, cast('2020-01-02' as datetime) as date_time union all + select 40 as id, cast('2020-01-02' as datetime) as date_time + + {% endif %} + +) + +select * from data +""".lstrip() diff --git a/tests/functional/adapter/incremental/test_incremental_strategies.py b/tests/functional/adapter/incremental/test_incremental_strategies.py index 1a339d601..dd6ab7196 100644 --- a/tests/functional/adapter/incremental/test_incremental_strategies.py +++ b/tests/functional/adapter/incremental/test_incremental_strategies.py @@ -27,6 +27,10 @@ overwrite_day_with_time_ingestion_sql, overwrite_day_with_time_partition_datetime_sql, overwrite_static_day_sql, + overwrite_static_day_delete_and_insert_sub_strategy_sql, + overwrite_static_day_commit_delete_and_insert_sub_strategy_sql, + overwrite_static_day_merge_sub_strategy_sql, + overwrite_static_day_copy_partitions_sub_strategy_sql, ) @@ -49,7 +53,11 @@ def models(self): "incremental_overwrite_time.sql": overwrite_time_sql, "incremental_overwrite_day_with_time_partition.sql": overwrite_day_with_time_ingestion_sql, "incremental_overwrite_day_with_time_partition_datetime.sql": overwrite_day_with_time_partition_datetime_sql, - "incremental_overwrite_static_day.sql": overwrite_static_day_sql, + "incremental_overwrite_static_substrategy_day.sql": overwrite_static_day_sql, + "incremental_overwrite_static_substrategy_day_with_deleteinsert.sql": overwrite_static_day_delete_and_insert_sub_strategy_sql, + "incremental_overwrite_static_substrategy_day_with_commitdeleteinsert.sql": overwrite_static_day_commit_delete_and_insert_sub_strategy_sql, + "incremental_overwrite_static_substrategy_day_with_merge.sql": overwrite_static_day_merge_sub_strategy_sql, + "incremental_overwrite_static_substrategy_day_with_copy_partitions.sql": overwrite_static_day_copy_partitions_sub_strategy_sql, } @pytest.fixture(scope="class") @@ -84,7 +92,23 @@ def test__bigquery_assert_incremental_configurations_apply_the_right_strategy(se "incremental_overwrite_day_with_time_partition_datetime", "incremental_overwrite_day_with_time_partition_expected", ), - ("incremental_overwrite_static_day", "incremental_overwrite_day_expected"), + 
("incremental_overwrite_static_substrategy_day", "incremental_overwrite_day_expected"), + ( + "incremental_overwrite_static_substrategy_day_with_deleteinsert", + "incremental_overwrite_day_expected", + ), + ( + "incremental_overwrite_static_substrategy_day_with_commitdeleteinsert", + "incremental_overwrite_day_expected", + ), + ( + "incremental_overwrite_static_substrategy_day_with_merge", + "incremental_overwrite_day_expected", + ), + ( + "incremental_overwrite_static_substrategy_day_with_copy_partitions", + "incremental_overwrite_day_expected", + ), ] db_with_schema = f"{project.database}.{project.test_schema}" for incremental_strategy in incremental_strategies: