Skip to content

Commit fed0e2e

Browse files
Adap 540/add copy partitions with static partitions (#1100)
Co-authored-by: Colin Rogers <[email protected]>
1 parent 176f183 commit fed0e2e

File tree

4 files changed

+188
-47
lines changed

4 files changed

+188
-47
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
kind: Fixes
2+
body: Add support for copy_partitions when used with static partitions.
3+
time: 2025-05-14T18:54:07.360083-07:00
4+
custom:
5+
Author: versusfacit Kayrnt
6+
Issue: "540"

dbt-bigquery/src/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql

Lines changed: 127 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -40,62 +40,144 @@
4040
{% macro bq_insert_overwrite_sql(
4141
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
4242
) %}
43-
{% if partitions is not none and partitions != [] %} {# static #}
44-
{{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions) }}
45-
{% else %} {# dynamic #}
46-
{{ bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }}
47-
{% endif %}
43+
44+
{# static #}
45+
{% if partitions is not none and partitions != [] %}
46+
{{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions) }}
47+
48+
{# dynamic #}
49+
{% else %}
50+
{{ bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }}
51+
{% endif %}
52+
4853
{% endmacro %}
4954

50-
{% macro bq_static_insert_overwrite_sql(
51-
tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
55+
{#
56+
-- static partitions refer to a fixed set of partition values that are
57+
-- known ahead of time and do not depend on source data or runtime
58+
-- conditions. these values are typically hardcoded or configured
59+
-- externally (e.g., via dbt variables or macros).
60+
--
61+
-- with `insert_overwrite`, dbt uses a predefined partition filter
62+
-- expression to overwrite specific partitions
63+
-- (e.g., where partition_column = '2024-05-01').
64+
-- it excels where using a fixed minimal partition list, no need to
65+
-- query source data for partitions, and insert statements can be
66+
-- batched. this also keeps things deterministic.
67+
#}
68+
69+
{% macro bq_static_select_insert_overwrite_sql(tmp_relation, sql, partition_by, tmp_relation_exists) %}
70+
{%- set source_sql -%}
71+
(
72+
{% if tmp_relation_exists %}
73+
select
74+
{% if partition_by.time_ingestion_partitioning %}
75+
{{ partition_by.insertable_time_partitioning_field() }},
76+
{% endif %}
77+
* from {{ tmp_relation }}
78+
{%- elif partition_by.time_ingestion_partitioning -%}
79+
{{ wrap_with_time_ingestion_partitioning_sql(partition_by, sql, true) }}
80+
{%- else -%}
81+
{{ sql }}
82+
{%- endif %}
83+
)
84+
{%- endset -%}
85+
{{ return(source_sql) }}
86+
{% endmacro %}
87+
88+
89+
{#
90+
-- Static-partition + `copy_partitions=true` should inline the literals / foldable constants
91+
-- supplied via the `partitions=` config.
92+
93+
-- The inline method will still copy (truncate) a partition even
94+
-- if the incremental run produced no rows for that date, because the user
95+
-- explicitly listed it in `partitions=`.
96+
#}
97+
{% macro bq_static_copy_partitions_insert_overwrite_sql(
98+
tmp_relation, target_relation, sql, partition_by, partitions, tmp_relation_exists
5299
) %}
53100

54-
{% set predicate -%}
55-
{{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in (
56-
{{ partitions | join (', ') }}
57-
)
58-
{%- endset %}
101+
{%- if tmp_relation_exists is false -%}
102+
{%- set source_sql = bq_static_select_insert_overwrite_sql(tmp_relation, sql, partition_by, tmp_relation_exists) %}
59103

60-
{%- set source_sql -%}
61-
(
62-
{% if partition_by.time_ingestion_partitioning and tmp_relation_exists -%}
63-
select
64-
{{ partition_by.insertable_time_partitioning_field() }},
65-
* from {{ tmp_relation }}
66-
{% elif tmp_relation_exists -%}
67-
select
68-
* from {{ tmp_relation }}
69-
{%- elif partition_by.time_ingestion_partitioning -%}
70-
{{ wrap_with_time_ingestion_partitioning_sql(partition_by, sql, True) }}
71-
{%- else -%}
72-
{{sql}}
73-
{%- endif %}
74-
75-
)
76-
{%- endset -%}
104+
{# -- we run temp table creation in a separate script to move to partitions copy if it doesn't already exist #}
105+
{%- call statement('create_tmp_relation_for_copy', language='sql') -%}
106+
{{ bq_create_table_as(partition_by, true, tmp_relation, source_sql, 'sql')
107+
}}
108+
{%- endcall %}
109+
{%- endif -%}
77110

78-
{% if copy_partitions %}
79-
{% do bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) %}
80-
{% else %}
111+
{%- set partitions_sql -%}
112+
select
113+
cast(partition_literal as timestamp) as partition_ts
114+
from unnest([
115+
{{ partitions | join(', ') }}
116+
]) as partition_literal
117+
{%- endset -%}
81118

82-
{#-- In case we're putting the model SQL _directly_ into the MERGE statement,
83-
we need to prepend the MERGE statement with the user-configured sql_header,
84-
which may be needed to resolve that model SQL (e.g. referencing a variable or UDF in the header)
85-
in the "temporary table exists" case, we save the model SQL result as a temp table first, wherein the
86-
sql_header is included by the create_table_as macro.
87-
#}
88-
-- 1. run the merge statement
89-
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};
119+
{%- set resolved_partitions = run_query(partitions_sql).columns[0].values() -%}
90120

91-
{%- if tmp_relation_exists -%}
92-
-- 2. clean up the temp table
93-
drop table if exists {{ tmp_relation }};
94-
{%- endif -%}
121+
{% do bq_copy_partitions(tmp_relation, target_relation, resolved_partitions, partition_by) %}
95122

96-
{% endif %}
123+
{# clean up temp table #}
124+
drop table if exists {{ tmp_relation }}
97125
{% endmacro %}
98126

127+
128+
{% macro bq_static_insert_overwrite_sql(
129+
tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
130+
) %}
131+
132+
{%- if copy_partitions %}
133+
{{ bq_static_copy_partitions_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, tmp_relation_exists) }}
134+
{% else -%}
135+
{% set predicate -%}
136+
{{ partition_by.render_wrapped(alias='dbt_internal_dest') }} in (
137+
{{ partitions | join (', ') }}
138+
)
139+
{%- endset %}
140+
141+
{%- set source_sql = bq_static_select_insert_overwrite_sql(tmp_relation, sql, partition_by, tmp_relation_exists) %}
142+
{#
143+
-- when the model sql is inserted directly into the merge statement,
144+
-- we need to prepend it with the user-defined `sql_header`. this is
145+
-- important when the model sql references elements like variables or udfs
146+
-- defined in the header.
147+
148+
-- in the case where a temporary table is created first (i.e., the
149+
-- "temp table exists" path), the `sql_header` is already included via
150+
-- the `create_table_as` macro, so no additional handling is needed.
151+
#}
152+
153+
-- 1. run the merge statement
154+
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};
155+
156+
{%- if tmp_relation_exists -%}
157+
-- 2. clean up the temp table
158+
drop table if exists {{ tmp_relation }};
159+
{%- endif -%}
160+
161+
{%- endif -%}
162+
{% endmacro %}
163+
164+
165+
{#
166+
-- dynamic partitions refer to the set partition values that are
167+
-- determined at runtime, based on either the contents of source
168+
-- data (e.g. values in an updated_date column) or external runtime
169+
-- parameters (e.g. time of day, system params).
170+
--
171+
-- with `insert_overwrite`, this allows dbt to compute which
172+
-- partitions to overwrite dynamically using a partition filter
173+
-- expression (e.g., where partition_column >= current_date()).
174+
--
175+
-- this enables targeted incremental updates by overwriting only the
176+
-- affected partitions, rather than replacing the entire table.
177+
-- this reduces latency and cost by limiting the scope of writes.
178+
#}
179+
180+
99181
{% macro bq_dynamic_copy_partitions_insert_overwrite_sql(
100182
tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions
101183
) %}

dbt-bigquery/tests/functional/adapter/incremental/incremental_strategy_fixtures.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,52 @@
336336
}}
337337
338338
339+
with data as (
340+
341+
{% if not is_incremental() %}
342+
343+
select 1 as id, cast('2020-01-01' as date) as date_day union all
344+
select 2 as id, cast('2020-01-01' as date) as date_day union all
345+
select 3 as id, cast('2020-01-01' as date) as date_day union all
346+
select 4 as id, cast('2020-01-01' as date) as date_day
347+
348+
{% else %}
349+
350+
-- we want to overwrite the 4 records in the 2020-01-01 partition
351+
-- with the 2 records below, but add two more in the 2020-01-02 partition
352+
select 10 as id, cast('2020-01-01' as date) as date_day union all
353+
select 20 as id, cast('2020-01-01' as date) as date_day union all
354+
select 30 as id, cast('2020-01-02' as date) as date_day union all
355+
select 40 as id, cast('2020-01-02' as date) as date_day
356+
357+
{% endif %}
358+
359+
)
360+
361+
select * from data
362+
363+
{% if is_incremental() %}
364+
where date_day in ({{ config.get("partitions") | join(",") }})
365+
{% endif %}
366+
-- Test comment to prevent recurrence of https://github.com/dbt-labs/dbt-bigquery/issues/896
367+
""".lstrip()
368+
369+
overwrite_copy_partitions_with_partitions_sql = """
370+
{{
371+
config(
372+
materialized="incremental",
373+
incremental_strategy='insert_overwrite',
374+
cluster_by="id",
375+
partitions=["CAST('2020-01-01' AS DATE)","'2020-01-02'"],
376+
partition_by={
377+
"field": "date_day",
378+
"data_type": "date",
379+
"copy_partitions": true
380+
}
381+
)
382+
}}
383+
384+
339385
with data as (
340386
341387
{% if not is_incremental() %}

dbt-bigquery/tests/functional/adapter/incremental/test_incremental_strategies.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
overwrite_day_sql,
2323
overwrite_day_with_copy_partitions_sql,
2424
overwrite_partitions_sql,
25+
overwrite_copy_partitions_with_partitions_sql,
2526
overwrite_range_sql,
2627
overwrite_time_sql,
2728
overwrite_day_with_time_ingestion_sql,
@@ -45,6 +46,7 @@ def models(self):
4546
"incremental_overwrite_day.sql": overwrite_day_sql,
4647
"incremental_overwrite_day_with_copy_partitions.sql": overwrite_day_with_copy_partitions_sql,
4748
"incremental_overwrite_partitions.sql": overwrite_partitions_sql,
49+
"incremental_overwrite_copy_partitions_with_partitions.sql": overwrite_copy_partitions_with_partitions_sql,
4850
"incremental_overwrite_range.sql": overwrite_range_sql,
4951
"incremental_overwrite_time.sql": overwrite_time_sql,
5052
"incremental_overwrite_day_with_time_partition.sql": overwrite_day_with_time_ingestion_sql,
@@ -67,17 +69,22 @@ def seeds(self):
6769
def test__bigquery_assert_incremental_configurations_apply_the_right_strategy(self, project):
6870
run_dbt(["seed"])
6971
results = run_dbt()
70-
assert len(results) == 12
72+
assert len(results) == 13
7173

7274
results = run_dbt()
73-
assert len(results) == 12
75+
assert len(results) == 13
76+
7477
incremental_strategies = [
7578
("incremental_merge_range", "merge_expected"),
7679
("incremental_merge_time", "merge_expected"),
7780
("incremental_merge_time_with_require_partition_view", "merge_expected"),
7881
("incremental_overwrite_time", "incremental_overwrite_time_expected"),
7982
("incremental_overwrite_date", "incremental_overwrite_date_expected"),
8083
("incremental_overwrite_partitions", "incremental_overwrite_date_expected"),
84+
(
85+
"incremental_overwrite_copy_partitions_with_partitions",
86+
"incremental_overwrite_date_expected",
87+
),
8188
("incremental_overwrite_day", "incremental_overwrite_day_expected"),
8289
("incremental_overwrite_range", "incremental_overwrite_range_expected"),
8390
(

0 commit comments

Comments
 (0)