Skip to content

Commit 7cbbb99

Browse files
committed
Fix attach timeouts
1 parent 2872be4 commit 7cbbb99

File tree

2 files changed

+48
-10
lines changed

2 files changed

+48
-10
lines changed

ydb/aio/query/session.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from ...query import base
1616
from ...query.session import (
1717
BaseQuerySession,
18+
DEFAULT_ATTACH_FIRST_RESP_TIMEOUT,
1819
QuerySessionStateEnum,
1920
)
2021

@@ -43,9 +44,17 @@ async def _attach(self) -> None:
4344
lambda response: common_utils.ServerStatus.from_proto(response),
4445
)
4546

46-
first_response = await self._status_stream.next()
47-
if first_response.status != issues.StatusCode.SUCCESS:
48-
pass
47+
async def get_first_response():
48+
first_response = await self._status_stream.next()
49+
if first_response.status != issues.StatusCode.SUCCESS:
50+
self._state.reset()
51+
raise RuntimeError("Failed to attach session")
52+
53+
try:
54+
await asyncio.wait_for(get_first_response(), DEFAULT_ATTACH_FIRST_RESP_TIMEOUT)
55+
except Exception as e:
56+
self._status_stream.cancel()
57+
raise e
4958

5059
self._state.set_attached(True)
5160
self._state._change_state(QuerySessionStateEnum.CREATED)

ydb/query/session.py

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
logger = logging.getLogger(__name__)
2323

2424

25+
DEFAULT_ATTACH_FIRST_RESP_TIMEOUT = 600
26+
27+
2528
class QuerySessionStateEnum(enum.Enum):
2629
NOT_INITIALIZED = "NOT_INITIALIZED"
2730
CREATED = "CREATED"
@@ -136,6 +139,12 @@ def __init__(self, driver: common_utils.SupportedDriverType, settings: Optional[
136139
self._driver = driver
137140
self._settings = self._get_client_settings(driver, settings)
138141
self._state = QuerySessionState(settings)
142+
self._attach_settings: BaseRequestSettings = (
143+
BaseRequestSettings()
144+
.with_operation_timeout(31536000) # year
145+
.with_cancel_after(31536000) # year
146+
.with_timeout(31536000) # year
147+
)
139148

140149
def _get_client_settings(
141150
self,
@@ -168,12 +177,12 @@ def _delete_call(self, settings: Optional[BaseRequestSettings] = None) -> "BaseQ
168177
settings=settings,
169178
)
170179

171-
def _attach_call(self, settings: Optional[BaseRequestSettings] = None) -> Iterable[_apis.ydb_query.SessionState]:
180+
def _attach_call(self) -> Iterable[_apis.ydb_query.SessionState]:
172181
return self._driver(
173182
_apis.ydb_query.AttachSessionRequest(session_id=self._state.session_id),
174183
_apis.QueryService.Stub,
175184
_apis.QueryService.AttachSession,
176-
settings=settings,
185+
settings=self._attach_settings,
177186
)
178187

179188
def _execute_call(
@@ -213,16 +222,36 @@ class QuerySession(BaseQuerySession):
213222

214223
_stream = None
215224

216-
def _attach(self, settings: Optional[BaseRequestSettings] = None) -> None:
217-
self._stream = self._attach_call(settings=settings)
225+
def _attach(self) -> None:
226+
self._stream = self._attach_call()
218227
status_stream = _utilities.SyncResponseIterator(
219228
self._stream,
220229
lambda response: common_utils.ServerStatus.from_proto(response),
221230
)
222231

223-
first_response = next(status_stream)
224-
if first_response.status != issues.StatusCode.SUCCESS:
225-
pass
232+
waiter = _utilities.future()
233+
234+
def get_first_response(waiter):
235+
first_response = next(status_stream)
236+
if first_response.status != issues.StatusCode.SUCCESS:
237+
self._state.reset()
238+
raise RuntimeError("Failed to attach session")
239+
waiter.set_result(True)
240+
241+
thread = threading.Thread(
242+
target=get_first_response,
243+
args=(waiter,),
244+
name="first response attach stream thread",
245+
daemon=True,
246+
)
247+
thread.start()
248+
249+
try:
250+
waiter.result(timeout=DEFAULT_ATTACH_FIRST_RESP_TIMEOUT)
251+
except Exception as e:
252+
status_stream.cancel()
253+
raise e
254+
226255

227256
self._state.set_attached(True)
228257
self._state._change_state(QuerySessionStateEnum.CREATED)

0 commit comments

Comments
 (0)