Skip to content

Commit 9e71007

Browse files
authored
feat(FIR-44268): Server-side parametrised queries (#448)
1 parent f2a0832 commit 9e71007

File tree

18 files changed

+1445
-39
lines changed

18 files changed

+1445
-39
lines changed

src/firebolt/async_db/__init__.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
Timestamp,
1919
TimestampFromTicks,
2020
)
21+
from firebolt.common.constants import ParameterStyle
2122
from firebolt.utils.exception import (
2223
DatabaseError,
2324
DataError,
@@ -34,4 +35,10 @@
3435
apilevel = "2.0"
3536
# threads may only share the module and connections, cursors should not be shared
3637
threadsafety = 2
37-
paramstyle = "qmark"
38+
paramstyle = ParameterStyle.QMARK.value
39+
"""
40+
The parameter style for SQL queries. Supported values:
41+
- 'qmark': Use ? as parameter placeholders (default, client-side substitution)
42+
- 'native': Alias for 'qmark'
43+
- 'fb_numeric': Use $1, $2, ... as placeholders (server-side, sent as query_parameters)
44+
"""

src/firebolt/async_db/connection.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,13 @@ async def connect_v1(
396396
timeout=Timeout(DEFAULT_TIMEOUT_SECONDS, read=None),
397397
headers={"User-Agent": user_agent_header},
398398
)
399-
return Connection(engine_url, database, client, CursorV1, api_endpoint)
399+
return Connection(
400+
engine_url,
401+
database,
402+
client,
403+
CursorV1,
404+
api_endpoint,
405+
)
400406

401407

402408
def connect_core(

src/firebolt/async_db/cursor.py

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
UPDATE_ENDPOINT_HEADER,
2626
UPDATE_PARAMETERS_HEADER,
2727
CursorState,
28+
ParameterStyle,
2829
)
2930
from firebolt.common.cursor.base_cursor import (
3031
BaseCursor,
@@ -216,27 +217,65 @@ async def _do_execute(
216217
) -> None:
217218
await self._close_rowset_and_reset()
218219
self._row_set = StreamingAsyncRowSet() if streaming else InMemoryAsyncRowSet()
219-
queries: List[Union[SetParameter, str]] = (
220-
[raw_query]
221-
if skip_parsing
222-
else self._formatter.split_format_sql(raw_query, parameters)
223-
)
224-
timeout_controller = TimeoutController(timeout)
220+
# Import paramstyle from module level
221+
from firebolt.async_db import paramstyle
225222

226-
if len(queries) > 1 and async_execution:
227-
raise FireboltError(
228-
"Server side async does not support multi-statement queries"
229-
)
230223
try:
231-
for query in queries:
232-
await self._execute_single_query(
233-
query, timeout_controller, async_execution, streaming
224+
parameter_style = ParameterStyle(paramstyle)
225+
except ValueError:
226+
raise ProgrammingError(f"Unsupported paramstyle: {paramstyle}")
227+
try:
228+
if parameter_style == ParameterStyle.FB_NUMERIC:
229+
await self._execute_fb_numeric(
230+
raw_query, parameters, timeout, async_execution, streaming
234231
)
232+
else:
233+
queries: List[Union[SetParameter, str]] = (
234+
[raw_query]
235+
if skip_parsing
236+
else self._formatter.split_format_sql(raw_query, parameters)
237+
)
238+
timeout_controller = TimeoutController(timeout)
239+
if len(queries) > 1 and async_execution:
240+
raise FireboltError(
241+
"Server side async does not support multi-statement queries"
242+
)
243+
for query in queries:
244+
await self._execute_single_query(
245+
query, timeout_controller, async_execution, streaming
246+
)
235247
self._state = CursorState.DONE
236248
except Exception:
237249
self._state = CursorState.ERROR
238250
raise
239251

252+
async def _execute_fb_numeric(
253+
self,
254+
query: str,
255+
parameters: Sequence[Sequence[ParameterType]],
256+
timeout: Optional[float],
257+
async_execution: bool,
258+
streaming: bool,
259+
) -> None:
260+
Cursor._log_query(query)
261+
timeout_controller = TimeoutController(timeout)
262+
timeout_controller.raise_if_timeout()
263+
query_params = self._build_fb_numeric_query_params(
264+
parameters, streaming, async_execution
265+
)
266+
resp = await self._api_request(
267+
query,
268+
query_params,
269+
timeout=timeout_controller.remaining(),
270+
)
271+
await self._raise_if_error(resp)
272+
if async_execution:
273+
await resp.aread()
274+
self._parse_async_response(resp)
275+
else:
276+
await self._parse_response_headers(resp.headers)
277+
await self._append_row_set_from_response(resp)
278+
240279
async def _execute_single_query(
241280
self,
242281
query: Union[SetParameter, str],

src/firebolt/common/constants.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ class CursorState(Enum):
1919
CLOSED = 4
2020

2121

22+
class ParameterStyle(Enum):
23+
QMARK = "qmark" # ? as parameter placeholders (default, client-side)
24+
NATIVE = "native" # Alias for 'qmark'
25+
FB_NUMERIC = "fb_numeric" # $1, $2, ... as placeholders (server-side)
26+
27+
2228
# Parameters that should be set using USE instead of SET
2329
USE_PARAMETER_LIST = ["database", "engine"]
2430
# parameters that can only be set by the backend

src/firebolt/common/cursor/base_cursor.py

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
from __future__ import annotations
22

3+
import json
34
import logging
45
import re
56
from types import TracebackType
6-
from typing import Any, Dict, List, Optional, Tuple, Type, Union
7+
from typing import Any, Dict, List, Optional, Sequence, Tuple, Type, Union
78

89
from httpx import URL, Response
910

10-
from firebolt.common._types import RawColType, SetParameter
11+
from firebolt.common._types import ParameterType, RawColType, SetParameter
1112
from firebolt.common.constants import (
1213
DISALLOWED_PARAMETER_LIST,
1314
IMMUTABLE_PARAMETER_LIST,
@@ -226,6 +227,42 @@ def _log_query(query: Union[str, SetParameter]) -> None:
226227
):
227228
logger.debug(f"Running query: {query}")
228229

230+
def _build_fb_numeric_query_params(
231+
self,
232+
parameters: Sequence[Sequence[ParameterType]],
233+
streaming: bool,
234+
async_execution: bool,
235+
) -> Dict[str, Any]:
236+
"""
237+
Build query parameters dictionary for fb_numeric paramstyle.
238+
239+
Args:
240+
parameters: A sequence of parameter sequences. For fb_numeric,
241+
only the first parameter sequence is used.
242+
streaming: Whether streaming is enabled
243+
async_execution: Whether async execution is enabled
244+
245+
Returns:
246+
Dictionary of query parameters to send with the request
247+
"""
248+
param_list = parameters[0] if parameters else []
249+
query_parameters = [
250+
{
251+
"name": f"${i+1}",
252+
"value": self._formatter.convert_parameter_for_serialization(value),
253+
}
254+
for i, value in enumerate(param_list)
255+
]
256+
257+
query_params: Dict[str, Any] = {
258+
"output_format": self._get_output_format(streaming),
259+
}
260+
if query_parameters:
261+
query_params["query_parameters"] = json.dumps(query_parameters)
262+
if async_execution:
263+
query_params["async"] = True
264+
return query_params
265+
229266
@property
230267
def engine_name(self) -> str:
231268
"""

src/firebolt/common/statement_formatter.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,34 @@ def format_value(self, value: ParameterType) -> str:
6262

6363
raise DataError(f"unsupported parameter type {type(value)}")
6464

65+
def convert_parameter_for_serialization(
66+
self, value: ParameterType
67+
) -> Union[int, float, bool, None, str, List]:
68+
"""
69+
Convert parameter values for fb_numeric paramstyle to JSON-serializable
70+
format. This is used for server-side parameter substitution.
71+
72+
Basic types (int, float, bool, None) are preserved as-is.
73+
All other types are converted to strings for JSON serialization.
74+
75+
Args:
76+
value: The parameter value to convert
77+
78+
Returns:
79+
JSON-serializable value (int, float, bool, None, or string)
80+
"""
81+
if isinstance(value, (int, float, bool)) or value is None:
82+
return value
83+
84+
if isinstance(value, Decimal):
85+
return str(value)
86+
elif isinstance(value, bytes):
87+
return value.decode("utf-8")
88+
elif isinstance(value, list):
89+
return [self.convert_parameter_for_serialization(item) for item in value]
90+
else:
91+
return str(value)
92+
6593
def format_statement(
6694
self, statement: Statement, parameters: Sequence[ParameterType]
6795
) -> str:

src/firebolt/db/__init__.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
Timestamp,
1717
TimestampFromTicks,
1818
)
19+
from firebolt.common.constants import ParameterStyle
1920
from firebolt.db.connection import Connection, connect
2021
from firebolt.db.cursor import Cursor
2122
from firebolt.utils.exception import (
@@ -34,4 +35,10 @@
3435
apilevel = "2.0"
3536
# threads may only share the module and connections, cursors should not be shared
3637
threadsafety = 2
37-
paramstyle = "qmark"
38+
paramstyle = ParameterStyle.QMARK.value
39+
"""
40+
The parameter style for SQL queries. Supported values:
41+
- 'qmark': Use ? as parameter placeholders (default, client-side substitution)
42+
- 'native': Alias for 'qmark'
43+
- 'fb_numeric': Use $1, $2, ... as placeholders (server-side, sent as query_parameters)
44+
"""

src/firebolt/db/connection.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,6 @@ def __init__(
224224
def cursor(self, **kwargs: Any) -> Cursor:
225225
if self.closed:
226226
raise ConnectionClosedError("Unable to create cursor: connection closed.")
227-
228227
c = self.cursor_type(client=self._client, connection=self, **kwargs)
229228
self._cursors.append(c)
230229
return c

src/firebolt/db/cursor.py

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
UPDATE_ENDPOINT_HEADER,
3434
UPDATE_PARAMETERS_HEADER,
3535
CursorState,
36+
ParameterStyle,
3637
)
3738
from firebolt.common.cursor.base_cursor import (
3839
BaseCursor,
@@ -222,27 +223,65 @@ def _do_execute(
222223
) -> None:
223224
self._close_rowset_and_reset()
224225
self._row_set = StreamingRowSet() if streaming else InMemoryRowSet()
225-
queries: List[Union[SetParameter, str]] = (
226-
[raw_query]
227-
if skip_parsing
228-
else self._formatter.split_format_sql(raw_query, parameters)
229-
)
230-
timeout_controller = TimeoutController(timeout)
226+
# Import paramstyle from module level
227+
from firebolt.db import paramstyle
231228

232-
if len(queries) > 1 and async_execution:
233-
raise FireboltError(
234-
"Server side async does not support multi-statement queries"
235-
)
236229
try:
237-
for query in queries:
238-
self._execute_single_query(
239-
query, timeout_controller, async_execution, streaming
230+
parameter_style = ParameterStyle(paramstyle)
231+
except ValueError:
232+
raise ProgrammingError(f"Unsupported paramstyle: {paramstyle}")
233+
try:
234+
if parameter_style == ParameterStyle.FB_NUMERIC:
235+
self._execute_fb_numeric(
236+
raw_query, parameters, timeout, async_execution, streaming
240237
)
238+
else:
239+
queries: List[Union[SetParameter, str]] = (
240+
[raw_query]
241+
if skip_parsing
242+
else self._formatter.split_format_sql(raw_query, parameters)
243+
)
244+
timeout_controller = TimeoutController(timeout)
245+
if len(queries) > 1 and async_execution:
246+
raise FireboltError(
247+
"Server side async does not support multi-statement queries"
248+
)
249+
for query in queries:
250+
self._execute_single_query(
251+
query, timeout_controller, async_execution, streaming
252+
)
241253
self._state = CursorState.DONE
242254
except Exception:
243255
self._state = CursorState.ERROR
244256
raise
245257

258+
def _execute_fb_numeric(
259+
self,
260+
query: str,
261+
parameters: Sequence[Sequence[ParameterType]],
262+
timeout: Optional[float],
263+
async_execution: bool,
264+
streaming: bool,
265+
) -> None:
266+
Cursor._log_query(query)
267+
timeout_controller = TimeoutController(timeout)
268+
timeout_controller.raise_if_timeout()
269+
query_params = self._build_fb_numeric_query_params(
270+
parameters, streaming, async_execution
271+
)
272+
resp = self._api_request(
273+
query,
274+
query_params,
275+
timeout=timeout_controller.remaining(),
276+
)
277+
self._raise_if_error(resp)
278+
if async_execution:
279+
resp.read()
280+
self._parse_async_response(resp)
281+
else:
282+
self._parse_response_headers(resp.headers)
283+
self._append_row_set_from_response(resp)
284+
246285
def _execute_single_query(
247286
self,
248287
query: Union[SetParameter, str],

tests/integration/dbapi/async/V2/conftest.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from pytest import fixture
66

7+
import firebolt.async_db
78
from firebolt.async_db import Connection, connect
89
from firebolt.client.auth.base import Auth
910
from firebolt.client.auth.client_credentials import ClientCredentials
@@ -164,3 +165,12 @@ async def mixed_case_db_and_engine(
164165
await system_cursor.execute(f'DROP DATABASE "{test_db_name}"')
165166
await system_cursor.execute(f'STOP ENGINE "{test_engine_name}"')
166167
await system_cursor.execute(f'DROP ENGINE "{test_engine_name}"')
168+
169+
170+
@fixture
171+
def fb_numeric_paramstyle():
172+
"""Fixture that sets paramstyle to fb_numeric and resets it after the test."""
173+
original_paramstyle = firebolt.async_db.paramstyle
174+
firebolt.async_db.paramstyle = "fb_numeric"
175+
yield
176+
firebolt.async_db.paramstyle = original_paramstyle

0 commit comments

Comments
 (0)