
from google.cloud.bigquery import _pyarrow_helpers
from google.cloud.bigquery import _versions_helpers
+from google.cloud.bigquery import retry as bq_retry
from google.cloud.bigquery import schema


@@ -740,7 +741,7 @@ def _row_iterator_page_to_arrow(page, column_names, arrow_types):
    return pyarrow.RecordBatch.from_arrays(arrays, names=column_names)


-def download_arrow_row_iterator(pages, bq_schema):
+def download_arrow_row_iterator(pages, bq_schema, timeout=None):
    """Use HTTP JSON RowIterator to construct an iterable of RecordBatches.

    Args:
@@ -751,6 +752,10 @@ def download_arrow_row_iterator(pages, bq_schema):
            Mapping[str, Any] \
        ]]):
            A decription of the fields in result pages.
+        timeout (Optional[float]):
+            The number of seconds to wait for the underlying download to complete.
+            If ``None``, wait indefinitely.
+
    Yields:
        :class:`pyarrow.RecordBatch`
            The next page of records as a ``pyarrow`` record batch.
@@ -759,8 +764,16 @@ def download_arrow_row_iterator(pages, bq_schema):
    column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema]
    arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema]

-    for page in pages:
-        yield _row_iterator_page_to_arrow(page, column_names, arrow_types)
+    if timeout is None:
+        for page in pages:
+            yield _row_iterator_page_to_arrow(page, column_names, arrow_types)
+    else:
+        start_time = time.monotonic()
+        for page in pages:
+            if time.monotonic() - start_time > timeout:
+                raise concurrent.futures.TimeoutError()
+
+            yield _row_iterator_page_to_arrow(page, column_names, arrow_types)


def _row_iterator_page_to_dataframe(page, column_names, dtypes):
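A caller-side sketch of the new behaviour: `download_arrow_row_iterator` lives in the private `_pandas_helpers` module and is normally driven by a `RowIterator`, so the `pages` and `bq_schema` stand-ins below are illustrative only, not part of this change.

```python
import concurrent.futures

# Private helper; normally invoked via RowIterator rather than directly.
from google.cloud.bigquery._pandas_helpers import download_arrow_row_iterator

pages = iter(())   # stand-in for row_iterator.pages
bq_schema = []     # stand-in for the table's SchemaField sequence

try:
    for record_batch in download_arrow_row_iterator(pages, bq_schema, timeout=30.0):
        ...  # consume each pyarrow.RecordBatch as it is yielded
except concurrent.futures.TimeoutError:
    # Raised between pages once more than 30 seconds have elapsed since the
    # download started; batches already yielded remain usable.
    pass
```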
@@ -778,7 +791,7 @@ def _row_iterator_page_to_dataframe(page, column_names, dtypes):
    return pandas.DataFrame(columns, columns=column_names)


-def download_dataframe_row_iterator(pages, bq_schema, dtypes):
+def download_dataframe_row_iterator(pages, bq_schema, dtypes, timeout=None):
    """Use HTTP JSON RowIterator to construct a DataFrame.

    Args:
@@ -792,14 +805,27 @@ def download_dataframe_row_iterator(pages, bq_schema, dtypes):
        dtypes(Mapping[str, numpy.dtype]):
            The types of columns in result data to hint construction of the
            resulting DataFrame. Not all column types have to be specified.
+        timeout (Optional[float]):
+            The number of seconds to wait for the underlying download to complete.
+            If ``None``, wait indefinitely.
+
    Yields:
        :class:`pandas.DataFrame`
            The next page of records as a ``pandas.DataFrame`` record batch.
    """
    bq_schema = schema._to_schema_fields(bq_schema)
    column_names = [field.name for field in bq_schema]
-    for page in pages:
-        yield _row_iterator_page_to_dataframe(page, column_names, dtypes)
+
+    if timeout is None:
+        for page in pages:
+            yield _row_iterator_page_to_dataframe(page, column_names, dtypes)
+    else:
+        start_time = time.monotonic()
+        for page in pages:
+            if time.monotonic() - start_time > timeout:
+                raise concurrent.futures.TimeoutError()
+
+            yield _row_iterator_page_to_dataframe(page, column_names, dtypes)


def _bqstorage_page_to_arrow(page):
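Both row-iterator downloaders now repeat the same deadline guard. Purely as an illustration (this helper is not part of the change), the shared pattern can be written once as a generic wrapper:

```python
import concurrent.futures
import time
from typing import Callable, Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def _pages_with_deadline(
    pages: Iterable[T],
    convert: Callable[[T], U],
    timeout: Optional[float],
) -> Iterator[U]:
    """Yield converted pages, raising TimeoutError once ``timeout`` seconds pass.

    Illustrative only; the two functions above inline this logic instead.
    """
    if timeout is None:
        for page in pages:
            yield convert(page)
        return

    start_time = time.monotonic()
    for page in pages:
        if time.monotonic() - start_time > timeout:
            raise concurrent.futures.TimeoutError()
        yield convert(page)
```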
@@ -928,6 +954,7 @@ def _download_table_bqstorage(
    if "@" in table.table_id:
        raise ValueError("Reading from a specific snapshot is not currently supported.")

+    start_time = time.monotonic()
    requested_streams = determine_requested_streams(preserve_order, max_stream_count)

    requested_session = bigquery_storage.types.stream.ReadSession(
@@ -944,10 +971,16 @@ def _download_table_bqstorage(
            ArrowSerializationOptions.CompressionCodec(1)
        )

+    retry_policy = (
+        bq_retry.DEFAULT_RETRY.with_deadline(timeout) if timeout is not None else None
+    )
+
    session = bqstorage_client.create_read_session(
        parent="projects/{}".format(project_id),
        read_session=requested_session,
        max_stream_count=requested_streams,
+        retry=retry_policy,
+        timeout=timeout,
    )

    _LOGGER.debug(
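For context, `bq_retry.DEFAULT_RETRY` is a `google.api_core.retry.Retry`, and `with_deadline` returns a copy whose overall deadline is capped; `create_read_session` accepts both `retry` and `timeout` keyword arguments. A standalone sketch of what the `retry_policy` expression evaluates to (the `timeout` value is illustrative):

```python
from google.api_core import retry as api_retry
from google.cloud.bigquery import retry as bq_retry

timeout = 60.0  # illustrative caller-supplied value

# Same expression as above: cap the retry deadline at the caller's timeout,
# or pass None so create_read_session falls back to its default retry.
retry_policy = (
    bq_retry.DEFAULT_RETRY.with_deadline(timeout) if timeout is not None else None
)
assert retry_policy is None or isinstance(retry_policy, api_retry.Retry)
```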
@@ -983,8 +1016,6 @@ def _download_table_bqstorage(
    # Manually manage the pool to control shutdown behavior on timeout.
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, total_streams))
    wait_on_shutdown = True
-    start_time = time.time()
-
    try:
        # Manually submit jobs and wait for download to complete rather
        # than using pool.map because pool.map continues running in the
@@ -1006,7 +1037,7 @@ def _download_table_bqstorage(
        while not_done:
            # Check for timeout
            if timeout is not None:
-                elapsed = time.time() - start_time
+                elapsed = time.monotonic() - start_time
                if elapsed > timeout:
                    wait_on_shutdown = False
                    raise concurrent.futures.TimeoutError(
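The remaining change swaps `time.time()` for `time.monotonic()`, so the elapsed-time check cannot be skewed by system clock adjustments, and moves `start_time` up so the deadline also covers session creation. A minimal sketch of the pattern, assuming the surrounding `finally` block uses `wait_on_shutdown` to decide whether `pool.shutdown` waits (that part is outside this diff):

```python
import concurrent.futures
import time

pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)  # illustrative size
wait_on_shutdown = True
timeout = 10.0                 # illustrative caller-supplied timeout
start_time = time.monotonic()  # monotonic clock: immune to wall-clock changes

try:
    # ... submit work and poll futures here ...
    if timeout is not None and time.monotonic() - start_time > timeout:
        wait_on_shutdown = False
        raise concurrent.futures.TimeoutError()
finally:
    # Assumed behaviour of the flag: on timeout, do not wait for in-flight
    # work; otherwise wait for workers to finish before returning.
    pool.shutdown(wait=wait_on_shutdown)
```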