
Commit 4f67ba2

fix: add timeout parameter to to_dataframe and to_arrow methods (#2354)
### Description

This PR adds a `timeout` parameter to the `to_dataframe()` and `to_arrow()` methods (and their corresponding `*_iterable`, `*_geodataframe`, and `QueryJob` wrappers) in the BigQuery client library. It addresses an issue where these methods could hang indefinitely if the underlying BigQuery Storage API stream blocked (e.g., due to firewall issues or network interruptions) during the download phase. The new `timeout` parameter ensures that the download operation respects the specified time limit and raises a `concurrent.futures.TimeoutError` if it exceeds that duration.

### Changes

- Modified `google/cloud/bigquery/_pandas_helpers.py`:
  - Updated `_download_table_bqstorage` to accept a `timeout` argument.
  - Implemented a timeout check within the result-processing loop.
  - Updated the wrapper functions `download_dataframe_bqstorage` and `download_arrow_bqstorage` to accept and pass the `timeout` parameter.
- Modified `google/cloud/bigquery/table.py`:
  - Updated `RowIterator` methods (`to_arrow_iterable`, `to_arrow`, `to_dataframe_iterable`, `to_dataframe`, `to_geodataframe`) to accept and pass `timeout`.
  - Updated `_EmptyRowIterator` methods to match the `RowIterator` signature, preventing a `TypeError` when a timeout is provided for empty result sets.
- Modified `google/cloud/bigquery/job/query.py`:
  - Updated `QueryJob` methods (`to_arrow`, `to_dataframe`, `to_geodataframe`) to accept `timeout` and pass it to the result iterator.
- Updated unit tests in `tests/unit/job/test_query_pandas.py`, `tests/unit/test_table.py`, and `tests/unit/test_table_pandas.py` to reflect the signature changes.

Fixes internal bug: b/468091307
1 parent 40b4cbf commit 4f67ba2
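
A usage sketch (not part of the PR) showing how a caller might bound the download phase with the new parameter. The query and table names here are hypothetical, but `timeout=` and the raised `concurrent.futures.TimeoutError` are exactly what this change adds:

```python
import concurrent.futures

from google.cloud import bigquery

client = bigquery.Client()
job = client.query("SELECT * FROM `my-project.my_dataset.large_table`")  # hypothetical table

try:
    # Bound the BQ Storage download phase to 60 seconds.
    df = job.to_dataframe(timeout=60.0)
except concurrent.futures.TimeoutError:
    # Raised if the stream blocks (e.g., a firewall or network stall) past the limit.
    print("Download did not finish within 60 seconds.")
```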

File tree

7 files changed
+257 -55 lines changed

google/cloud/bigquery/_pandas_helpers.py

Lines changed: 75 additions & 51 deletions
@@ -26,6 +26,7 @@
 import logging
 import queue
 import threading
+import time
 import warnings
 from typing import Any, Union, Optional, Callable, Generator, List

@@ -869,6 +870,7 @@ def _download_table_bqstorage(
     max_queue_size: Any = _MAX_QUEUE_SIZE_DEFAULT,
     max_stream_count: Optional[int] = None,
     download_state: Optional[_DownloadState] = None,
+    timeout: Optional[float] = None,
 ) -> Generator[Any, None, None]:
     """Downloads a BigQuery table using the BigQuery Storage API.
@@ -899,13 +901,18 @@ def _download_table_bqstorage(
         download_state (Optional[_DownloadState]):
             A threadsafe state object which can be used to observe the
             behavior of the worker threads created by this method.
+        timeout (Optional[float]):
+            The number of seconds to wait for the download to complete.
+            If None, wait indefinitely.

     Yields:
         pandas.DataFrame: Pandas DataFrames, one for each chunk of data
         downloaded from BigQuery.

     Raises:
         ValueError: If attempting to read from a specific partition or snapshot.
+        concurrent.futures.TimeoutError:
+            If the download does not complete within the specified timeout.

     Note:
         This method requires the `google-cloud-bigquery-storage` library
@@ -973,60 +980,73 @@ def _download_table_bqstorage(

     worker_queue: queue.Queue[int] = queue.Queue(maxsize=max_queue_size)

-    with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool:
-        try:
-            # Manually submit jobs and wait for download to complete rather
-            # than using pool.map because pool.map continues running in the
-            # background even if there is an exception on the main thread.
-            # See: https://github.com/googleapis/google-cloud-python/pull/7698
-            not_done = [
-                pool.submit(
-                    _download_table_bqstorage_stream,
-                    download_state,
-                    bqstorage_client,
-                    session,
-                    stream,
-                    worker_queue,
-                    page_to_item,
-                )
-                for stream in session.streams
-            ]
-
-            while not_done:
-                # Don't block on the worker threads. For performance reasons,
-                # we want to block on the queue's get method, instead. This
-                # prevents the queue from filling up, because the main thread
-                # has smaller gaps in time between calls to the queue's get
-                # method. For a detailed explanation, see:
-                # https://friendliness.dev/2019/06/18/python-nowait/
-                done, not_done = _nowait(not_done)
-                for future in done:
-                    # Call result() on any finished threads to raise any
-                    # exceptions encountered.
-                    future.result()
+    # Manually manage the pool to control shutdown behavior on timeout.
+    pool = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, total_streams))
+    wait_on_shutdown = True
+    start_time = time.time()

-                try:
-                    frame = worker_queue.get(timeout=_PROGRESS_INTERVAL)
-                    yield frame
-                except queue.Empty:  # pragma: NO COVER
-                    continue
+    try:
+        # Manually submit jobs and wait for download to complete rather
+        # than using pool.map because pool.map continues running in the
+        # background even if there is an exception on the main thread.
+        # See: https://github.com/googleapis/google-cloud-python/pull/7698
+        not_done = [
+            pool.submit(
+                _download_table_bqstorage_stream,
+                download_state,
+                bqstorage_client,
+                session,
+                stream,
+                worker_queue,
+                page_to_item,
+            )
+            for stream in session.streams
+        ]
+
+        while not_done:
+            # Check for timeout
+            if timeout is not None:
+                elapsed = time.time() - start_time
+                if elapsed > timeout:
+                    wait_on_shutdown = False
+                    raise concurrent.futures.TimeoutError(
+                        f"Download timed out after {timeout} seconds."
+                    )
+
+            # Don't block on the worker threads. For performance reasons,
+            # we want to block on the queue's get method, instead. This
+            # prevents the queue from filling up, because the main thread
+            # has smaller gaps in time between calls to the queue's get
+            # method. For a detailed explanation, see:
+            # https://friendliness.dev/2019/06/18/python-nowait/
+            done, not_done = _nowait(not_done)
+            for future in done:
+                # Call result() on any finished threads to raise any
+                # exceptions encountered.
+                future.result()
+
+            try:
+                frame = worker_queue.get(timeout=_PROGRESS_INTERVAL)
+                yield frame
+            except queue.Empty:  # pragma: NO COVER
+                continue

-            # Return any remaining values after the workers finished.
-            while True:  # pragma: NO COVER
-                try:
-                    frame = worker_queue.get_nowait()
-                    yield frame
-                except queue.Empty:  # pragma: NO COVER
-                    break
-        finally:
-            # No need for a lock because reading/replacing a variable is
-            # defined to be an atomic operation in the Python language
-            # definition (enforced by the global interpreter lock).
-            download_state.done = True
+        # Return any remaining values after the workers finished.
+        while True:  # pragma: NO COVER
+            try:
+                frame = worker_queue.get_nowait()
+                yield frame
+            except queue.Empty:  # pragma: NO COVER
+                break
+    finally:
+        # No need for a lock because reading/replacing a variable is
+        # defined to be an atomic operation in the Python language
+        # definition (enforced by the global interpreter lock).
+        download_state.done = True

-            # Shutdown all background threads, now that they should know to
-            # exit early.
-            pool.shutdown(wait=True)
+        # Shutdown all background threads, now that they should know to
+        # exit early.
+        pool.shutdown(wait=wait_on_shutdown)


 def download_arrow_bqstorage(
@@ -1037,6 +1057,7 @@ def download_arrow_bqstorage(
     selected_fields=None,
     max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
     max_stream_count=None,
+    timeout=None,
 ):
     return _download_table_bqstorage(
         project_id,
@@ -1047,6 +1068,7 @@ def download_arrow_bqstorage(
         page_to_item=_bqstorage_page_to_arrow,
         max_queue_size=max_queue_size,
         max_stream_count=max_stream_count,
+        timeout=timeout,
     )

@@ -1060,6 +1082,7 @@ def download_dataframe_bqstorage(
     selected_fields=None,
     max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
     max_stream_count=None,
+    timeout=None,
 ):
     page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes)
     return _download_table_bqstorage(
@@ -1071,6 +1094,7 @@ def download_dataframe_bqstorage(
         page_to_item=page_to_item,
         max_queue_size=max_queue_size,
         max_stream_count=max_stream_count,
+        timeout=timeout,
     )
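
Stripped of the BigQuery specifics, the core change in `_download_table_bqstorage` is a consumer loop that polls a bounded queue, checks wall-clock elapsed time against the deadline on each pass, and skips joining stalled workers at shutdown once the deadline has passed. Below is a minimal, self-contained sketch of that pattern; `download_with_timeout`, `produce_chunks`, and the 0.2-second poll interval are illustrative stand-ins, not the library's code.

```python
import concurrent.futures
import queue
import time

_PROGRESS_INTERVAL = 0.2  # illustrative poll interval, mirroring the library's constant


def download_with_timeout(timeout=None):
    """Yield items produced by worker threads; raise TimeoutError past the deadline."""
    worker_queue: queue.Queue = queue.Queue(maxsize=4)

    def produce_chunks(n):  # hypothetical stand-in for one stream download
        for i in range(n):
            time.sleep(0.1)
            worker_queue.put(f"chunk-{i}")

    pool = concurrent.futures.ThreadPoolExecutor(max_workers=2)
    wait_on_shutdown = True
    start_time = time.time()
    try:
        not_done = [pool.submit(produce_chunks, 5) for _ in range(2)]
        while not_done:
            # Raise instead of blocking forever if the workers stall.
            if timeout is not None and time.time() - start_time > timeout:
                wait_on_shutdown = False
                raise concurrent.futures.TimeoutError(
                    f"Download timed out after {timeout} seconds."
                )
            # Poll the futures without blocking; block briefly on the queue instead.
            done, not_done = concurrent.futures.wait(not_done, timeout=0)
            for future in done:
                future.result()  # surface any worker exceptions
            try:
                yield worker_queue.get(timeout=_PROGRESS_INTERVAL)
            except queue.Empty:
                continue
        # Drain anything the workers left behind after finishing.
        while True:
            try:
                yield worker_queue.get_nowait()
            except queue.Empty:
                break
    finally:
        # On timeout, don't wait for stalled threads before returning control.
        pool.shutdown(wait=wait_on_shutdown)
```

With `timeout=None`, `list(download_with_timeout())` yields all ten chunks; with a tiny deadline such as `timeout=0.05`, the first poll past the deadline raises, and `pool.shutdown(wait=False)` returns control to the caller without joining the workers.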

google/cloud/bigquery/job/query.py

Lines changed: 17 additions & 0 deletions
@@ -1857,6 +1857,7 @@ def to_arrow(
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         create_bqstorage_client: bool = True,
         max_results: Optional[int] = None,
+        timeout: Optional[float] = None,
     ) -> "pyarrow.Table":
         """[Beta] Create a class:`pyarrow.Table` by loading all pages of a
         table or query.
@@ -1904,6 +1905,10 @@ def to_arrow(

                 .. versionadded:: 2.21.0

+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying download to complete.
+                If ``None``, wait indefinitely.
+
         Returns:
             pyarrow.Table
                 A :class:`pyarrow.Table` populated with row data and column
@@ -1921,6 +1926,7 @@ def to_arrow(
             progress_bar_type=progress_bar_type,
             bqstorage_client=bqstorage_client,
             create_bqstorage_client=create_bqstorage_client,
+            timeout=timeout,
         )

     # If changing the signature of this method, make sure to apply the same
@@ -1949,6 +1955,7 @@ def to_dataframe(
         range_timestamp_dtype: Union[
             Any, None
         ] = DefaultPandasDTypes.RANGE_TIMESTAMP_DTYPE,
+        timeout: Optional[float] = None,
     ) -> "pandas.DataFrame":
         """Return a pandas DataFrame from a QueryJob

@@ -2141,6 +2148,10 @@ def to_dataframe(

                 .. versionadded:: 3.21.0

+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying download to complete.
+                If ``None``, wait indefinitely.
+
         Returns:
             pandas.DataFrame:
                 A :class:`~pandas.DataFrame` populated with row data
@@ -2174,6 +2185,7 @@ def to_dataframe(
             range_date_dtype=range_date_dtype,
             range_datetime_dtype=range_datetime_dtype,
             range_timestamp_dtype=range_timestamp_dtype,
+            timeout=timeout,
         )

     # If changing the signature of this method, make sure to apply the same
@@ -2191,6 +2203,7 @@ def to_geodataframe(
         int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE,
         float_dtype: Union[Any, None] = None,
         string_dtype: Union[Any, None] = None,
+        timeout: Optional[float] = None,
     ) -> "geopandas.GeoDataFrame":
         """Return a GeoPandas GeoDataFrame from a QueryJob

@@ -2269,6 +2282,9 @@ def to_geodataframe(
                 then the data type will be ``numpy.dtype("object")``. BigQuery String
                 type can be found at:
                 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#string_type
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying download to complete.
+                If ``None``, wait indefinitely.

         Returns:
             geopandas.GeoDataFrame:
@@ -2296,6 +2312,7 @@ def to_geodataframe(
             int_dtype=int_dtype,
             float_dtype=float_dtype,
             string_dtype=string_dtype,
+            timeout=timeout,
         )

     def __iter__(self):
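
One subtlety called out in the description: `_EmptyRowIterator` must mirror `RowIterator`'s signature, because the client returns the empty variant for zero-row results, and a `timeout=` call would otherwise fail. A toy illustration of that failure mode; the class names are illustrative stand-ins, not the library's actual classes.

```python
class RowIter:  # stand-in for RowIterator, which accepts the new keyword
    def to_dataframe(self, timeout=None):
        return f"rows (timeout={timeout})"


class EmptyRowIter:  # stand-in for _EmptyRowIterator *before* this fix
    def to_dataframe(self):
        return "empty"


EmptyRowIter().to_dataframe(timeout=30)
# TypeError: to_dataframe() got an unexpected keyword argument 'timeout'
```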
