6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241229-174752.yaml
@@ -0,0 +1,6 @@
kind: Features
body: add multiple substrategies for insert_overwrite and microbatch
time: 2024-12-29T17:47:52.647374Z
custom:
Author: borjavb
Issue: "1409"
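
For context, a model could opt into one of the new substrategies with a config along these lines. This is a hedged sketch: the model name, columns and partition field are hypothetical, and the accepted values ('merge', 'commit+delete+insert', 'delete+insert', 'copy_partitions', 'optimal') come from the validation macro added below.

-- models/events_daily.sql (hypothetical example)
{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    incremental_substrategy='delete+insert',
    partition_by={'field': 'event_date', 'data_type': 'date'}
) }}

select event_id, event_date
from {{ ref('stg_events') }}
{% if is_incremental() %}
where event_date >= date_sub(current_date(), interval 3 day)
{% endif %}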
56 changes: 46 additions & 10 deletions dbt/include/bigquery/macros/materializations/incremental.sql
@@ -17,6 +17,43 @@
{% do return(strategy) %}
{% endmacro %}

{% macro dbt_bigquery_optimize_substrategy(partitions) %}
{#-- Set the optimal substrategy based on the nature of the overwrite --#}
{% if partitions is not none and partitions != [] %} {# static #}
{{ return("delete+insert") }}
{% else %} {# dynamic #}
{{ return("copy_partitions") }}
{% endif %}
{% endmacro %}

{% macro dbt_bigquery_validate_incremental_substrategy(config, strategy, copy_partitions, partitions) %}
{#-- Find and validate the substrategy used for insert_overwrite and microbatch.
Legacy behaviour was to pass `copy_partitions` as part of the `partition_by` config,
so we need to keep supporting that option in this validation.
#}
{%- set incremental_substrategy = config.get('incremental_substrategy', 'copy_partitions' if copy_partitions else 'merge') -%}

{% if strategy in ['insert_overwrite', 'microbatch'] %}
{% if incremental_substrategy not in ['merge', 'commit+delete+insert', 'delete+insert', 'copy_partitions', 'optimal'] %}
{% set wrong_fn -%}
The 'incremental_substrategy' option must be one of 'merge' (default), 'commit+delete+insert', 'delete+insert', 'copy_partitions' or 'optimal'
{%- endset %}
{% do exceptions.raise_compiler_error(wrong_fn) %}
{% endif %}
{% elif config.get('incremental_substrategy') is not none or copy_partitions %}
{#-- check the raw config here: the resolved value above always falls back to a default --#}
{% set wrong_strategy_msg -%}
The 'incremental_substrategy' and 'copy_partitions' options require the 'incremental_strategy' option to be set to 'insert_overwrite' or 'microbatch'.
{%- endset %}
{% do exceptions.raise_compiler_error(wrong_strategy_msg) %}
{% endif %}

{% if incremental_substrategy == 'optimal' %}
{{ return(dbt_bigquery_optimize_substrategy(partitions)) }}
{% else %}
{{ return(incremental_substrategy) }}
{% endif %}
{% endmacro %}
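{#-- Hedged sketch of how the substrategy resolves under the macros above (illustrative, not exhaustive):
    insert_overwrite / microbatch, no substrategy, copy_partitions unset     -> 'merge'
    insert_overwrite / microbatch, no substrategy, copy_partitions: true     -> 'copy_partitions'
    insert_overwrite / microbatch, 'optimal' with a static partitions list   -> 'delete+insert'
    insert_overwrite / microbatch, 'optimal' without partitions (dynamic)    -> 'copy_partitions'
    any other strategy with a substrategy or copy_partitions configured      -> compiler error
#}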

{% macro source_sql_with_partition(partition_by, source_sql) %}

{%- if partition_by.time_ingestion_partitioning %}
@@ -43,19 +80,19 @@
{% endmacro %}

{% macro bq_generate_incremental_build_sql(
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, incremental_predicates
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy, incremental_predicates
) %}
{#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#}
{% if strategy == 'insert_overwrite' %}

{% set build_sql = bq_generate_incremental_insert_overwrite_build_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
) %}

{% elif strategy == 'microbatch' %}

{% set build_sql = bq_generate_microbatch_build_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
) %}

{% else %} {# strategy == 'merge' #}
@@ -87,6 +124,10 @@
{%- set partitions = config.get('partitions', none) -%}
{%- set cluster_by = config.get('cluster_by', none) -%}

{#-- Validate early that the incremental substrategy is set correctly for insert_overwrite or microbatch --#}
{% set incremental_substrategy = dbt_bigquery_validate_incremental_substrategy(config, strategy, partition_by.copy_partitions, partitions) -%}


{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
{% set incremental_predicates = config.get('predicates', default=none) or config.get('incremental_predicates', default=none) %}

@@ -95,13 +136,8 @@

{{ run_hooks(pre_hooks) }}

{% if partition_by.copy_partitions is true and strategy not in ['insert_overwrite', 'microbatch'] %} {#-- We can't copy partitions with merge strategy --#}
{% set wrong_strategy_msg -%}
The 'copy_partitions' option requires the 'incremental_strategy' option to be set to 'insert_overwrite' or 'microbatch'.
{%- endset %}
{% do exceptions.raise_compiler_error(wrong_strategy_msg) %}

{% elif existing_relation is none %}
{% if existing_relation is none %}
{%- call statement('main', language=language) -%}
{{ bq_create_table_as(partition_by, False, target_relation, compiled_code, language) }}
{%- endcall -%}
Expand Down Expand Up @@ -153,7 +189,7 @@
{% endif %}

{% set build_sql = bq_generate_incremental_build_sql(
strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, partition_by.copy_partitions, incremental_predicates
strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy, incremental_predicates
) %}

{%- call statement('main') -%}
@@ -1,5 +1,5 @@
{% macro bq_generate_incremental_insert_overwrite_build_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
) %}
{% if partition_by is none %}
{% set missing_partition_msg -%}
@@ -9,7 +9,7 @@
{% endif %}

{% set build_sql = bq_insert_overwrite_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
) %}

{{ return(build_sql) }}
@@ -38,17 +38,17 @@
{% endmacro %}

{% macro bq_insert_overwrite_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
) %}
{% if partitions is not none and partitions != [] %} {# static #}
{{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions) }}
{{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy) }}
{% else %} {# dynamic #}
{{ bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }}
{{ bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, incremental_substrategy) }}
{% endif %}
{% endmacro %}

{% macro bq_static_insert_overwrite_sql(
tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
) %}

{% set predicate -%}
@@ -75,32 +75,41 @@
)
{%- endset -%}

{% if copy_partitions %}
{% if incremental_substrategy == 'copy_partitions' %}
{% do bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) %}
{% else %}

{#-- In case we're putting the model SQL _directly_ into the MERGE statement,
we need to prepend the MERGE statement with the user-configured sql_header,
which may be needed to resolve that model SQL (e.g. referencing a variable or UDF in the header)
in the "temporary table exists" case, we save the model SQL result as a temp table first, wherein the
sql_header is included by the create_table_as macro.
#}
-- 1. run the merge statement
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};

{%- if tmp_relation_exists -%}
-- 2. clean up the temp table
drop table if exists {{ tmp_relation }};
{%- endif -%}
{#-- If we're putting the model SQL _directly_ into the MERGE / delete+insert statement,
we need to prepend that statement with the user-configured sql_header, which may be needed
to resolve the model SQL (e.g. referencing a variable or UDF in the header).
In the "temporary table exists" case, we save the model SQL result as a temp table first,
and the sql_header is already included there by the create_table_as macro.
#}

{% if incremental_substrategy == 'delete+insert' %}
-- 1. run insert_overwrite with delete+insert (without a transaction) strategy optimisation
{{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};
{% elif incremental_substrategy == 'commit+delete+insert' %}
-- 1. run insert_overwrite with delete+insert (with a transaction) strategy optimisation
{{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists, transactional=True) }};
{% else %}
-- 1. run insert_overwrite with merge strategy optimisation
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};
{% endif %}

{%- if tmp_relation_exists -%}
-- 2. clean up the temp table
drop table if exists {{ tmp_relation }};
{%- endif -%}

{% endif %}
{% endif %}
{% endmacro %}
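{#-- Rough sketch (hedged; table, column and partition names are hypothetical) of what the static
    'delete+insert' substrategy above renders to, assuming partition_by on a date column `ds`
    and partitions = ["'2024-01-01'"]:

    delete from `proj`.`dataset`.`my_model` as DBT_INTERNAL_DEST
    where true
      and DBT_INTERNAL_DEST.ds in ('2024-01-01');

    insert into `proj`.`dataset`.`my_model` (id, ds)
    (
      select id, ds
      from `proj`.`dataset`.`my_model__dbt_tmp`
    );
#}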

{% macro bq_dynamic_copy_partitions_insert_overwrite_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions
tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists
) %}
{%- if tmp_relation_exists is false -%}
{# We run temp table creation in a separated script to move to partitions copy if it doesn't already exist #}
{# If the temp table does not already exist, we create it in a separate script before moving on to the partition copy #}
{%- call statement('create_tmp_relation_for_copy', language='sql') -%}
{{ bq_create_table_as(partition_by, True, tmp_relation, sql, 'sql')
}}
@@ -117,9 +126,9 @@
drop table if exists {{ tmp_relation }}
{% endmacro %}

{% macro bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) %}
{%- if copy_partitions is true %}
{{ bq_dynamic_copy_partitions_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }}
{% macro bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, incremental_substrategy) %}
{% if incremental_substrategy == 'copy_partitions' %}
{{ bq_dynamic_copy_partitions_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists) }}
{% else -%}
{% set predicate -%}
{{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
@@ -155,12 +164,58 @@
from {{ tmp_relation }}
);

-- 3. run the merge statement
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};

{% if incremental_substrategy == 'delete+insert' %}
-- 3. run insert_overwrite with the delete+insert (without a transaction) strategy optimisation
{{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate]) }};
{% elif incremental_substrategy == 'commit+delete+insert' %}
-- 3. run insert_overwrite with the delete+insert (with a transaction) strategy optimisation
{{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], transactional=True) }};
{% else %}
-- 3. run insert_overwrite with the merge strategy optimisation
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};
{% endif %}
-- 4. clean up the temp table
drop table if exists {{ tmp_relation }}

{% endif %}

{% endmacro %}



{% macro bq_get_insert_overwrite_with_delete_and_insert_sql(target, source, dest_columns, predicates, include_sql_header=false, transactional=false) -%}
{%- set predicates = [] if predicates is none else [] + predicates -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{%- set sql_header = config.get('sql_header', none) -%}

{{ sql_header if sql_header is not none and include_sql_header }}

{% if transactional %}
-- We rely on a multi-statement transaction for atomicity:
-- if anything fails, nothing is committed.
-- DELETE + INSERT run together inside the same transaction.
begin
begin transaction;
{% endif %}
-- DELETE statements that remove entire partitions are free: https://cloud.google.com/bigquery/docs/using-dml-with-partitioned-tables#using_dml_delete_to_delete_partitions
delete from {{ target }} as DBT_INTERNAL_DEST
where true
{% if predicates %} and {{ predicates | join(' and ') }} {% endif %};

-- INSERT the data
insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source }}
)
{% if transactional %}
-- the leading ';' terminates the insert above; there is no trailing ';' after 'end'
-- because the calling macro already appends one to this output
; commit transaction;

exception when error then
-- If things go south, roll back and re-raise the error
rollback transaction;
raise using message = FORMAT("dbt error while commit+delete+insert: %s", @@error.message);
end
{% endif %}
{% endmacro %}
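
For illustration, the 'commit+delete+insert' path compiles to a BigQuery multi-statement script roughly along these lines. This is a hedged sketch: the table, columns and partition predicate are hypothetical placeholders, and the real predicate comes from the static or dynamic insert_overwrite code paths above.

begin
begin transaction;

-- delete the partitions being replaced
delete from `proj`.`dataset`.`my_model` as DBT_INTERNAL_DEST
where true
  and DBT_INTERNAL_DEST.ds in ('2024-01-01');

-- insert the replacement rows
insert into `proj`.`dataset`.`my_model` (id, ds)
(
  select id, ds
  from `proj`.`dataset`.`my_model__dbt_tmp`
);
commit transaction;

exception when error then
  rollback transaction;
  raise using message = FORMAT("dbt error while commit+delete+insert: %s", @@error.message);
end;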
@@ -18,10 +18,10 @@
{% endmacro %}

{% macro bq_generate_microbatch_build_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
) %}
{% set build_sql = bq_insert_overwrite_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
) %}

{{ return(build_sql) }}
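
Since bq_generate_microbatch_build_sql delegates to bq_insert_overwrite_sql, the same substrategies are available to microbatch models. A hedged usage sketch follows; the model name, columns and batch settings are hypothetical.

-- models/events_microbatch.sql (hypothetical example)
{{ config(
    materialized='incremental',
    incremental_strategy='microbatch',
    incremental_substrategy='copy_partitions',
    event_time='event_ts',
    batch_size='day',
    begin='2024-01-01',
    partition_by={'field': 'event_ts', 'data_type': 'timestamp', 'granularity': 'day'}
) }}

select event_id, event_ts
from {{ ref('stg_events') }}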