Skip to content

Commit 7fdc5c0

Browse files
Track the notebook job status to avoid LRO (#1175)
Co-authored-by: Colin Rogers <[email protected]>
1 parent be0ab62 commit 7fdc5c0

File tree

2 files changed

+60
-13
lines changed

2 files changed

+60
-13
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
kind: Fixes
2+
body: Track notebook job status to avoid LRO in bigframes mode
3+
time: 2025-06-26T17:13:43.757335863Z
4+
custom:
5+
Author: jialuoo
6+
Issue: "1175"

dbt-bigquery/src/dbt/adapters/bigquery/python_submissions.py

Lines changed: 54 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import inspect
22
import json
33
import re
4+
import time
45
from typing import Any, Dict, Optional, Union
56
import uuid
67

@@ -21,6 +22,7 @@
2122
from dbt.adapters.events.logging import AdapterLogger
2223
from dbt_common.exceptions import DbtRuntimeError
2324

25+
from google.api_core.operation import Operation
2426
from google.auth.transport.requests import Request
2527
from google.cloud import aiplatform_v1
2628
from google.cloud.aiplatform import gapic as aiplatform_gapic
@@ -42,6 +44,9 @@
4244
# This differs from other Python models because the typical 5-minute timeout
4345
# is insufficient for BigFrames processing.
4446
_DEFAULT_BIGFRAMES_TIMEOUT = 60 * 60
47+
# Time interval in seconds between successive polling attempts to check the
48+
# notebook job's status in BigFrames mode.
49+
_COLAB_POLL_INTERVAL = 30
4550

4651

4752
class _BigQueryPythonHelper(PythonJobHelper):
@@ -415,6 +420,39 @@ def submit(self, compiled_code: str) -> None:
415420

416421
self._submit_bigframes_job(notebook_template_id)
417422

423+
def _track_notebook_job_status(self, job_name: str) -> aiplatform_v1.NotebookExecutionJob:
424+
"""Tracks the notebook job until it completes or times out."""
425+
max_wait_time = self._polling_retry.timeout
426+
elapsed = 0
427+
428+
# Please see all the JobState from
429+
# https://cloud.google.com/php/docs/reference/cloud-ai-platform/latest/V1.JobState.
430+
terminal_states = {
431+
aiplatform_gapic.JobState.JOB_STATE_SUCCEEDED,
432+
aiplatform_gapic.JobState.JOB_STATE_PARTIALLY_SUCCEEDED,
433+
aiplatform_gapic.JobState.JOB_STATE_FAILED,
434+
aiplatform_gapic.JobState.JOB_STATE_CANCELLED,
435+
aiplatform_gapic.JobState.JOB_STATE_EXPIRED,
436+
}
437+
438+
while True:
439+
retrieved_job = self._notebook_client.get_notebook_execution_job(name=job_name)
440+
job_state = retrieved_job.job_state
441+
442+
if job_state in terminal_states:
443+
return retrieved_job
444+
445+
if elapsed >= max_wait_time:
446+
raise TimeoutError(
447+
"Operation did not complete within the designated timeout "
448+
f"of {max_wait_time} seconds. Please cancel the related "
449+
"notebook job manually via the GCP console since it might "
450+
"still be actively running."
451+
)
452+
453+
time.sleep(_COLAB_POLL_INTERVAL)
454+
elapsed += _COLAB_POLL_INTERVAL
455+
418456
def _submit_bigframes_job(
419457
self, notebook_template_id: str
420458
) -> aiplatform_v1.NotebookExecutionJob:
@@ -427,33 +465,36 @@ def _submit_bigframes_job(
427465
)
428466

429467
try:
430-
res = self._notebook_client.create_notebook_execution_job(request=request).result(
431-
timeout=self._polling_retry.timeout
468+
operation: Operation = self._notebook_client.create_notebook_execution_job(
469+
request=request
432470
)
433-
retrieved_job = self._notebook_client.get_notebook_execution_job(name=res.name)
471+
lro_name = operation.operation.name
472+
job_name = lro_name.split("/operations/")[0]
473+
retrieved_job = self._track_notebook_job_status(job_name)
434474
except TimeoutError as timeout_error:
435-
raise TimeoutError(
436-
f"The dbt operation encountered a timeout: {timeout_error}\n"
437-
"Please cancel the related notebook job manually via the GCP "
438-
"console since it might still be actively running."
439-
)
475+
raise TimeoutError(f"The dbt operation encountered a timeout: {timeout_error}")
440476
except Exception as e:
441477
raise DbtRuntimeError(f"An unexpected error occured while executing the notebook: {e}")
442478

443-
job_id = res.name.split("/")[-1]
479+
job_id = job_name.split("/")[-1]
444480
gcs_log_uri = f"{notebook_execution_job.gcs_output_uri}/{job_id}/{self._model_name}.py"
445481
self._process_gcs_log(gcs_log_uri)
446482

447-
if retrieved_job.job_state == aiplatform_gapic.JobState.JOB_STATE_FAILED:
483+
if retrieved_job.job_state == aiplatform_gapic.JobState.JOB_STATE_SUCCEEDED:
484+
_logger.info(
485+
f"Colab notebook execution job '{retrieved_job.name}' finished successfully."
486+
)
487+
elif retrieved_job.job_state == aiplatform_gapic.JobState.JOB_STATE_FAILED:
448488
raise DbtRuntimeError(
449489
f"The colab notebook execution job '{retrieved_job.name}' failed."
450490
)
451-
elif retrieved_job.job_state != aiplatform_gapic.JobState.JOB_STATE_SUCCEEDED:
491+
else:
452492
raise DbtRuntimeError(
453-
f"The colab notebook execution job '{retrieved_job.name}' finished with unexpected state: {retrieved_job.job_state.name}"
493+
f"The colab notebook execution job '{retrieved_job.name}' "
494+
f"finished with unexpected state: {retrieved_job.job_state.name}"
454495
)
455496

456-
return self._notebook_client.get_notebook_execution_job(name=res.name)
497+
return self._notebook_client.get_notebook_execution_job(name=job_name)
457498

458499

459500
def _install_packages(packages: list[str]) -> None:

0 commit comments

Comments
 (0)