Skip to content

Commit 44ae5c7

Browse files
Fix DbtCloudRunJobTrigger timeout error message and add final status check
The timeout error message in DbtCloudRunJobTrigger.run() printed self.end_time (an absolute epoch timestamp) labelled as "seconds", producing nonsensical output like "after 1771200015.8 seconds" instead of a meaningful duration. Additionally, the timeout check fired before sleeping, without a final status poll. A job completing during asyncio.sleep() could be incorrectly reported as timed out. Changes: - Move asyncio.sleep() before the timeout check so the trigger sleeps first, then evaluates the deadline. - Add a final is_still_running() call when the timeout fires so that jobs completing at the boundary are handled correctly. - Replace the misleading epoch-as-duration message with a clear "within the configured timeout" message. - Update existing timeout test and add a new test for the edge case where a job completes at the timeout boundary. Closes: #61979 Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent a61e923 commit 44ae5c7

File tree

2 files changed

+45
-13
lines changed
  • providers/dbt/cloud
    • src/airflow/providers/dbt/cloud/triggers
    • tests/unit/dbt/cloud/triggers

2 files changed

+45
-13
lines changed

providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,17 +75,21 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
7575
hook = DbtCloudHook(self.conn_id, **self.hook_params)
7676
try:
7777
while await self.is_still_running(hook):
78-
if self.end_time < time.time():
79-
yield TriggerEvent(
80-
{
81-
"status": "error",
82-
"message": f"Job run {self.run_id} has not reached a terminal status after "
83-
f"{self.end_time} seconds.",
84-
"run_id": self.run_id,
85-
}
86-
)
87-
return
8878
await asyncio.sleep(self.poll_interval)
79+
if self.end_time < time.time():
80+
# Final status check: the job may have completed during the sleep.
81+
if await self.is_still_running(hook):
82+
yield TriggerEvent(
83+
{
84+
"status": "error",
85+
"message": f"Job run {self.run_id} has not reached a terminal "
86+
f"status within the configured timeout.",
87+
"run_id": self.run_id,
88+
}
89+
)
90+
return
91+
# Job reached a terminal state — exit loop to handle below.
92+
break
8993
job_run_status = await hook.get_job_status(self.run_id, self.account_id)
9094
if job_run_status == DbtCloudJobRunStatus.SUCCESS.value:
9195
yield TriggerEvent(

providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -202,13 +202,13 @@ async def test_dbt_job_run_exception(self, mock_get_job_status, mocked_is_still_
202202
@mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
203203
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
204204
async def test_dbt_job_run_timeout(self, mock_get_job_status, mocked_is_still_running):
205-
"""Assert that run timeout after end_time elapsed"""
205+
"""Assert that run yields a timeout error after end_time has elapsed."""
206206
mocked_is_still_running.return_value = True
207207
mock_get_job_status.side_effect = Exception("Test exception")
208208
end_time = time.time()
209209
trigger = DbtCloudRunJobTrigger(
210210
conn_id=self.CONN_ID,
211-
poll_interval=self.POLL_INTERVAL,
211+
poll_interval=0.1,
212212
end_time=end_time,
213213
run_id=self.RUN_ID,
214214
account_id=self.ACCOUNT_ID,
@@ -219,7 +219,35 @@ async def test_dbt_job_run_timeout(self, mock_get_job_status, mocked_is_still_ru
219219
{
220220
"status": "error",
221221
"message": f"Job run {self.RUN_ID} has not reached a terminal status "
222-
f"after {end_time} seconds.",
222+
f"within the configured timeout.",
223+
"run_id": self.RUN_ID,
224+
}
225+
)
226+
assert expected == actual
227+
228+
@pytest.mark.asyncio
229+
@mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
230+
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
231+
async def test_dbt_job_run_timeout_but_job_completes(
232+
self, mock_get_job_status, mocked_is_still_running
233+
):
234+
"""Assert that a job completing at the timeout boundary is treated as success, not timeout."""
235+
mocked_is_still_running.side_effect = [True, False]
236+
mock_get_job_status.return_value = DbtCloudJobRunStatus.SUCCESS.value
237+
end_time = time.time()
238+
trigger = DbtCloudRunJobTrigger(
239+
conn_id=self.CONN_ID,
240+
poll_interval=0.1,
241+
end_time=end_time,
242+
run_id=self.RUN_ID,
243+
account_id=self.ACCOUNT_ID,
244+
)
245+
generator = trigger.run()
246+
actual = await generator.asend(None)
247+
expected = TriggerEvent(
248+
{
249+
"status": "success",
250+
"message": f"Job run {self.RUN_ID} has completed successfully.",
223251
"run_id": self.RUN_ID,
224252
}
225253
)

0 commit comments

Comments
 (0)