This repository was archived by the owner on Sep 2, 2025. It is now read-only.

Commit b1b5183

Merge remote-tracking branch 'origin/main'

2 parents: de2bca1 + ae0f91c

File tree: 7 files changed, +161 −6 lines changed

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+kind: Features
+body: Add Microbatch Strategy to dbt-spark
+time: 2024-09-25T23:22:38.216277+01:00
+custom:
+  Author: michelleark
+  Issue: "1354"

.github/workflows/integration.yml

Lines changed: 2 additions & 1 deletion
@@ -67,7 +67,7 @@ permissions: read-all
 
 # will cancel previous workflows triggered by the same event and for the same ref for PRs or same SHA otherwise
 concurrency:
-  group: ${{ github.workflow }}-${{ github.event_name }}-${{ contains(github.event_name, 'pull_request') && github.event.pull_request.head.ref || github.sha }}
+  group: ${{ github.workflow }}-${{ github.event_name }}-${{ contains(github.event_name, 'pull_request') && github.event.pull_request.head.ref || github.sha }}-${{ github.actor }}
   cancel-in-progress: true
 
 # sets default shell to bash, for all operating systems
@@ -128,6 +128,7 @@ jobs:
             - 'tests/**'
             - 'dev-requirements.txt'
             - '.github/**'
+            - '*.py'
 
       - name: Generate integration test matrix
        id: generate-matrix
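
Two effects worth noting: appending ${{ github.actor }} to the concurrency group key narrows cancellation, so an in-progress run is now cancelled only when the same actor re-triggers the workflow for the same ref (PRs) or SHA; runs started by different actors no longer cancel one another. The new '*.py' entry widens the change filter, presumably so that edits to top-level Python files such as setup.py (bumped later in this commit) also trigger the integration test matrix.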

dbt/include/bigquery/macros/materializations/incremental.sql

Lines changed: 12 additions & 3 deletions
@@ -4,12 +4,16 @@
 
   {% set invalid_strategy_msg -%}
     Invalid incremental strategy provided: {{ strategy }}
-    Expected one of: 'merge', 'insert_overwrite'
+    Expected one of: 'merge', 'insert_overwrite', 'microbatch'
   {%- endset %}
-  {% if strategy not in ['merge', 'insert_overwrite'] %}
+  {% if strategy not in ['merge', 'insert_overwrite', 'microbatch'] %}
     {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
   {% endif %}
 
+  {% if strategy == 'microbatch' %}
+    {% do bq_validate_microbatch_config(config) %}
+  {% endif %}
+
   {% do return(strategy) %}
 {% endmacro %}
 
@@ -48,8 +52,13 @@
         tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
     ) %}
 
-  {% else %} {# strategy == 'merge' #}
+  {% elif strategy == 'microbatch' %}
 
+    {% set build_sql = bq_generate_microbatch_build_sql(
+        tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
+    ) %}
+
+  {% else %} {# strategy == 'merge' #}
     {% set build_sql = bq_generate_incremental_merge_build_sql(
         tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, incremental_predicates
     ) %}
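
The strategy validation now accepts 'microbatch' and, when it is selected, eagerly validates the model config via bq_validate_microbatch_config. At build time, the microbatch branch delegates to bq_generate_microbatch_build_sql, the new macro in the next file, which reuses the existing insert_overwrite SQL generation rather than introducing a separate code path.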
Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+{% macro bq_validate_microbatch_config(config) %}
+  {% if config.get("partition_by") is none %}
+    {% set missing_partition_msg -%}
+      The 'microbatch' strategy requires a `partition_by` config.
+    {%- endset %}
+    {% do exceptions.raise_compiler_error(missing_partition_msg) %}
+  {% endif %}
+
+  {% if config.get("partition_by").granularity != config.get('batch_size') %}
+    {% set invalid_partition_by_granularity_msg -%}
+      The 'microbatch' strategy requires a `partition_by` config with the same granularity as its configured `batch_size`.
+      Got:
+        `batch_size`: {{ config.get('batch_size') }}
+        `partition_by.granularity`: {{ config.get("partition_by").granularity }}
+    {%- endset %}
+    {% do exceptions.raise_compiler_error(invalid_partition_by_granularity_msg) %}
+  {% endif %}
+{% endmacro %}
+
+{% macro bq_generate_microbatch_build_sql(
+    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
+) %}
+  {% set build_sql = bq_insert_overwrite_sql(
+      tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
+  ) %}
+
+  {{ return(build_sql) }}
+{% endmacro %}
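
For readers less fluent in Jinja, here is a minimal Python sketch of the two checks bq_validate_microbatch_config performs. The `config` dict is an illustrative stand-in for dbt's model config object (an assumption; the real macro reads a PartitionConfig-style object whose granularity is an attribute):

def validate_microbatch_config(config: dict) -> None:
    # Check 1: a partition_by config must be present.
    partition_by = config.get("partition_by")
    if partition_by is None:
        raise ValueError("The 'microbatch' strategy requires a `partition_by` config.")
    # Check 2: the partition granularity must match the configured batch_size.
    if partition_by.get("granularity") != config.get("batch_size"):
        raise ValueError(
            "The 'microbatch' strategy requires a `partition_by` config "
            "with the same granularity as its configured `batch_size`."
        )

# Passes: daily partitions backing daily batches.
validate_microbatch_config({"partition_by": {"granularity": "day"}, "batch_size": "day"})

# Raises: hourly partitions cannot back daily batches.
# validate_microbatch_config({"partition_by": {"granularity": "hour"}, "batch_size": "day"})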

setup.py

Lines changed: 2 additions & 2 deletions
@@ -50,8 +50,8 @@ def _dbt_bigquery_version() -> str:
     packages=find_namespace_packages(include=["dbt", "dbt.*"]),
     include_package_data=True,
     install_requires=[
-        "dbt-common>=1.0.4,<2.0",
-        "dbt-adapters>=1.1.1,<2.0",
+        "dbt-common>=1.10,<2.0",
+        "dbt-adapters>=1.7,<2.0",
         # 3.20 introduced pyarrow>=3.0 under the `pandas` extra
         "google-cloud-bigquery[pandas]>=3.0,<4.0",
         "google-cloud-storage~=2.4",

tests/functional/adapter/incremental/incremental_strategy_fixtures.py

Lines changed: 56 additions & 0 deletions
@@ -555,3 +555,59 @@
 
 select * from data
 """.lstrip()
+
+microbatch_model_no_unique_id_sql = """
+{{ config(
+    materialized='incremental',
+    incremental_strategy='microbatch',
+    partition_by={
+        'field': 'event_time',
+        'data_type': 'timestamp',
+        'granularity': 'day'
+    },
+    event_time='event_time',
+    batch_size='day',
+    begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)
+    )
+}}
+select * from {{ ref('input_model') }}
+"""
+
+microbatch_input_sql = """
+{{ config(materialized='table', event_time='event_time') }}
+select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time
+union all
+select 2 as id, TIMESTAMP '2020-01-02 00:00:00-0' as event_time
+union all
+select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time
+"""
+
+microbatch_model_no_partition_by_sql = """
+{{ config(
+    materialized='incremental',
+    incremental_strategy='microbatch',
+    event_time='event_time',
+    batch_size='day',
+    begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)
+    )
+}}
+select * from {{ ref('input_model') }}
+"""
+
+
+microbatch_model_invalid_partition_by_sql = """
+{{ config(
+    materialized='incremental',
+    incremental_strategy='microbatch',
+    event_time='event_time',
+    batch_size='day',
+    begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0),
+    partition_by={
+        'field': 'event_time',
+        'data_type': 'timestamp',
+        'granularity': 'hour'
+    }
+    )
+}}
+select * from {{ ref('input_model') }}
+"""
Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
+import os
+import pytest
+from unittest import mock
+
+from dbt.tests.util import run_dbt_and_capture
+from dbt.tests.adapter.incremental.test_incremental_microbatch import (
+    BaseMicrobatch,
+    patch_microbatch_end_time,
+)
+
+from tests.functional.adapter.incremental.incremental_strategy_fixtures import (
+    microbatch_model_no_unique_id_sql,
+    microbatch_input_sql,
+    microbatch_model_no_partition_by_sql,
+    microbatch_model_invalid_partition_by_sql,
+)
+
+
+class TestBigQueryMicrobatch(BaseMicrobatch):
+    @pytest.fixture(scope="class")
+    def microbatch_model_sql(self) -> str:
+        return microbatch_model_no_unique_id_sql
+
+
+class TestBigQueryMicrobatchMissingPartitionBy:
+    @pytest.fixture(scope="class")
+    def models(self) -> str:
+        return {
+            "microbatch.sql": microbatch_model_no_partition_by_sql,
+            "input_model.sql": microbatch_input_sql,
+        }
+
+    @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
+    def test_execution_failure_no_partition_by(self, project):
+        with patch_microbatch_end_time("2020-01-03 13:57:00"):
+            _, stdout = run_dbt_and_capture(["run"], expect_pass=False)
+        assert "The 'microbatch' strategy requires a `partition_by` config" in stdout
+
+
+class TestBigQueryMicrobatchInvalidPartitionByGranularity:
+    @pytest.fixture(scope="class")
+    def models(self) -> str:
+        return {
+            "microbatch.sql": microbatch_model_invalid_partition_by_sql,
+            "input_model.sql": microbatch_input_sql,
+        }
+
+    @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
+    def test_execution_failure_no_partition_by(self, project):
+        with patch_microbatch_end_time("2020-01-03 13:57:00"):
+            _, stdout = run_dbt_and_capture(["run"], expect_pass=False)
+        assert (
+            "The 'microbatch' strategy requires a `partition_by` config with the same granularity as its configured `batch_size`"
+            in stdout
+        )
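
Both failure tests set DBT_EXPERIMENTAL_MICROBATCH via mock.patch.dict, reflecting that the strategy was still gated behind an experimental flag at the time of this commit, and they assert that the compiler errors raised by bq_validate_microbatch_config surface in the run output. (Note that the second class reuses the method name test_execution_failure_no_partition_by even though it exercises the granularity check.)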
