Skip to content

Commit dcecd0e

Browse files
committed
QueryService stats support
1 parent 30b4191 commit dcecd0e

File tree

7 files changed

+121
-12
lines changed

7 files changed

+121
-12
lines changed

tests/query/test_query_session.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from concurrent.futures import _base as b
55
from unittest import mock
66

7-
7+
from ydb.query.base import QueryStatsMode
88
from ydb.query.session import QuerySession
99

1010

@@ -143,3 +143,34 @@ def cancel(self):
143143
assert "attach stream thread" not in thread_names
144144

145145
_check_session_state_empty(session)
146+
147+
@pytest.mark.parametrize(
148+
"stats_mode",
149+
[
150+
None,
151+
QueryStatsMode.UNSPECIFIED,
152+
QueryStatsMode.NONE,
153+
QueryStatsMode.BASIC,
154+
QueryStatsMode.FULL,
155+
QueryStatsMode.PROFILE,
156+
],
157+
)
158+
def test_stats_mode(self, session: QuerySession, stats_mode: QueryStatsMode):
159+
session.create()
160+
161+
for _ in session.execute("SELECT 1; SELECT 2; SELECT 3;", stats_mode=stats_mode):
162+
pass
163+
164+
stats = session.last_query_stats
165+
166+
if stats_mode in [None, QueryStatsMode.NONE, QueryStatsMode.UNSPECIFIED]:
167+
assert stats is None
168+
return
169+
170+
assert stats is not None
171+
assert len(stats.query_phases) > 0
172+
173+
if stats_mode != QueryStatsMode.BASIC:
174+
assert len(stats.query_plan) > 0
175+
else:
176+
assert stats.query_plan == ""

tests/query/test_query_transaction.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import pytest
22

3+
from ydb.query.base import QueryStatsMode
34
from ydb.query.transaction import QueryTxContext
45
from ydb.query.transaction import QueryTxStateEnum
56

@@ -104,3 +105,32 @@ def test_tx_identity_after_begin_works(self, tx: QueryTxContext):
104105

105106
assert identity.tx_id == tx.tx_id
106107
assert identity.session_id == tx.session_id
108+
109+
@pytest.mark.parametrize(
110+
"stats_mode",
111+
[
112+
None,
113+
QueryStatsMode.UNSPECIFIED,
114+
QueryStatsMode.NONE,
115+
QueryStatsMode.BASIC,
116+
QueryStatsMode.FULL,
117+
QueryStatsMode.PROFILE,
118+
],
119+
)
120+
def test_stats_mode(self, tx: QueryTxContext, stats_mode: QueryStatsMode):
121+
for _ in tx.execute("SELECT 1; SELECT 2; SELECT 3;", commit_tx=True, stats_mode=stats_mode):
122+
pass
123+
124+
stats = tx.last_query_stats
125+
126+
if stats_mode in [None, QueryStatsMode.NONE, QueryStatsMode.UNSPECIFIED]:
127+
assert stats is None
128+
return
129+
130+
assert stats is not None
131+
assert len(stats.query_phases) > 0
132+
133+
if stats_mode != QueryStatsMode.BASIC:
134+
assert len(stats.query_plan) > 0
135+
else:
136+
assert stats.query_plan == ""

ydb/aio/query/session.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ async def execute(
115115
parameters: dict = None,
116116
syntax: base.QuerySyntax = None,
117117
exec_mode: base.QueryExecMode = None,
118+
stats_mode: Optional[base.QueryStatsMode] = None,
118119
concurrent_result_sets: bool = False,
119120
settings: Optional[BaseRequestSettings] = None,
120121
) -> AsyncResponseContextIterator:
@@ -133,10 +134,11 @@ async def execute(
133134

134135
stream_it = await self._execute_call(
135136
query=query,
137+
parameters=parameters,
136138
commit_tx=True,
137139
syntax=syntax,
138140
exec_mode=exec_mode,
139-
parameters=parameters,
141+
stats_mode=stats_mode,
140142
concurrent_result_sets=concurrent_result_sets,
141143
settings=settings,
142144
)
@@ -147,6 +149,7 @@ async def execute(
147149
rpc_state=None,
148150
response_pb=resp,
149151
session_state=self._state,
152+
session=self,
150153
settings=self._settings,
151154
),
152155
)

