From de668d6c51a6819f1454fce9f20e7a28284f51b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a?= Date: Thu, 31 Jul 2025 12:00:58 -0500 Subject: [PATCH 1/6] chore: add private `_query_and_wait_bigframes` method Towards internal issue b/409104302 --- google/cloud/bigquery/_job_helpers.py | 138 ++++++++++++++++- google/cloud/bigquery/client.py | 38 ++++- google/cloud/bigquery/job/base.py | 23 ++- google/cloud/bigquery/query.py | 9 +- tests/unit/test_client_bigframes.py | 209 ++++++++++++++++++++++++++ 5 files changed, 402 insertions(+), 15 deletions(-) create mode 100644 tests/unit/test_client_bigframes.py diff --git a/google/cloud/bigquery/_job_helpers.py b/google/cloud/bigquery/_job_helpers.py index 73d4f6e7b..f0bad095c 100644 --- a/google/cloud/bigquery/_job_helpers.py +++ b/google/cloud/bigquery/_job_helpers.py @@ -35,17 +35,21 @@ predicates where it is safe to generate a new query ID. """ +from __future__ import annotations + import copy +import dataclasses import functools import uuid import textwrap -from typing import Any, Dict, Optional, TYPE_CHECKING, Union +from typing import Any, Callable, Dict, Optional, TYPE_CHECKING, Union import warnings import google.api_core.exceptions as core_exceptions from google.api_core import retry as retries from google.cloud.bigquery import job +import google.cloud.bigquery.job.query import google.cloud.bigquery.query from google.cloud.bigquery import table import google.cloud.bigquery.retry @@ -116,14 +120,20 @@ def query_jobs_insert( retry: Optional[retries.Retry], timeout: Optional[float], job_retry: Optional[retries.Retry], + callback: Callable, ) -> job.QueryJob: """Initiate a query using jobs.insert. See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert + + Args: + callback (Callable): + A callback function used by bigframes to report query progress. """ job_id_given = job_id is not None job_id_save = job_id job_config_save = job_config + query_sent_factory = QuerySentEventFactory() def do_query(): # Make a copy now, so that original doesn't get changed by the process @@ -136,6 +146,15 @@ def do_query(): try: query_job._begin(retry=retry, timeout=timeout) + callback( + query_sent_factory( + query=query, + billing_project=query_job.project, + location=query_job.location, + job_id=query_job.job_id, + request_id=None, + ) + ) except core_exceptions.Conflict as create_exc: # The thought is if someone is providing their own job IDs and they get # their job ID generation wrong, this could end up returning results for @@ -396,6 +415,7 @@ def query_and_wait( job_retry: Optional[retries.Retry], page_size: Optional[int] = None, max_results: Optional[int] = None, + callback: Callable = lambda _: None, ) -> table.RowIterator: """Run the query, wait for it to finish, and return the results. @@ -415,9 +435,8 @@ def query_and_wait( location (Optional[str]): Location where to run the job. Must match the location of the table used in the query as well as the destination table. - project (Optional[str]): - Project ID of the project of where to run the job. Defaults - to the client's project. + project (str): + Project ID of the project of where to run the job. api_timeout (Optional[float]): The number of seconds to wait for the underlying HTTP transport before using ``retry``. @@ -441,6 +460,8 @@ def query_and_wait( request. Non-positive values are ignored. max_results (Optional[int]): The maximum total number of rows from this request. + callback (Callable): + A callback function used by bigframes to report query progress. Returns: google.cloud.bigquery.table.RowIterator: @@ -479,12 +500,14 @@ def query_and_wait( retry=retry, timeout=api_timeout, job_retry=job_retry, + callback=callback, ), api_timeout=api_timeout, wait_timeout=wait_timeout, retry=retry, page_size=page_size, max_results=max_results, + callback=callback, ) path = _to_query_path(project) @@ -496,10 +519,23 @@ def query_and_wait( if client.default_job_creation_mode: request_body["jobCreationMode"] = client.default_job_creation_mode + query_sent_factory = QuerySentEventFactory() + def do_query(): - request_body["requestId"] = make_job_id() + request_id = make_job_id() + request_body["requestId"] = request_id span_attributes = {"path": path} + callback( + query_sent_factory( + query=query, + billing_project=project, + location=location, + job_id=None, + request_id=request_id, + ) + ) + # For easier testing, handle the retries ourselves. if retry is not None: response = retry(client._call_api)( @@ -542,8 +578,21 @@ def do_query(): retry=retry, page_size=page_size, max_results=max_results, + callback=callback, ) + callback( + QueryFinishedEvent( + billing_project=project, + location=query_results.location, + query_id=query_results.query_id, + job_id=query_results.job_id, + total_rows=query_results.total_rows, + total_bytes_processed=query_results.total_bytes_processed, + slot_millis=query_results.slot_millis, + destination=None, + ) + ) return table.RowIterator( client=client, api_request=functools.partial(client._call_api, retry, timeout=api_timeout), @@ -611,6 +660,7 @@ def _wait_or_cancel( retry: Optional[retries.Retry], page_size: Optional[int], max_results: Optional[int], + callback: Callable, ) -> table.RowIterator: """Wait for a job to complete and return the results. @@ -618,12 +668,35 @@ def _wait_or_cancel( the job. """ try: - return job.result( + callback( + QueryReceivedEvent( + billing_project=job.project, + location=job.location, + job_id=job.job_id, + statement_type=job.statement_type, + state=job.state, + query_plan=job.query_plan, + ) + ) + query_results = job.result( page_size=page_size, max_results=max_results, retry=retry, timeout=wait_timeout, ) + callback( + QueryFinishedEvent( + billing_project=job.project, + location=query_results.location, + query_id=query_results.query_id, + job_id=query_results.job_id, + total_rows=query_results.total_rows, + total_bytes_processed=query_results.total_bytes_processed, + slot_millis=query_results.slot_millis, + destination=job.destination, + ) + ) + return query_results except Exception: # Attempt to cancel the job since we can't return the results. try: @@ -632,3 +705,56 @@ def _wait_or_cancel( # Don't eat the original exception if cancel fails. pass raise + + +@dataclasses.dataclass(frozen=True) +class QueryFinishedEvent: + """Query finished successfully.""" + + billing_project: Optional[str] + location: Optional[str] + query_id: Optional[str] + job_id: Optional[str] + destination: Optional[table.TableReference] + total_rows: Optional[int] + total_bytes_processed: Optional[int] + slot_millis: Optional[int] + + +@dataclasses.dataclass(frozen=True) +class QueryReceivedEvent: + """Query received and acknowledged by the BigQuery API.""" + + billing_project: Optional[str] + location: Optional[str] + job_id: Optional[str] + statement_type: Optional[str] + state: Optional[str] + query_plan: Optional[list[google.cloud.bigquery.job.query.QueryPlanEntry]] + + +@dataclasses.dataclass(frozen=True) +class QuerySentEvent: + """Query sent to BigQuery.""" + + query: str + billing_project: Optional[str] + location: Optional[str] + job_id: Optional[str] + request_id: Optional[str] + + +class QueryRetryEvent(QuerySentEvent): + """Query sent another time because the previous failed.""" + + +class QuerySentEventFactory: + """Creates a QuerySentEvent first, then QueryRetryEvent after that.""" + + def __init__(self): + self._event_constructor = QuerySentEvent + + def __call__(self, **kwargs): + result = self._event_constructor(**kwargs) + self._event_constructor = QueryRetryEvent + return result diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 804f77ea2..d2e772b85 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -3633,8 +3633,8 @@ def query_and_wait( rate-limit-exceeded errors. Passing ``None`` disables job retry. Not all jobs can be retried. page_size (Optional[int]): - The maximum number of rows in each page of results from this - request. Non-positive values are ignored. + The maximum number of rows in each page of results from the + initial jobs.query request. Non-positive values are ignored. max_results (Optional[int]): The maximum total number of rows from this request. @@ -3656,6 +3656,39 @@ def query_and_wait( :class:`~google.cloud.bigquery.job.QueryJobConfig` class. """ + return self._query_and_wait_bigframes( + query, + job_config=job_config, + location=location, + project=project, + api_timeout=api_timeout, + wait_timeout=wait_timeout, + retry=retry, + job_retry=job_retry, + page_size=page_size, + max_results=max_results, + ) + + def _query_and_wait_bigframes( + self, + query, + *, + job_config: Optional[QueryJobConfig] = None, + location: Optional[str] = None, + project: Optional[str] = None, + api_timeout: TimeoutType = DEFAULT_TIMEOUT, + wait_timeout: Union[Optional[float], object] = POLLING_DEFAULT_VALUE, + retry: retries.Retry = DEFAULT_RETRY, + job_retry: retries.Retry = DEFAULT_JOB_RETRY, + page_size: Optional[int] = None, + max_results: Optional[int] = None, + callback = lambda _: None, + ) -> RowIterator: + """See query_and_wait. + + This method has an extra callback parameter, which is used by bigframes + to create better progress bars. + """ if project is None: project = self.project @@ -3681,6 +3714,7 @@ def query_and_wait( job_retry=job_retry, page_size=page_size, max_results=max_results, + callback=callback, ) def insert_rows( diff --git a/google/cloud/bigquery/job/base.py b/google/cloud/bigquery/job/base.py index f007b9341..e26e45152 100644 --- a/google/cloud/bigquery/job/base.py +++ b/google/cloud/bigquery/job/base.py @@ -14,6 +14,8 @@ """Base classes and helpers for job classes.""" +from __future__ import annotations + from collections import namedtuple import copy import http @@ -440,9 +442,12 @@ def configuration(self) -> _JobConfig: return configuration @property - def job_id(self): + def job_id(self) -> Optional[str]: """str: ID of the job.""" - return _helpers._get_sub_prop(self._properties, ["jobReference", "jobId"]) + return typing.cast( + Optional[str], + _helpers._get_sub_prop(self._properties, ["jobReference", "jobId"]), + ) @property def parent_job_id(self): @@ -493,18 +498,24 @@ def num_child_jobs(self): return int(count) if count is not None else 0 @property - def project(self): + def project(self) -> Optional[str]: """Project bound to the job. Returns: str: the project (derived from the client). """ - return _helpers._get_sub_prop(self._properties, ["jobReference", "projectId"]) + return typing.cast( + Optional[str], + _helpers._get_sub_prop(self._properties, ["jobReference", "projectId"]), + ) @property - def location(self): + def location(self) -> Optional[str]: """str: Location where the job runs.""" - return _helpers._get_sub_prop(self._properties, ["jobReference", "location"]) + return typing.cast( + Optional[str], + _helpers._get_sub_prop(self._properties, ["jobReference", "location"]), + ) @property def reservation_id(self): diff --git a/google/cloud/bigquery/query.py b/google/cloud/bigquery/query.py index 4a006d621..3dbefd50c 100644 --- a/google/cloud/bigquery/query.py +++ b/google/cloud/bigquery/query.py @@ -1228,11 +1228,18 @@ def location(self): See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#body.QueryResponse.FIELDS.job_reference + or https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#body.QueryResponse.FIELDS.location Returns: str: Job ID of the query job. """ - return self._properties.get("jobReference", {}).get("location") + location = self._properties.get("jobReference", {}).get("location") + + # Sometimes there's no job, but we still want to get the location + # information. Prefer the value from job for backwards compatibilitity. + if not location: + location = self._properties.get("location") + return location @property def query_id(self) -> Optional[str]: diff --git a/tests/unit/test_client_bigframes.py b/tests/unit/test_client_bigframes.py new file mode 100644 index 000000000..9d78a1948 --- /dev/null +++ b/tests/unit/test_client_bigframes.py @@ -0,0 +1,209 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for Client features enabling the bigframes integration.""" + +from __future__ import annotations + +from unittest import mock + +import pytest + +import google.auth.credentials +from google.api_core import exceptions +from google.cloud import bigquery +import google.cloud.bigquery.client +from google.cloud.bigquery import _job_helpers + + +PROJECT = "test-project" +LOCATION = "test-location" + + +def make_response(body, *, status_code: int = 200): + response = mock.Mock() + type(response).status_code = mock.PropertyMock(return_value=status_code) + response.json.return_value = body + return response + + +@pytest.fixture +def client(): + """A real client object with mocked API requests.""" + credentials = mock.create_autospec( + google.auth.credentials.Credentials, instance=True + ) + http_session = mock.Mock() + return google.cloud.bigquery.client.Client( + project=PROJECT, + credentials=credentials, + _http=http_session, + location=LOCATION, + ) + + +def test_query_and_wait_bigframes_callback(client): + client._http.request.side_effect = [ + make_response( + { + # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query + "location": LOCATION, + "queryId": "abcdefg", + "totalRows": "100", + "totalBytesProcessed": "123", + "totalSlotMs": "987", + "jobComplete": True, + } + ), + ] + callback = mock.Mock() + client._query_and_wait_bigframes(query="SELECT 1", callback=callback) + callback.assert_has_calls( + [ + mock.call( + _job_helpers.QuerySentEvent( + query="SELECT 1", + billing_project=PROJECT, + location=LOCATION, + # No job ID, because a basic query is elegible for jobs.query. + job_id=None, + request_id=mock.ANY, + ) + ), + mock.call( + _job_helpers.QueryFinishedEvent( + billing_project=PROJECT, + location=LOCATION, + query_id="abcdefg", + total_rows=100, + total_bytes_processed=123, + slot_millis=987, + # No job ID or destination, because a basic query is elegible for jobs.query. + job_id=None, + destination=None, + ), + ), + ] + ) + + +def test_query_and_wait_bigframes_with_job_callback(client): + client._http.request.side_effect = [ + make_response( + { + # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert + # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job + "jobReference": {}, + } + ), + make_response( + { + # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert + # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job + "jobReference": {}, + "status": {"state": "DONE"}, + } + ), + ] + callback = mock.Mock() + config = bigquery.QueryJobConfig() + config.destination = "proj.dset.table" + client._query_and_wait_bigframes( + query="SELECT 1", job_config=config, callback=callback + ) + callback.assert_has_calls( + [ + mock.call( + _job_helpers.QuerySentEvent( + query="SELECT 1", + billing_project=PROJECT, + location=LOCATION, + # No job ID, because a basic query is elegible for jobs.query. + job_id=None, + request_id=mock.ANY, + ) + ), + mock.call( + _job_helpers.QueryFinishedEvent( + billing_project=PROJECT, + location=LOCATION, + query_id="abcdefg", + total_rows=100, + total_bytes_processed=123, + slot_millis=987, + # No job ID or destination, because a basic query is elegible for jobs.query. + job_id=None, + destination=None, + ), + ), + ] + ) + + +def test_query_and_wait_bigframes_with_query_retry_callbacks(client): + client._http.request.side_effect = [ + exceptions.InternalServerError( + "first try", errors=({"reason": "jobInternalError"},) + ), + make_response( + { + # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query + "location": LOCATION, + "queryId": "abcdefg", + "totalRows": "100", + "totalBytesProcessed": "123", + "totalSlotMs": "987", + "jobComplete": True, + } + ), + ] + callback = mock.Mock() + client._query_and_wait_bigframes(query="SELECT 1", callback=callback) + callback.assert_has_calls( + [ + mock.call( + _job_helpers.QuerySentEvent( + query="SELECT 1", + billing_project=PROJECT, + location=LOCATION, + # No job ID, because a basic query is elegible for jobs.query. + job_id=None, + request_id=mock.ANY, + ) + ), + mock.call( + _job_helpers.QueryRetryEvent( + query="SELECT 1", + billing_project=PROJECT, + location=LOCATION, + # No job ID, because a basic query is elegible for jobs.query. + job_id=None, + request_id=mock.ANY, + ) + ), + mock.call( + _job_helpers.QueryFinishedEvent( + billing_project=PROJECT, + location=LOCATION, + query_id=mock.ANY, + total_rows=100, + total_bytes_processed=123, + slot_millis=987, + # No job ID or destination, because a basic query is elegible for jobs.query. + job_id=None, + destination=None, + ), + ), + ] + ) From 75168c79b65287cf64a675e21a861952367bf08b Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Wed, 13 Aug 2025 19:32:52 +0000 Subject: [PATCH 2/6] fix unit tests --- google/cloud/bigquery/_job_helpers.py | 124 +++++++++------ google/cloud/bigquery/job/query.py | 6 + google/cloud/bigquery/table.py | 3 +- tests/unit/test_client_bigframes.py | 212 ++++++++++++++++++++++++-- 4 files changed, 280 insertions(+), 65 deletions(-) diff --git a/google/cloud/bigquery/_job_helpers.py b/google/cloud/bigquery/_job_helpers.py index f0bad095c..744118a80 100644 --- a/google/cloud/bigquery/_job_helpers.py +++ b/google/cloud/bigquery/_job_helpers.py @@ -39,6 +39,7 @@ import copy import dataclasses +import datetime import functools import uuid import textwrap @@ -120,7 +121,8 @@ def query_jobs_insert( retry: Optional[retries.Retry], timeout: Optional[float], job_retry: Optional[retries.Retry], - callback: Callable, + *, + callback: Callable = lambda _: None, ) -> job.QueryJob: """Initiate a query using jobs.insert. @@ -146,15 +148,16 @@ def do_query(): try: query_job._begin(retry=retry, timeout=timeout) - callback( - query_sent_factory( - query=query, - billing_project=query_job.project, - location=query_job.location, - job_id=query_job.job_id, - request_id=None, + if job_config is not None and not job_config.dry_run: + callback( + query_sent_factory( + query=query, + billing_project=query_job.project, + location=query_job.location, + job_id=query_job.job_id, + request_id=None, + ) ) - ) except core_exceptions.Conflict as create_exc: # The thought is if someone is providing their own job IDs and they get # their job ID generation wrong, this could end up returning results for @@ -526,15 +529,16 @@ def do_query(): request_body["requestId"] = request_id span_attributes = {"path": path} - callback( - query_sent_factory( - query=query, - billing_project=project, - location=location, - job_id=None, - request_id=request_id, + if "dryRun" not in request_body: + callback( + query_sent_factory( + query=query, + billing_project=project, + location=location, + job_id=None, + request_id=request_id, + ) ) - ) # For easier testing, handle the retries ourselves. if retry is not None: @@ -581,18 +585,25 @@ def do_query(): callback=callback, ) - callback( - QueryFinishedEvent( - billing_project=project, - location=query_results.location, - query_id=query_results.query_id, - job_id=query_results.job_id, - total_rows=query_results.total_rows, - total_bytes_processed=query_results.total_bytes_processed, - slot_millis=query_results.slot_millis, - destination=None, + if "dryRun" not in request_body: + callback( + QueryFinishedEvent( + billing_project=project, + location=query_results.location, + query_id=query_results.query_id, + job_id=query_results.job_id, + total_rows=query_results.total_rows, + total_bytes_processed=query_results.total_bytes_processed, + slot_millis=query_results.slot_millis, + destination=None, + # TODO(tswast): After + # https://github.com/googleapis/python-bigquery/pull/2260 goes in, add + # created, started, ended properties here. + created=None, + started=None, + ended=None, + ) ) - ) return table.RowIterator( client=client, api_request=functools.partial(client._call_api, retry, timeout=api_timeout), @@ -660,7 +671,8 @@ def _wait_or_cancel( retry: Optional[retries.Retry], page_size: Optional[int], max_results: Optional[int], - callback: Callable, + *, + callback: Callable = lambda _: None, ) -> table.RowIterator: """Wait for a job to complete and return the results. @@ -668,34 +680,42 @@ def _wait_or_cancel( the job. """ try: - callback( - QueryReceivedEvent( - billing_project=job.project, - location=job.location, - job_id=job.job_id, - statement_type=job.statement_type, - state=job.state, - query_plan=job.query_plan, + if not job.dry_run: + callback( + QueryReceivedEvent( + billing_project=job.project, + location=job.location, + job_id=job.job_id, + statement_type=job.statement_type, + state=job.state, + query_plan=job.query_plan, + created=job.created, + started=job.started, + ended=job.ended, + ) ) - ) query_results = job.result( page_size=page_size, max_results=max_results, retry=retry, timeout=wait_timeout, ) - callback( - QueryFinishedEvent( - billing_project=job.project, - location=query_results.location, - query_id=query_results.query_id, - job_id=query_results.job_id, - total_rows=query_results.total_rows, - total_bytes_processed=query_results.total_bytes_processed, - slot_millis=query_results.slot_millis, - destination=job.destination, + if not job.dry_run: + callback( + QueryFinishedEvent( + billing_project=job.project, + location=query_results.location, + query_id=query_results.query_id, + job_id=query_results.job_id, + total_rows=query_results.total_rows, + total_bytes_processed=query_results.total_bytes_processed, + slot_millis=query_results.slot_millis, + destination=job.destination, + created=job.created, + started=job.started, + ended=job.ended, + ) ) - ) return query_results except Exception: # Attempt to cancel the job since we can't return the results. @@ -719,6 +739,9 @@ class QueryFinishedEvent: total_rows: Optional[int] total_bytes_processed: Optional[int] slot_millis: Optional[int] + created: Optional[datetime.datetime] + started: Optional[datetime.datetime] + ended: Optional[datetime.datetime] @dataclasses.dataclass(frozen=True) @@ -731,6 +754,9 @@ class QueryReceivedEvent: statement_type: Optional[str] state: Optional[str] query_plan: Optional[list[google.cloud.bigquery.job.query.QueryPlanEntry]] + created: Optional[datetime.datetime] + started: Optional[datetime.datetime] + ended: Optional[datetime.datetime] @dataclasses.dataclass(frozen=True) diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index ec9379ea9..d983f96d3 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -1550,6 +1550,8 @@ def result( # type: ignore # (incompatible with supertype) return _EmptyRowIterator( project=self.project, location=self.location, + schema=self.schema, + total_bytes_processed=self.total_bytes_processed, # Intentionally omit job_id and query_id since this doesn't # actually correspond to a finished query job. ) @@ -1737,7 +1739,11 @@ def is_job_done(): project=self.project, job_id=self.job_id, query_id=self.query_id, + schema=self.schema, num_dml_affected_rows=self._query_results.num_dml_affected_rows, + query=self.query, + total_bytes_processed=self.total_bytes_processed, + slot_millis=self.slot_millis, ) # We know that there's at least 1 row, so only treat the response from diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index dbdde36d1..75d7eba88 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -1826,7 +1826,7 @@ def __init__( page_start=_rows_page_start, next_token="pageToken", ) - schema = _to_schema_fields(schema) + schema = _to_schema_fields(schema) if schema else () self._field_to_index = _helpers._field_to_index_mapping(schema) self._page_size = page_size self._preserve_order = False @@ -2888,7 +2888,6 @@ class _EmptyRowIterator(RowIterator): statements. """ - schema = () pages = () total_rows = 0 diff --git a/tests/unit/test_client_bigframes.py b/tests/unit/test_client_bigframes.py index 9d78a1948..cef212ce9 100644 --- a/tests/unit/test_client_bigframes.py +++ b/tests/unit/test_client_bigframes.py @@ -16,6 +16,7 @@ from __future__ import annotations +import datetime from unittest import mock import pytest @@ -53,6 +54,28 @@ def client(): ) +def test_query_and_wait_bigframes_dry_run_no_callback(client): + client._http.request.side_effect = [ + make_response( + { + # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query + "location": LOCATION, + "queryId": "abcdefg", + "totalBytesProcessed": "123", + "jobComplete": True, + } + ), + ] + callback = mock.Mock() + job_config = bigquery.QueryJobConfig(dry_run=True) + response = client._query_and_wait_bigframes( + query="SELECT 1", job_config=job_config, callback=callback + ) + callback.assert_not_called() + assert response.total_bytes_processed == 123 + assert response.query_id == "abcdefg" + + def test_query_and_wait_bigframes_callback(client): client._http.request.side_effect = [ make_response( @@ -64,6 +87,9 @@ def test_query_and_wait_bigframes_callback(client): "totalBytesProcessed": "123", "totalSlotMs": "987", "jobComplete": True, + # TODO(tswast): After + # https://github.com/googleapis/python-bigquery/pull/2260 goes in, add + # created, started, ended properties here. } ), ] @@ -89,6 +115,9 @@ def test_query_and_wait_bigframes_callback(client): total_rows=100, total_bytes_processed=123, slot_millis=987, + created=None, + started=None, + ended=None, # No job ID or destination, because a basic query is elegible for jobs.query. job_id=None, destination=None, @@ -98,20 +127,100 @@ def test_query_and_wait_bigframes_callback(client): ) -def test_query_and_wait_bigframes_with_job_callback(client): +def _to_millis(dt: datetime.datetime) -> str: + return str( + int( + (dt - datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc)) + / datetime.timedelta(milliseconds=1) + ) + ) + + +def test_query_and_wait_bigframes_with_jobs_insert_callback_empty_results(client): client._http.request.side_effect = [ + # jobs.insert because destination table present in job_config make_response( { # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job - "jobReference": {}, + "jobReference": { + "projectId": "response-project", + "jobId": "response-job-id", + "location": "response-location", + }, + "statistics": { + "creationTime": _to_millis( + datetime.datetime( + 2025, 8, 13, 13, 7, 31, 123000, tzinfo=datetime.timezone.utc + ) + ), + "query": { + "statementType": "SELECT", + # "queryPlan": [{"name": "part1"}, {"name": "part2"}], + }, + }, + "status": { + "state": "PENDING", + }, } ), + # jobs.get waiting for query to finish make_response( { # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job - "jobReference": {}, + "jobReference": { + "projectId": "response-project", + "jobId": "response-job-id", + "location": "response-location", + }, + "status": { + "state": "PENDING", + }, + } + ), + # jobs.getQueryResults with max_results=0 + make_response( + { + "jobReference": { + "projectId": "response-project", + "jobId": "response-job-id", + "location": "response-location", + }, + "jobComplete": True, + # totalRows is intentionally missing so we end up in the _EmptyRowIterator code path. + } + ), + # jobs.get + make_response( + { + "jobReference": { + "projectId": "response-project", + "jobId": "response-job-id", + "location": "response-location", + }, + "statistics": { + "creationTime": _to_millis( + datetime.datetime( + 2025, 8, 13, 13, 7, 31, 123000, tzinfo=datetime.timezone.utc + ) + ), + "startTime": _to_millis( + datetime.datetime( + 2025, 8, 13, 13, 7, 32, 123000, tzinfo=datetime.timezone.utc + ) + ), + "endTime": _to_millis( + datetime.datetime( + 2025, 8, 13, 13, 7, 33, 123000, tzinfo=datetime.timezone.utc + ) + ), + "query": { + "statementType": "SELECT", + "totalBytesProcessed": 123, + "totalSlotMs": 987, + }, + }, "status": {"state": "DONE"}, } ), @@ -127,23 +236,47 @@ def test_query_and_wait_bigframes_with_job_callback(client): mock.call( _job_helpers.QuerySentEvent( query="SELECT 1", - billing_project=PROJECT, - location=LOCATION, - # No job ID, because a basic query is elegible for jobs.query. - job_id=None, - request_id=mock.ANY, + billing_project="response-project", + location="response-location", + job_id="response-job-id", + # We use jobs.insert not jobs.query because destination is + # present on job_config. + request_id=None, + ) + ), + mock.call( + _job_helpers.QueryReceivedEvent( + billing_project="response-project", + location="response-location", + job_id="response-job-id", + statement_type="SELECT", + state="PENDING", + query_plan=[], + created=datetime.datetime( + 2025, 8, 13, 13, 7, 31, 123000, tzinfo=datetime.timezone.utc + ), + started=None, + ended=None, ) ), mock.call( _job_helpers.QueryFinishedEvent( - billing_project=PROJECT, - location=LOCATION, - query_id="abcdefg", - total_rows=100, + billing_project="response-project", + location="response-location", + job_id="response-job-id", + query_id=None, + total_rows=0, total_bytes_processed=123, slot_millis=987, - # No job ID or destination, because a basic query is elegible for jobs.query. - job_id=None, + created=datetime.datetime( + 2025, 8, 13, 13, 7, 31, 123000, tzinfo=datetime.timezone.utc + ), + started=datetime.datetime( + 2025, 8, 13, 13, 7, 32, 123000, tzinfo=datetime.timezone.utc + ), + ended=datetime.datetime( + 2025, 8, 13, 13, 7, 33, 123000, tzinfo=datetime.timezone.utc + ), destination=None, ), ), @@ -151,6 +284,51 @@ def test_query_and_wait_bigframes_with_job_callback(client): ) +def test_query_and_wait_bigframes_with_jobs_insert_dry_run_no_callback(client): + client._http.request.side_effect = [ + # jobs.insert because destination table present in job_config + make_response( + { + "jobReference": { + "projectId": "response-project", + "jobId": "response-job-id", + "location": "response-location", + }, + "statistics": { + "creationTime": _to_millis( + datetime.datetime( + 2025, 8, 13, 13, 7, 31, 123000, tzinfo=datetime.timezone.utc + ) + ), + "query": { + "statementType": "SELECT", + "totalBytesProcessed": 123, + "schema": { + "fields": [ + {"name": "_f0", "type": "INTEGER"}, + ], + }, + }, + }, + "configuration": { + "dryRun": True, + }, + "status": {"state": "DONE"}, + } + ), + ] + callback = mock.Mock() + config = bigquery.QueryJobConfig() + config.destination = "proj.dset.table" + config.dry_run = True + result = client._query_and_wait_bigframes( + query="SELECT 1", job_config=config, callback=callback + ) + callback.assert_not_called() + assert result.total_bytes_processed == 123 + assert result.schema == [bigquery.SchemaField("_f0", "INTEGER")] + + def test_query_and_wait_bigframes_with_query_retry_callbacks(client): client._http.request.side_effect = [ exceptions.InternalServerError( @@ -165,6 +343,9 @@ def test_query_and_wait_bigframes_with_query_retry_callbacks(client): "totalBytesProcessed": "123", "totalSlotMs": "987", "jobComplete": True, + # TODO(tswast): After + # https://github.com/googleapis/python-bigquery/pull/2260 goes in, add + # created, started, ended properties here. } ), ] @@ -200,6 +381,9 @@ def test_query_and_wait_bigframes_with_query_retry_callbacks(client): total_rows=100, total_bytes_processed=123, slot_millis=987, + created=None, + started=None, + ended=None, # No job ID or destination, because a basic query is elegible for jobs.query. job_id=None, destination=None, From bf45bd2908a2d1b088604eb66ee65f3b4f955e7b Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Thu, 14 Aug 2025 18:58:24 +0000 Subject: [PATCH 3/6] revert type hints --- google/cloud/bigquery/job/base.py | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/google/cloud/bigquery/job/base.py b/google/cloud/bigquery/job/base.py index e26e45152..f007b9341 100644 --- a/google/cloud/bigquery/job/base.py +++ b/google/cloud/bigquery/job/base.py @@ -14,8 +14,6 @@ """Base classes and helpers for job classes.""" -from __future__ import annotations - from collections import namedtuple import copy import http @@ -442,12 +440,9 @@ def configuration(self) -> _JobConfig: return configuration @property - def job_id(self) -> Optional[str]: + def job_id(self): """str: ID of the job.""" - return typing.cast( - Optional[str], - _helpers._get_sub_prop(self._properties, ["jobReference", "jobId"]), - ) + return _helpers._get_sub_prop(self._properties, ["jobReference", "jobId"]) @property def parent_job_id(self): @@ -498,24 +493,18 @@ def num_child_jobs(self): return int(count) if count is not None else 0 @property - def project(self) -> Optional[str]: + def project(self): """Project bound to the job. Returns: str: the project (derived from the client). """ - return typing.cast( - Optional[str], - _helpers._get_sub_prop(self._properties, ["jobReference", "projectId"]), - ) + return _helpers._get_sub_prop(self._properties, ["jobReference", "projectId"]) @property - def location(self) -> Optional[str]: + def location(self): """str: Location where the job runs.""" - return typing.cast( - Optional[str], - _helpers._get_sub_prop(self._properties, ["jobReference", "location"]), - ) + return _helpers._get_sub_prop(self._properties, ["jobReference", "location"]) @property def reservation_id(self): From a0293f675023c7971aac026486ffebb75627270a Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Thu, 14 Aug 2025 19:34:45 +0000 Subject: [PATCH 4/6] lint --- google/cloud/bigquery/client.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index d2e772b85..fbaf2b76a 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -15,6 +15,7 @@ """Client for interacting with the Google BigQuery API.""" from __future__ import absolute_import +from __future__ import annotations from __future__ import division from collections import abc as collections_abc @@ -31,6 +32,7 @@ import typing from typing import ( Any, + Callable, Dict, IO, Iterable, @@ -3682,10 +3684,10 @@ def _query_and_wait_bigframes( job_retry: retries.Retry = DEFAULT_JOB_RETRY, page_size: Optional[int] = None, max_results: Optional[int] = None, - callback = lambda _: None, + callback: Callable = lambda _: None, ) -> RowIterator: """See query_and_wait. - + This method has an extra callback parameter, which is used by bigframes to create better progress bars. """ From c4efaee10ae506a4851d686a879c27fb9f875ae7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a=20=28Swast=29?= Date: Mon, 18 Aug 2025 10:06:18 -0500 Subject: [PATCH 5/6] Apply suggestions from code review Co-authored-by: Chalmer Lowe --- google/cloud/bigquery/_job_helpers.py | 2 +- tests/unit/test_client_bigframes.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/google/cloud/bigquery/_job_helpers.py b/google/cloud/bigquery/_job_helpers.py index 744118a80..bfad35045 100644 --- a/google/cloud/bigquery/_job_helpers.py +++ b/google/cloud/bigquery/_job_helpers.py @@ -771,7 +771,7 @@ class QuerySentEvent: class QueryRetryEvent(QuerySentEvent): - """Query sent another time because the previous failed.""" + """Query sent another time because the previous attempt failed.""" class QuerySentEventFactory: diff --git a/tests/unit/test_client_bigframes.py b/tests/unit/test_client_bigframes.py index cef212ce9..225baba4e 100644 --- a/tests/unit/test_client_bigframes.py +++ b/tests/unit/test_client_bigframes.py @@ -102,7 +102,7 @@ def test_query_and_wait_bigframes_callback(client): query="SELECT 1", billing_project=PROJECT, location=LOCATION, - # No job ID, because a basic query is elegible for jobs.query. + # No job ID, because a basic query is eligible for jobs.query. job_id=None, request_id=mock.ANY, ) @@ -118,7 +118,7 @@ def test_query_and_wait_bigframes_callback(client): created=None, started=None, ended=None, - # No job ID or destination, because a basic query is elegible for jobs.query. + # No job ID or destination, because a basic query is eligible for jobs.query. job_id=None, destination=None, ), @@ -358,7 +358,7 @@ def test_query_and_wait_bigframes_with_query_retry_callbacks(client): query="SELECT 1", billing_project=PROJECT, location=LOCATION, - # No job ID, because a basic query is elegible for jobs.query. + # No job ID, because a basic query is eligible for jobs.query. job_id=None, request_id=mock.ANY, ) @@ -368,7 +368,7 @@ def test_query_and_wait_bigframes_with_query_retry_callbacks(client): query="SELECT 1", billing_project=PROJECT, location=LOCATION, - # No job ID, because a basic query is elegible for jobs.query. + # No job ID, because a basic query is eligible for jobs.query. job_id=None, request_id=mock.ANY, ) @@ -384,7 +384,7 @@ def test_query_and_wait_bigframes_with_query_retry_callbacks(client): created=None, started=None, ended=None, - # No job ID or destination, because a basic query is elegible for jobs.query. + # No job ID or destination, because a basic query is eligible for jobs.query. job_id=None, destination=None, ), From 96a9dedba65a51dcf2e532190f7ded4359161ffc Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Mon, 18 Aug 2025 15:15:27 +0000 Subject: [PATCH 6/6] populate created, started, ended --- google/cloud/bigquery/_job_helpers.py | 9 ++---- tests/unit/test_client_bigframes.py | 42 +++++++++++++++++++-------- 2 files changed, 33 insertions(+), 18 deletions(-) diff --git a/google/cloud/bigquery/_job_helpers.py b/google/cloud/bigquery/_job_helpers.py index 4d77f19d9..6fd561f8c 100644 --- a/google/cloud/bigquery/_job_helpers.py +++ b/google/cloud/bigquery/_job_helpers.py @@ -596,12 +596,9 @@ def do_query(): total_bytes_processed=query_results.total_bytes_processed, slot_millis=query_results.slot_millis, destination=None, - # TODO(tswast): After - # https://github.com/googleapis/python-bigquery/pull/2260 goes in, add - # created, started, ended properties here. - created=None, - started=None, - ended=None, + created=query_results.created, + started=query_results.started, + ended=query_results.ended, ) ) return table.RowIterator( diff --git a/tests/unit/test_client_bigframes.py b/tests/unit/test_client_bigframes.py index 225baba4e..0fcc31e40 100644 --- a/tests/unit/test_client_bigframes.py +++ b/tests/unit/test_client_bigframes.py @@ -77,6 +77,15 @@ def test_query_and_wait_bigframes_dry_run_no_callback(client): def test_query_and_wait_bigframes_callback(client): + created = datetime.datetime( + 2025, 8, 18, 10, 11, 12, 345000, tzinfo=datetime.timezone.utc + ) + started = datetime.datetime( + 2025, 8, 18, 10, 11, 13, 456000, tzinfo=datetime.timezone.utc + ) + ended = datetime.datetime( + 2025, 8, 18, 10, 11, 14, 567000, tzinfo=datetime.timezone.utc + ) client._http.request.side_effect = [ make_response( { @@ -87,9 +96,9 @@ def test_query_and_wait_bigframes_callback(client): "totalBytesProcessed": "123", "totalSlotMs": "987", "jobComplete": True, - # TODO(tswast): After - # https://github.com/googleapis/python-bigquery/pull/2260 goes in, add - # created, started, ended properties here. + "creationTime": _to_millis(created), + "startTime": _to_millis(started), + "endTime": _to_millis(ended), } ), ] @@ -115,9 +124,9 @@ def test_query_and_wait_bigframes_callback(client): total_rows=100, total_bytes_processed=123, slot_millis=987, - created=None, - started=None, - ended=None, + created=created, + started=started, + ended=ended, # No job ID or destination, because a basic query is eligible for jobs.query. job_id=None, destination=None, @@ -330,6 +339,15 @@ def test_query_and_wait_bigframes_with_jobs_insert_dry_run_no_callback(client): def test_query_and_wait_bigframes_with_query_retry_callbacks(client): + created = datetime.datetime( + 2025, 8, 18, 10, 11, 12, 345000, tzinfo=datetime.timezone.utc + ) + started = datetime.datetime( + 2025, 8, 18, 10, 11, 13, 456000, tzinfo=datetime.timezone.utc + ) + ended = datetime.datetime( + 2025, 8, 18, 10, 11, 14, 567000, tzinfo=datetime.timezone.utc + ) client._http.request.side_effect = [ exceptions.InternalServerError( "first try", errors=({"reason": "jobInternalError"},) @@ -343,9 +361,9 @@ def test_query_and_wait_bigframes_with_query_retry_callbacks(client): "totalBytesProcessed": "123", "totalSlotMs": "987", "jobComplete": True, - # TODO(tswast): After - # https://github.com/googleapis/python-bigquery/pull/2260 goes in, add - # created, started, ended properties here. + "creationTime": _to_millis(created), + "startTime": _to_millis(started), + "endTime": _to_millis(ended), } ), ] @@ -381,9 +399,9 @@ def test_query_and_wait_bigframes_with_query_retry_callbacks(client): total_rows=100, total_bytes_processed=123, slot_millis=987, - created=None, - started=None, - ended=None, + created=created, + started=started, + ended=ended, # No job ID or destination, because a basic query is eligible for jobs.query. job_id=None, destination=None,