Skip to content

Commit 87584c7

Browse files
authored
Fix null handling of source freshness and dbt_project vs schema spec precedence (#11698)
* Handle explicit setting of null for source freshness config * Abstract out the creation of the target config This is useful because it makes that portion of code more re-usable/portable and makes the work we are about to do easier. * Fix bug in `merge_source_freshness` where empty freshness was preferenced over `None` The issue was that during merging of freshnesses, an "empty freshness", one where all values are `None`, was being preferenced over `None`. This was problematic because an "empty freshness" indicates that a freshness was not specified at that level. While `None` means that the freshness was _explicitly_ set to `None`. As such we should preference the thing that was specifically set. * Properly get dbt_project defined freshness and don't merge with schema defined freshness Previously we were only getting the "top level" freshness from the dbt_project.yaml. This was ignoring freshness settings for the direct, source, and table set in the dbt_project.yaml. Additionally, we were merging the dbt_project.yaml freshness into the schema freshness. Long term this merging would be desireably, however before we do that we need to ensure freshness at diffrent levels within the dbt_project.yml get properly merged (currently the different levels clobber each other). Fixing that is a larger issue though. So for the time being, the schema defintion of freshness will clobber any dbt_project.yml definition of freshness. * Add changie doc * Fix whitespace to make code quality happy * Set the parsed source freshness to an empty FreshnessThreshold if None This maintains backwards compatibility
1 parent 709bd11 commit 87584c7

File tree

5 files changed

+204
-45
lines changed

5 files changed

+204
-45
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
kind: Fixes
2+
body: Fix source freshness set via config to handle explicit nulls
3+
time: 2025-05-30T00:58:04.94133-05:00
4+
custom:
5+
Author: QMalcolm
6+
Issue: "11685"

core/dbt/parser/sources.py

Lines changed: 68 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -157,49 +157,6 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> SourceDefinition:
157157
else:
158158
loaded_at_query = source.loaded_at_query
159159

160-
try:
161-
project_freshness = FreshnessThreshold.from_dict(
162-
self.root_project.sources.get("+freshness", {})
163-
)
164-
except ValueError:
165-
fire_event(
166-
FreshnessConfigProblem(
167-
msg="Could not validate `freshness` for `sources` in 'dbt_project.yml', ignoring. Please see https://docs.getdbt.com/docs/build/sources#source-data-freshness for more information.",
168-
)
169-
)
170-
project_freshness = None
171-
172-
source_freshness = source.freshness
173-
if source_freshness and (target.path, source.name) not in self._deprecations:
174-
deprecations.warn(
175-
"property-moved-to-config-deprecation",
176-
key="freshness",
177-
file=target.path,
178-
key_path=source.name,
179-
)
180-
self._deprecations.add((target.path, source.name))
181-
182-
source_config_freshness = FreshnessThreshold.from_dict(source.config.get("freshness", {}))
183-
184-
table_freshness = table.freshness
185-
if table_freshness and (target.path, table.name) not in self._deprecations:
186-
deprecations.warn(
187-
"property-moved-to-config-deprecation",
188-
key="freshness",
189-
file=target.path,
190-
key_path=table.name,
191-
)
192-
self._deprecations.add((target.path, table.name))
193-
194-
table_config_freshness = FreshnessThreshold.from_dict(table.config.get("freshness", {}))
195-
freshness = merge_source_freshness(
196-
project_freshness,
197-
source_freshness,
198-
source_config_freshness,
199-
table_freshness,
200-
table_config_freshness,
201-
)
202-
203160
quoting = source.quoting.merged(table.quoting)
204161
# path = block.path.original_file_path
205162
table_meta = table.meta or {}
@@ -249,7 +206,8 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> SourceDefinition:
249206
loader=source.loader,
250207
loaded_at_field=loaded_at_field,
251208
loaded_at_query=loaded_at_query,
252-
freshness=freshness,
209+
# The setting to an empty freshness object is to maintain what we were previously doing if no freshenss was specified
210+
freshness=config.freshness or FreshnessThreshold(),
253211
quoting=quoting,
254212
resource_type=NodeType.Source,
255213
fqn=target.fqn,
@@ -366,6 +324,19 @@ def _generate_source_config(self, target: UnpatchedSourceDefinition, rendered: b
366324
# it works while source configs can only include `enabled`.
367325
precedence_configs.update(target.table.config)
368326

327+
precedence_freshness = self.calculate_freshness_from_raw_target(target)
328+
if precedence_freshness:
329+
precedence_configs["freshness"] = precedence_freshness.to_dict()
330+
elif precedence_freshness is None:
331+
precedence_configs["freshness"] = None
332+
else:
333+
# this means that the user did not set a freshness threshold in the source schema file, as such
334+
# there should be no freshness precedence
335+
precedence_configs.pop("freshness", None)
336+
337+
# Because freshness is a "object" config, the freshness from the dbt_project.yml and the freshness
338+
# from the schema file _won't_ get merged by this process. The result will be that the freshness will
339+
# come from the schema file if provided, and if not, it'll fall back to the dbt_project.yml freshness.
369340
return generator.calculate_node_config(
370341
config_call_dict={},
371342
fqn=target.fqn,
@@ -418,6 +389,58 @@ def get_unused_msg(
418389
)
419390
return unused_tables_formatted
420391

392+
def calculate_freshness_from_raw_target(
393+
self,
394+
target: UnpatchedSourceDefinition,
395+
) -> Optional[FreshnessThreshold]:
396+
source: UnparsedSourceDefinition = target.source
397+
398+
source_freshness = source.freshness
399+
if source_freshness and (target.path, source.name) not in self._deprecations:
400+
deprecations.warn(
401+
"property-moved-to-config-deprecation",
402+
key="freshness",
403+
file=target.path,
404+
key_path=source.name,
405+
)
406+
self._deprecations.add((target.path, source.name))
407+
408+
source_config_freshness_raw: Optional[Dict] = source.config.get(
409+
"freshness", {}
410+
) # Will only be None if the user explicitly set it to null
411+
source_config_freshness: Optional[FreshnessThreshold] = (
412+
FreshnessThreshold.from_dict(source_config_freshness_raw)
413+
if source_config_freshness_raw is not None
414+
else None
415+
)
416+
417+
table: UnparsedSourceTableDefinition = target.table
418+
table_freshness = table.freshness
419+
if table_freshness and (target.path, table.name) not in self._deprecations:
420+
deprecations.warn(
421+
"property-moved-to-config-deprecation",
422+
key="freshness",
423+
file=target.path,
424+
key_path=table.name,
425+
)
426+
self._deprecations.add((target.path, table.name))
427+
428+
table_config_freshness_raw: Optional[Dict] = table.config.get(
429+
"freshness", {}
430+
) # Will only be None if the user explicitly set it to null
431+
table_config_freshness: Optional[FreshnessThreshold] = (
432+
FreshnessThreshold.from_dict(table_config_freshness_raw)
433+
if table_config_freshness_raw is not None
434+
else None
435+
)
436+
437+
return merge_source_freshness(
438+
source_freshness,
439+
source_config_freshness,
440+
table_freshness,
441+
table_config_freshness,
442+
)
443+
421444

422445
def merge_freshness_time_thresholds(
423446
base: Optional[Time], update: Optional[Time]
@@ -457,7 +480,7 @@ def merge_source_freshness(
457480
merged_freshness_obj.error_after = merged_error_after
458481
merged_freshness_obj.warn_after = merged_warn_after
459482
current_merged_value = merged_freshness_obj
460-
elif base is None and update is not None:
483+
elif base is None and bool(update):
461484
# If current_merged_value (base) is None, the update becomes the new value
462485
current_merged_value = update
463486
else: # This covers cases where 'update' is None, or both 'base' and 'update' are None.

tests/functional/sources/fixtures.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
loader: custom
2222
freshness:
2323
warn_after: {count: 18, period: hour}
24+
error_after: {count: 24, period: hour}
2425
config:
2526
freshness: # default freshness, takes precedence over top-level key above
2627
warn_after: {count: 12, period: hour}
@@ -503,3 +504,33 @@
503504
loaded_at_query: "select {{current_timestamp()}}"
504505
505506
"""
507+
508+
freshness_with_explicit_null_in_table_schema_yml = """version: 2
509+
sources:
510+
- name: test_source
511+
schema: "{{ var(env_var('DBT_TEST_SCHEMA_NAME_VARIABLE')) }}"
512+
freshness:
513+
warn_after:
514+
count: 24
515+
period: hour
516+
quoting:
517+
identifier: True
518+
tables:
519+
- name: source_a
520+
loaded_at_field: "{{ var('test_loaded_at') | as_text }}"
521+
config:
522+
freshness: null
523+
"""
524+
525+
freshness_with_explicit_null_in_source_schema_yml = """version: 2
526+
sources:
527+
- name: test_source
528+
schema: "{{ var(env_var('DBT_TEST_SCHEMA_NAME_VARIABLE')) }}"
529+
config:
530+
freshness: null
531+
quoting:
532+
identifier: True
533+
tables:
534+
- name: source_a
535+
loaded_at_field: "{{ var('test_loaded_at') | as_text }}"
536+
"""

tests/functional/sources/test_source_freshness.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
filtered_models_schema_yml,
2121
freshness_via_custom_sql_schema_yml,
2222
freshness_via_metadata_schema_yml,
23+
freshness_with_explicit_null_in_source_schema_yml,
24+
freshness_with_explicit_null_in_table_schema_yml,
2325
override_freshness_models_schema_yml,
2426
)
2527

@@ -599,3 +601,23 @@ def test_source_freshness_custom_sql(self, project):
599601
"source_b": "warn",
600602
"source_c": "pass",
601603
}
604+
605+
606+
class TestSourceFreshnessExplicitNullInTable(SuccessfulSourceFreshnessTest):
607+
@pytest.fixture(scope="class")
608+
def models(self):
609+
return {"schema.yml": freshness_with_explicit_null_in_table_schema_yml}
610+
611+
def test_source_freshness_explicit_null_in_table(self, project):
612+
result = self.run_dbt_with_vars(project, ["source", "freshness"], expect_pass=True)
613+
assert {r.node.name: r.status for r in result} == {}
614+
615+
616+
class TestSourceFreshnessExplicitNullInSource(SuccessfulSourceFreshnessTest):
617+
@pytest.fixture(scope="class")
618+
def models(self):
619+
return {"schema.yml": freshness_with_explicit_null_in_source_schema_yml}
620+
621+
def test_source_freshness_explicit_null_in_source(self, project):
622+
result = self.run_dbt_with_vars(project, ["source", "freshness"], expect_pass=True)
623+
assert {r.node.name: r.status for r in result} == {}

tests/unit/parser/test_sources.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
from typing import List, Optional
2+
3+
import pytest
4+
5+
from core.dbt.artifacts.resources.v1.components import FreshnessThreshold, Time
6+
from core.dbt.parser.sources import merge_source_freshness
7+
8+
9+
class TestMergeSourceFreshness:
10+
@pytest.mark.parametrize(
11+
"thresholds,expected_result",
12+
[
13+
([None, None], None),
14+
(
15+
[
16+
FreshnessThreshold(
17+
warn_after=Time(count=1, period="hour"),
18+
error_after=Time(count=1, period="day"),
19+
),
20+
None,
21+
],
22+
None,
23+
),
24+
(
25+
[
26+
FreshnessThreshold(
27+
warn_after=Time(count=1, period="hour"),
28+
error_after=Time(count=1, period="day"),
29+
),
30+
None,
31+
FreshnessThreshold(),
32+
],
33+
None,
34+
),
35+
(
36+
[
37+
FreshnessThreshold(warn_after=Time(count=1, period="hour")),
38+
FreshnessThreshold(error_after=Time(count=1, period="day")),
39+
],
40+
FreshnessThreshold(
41+
warn_after=Time(count=1, period="hour"),
42+
error_after=Time(count=1, period="day"),
43+
),
44+
),
45+
(
46+
[
47+
None,
48+
FreshnessThreshold(warn_after=Time(count=1, period="hour")),
49+
FreshnessThreshold(error_after=Time(count=1, period="day")),
50+
],
51+
FreshnessThreshold(
52+
warn_after=Time(count=1, period="hour"),
53+
error_after=Time(count=1, period="day"),
54+
),
55+
),
56+
(
57+
[
58+
FreshnessThreshold(
59+
warn_after=Time(count=1, period="hour"),
60+
error_after=Time(count=1, period="day"),
61+
),
62+
FreshnessThreshold(error_after=Time(count=48, period="hour")),
63+
],
64+
FreshnessThreshold(
65+
warn_after=Time(count=1, period="hour"),
66+
error_after=Time(count=48, period="hour"),
67+
),
68+
),
69+
],
70+
)
71+
def test_merge_source_freshness(
72+
self,
73+
thresholds: List[Optional[FreshnessThreshold]],
74+
expected_result: Optional[FreshnessThreshold],
75+
):
76+
result = merge_source_freshness(*thresholds)
77+
assert result == expected_result

0 commit comments

Comments
 (0)