Skip to content

Commit 1eef56a

Browse files
authored
feat(FIR-45961): expose async query info method (#431)
1 parent 6797f83 commit 1eef56a

File tree

9 files changed

+364
-50
lines changed

9 files changed

+364
-50
lines changed

docsrc/Connecting_and_queries.rst

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -672,6 +672,24 @@ will send a cancel request to the server and the query will be stopped.
672672
print(successful) # False
673673

674674

675+
Retrieving asynchronous query information
676+
-----------------------------------------
677+
678+
To get additional information about an async query, use the :py:meth:`firebolt.db.connection.Connection.get_async_query_info` method.
679+
This method returns a list of ``AsyncQueryInfo`` objects, each containing detailed information about the query execution.
680+
681+
::
682+
683+
token = cursor.async_query_token
684+
query_info_list = connection.get_async_query_info(token)
685+
686+
for query_info in query_info_list:
687+
print(f"Query ID: {query_info.query_id}")
688+
print(f"Status: {query_info.status}")
689+
print(f"Submitted time: {query_info.submitted_time}")
690+
print(f"Rows scanned: {query_info.scanned_rows}")
691+
print(f"Error message: {query_info.error_message}")
692+
675693

676694
Streaming query results
677695
==============================

src/firebolt/async_db/connection.py

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
ASYNC_QUERY_STATUS_SUCCESSFUL,
1818
AsyncQueryInfo,
1919
BaseConnection,
20+
_parse_async_query_info_results,
2021
)
2122
from firebolt.common.cache import _firebolt_system_engine_cache
2223
from firebolt.common.constants import DEFAULT_TIMEOUT_SECONDS
@@ -92,34 +93,41 @@ def cursor(self, **kwargs: Any) -> Cursor:
9293
return c
9394

9495
# Server-side async methods
95-
async def _get_async_query_info(self, token: str) -> AsyncQueryInfo:
96+
async def get_async_query_info(self, token: str) -> List[AsyncQueryInfo]:
97+
"""
98+
Retrieve information about an asynchronous query using its token.
99+
This method fetches the status and details of an asynchronous query
100+
identified by the provided token.
101+
Args:
102+
token (str): The token identifying the asynchronous query.
103+
Returns:
104+
List[AsyncQueryInfo]: A list of AsyncQueryInfo objects containing
105+
details about the asynchronous query.
106+
"""
107+
96108
if self.cursor_type != CursorV2:
97109
raise FireboltError(
98110
"This method is only supported for connection with service account."
99111
)
100112
cursor = self.cursor()
101113
await cursor.execute(ASYNC_QUERY_STATUS_REQUEST, [token])
102-
result = await cursor.fetchone()
103-
if cursor.rowcount != 1 or not result:
114+
results = await cursor.fetchall()
115+
if not results:
104116
raise FireboltError("Unexpected result from async query status request.")
105117
columns = cursor.description
106-
result_dict = dict(zip([column.name for column in columns], result))
118+
columns_names = [column.name for column in columns]
119+
return _parse_async_query_info_results(results, columns_names)
107120

