Skip to content

Commit 41caeac

Browse files
committed
Add missing ability to configure QueryClientSettings
1 parent a1a49da commit 41caeac

File tree

5 files changed

+130
-5
lines changed

5 files changed

+130
-5
lines changed
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import ydb
2+
3+
4+
def writer_example(driver: ydb.Driver, topic: str):
5+
session_pool = ydb.QuerySessionPool(driver)
6+
7+
def callee(session: ydb.QuerySession):
8+
tx = session.transaction()
9+
tx.begin()
10+
11+
tx_writer: ydb.TopicTxWriter = driver.topic_client.tx_writer(topic, tx=tx)
12+
# tx_writer: ydb.TopicWriter = driver.topic_client.writer(topic)
13+
14+
def prepare_messages(result_sets):
15+
# some messages based on query result
16+
return ["1"]
17+
18+
with tx.execute(query="select 1") as result_sets:
19+
messages = prepare_messages(result_sets)
20+
21+
tx_writer.write(messages) # wait only on commit
22+
# tx_writer.flush() -> maybe
23+
24+
tx.commit() # <----------------------
25+
26+
session_pool.retry_operation_sync(callee)
27+
28+
29+
# =============================================================================
30+
31+
32+
def reader_example(driver: ydb.Driver, reader: ydb.TopicReader):
33+
session_pool = ydb.QuerySessionPool(driver)
34+
35+
def callee(session: ydb.QuerySession):
36+
tx = session.transaction().begin()
37+
38+
batch = reader.receive_batch_with_tx(tx=tx)
39+
# this batch will be commited on tx commit
40+
# important to commit in correct order
41+
42+
def prepare_query(batch):
43+
# some query based on batch
44+
return "select 1"
45+
46+
with tx.execute(
47+
query=prepare_query(batch),
48+
commit_tx=True,
49+
) as result_sets:
50+
# handle results
51+
pass
52+
53+
session_pool.retry_operation_sync(callee)
54+
55+
# =============================================================================
56+
57+
def writer_example_update(driver: ydb.Driver, topic: str):
58+
session_pool = ydb.QuerySessionPool(driver)
59+
60+
def callee(tx: ydb.QueryTxContext):
61+
tx_writer: ydb.TopicWriter = driver.topic_client.tx_writer(topic, tx=tx)
62+
63+
def prepare_messages(result_sets):
64+
# some messages based on query result
65+
return ["1"]
66+
67+
with tx.execute(query="select 1") as result_sets:
68+
messages = prepare_messages(result_sets)
69+
70+
tx_writer.write(messages)
71+
72+
session_pool.retry_operation_tx(callee)
73+
74+
75+
# =============================================================================
76+
77+
78+
def reader_example_update(driver: ydb.Driver, reader: ydb.TopicReader):
79+
session_pool = ydb.QuerySessionPool(driver)
80+
81+
def callee(tx: ydb.QueryTxContext):
82+
batch = reader.receive_batch_with_tx(tx=tx)
83+
# this batch will be commited on tx commit
84+
85+
def prepare_query(batch):
86+
# some query based on patch
87+
return "select 1"
88+
89+
with tx.execute(
90+
query=prepare_query(batch)
91+
) as result_sets:
92+
# handle results
93+
pass
94+
95+
session_pool.retry_operation_tx(callee)

ydb/aio/query/pool.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
RetrySettings,
1414
retry_operation_async,
1515
)
16+
from ...query.base import QueryClientSettings
1617
from ... import convert
1718
from ..._grpc.grpcwrapper import common_utils
1819

@@ -22,10 +23,16 @@
2223
class QuerySessionPool:
2324
"""QuerySessionPool is an object to simplify operations with sessions of Query Service."""
2425

25-
def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100):
26+
def __init__(
27+
self,
28+
driver: common_utils.SupportedDriverType,
29+
size: int = 100,
30+
query_client_settings: Optional[QueryClientSettings] = None,
31+
):
2632
"""
2733
:param driver: A driver instance
2834
:param size: Size of session pool
35+
:param query_client_settings: ydb.QueryClientSettings object to configure QueryService behavior
2936
"""
3037

