Skip to content
This repository was archived by the owner on Sep 2, 2025. It is now read-only.

Commit 25e45c7

Browse files
committed
substrategy
fixtures Create Features-20241229-174752.yaml
1 parent f00bc38 commit 25e45c7

File tree

6 files changed

+179
-67
lines changed

6 files changed

+179
-67
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
kind: Features
2+
body: add multiple substrategies for insert_overwrite and microbatch
3+
time: 2024-12-29T17:47:52.647374Z
4+
custom:
5+
Author: borjavb
6+
Issue: "1409"

dbt/include/bigquery/macros/materializations/incremental.sql

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,27 @@
1818
{% endmacro %}
1919

2020

21-
{% macro dbt_bigquery_validate_insert_overwrite_fn(config, strategy) %}
22-
{#-- Find and validate the function used for insert_overwrite #}
23-
{%- set insert_overwrite_fn = config.get('insert_overwrite_fn', none) -%}
24-
{%- set default_fn = 'merge' -%}
25-
{% if insert_overwrite_fn is none and strategy in ['insert_overwrite','microbatch']%}
26-
{{return (default_fn)}}
27-
{% elif insert_overwrite_fn is not in ["delete+insert"] and strategy in ['insert_overwrite','microbatch']%}
21+
{% macro dbt_bigquery_validate_incremental_substrategy(config, strategy, copy_partitions) %}
  {#--
    Resolve and validate the substrategy used to apply an 'insert_overwrite'
    or 'microbatch' incremental run.

    Args:
      config:          model config object; only its 'incremental_substrategy'
                       key is read here.
      strategy:        the already-validated incremental strategy name.
      copy_partitions: legacy flag that used to be passed as part of the
                       `partition_by` clause. When it is truthy and no
                       substrategy is configured explicitly, the substrategy
                       defaults to 'copy_partitions' instead of 'merge'.

    Returns:
      The resolved substrategy: the configured value when one is set, otherwise
      'copy_partitions' (legacy flag set) or 'merge'.

    Raises (compiler error):
      - when the strategy is 'insert_overwrite'/'microbatch' and the resolved
        substrategy is not one of 'merge', 'delete+insert', 'copy_partitions';
      - when any other strategy is combined with an explicitly configured
        substrategy or with the legacy copy_partitions flag.
  --#}
  {%- set configured_substrategy = config.get('incremental_substrategy', none) -%}
  {%- set default_substrategy = 'copy_partitions' if copy_partitions else 'merge' -%}
  {#-- Keep "what the user asked for" separate from the resolved value: the
       resolved value is never none, so it cannot be used to detect whether
       the option was actually requested. --#}
  {%- set incremental_substrategy = default_substrategy if configured_substrategy is none else configured_substrategy -%}

  {% if strategy in ['insert_overwrite', 'microbatch'] %}
    {% if incremental_substrategy not in ['merge', 'delete+insert', 'copy_partitions'] %}
      {% set invalid_substrategy_msg -%}
        The 'incremental_substrategy' option has to be either 'merge' (default), 'delete+insert' or 'copy_partitions'.
      {%- endset %}
      {% do exceptions.raise_compiler_error(invalid_substrategy_msg) %}
    {% endif %}
  {% elif configured_substrategy is not none or copy_partitions %}
    {#-- Only complain when a substrategy (or the legacy copy_partitions flag)
         was really requested; a plain 'merge' model must pass through. --#}
    {% set wrong_strategy_msg -%}
      The 'incremental_substrategy' option requires the 'incremental_strategy' option to be set to 'insert_overwrite' or 'microbatch'.
    {%- endset %}
    {% do exceptions.raise_compiler_error(wrong_strategy_msg) %}
  {% endif %}

  {{ return(incremental_substrategy) }}
{% endmacro %}
4143

4244
{% macro source_sql_with_partition(partition_by, source_sql) %}
@@ -65,19 +67,19 @@
6567
{% endmacro %}
6668

6769
{% macro bq_generate_incremental_build_sql(
68-
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, incremental_predicates, insert_overwrite_fn
70+
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy, incremental_predicates,
6971
) %}
7072
{#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#}
7173
{% if strategy == 'insert_overwrite' %}
7274

7375
{% set build_sql = bq_generate_incremental_insert_overwrite_build_sql(
74-
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
76+
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
7577
) %}
7678

7779
{% elif strategy == 'microbatch' %}
7880

7981
{% set build_sql = bq_generate_microbatch_build_sql(
80-
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
82+
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
8183
) %}
8284

8385
{% else %} {# strategy == 'merge' #}
@@ -103,14 +105,16 @@
103105

104106
{#-- Validate early so we don't run SQL if the strategy is invalid --#}
105107
{% set strategy = dbt_bigquery_validate_get_incremental_strategy(config) -%}
106-
{#-- Validate early that the fn strategy is set correctly for insert_overwrite--#}
107-
{% set insert_overwrite_fn = dbt_bigquery_validate_insert_overwrite_fn(config, strategy) -%}
108108

109109
{%- set raw_partition_by = config.get('partition_by', none) -%}
110110
{%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
111111
{%- set partitions = config.get('partitions', none) -%}
112112
{%- set cluster_by = config.get('cluster_by', none) -%}
113113

114+
{#-- Validate early that the incremental substrategy is set correctly for insert_overwrite or microbatch--#}
115+
{% set incremental_substrategy = dbt_bigquery_validate_incremental_substrategy(config, strategy, partition_by.copy_partitions) -%}
116+
117+
114118
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
115119
{% set incremental_predicates = config.get('predicates', default=none) or config.get('incremental_predicates', default=none) %}
116120

@@ -119,13 +123,8 @@
119123

120124
{{ run_hooks(pre_hooks) }}
121125

122-
{% if partition_by.copy_partitions is true and strategy not in ['insert_overwrite', 'microbatch'] %} {#-- We can't copy partitions with merge strategy --#}
123-
{% set wrong_strategy_msg -%}
124-
The 'copy_partitions' option requires the 'incremental_strategy' option to be set to 'insert_overwrite' or 'microbatch'.
125-
{%- endset %}
126-
{% do exceptions.raise_compiler_error(wrong_strategy_msg) %}
127126

128-
{% elif existing_relation is none %}
127+
{% if existing_relation is none %}
129128
{%- call statement('main', language=language) -%}
130129
{{ bq_create_table_as(partition_by, False, target_relation, compiled_code, language) }}
131130
{%- endcall -%}
@@ -177,7 +176,7 @@
177176
{% endif %}
178177

179178
{% set build_sql = bq_generate_incremental_build_sql(
180-
strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, partition_by.copy_partitions, incremental_predicates, insert_overwrite_fn
179+
strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy, incremental_predicates,
181180
) %}
182181

183182
{%- call statement('main') -%}

dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{% macro bq_generate_incremental_insert_overwrite_build_sql(
2-
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
2+
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
33
) %}
44
{% if partition_by is none %}
55
{% set missing_partition_msg -%}
@@ -9,7 +9,7 @@
99
{% endif %}
1010

1111
{% set build_sql = bq_insert_overwrite_sql(
12-
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
12+
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
1313
) %}
1414

1515
{{ return(build_sql) }}
@@ -38,17 +38,17 @@
3838
{% endmacro %}
3939

4040
{% macro bq_insert_overwrite_sql(
41-
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
41+
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
4242
) %}
4343
{% if partitions is not none and partitions != [] %} {# static #}
44-
{{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn) }}
44+
{{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy) }}
4545
{% else %} {# dynamic #}
46-
{{ bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn) }}
46+
{{ bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, incremental_substrategy) }}
4747
{% endif %}
4848
{% endmacro %}
4949

5050
{% macro bq_static_insert_overwrite_sql(
51-
tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
51+
tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
5252
) %}
5353

5454
{% set predicate -%}
@@ -75,35 +75,35 @@
7575
)
7676
{%- endset -%}
7777

78-
{% if copy_partitions %}
78+
{% if incremental_substrategy == 'copy_partitions' %}
7979
{% do bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) %}
8080
{% else %}
8181

82-
{#-- In case we're putting the model SQL _directly_ into the MERGE statement,
83-
we need to prepend the MERGE statement with the user-configured sql_header,
84-
which may be needed to resolve that model SQL (e.g. referencing a variable or UDF in the header)
85-
in the "temporary table exists" case, we save the model SQL result as a temp table first, wherein the
86-
sql_header is included by the create_table_as macro.
87-
#}
88-
89-
{% if insert_overwrite_fn == 'delete+insert' %}
90-
-- 1. run insert_overwrite with delete+insert transaction strategy optimisation
91-
{{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};
92-
{% else %}
93-
-- 1. run insert_overwrite with merge strategy optimisation
94-
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};
95-
{% endif %}
96-
97-
{%- if tmp_relation_exists -%}
98-
-- 2. clean up the temp table
99-
drop table if exists {{ tmp_relation }};
100-
{%- endif -%}
82+
{#-- In case we're putting the model SQL _directly_ into the MERGE/delete+insert transaction,
83+
we need to prepend the merge/transaction statement with the user-configured sql_header,
84+
which may be needed to resolve that model SQL (e.g. referencing a variable or UDF in the header)
85+
in the "temporary table exists" case, we save the model SQL result as a temp table first, wherein the
86+
sql_header is included by the create_table_as macro.
87+
#}
88+
89+
{% if incremental_substrategy == 'delete+insert' %}
90+
-- 1. run insert_overwrite with delete+insert transaction strategy optimisation
91+
{{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};
92+
{% else %}
93+
-- 1. run insert_overwrite with merge strategy optimisation
94+
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};
95+
{% endif %}
96+
97+
{%- if tmp_relation_exists -%}
98+
-- 2. clean up the temp table
99+
drop table if exists {{ tmp_relation }};
100+
{%- endif -%}
101101

102-
{% endif %}
102+
{% endif %}
103103
{% endmacro %}
104104

105105
{% macro bq_dynamic_copy_partitions_insert_overwrite_sql(
106-
tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions
106+
tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists
107107
) %}
108108
{%- if tmp_relation_exists is false -%}
109109
{# We run temp table creation in a separated script to move to partitions copy if it does not already exist #}
@@ -123,9 +123,9 @@
123123
drop table if exists {{ tmp_relation }}
124124
{% endmacro %}
125125

126-
{% macro bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) %}
127-
{%- if copy_partitions is true %}
128-
{{ bq_dynamic_copy_partitions_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }}
126+
{% macro bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, incremental_substrategy) %}
127+
{% if incremental_substrategy == 'copy_partitions' %}
128+
{{ bq_dynamic_copy_partitions_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists) }}
129129
{% else -%}
130130
{% set predicate -%}
131131
{{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
@@ -161,7 +161,7 @@
161161
from {{ tmp_relation }}
162162
);
163163

164-
{% if insert_overwrite_fn == 'delete+insert' %}
164+
{% if incremental_substrategy == 'delete+insert' %}
165165
-- 3. run insert_overwrite with the delete+insert transaction strategy optimisation
166166
{{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate]) }};
167167
{% else %}
@@ -185,11 +185,11 @@
185185
{{ sql_header if sql_header is not none and include_sql_header }}
186186

187187
begin
188-
begin transaction;
188+
begin transaction;
189189

190190
-- (as of Nov 2024)
191-
-- DELETE operations are free if the partition is a DATE
192-
-- * Not free if the partitions are granular (hourly, monthly)
191+
-- DELETE operations are free if the partition is a DATE
192+
-- * Not free if the partitions are granular (hourly, monthly)
193193
-- or some other conditions like subqueries and so on.
194194
delete from {{ target }} as DBT_INTERNAL_DEST
195195
where true

dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
{% endmacro %}
1919

2020
{% macro bq_generate_microbatch_build_sql(
21-
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
21+
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
2222
) %}
2323
{% set build_sql = bq_insert_overwrite_sql(
24-
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
24+
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
2525
) %}
2626

2727
{{ return(build_sql) }}

tests/functional/adapter/incremental/incremental_strategy_fixtures.py

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -676,7 +676,105 @@
676676
config(
677677
materialized="incremental",
678678
incremental_strategy="insert_overwrite",
679-
insert_overwrite_fn='delete+insert',
679+
incremental_substrategy='delete+insert',
680+
cluster_by="id",
681+
partition_by={
682+
"field": "date_time",
683+
"data_type": "datetime",
684+
"granularity": "day"
685+
},
686+
partitions=partitions_to_replace,
687+
on_schema_change="sync_all_columns"
688+
)
689+
}}
690+
691+
692+
with data as (
693+
694+
{% if not is_incremental() %}
695+
696+
select 1 as id, cast('2020-01-01' as datetime) as date_time union all
697+
select 2 as id, cast('2020-01-01' as datetime) as date_time union all
698+
select 3 as id, cast('2020-01-01' as datetime) as date_time union all
699+
select 4 as id, cast('2020-01-01' as datetime) as date_time
700+
701+
{% else %}
702+
703+
-- we want to overwrite the 4 records in the 2020-01-01 partition
704+
-- with the 2 records below, but add two more in the 2020-01-02 partition
705+
select 10 as id, cast('2020-01-01' as datetime) as date_time union all
706+
select 20 as id, cast('2020-01-01' as datetime) as date_time union all
707+
select 30 as id, cast('2020-01-02' as datetime) as date_time union all
708+
select 40 as id, cast('2020-01-02' as datetime) as date_time
709+
710+
{% endif %}
711+
712+
)
713+
714+
select * from data
715+
""".lstrip()
716+
717+
718+
overwrite_static_day_merge_sub_strategy_sql = """
719+
{% set partitions_to_replace = [
720+
"'2020-01-01'",
721+
"'2020-01-02'",
722+
] %}
723+
724+
{{
725+
config(
726+
materialized="incremental",
727+
incremental_strategy="insert_overwrite",
728+
incremental_substrategy='merge',
729+
cluster_by="id",
730+
partition_by={
731+
"field": "date_time",
732+
"data_type": "datetime",
733+
"granularity": "day"
734+
},
735+
partitions=partitions_to_replace,
736+
on_schema_change="sync_all_columns"
737+
)
738+
}}
739+
740+
741+
with data as (
742+
743+
{% if not is_incremental() %}
744+
745+
select 1 as id, cast('2020-01-01' as datetime) as date_time union all
746+
select 2 as id, cast('2020-01-01' as datetime) as date_time union all
747+
select 3 as id, cast('2020-01-01' as datetime) as date_time union all
748+
select 4 as id, cast('2020-01-01' as datetime) as date_time
749+
750+
{% else %}
751+
752+
-- we want to overwrite the 4 records in the 2020-01-01 partition
753+
-- with the 2 records below, but add two more in the 2020-01-02 partition
754+
select 10 as id, cast('2020-01-01' as datetime) as date_time union all
755+
select 20 as id, cast('2020-01-01' as datetime) as date_time union all
756+
select 30 as id, cast('2020-01-02' as datetime) as date_time union all
757+
select 40 as id, cast('2020-01-02' as datetime) as date_time
758+
759+
{% endif %}
760+
761+
)
762+
763+
select * from data
764+
""".lstrip()
765+
766+
767+
overwrite_static_day_copy_partitions_sub_strategy_sql = """
768+
{% set partitions_to_replace = [
769+
"'2020-01-01'",
770+
"'2020-01-02'",
771+
] %}
772+
773+
{{
774+
config(
775+
materialized="incremental",
776+
incremental_strategy="insert_overwrite",
777+
incremental_substrategy='copy_partitions',
680778
cluster_by="id",
681779
partition_by={
682780
"field": "date_time",

0 commit comments

Comments
 (0)