Skip to content

Commit 91c8cce

Browse files
committed
add QueryCompleteEvent
1 parent fc8fc1e commit 91c8cce

File tree

3 files changed

+151
-9
lines changed

3 files changed

+151
-9
lines changed

google/cloud/bigquery/_job_helpers.py

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ def query_jobs_insert(
132132
job_id_given = job_id is not None
133133
job_id_save = job_id
134134
job_config_save = job_config
135+
query_sent_factory = QuerySentEventFactory()
135136

136137
def do_query():
137138
# Make a copy now, so that original doesn't get changed by the process
@@ -145,7 +146,7 @@ def do_query():
145146
try:
146147
query_job._begin(retry=retry, timeout=timeout)
147148
callback(
148-
QuerySentEvent(
149+
query_sent_factory(
149150
query=query,
150151
billing_project=query_job.project,
151152
location=query_job.location,
@@ -515,14 +516,16 @@ def query_and_wait(
515516
request_body["maxResults"] = page_size or max_results
516517
if client.default_job_creation_mode:
517518
request_body["jobCreationMode"] = client.default_job_creation_mode
519+
520+
query_sent_factory = QuerySentEventFactory()
518521

519522
def do_query():
520523
request_id = make_job_id()
521524
request_body["requestId"] = request_id
522525
span_attributes = {"path": path}
523526

524527
callback(
525-
QuerySentEvent(
528+
query_sent_factory(
526529
query=query,
527530
billing_project=project,
528531
location=location,
@@ -575,6 +578,18 @@ def do_query():
575578
max_results=max_results,
576579
)
577580

581+
callback(
582+
QueryCompleteEvent(
583+
billing_project=project,
584+
location=query_results.location,
585+
query_id=query_results.query_id,
586+
job_id=query_results.job_id,
587+
total_rows=query_results.total_rows,
588+
total_bytes_processed=query_results.total_bytes_processed,
589+
slot_millis=query_results.slot_millis,
590+
destination=None,
591+
)
592+
)
578593
return table.RowIterator(
579594
client=client,
580595
api_request=functools.partial(client._call_api, retry, timeout=api_timeout),
@@ -665,10 +680,40 @@ def _wait_or_cancel(
665680
raise
666681

667682

683+
@dataclasses.dataclass(frozen=True)
684+
class QueryCompleteEvent:
685+
"""Query finished successfully."""
686+
billing_project: str
687+
location: Optional[str]
688+
query_id: Optional[str]
689+
job_id: Optional[str]
690+
destination: Optional[table.TableReference]
691+
total_rows: Optional[int]
692+
total_bytes_processed: Optional[int]
693+
slot_millis: Optional[int]
694+
695+
668696
@dataclasses.dataclass(frozen=True)
669697
class QuerySentEvent:
698+
"""Query sent to BigQuery."""
670699
query: str
671700
billing_project: str
672701
location: Optional[str]
673702
job_id: Optional[str]
674703
request_id: Optional[str]
704+
705+
706+
class QueryRetryEvent(QuerySentEvent):
707+
"""Query sent again because the previous attempt failed."""
708+
709+
710+
class QuerySentEventFactory:
711+
"""Creates a QuerySentEvent first, then QueryRetryEvent after that."""
712+
713+
def __init__(self):
714+
self._event_constructor = QuerySentEvent
715+
716+
def __call__(self, **kwargs):
717+
result = self._event_constructor(**kwargs)
718+
self._event_constructor = QueryRetryEvent
719+
return result

google/cloud/bigquery/query.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1228,11 +1228,18 @@ def location(self):
12281228
12291229
See:
12301230
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#body.QueryResponse.FIELDS.job_reference
1231+
or https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#body.QueryResponse.FIELDS.location
12311232
12321233
Returns:
12331234
str: Location of the query job.
12341235
"""
1235-
return self._properties.get("jobReference", {}).get("location")
1236+
location = self._properties.get("jobReference", {}).get("location")
1237+
1238+
# Sometimes there's no job, but we still want to get the location
1239+
# information. Prefer the value from job for backwards compatibility.
1240+
if not location:
1241+
location = self._properties.get("location")
1242+
return location
12361243

12371244
@property
12381245
def query_id(self) -> Optional[str]:

tests/unit/test_client_bigframes.py

Lines changed: 96 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import pytest
2222

2323
import google.auth.credentials
24+
from google.api_core import exceptions
2425
import google.cloud.bigquery.client
2526
from google.cloud.bigquery import _job_helpers
2627

@@ -32,7 +33,7 @@
3233
def make_response(body, *, status_code: int = 200):
3334
response = mock.Mock()
3435
type(response).status_code = mock.PropertyMock(return_value=status_code)
35-
response.json.return_value = {}
36+
response.json.return_value = body
3637
return response
3738

3839

@@ -49,15 +50,104 @@ def client():
4950
def test_query_and_wait_bigframes_callback(client):
5051
client._http.request.side_effect = [
5152
make_response(
52-
{"jobComplete": True}
53+
{
54+
# https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
55+
"location": LOCATION,
56+
"queryId": "abcdefg",
57+
"totalRows": "100",
58+
"totalBytesProcessed": "123",
59+
"totalSlotMs": "987",
60+
"jobComplete": True,
61+
}
5362
),
5463
]
5564
callback = mock.Mock()
5665
client._query_and_wait_bigframes(query="SELECT 1", callback=callback)
5766
callback.assert_has_calls(
58-
mock.call(
59-
_job_helpers.QuerySentEvent(
60-
query="SELECT 1",
67+
[
68+
mock.call(
69+
_job_helpers.QuerySentEvent(
70+
query="SELECT 1",
71+
billing_project=PROJECT,
72+
location=LOCATION,
73+
# No job ID, because a basic query is eligible for jobs.query.
74+
job_id=None,
75+
request_id=mock.ANY,
76+
)
6177
),
62-
)
78+
mock.call(
79+
_job_helpers.QueryCompleteEvent(
80+
billing_project=PROJECT,
81+
location=LOCATION,
82+
query_id="abcdefg",
83+
total_rows=100,
84+
total_bytes_processed=123,
85+
slot_millis=987,
86+
# No job ID or destination, because a basic query is eligible for jobs.query.
87+
job_id=None,
88+
destination=None,
89+
),
90+
),
91+
]
92+
)
93+
94+
95+
def test_query_and_wait_bigframes_with_job_retry_callbacks(client):
96+
client._http.request.side_effect = [
97+
exceptions.InternalServerError(
98+
"first try",
99+
errors=(
100+
{"reason": "jobInternalError"},
101+
)
102+
),
103+
make_response(
104+
{
105+
# https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
106+
"location": LOCATION,
107+
"queryId": "abcdefg",
108+
"totalRows": "100",
109+
"totalBytesProcessed": "123",
110+
"totalSlotMs": "987",
111+
"jobComplete": True,
112+
}
113+
),
114+
]
115+
callback = mock.Mock()
116+
client._query_and_wait_bigframes(query="SELECT 1", callback=callback)
117+
callback.assert_has_calls(
118+
[
119+
mock.call(
120+
_job_helpers.QuerySentEvent(
121+
query="SELECT 1",
122+
billing_project=PROJECT,
123+
location=LOCATION,
124+
# No job ID, because a basic query is eligible for jobs.query.
125+
job_id=None,
126+
request_id=mock.ANY,
127+
)
128+
),
129+
mock.call(
130+
_job_helpers.QueryRetryEvent(
131+
query="SELECT 1",
132+
billing_project=PROJECT,
133+
location=LOCATION,
134+
# No job ID, because a basic query is eligible for jobs.query.
135+
job_id=None,
136+
request_id=mock.ANY,
137+
)
138+
),
139+
mock.call(
140+
_job_helpers.QueryCompleteEvent(
141+
billing_project=PROJECT,
142+
location=LOCATION,
143+
query_id=mock.ANY,
144+
total_rows=100,
145+
total_bytes_processed=123,
146+
slot_millis=987,
147+
# No job ID or destination, because a basic query is eligible for jobs.query.
148+
job_id=None,
149+
destination=None,
150+
),
151+
),
152+
]
63153
)

0 commit comments

Comments
 (0)