3138
self._driver = driver
@@ -35,9 +42,10 @@ def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100):
3542
self._current_size = 0
3643
self._waiters = 0
3744
self._loop = asyncio.get_running_loop()
45+
self._query_client_settings = query_client_settings
3846

3947
async def _create_new_session(self):
40-
session = QuerySession(self._driver)
48+
session = QuerySession(self._driver, settings=self._query_client_settings)
4149
await session.create()
4250
logger.debug(f"New session was created for pool. Session id: {session._state.session_id}")
4351
return session

ydb/driver.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ class DriverConfig(object):
8989
"secure_channel",
9090
"table_client_settings",
9191
"topic_client_settings",
92+
"query_client_settings",
9293
"endpoints",
9394
"primary_user_agent",
9495
"tracer",
@@ -112,6 +113,7 @@ def __init__(
112113
grpc_keep_alive_timeout=None,
113114
table_client_settings=None,
114115
topic_client_settings=None,
116+
query_client_settings=None,
115117
endpoints=None,
116118
primary_user_agent="python-library",
117119
tracer=None,
@@ -159,6 +161,7 @@ def __init__(
159161
self.grpc_keep_alive_timeout = grpc_keep_alive_timeout
160162
self.table_client_settings = table_client_settings
161163
self.topic_client_settings = topic_client_settings
164+
self.query_client_settings = query_client_settings
162165
self.primary_user_agent = primary_user_agent
163166
self.tracer = tracer if tracer is not None else tracing.Tracer(None)
164167
self.grpc_lb_policy_name = grpc_lb_policy_name

ydb/query/pool.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import threading
99
import queue
1010

11+
from .base import QueryClientSettings
1112
from .session import (
1213
QuerySession,
1314
)
@@ -27,10 +28,16 @@
2728
class QuerySessionPool:
2829
"""QuerySessionPool is an object to simplify operations with sessions of Query Service."""
2930

30-
def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100):
31+
def __init__(
32+
self,
33+
driver: common_utils.SupportedDriverType,
34+
size: int = 100,
35+
query_client_settings: Optional[QueryClientSettings] = None,
36+
):
3137
"""
3238
:param driver: A driver instance.
3339
:param size: Max size of Session Pool.
40+
:param query_client_settings: ydb.QueryClientSettings object to configure QueryService behavior
3441
"""
3542

3643
self._driver = driver
@@ -39,9 +46,10 @@ def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100):
3946
self._size = size
4047
self._should_stop = threading.Event()
4148
self._lock = threading.RLock()
49+
self._query_client_settings = query_client_settings
4250

4351
def _create_new_session(self, timeout: Optional[float]):
44-
session = QuerySession(self._driver)
52+
session = QuerySession(self._driver, settings=self._query_client_settings)
4553
session.create(settings=BaseRequestSettings().with_timeout(timeout))
4654
logger.debug(f"New session was created for pool. Session id: {session._state.session_id}")
4755
return session

ydb/query/session.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,20 @@ class BaseQuerySession:
134134

135135
def __init__(self, driver: common_utils.SupportedDriverType, settings: Optional[base.QueryClientSettings] = None):
136136
self._driver = driver
137-
self._settings = settings if settings is not None else base.QueryClientSettings()
137+
self._settings = self._get_client_settings(driver, settings)
138138
self._state = QuerySessionState(settings)
139139

140+
def _get_client_settings(
141+
self,
142+
driver: common_utils.SupportedDriverType,
143+
settings: Optional[base.QueryClientSettings] = None,
144+
) -> base.QueryClientSettings:
145+
if settings is not None:
146+
return settings
147+
if driver._driver_config.query_client_settings is not None:
148+
return driver._driver_config.query_client_settings
149+
return base.QueryClientSettings()
150+
140151
def _create_call(self, settings: Optional[BaseRequestSettings] = None) -> "BaseQuerySession":
141152
return self._driver(
142153
_apis.ydb_query.CreateSessionRequest(),

0 commit comments

Comments
 (0)