Skip to content

Commit 0ef3224

Browse files
chore: Fir 30859 refactor out server side async execution support from python sdk (#370)
1 parent cecfe08 commit 0ef3224

File tree

14 files changed

+33
-1436
lines changed

14 files changed

+33
-1436
lines changed

docsrc/Connecting_and_queries.rst

Lines changed: 0 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -610,87 +610,6 @@ Note, however, that it is not possible to retrieve the results of a server-side
610610
query, so these queries are best used for running DMLs and DDLs and ``SELECT``\ s should be used
611611
only for warming the cache.
612612

613-
Executing asynchronous DDL commands
614-
------------------------------------
615-
616-
.. _ddl_execution_example:
617-
618-
Executing queries server-side asynchronously is similar to executing server-side synchronous
619-
queries, but the ``execute()`` command receives an extra parameter, ``async_execution=True``.
620-
The example below uses ``cursor`` to create a new table called ``test_table``.
621-
``execute(query, async_execution=True)`` will return a query ID, which can subsequently
622-
be used to check the query status.
623-
624-
::
625-
626-
query_id = cursor.execute(
627-
"""
628-
CREATE FACT TABLE IF NOT EXISTS test_table (
629-
id INT,
630-
name TEXT
631-
)
632-
PRIMARY INDEX id;
633-
""",
634-
async_execution=True
635-
)
636-
637-
638-
To check the status of a query, send the query ID to ```get_status()``` to receive a
639-
QueryStatus enumeration object. Possible statuses are:
640-
641-
642-
* ``RUNNING``
643-
* ``ENDED_SUCCESSFULLY``
644-
* ``ENDED_UNSUCCESSFULLY``
645-
* ``NOT_READY``
646-
* ``STARTED_EXECUTION``
647-
* ``PARSE_ERROR``
648-
* ``CANCELED_EXECUTION``
649-
* ``EXECUTION_ERROR``
650-
651-
652-
Once the status of the table creation is ``ENDED_SUCCESSFULLY``, data can be inserted into it:
653-
654-
::
655-
656-
from firebolt.async_db.cursor import QueryStatus
657-
658-
query_status = cursor.get_status(query_id)
659-
660-
if query_status == QueryStatus.ENDED_SUCCESSFULLY:
661-
cursor.execute(
662-
"""
663-
INSERT INTO test_table VALUES
664-
(1, 'hello'),
665-
(2, 'world'),
666-
(3, '!');
667-
"""
668-
)
669-
670-
671-
In addition, server-side asynchronous queries can be cancelled calling ``cancel()``.
672-
673-
::
674-
675-
query_id = cursor.execute(
676-
"""
677-
CREATE FACT TABLE IF NOT EXISTS test_table (
678-
id INT,
679-
name TEXT
680-
)
681-
PRIMARY INDEX id;
682-
""",
683-
async_execution=True
684-
)
685-
686-
cursor.cancel(query_id)
687-
688-
query_status = cursor.get_status(query_id)
689-
690-
print(query_status)
691-
692-
**Returns**: ``CANCELED_EXECUTION``
693-
694613

695614
Thread safety
696615
==============================

src/firebolt/async_db/cursor.py

Lines changed: 8 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from __future__ import annotations
22

33
import logging
4-
import re
54
import time
65
from abc import ABCMeta, abstractmethod
76
from functools import wraps
@@ -35,7 +34,6 @@
3534
UPDATE_PARAMETERS_HEADER,
3635
BaseCursor,
3736
CursorState,
38-
QueryStatus,
3937
Statistics,
4038
_parse_update_endpoint,
4139
_parse_update_parameters,
@@ -45,7 +43,6 @@
4543
)
4644
from firebolt.common.constants import ENGINE_STATUS_RUNNING_LIST
4745
from firebolt.utils.exception import (
48-
AsyncExecutionUnavailableError,
4946
EngineNotRunningError,
5047
FireboltDatabaseError,
5148
FireboltEngineError,
@@ -57,7 +54,7 @@
5754
if TYPE_CHECKING:
5855
from firebolt.async_db.connection import Connection
5956

60-
from firebolt.utils.util import Timer, _print_error_body
57+
from firebolt.utils.util import _print_error_body
6158

6259
logger = logging.getLogger(__name__)
6360

@@ -126,12 +123,6 @@ async def _raise_if_error(self, resp: Response) -> None:
126123
async def _validate_set_parameter(self, parameter: SetParameter) -> None:
127124
"""Validate parameter by executing simple query with it."""
128125
_raise_if_internal_set_parameter(parameter)
129-
if parameter.name == "async_execution":
130-
raise AsyncExecutionUnavailableError(
131-
"It is not possible to set async_execution using a SET command. "
132-
"Instead, pass it as an argument to the execute() or "
133-
"executemany() function."
134-
)
135126
resp = await self._api_request("select 1", {parameter.name: parameter.value})
136127
# Handle invalid set parameter
137128
if resp.status_code == codes.BAD_REQUEST:
@@ -170,7 +161,6 @@ async def _do_execute(
170161
raw_query: str,
171162
parameters: Sequence[Sequence[ParameterType]],
172163
skip_parsing: bool = False,
173-
async_execution: Optional[bool] = False,
174164
) -> None:
175165
self._reset()
176166
# Allow users to manually skip parsing for performance improvement.
@@ -180,13 +170,7 @@ async def _do_execute(
180170
try:
181171
for query in queries:
182172
start_time = time.time()
183-
# Our CREATE EXTERNAL TABLE queries currently require credentials,
184-
# so we will skip logging those queries.
185-
# https://docs.firebolt.io/sql-reference/commands/create-external-table.html
186-
if isinstance(query, SetParameter) or not re.search(
187-
"aws_key_id|credentials", query, flags=re.IGNORECASE
188-
):
189-
logger.debug(f"Running query: {query}")
173+
Cursor._log_query(query)
190174

191175
# Define type for mypy
192176
row_set: Tuple[
@@ -197,35 +181,6 @@ async def _do_execute(
197181
] = (-1, None, None, None)
198182
if isinstance(query, SetParameter):
199183
await self._validate_set_parameter(query)
200-
elif async_execution:
201-
self._validate_server_side_async_settings(
202-
parameters,
203-
queries,
204-
skip_parsing,
205-
async_execution,
206-
)
207-
208-
with Timer(
209-
f"[PERFORMANCE] Running query {query[:50]} "
210-
f"{'... ' if len(query) > 50 else ''}"
211-
):
212-
response = await self._api_request(
213-
query,
214-
{
215-
"async_execution": 1,
216-
"output_format": JSON_OUTPUT_FORMAT,
217-
},
218-
)
219-
220-
await self._raise_if_error(response)
221-
if response.headers.get("content-length", "") == "0":
222-
raise OperationalError("No response to asynchronous query.")
223-
resp = response.json()
224-
if "query_id" not in resp or resp["query_id"] == "":
225-
raise OperationalError(
226-
"Invalid response to asynchronous query: missing query_id."
227-
)
228-
self._query_id = resp["query_id"]
229184
else:
230185
resp = await self._api_request(
231186
query, {"output_format": JSON_OUTPUT_FORMAT}
@@ -253,7 +208,6 @@ async def execute(
253208
query: str,
254209
parameters: Optional[Sequence[ParameterType]] = None,
255210
skip_parsing: bool = False,
256-
async_execution: Optional[bool] = False,
257211
) -> Union[int, str]:
258212
"""Prepare and execute a database query.
259213
@@ -277,21 +231,19 @@ async def execute(
277231
skip_parsing (bool): Flag to disable query parsing. This will
278232
disable parameterized, multi-statement and SET queries,
279233
while improving performance
280-
async_execution (bool): flag to determine if query should be asynchronous
281234
282235
Returns:
283236
int: Query row count.
284237
"""
285238
params_list = [parameters] if parameters else []
286-
await self._do_execute(query, params_list, skip_parsing, async_execution)
287-
return self.query_id if async_execution else self.rowcount
239+
await self._do_execute(query, params_list, skip_parsing)
240+
return self.rowcount
288241

289242
@check_not_closed
290243
async def executemany(
291244
self,
292245
query: str,
293246
parameters_seq: Sequence[Sequence[ParameterType]],
294-
async_execution: Optional[bool] = False,
295247
) -> Union[int, str]:
296248
"""Prepare and execute a database query.
297249
@@ -316,46 +268,12 @@ async def executemany(
316268
substitution parameter sets. Used to replace '?' placeholders inside a
317269
query with actual values from each set in a sequence. Resulting queries
318270
for each subset are executed sequentially.
319-
async_execution (bool): flag to determine if query should be asynchronous
320271
321272
Returns:
322-
int|str: Query row count for synchronous execution of queries,
323-
query ID string for asynchronous execution.
273+
int: Query row count.
324274
"""
325-
await self._do_execute(query, parameters_seq, async_execution=async_execution)
326-
if async_execution:
327-
return self.query_id
328-
else:
329-
return self.rowcount
330-
331-
@check_not_closed
332-
async def get_status(self, query_id: str) -> QueryStatus:
333-
"""Get status of a server-side async query. Return the state of the query."""
334-
try:
335-
resp = await self._api_request(
336-
# output_format must be empty for status to work correctly.
337-
# And set parameters will cause 400 errors.
338-
parameters={"query_id": query_id},
339-
path="status",
340-
use_set_parameters=False,
341-
)
342-
if resp.status_code == codes.BAD_REQUEST:
343-
raise OperationalError(
344-
f"Asynchronous query {query_id} status check failed: "
345-
f"{resp.status_code}."
346-
)
347-
resp_json = resp.json()
348-
if "status" not in resp_json:
349-
raise OperationalError(
350-
"Invalid response to asynchronous query: missing status."
351-
)
352-
except Exception:
353-
self._state = CursorState.ERROR
354-
raise
355-
# Remember that query_id might be empty.
356-
if resp_json["status"] == "":
357-
return QueryStatus.NOT_READY
358-
return QueryStatus[resp_json["status"]]
275+
await self._do_execute(query, parameters_seq)
276+
return self.rowcount
359277

360278
@abstractmethod
361279
async def is_db_available(self, database: str) -> bool:
@@ -389,15 +307,6 @@ async def fetchall(self) -> List[List[ColType]]:
389307
async def nextset(self) -> None:
390308
return super().nextset()
391309

392-
@check_not_closed
393-
async def cancel(self, query_id: str) -> None:
394-
"""Cancel a server-side async query."""
395-
await self._api_request(
396-
parameters={"query_id": query_id},
397-
path="cancel",
398-
use_set_parameters=False,
399-
)
400-
401310
# Iteration support
402311
@check_not_closed
403312
@check_query_executed
@@ -557,7 +466,7 @@ def __init__(
557466
async def _api_request(
558467
self,
559468
query: Optional[str] = "",
560-
parameters: Optional[dict[str, Any]] = {},
469+
parameters: Optional[dict[str, Any]] = None,
561470
path: Optional[str] = "",
562471
use_set_parameters: Optional[bool] = True,
563472
) -> Response:

src/firebolt/common/base_cursor.py

Lines changed: 12 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,24 @@
11
from __future__ import annotations
22

33
import logging
4+
import re
45
from dataclasses import dataclass, fields
56
from enum import Enum
67
from functools import wraps
78
from types import TracebackType
8-
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
9+
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
910

1011
from httpx import URL, Response
1112

1213
from firebolt.common._types import (
1314
ColType,
1415
Column,
15-
ParameterType,
1616
RawColType,
1717
SetParameter,
1818
parse_type,
1919
parse_value,
2020
)
2121
from firebolt.utils.exception import (
22-
AsyncExecutionUnavailableError,
2322
ConfigurationError,
2423
CursorClosedError,
2524
DataError,
@@ -40,19 +39,6 @@ class CursorState(Enum):
4039
CLOSED = 4
4140

4241

43-
class QueryStatus(Enum):
44-
"""Enumeration of query responses on server-side async queries."""
45-
46-
RUNNING = 1
47-
ENDED_SUCCESSFULLY = 2
48-
ENDED_UNSUCCESSFULLY = 3
49-
NOT_READY = 4
50-
STARTED_EXECUTION = 5
51-
PARSE_ERROR = 6
52-
CANCELED_EXECUTION = 7
53-
EXECUTION_ERROR = 8
54-
55-
5642
# Parameters that should be set using USE instead of SET
5743
USE_PARAMETER_LIST = ["database", "engine"]
5844
# parameters that can only be set by the backend
@@ -325,6 +311,16 @@ def _update_server_parameters(self, parameters: Dict[str, Any]) -> None:
325311
for key, value in parameters.items():
326312
self.parameters[key] = value
327313

314+
@staticmethod
315+
def _log_query(query: Union[str, SetParameter]) -> None:
316+
# Our CREATE EXTERNAL TABLE queries currently require credentials,
317+
# so we will skip logging those queries.
318+
# https://docs.firebolt.io/sql-reference/commands/create-external-table.html
319+
if isinstance(query, SetParameter) or not re.search(
320+
"aws_key_id|credentials", query, flags=re.IGNORECASE
321+
):
322+
logger.debug(f"Running query: {query}")
323+
328324
@property
329325
def engine_name(self) -> str:
330326
"""
@@ -382,33 +378,6 @@ def _append_row_set(
382378
# Populate values for first set
383379
self._pop_next_set()
384380

385-
def _validate_server_side_async_settings(
386-
self,
387-
parameters: Sequence[Sequence[ParameterType]],
388-
queries: List[Union[SetParameter, str]],
389-
skip_parsing: bool = False,
390-
async_execution: Optional[bool] = False,
391-
) -> None:
392-
if async_execution and self._set_parameters.get("use_standard_sql", "1") == "0":
393-
raise AsyncExecutionUnavailableError(
394-
"It is not possible to execute queries asynchronously if "
395-
"use_standard_sql=0."
396-
)
397-
if parameters and skip_parsing:
398-
logger.warning(
399-
"Query formatting parameters are provided but skip_parsing "
400-
"is specified. They will be ignored."
401-
)
402-
non_set_queries = 0
403-
for query in queries:
404-
if type(query) is not SetParameter:
405-
non_set_queries += 1
406-
if non_set_queries > 1 and async_execution:
407-
raise AsyncExecutionUnavailableError(
408-
"It is not possible to execute multi-statement "
409-
"queries asynchronously."
410-
)
411-
412381
def _parse_row(self, row: List[RawColType]) -> List[ColType]:
413382
"""Parse a single data row based on query column types."""
414383
assert len(row) == len(self.description)

0 commit comments

Comments
 (0)