Skip to content

Commit 56a0a0f

Browse files
committed
add async _call_api, RowIterator and get_job to implementation
1 parent 6e74478 commit 56a0a0f

File tree

5 files changed

+272
-28
lines changed

5 files changed

+272
-28
lines changed

google/cloud/bigquery/async_client.py

Lines changed: 135 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,67 @@
11
from google.cloud.bigquery.client import *
2+
from google.cloud.bigquery.client import (
3+
_add_server_timeout_header,
4+
_extract_job_reference,
5+
)
6+
from google.cloud.bigquery.opentelemetry_tracing import async_create_span
27
from google.cloud.bigquery import _job_helpers
3-
from google.cloud.bigquery import table
8+
from google.cloud.bigquery.table import *
9+
from google.api_core.page_iterator import HTTPIterator
410
from google.cloud.bigquery.retry import (
511
DEFAULT_ASYNC_JOB_RETRY,
612
DEFAULT_ASYNC_RETRY,
713
DEFAULT_TIMEOUT,
814
)
915
from google.api_core import retry_async as retries
1016
import asyncio
17+
from google.auth.transport import _aiohttp_requests
18+
19+
# This code is experimental
1120

1221

1322
class AsyncClient:
1423
def __init__(self, *args, **kwargs):
1524
self._client = Client(*args, **kwargs)
1625

26+
async def get_job(
    self,
    job_id: Union[str, job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob],
    project: Optional[str] = None,
    location: Optional[str] = None,
    retry: retries.AsyncRetry = DEFAULT_ASYNC_RETRY,
    timeout: TimeoutType = DEFAULT_TIMEOUT,
) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob, job.UnknownJob]:
    """Fetch a BigQuery job by ID (async counterpart of ``Client.get_job``).

    Args:
        job_id: A job identifier, or a job object whose reference supplies
            the ID, project, and location.
        project: Project owning the job. Defaults to the client's project.
        location: Location of the job. Defaults to the client's location.
        retry: Async retry policy applied to the jobs.get request.
        timeout: Per-request timeout passed through to the transport.

    Returns:
        A job instance built from the jobs.get resource (``UnknownJob`` for
        unrecognized job types).
    """
    # "full" projection includes job configuration and statistics.
    extra_params = {"projection": "full"}

    project, location, job_id = _extract_job_reference(
        job_id, project=project, location=location
    )

    if project is None:
        project = self._client.project

    if location is None:
        location = self._client.location

    if location is not None:
        extra_params["location"] = location

    path = "/projects/{}/jobs/{}".format(project, job_id)

    span_attributes = {"path": path, "job_id": job_id, "location": location}

    resource = await self._call_api(
        retry,
        span_name="BigQuery.getJob",
        span_attributes=span_attributes,
        method="GET",
        path=path,
        query_params=extra_params,
        timeout=timeout,
    )

    # _call_api may hand back a still-pending coroutine on the retry path;
    # resolve it before converting the resource to a job object.
    if asyncio.iscoroutine(resource):
        resource = await resource

    # BUG FIX: asyncio.to_thread expects a callable plus its arguments.
    # The original code called job_from_resource eagerly and passed its
    # *result* to to_thread, which then raised TypeError.
    return await asyncio.to_thread(self._client.job_from_resource, resource)