108-
if not result_dict.get("status") or not result_dict.get("query_id"):
109-
raise FireboltError(
110-
"Something went wrong - async query status request returned "
111-
"unexpected result with status and/or query id missing. "
112-
"Rerun the command and reach out to Firebolt support if "
113-
"the issue persists."
121+
def _raise_if_multiple_async_results(
122+
self, async_query_info: List[AsyncQueryInfo]
123+
) -> None:
124+
# We expect only one result in current implementation
125+
if len(async_query_info) != 1:
126+
raise NotImplementedError(
127+
"Async query status request returned more than one result. "
128+
"This is not supported yet."
114129
)
115130

116-
# Only pass the expected keys to AsyncQueryInfo
117-
filtered_result_dict = {
118-
k: v for k, v in result_dict.items() if k in AsyncQueryInfo._fields
119-
}
120-
121-
return AsyncQueryInfo(**filtered_result_dict)
122-
123131
async def is_async_query_running(self, token: str) -> bool:
124132
"""
125133
Check if an async query is still running.
@@ -130,8 +138,10 @@ async def is_async_query_running(self, token: str) -> bool:
130138
Returns:
131139
bool: True if async query is still running, False otherwise
132140
"""
133-
async_query_details = await self._get_async_query_info(token)
134-
return async_query_details.status == ASYNC_QUERY_STATUS_RUNNING
141+
async_query_info = await self.get_async_query_info(token)
142+
self._raise_if_multiple_async_results(async_query_info)
143+
# We expect only one result
144+
return async_query_info[0].status == ASYNC_QUERY_STATUS_RUNNING
135145

136146
async def is_async_query_successful(self, token: str) -> Optional[bool]:
137147
"""
@@ -144,10 +154,12 @@ async def is_async_query_successful(self, token: str) -> Optional[bool]:
144154
bool: None if the query is still running, True if successful,
145155
False otherwise
146156
"""
147-
async_query_details = await self._get_async_query_info(token)
148-
if async_query_details.status == ASYNC_QUERY_STATUS_RUNNING:
157+
async_query_info_list = await self.get_async_query_info(token)
158+
self._raise_if_multiple_async_results(async_query_info_list)
159+
async_query_info = async_query_info_list[0]
160+
if async_query_info.status == ASYNC_QUERY_STATUS_RUNNING:
149161
return None
150-
return async_query_details.status == ASYNC_QUERY_STATUS_SUCCESSFUL
162+
return async_query_info.status == ASYNC_QUERY_STATUS_SUCCESSFUL
151163

152164
async def cancel_async_query(self, token: str) -> None:
153165
"""
@@ -156,10 +168,10 @@ async def cancel_async_query(self, token: str) -> None:
156168
Args:
157169
token: Async query token. Can be obtained from Cursor.async_query_token.
158170
"""
159-
async_query_details = await self._get_async_query_info(token)
160-
async_query_id = async_query_details.query_id
171+
async_query_info = await self.get_async_query_info(token)
172+
self._raise_if_multiple_async_results(async_query_info)
161173
cursor = self.cursor()
162-
await cursor.execute(ASYNC_QUERY_CANCEL, [async_query_id])
174+
await cursor.execute(ASYNC_QUERY_CANCEL, [async_query_info[0].query_id])
163175

164176
# Context manager support
165177
async def __aenter__(self) -> Connection:

src/firebolt/common/base_connection.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
from collections import namedtuple
22
from typing import Any, List, Type
33

4-
from firebolt.utils.exception import ConnectionClosedError
4+
from firebolt.common._types import ColType
5+
from firebolt.utils.exception import ConnectionClosedError, FireboltError
56

67
ASYNC_QUERY_STATUS_RUNNING = "RUNNING"
78
ASYNC_QUERY_STATUS_SUCCESSFUL = "ENDED_SUCCESSFULLY"
@@ -27,6 +28,30 @@
2728
)
2829

2930

31+
def _parse_async_query_info_results(
32+
result: List[List[ColType]], columns_names: List[str]
33+
) -> List[AsyncQueryInfo]:
34+
async_query_infos = []
35+
for row in result:
36+
result_dict = dict(zip(columns_names, row))
37+
38+
if not result_dict.get("status") or not result_dict.get("query_id"):
39+
raise FireboltError(
40+
"Something went wrong - async query status request returned "
41+
"unexpected result with status and/or query id missing. "
42+
"Rerun the command and reach out to Firebolt support if "
43+
"the issue persists."
44+
)
45+
46+
# Only pass the expected keys to AsyncQueryInfo
47+
filtered_result_dict = {
48+
k: v for k, v in result_dict.items() if k in AsyncQueryInfo._fields
49+
}
50+
51+
async_query_infos.append(AsyncQueryInfo(**filtered_result_dict))
52+
return async_query_infos
53+
54+
3055
class BaseConnection:
3156
def __init__(self, cursor_type: Type) -> None:
3257
self.cursor_type = cursor_type