ydb/aio/query/transaction.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ async def execute(
140140
commit_tx: Optional[bool] = False,
141141
syntax: Optional[base.QuerySyntax] = None,
142142
exec_mode: Optional[base.QueryExecMode] = None,
143+
stats_mode: Optional[base.QueryStatsMode] = None,
143144
concurrent_result_sets: Optional[bool] = False,
144145
settings: Optional[BaseRequestSettings] = None,
145146
) -> AsyncResponseContextIterator:
@@ -156,6 +157,11 @@ async def execute(
156157
2) QueryExecMode.EXPLAIN;
157158
3) QueryExecMode.VALIDATE;
158159
4) QueryExecMode.PARSE.
160+
:param stats_mode: Mode of query statistics to gather, which is a one from the following choises:
161+
1) QueryStatsMode:NONE, which is default;
162+
2) QueryStatsMode.BASIC;
163+
3) QueryStatsMode.FULL;
164+
4) QueryStatsMode.PROFILE;
159165
:param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False;
160166
161167
:return: Iterator with result sets
@@ -164,10 +170,11 @@ async def execute(
164170

165171
stream_it = await self._execute_call(
166172
query=query,
173+
parameters=parameters,
167174
commit_tx=commit_tx,
168175
syntax=syntax,
169176
exec_mode=exec_mode,
170-
parameters=parameters,
177+
stats_mode=stats_mode,
171178
concurrent_result_sets=concurrent_result_sets,
172179
settings=settings,
173180
)

ydb/query/base.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
if typing.TYPE_CHECKING:
2727
from .transaction import BaseQueryTxContext
28+
from .session import BaseQuerySession
2829

2930

3031
class QuerySyntax(enum.IntEnum):
@@ -41,7 +42,7 @@ class QueryExecMode(enum.IntEnum):
4142
EXECUTE = 50
4243

4344

44-
class StatsMode(enum.IntEnum):
45+
class QueryStatsMode(enum.IntEnum):
4546
UNSPECIFIED = 0
4647
NONE = 10
4748
BASIC = 20
@@ -132,12 +133,13 @@ def create_execute_query_request(
132133
tx_mode: Optional[BaseQueryTxMode],
133134
syntax: Optional[QuerySyntax],
134135
exec_mode: Optional[QueryExecMode],
136+
stats_mode: Optional[QueryStatsMode],
135137
parameters: Optional[dict],
136138
concurrent_result_sets: Optional[bool],
137139
) -> ydb_query.ExecuteQueryRequest:
138140
syntax = QuerySyntax.YQL_V1 if not syntax else syntax
139141
exec_mode = QueryExecMode.EXECUTE if not exec_mode else exec_mode
140-
stats_mode = StatsMode.NONE # TODO: choise is not supported yet
142+
stats_mode = QueryStatsMode.NONE if stats_mode is None else stats_mode
141143

142144
tx_control = None
143145
if not tx_id and not tx_mode:
@@ -189,6 +191,7 @@ def wrap_execute_query_response(
189191
response_pb: _apis.ydb_query.ExecuteQueryResponsePart,
190192
session_state: IQuerySessionState,
191193
tx: Optional["BaseQueryTxContext"] = None,
194+
session: Optional["BaseQuerySession"] = None,
192195
commit_tx: Optional[bool] = False,
193196
settings: Optional[QueryClientSettings] = None,
194197
) -> convert.ResultSet:
@@ -198,6 +201,12 @@ def wrap_execute_query_response(
198201
elif tx and response_pb.tx_meta and not tx.tx_id:
199202
tx._move_to_beginned(response_pb.tx_meta.id)
200203

204+
if response_pb.HasField("exec_stats"):
205+
if tx is not None:
206+
tx._last_query_stats = response_pb.exec_stats
207+
if session is not None:
208+
session._last_query_stats = response_pb.exec_stats
209+
201210
if response_pb.HasField("result_set"):
202211
return convert.ResultSet.from_message(response_pb.result_set, settings)
203212

ydb/query/session.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,12 @@ def __init__(self, driver: common_utils.SupportedDriverType, settings: Optional[
147147
.with_timeout(DEFAULT_ATTACH_LONG_TIMEOUT)
148148
)
149149

150+
self._last_query_stats = None
151+
152+
@property
153+
def last_query_stats(self):
154+
return self._last_query_stats
155+
150156
def _get_client_settings(
151157
self,
152158
driver: common_utils.SupportedDriverType,
@@ -189,22 +195,26 @@ def _attach_call(self) -> Iterable[_apis.ydb_query.SessionState]:
189195
def _execute_call(
190196
self,
191197
query: str,
198+
parameters: dict = None,
192199
commit_tx: bool = False,
193200
syntax: base.QuerySyntax = None,
194201
exec_mode: base.QueryExecMode = None,
195-
parameters: dict = None,
202+
stats_mode: Optional[base.QueryStatsMode] = None,
196203
concurrent_result_sets: bool = False,
197204
settings: Optional[BaseRequestSettings] = None,
198205
) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]:
206+
self._last_query_stats = None
207+
199208
request = base.create_execute_query_request(
200209
query=query,
201-
session_id=self._state.session_id,
210+
parameters=parameters,
202211
commit_tx=commit_tx,
212+
session_id=self._state.session_id,
203213
tx_mode=None,
204214
tx_id=None,
205215
syntax=syntax,
206216
exec_mode=exec_mode,
207-
parameters=parameters,
217+
stats_mode=stats_mode,
208218
concurrent_result_sets=concurrent_result_sets,
209219
)
210220

@@ -320,6 +330,7 @@ def execute(
320330
syntax: base.QuerySyntax = None,
321331
exec_mode: base.QueryExecMode = None,
322332
concurrent_result_sets: bool = False,
333+
stats_mode: Optional[base.QueryStatsMode] = None,
323334
settings: Optional[BaseRequestSettings] = None,
324335
) -> base.SyncResponseContextIterator:
325336
"""Sends a query to Query Service
@@ -337,10 +348,11 @@ def execute(
337348

338349
stream_it = self._execute_call(
339350
query=query,
351+
parameters=parameters,
340352
commit_tx=True,
341353
syntax=syntax,
342354
exec_mode=exec_mode,
343-
parameters=parameters,
355+
stats_mode=stats_mode,
344356
concurrent_result_sets=concurrent_result_sets,
345357
settings=settings,
346358
)
@@ -351,6 +363,7 @@ def execute(
351363
rpc_state=None,
352364
response_pb=resp,
353365
session_state=self._state,
366+
session=self,
354367
settings=self._settings,
355368
),
356369
)

ydb/query/transaction.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ def __init__(self, driver, session_state, session, tx_mode):
210210
self.session = session
211211
self._prev_stream = None
212212
self._external_error = None
213+
self._last_query_stats = None
213214

214215
@property
215216
def session_id(self) -> str:
@@ -229,6 +230,10 @@ def tx_id(self) -> Optional[str]:
229230
"""
230231
return self._tx_state.tx_id
231232

233+
@property
234+
def last_query_stats(self):
235+
return self._last_query_stats
236+
232237
def _tx_identity(self) -> _ydb_topic.TransactionIdentity:
233238
if not self.tx_id:
234239
raise RuntimeError("Unable to get tx identity without started tx.")
@@ -283,25 +288,29 @@ def _rollback_call(self, settings: Optional[BaseRequestSettings]) -> "BaseQueryT
283288
def _execute_call(
284289
self,
285290
query: str,
291+
parameters: Optional[dict],
286292
commit_tx: Optional[bool],
287293
syntax: Optional[base.QuerySyntax],
288294
exec_mode: Optional[base.QueryExecMode],
289-
parameters: Optional[dict],
295+
stats_mode: Optional[base.QueryStatsMode],
290296
concurrent_result_sets: Optional[bool],
291297
settings: Optional[BaseRequestSettings],
292298
) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]:
293299
self._tx_state._check_tx_ready_to_use()
294300
self._check_external_error_set()
295301

302+
self._last_query_stats = None
303+
296304
request = base.create_execute_query_request(
297305
query=query,
298-
session_id=self._session_state.session_id,
306+
parameters=parameters,
299307
commit_tx=commit_tx,
308+
session_id=self._session_state.session_id,
300309
tx_id=self._tx_state.tx_id,
301310
tx_mode=self._tx_state.tx_mode,
302311
syntax=syntax,
303312
exec_mode=exec_mode,
304-
parameters=parameters,
313+
stats_mode=stats_mode,
305314
concurrent_result_sets=concurrent_result_sets,
306315
)
307316

@@ -449,6 +458,7 @@ def execute(
449458
commit_tx: Optional[bool] = False,
450459
syntax: Optional[base.QuerySyntax] = None,
451460
exec_mode: Optional[base.QueryExecMode] = None,
461+
stats_mode: Optional[base.QueryStatsMode] = None,
452462
concurrent_result_sets: Optional[bool] = False,
453463
settings: Optional[BaseRequestSettings] = None,
454464
) -> base.SyncResponseContextIterator:
@@ -465,6 +475,11 @@ def execute(
465475
2) QueryExecMode.EXPLAIN;
466476
3) QueryExecMode.VALIDATE;
467477
4) QueryExecMode.PARSE.
478+
:param stats_mode: Mode of query statistics to gather, which is a one from the following choises:
479+
1) QueryStatsMode:NONE, which is default;
480+
2) QueryStatsMode.BASIC;
481+
3) QueryStatsMode.FULL;
482+
4) QueryStatsMode.PROFILE;
468483
:param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False;
469484
:param settings: An additional request settings BaseRequestSettings;
470485
@@ -477,6 +492,7 @@ def execute(
477492
commit_tx=commit_tx,
478493
syntax=syntax,
479494
exec_mode=exec_mode,
495+
stats_mode=stats_mode,
480496
parameters=parameters,
481497
concurrent_result_sets=concurrent_result_sets,
482498
settings=settings,

0 commit comments

Comments
 (0)