64+
1765
async def query_and_wait(
1866
self,
1967
query,
@@ -46,7 +94,7 @@ async def query_and_wait(
4694
)
4795

4896
return await async_query_and_wait(
49-
self._client,
97+
self,
5098
query,
5199
job_config=job_config,
52100
location=location,
@@ -59,9 +107,41 @@ async def query_and_wait(
59107
max_results=max_results,
60108
)
61109

110+
async def _call_api(
    self,
    retry: Optional[retries.AsyncRetry] = None,
    span_name: Optional[str] = None,
    span_attributes: Optional[Dict] = None,
    job_ref=None,
    headers: Optional[Dict[str, str]] = None,
    **kwargs,
):
    """Issue one request against the BigQuery REST API, asynchronously.

    Args:
        retry: Optional async retry policy wrapping the request.
        span_name: OpenTelemetry span name; when set, the call is traced.
        span_attributes: Attributes attached to the span.
        job_ref: Optional job reference used to enrich span attributes.
        headers: Extra HTTP headers (merged by _add_server_timeout_header).
        **kwargs: Forwarded to ``Connection.api_request`` (method, path,
            query_params, timeout, ...).

    Returns:
        The parsed JSON response from the API.
    """
    kwargs = _add_server_timeout_header(headers, kwargs)

    # TODO(async): switch to a native aiohttp transport
    # (google.auth.transport._aiohttp_requests) instead of threading.
    sync_call = functools.partial(self._client._connection.api_request, **kwargs)

    # The underlying connection is synchronous; run it in a worker thread
    # so the event loop is not blocked. Wrapping it in a real coroutine
    # also makes it a valid target for AsyncRetry.
    async def async_call():
        return await asyncio.to_thread(sync_call)

    call = retry(async_call) if retry else async_call

    if span_name is not None:
        async with async_create_span(
            name=span_name,
            attributes=span_attributes,
            client=self._client,
            job_ref=job_ref,
        ):
            # BUG FIX: the original returned the coroutine unawaited on the
            # retry path despite the comment claiming it awaited the call.
            return await call()

    return await call()
141+
62142

63143
async def async_query_and_wait(
64-
client: "Client",
144+
client: "AsyncClient",
65145
query: str,
66146
*,
67147
job_config: Optional[job.QueryJobConfig],
@@ -73,14 +153,12 @@ async def async_query_and_wait(
73153
job_retry: Optional[retries.AsyncRetry],
74154
page_size: Optional[int] = None,
75155
max_results: Optional[int] = None,
76-
) -> table.RowIterator:
77-
# Some API parameters aren't supported by the jobs.query API. In these
78-
# cases, fallback to a jobs.insert call.
156+
) -> RowIterator:
79157
if not _job_helpers._supported_by_jobs_query(job_config):
80158
return await async_wait_or_cancel(
81159
asyncio.to_thread(
82160
_job_helpers.query_jobs_insert(
83-
client=client,
161+
client=client._client,
84162
query=query,
85163
job_id=None,
86164
job_id_prefix=None,
@@ -116,7 +194,7 @@ async def async_query_and_wait(
116194
span_attributes = {"path": path}
117195

118196
if retry is not None:
119-
response = client._call_api( # ASYNCHRONOUS HTTP CALLS aiohttp (optional of google-auth), add back retry()
197+
response = await client._call_api( # ASYNCHRONOUS HTTP CALLS aiohttp (optional of google-auth), add back retry()
120198
retry=None, # We're calling the retry decorator ourselves, async_retries, need to implement after making HTTP calls async
121199
span_name="BigQuery.query",
122200
span_attributes=span_attributes,
@@ -127,7 +205,7 @@ async def async_query_and_wait(
127205
)
128206

129207
else:
130-
response = client._call_api(
208+
response = await client._call_api(
131209
retry=None,
132210
span_name="BigQuery.query",
133211
span_attributes=span_attributes,
@@ -149,17 +227,28 @@ async def async_query_and_wait(
149227
# client._list_rows_from_query_results directly. Need to update
150228
# RowIterator to fetch destination table via the job ID if needed.
151229
result = await async_wait_or_cancel(
152-
_job_helpers._to_query_job(client, query, job_config, response),
153-
api_timeout=api_timeout,
154-
wait_timeout=wait_timeout,
155-
retry=retry,
156-
page_size=page_size,
157-
max_results=max_results,
230+
asyncio.to_thread(
231+
_job_helpers._to_query_job(client._client, query, job_config, response),
232+
api_timeout=api_timeout,
233+
wait_timeout=wait_timeout,
234+
retry=retry,
235+
page_size=page_size,
236+
max_results=max_results,
237+
)
238+
)
239+
240+
def api_request(*args, **kwargs):
241+
return client._call_api(
242+
span_name="BigQuery.query",
243+
span_attributes=span_attributes,
244+
*args,
245+
timeout=api_timeout,
246+
**kwargs,
158247
)
159248

160-
result = table.RowIterator( # async of RowIterator? async version without all the pandas stuff
161-
client=client,
162-
api_request=functools.partial(client._call_api, retry, timeout=api_timeout),
249+
result = AsyncRowIterator( # async of RowIterator? async version without all the pandas stuff
250+
client=client._client,
251+
api_request=api_request,
163252
path=None,
164253
schema=query_results.schema,
165254
max_results=max_results,
@@ -186,10 +275,10 @@ async def async_wait_or_cancel(
186275
retry: Optional[retries.AsyncRetry],
187276
page_size: Optional[int],
188277
max_results: Optional[int],
189-
) -> table.RowIterator:
278+
) -> RowIterator:
190279
try:
191280
return asyncio.to_thread(
192-
job.result( # run in a background thread
281+
job.result(
193282
page_size=page_size,
194283
max_results=max_results,
195284
retry=retry,
@@ -204,3 +293,29 @@ async def async_wait_or_cancel(
204293
# Don't eat the original exception if cancel fails.
205294
pass
206295
raise
296+
297+
298+
class AsyncRowIterator(RowIterator):
    # Experimental: RowIterator variant whose page fetches are awaited, for
    # use with an async ``api_request`` callable supplied by the caller.

    async def _get_next_page_response(self):
        """Asynchronous version of fetching the next response page."""
        # Serve the first page from the cached jobs.query response instead
        # of issuing another API call.
        if self._first_page_response:
            rows = self._first_page_response.get(self._items_key, [])[
                : self.max_results
            ]
            response = {
                self._items_key: rows,
            }
            if self._next_token in self._first_page_response:
                response[self._next_token] = self._first_page_response[self._next_token]

            # Drop the cache so subsequent pages hit the API.
            self._first_page_response = None
            return response

        params = self._get_query_params()
        if self._page_size is not None:
            # NOTE(review): presumably mirrors the sync RowIterator —
            # startIndex conflicts with paging after the first page; confirm
            # against google.cloud.bigquery.table.RowIterator.
            if self.page_number and "startIndex" in params:
                del params["startIndex"]
            params["maxResults"] = self._page_size
        # api_request is expected to be an async callable here, hence await.
        return await self.api_request(
            method=self._HTTP_METHOD, path=self.path, query_params=params
        )

google/cloud/bigquery/opentelemetry_tracing.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414

1515
import logging
16-
from contextlib import contextmanager
16+
from contextlib import contextmanager, asynccontextmanager
1717
from google.api_core.exceptions import GoogleAPICallError # type: ignore
1818

1919
logger = logging.getLogger(__name__)
@@ -86,6 +86,37 @@ def create_span(name, attributes=None, client=None, job_ref=None):
8686
raise
8787

8888

89+
@asynccontextmanager
async def async_create_span(name, attributes=None, client=None, job_ref=None):
    """Asynchronous context manager for creating and exporting OpenTelemetry spans.

    Args:
        name: Name of the created span.
        attributes: Additional attributes attached to the span.
        client: Optional client whose attributes enrich the span.
        job_ref: Optional job reference whose attributes enrich the span.

    Yields:
        The active span, or ``None`` when OpenTelemetry is unavailable.

    Raises:
        GoogleAPICallError: re-raised after recording its status on the span.
    """
    global _warned_telemetry
    final_attributes = _get_final_span_attributes(attributes, client, job_ref)

    if not HAS_OPENTELEMETRY:
        # Warn only once per process that tracing is disabled.
        if not _warned_telemetry:
            logger.debug(
                "This service is instrumented using OpenTelemetry. "
                "OpenTelemetry or one of its components could not be imported; "
                "please add compatible versions of opentelemetry-api and "
                "opentelemetry-instrumentation packages in order to get BigQuery "
                "Tracing data."
            )
            _warned_telemetry = True
        yield None
        return
    tracer = trace.get_tracer(__name__)

    # BUG FIX: Tracer.start_as_current_span returns a *synchronous* context
    # manager (no __aenter__), so "async with" fails at runtime. A plain
    # "with" inside this async generator is correct.
    with tracer.start_as_current_span(name=name, attributes=final_attributes) as span:
        try:
            yield span
        except GoogleAPICallError as error:
            if error.code is not None:
                span.set_status(Status(http_status_to_status_code(error.code)))
            raise
raise
118+
119+
89120
def _get_final_span_attributes(attributes=None, client=None, job_ref=None):
90121
"""Compiles attributes from: client, job_ref, user-provided attributes.
91122

noxfile.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ def default(session, install_extras=True):
8080
constraints_path,
8181
)
8282

83-
if install_extras and session.python in ["3.11", "3.12"]:
84-
install_target = ".[bqstorage,ipywidgets,pandas,tqdm,opentelemetry]"
83+
if install_extras and session.python in ["3.12"]:
84+
install_target = ".[bqstorage,ipywidgets,pandas,tqdm,opentelemetry,aiohttp]"
8585
elif install_extras:
8686
install_target = ".[all]"
8787
else:
@@ -188,8 +188,8 @@ def system(session):
188188
# Data Catalog needed for the column ACL test with a real Policy Tag.
189189
session.install("google-cloud-datacatalog", "-c", constraints_path)
190190

191-
if session.python in ["3.11", "3.12"]:
192-
extras = "[bqstorage,ipywidgets,pandas,tqdm,opentelemetry]"
191+
if session.python in ["3.12"]:
192+
extras = "[bqstorage,ipywidgets,pandas,tqdm,opentelemetry,aiohttp]" # look at geopandas to see if it supports 3.11/3.12 (up to 3.11)
193193
else:
194194
extras = "[all]"
195195
session.install("-e", f".{extras}", "-c", constraints_path)
@@ -254,8 +254,8 @@ def snippets(session):
254254
session.install("google-cloud-storage", "-c", constraints_path)
255255
session.install("grpcio", "-c", constraints_path)
256256

257-
if session.python in ["3.11", "3.12"]:
258-
extras = "[bqstorage,ipywidgets,pandas,tqdm,opentelemetry]"
257+
if session.python in ["3.12"]:
258+
extras = "[bqstorage,ipywidgets,pandas,tqdm,opentelemetry,aiohttp]"
259259
else:
260260
extras = "[all]"
261261
session.install("-e", f".{extras}", "-c", constraints_path)

setup.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
# NOTE: Maintainers, please do not require google-cloud-core>=2.x.x
3838
# Until this issue is closed
3939
# https://github.com/googleapis/google-cloud-python/issues/10566
40-
"google-auth >= 2.14.1, <3.0.0dev",
4140
"google-cloud-core >= 1.6.0, <3.0.0dev",
4241
"google-resumable-media >= 0.6.0, < 3.0dev",
4342
"packaging >= 20.0.0",

0 commit comments

Comments
 (0)