Skip to content

Commit 1ab6124

Browse files
authored
Merge pull request #17 from rogiervandergeer/fix/ca-refresh-end-offset
Fix: Continuous Aggregate watermark in the future on full-refresh
2 parents e6fc06d + 7ca6e74 commit 1ab6124

File tree

2 files changed

+78
-1
lines changed

2 files changed

+78
-1
lines changed

dbt/include/timescaledb/macros/relations/continuous_aggregate.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
{% macro do_refresh_continuous_aggregate(relation) %}
22
{% call statement('refresh', fetch_result=False, auto_begin=False) %}
3+
{%- set end_offset = config.get('refresh_policy', {}).get('end_offset') -%}
34
{{ adapter.marker_run_outside_transaction() }}
4-
call refresh_continuous_aggregate('{{ relation }}', null, null);
5+
call refresh_continuous_aggregate('{{ relation }}', null, {{ end_offset or 'null' }});
56
{% endcall %}
67
{% endmacro %}
78

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
from typing import Any
2+
3+
import pytest
4+
5+
from dbt.tests.fixtures.project import TestProjInfo
6+
from dbt.tests.util import (
7+
check_result_nodes_by_name,
8+
run_dbt,
9+
)
10+
11+
12+
class TestContinuousAggregateWatermark:
13+
@pytest.fixture(scope="class")
14+
def project_config_update(self) -> dict[str, Any]:
15+
return {
16+
"name": "continuous_aggregate_tests",
17+
"models": {
18+
"continuous_aggregate_tests": {
19+
"base": {"+materialized": "hypertable", "+main_dimension": "time_column"},
20+
"test_model": {
21+
"+materialized": "continuous_aggregate",
22+
"+refresh_policy": {
23+
"start_offset": "interval '1 month'",
24+
"end_offset": "interval '1 day'",
25+
"schedule_interval": "interval '3 day'",
26+
},
27+
},
28+
}
29+
},
30+
}
31+
32+
@pytest.fixture(scope="class")
33+
def models(self) -> dict[str, Any]:
34+
return {
35+
"base.sql": "select current_timestamp as time_column, 1 as value",
36+
"test_model.sql": """
37+
select
38+
time_bucket(interval '1 day', time_column) as bucket,
39+
sum(value) as total_value
40+
from {{ ref('base') }}
41+
group by 1
42+
""",
43+
}
44+
45+
def test_continuous_aggregate_watermark(self, project: TestProjInfo, unique_schema: str) -> None:
46+
results = run_dbt(["run"])
47+
assert len(results) == 2
48+
check_result_nodes_by_name(results, ["base", "test_model"])
49+
50+
materialization_hypertable_id = project.run_sql(
51+
f"""
52+
SELECT id FROM _timescaledb_catalog.hypertable
53+
WHERE table_name=(
54+
SELECT materialization_hypertable_name
55+
FROM timescaledb_information.continuous_aggregates
56+
WHERE view_schema = '{unique_schema}'
57+
AND view_name = 'test_model'
58+
)
59+
""",
60+
fetch="one",
61+
)[0]
62+
63+
is_watermark_valid = project.run_sql(
64+
f"""
65+
WITH w AS (
66+
SELECT _timescaledb_functions.cagg_watermark({materialization_hypertable_id}) as watermark_raw
67+
)
68+
SELECT
69+
w.watermark_raw IS NULL OR
70+
_timescaledb_functions.to_timestamp(w.watermark_raw) < now()
71+
FROM w
72+
""",
73+
fetch="one",
74+
)[0]
75+
76+
assert is_watermark_valid, "Watermark is in the future"

0 commit comments

Comments
 (0)