Skip to content

Commit 136bcd3

Browse files
committed
session attach tests
1 parent 4df6be8 commit 136bcd3

File tree

3 files changed

+49
-8
lines changed

3 files changed

+49
-8
lines changed

tests/query/test_query_session.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
11
import pytest
2+
import threading
3+
import time
4+
from concurrent.futures import _base as b
5+
from unittest import mock
6+
27

38
from ydb.query.session import QuerySession
49

@@ -100,3 +105,38 @@ def test_two_results(self, session: QuerySession):
100105
res.append(list(result_set.rows[0].values()))
101106

102107
assert res == [[1], [2]]
108+
109+
def test_thread_leaks(self, session: QuerySession):
110+
session.create()
111+
thread_names = [t.name for t in threading.enumerate()]
112+
assert "first response attach stream thread" not in thread_names
113+
assert "attach stream thread" in thread_names
114+
115+
def test_first_resp_timeout(self, session: QuerySession):
116+
class FakeStream:
117+
def __iter__(self):
118+
return self
119+
120+
def __next__(self):
121+
time.sleep(10)
122+
return 1
123+
124+
def cancel(self):
125+
pass
126+
127+
fake_stream = mock.Mock(spec=FakeStream)
128+
129+
session._attach_call = mock.MagicMock(return_value=fake_stream)
130+
assert session._attach_call() == fake_stream
131+
132+
session._create_call()
133+
with pytest.raises(b.TimeoutError):
134+
session._attach(0.1)
135+
136+
fake_stream.cancel.assert_called()
137+
138+
thread_names = [t.name for t in threading.enumerate()]
139+
assert "first response attach stream thread" not in thread_names
140+
assert "attach stream thread" not in thread_names
141+
142+
_check_session_state_empty(session)

ydb/aio/query/session.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ async def _attach(self) -> None:
4747
async def get_first_response():
4848
first_response = await self._status_stream.next()
4949
if first_response.status != issues.StatusCode.SUCCESS:
50-
self._state.reset()
5150
raise RuntimeError("Failed to attach session")
5251

5352
try:
5453
await asyncio.wait_for(get_first_response(), DEFAULT_ATTACH_FIRST_RESP_TIMEOUT)
5554
except Exception as e:
55+
self._state.reset()
5656
self._status_stream.cancel()
5757
raise e
5858

ydb/query/session.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424

2525
DEFAULT_ATTACH_FIRST_RESP_TIMEOUT = 600
26+
DEFAULT_ATTACH_LONG_TIMEOUT = 31536000 # year
2627

2728

2829
class QuerySessionStateEnum(enum.Enum):
@@ -141,9 +142,9 @@ def __init__(self, driver: common_utils.SupportedDriverType, settings: Optional[
141142
self._state = QuerySessionState(settings)
142143
self._attach_settings: BaseRequestSettings = (
143144
BaseRequestSettings()
144-
.with_operation_timeout(31536000) # year
145-
.with_cancel_after(31536000) # year
146-
.with_timeout(31536000) # year
145+
.with_operation_timeout(DEFAULT_ATTACH_LONG_TIMEOUT)
146+
.with_cancel_after(DEFAULT_ATTACH_LONG_TIMEOUT)
147+
.with_timeout(DEFAULT_ATTACH_LONG_TIMEOUT)
147148
)
148149

149150
def _get_client_settings(
@@ -222,7 +223,7 @@ class QuerySession(BaseQuerySession):
222223

223224
_stream = None
224225

225-
def _attach(self) -> None:
226+
def _attach(self, first_resp_timeout: int = DEFAULT_ATTACH_FIRST_RESP_TIMEOUT) -> None:
226227
self._stream = self._attach_call()
227228
status_stream = _utilities.SyncResponseIterator(
228229
self._stream,
@@ -234,7 +235,6 @@ def _attach(self) -> None:
234235
def get_first_response(waiter):
235236
first_response = next(status_stream)
236237
if first_response.status != issues.StatusCode.SUCCESS:
237-
self._state.reset()
238238
raise RuntimeError("Failed to attach session")
239239
waiter.set_result(True)
240240

@@ -247,8 +247,9 @@ def get_first_response(waiter):
247247
thread.start()
248248

249249
try:
250-
waiter.result(timeout=DEFAULT_ATTACH_FIRST_RESP_TIMEOUT)
250+
waiter.result(timeout=first_resp_timeout)
251251
except Exception as e:
252+
self._state.reset()
252253
status_stream.cancel()
253254
raise e
254255

@@ -258,7 +259,7 @@ def get_first_response(waiter):
258259
threading.Thread(
259260
target=self._check_session_status_loop,
260261
args=(status_stream,),
261-
name="check session status thread",
262+
name="attach stream thread",
262263
daemon=True,
263264
).start()
264265

0 commit comments

Comments
 (0)