@@ -1385,7 +1385,10 @@ def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
13851385 raise
13861386
13871387 def _reload_query_results(
1388- self, retry: "retries.Retry" = DEFAULT_RETRY, timeout: Optional[float] = None
1388+ self,
1389+ retry: "retries.Retry" = DEFAULT_RETRY,
1390+ timeout: Optional[float] = None,
1391+ page_size: int = 0,
13891392 ):
13901393 """Refresh the cached query results unless already cached and complete.
13911394
@@ -1395,6 +1398,9 @@ def _reload_query_results(
13951398 timeout (Optional[float]):
13961399 The number of seconds to wait for the underlying HTTP transport
13971400 before using ``retry``.
1401+ page_size (int):
1402+ Maximum number of rows in a single response. See maxResults in
1403+ the jobs.getQueryResults REST API.
13981404 """
13991405 # Optimization: avoid a call to jobs.getQueryResults if it's already
14001406 # been fetched, e.g. from jobs.query first page of results.
@@ -1425,7 +1431,14 @@ def _reload_query_results(
14251431
14261432 # If an explicit timeout is not given, fall back to the transport timeout
14271433 # stored in _blocking_poll() in the process of polling for job completion.
1428- transport_timeout = timeout if timeout is not None else self._transport_timeout
1434+ if timeout is not None:
1435+ transport_timeout = timeout
1436+ else:
1437+ transport_timeout = self._transport_timeout
1438+
1439+ # Handle PollingJob._DEFAULT_VALUE.
1440+ if not isinstance(transport_timeout, (float, int)):
1441+ transport_timeout = None
14291442
14291442 self._query_results = self._client._get_query_results(
14301443 self.job_id,
@@ -1434,6 +1447,7 @@ def _reload_query_results(
14341447 timeout_ms=timeout_ms,
14351448 location=self.location,
14361449 timeout=transport_timeout,
1450+ page_size=page_size,
14371451 )
14381452
14391453 def result(  # type: ignore  # (incompatible with supertype)
@@ -1515,11 +1529,25 @@ def result( # type: ignore # (incompatible with supertype)
15151529 # actually correspond to a finished query job.
15161530 )
15171531
1532+ # Setting max_results should be equivalent to setting page_size with
1533+ # regard to allowing the user to tune how many results to download
1534+ # while we wait for the query to finish. See internal issue:
1535+ # 344008814.
1536+ if page_size is None and max_results is not None:
1537+ page_size = max_results
1538+
15181539 # When timeout has default sentinel value ``object()``, do not pass
15191540 # anything to invoke default timeouts in subsequent calls.
1520- kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
1541+ done_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
1542+ reload_query_results_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
1543+ list_rows_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
15211544 if type(timeout) is not object:
1522- kwargs["timeout"] = timeout
1545+ done_kwargs["timeout"] = timeout
1546+ list_rows_kwargs["timeout"] = timeout
1547+ reload_query_results_kwargs["timeout"] = timeout
1548+
1549+ if page_size is not None:
1550+ reload_query_results_kwargs["page_size"] = page_size
15231551
15241552 try:
15251553 retry_do_query = getattr(self, "_retry_do_query", None)
@@ -1562,7 +1590,7 @@ def is_job_done():
15621590 # rateLimitExceeded errors are ambiguous. We want to know if
15631591 # the query job failed and not just the call to
15641592 # jobs.getQueryResults.
1565- if self.done(retry=retry, **kwargs):
1593+ if self.done(retry=retry, **done_kwargs):
15661594 # If it's already failed, we might as well stop.
15671595 job_failed_exception = self.exception()
15681596 if job_failed_exception is not None:
@@ -1599,14 +1627,16 @@ def is_job_done():
15991627 # response from the REST API. This ensures we aren't
16001628 # making any extra API calls if the previous loop
16011629 # iteration fetched the finished job.
1602- self._reload_query_results(retry=retry, **kwargs)
1630+ self._reload_query_results(
1631+ retry=retry, **reload_query_results_kwargs
1632+ )
16031633 return True
16041634
16051635 # Call jobs.getQueryResults with max results set to 0 just to
16061636 # wait for the query to finish. Unlike most methods,
16071637 # jobs.getQueryResults hangs as long as it can to ensure we
16081638 # know when the query has finished as soon as possible.
1609- self._reload_query_results(retry=retry, **kwargs)
1639+ self._reload_query_results(retry=retry, **reload_query_results_kwargs)
16101640
16111641 # Even if the query is finished now according to
16121642 # jobs.getQueryResults, we'll want to reload the job status if
@@ -1679,8 +1709,9 @@ def is_job_done():
16791709 # We know that there's at least 1 row, so only treat the response from
16801710 # jobs.getQueryResults / jobs.query as the first page of the
16811711 # RowIterator response if there are any rows in it. This prevents us
1682- # from stopping the iteration early because we're missing rows and
1683- # there's no next page token.
1712+ # from stopping the iteration early in the cases where we set
1713+ # maxResults=0. In that case, we're missing rows and there's no next
1714+ # page token.
16841715 first_page_response = self._query_results._properties
16851716 if "rows" not in first_page_response:
16861717 first_page_response = None
@@ -1699,7 +1730,7 @@ def is_job_done():
16991730 query_id=self.query_id,
17001731 first_page_response=first_page_response,
17011732 num_dml_affected_rows=self._query_results.num_dml_affected_rows,
1702- **kwargs,
1733+ **list_rows_kwargs,
17031734 )
17041735 rows._preserve_order = _contains_order_by(self.query)
17051736 return rows
0 commit comments