Skip to content
This repository was archived by the owner on Sep 2, 2025. It is now read-only.

Commit 83bb413

Browse files
Add retry factory to consolidate retry strategies across dbt-bigquery (#1395)
* fix imports
* create a retry factory and move relevant objects from connections
* add on_error method for deadline retries
* remove dependency on retry_and_handle from cancel_open
* remove dependencies on retry_and_handle
* remove timeout methods from connection manager
* add retry to get_bq_table
* move client factory to credentials module so that on_error can be moved to the retry factory in the retry module
* move on_error factory to retry module
* move client factories from python_submissions module to credentials module
* create a clients module
* retry all client factories by default
* move polling from manual check in python_submissions module into retry_factory
* move load_dataframe logic from adapter to connection manager, use the built-in timeout argument instead of a manual polling method
* move upload_file logic from adapter to connection manager, use the built-in timeout argument instead of a manual polling method, remove the manual polling method
* move the retry to polling for done instead of create
* align new retries with original methods, simplify retry factory
* create a method for the dataproc endpoint
* make imports explicit, remove unused constant
* update names in clients.py to follow the naming convention
* update names in connections.py to follow the naming convention
* update names in credentials.py to follow the naming convention
* update names in python_submissions.py to follow the naming convention
* update names in retry.py to follow the naming convention

---------

Co-authored-by: Colin Rogers <[email protected]>
1 parent 75142ac commit 83bb413

File tree

15 files changed

+713
-715
lines changed

15 files changed

+713
-715
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
kind: Under the Hood
2+
body: Create a retry factory to simplify retry strategies across dbt-bigquery
3+
time: 2024-11-07T14:38:56.210445-05:00
4+
custom:
5+
Author: mikealfare osalama
6+
Issue: "1395"

dbt/adapters/bigquery/clients.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
from google.api_core.client_info import ClientInfo
2+
from google.api_core.client_options import ClientOptions
3+
from google.api_core.retry import Retry
4+
from google.auth.exceptions import DefaultCredentialsError
5+
from google.cloud.bigquery import Client as BigQueryClient
6+
from google.cloud.dataproc_v1 import BatchControllerClient, JobControllerClient
7+
from google.cloud.storage import Client as StorageClient
8+
9+
from dbt.adapters.events.logging import AdapterLogger
10+
11+
import dbt.adapters.bigquery.__version__ as dbt_version
12+
from dbt.adapters.bigquery.credentials import (
13+
BigQueryCredentials,
14+
create_google_credentials,
15+
set_default_credentials,
16+
)
17+
18+
19+
# Module-level adapter logger, registered under the "BigQuery" name.
_logger = AdapterLogger("BigQuery")
20+
21+
22+
def create_bigquery_client(credentials: BigQueryCredentials) -> BigQueryClient:
    """Return a BigQuery client built from ``credentials``.

    The client is built by ``_create_bigquery_client``. If that first attempt
    raises ``DefaultCredentialsError``, a login prompt is logged,
    ``set_default_credentials()`` is called, and the client is built a second
    time. Any error raised by the second attempt propagates to the caller.
    """
    try:
        client = _create_bigquery_client(credentials)
    except DefaultCredentialsError:
        # No usable default credentials were found: ask the user to log in,
        # refresh the defaults, then try exactly once more.
        _logger.info("Please log into GCP to continue")
        set_default_credentials()
        client = _create_bigquery_client(credentials)
    return client
29+
30+
31+
# google decorator. retries on transient errors with exponential backoff
@Retry()
def create_gcs_client(credentials: BigQueryCredentials) -> StorageClient:
    """Return a Cloud Storage client for the profile's execution project.

    The client is authenticated with the Google credentials produced by
    ``create_google_credentials(credentials)``.
    """
    project = credentials.execution_project
    google_credentials = create_google_credentials(credentials)
    return StorageClient(project=project, credentials=google_credentials)
37+
38+
39+
# google decorator. retries on transient errors with exponential backoff
@Retry()
def create_dataproc_job_controller_client(credentials: BigQueryCredentials) -> JobControllerClient:
    """Return a Dataproc ``JobControllerClient`` for the configured region.

    The client is authenticated with ``create_google_credentials(credentials)``
    and pointed at the endpoint returned by ``_dataproc_endpoint(credentials)``.
    """
    google_credentials = create_google_credentials(credentials)
    options = ClientOptions(api_endpoint=_dataproc_endpoint(credentials))
    return JobControllerClient(credentials=google_credentials, client_options=options)
45+
46+
47+
# google decorator. retries on transient errors with exponential backoff
@Retry()
def create_dataproc_batch_controller_client(
    credentials: BigQueryCredentials,
) -> BatchControllerClient:
    """Return a Dataproc ``BatchControllerClient`` for the configured region.

    The client is authenticated with ``create_google_credentials(credentials)``
    and pointed at the endpoint returned by ``_dataproc_endpoint(credentials)``.
    """
    google_credentials = create_google_credentials(credentials)
    options = ClientOptions(api_endpoint=_dataproc_endpoint(credentials))
    return BatchControllerClient(credentials=google_credentials, client_options=options)
55+
56+
57+
# google decorator. retries on transient errors with exponential backoff
@Retry()
def _create_bigquery_client(credentials: BigQueryCredentials) -> BigQueryClient:
    """Build a ``BigQueryClient`` from the profile credentials.

    The execution project and the Google credentials are passed positionally.
    ``location`` falls back to ``None`` when the credentials object has no such
    attribute. The client reports a ``dbt-bigquery-<version>`` user agent and
    uses ``credentials.quota_project`` as its quota project.
    """
    project = credentials.execution_project
    google_credentials = create_google_credentials(credentials)
    location = getattr(credentials, "location", None)
    info = ClientInfo(user_agent=f"dbt-bigquery-{dbt_version.version}")
    options = ClientOptions(quota_project_id=credentials.quota_project)
    return BigQueryClient(
        project,
        google_credentials,
        location=location,
        client_info=info,
        client_options=options,
    )
66+
67+
68+
def _dataproc_endpoint(credentials: BigQueryCredentials) -> str:
    """Return the ``host:port`` Dataproc API endpoint for ``credentials.dataproc_region``."""
    region = credentials.dataproc_region
    return f"{region}-dataproc.googleapis.com:443"

0 commit comments

Comments
 (0)