161 changes: 155 additions & 6 deletions google/cloud/bigquery/_job_helpers.py
@@ -35,17 +35,22 @@
predicates where it is safe to generate a new query ID.
"""

from __future__ import annotations

import copy
import dataclasses
import datetime
import functools
import uuid
import textwrap
from typing import Any, Dict, Optional, TYPE_CHECKING, Union
from typing import Any, Callable, Dict, Optional, TYPE_CHECKING, Union
import warnings

import google.api_core.exceptions as core_exceptions
from google.api_core import retry as retries

from google.cloud.bigquery import job
import google.cloud.bigquery.job.query
import google.cloud.bigquery.query
from google.cloud.bigquery import table
import google.cloud.bigquery.retry
@@ -116,14 +121,21 @@ def query_jobs_insert(
retry: Optional[retries.Retry],
timeout: Optional[float],
job_retry: Optional[retries.Retry],
*,
callback: Callable = lambda _: None,
) -> job.QueryJob:
"""Initiate a query using jobs.insert.

See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert

Args:
callback (Callable):
A callback function used by bigframes to report query progress.
"""
job_id_given = job_id is not None
job_id_save = job_id
job_config_save = job_config
query_sent_factory = QuerySentEventFactory()

def do_query():
# Make a copy now, so that original doesn't get changed by the process
@@ -136,6 +148,16 @@ def do_query():

try:
query_job._begin(retry=retry, timeout=timeout)
if job_config is not None and not job_config.dry_run:
callback(
query_sent_factory(
query=query,
billing_project=query_job.project,
location=query_job.location,
job_id=query_job.job_id,
request_id=None,
)
)
except core_exceptions.Conflict as create_exc:
# The thought is if someone is providing their own job IDs and they get
# their job ID generation wrong, this could end up returning results for
@@ -396,6 +418,7 @@ def query_and_wait(
job_retry: Optional[retries.Retry],
page_size: Optional[int] = None,
max_results: Optional[int] = None,
callback: Callable = lambda _: None,
) -> table.RowIterator:
"""Run the query, wait for it to finish, and return the results.

@@ -415,9 +438,8 @@
location (Optional[str]):
Location where to run the job. Must match the location of the
table used in the query as well as the destination table.
project (Optional[str]):
Project ID of the project of where to run the job. Defaults
to the client's project.
project (str):
Project ID of the project in which to run the job.
api_timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
@@ -441,6 +463,8 @@
request. Non-positive values are ignored.
max_results (Optional[int]):
The maximum total number of rows from this request.
callback (Callable):
A callback function used by bigframes to report query progress.

Returns:
google.cloud.bigquery.table.RowIterator:
Expand Down Expand Up @@ -479,12 +503,14 @@ def query_and_wait(
retry=retry,
timeout=api_timeout,
job_retry=job_retry,
callback=callback,
),
api_timeout=api_timeout,
wait_timeout=wait_timeout,
retry=retry,
page_size=page_size,
max_results=max_results,
callback=callback,
)

path = _to_query_path(project)
@@ -496,10 +522,24 @@
if client.default_job_creation_mode:
request_body["jobCreationMode"] = client.default_job_creation_mode

query_sent_factory = QuerySentEventFactory()

def do_query():
request_body["requestId"] = make_job_id()
request_id = make_job_id()
request_body["requestId"] = request_id
span_attributes = {"path": path}

if "dryRun" not in request_body:
callback(
query_sent_factory(
query=query,
billing_project=project,
location=location,
job_id=None,
request_id=request_id,
)
)

# For easier testing, handle the retries ourselves.
if retry is not None:
response = retry(client._call_api)(
@@ -542,8 +582,25 @@ def do_query():
retry=retry,
page_size=page_size,
max_results=max_results,
callback=callback,
)

if "dryRun" not in request_body:
callback(
QueryFinishedEvent(
billing_project=project,
location=query_results.location,
query_id=query_results.query_id,
job_id=query_results.job_id,
total_rows=query_results.total_rows,
total_bytes_processed=query_results.total_bytes_processed,
slot_millis=query_results.slot_millis,
destination=None,
created=query_results.created,
started=query_results.started,
ended=query_results.ended,
)
)
return table.RowIterator(
client=client,
api_request=functools.partial(client._call_api, retry, timeout=api_timeout),
@@ -614,19 +671,52 @@ def _wait_or_cancel(
retry: Optional[retries.Retry],
page_size: Optional[int],
max_results: Optional[int],
*,
callback: Callable = lambda _: None,
) -> table.RowIterator:
"""Wait for a job to complete and return the results.

If we can't return the results within the ``wait_timeout``, try to cancel
the job.
"""
try:
return job.result(
if not job.dry_run:
callback(
QueryReceivedEvent(
billing_project=job.project,
location=job.location,
job_id=job.job_id,
statement_type=job.statement_type,
state=job.state,
query_plan=job.query_plan,
created=job.created,
started=job.started,
ended=job.ended,
)
)
query_results = job.result(
page_size=page_size,
max_results=max_results,
retry=retry,
timeout=wait_timeout,
)
if not job.dry_run:
callback(
QueryFinishedEvent(
billing_project=job.project,
location=query_results.location,
query_id=query_results.query_id,
job_id=query_results.job_id,
total_rows=query_results.total_rows,
total_bytes_processed=query_results.total_bytes_processed,
slot_millis=query_results.slot_millis,
destination=job.destination,
created=job.created,
started=job.started,
ended=job.ended,
)
)
return query_results
except Exception:
# Attempt to cancel the job since we can't return the results.
try:
@@ -635,3 +725,62 @@
# Don't eat the original exception if cancel fails.
pass
raise


@dataclasses.dataclass(frozen=True)
class QueryFinishedEvent:
"""Query finished successfully."""

billing_project: Optional[str]
location: Optional[str]
query_id: Optional[str]
job_id: Optional[str]
destination: Optional[table.TableReference]
total_rows: Optional[int]
total_bytes_processed: Optional[int]
slot_millis: Optional[int]
created: Optional[datetime.datetime]
started: Optional[datetime.datetime]
ended: Optional[datetime.datetime]


@dataclasses.dataclass(frozen=True)
class QueryReceivedEvent:
"""Query received and acknowledged by the BigQuery API."""

billing_project: Optional[str]
location: Optional[str]
job_id: Optional[str]
statement_type: Optional[str]
state: Optional[str]
query_plan: Optional[list[google.cloud.bigquery.job.query.QueryPlanEntry]]
created: Optional[datetime.datetime]
started: Optional[datetime.datetime]
ended: Optional[datetime.datetime]


@dataclasses.dataclass(frozen=True)
class QuerySentEvent:
"""Query sent to BigQuery."""

query: str
billing_project: Optional[str]
location: Optional[str]
job_id: Optional[str]
request_id: Optional[str]


class QueryRetryEvent(QuerySentEvent):
"""Query sent another time because the previous attempt failed."""


class QuerySentEventFactory:
"""Creates a QuerySentEvent first, then QueryRetryEvent after that."""

def __init__(self):
self._event_constructor = QuerySentEvent

def __call__(self, **kwargs):
result = self._event_constructor(**kwargs)
self._event_constructor = QueryRetryEvent
return result
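
The four event classes above form a small progress-reporting protocol: a QuerySentEvent (or a QueryRetryEvent on later attempts, courtesy of QuerySentEventFactory) when the request goes out, a QueryReceivedEvent once the job is acknowledged, and a QueryFinishedEvent when results are available. A minimal sketch of a consumer callback — the kind bigframes might register; the name `progress_callback` is hypothetical — could dispatch on event type. Because QueryRetryEvent subclasses QuerySentEvent, the subclass must be checked first:

from google.cloud.bigquery import _job_helpers

def progress_callback(event) -> None:
    """Print a one-line update for each query lifecycle event."""
    if isinstance(event, _job_helpers.QueryRetryEvent):
        # Check the subclass before QuerySentEvent, since isinstance()
        # would also match the parent class.
        print(f"retrying query (request_id={event.request_id})")
    elif isinstance(event, _job_helpers.QuerySentEvent):
        print(f"query sent (job_id={event.job_id}, request_id={event.request_id})")
    elif isinstance(event, _job_helpers.QueryReceivedEvent):
        print(f"query acknowledged (state={event.state})")
    elif isinstance(event, _job_helpers.QueryFinishedEvent):
        print(
            f"query finished: {event.total_rows} rows, "
            f"{event.total_bytes_processed} bytes processed, "
            f"{event.slot_millis} slot-ms"
        )

Because QuerySentEventFactory swaps its constructor to QueryRetryEvent after the first call, retries surface as distinct events without the call sites having to track attempt counts themselves.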
40 changes: 38 additions & 2 deletions google/cloud/bigquery/client.py
@@ -15,6 +15,7 @@
"""Client for interacting with the Google BigQuery API."""

from __future__ import absolute_import
from __future__ import annotations
from __future__ import division

from collections import abc as collections_abc
@@ -31,6 +32,7 @@
import typing
from typing import (
Any,
Callable,
Dict,
IO,
Iterable,
@@ -3633,8 +3635,8 @@ def query_and_wait(
rate-limit-exceeded errors. Passing ``None`` disables
job retry. Not all jobs can be retried.
page_size (Optional[int]):
The maximum number of rows in each page of results from this
request. Non-positive values are ignored.
The maximum number of rows in each page of results from the
initial jobs.query request. Non-positive values are ignored.
max_results (Optional[int]):
The maximum total number of rows from this request.

@@ -3656,6 +3658,39 @@
:class:`~google.cloud.bigquery.job.QueryJobConfig`
class.
"""
return self._query_and_wait_bigframes(
query,
job_config=job_config,
location=location,
project=project,
api_timeout=api_timeout,
wait_timeout=wait_timeout,
retry=retry,
job_retry=job_retry,
page_size=page_size,
max_results=max_results,
)

def _query_and_wait_bigframes(
self,
query,
*,
job_config: Optional[QueryJobConfig] = None,
location: Optional[str] = None,
project: Optional[str] = None,
api_timeout: TimeoutType = DEFAULT_TIMEOUT,
wait_timeout: Union[Optional[float], object] = POLLING_DEFAULT_VALUE,
retry: retries.Retry = DEFAULT_RETRY,
job_retry: retries.Retry = DEFAULT_JOB_RETRY,
page_size: Optional[int] = None,
max_results: Optional[int] = None,
callback: Callable = lambda _: None,
) -> RowIterator:
"""See query_and_wait.

This method has an extra callback parameter, which is used by bigframes
to create better progress bars.
"""
if project is None:
project = self.project

@@ -3681,6 +3716,7 @@ def query_and_wait(
job_retry=job_retry,
page_size=page_size,
max_results=max_results,
callback=callback,
)

def insert_rows(
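
With query_and_wait now a thin delegating wrapper, public callers are unaffected while bigframes can reach for the private variant. A sketch of that usage, assuming the hypothetical progress_callback handler from the earlier sketch (the method is private, so its signature may change without notice):

from google.cloud import bigquery

client = bigquery.Client()

# Private API: shown here only to illustrate how the callback threads
# through to _job_helpers.query_and_wait.
rows = client._query_and_wait_bigframes(
    "SELECT 1 AS x",
    callback=progress_callback,
)
for row in rows:
    print(row["x"])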
6 changes: 6 additions & 0 deletions google/cloud/bigquery/job/query.py
@@ -1550,6 +1550,8 @@ def result(  # type: ignore  # (incompatible with supertype)
return _EmptyRowIterator(
project=self.project,
location=self.location,
schema=self.schema,
total_bytes_processed=self.total_bytes_processed,
# Intentionally omit job_id and query_id since this doesn't
# actually correspond to a finished query job.
)
@@ -1737,7 +1739,11 @@ def is_job_done():
project=self.project,
job_id=self.job_id,
query_id=self.query_id,
schema=self.schema,
num_dml_affected_rows=self._query_results.num_dml_affected_rows,
query=self.query,
total_bytes_processed=self.total_bytes_processed,
slot_millis=self.slot_millis,
)

# We know that there's at least 1 row, so only treat the response from
9 changes: 8 additions & 1 deletion google/cloud/bigquery/query.py
@@ -1228,11 +1228,18 @@ def location(self):

See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#body.QueryResponse.FIELDS.job_reference
or https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#body.QueryResponse.FIELDS.location

Returns:
str: Location of the query job.
"""
return self._properties.get("jobReference", {}).get("location")
location = self._properties.get("jobReference", {}).get("location")

# Sometimes there's no job, but we still want to get the location
# information. Prefer the value from the job for backwards compatibility.
if not location:
location = self._properties.get("location")
return location

@property
def query_id(self) -> Optional[str]:
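
To make the new fallback concrete, the same logic can be restated as a standalone helper alongside the two response shapes it distinguishes — a sketch for illustration only, with resolve_location as a hypothetical name:

from typing import Optional

def resolve_location(query_response: dict) -> Optional[str]:
    """Mirror the location property: prefer jobReference.location and
    fall back to the top-level field for jobless (stateless) queries."""
    location = query_response.get("jobReference", {}).get("location")
    if not location:
        location = query_response.get("location")
    return location

# Job-backed response: jobReference wins, for backwards compatibility.
assert resolve_location({"jobReference": {"location": "US"}}) == "US"
# Stateless response (jobCreationMode=JOB_CREATION_OPTIONAL): no
# jobReference, only a top-level "location" field.
assert resolve_location({"location": "EU"}) == "EU"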