Skip to content

Commit 9012fd7

Browse files
committed
refactoring by plumbing async through more coroutines, added notes for more work, added async_retries (breaking tests)
1 parent d1a65bf commit 9012fd7

File tree

4 files changed

+104
-87
lines changed

4 files changed

+104
-87
lines changed

google/cloud/bigquery/async_client.py

Lines changed: 87 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,19 @@
11
from google.cloud.bigquery.client import *
22
from google.cloud.bigquery import _job_helpers
33
from google.cloud.bigquery import table
4+
from google.cloud.bigquery.retry import (
5+
DEFAULT_ASYNC_JOB_RETRY,
6+
DEFAULT_ASYNC_RETRY,
7+
DEFAULT_TIMEOUT,
8+
)
9+
from google.api_core import retry_async as retries
410
import asyncio
11+
import google.auth.transport._aiohttp_requests
512

6-
class AsyncClient(Client):
13+
14+
class AsyncClient():
715
def __init__(self, *args, **kwargs):
8-
super().__init__(*args, **kwargs)
16+
self._client = Client(*args, **kwargs)
917

1018

1119
async def query_and_wait(
@@ -17,30 +25,30 @@ async def query_and_wait(
1725
project: Optional[str] = None,
1826
api_timeout: TimeoutType = DEFAULT_TIMEOUT,
1927
wait_timeout: TimeoutType = None,
20-
retry: retries.Retry = DEFAULT_RETRY,
21-
job_retry: retries.Retry = DEFAULT_JOB_RETRY,
28+
retry: retries.AsyncRetry = DEFAULT_ASYNC_RETRY,
29+
job_retry: retries.AsyncRetry = DEFAULT_ASYNC_JOB_RETRY,
2230
page_size: Optional[int] = None,
2331
max_results: Optional[int] = None,
2432
) -> RowIterator:
2533

2634
if project is None:
27-
project = self.project
35+
project = self._client.project
2836

2937
if location is None:
30-
location = self.location
38+
location = self._client.location
3139

3240
# if job_config is not None:
33-
# self._verify_job_config_type(job_config, QueryJobConfig)
41+
# self._client._verify_job_config_type(job_config, QueryJobConfig)
3442

3543
# if job_config is not None:
36-
# self._verify_job_config_type(job_config, QueryJobConfig)
44+
# self._client._verify_job_config_type(job_config, QueryJobConfig)
3745

3846
job_config = _job_helpers.job_config_with_defaults(
39-
job_config, self._default_query_job_config
47+
job_config, self._client._default_query_job_config
4048
)
4149

4250
return await async_query_and_wait(
43-
self,
51+
self._client,
4452
query,
4553
job_config=job_config,
4654
location=location,
@@ -63,8 +71,8 @@ async def async_query_and_wait(
6371
project: str,
6472
api_timeout: Optional[float] = None,
6573
wait_timeout: Optional[float] = None,
66-
retry: Optional[retries.Retry],
67-
job_retry: Optional[retries.Retry],
74+
retry: Optional[retries.AsyncRetry],
75+
job_retry: Optional[retries.AsyncRetry],
6876
page_size: Optional[int] = None,
6977
max_results: Optional[int] = None,
7078
) -> table.RowIterator:
@@ -73,7 +81,7 @@ async def async_query_and_wait(
7381
# cases, fallback to a jobs.insert call.
7482
if not _job_helpers._supported_by_jobs_query(job_config):
7583
return await async_wait_or_cancel(
76-
_job_helpers.query_jobs_insert(
84+
asyncio.to_thread(_job_helpers.query_jobs_insert( # throw in a background thread
7785
client=client,
7886
query=query,
7987
job_id=None,
@@ -84,7 +92,7 @@ async def async_query_and_wait(
8492
retry=retry,
8593
timeout=api_timeout,
8694
job_retry=job_retry,
87-
),
95+
)),
8896
api_timeout=api_timeout,
8997
wait_timeout=wait_timeout,
9098
retry=retry,
@@ -105,90 +113,91 @@ async def async_query_and_wait(
105113
if os.getenv("QUERY_PREVIEW_ENABLED", "").casefold() == "true":
106114
request_body["jobCreationMode"] = "JOB_CREATION_OPTIONAL"
107115

108-
async def do_query():
109-
request_body["requestId"] = _job_helpers.make_job_id()
110-
span_attributes = {"path": path}
111-
112-
# For easier testing, handle the retries ourselves.
113-
if retry is not None:
114-
response = retry(client._call_api)(
115-
retry=None, # We're calling the retry decorator ourselves.
116-
span_name="BigQuery.query",
117-
span_attributes=span_attributes,
118-
method="POST",
119-
path=path,
120-
data=request_body,
121-
timeout=api_timeout,
122-
)
123-
else:
124-
response = client._call_api(
125-
retry=None,
126-
span_name="BigQuery.query",
127-
span_attributes=span_attributes,
128-
method="POST",
129-
path=path,
130-
data=request_body,
131-
timeout=api_timeout,
132-
)
133116

134-
# Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
135-
# to fetch, there will be a job ID for jobs.getQueryResults.
136-
query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
137-
response
117+
request_body["requestId"] = _job_helpers.make_job_id()
118+
span_attributes = {"path": path}
119+
120+
# For easier testing, handle the retries ourselves.
121+
if retry is not None:
122+
response = retry(client._call_api)( # ASYNCHRONOUS HTTP CALLS aiohttp (optional of google-auth)
123+
retry=None, # We're calling the retry decorator ourselves, async_retries
124+
span_name="BigQuery.query",
125+
span_attributes=span_attributes,
126+
method="POST",
127+
path=path,
128+
data=request_body,
129+
timeout=api_timeout,
138130
)
139-
page_token = query_results.page_token
140-
more_pages = page_token is not None
141-
142-
if more_pages or not query_results.complete:
143-
# TODO(swast): Avoid a call to jobs.get in some cases (few
144-
# remaining pages) by waiting for the query to finish and calling
145-
# client._list_rows_from_query_results directly. Need to update
146-
# RowIterator to fetch destination table via the job ID if needed.
147-
return await async_wait_or_cancel(
148-
_job_helpers._to_query_job(client, query, job_config, response),
149-
api_timeout=api_timeout,
150-
wait_timeout=wait_timeout,
151-
retry=retry,
152-
page_size=page_size,
153-
max_results=max_results,
154-
)
155-
156-
return table.RowIterator(
157-
client=client,
158-
api_request=functools.partial(client._call_api, retry, timeout=api_timeout),
159-
path=None,
160-
schema=query_results.schema,
161-
max_results=max_results,
131+
else:
132+
response = client._call_api(
133+
retry=None,
134+
span_name="BigQuery.query",
135+
span_attributes=span_attributes,
136+
method="POST",
137+
path=path,
138+
data=request_body,
139+
timeout=api_timeout,
140+
)
141+
142+
# Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
143+
# to fetch, there will be a job ID for jobs.getQueryResults.
144+
query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
145+
await response
146+
)
147+
page_token = query_results.page_token
148+
more_pages = page_token is not None
149+
150+
if more_pages or not query_results.complete:
151+
# TODO(swast): Avoid a call to jobs.get in some cases (few
152+
# remaining pages) by waiting for the query to finish and calling
153+
# client._list_rows_from_query_results directly. Need to update
154+
# RowIterator to fetch destination table via the job ID if needed.
155+
result = await async_wait_or_cancel(
156+
_job_helpers._to_query_job(client, query, job_config, response),
157+
api_timeout=api_timeout,
158+
wait_timeout=wait_timeout,
159+
retry=retry,
162160
page_size=page_size,
163-
total_rows=query_results.total_rows,
164-
first_page_response=response,
165-
location=query_results.location,
166-
job_id=query_results.job_id,
167-
query_id=query_results.query_id,
168-
project=query_results.project,
169-
num_dml_affected_rows=query_results.num_dml_affected_rows,
161+
max_results=max_results,
170162
)
171163

164+
result = table.RowIterator( # async of RowIterator? async version without all the pandas stuff
165+
client=client,
166+
api_request=functools.partial(client._call_api, retry, timeout=api_timeout),
167+
path=None,
168+
schema=query_results.schema,
169+
max_results=max_results,
170+
page_size=page_size,
171+
total_rows=query_results.total_rows,
172+
first_page_response=response,
173+
location=query_results.location,
174+
job_id=query_results.job_id,
175+
query_id=query_results.query_id,
176+
project=query_results.project,
177+
num_dml_affected_rows=query_results.num_dml_affected_rows,
178+
)
179+
180+
172181
if job_retry is not None:
173-
return job_retry(do_query)()
182+
return job_retry(result) # AsyncRetries, new default objects, default_job_retry_async, default_retry_async
174183
else:
175-
return await do_query()
184+
return result
176185

177186
async def async_wait_or_cancel(
178187
job: job.QueryJob,
179188
api_timeout: Optional[float],
180189
wait_timeout: Optional[float],
181-
retry: Optional[retries.Retry],
190+
retry: Optional[retries.AsyncRetry],
182191
page_size: Optional[int],
183192
max_results: Optional[int],
184193
) -> table.RowIterator:
185194
try:
186-
return await job.result(
195+
return asyncio.to_thread(job.result( # run in a background thread
187196
page_size=page_size,
188197
max_results=max_results,
189198
retry=retry,
190199
timeout=wait_timeout,
191-
)
200+
))
192201
except Exception:
193202
# Attempt to cancel the job since we can't return the results.
194203
try:

google/cloud/bigquery/retry.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414

1515
from google.api_core import exceptions
16-
from google.api_core import retry
16+
from google.api_core import retry, retry_async
1717
from google.auth import exceptions as auth_exceptions # type: ignore
1818
import requests.exceptions
1919

@@ -90,3 +90,9 @@ def _job_should_retry(exc):
9090
"""
9191
The default job retry object.
9292
"""
93+
94+
DEFAULT_ASYNC_RETRY = retry_async.AsyncRetry(predicate=_should_retry, deadline=_DEFAULT_RETRY_DEADLINE) # deadline is deprecated
95+
96+
DEFAULT_ASYNC_JOB_RETRY = retry_async.AsyncRetry(
97+
predicate=_job_should_retry, deadline=_DEFAULT_JOB_DEADLINE # deadline is deprecated
98+
)

setup.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@
8383
"proto-plus >= 1.15.0, <2.0.0dev",
8484
"protobuf>=3.19.5,<5.0.0dev,!=3.20.0,!=3.20.1,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5", # For the legacy proto-based types.
8585
],
86+
"google-auth": [
87+
"aiohttp",
88+
]
8689
}
8790

8891
all_extras = []

tests/unit/test_async_client.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ def test_ctor_defaults(self):
128128

129129
creds = _make_credentials()
130130
http = object()
131-
client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
131+
client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)._client
132132
self.assertIsInstance(client._connection, Connection)
133133
self.assertIs(client._connection.credentials, creds)
134134
self.assertIs(client._connection.http, http)
@@ -148,7 +148,7 @@ def test_ctor_w_empty_client_options(self):
148148
credentials=creds,
149149
_http=http,
150150
client_options=client_options,
151-
)
151+
)._client
152152
self.assertEqual(
153153
client._connection.API_BASE_URL, client._connection.DEFAULT_API_ENDPOINT
154154
)
@@ -177,7 +177,7 @@ async def test_query_and_wait_defaults(self):
177177
creds = _make_credentials()
178178
http = object()
179179
client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
180-
conn = client._connection = make_connection(jobs_query_response)
180+
conn = client._client._connection = make_connection(jobs_query_response)
181181

182182
rows = await client.query_and_wait(query)
183183

@@ -190,7 +190,6 @@ async def test_query_and_wait_defaults(self):
190190
self.assertIsNone(rows.location)
191191

192192
# Verify the request we send is to jobs.query.
193-
conn.api_request = await conn.api_request
194193
conn.api_request.assert_called_once()
195194
_, req = conn.api_request.call_args
196195
self.assertEqual(req["method"], "POST")
@@ -223,7 +222,7 @@ async def test_query_and_wait_w_default_query_job_config(self):
223222
},
224223
),
225224
)
226-
conn = client._connection = make_connection(jobs_query_response)
225+
conn = client._client._connection = make_connection(jobs_query_response)
227226

228227
future_result = client.query_and_wait(query)
229228
_ = await future_result
@@ -255,7 +254,7 @@ async def test_query_and_wait_w_job_config(self):
255254
credentials=creds,
256255
_http=http,
257256
)
258-
conn = client._connection = make_connection(jobs_query_response)
257+
conn = client._client._connection = make_connection(jobs_query_response)
259258

260259
future_result = client.query_and_wait(
261260
query,
@@ -287,7 +286,7 @@ async def test_query_and_wait_w_location(self):
287286
creds = _make_credentials()
288287
http = object()
289288
client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
290-
conn = client._connection = make_connection(jobs_query_response)
289+
conn = client._client._connection = make_connection(jobs_query_response)
291290

292291
future_result = client.query_and_wait(query, location="not-the-client-location")
293292
_ = await future_result
@@ -312,7 +311,7 @@ async def test_query_and_wait_w_project(self):
312311
creds = _make_credentials()
313312
http = object()
314313
client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
315-
conn = client._connection = make_connection(jobs_query_response)
314+
conn = client._client._connection = make_connection(jobs_query_response)
316315

317316
future_result = client.query_and_wait(query, project="not-the-client-project")
318317
_ = await future_result

0 commit comments

Comments
 (0)