Skip to content

Commit e1c384e

Browse files
authored
refactor: move query and wait logic to separate module (#720)
This prepares the way for using the `query_and_wait` method built-in to the client library when available. Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-bigquery-pandas/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Towards #710 🦕
1 parent 0f54519 commit e1c384e

File tree

5 files changed

+286
-197
lines changed

5 files changed

+286
-197
lines changed

pandas_gbq/exceptions.py

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -35,3 +35,10 @@ class PerformanceWarning(RuntimeWarning):
3535
Such warnings can occur when dependencies for the requested feature
3636
aren't up-to-date.
3737
"""
38+
39+
40+
class QueryTimeout(ValueError):
41+
"""
42+
Raised when the query request exceeds the timeoutMs value specified in the
43+
BigQuery configuration.
44+
"""

pandas_gbq/gbq.py

Lines changed: 19 additions & 135 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,6 @@
33
# license that can be found in the LICENSE file.
44

55
import copy
6-
import concurrent.futures
76
from datetime import datetime
87
import logging
98
import re
@@ -20,8 +19,9 @@
2019
if typing.TYPE_CHECKING: # pragma: NO COVER
2120
import pandas
2221

23-
from pandas_gbq.exceptions import AccessDenied, GenericGBQException
22+
from pandas_gbq.exceptions import GenericGBQException, QueryTimeout
2423
from pandas_gbq.features import FEATURES
24+
import pandas_gbq.query
2525
import pandas_gbq.schema
2626
import pandas_gbq.timestamp
2727

@@ -74,8 +74,6 @@ class DatasetCreationError(ValueError):
7474
Raised when the create dataset method fails
7575
"""
7676

77-
pass
78-
7977

8078
class InvalidColumnOrder(ValueError):
8179
"""
@@ -84,8 +82,6 @@ class InvalidColumnOrder(ValueError):
8482
returned by BigQuery.
8583
"""
8684

87-
pass
88-
8985

9086
class InvalidIndexColumn(ValueError):
9187
"""
@@ -94,17 +90,13 @@ class InvalidIndexColumn(ValueError):
9490
returned by BigQuery.
9591
"""
9692

97-
pass
98-
9993

10094
class InvalidPageToken(ValueError):
10195
"""
10296
Raised when Google BigQuery fails to return,
10397
or returns a duplicate page token.
10498
"""
10599

106-
pass
107-
108100

109101
class InvalidSchema(ValueError):
110102
"""
@@ -127,17 +119,6 @@ class NotFoundException(ValueError):
127119
not be found.
128120
"""
129121

130-
pass
131-
132-
133-
class QueryTimeout(ValueError):
134-
"""
135-
Raised when the query request exceeds the timeoutMs value specified in the
136-
BigQuery configuration.
137-
"""
138-
139-
pass
140-
141122

142123
class TableCreationError(ValueError):
143124
"""
@@ -340,10 +321,6 @@ def __init__(
340321
self.client = self.get_client()
341322
self.use_bqstorage_api = use_bqstorage_api
342323

343-
# BQ Queries costs $5 per TB. First 1 TB per month is free
344-
# see here for more: https://cloud.google.com/bigquery/pricing
345-
self.query_price_for_TB = 5.0 / 2**40 # USD/TB
346-
347324
def _start_timer(self):
348325
self.start = time.time()
349326

@@ -355,16 +332,6 @@ def log_elapsed_seconds(self, prefix="Elapsed", postfix="s.", overlong=6):
355332
if sec > overlong:
356333
logger.info("{} {} {}".format(prefix, sec, postfix))
357334

358-
# http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
359-
@staticmethod
360-
def sizeof_fmt(num, suffix="B"):
361-
fmt = "%3.1f %s%s"
362-
for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
363-
if abs(num) < 1024.0:
364-
return fmt % (num, unit, suffix)
365-
num /= 1024.0
366-
return fmt % (num, "Y", suffix)
367-
368335
def get_client(self):
369336
import google.api_core.client_info
370337
import pandas
@@ -421,46 +388,10 @@ def download_table(
421388
user_dtypes=dtypes,
422389
)
423390

424-
def _wait_for_query_job(self, query_reply, timeout_ms):
425-
"""Wait for query to complete, pausing occasionally to update progress.
426-
427-
Args:
428-
query_reply (QueryJob):
429-
A query job which has started.
430-
431-
timeout_ms (Optional[int]):
432-
How long to wait before cancelling the query.
433-
"""
434-
# Wait at most 10 seconds so we can show progress.
435-
# TODO(https://github.com/googleapis/python-bigquery-pandas/issues/327):
436-
# Include a tqdm progress bar here instead of a stream of log messages.
437-
timeout_sec = 10.0
438-
if timeout_ms:
439-
timeout_sec = min(timeout_sec, timeout_ms / 1000.0)
440-
441-
while query_reply.state != "DONE":
442-
self.log_elapsed_seconds(" Elapsed", "s. Waiting...")
443-
444-
if timeout_ms and timeout_ms < self.get_elapsed_seconds() * 1000:
445-
self.client.cancel_job(
446-
query_reply.job_id, location=query_reply.location
447-
)
448-
raise QueryTimeout("Query timeout: {} ms".format(timeout_ms))
449-
450-
try:
451-
query_reply.result(timeout=timeout_sec)
452-
except concurrent.futures.TimeoutError:
453-
# Use our own timeout logic
454-
pass
455-
except self.http_error as ex:
456-
self.process_http_error(ex)
457-
458391
def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
459-
from google.auth.exceptions import RefreshError
460392
from google.cloud import bigquery
461-
import pandas
462393

463-
job_config = {
394+
job_config_dict = {
464395
"query": {
465396
"useLegacySql": self.dialect
466397
== "legacy"
@@ -470,74 +401,27 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
470401
}
471402
config = kwargs.get("configuration")
472403
if config is not None:
473-
job_config.update(config)
474-
475-
self._start_timer()
404+
job_config_dict.update(config)
476405

477-
try:
478-
logger.debug("Requesting query... ")
479-
query_reply = self.client.query(
480-
query,
481-
job_config=bigquery.QueryJobConfig.from_api_repr(job_config),
482-
location=self.location,
483-
project=self.project_id,
484-
)
485-
logger.debug("Query running...")
486-
except (RefreshError, ValueError) as ex:
487-
if self.private_key:
488-
raise AccessDenied(
489-
f"The service account credentials are not valid: {ex}"
490-
)
491-
else:
492-
raise AccessDenied(
493-
"The credentials have been revoked or expired, "
494-
f"please re-run the application to re-authorize: {ex}"
495-
)
496-
except self.http_error as ex:
497-
self.process_http_error(ex)
498-
499-
job_id = query_reply.job_id
500-
logger.debug("Job ID: %s" % job_id)
501-
502-
timeout_ms = job_config.get("jobTimeoutMs") or job_config["query"].get(
503-
"timeoutMs"
504-
)
406+
timeout_ms = job_config_dict.get("jobTimeoutMs") or job_config_dict[
407+
"query"
408+
].get("timeoutMs")
505409
timeout_ms = int(timeout_ms) if timeout_ms else None
506-
self._wait_for_query_job(query_reply, timeout_ms)
507410

508-
if query_reply.cache_hit:
509-
logger.debug("Query done.\nCache hit.\n")
510-
else:
511-
bytes_processed = query_reply.total_bytes_processed or 0
512-
bytes_billed = query_reply.total_bytes_billed or 0
513-
logger.debug(
514-
"Query done.\nProcessed: {} Billed: {}".format(
515-
self.sizeof_fmt(bytes_processed),
516-
self.sizeof_fmt(bytes_billed),
517-
)
518-
)
519-
logger.debug(
520-
"Standard price: ${:,.2f} USD\n".format(
521-
bytes_billed * self.query_price_for_TB
522-
)
523-
)
411+
self._start_timer()
412+
job_config = bigquery.QueryJobConfig.from_api_repr(job_config_dict)
413+
rows_iter = pandas_gbq.query.query_and_wait(
414+
self,
415+
self.client,
416+
query,
417+
location=self.location,
418+
project_id=self.project_id,
419+
job_config=job_config,
420+
max_results=max_results,
421+
timeout_ms=timeout_ms,
422+
)
524423

525424
dtypes = kwargs.get("dtypes")
526-
527-
# Ensure destination is populated.
528-
try:
529-
query_reply.result()
530-
except self.http_error as ex:
531-
self.process_http_error(ex)
532-
533-
# Avoid attempting to download results from DML queries, which have no
534-
# destination.
535-
if query_reply.destination is None:
536-
return pandas.DataFrame()
537-
538-
rows_iter = self.client.list_rows(
539-
query_reply.destination, max_results=max_results
540-
)
541425
return self._download_results(
542426
rows_iter,
543427
max_results=max_results,

0 commit comments

Comments
 (0)