Skip to content

Commit 8e4665e

Browse files
authored
added timeout for Direct_Read with Bigquery IO (#35284)
* added timeout for Direct_Read with Bigquery IO * changed the default to 1 day * minor comment * use None as the timeout default
1 parent 3736c9b commit 8e4665e

File tree

1 file changed

+23
-9
lines changed

1 file changed

+23
-9
lines changed

sdks/python/apache_beam/io/gcp/bigquery.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -987,7 +987,8 @@ def __init__(
987987
kms_key: Optional[str] = None,
988988
temp_dataset: Optional[DatasetReference] = None,
989989
temp_table: Optional[TableReference] = None,
990-
use_native_datetime: Optional[bool] = False):
990+
use_native_datetime: Optional[bool] = False,
991+
timeout: Optional[float] = None):
991992

992993
if table is not None and query is not None:
993994
raise ValueError(
@@ -1022,6 +1023,7 @@ def __init__(
10221023
self.temp_table = temp_table
10231024
self.query_priority = query_priority
10241025
self.use_native_datetime = use_native_datetime
1026+
self.timeout = timeout
10251027
self._job_name = job_name or 'BQ_DIRECT_READ_JOB'
10261028
self._step_name = step_name
10271029
self._source_uuid = unique_id
@@ -1213,7 +1215,7 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None):
12131215

12141216
self.split_result = [
12151217
_CustomBigQueryStorageStreamSource(
1216-
stream.name, self.use_native_datetime)
1218+
stream.name, self.use_native_datetime, self.timeout)
12171219
for stream in read_session.streams
12181220
]
12191221

@@ -1245,9 +1247,13 @@ class _CustomBigQueryStorageStreamSource(BoundedSource):
12451247
THROTTLE_COUNTER = Metrics.counter(__name__, 'cumulativeThrottlingSeconds')
12461248

12471249
def __init__(
1248-
self, read_stream_name: str, use_native_datetime: Optional[bool] = True):
1250+
self,
1251+
read_stream_name: str,
1252+
use_native_datetime: Optional[bool] = True,
1253+
timeout: Optional[float] = None):
12491254
self.read_stream_name = read_stream_name
12501255
self.use_native_datetime = use_native_datetime
1256+
self.timeout = timeout
12511257

12521258
def display_data(self):
12531259
return {
@@ -1307,10 +1313,12 @@ def retry_delay_callback(delay):
13071313
def read_arrow(self):
13081314

13091315
storage_client = bq_storage.BigQueryReadClient()
1316+
read_rows_kwargs = {'retry_delay_callback': self.retry_delay_callback}
1317+
if self.timeout is not None:
1318+
read_rows_kwargs['timeout'] = self.timeout
13101319
row_iter = iter(
1311-
storage_client.read_rows(
1312-
self.read_stream_name,
1313-
retry_delay_callback=self.retry_delay_callback).rows())
1320+
storage_client.read_rows(self.read_stream_name,
1321+
**read_rows_kwargs).rows())
13141322
row = next(row_iter, None)
13151323
# Handling the case where the user might provide very selective filters
13161324
# which can result in read_rows_response being empty.
@@ -1324,10 +1332,11 @@ def read_arrow(self):
13241332

13251333
def read_avro(self):
13261334
storage_client = bq_storage.BigQueryReadClient()
1335+
read_rows_kwargs = {'retry_delay_callback': self.retry_delay_callback}
1336+
if self.timeout is not None:
1337+
read_rows_kwargs['timeout'] = self.timeout
13271338
read_rows_iterator = iter(
1328-
storage_client.read_rows(
1329-
self.read_stream_name,
1330-
retry_delay_callback=self.retry_delay_callback))
1339+
storage_client.read_rows(self.read_stream_name, **read_rows_kwargs))
13311340
# Handling the case where the user might provide very selective filters
13321341
# which can result in read_rows_response being empty.
13331342
first_read_rows_response = next(read_rows_iterator, None)
@@ -2732,6 +2741,8 @@ class ReadFromBigQuery(PTransform):
27322741
directly from BigQuery storage using the BigQuery Read API
27332742
(https://cloud.google.com/bigquery/docs/reference/storage). If
27342743
unspecified, the default is currently EXPORT.
2744+
timeout (float): The timeout for the read operation in seconds. This only
2745+
impacts DIRECT_READ. If None, the client default will be used.
27352746
use_native_datetime (bool): By default this transform exports BigQuery
27362747
DATETIME fields as formatted strings (for example:
27372748
2021-01-01T12:59:59). If :data:`True`, BigQuery DATETIME fields will
@@ -2824,13 +2835,16 @@ def __init__(
28242835
method=None,
28252836
use_native_datetime=False,
28262837
output_type=None,
2838+
timeout=None,
28272839
*args,
28282840
**kwargs):
28292841
self.method = method or ReadFromBigQuery.Method.EXPORT
28302842
self.use_native_datetime = use_native_datetime
28312843
self.output_type = output_type
28322844
self._args = args
28332845
self._kwargs = kwargs
2846+
if timeout is not None:
2847+
self._kwargs['timeout'] = timeout
28342848

28352849
if self.method == ReadFromBigQuery.Method.EXPORT \
28362850
and self.use_native_datetime is True:

0 commit comments

Comments
 (0)