Skip to content

Commit f26d822

Browse files
dbeatty10 and QMalcolm authored
Fix microbatch file naming for compiled code (#11220)
* Functional test for hourly microbatch model
* Use today's date for functional test for hourly microbatch model
* Use today's date for functional test for hourly microbatch model
* Restore to original
* Only use alphanumeric characters within batch ids
* Add tests for batch_id and change expected output for format_batch_start
* Handle missing batch_start
* Revert "Handle missing batch_start"

  This reverts commit 65a1db0. Reverting this because `batch_start` for `format_batch_start` cannot be `None` and `start_time` for `batch_id` cannot be `None`.
* Improve BatchSize specific values for `format_batch_start` and `batch_id` methods

---------

Co-authored-by: Quigley Malcolm <[email protected]>
1 parent e264675 commit f26d822

File tree

4 files changed

+157
-8
lines changed

4 files changed

+157
-8
lines changed

core/dbt/context/providers.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1050,9 +1050,10 @@ def write(self, payload: str) -> str:
10501050
if (
10511051
isinstance(self.model, ModelNode)
10521052
and self.model.config.get("incremental_strategy") == "microbatch"
1053+
and self.model.batch is not None
10531054
):
10541055
split_suffix = MicrobatchBuilder.format_batch_start(
1055-
self.model.config.get("__dbt_internal_microbatch_event_time_start"),
1056+
self.model.batch.event_time_start,
10561057
self.model.config.batch_size,
10571058
)
10581059

core/dbt/materializations/incremental/microbatch.py

Lines changed: 15 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -199,9 +199,21 @@ def batch_id(start_time: datetime, batch_size: BatchSize) -> str:
199199

200200
@staticmethod
201201
def format_batch_start(batch_start: datetime, batch_size: BatchSize) -> str:
202-
return str(
203-
batch_start.date() if (batch_start and batch_size != BatchSize.hour) else batch_start
204-
)
202+
"""Format the passed in datetime based on the batch_size.
203+
204+
2024-09-17 16:06:00 + Batchsize.hour -> 2024-09-17T16
205+
2024-09-17 16:06:00 + Batchsize.day -> 2024-09-17
206+
2024-09-17 16:06:00 + Batchsize.month -> 2024-09
207+
2024-09-17 16:06:00 + Batchsize.year -> 2024
208+
"""
209+
if batch_size == BatchSize.year:
210+
return batch_start.strftime("%Y")
211+
elif batch_size == BatchSize.month:
212+
return batch_start.strftime("%Y-%m")
213+
elif batch_size == BatchSize.day:
214+
return batch_start.strftime("%Y-%m-%d")
215+
else: # batch_size == BatchSize.hour
216+
return batch_start.strftime("%Y-%m-%dT%H")
205217

206218
@staticmethod
207219
def ceiling_timestamp(timestamp: datetime, batch_size: BatchSize) -> datetime:

tests/functional/microbatch/test_microbatch.py

Lines changed: 120 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -57,6 +57,21 @@
5757
select * from {{ ref('input_model') }}
5858
"""
5959

60+
microbatch_model_hour_sql = """
61+
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='hour', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
62+
select * from {{ ref('input_model') }}
63+
"""
64+
65+
microbatch_model_month_sql = """
66+
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='month', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
67+
select * from {{ ref('input_model') }}
68+
"""
69+
70+
microbatch_model_year_sql = """
71+
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='year', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
72+
select * from {{ ref('input_model') }}
73+
"""
74+
6075
microbatch_model_with_pre_and_post_sql = """
6176
{{ config(
6277
materialized='incremental',
@@ -841,6 +856,111 @@ def test_run_with_event_time(self, project):
841856
)
842857

843858

859+
class TestMicrobatchCompiledRunPathsHourly(BaseMicrobatchTest):
860+
861+
@pytest.fixture(scope="class")
862+
def models(self):
863+
return {
864+
"input_model.sql": input_model_sql,
865+
"microbatch_model.sql": microbatch_model_hour_sql,
866+
}
867+
868+
def test_run_with_event_time(self, project):
869+
# run all partitions from start - 2 expected rows in output, one failed
870+
with patch_microbatch_end_time("2020-01-03 13:57:00"):
871+
run_dbt(["run"])
872+
873+
# Compiled paths - batch compilations
874+
assert read_file(
875+
project.project_root,
876+
"target",
877+
"compiled",
878+
"test",
879+
"models",
880+
"microbatch_model",
881+
"microbatch_model_2020-01-03T13.sql",
882+
)
883+
assert read_file(
884+
project.project_root,
885+
"target",
886+
"run",
887+
"test",
888+
"models",
889+
"microbatch_model",
890+
"microbatch_model_2020-01-03T13.sql",
891+
)
892+
893+
894+
class TestMicrobatchCompiledRunPathsMonthly(BaseMicrobatchTest):
895+
896+
@pytest.fixture(scope="class")
897+
def models(self):
898+
return {
899+
"input_model.sql": input_model_sql,
900+
"microbatch_model.sql": microbatch_model_month_sql,
901+
}
902+
903+
def test_run_with_event_time(self, project):
904+
# run all partitions from start - 2 expected rows in output, one failed
905+
with patch_microbatch_end_time("2020-01-03 13:57:00"):
906+
run_dbt(["run"])
907+
908+
# Compiled paths - batch compilations
909+
assert read_file(
910+
project.project_root,
911+
"target",
912+
"compiled",
913+
"test",
914+
"models",
915+
"microbatch_model",
916+
"microbatch_model_2020-01.sql",
917+
)
918+
assert read_file(
919+
project.project_root,
920+
"target",
921+
"run",
922+
"test",
923+
"models",
924+
"microbatch_model",
925+
"microbatch_model_2020-01.sql",
926+
)
927+
928+
929+
class TestMicrobatchCompiledRunPathsYearly(BaseMicrobatchTest):
930+
931+
@pytest.fixture(scope="class")
932+
def models(self):
933+
return {
934+
"input_model.sql": input_model_sql,
935+
"microbatch_model.sql": microbatch_model_year_sql,
936+
}
937+
938+
def test_run_with_event_time(self, project):
939+
# run all partitions from start - 2 expected rows in output, one failed
940+
with patch_microbatch_end_time("2020-01-03 13:57:00"):
941+
run_dbt(["run"])
942+
943+
# Compiled paths - batch compilations
944+
assert read_file(
945+
project.project_root,
946+
"target",
947+
"compiled",
948+
"test",
949+
"models",
950+
"microbatch_model",
951+
"microbatch_model_2020.sql",
952+
)
953+
assert read_file(
954+
project.project_root,
955+
"target",
956+
"run",
957+
"test",
958+
"models",
959+
"microbatch_model",
960+
"microbatch_model_2020.sql",
961+
)
962+
963+
844964
class TestMicrobatchFullRefreshConfigFalse(BaseMicrobatchTest):
845965
@pytest.fixture(scope="class")
846966
def models(self):

tests/unit/materializations/incremental/test_microbatch.py

Lines changed: 20 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -602,16 +602,32 @@ def test_offset_timestamp(self, timestamp, batch_size, offset, expected_timestam
602602
def test_truncate_timestamp(self, timestamp, batch_size, expected_timestamp):
603603
assert MicrobatchBuilder.truncate_timestamp(timestamp, batch_size) == expected_timestamp
604604

605+
@pytest.mark.parametrize(
606+
"batch_size,start_time,expected_formatted_start_time",
607+
[
608+
(BatchSize.year, datetime(2020, 1, 1, 1), "2020"),
609+
(BatchSize.month, datetime(2020, 1, 1, 1), "202001"),
610+
(BatchSize.day, datetime(2020, 1, 1, 1), "20200101"),
611+
(BatchSize.hour, datetime(2020, 1, 1, 1), "20200101T01"),
612+
],
613+
)
614+
def test_batch_id(
615+
self, batch_size: BatchSize, start_time: datetime, expected_formatted_start_time: str
616+
) -> None:
617+
assert MicrobatchBuilder.batch_id(start_time, batch_size) == expected_formatted_start_time
618+
605619
@pytest.mark.parametrize(
606620
"batch_size,batch_start,expected_formatted_batch_start",
607621
[
608-
(BatchSize.year, datetime(2020, 1, 1, 1), "2020-01-01"),
609-
(BatchSize.month, datetime(2020, 1, 1, 1), "2020-01-01"),
622+
(BatchSize.year, datetime(2020, 1, 1, 1), "2020"),
623+
(BatchSize.month, datetime(2020, 1, 1, 1), "2020-01"),
610624
(BatchSize.day, datetime(2020, 1, 1, 1), "2020-01-01"),
611-
(BatchSize.hour, datetime(2020, 1, 1, 1), "2020-01-01 01:00:00"),
625+
(BatchSize.hour, datetime(2020, 1, 1, 1), "2020-01-01T01"),
612626
],
613627
)
614-
def test_format_batch_start(self, batch_size, batch_start, expected_formatted_batch_start):
628+
def test_format_batch_start(
629+
self, batch_size: BatchSize, batch_start: datetime, expected_formatted_batch_start: str
630+
) -> None:
615631
assert (
616632
MicrobatchBuilder.format_batch_start(batch_start, batch_size)
617633
== expected_formatted_batch_start

0 commit comments

Comments
 (0)