Skip to content
This repository was archived by the owner on Sep 2, 2025. It is now read-only.

Commit b235260

Browse files
committed
add commit option
1 parent 48b34ae commit b235260

File tree

4 files changed

+88
-27
lines changed

4 files changed

+88
-27
lines changed

dbt/include/bigquery/macros/materializations/incremental.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@
2626
{%- set incremental_substrategy = config.get('incremental_substrategy', 'copy_partitions' if copy_partitions else 'merge') -%}
2727

2828
{% if strategy in ['insert_overwrite', 'microbatch'] %}
29-
{% if incremental_substrategy not in ['merge', 'delete+insert', 'copy_partitions'] %}
29+
{% if incremental_substrategy not in ['merge', 'commit+delete+insert', 'delete+insert', 'copy_partitions'] %}
3030
{% set wrong_fn -%}
31-
The 'incremental_substrategy' option has to be either 'merge' (default), 'delete+insert' or 'copy_partitions'.
31+
The 'incremental_substrategy' option has to be either 'merge' (default), 'commit+delete+insert', 'delete+insert' or 'copy_partitions'.
3232
{%- endset %}
3333
{% do exceptions.raise_compiler_error(wrong_fn) %}
3434
{% endif %}

dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,11 @@
8787
#}
8888

8989
{% if incremental_substrategy == 'delete+insert' %}
90-
-- 1. run insert_overwrite with delete+insert transaction strategy optimisation
90+
-- 1. run insert_overwrite with delete+insert (without a transaction) strategy optimisation
9191
{{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};
92+
{% elif incremental_substrategy == 'commit+delete+insert' %}
93+
-- 1. run insert_overwrite with delete+insert (with a transaction) strategy optimisation
94+
{{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists, transactional=True) }};
9295
{% else %}
9396
-- 1. run insert_overwrite with merge strategy optimisation
9497
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};
@@ -162,8 +165,11 @@
162165
);
163166

164167
{% if incremental_substrategy == 'delete+insert' %}
165-
-- 3. run insert_overwrite with the delete+insert transaction strategy optimisation
168+
-- 3. run insert_overwrite with the delete+insert (without a transaction) strategy optimisation
166169
{{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate]) }};
170+
{% elif incremental_substrategy == 'commit+delete+insert' %}
171+
-- 3. run insert_overwrite with the delete+insert (with a transaction) strategy optimisation
172+
{{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], transactional=True) }};
167173
{% else %}
168174
-- 3. run insert_overwrite with the merge strategy optimisation
169175
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};
@@ -177,39 +183,39 @@
177183

178184

179185

180-
{% macro bq_get_insert_overwrite_with_delete_and_insert_sql(target, source, dest_columns, predicates, include_sql_header) -%}
186+
{% macro bq_get_insert_overwrite_with_delete_and_insert_sql(target, source, dest_columns, predicates, include_sql_header, transactional=False) -%}
181187
{%- set predicates = [] if predicates is none else [] + predicates -%}
182188
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
183189
{%- set sql_header = config.get('sql_header', none) -%}
184190

185191
{{ sql_header if sql_header is not none and include_sql_header }}
186192

193+
{% if transactional %}
194+
-- We can rely on a multi-statement transaction to enable atomicity
195+
-- If something goes south, nothing is committed
196+
-- DELETE + INSERT allow for isolation lock
187197
begin
188-
begin transaction;
189-
190-
-- (as of Nov 2024)
191-
-- DELETE operations are free if the partition is a DATE
192-
-- * Not free if the partitions are granular (hourly, monthly)
193-
-- or some other conditions like subqueries and so on.
194-
delete from {{ target }} as DBT_INTERNAL_DEST
195-
where true
196-
{%- if predicates %}
197-
{% for predicate in predicates %}
198-
and {{ predicate }}
199-
{% endfor %}
200-
{%- endif -%};
201-
202-
203-
insert into {{ target }} ({{ dest_cols_csv }})
204-
(
205-
select {{ dest_cols_csv }}
206-
from {{ source }}
207-
);
198+
begin transaction ;
199+
{% endif %}
200+
-- DELETE operations are free https://cloud.google.com/bigquery/docs/using-dml-with-partitioned-tables#using_dml_delete_to_delete_partitions
201+
delete from {{ target }} as DBT_INTERNAL_DEST
202+
where true
203+
{% if predicates %} and {{ predicates | join(' and ') }} {% endif %};
208204

209-
commit transaction;
205+
-- INSERT the data
206+
insert into {{ target }} ({{ dest_cols_csv }})
207+
(
208+
select {{ dest_cols_csv }}
209+
from {{ source }}
210+
)
211+
{% if transactional %}
212+
-- the ';' that terminates the INSERT is emitted only inside this if-branch: the calling macro already appends a trailing ';' after this macro's output
213+
; commit transaction;
210214

211215
exception when error then
212-
raise using message = FORMAT("Error: %s", @@error.message);
216+
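-- If things go south, abort and roll back. NOTE(review): RAISE leaves the handler, so the ROLLBACK TRANSACTION after it looks unreachable; consider rolling back before raising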
217+
raise using message = FORMAT("dbt error while commit+delete+insert: %s", @@error.message);
213218
rollback transaction;
214219
end
220+
{% endif %}
215221
{% endmacro %}

tests/functional/adapter/incremental/incremental_strategy_fixtures.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,55 @@
714714
select * from data
715715
""".lstrip()
716716

717+
overwrite_static_day_commit_delete_and_insert_sub_strategy_sql = """
718+
{% set partitions_to_replace = [
719+
"'2020-01-01'",
720+
"'2020-01-02'",
721+
] %}
722+
723+
{{
724+
config(
725+
materialized="incremental",
726+
incremental_strategy="insert_overwrite",
727+
incremental_substrategy='commit+delete+insert',
728+
cluster_by="id",
729+
partition_by={
730+
"field": "date_time",
731+
"data_type": "datetime",
732+
"granularity": "day"
733+
},
734+
partitions=partitions_to_replace,
735+
on_schema_change="sync_all_columns"
736+
)
737+
}}
738+
739+
740+
with data as (
741+
742+
{% if not is_incremental() %}
743+
744+
select 1 as id, cast('2020-01-01' as datetime) as date_time union all
745+
select 2 as id, cast('2020-01-01' as datetime) as date_time union all
746+
select 3 as id, cast('2020-01-01' as datetime) as date_time union all
747+
select 4 as id, cast('2020-01-01' as datetime) as date_time
748+
749+
{% else %}
750+
751+
-- we want to overwrite the 4 records in the 2020-01-01 partition
752+
-- with the 2 records below, but add two more in the 2020-01-02 partition
753+
select 10 as id, cast('2020-01-01' as datetime) as date_time union all
754+
select 20 as id, cast('2020-01-01' as datetime) as date_time union all
755+
select 30 as id, cast('2020-01-02' as datetime) as date_time union all
756+
select 40 as id, cast('2020-01-02' as datetime) as date_time
757+
758+
{% endif %}
759+
760+
)
761+
762+
select * from data
763+
""".lstrip()
764+
765+
717766

718767
overwrite_static_day_merge_sub_strategy_sql = """
719768
{% set partitions_to_replace = [

tests/functional/adapter/incremental/test_incremental_strategies.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
overwrite_day_with_time_partition_datetime_sql,
2929
overwrite_static_day_sql,
3030
overwrite_static_day_delete_and_insert_sub_strategy_sql,
31+
overwrite_static_day_commit_delete_and_insert_sub_strategy_sql,
3132
overwrite_static_day_merge_sub_strategy_sql,
3233
overwrite_static_day_copy_partitions_sub_strategy_sql,
3334
)
@@ -54,6 +55,7 @@ def models(self):
5455
"incremental_overwrite_day_with_time_partition_datetime.sql": overwrite_day_with_time_partition_datetime_sql,
5556
"incremental_overwrite_static_substrategy_day.sql": overwrite_static_day_sql,
5657
"incremental_overwrite_static_substrategy_day_with_deleteinsert.sql": overwrite_static_day_delete_and_insert_sub_strategy_sql,
58+
"incremental_overwrite_static_substrategy_day_with_commitdeleteinsert.sql": overwrite_static_day_commit_delete_and_insert_sub_strategy_sql,
5759
"incremental_overwrite_static_substrategy_day_with_merge.sql": overwrite_static_day_merge_sub_strategy_sql,
5860
"incremental_overwrite_static_substrategy_day_with_copy_partitions.sql": overwrite_static_day_copy_partitions_sub_strategy_sql,
5961
}
@@ -95,6 +97,10 @@ def test__bigquery_assert_incremental_configurations_apply_the_right_strategy(se
9597
"incremental_overwrite_static_substrategy_day_with_deleteinsert",
9698
"incremental_overwrite_day_expected",
9799
),
100+
(
101+
"incremental_overwrite_static_substrategy_day_with_commitdeleteinsert",
102+
"incremental_overwrite_day_expected",
103+
),
98104
(
99105
"incremental_overwrite_static_substrategy_day_with_merge",
100106
"incremental_overwrite_day_expected",

0 commit comments

Comments
 (0)