Skip to content

Commit d1a65bf

Browse files
committed
refactor to make it closer to synchronous execution for testing
1 parent 4e78888 commit d1a65bf

File tree

1 file changed

+22
-48
lines changed

1 file changed

+22
-48
lines changed

google/cloud/bigquery/async_client.py

Lines changed: 22 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from google.cloud.bigquery import _job_helpers
33
from google.cloud.bigquery import table
44
import asyncio
5-
from google.api_core import gapic_v1, retry_async
65

76
class AsyncClient(Client):
87
def __init__(self, *args, **kwargs):
@@ -110,44 +109,27 @@ async def do_query():
110109
request_body["requestId"] = _job_helpers.make_job_id()
111110
span_attributes = {"path": path}
112111

113-
# Wrap the RPC method; this adds retry and timeout information,
114-
# and friendly error handling.
115-
rpc = gapic_v1.method_async.wrap_method(
116-
client._call_api,
117-
default_retry=retry_async.AsyncRetry(
118-
initial=0.1,
119-
maximum=60.0,
120-
multiplier=1.3,
121-
predicate=retries.if_exception_type(
122-
core_exceptions.ServiceUnavailable,
123-
),
124-
deadline=60.0,
125-
),
126-
default_timeout=60.0,
127-
client_info=DEFAULT_CLIENT_INFO,
128-
)
129-
130112
# For easier testing, handle the retries ourselves.
131-
# if retry is not None:
132-
# response = retry(client._call_api)(
133-
# retry=None, # We're calling the retry decorator ourselves.
134-
# span_name="BigQuery.query",
135-
# span_attributes=span_attributes,
136-
# method="POST",
137-
# path=path,
138-
# data=request_body,
139-
# timeout=api_timeout,
140-
# )
141-
# else:
142-
response = await rpc(
143-
retry=None,
144-
span_name="BigQuery.query",
145-
span_attributes=span_attributes,
146-
method="POST",
147-
path=path,
148-
data=request_body,
149-
timeout=api_timeout,
150-
)
113+
if retry is not None:
114+
response = retry(client._call_api)(
115+
retry=None, # We're calling the retry decorator ourselves.
116+
span_name="BigQuery.query",
117+
span_attributes=span_attributes,
118+
method="POST",
119+
path=path,
120+
data=request_body,
121+
timeout=api_timeout,
122+
)
123+
else:
124+
response = client._call_api(
125+
retry=None,
126+
span_name="BigQuery.query",
127+
span_attributes=span_attributes,
128+
method="POST",
129+
path=path,
130+
data=request_body,
131+
timeout=api_timeout,
132+
)
151133

152134
# Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
153135
# to fetch, there will be a job ID for jobs.getQueryResults.
@@ -186,12 +168,11 @@ async def do_query():
186168
project=query_results.project,
187169
num_dml_affected_rows=query_results.num_dml_affected_rows,
188170
)
189-
190171

191172
if job_retry is not None:
192173
return job_retry(do_query)()
193174
else:
194-
return do_query()
175+
return await do_query()
195176

196177
async def async_wait_or_cancel(
197178
job: job.QueryJob,
@@ -215,11 +196,4 @@ async def async_wait_or_cancel(
215196
except Exception:
216197
# Don't eat the original exception if cancel fails.
217198
pass
218-
raise
219-
220-
221-
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
222-
"3.17.2"
223-
)
224-
225-
__all__ = ("AsyncClient",)
199+
raise

0 commit comments

Comments
 (0)