Skip to content

Commit 33721ee

Browse files
committed
work-in-progress: callbacks for query finished
1 parent 91c8cce commit 33721ee

File tree

2 files changed

+105
-31
lines changed

2 files changed

+105
-31
lines changed

google/cloud/bigquery/_job_helpers.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,7 @@ def query_and_wait(
506506
retry=retry,
507507
page_size=page_size,
508508
max_results=max_results,
509+
callback=callback,
509510
)
510511

511512
path = _to_query_path(project)
@@ -516,7 +517,7 @@ def query_and_wait(
516517
request_body["maxResults"] = page_size or max_results
517518
if client.default_job_creation_mode:
518519
request_body["jobCreationMode"] = client.default_job_creation_mode
519-
520+
520521
query_sent_factory = QuerySentEventFactory()
521522

522523
def do_query():
@@ -576,6 +577,7 @@ def do_query():
576577
retry=retry,
577578
page_size=page_size,
578579
max_results=max_results,
580+
callback=callback,
579581
)
580582

581583
callback(
@@ -657,19 +659,33 @@ def _wait_or_cancel(
657659
retry: Optional[retries.Retry],
658660
page_size: Optional[int],
659661
max_results: Optional[int],
662+
callback: Callable,
660663
) -> table.RowIterator:
661664
"""Wait for a job to complete and return the results.
662665
663666
If we can't return the results within the ``wait_timeout``, try to cancel
664667
the job.
665668
"""
666669
try:
667-
return job.result(
670+
query_results = job.result(
668671
page_size=page_size,
669672
max_results=max_results,
670673
retry=retry,
671674
timeout=wait_timeout,
672675
)
676+
callback(
677+
QueryCompleteEvent(
678+
billing_project=job.project,
679+
location=query_results.location,
680+
query_id=query_results.query_id,
681+
job_id=query_results.job_id,
682+
total_rows=query_results.total_rows,
683+
total_bytes_processed=query_results.total_bytes_processed,
684+
slot_millis=query_results.slot_millis,
685+
destination=job.destination,
686+
)
687+
)
688+
return query_results
673689
except Exception:
674690
# Attempt to cancel the job since we can't return the results.
675691
try:
@@ -683,6 +699,7 @@ def _wait_or_cancel(
683699
@dataclasses.dataclass(frozen=True)
684700
class QueryCompleteEvent:
685701
"""Query finished successfully."""
702+
686703
billing_project: str
687704
location: Optional[str]
688705
query_id: Optional[str]
@@ -691,11 +708,12 @@ class QueryCompleteEvent:
691708
total_rows: Optional[int]
692709
total_bytes_processed: Optional[int]
693710
slot_millis: Optional[int]
694-
711+
695712

696713
@dataclasses.dataclass(frozen=True)
697714
class QuerySentEvent:
698715
"""Query sent to BigQuery."""
716+
699717
query: str
700718
billing_project: str
701719
location: Optional[str]
@@ -712,7 +730,7 @@ class QuerySentEventFactory:
712730

713731
def __init__(self):
714732
self._event_constructor = QuerySentEvent
715-
733+
716734
def __call__(self, **kwargs):
717735
result = self._event_constructor(**kwargs)
718736
self._event_constructor = QueryRetryEvent

tests/unit/test_client_bigframes.py

Lines changed: 83 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import google.auth.credentials
2424
from google.api_core import exceptions
25+
from google.cloud import bigquery
2526
import google.cloud.bigquery.client
2627
from google.cloud.bigquery import _job_helpers
2728

@@ -40,10 +41,15 @@ def make_response(body, *, status_code: int = 200):
4041
@pytest.fixture
4142
def client():
4243
"""A real client object with mocked API requests."""
43-
credentials = mock.create_autospec(google.auth.credentials.Credentials, instance=True)
44+
credentials = mock.create_autospec(
45+
google.auth.credentials.Credentials, instance=True
46+
)
4447
http_session = mock.Mock()
4548
return google.cloud.bigquery.client.Client(
46-
project=PROJECT, credentials=credentials, _http=http_session, location=LOCATION,
49+
project=PROJECT,
50+
credentials=credentials,
51+
_http=http_session,
52+
location=LOCATION,
4753
)
4854

4955

@@ -66,39 +72,89 @@ def test_query_and_wait_bigframes_callback(client):
6672
callback.assert_has_calls(
6773
[
6874
mock.call(
69-
_job_helpers.QuerySentEvent(
70-
query="SELECT 1",
71-
billing_project=PROJECT,
72-
location=LOCATION,
73-
# No job ID, because a basic query is elegible for jobs.query.
74-
job_id=None,
75-
request_id=mock.ANY,
76-
)
75+
_job_helpers.QuerySentEvent(
76+
query="SELECT 1",
77+
billing_project=PROJECT,
78+
location=LOCATION,
79+
# No job ID, because a basic query is elegible for jobs.query.
80+
job_id=None,
81+
request_id=mock.ANY,
82+
)
83+
),
84+
mock.call(
85+
_job_helpers.QueryCompleteEvent(
86+
billing_project=PROJECT,
87+
location=LOCATION,
88+
query_id="abcdefg",
89+
total_rows=100,
90+
total_bytes_processed=123,
91+
slot_millis=987,
92+
# No job ID or destination, because a basic query is elegible for jobs.query.
93+
job_id=None,
94+
destination=None,
95+
),
96+
),
97+
]
98+
)
99+
100+
101+
def test_query_and_wait_bigframes_with_job_callback(client):
102+
client._http.request.side_effect = [
103+
make_response(
104+
{
105+
# https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
106+
# https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
107+
"jobReference": {},
108+
}
109+
),
110+
make_response(
111+
{
112+
# https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
113+
# https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
114+
"jobReference": {},
115+
"status": {"state": "DONE"},
116+
}
117+
),
118+
]
119+
callback = mock.Mock()
120+
config = bigquery.QueryJobConfig()
121+
config.destination = "proj.dset.table"
122+
client._query_and_wait_bigframes(
123+
query="SELECT 1", job_config=config, callback=callback
124+
)
125+
callback.assert_has_calls(
126+
[
127+
mock.call(
128+
_job_helpers.QuerySentEvent(
129+
query="SELECT 1",
130+
billing_project=PROJECT,
131+
location=LOCATION,
132+
# No job ID, because a basic query is elegible for jobs.query.
133+
job_id=None,
134+
request_id=mock.ANY,
135+
)
77136
),
78137
mock.call(
79-
_job_helpers.QueryCompleteEvent(
138+
_job_helpers.QueryCompleteEvent(
80139
billing_project=PROJECT,
81140
location=LOCATION,
82141
query_id="abcdefg",
83142
total_rows=100,
84143
total_bytes_processed=123,
85144
slot_millis=987,
86145
# No job ID or destination, because a basic query is elegible for jobs.query.
87-
job_id=None,
146+
job_id=None,
88147
destination=None,
89-
),
148+
),
90149
),
91150
]
92151
)
93152

94153

95-
def test_query_and_wait_bigframes_with_job_retry_callbacks(client):
154+
def test_query_and_wait_bigframes_with_query_retry_callbacks(client):
96155
client._http.request.side_effect = [
97156
exceptions.InternalServerError(
98-
"first try",
99-
errors=(
100-
{"reason": "jobInternalError"},
101-
)
157+
"first try", errors=({"reason": "jobInternalError"},)
102158
),
103159
make_response(
104160
{
@@ -117,37 +173,37 @@ def test_query_and_wait_bigframes_with_job_retry_callbacks(client):
117173
callback.assert_has_calls(
118174
[
119175
mock.call(
120-
_job_helpers.QuerySentEvent(
176+
_job_helpers.QuerySentEvent(
121177
query="SELECT 1",
122178
billing_project=PROJECT,
123179
location=LOCATION,
124180
# No job ID, because a basic query is elegible for jobs.query.
125-
job_id=None,
181+
job_id=None,
126182
request_id=mock.ANY,
127-
)
183+
)
128184
),
129185
mock.call(
130-
_job_helpers.QueryRetryEvent(
186+
_job_helpers.QueryRetryEvent(
131187
query="SELECT 1",
132188
billing_project=PROJECT,
133189
location=LOCATION,
134190
# No job ID, because a basic query is elegible for jobs.query.
135-
job_id=None,
191+
job_id=None,
136192
request_id=mock.ANY,
137-
)
193+
)
138194
),
139195
mock.call(
140-
_job_helpers.QueryCompleteEvent(
196+
_job_helpers.QueryCompleteEvent(
141197
billing_project=PROJECT,
142198
location=LOCATION,
143199
query_id=mock.ANY,
144200
total_rows=100,
145201
total_bytes_processed=123,
146202
slot_millis=987,
147203
# No job ID or destination, because a basic query is elegible for jobs.query.
148-
job_id=None,
204+
job_id=None,
149205
destination=None,
150-
),
206+
),
151207
),
152208
]
153209
)

0 commit comments

Comments
 (0)