Skip to content
This repository was archived by the owner on Sep 2, 2025. It is now read-only.

Commit 4a86d9e

Browse files
committed
additional config for insert_overwrite
1 parent 83bb413 commit 4a86d9e

File tree

3 files changed

+91
-18
lines changed

3 files changed

+91
-18
lines changed

dbt/include/bigquery/macros/materializations/incremental.sql

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,28 @@
1717
{% do return(strategy) %}
1818
{% endmacro %}
1919

20+
21+
{% macro dbt_bigquery_validate_insert_overwrite_fn(config, strategy) %}
  {#--
    Resolve and validate the 'insert_overwrite_fn' model config option.

    Args:
      config:   the model config object; only 'insert_overwrite_fn' is read.
      strategy: the already-validated incremental strategy name.

    Returns:
      - 'merge' when the option is unset and the strategy is
        'insert_overwrite' or 'microbatch';
      - the configured value ('merge' or 'delete+insert') when it is valid
        for one of those strategies;
      - none when the option is unset and the strategy is anything else.

    Raises a compiler error when the option holds an unsupported value, or
    when it is set together with a strategy that does not use it.
  --#}
  {%- set insert_overwrite_fn = config.get('insert_overwrite_fn', none) -%}
  {%- set default_fn = 'merge' -%}
  {%- set valid_fns = ['merge', 'delete+insert'] -%}
  {%- set supported_strategies = ['insert_overwrite', 'microbatch'] -%}

  {% if strategy not in supported_strategies %}
    {#-- The option is meaningless for other strategies: reject it if set. --#}
    {% if insert_overwrite_fn is not none %}
      {% set wrong_strategy_msg -%}
        The 'insert_overwrite_fn' option requires the 'incremental_strategy' option to be set to 'insert_overwrite' or 'microbatch'.
      {%- endset %}
      {% do exceptions.raise_compiler_error(wrong_strategy_msg) %}
    {% endif %}
    {% do return(none) %}
  {% elif insert_overwrite_fn is none %}
    {% do return(default_fn) %}
  {% elif insert_overwrite_fn not in valid_fns %}
    {% set wrong_fn_msg -%}
      The 'insert_overwrite_fn' option has to be either 'merge' (default) or 'delete+insert'.
    {%- endset %}
    {% do exceptions.raise_compiler_error(wrong_fn_msg) %}
  {% else %}
    {% do return(insert_overwrite_fn) %}
  {% endif %}
{% endmacro %}
41+
2042
{% macro source_sql_with_partition(partition_by, source_sql) %}
2143

2244
{%- if partition_by.time_ingestion_partitioning %}
@@ -43,19 +65,19 @@
4365
{% endmacro %}
4466

4567
{% macro bq_generate_incremental_build_sql(
46-
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, incremental_predicates
68+
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, incremental_predicates, insert_overwrite_fn
4769
) %}
4870
{#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#}
4971
{% if strategy == 'insert_overwrite' %}
5072

5173
{% set build_sql = bq_generate_incremental_insert_overwrite_build_sql(
52-
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
74+
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
5375
) %}
5476

5577
{% elif strategy == 'microbatch' %}
5678

5779
{% set build_sql = bq_generate_microbatch_build_sql(
58-
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
80+
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
5981
) %}
6082

6183
{% else %} {# strategy == 'merge' #}
@@ -81,6 +103,8 @@
81103

82104
{#-- Validate early so we don't run SQL if the strategy is invalid --#}
83105
{% set strategy = dbt_bigquery_validate_get_incremental_strategy(config) -%}
106+
{#-- Validate early that 'insert_overwrite_fn' is valid and compatible with the chosen strategy (insert_overwrite or microbatch) --#}
107+
{% set insert_overwrite_fn = dbt_bigquery_validate_insert_overwrite_fn(config, strategy) -%}
84108

85109
{%- set raw_partition_by = config.get('partition_by', none) -%}
86110
{%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
@@ -153,7 +177,7 @@
153177
{% endif %}
154178

155179
{% set build_sql = bq_generate_incremental_build_sql(
156-
strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, partition_by.copy_partitions, incremental_predicates
180+
strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, partition_by.copy_partitions, incremental_predicates, insert_overwrite_fn
157181
) %}
158182

159183
{%- call statement('main') -%}

dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql

Lines changed: 61 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{% macro bq_generate_incremental_insert_overwrite_build_sql(
2-
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
2+
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
33
) %}
44
{% if partition_by is none %}
55
{% set missing_partition_msg -%}
@@ -9,7 +9,7 @@
99
{% endif %}
1010

1111
{% set build_sql = bq_insert_overwrite_sql(
12-
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
12+
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
1313
) %}
1414

1515
{{ return(build_sql) }}
@@ -38,17 +38,17 @@
3838
{% endmacro %}
3939

4040
{% macro bq_insert_overwrite_sql(
41-
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
41+
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
4242
) %}
4343
{% if partitions is not none and partitions != [] %} {# static #}
44-
{{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions) }}
44+
{{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn) }}
4545
{% else %} {# dynamic #}
46-
{{ bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }}
46+
{{ bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn) }}
4747
{% endif %}
4848
{% endmacro %}
4949

5050
{% macro bq_static_insert_overwrite_sql(
51-
tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
51+
tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
5252
) %}
5353

5454
{% set predicate -%}
@@ -85,8 +85,14 @@
8585
in the "temporary table exists" case, we save the model SQL result as a temp table first, wherein the
8686
sql_header is included by the create_table_as macro.
8787
#}
88-
-- 1. run the merge statement
89-
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};
88+
89+
{% if insert_overwrite_fn == 'delete+insert' %}
90+
-- 1. run insert_overwrite with delete+insert transaction strategy optimisation
91+
{{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};
92+
{% else %}
93+
-- 1. run insert_overwrite with merge strategy optimisation
94+
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};
95+
{% endif %}
9096

9197
{%- if tmp_relation_exists -%}
9298
-- 2. clean up the temp table
@@ -100,7 +106,7 @@
100106
tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions
101107
) %}
102108
{%- if tmp_relation_exists is false -%}
103-
{# We run temp table creation in a separated script to move to partitions copy if it doesn't already exist #}
109+
{# We run temp table creation in a separated script to move to partitions copy if it does not already exist #}
104110
{%- call statement('create_tmp_relation_for_copy', language='sql') -%}
105111
{{ bq_create_table_as(partition_by, True, tmp_relation, sql, 'sql')
106112
}}
@@ -155,12 +161,55 @@
155161
from {{ tmp_relation }}
156162
);
157163

158-
-- 3. run the merge statement
159-
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};
160-
164+
{% if insert_overwrite_fn == 'delete+insert' %}
165+
-- 3. run insert_overwrite with the delete+insert transaction strategy optimisation
166+
{{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate]) }};
167+
{% else %}
168+
-- 3. run insert_overwrite with the merge strategy optimisation
169+
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};
170+
{% endif %}
161171
-- 4. clean up the temp table
162172
drop table if exists {{ tmp_relation }}
163173