src/firebolt/db/connection.py

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
ASYNC_QUERY_STATUS_SUCCESSFUL,
1717
AsyncQueryInfo,
1818
BaseConnection,
19+
_parse_async_query_info_results,
1920
)
2021
from firebolt.common.cache import _firebolt_system_engine_cache
2122
from firebolt.common.constants import DEFAULT_TIMEOUT_SECONDS
@@ -230,34 +231,41 @@ def close(self) -> None:
230231

231232
# Server-side async methods
232233

233-
def _get_async_query_info(self, token: str) -> AsyncQueryInfo:
234+
def get_async_query_info(self, token: str) -> List[AsyncQueryInfo]:
235+
"""
236+
Retrieve information about an asynchronous query using its token.
237+
This method fetches the status and details of an asynchronous query
238+
identified by the provided token.
239+
Args:
240+
token (str): The token identifying the asynchronous query.
241+
Returns:
242+
List[AsyncQueryInfo]: A list of AsyncQueryInfo objects containing
243+
details about the asynchronous query.
244+
"""
245+
234246
if self.cursor_type != CursorV2:
235247
raise FireboltError(
236248
"This method is only supported for connection with service account."
237249
)
238250
cursor = self.cursor()
239251
cursor.execute(ASYNC_QUERY_STATUS_REQUEST, [token])
240-
result = cursor.fetchone()
241-
if cursor.rowcount != 1 or not result:
252+
results = cursor.fetchall()
253+
if not results:
242254
raise FireboltError("Unexpected result from async query status request.")
243255
columns = cursor.description
244-
result_dict = dict(zip([column.name for column in columns], result))
256+
columns_names = [column.name for column in columns]
257+
return _parse_async_query_info_results(results, columns_names)
245258

