Skip to content

Commit b022932

Browse files
authored
fix(dagster): fixes job config for trino heartbeat checker (#5453)
1 parent ac481a0 commit b022932

File tree

2 files changed

+18
-6
lines changed

2 files changed

+18
-6
lines changed

warehouse/oso_dagster/assets/sqlmesh/sqlmesh.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -256,21 +256,29 @@ async def sqlmesh_heartbeat_checker(
256256
global_config: ResourceParam[DagsterConfig],
257257
trino: TrinoResource,
258258
heartbeat: HeartBeatResource,
259-
):
259+
) -> None:
260260
from datetime import datetime, timedelta, timezone
261261

262+
now = datetime.now(timezone.utc)
263+
262264
last_heartbeat = await heartbeat.get_last_heartbeat_for("sqlmesh")
263265
if last_heartbeat is None:
264-
return
266+
context.log.info(
267+
"No heartbeat found for sqlmesh now ensuring trino shutdown."
268+
)
269+
last_heartbeat = now - timedelta(
270+
minutes=global_config.sqlmesh_trino_ttl_minutes + 1
271+
)
265272

266273
# Only scale down trino if we're in a k8s environment
267-
if global_config.k8s_enabled is False:
274+
if not global_config.k8s_enabled:
268275
return
269276

270-
now = datetime.now(timezone.utc)
271-
if now - last_heartbeat > timedelta(minutes=30):
277+
if now - last_heartbeat > timedelta(
278+
minutes=global_config.sqlmesh_trino_ttl_minutes
279+
):
272280
context.log.info(
273-
"No heartbeat detected for sqlmesh in the last 30 minutes. Ensuring that producer trino is scaled down."
281+
f"No heartbeat detected for sqlmesh in the last {global_config.sqlmesh_trino_ttl_minutes} minutes. Ensuring that producer trino is scaled down."
274282
)
275283
await trino.ensure_shutdown()
276284
else:

warehouse/oso_dagster/config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ class DagsterConfig(BaseSettings):
8585
sqlmesh_catalog: str = "iceberg"
8686
sqlmesh_schema: str = "oso"
8787
sqlmesh_bq_export_dataset_id: str = "oso"
88+
89+
# How long to keep trino clusters alive without heartbeats
90+
sqlmesh_trino_ttl_minutes: int = 30
91+
8892
asset_cache_enabled: bool = False
8993
asset_cache_dir: str = ""
9094
asset_cache_default_ttl_seconds: int = 60 * 15

0 commit comments

Comments
 (0)