164174
{% endif %}
165175

166176
{% endmacro %}
177+
178+
179+
180+
{% macro bq_get_insert_overwrite_with_delete_and_insert_sql(target, source, dest_columns, predicates, include_sql_header=false) -%}
  {#--
    Build a BigQuery script that overwrites rows in `target` with the rows of
    `source` using DELETE + INSERT inside a single transaction.

    Args:
      target:             relation that is deleted from and inserted into.
      source:             SQL fragment selected from for the insert.
      dest_columns:       column objects; their `name` attributes form the
                          insert/select column list.
      predicates:         list of SQL conditions AND-ed into the DELETE filter
                          (none is treated as an empty list).
      include_sql_header: when true, the model's 'sql_header' config (if any)
                          is emitted before the script. Defaults to false so
                          callers that omit it get no header.

    The statement is emitted without a trailing ';' after `end`; callers
    append their own terminator.
  --#}
  {%- set predicates = [] if predicates is none else [] + predicates -%}
  {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
  {%- set sql_header = config.get('sql_header', none) -%}

  {{ sql_header if sql_header is not none and include_sql_header }}

  begin
    begin transaction;

    -- (as of Nov 2024)
    -- DELETE operations are free if the partition is a DATE
    -- * Not free if the partitions are granular (hourly, monthly)
    --   or some other conditions like subqueries and so on.
    -- NOTE(review): the DBT_INTERNAL_DEST alias is presumably what the
    -- caller-built predicates reference -- confirm against the callers.
    delete from {{ target }} as DBT_INTERNAL_DEST
    where true
    {%- for predicate in predicates %}
      and {{ predicate }}
    {%- endfor %};

    insert into {{ target }} ({{ dest_cols_csv }})
    (
      select {{ dest_cols_csv }}
      from {{ source }}
    );

    commit transaction;

  exception when error then
    -- Roll back first: RAISE leaves the handler immediately, so any
    -- statement placed after it would never run.
    rollback transaction;
    raise using message = FORMAT("Error: %s", @@error.message);
  end
{% endmacro %}

dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
{% endmacro %}
1919

2020
{% macro bq_generate_microbatch_build_sql(
21-
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
21+
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
2222
) %}
2323
{% set build_sql = bq_insert_overwrite_sql(
24-
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
24+
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
2525
) %}
2626

2727
{{ return(build_sql) }}

0 commit comments

Comments
 (0)