Commit 145e74e

Add an update to allow microbatching to handle the datetime data type. This is not supported in Fabric, so it isn't covered by the upstream adapter.
dbt passes in event times as UTC strings that datetime column types cannot parse. Converting the value to a datetimeoffset lets it be filtered against the datetime, datetime2, and datetimeoffset column types that can exist in SQL Server.
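As a rough illustration of the failure mode (not part of the commit, timestamp values invented): the batch boundary arrives as a UTC string with an explicit offset, which a plain datetime column cannot parse, while parse(... as datetimeoffset) can consume it and then compare against datetime, datetime2, and datetimeoffset columns.

from datetime import datetime

# dbt's microbatch boundaries render as UTC strings with an explicit offset,
# e.g. "2020-01-01 00:00:00+00:00" (illustrative value, not from this commit).
event_time_start = "2020-01-01 00:00:00+00:00"

# Offset-aware parsing handles the trailing "+00:00", much like SQL Server's
# parse(... as datetimeoffset) does; a plain datetime cast would reject it.
parsed = datetime.fromisoformat(event_time_start)
print(parsed.tzinfo)  # UTC

# So the adapter wraps the string in an explicit cast before comparing:
predicate = f"event_time >= parse('{event_time_start}' as datetimeoffset)"
print(predicate)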

File tree

3 files changed: +104 −1 lines changed

dbt/adapters/sqlserver/sqlserver_relation.py

Lines changed: 26 additions & 1 deletion
@@ -1,7 +1,7 @@
 from dataclasses import dataclass, field
 from typing import Optional, Type
 
-from dbt.adapters.base.relation import BaseRelation
+from dbt.adapters.base.relation import BaseRelation, EventTimeFilter
 from dbt.adapters.utils import classproperty
 from dbt_common.exceptions import DbtRuntimeError
 
@@ -49,3 +49,28 @@ def __post_init__(self):
 
     def relation_max_name_length(self):
         return MAX_CHARACTERS_IN_IDENTIFIER
+
+    def _render_event_time_filtered(self, event_time_filter: EventTimeFilter) -> str:
+        """
+        Returns "" if start and end are both None
+        """
+        filter = ""
+        if event_time_filter.start and event_time_filter.end:
+            filter = (
+                f"{event_time_filter.field_name} >="
+                f" parse('{event_time_filter.start}' as datetimeoffset)"
+                f" and {event_time_filter.field_name} <"
+                f" parse('{event_time_filter.end}' as datetimeoffset)"
+            )
+        elif event_time_filter.start:
+            filter = (
+                f"{event_time_filter.field_name} >="
+                f" parse('{event_time_filter.start}' as datetimeoffset)"
+            )
+        elif event_time_filter.end:
+            filter = (
+                f"{event_time_filter.field_name} <"
+                f" parse('{event_time_filter.end}' as datetimeoffset)"
+            )
+
+        return filter
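
For illustration, a minimal sketch of what the start-and-end branch of the new helper renders; a stand-in dataclass replaces dbt's EventTimeFilter (only the attributes the helper reads), and the timestamps are hypothetical:

from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

@dataclass
class FakeEventTimeFilter:
    # Stand-in for dbt's EventTimeFilter: just field_name, start, and end.
    field_name: str
    start: Optional[datetime] = None
    end: Optional[datetime] = None

f = FakeEventTimeFilter(
    field_name="event_time",
    start=datetime(2020, 1, 1, tzinfo=timezone.utc),
    end=datetime(2020, 1, 2, tzinfo=timezone.utc),
)

# Mirrors the both-bounds branch of _render_event_time_filtered:
rendered = (
    f"{f.field_name} >= parse('{f.start}' as datetimeoffset)"
    f" and {f.field_name} < parse('{f.end}' as datetimeoffset)"
)
print(rendered)
# event_time >= parse('2020-01-01 00:00:00+00:00' as datetimeoffset)
#   and event_time < parse('2020-01-02 00:00:00+00:00' as datetimeoffset)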

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
+{% macro sqlserver__get_incremental_microbatch_sql(arg_dict) %}
+    {%- set target = arg_dict["target_relation"] -%}
+    {%- set source = arg_dict["temp_relation"] -%}
+    {%- set dest_columns = arg_dict["dest_columns"] -%}
+    {%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%}
+
+    {#-- Add additional incremental_predicates to filter for batch --#}
+    {% if model.config.get("__dbt_internal_microbatch_event_time_start") -%}
+        {{ log("incremental append event start time > DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= parse('" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "' as datetimeoffset)") }}
+        {% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= parse('" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "' as datetimeoffset)") %}
+    {% endif %}
+    {% if model.config.__dbt_internal_microbatch_event_time_end -%}
+        {{ log("incremental append event end time < DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < parse('" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "' as datetimeoffset)") }}
+        {% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < parse('" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "' as datetimeoffset)") %}
+    {% endif %}
+    {% do arg_dict.update({'incremental_predicates': incremental_predicates}) %}
+
+    delete DBT_INTERNAL_TARGET from {{ target }} AS DBT_INTERNAL_TARGET
+    where (
+    {% for predicate in incremental_predicates %}
+        {%- if not loop.first %}and {% endif -%} {{ predicate }}
+    {% endfor %}
+    );
+
+    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
+    insert into {{ target }} ({{ dest_cols_csv }})
+    (
+        select {{ dest_cols_csv }}
+        from {{ source }}
+    )
+{% endmacro %}
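
For one batch, the macro deletes the target rows inside the batch window and then replays that window from the temp relation. A rough Python sketch of the DELETE it might render (the table name and timestamps are invented; the real predicates come from model.config at run time):

# Sketch of the rendered DELETE for a hypothetical one-day batch.
event_time = "event_time"
batch_start = "2020-01-02 00:00:00+00:00"
batch_end = "2020-01-03 00:00:00+00:00"

incremental_predicates = [
    f"DBT_INTERNAL_TARGET.{event_time} >= parse('{batch_start}' as datetimeoffset)",
    f"DBT_INTERNAL_TARGET.{event_time} < parse('{batch_end}' as datetimeoffset)",
]

# Only the rows inside the batch window are deleted before the insert.
delete_sql = (
    "delete DBT_INTERNAL_TARGET from my_schema.my_model AS DBT_INTERNAL_TARGET\n"
    "where (\n    "
    + "\n    and ".join(incremental_predicates)
    + "\n);"
)
print(delete_sql)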

Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
+import pytest
+from dbt.tests.adapter.incremental.test_incremental_microbatch import BaseMicrobatch
+
+_microbatch_model_no_unique_id_sql_datetime = """
+{{ config(materialized='incremental', incremental_strategy='microbatch',
+    event_time='event_time', batch_size='day', begin='2020-01-01 00:00:00') }}
+select * from {{ ref('input_model') }}
+"""
+
+_input_model_sql_datetime = """
+{{ config(materialized='table', event_time='event_time') }}
+select 1 as id, '2020-01-01 00:00:00' as event_time
+union all
+select 2 as id, '2020-01-02 00:00:00' as event_time
+union all
+select 3 as id, '2020-01-03 00:00:00' as event_time
+"""
+
+
+class TestSQLServerMicrobatchDateTime(BaseMicrobatch):
+    """
+    Set up a version of the microbatch tests that uses a datetime column as the
+    event_time. This checks that the microbatch strategy can handle datetime
+    columns when event times are passed in as UTC strings.
+    """
+
+    @pytest.fixture(scope="class")
+    def microbatch_model_sql(self) -> str:
+        return _microbatch_model_no_unique_id_sql_datetime
+
+    @pytest.fixture(scope="class")
+    def input_model_sql(self) -> str:
+        """
+        This is the SQL that defines the input model to the microbatch model,
+        including any {{ config(..) }}. event_time is a required configuration of this input.
+        """
+        return _input_model_sql_datetime
+
+    @pytest.fixture(scope="class")
+    def insert_two_rows_sql(self, project) -> str:
+        test_schema_relation = project.adapter.Relation.create(
+            database=project.database, schema=project.test_schema
+        )
+        return (
+            f"insert into {test_schema_relation}.input_model (id, event_time) "
+            f"values (4, '2020-01-04 00:00:00'), (5, '2020-01-05 00:00:00')"
+        )
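
The class only swaps fixtures; the inherited BaseMicrobatch assertions run unchanged against these datetime-flavoured models. For reference, a sketch of what insert_two_rows_sql renders, with "my_db" and "test_schema" as stand-ins for project.database and project.test_schema:

# Rendered form of insert_two_rows_sql with hypothetical identifiers; the
# real database and schema come from the pytest project fixture at run time.
test_schema_relation = '"my_db"."test_schema"'
sql = (
    f"insert into {test_schema_relation}.input_model (id, event_time) "
    f"values (4, '2020-01-04 00:00:00'), (5, '2020-01-05 00:00:00')"
)
print(sql)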
