Skip to content

Commit adfa7b6

Browse files
committed
chore: add private _query_and_wait_bigframes method
1 parent d219989 commit adfa7b6

File tree

3 files changed

+136
-7
lines changed

3 files changed

+136
-7
lines changed

google/cloud/bigquery/_job_helpers.py

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,14 @@
3535
predicates where it is safe to generate a new query ID.
3636
"""
3737

38+
from __future__ import annotations
39+
3840
import copy
41+
import dataclasses
3942
import functools
4043
import uuid
4144
import textwrap
42-
from typing import Any, Dict, Optional, TYPE_CHECKING, Union
45+
from typing import Any, Callable, Dict, Optional, TYPE_CHECKING, Union
4346
import warnings
4447

4548
import google.api_core.exceptions as core_exceptions
@@ -116,10 +119,15 @@ def query_jobs_insert(
116119
retry: Optional[retries.Retry],
117120
timeout: Optional[float],
118121
job_retry: Optional[retries.Retry],
122+
callback: Callable,
119123
) -> job.QueryJob:
120124
"""Initiate a query using jobs.insert.
121125
122126
See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
127+
128+
Args:
129+
callback (Callable):
130+
A callback function used by bigframes to report query progress.
123131
"""
124132
job_id_given = job_id is not None
125133
job_id_save = job_id
@@ -136,6 +144,15 @@ def do_query():
136144

137145
try:
138146
query_job._begin(retry=retry, timeout=timeout)
147+
callback(
148+
QueryRequestedEvent(
149+
query=query,
150+
billing_project=query_job.project,
151+
location=query_job.location,
152+
job_id=query_job.job_id,
153+
request_id=None,
154+
)
155+
)
139156
except core_exceptions.Conflict as create_exc:
140157
# The thought is if someone is providing their own job IDs and they get
141158
# their job ID generation wrong, this could end up returning results for
@@ -396,6 +413,7 @@ def query_and_wait(
396413
job_retry: Optional[retries.Retry],
397414
page_size: Optional[int] = None,
398415
max_results: Optional[int] = None,
416+
callback: Callable = lambda _: None,
399417
) -> table.RowIterator:
400418
"""Run the query, wait for it to finish, and return the results.
401419
@@ -415,9 +433,8 @@ def query_and_wait(
415433
location (Optional[str]):
416434
Location where to run the job. Must match the location of the
417435
table used in the query as well as the destination table.
418-
project (Optional[str]):
419-
Project ID of the project of where to run the job. Defaults
420-
to the client's project.
436+
project (str):
437+
Project ID of the project of where to run the job.
421438
api_timeout (Optional[float]):
422439
The number of seconds to wait for the underlying HTTP transport
423440
before using ``retry``.
@@ -441,6 +458,8 @@ def query_and_wait(
441458
request. Non-positive values are ignored.
442459
max_results (Optional[int]):
443460
The maximum total number of rows from this request.
461+
callback (Callable):
462+
A callback function used by bigframes to report query progress.
444463
445464
Returns:
446465
google.cloud.bigquery.table.RowIterator:
@@ -479,6 +498,7 @@ def query_and_wait(
479498
retry=retry,
480499
timeout=api_timeout,
481500
job_retry=job_retry,
501+
callback=callback,
482502
),
483503
api_timeout=api_timeout,
484504
wait_timeout=wait_timeout,
@@ -497,9 +517,20 @@ def query_and_wait(
497517
request_body["jobCreationMode"] = client.default_job_creation_mode
498518

499519
def do_query():
500-
request_body["requestId"] = make_job_id()
520+
request_id = make_job_id()
521+
request_body["requestId"] = request_id
501522
span_attributes = {"path": path}
502523

524+
callback(
525+
QueryRequestedEvent(
526+
query=query,
527+
billing_project=project,
528+
location=location,
529+
job_id=None,
530+
request_id=request_id,
531+
)
532+
)
533+
503534
# For easier testing, handle the retries ourselves.
504535
if retry is not None:
505536
response = retry(client._call_api)(
@@ -632,3 +663,12 @@ def _wait_or_cancel(
632663
# Don't eat the original exception if cancel fails.
633664
pass
634665
raise
666+
667+
668+
@dataclasses.dataclass(frozen=True)
669+
class QueryRequestedEvent:
670+
query: str
671+
billing_project: str
672+
location: Optional[str]
673+
job_id: Optional[str]
674+
request_id: Optional[str]

google/cloud/bigquery/client.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3633,8 +3633,8 @@ def query_and_wait(
36333633
rate-limit-exceeded errors. Passing ``None`` disables
36343634
job retry. Not all jobs can be retried.
36353635
page_size (Optional[int]):
3636-
The maximum number of rows in each page of results from this
3637-
request. Non-positive values are ignored.
3636+
The maximum number of rows in each page of results from the
3637+
initial jobs.query request. Non-positive values are ignored.
36383638
max_results (Optional[int]):
36393639
The maximum total number of rows from this request.
36403640
@@ -3656,6 +3656,39 @@ def query_and_wait(
36563656
:class:`~google.cloud.bigquery.job.QueryJobConfig`
36573657
class.
36583658
"""
3659+
return self._query_and_wait_bigframes(
3660+
query,
3661+
job_config=job_config,
3662+
location=location,
3663+
project=project,
3664+
api_timeout=api_timeout,
3665+
wait_timeout=wait_timeout,
3666+
retry=retry,
3667+
job_retry=job_retry,
3668+
page_size=page_size,
3669+
max_results=max_results,
3670+
)
3671+
3672+
def _query_and_wait_bigframes(
3673+
self,
3674+
query,
3675+
*,
3676+
job_config: Optional[QueryJobConfig] = None,
3677+
location: Optional[str] = None,
3678+
project: Optional[str] = None,
3679+
api_timeout: TimeoutType = DEFAULT_TIMEOUT,
3680+
wait_timeout: Union[Optional[float], object] = POLLING_DEFAULT_VALUE,
3681+
retry: retries.Retry = DEFAULT_RETRY,
3682+
job_retry: retries.Retry = DEFAULT_JOB_RETRY,
3683+
page_size: Optional[int] = None,
3684+
max_results: Optional[int] = None,
3685+
callback = lambda _: None,
3686+
) -> RowIterator:
3687+
"""See query_and_wait.
3688+
3689+
This method has an extra callback parameter, which is used by bigframes
3690+
to create better progress bars.
3691+
"""
36593692
if project is None:
36603693
project = self.project
36613694

@@ -3681,6 +3714,7 @@ def query_and_wait(
36813714
job_retry=job_retry,
36823715
page_size=page_size,
36833716
max_results=max_results,
3717+
callback=callback,
36843718
)
36853719

36863720
def insert_rows(
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Tests for Client features enabling the bigframes integration."""
16+
17+
from __future__ import annotations
18+
19+
from unittest import mock
20+
21+
import pytest
22+
23+
import google.auth.credentials
24+
import google.cloud.bigquery.client
25+
26+
27+
PROJECT = "test-project"
28+
LOCATION = "test-location"
29+
30+
31+
def make_response(body, *, status_code: int = 200):
32+
response = mock.Mock()
33+
type(response).status_code = mock.PropertyMock(return_value=status_code)
34+
response.json.return_value = {}
35+
return response
36+
37+
38+
@pytest.fixture
39+
def client():
40+
"""A real client object with mocked API requests."""
41+
credentials = mock.create_autospec(google.auth.credentials.Credentials, instance=True)
42+
http_session = mock.Mock()
43+
return google.cloud.bigquery.client.Client(
44+
project=PROJECT, credentials=credentials, _http=http_session, location=LOCATION,
45+
)
46+
47+
48+
def test_query_and_wait_bigframes_callback(client):
49+
client._http.request.side_effect = [
50+
make_response(
51+
{"jobComplete": True}
52+
),
53+
]
54+
callback = lambda _: None
55+
client._query_and_wait_bigframes(query="SELECT 1", callback=callback)

0 commit comments

Comments
 (0)