246-
if not result_dict.get("status") or not result_dict.get("query_id"):
247-
raise FireboltError(
248-
"Something went wrong - async query status request returned "
249-
"unexpected result with status and/or query id missing. "
250-
"Rerun the command and reach out to Firebolt support if "
251-
"the issue persists."
259+
def _raise_if_multiple_async_results(
260+
self, async_query_info: List[AsyncQueryInfo]
261+
) -> None:
262+
# We expect only one result in current implementation
263+
if len(async_query_info) != 1:
264+
raise NotImplementedError(
265+
"Async query status request returned more than one result. "
266+
"This is not supported yet."
252267
)
253268

254-
# Only pass the expected keys to AsyncQueryInfo
255-
filtered_result_dict = {
256-
k: v for k, v in result_dict.items() if k in AsyncQueryInfo._fields
257-
}
258-
259-
return AsyncQueryInfo(**filtered_result_dict)
260-
261269
def is_async_query_running(self, token: str) -> bool:
262270
"""
263271
Check if an async query is still running.
@@ -268,7 +276,10 @@ def is_async_query_running(self, token: str) -> bool:
268276
Returns:
269277
bool: True if async query is still running, False otherwise
270278
"""
271-
return self._get_async_query_info(token).status == ASYNC_QUERY_STATUS_RUNNING
279+
async_query_info = self.get_async_query_info(token)
280+
self._raise_if_multiple_async_results(async_query_info)
281+
# We expect only one result
282+
return async_query_info[0].status == ASYNC_QUERY_STATUS_RUNNING
272283

273284
def is_async_query_successful(self, token: str) -> Optional[bool]:
274285
"""
@@ -281,7 +292,9 @@ def is_async_query_successful(self, token: str) -> Optional[bool]:
281292
bool: None if the query is still running, True if successful,
282293
False otherwise
283294
"""
284-
async_query_info = self._get_async_query_info(token)
295+
async_query_info_list = self.get_async_query_info(token)
296+
self._raise_if_multiple_async_results(async_query_info_list)
297+
async_query_info = async_query_info_list[0]
285298
if async_query_info.status == ASYNC_QUERY_STATUS_RUNNING:
286299
return None
287300
return async_query_info.status == ASYNC_QUERY_STATUS_SUCCESSFUL
@@ -293,9 +306,10 @@ def cancel_async_query(self, token: str) -> None:
293306
Args:
294307
token: Async query token. Can be obtained from Cursor.async_query_token.
295308
"""
296-
async_query_id = self._get_async_query_info(token).query_id
309+
async_query_info = self.get_async_query_info(token)
310+
self._raise_if_multiple_async_results(async_query_info)
297311
cursor = self.cursor()
298-
cursor.execute(ASYNC_QUERY_CANCEL, [async_query_id])
312+
cursor.execute(ASYNC_QUERY_CANCEL, [async_query_info[0].query_id])
299313

300314
# Context manager support
301315
def __enter__(self) -> Connection:

tests/integration/dbapi/async/V2/test_server_async.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from pytest import raises
66

7-
from firebolt.db import Connection
7+
from firebolt.async_db import Connection
88
from firebolt.utils.exception import FireboltError, FireboltStructuredError
99

1010
LONG_SELECT = "SELECT checksum(*) FROM GENERATE_SERIES(1, 2500000000)" # approx 3 sec
@@ -30,6 +30,21 @@ async def test_insert_async(connection: Connection) -> None:
3030
await cursor.execute(f"SELECT * FROM {table_name}")
3131
result = await cursor.fetchall()
3232
assert result == [[1, "test"]]
33+
info = await connection.get_async_query_info(token)
34+
assert len(info) == 1
35+
# Verify query id is showing in query history
36+
for _ in range(3):
37+
await cursor.execute(
38+
"SELECT 1 FROM information_schema.engine_query_history WHERE status='STARTED_EXECUTION' AND query_id = ?",
39+
[info[0].query_id],
40+
)
41+
query_history_result = await cursor.fetchall()
42+
if len(query_history_result) != 0:
43+
break
44+
# Sometimes it takes a while for the query history to be updated
45+
# so we will retry a few times
46+
time.sleep(10)
47+
assert len(query_history_result) == 1
3348
finally:
3449
await cursor.execute(f"DROP TABLE {table_name}")
3550

@@ -50,7 +65,7 @@ async def test_insert_async_running(connection: Connection) -> None:
5065

5166

5267
async def test_check_async_execution_from_another_connection(
53-
connection_factory: Callable[..., Connection]
68+
connection_factory: Callable[..., Connection],
5469
) -> None:
5570
connection_1 = await connection_factory()
5671
connection_2 = await connection_factory()

tests/integration/dbapi/sync/V2/test_server_async.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,21 @@ def test_insert_async(connection: Connection) -> None:
2828
cursor.execute(f"SELECT * FROM {table_name}")
2929
result = cursor.fetchall()
3030
assert result == [[1, "test"]]
31+
info = connection.get_async_query_info(token)
32+
assert len(info) == 1
33+
# Verify query id is showing in query history
34+
for _ in range(3):
35+
cursor.execute(
36+
"SELECT 1 FROM information_schema.engine_query_history WHERE status='STARTED_EXECUTION' AND query_id = ?",
37+
[info[0].query_id],
38+
)
39+
query_history_result = cursor.fetchall()
40+
if len(query_history_result) != 0:
41+
break
42+
# Sometimes it takes a while for the query history to be updated
43+
# so we will retry a few times
44+
time.sleep(10)
45+
assert len(query_history_result) == 1
3146
finally:
3247
cursor.execute(f"DROP TABLE {table_name}")
3348

@@ -48,7 +63,7 @@ def test_insert_async_running(connection: Connection) -> None:
4863

4964

5065
def test_check_async_execution_from_another_connection(
51-
connection_factory: Callable[..., Connection]
66+
connection_factory: Callable[..., Connection],
5267
) -> None:
5368
connection_1 = connection_factory()
5469
connection_2 = connection_factory()

0 commit comments

Comments
 (0)