|
22 | 22 | logger = logging.getLogger(__name__) |
23 | 23 |
|
24 | 24 |
|
| 25 | +DEFAULT_ATTACH_FIRST_RESP_TIMEOUT = 600 |
| 26 | + |
| 27 | + |
25 | 28 | class QuerySessionStateEnum(enum.Enum): |
26 | 29 | NOT_INITIALIZED = "NOT_INITIALIZED" |
27 | 30 | CREATED = "CREATED" |
@@ -136,6 +139,12 @@ def __init__(self, driver: common_utils.SupportedDriverType, settings: Optional[ |
136 | 139 | self._driver = driver |
137 | 140 | self._settings = self._get_client_settings(driver, settings) |
138 | 141 | self._state = QuerySessionState(settings) |
| 142 | + self._attach_settings: BaseRequestSettings = ( |
| 143 | + BaseRequestSettings() |
| 144 | + .with_operation_timeout(31536000) # year |
| 145 | + .with_cancel_after(31536000) # year |
| 146 | + .with_timeout(31536000) # year |
| 147 | + ) |
139 | 148 |
|
140 | 149 | def _get_client_settings( |
141 | 150 | self, |
@@ -168,12 +177,12 @@ def _delete_call(self, settings: Optional[BaseRequestSettings] = None) -> "BaseQ |
168 | 177 | settings=settings, |
169 | 178 | ) |
170 | 179 |
|
171 | | - def _attach_call(self, settings: Optional[BaseRequestSettings] = None) -> Iterable[_apis.ydb_query.SessionState]: |
| 180 | + def _attach_call(self) -> Iterable[_apis.ydb_query.SessionState]: |
172 | 181 | return self._driver( |
173 | 182 | _apis.ydb_query.AttachSessionRequest(session_id=self._state.session_id), |
174 | 183 | _apis.QueryService.Stub, |
175 | 184 | _apis.QueryService.AttachSession, |
176 | | - settings=settings, |
| 185 | + settings=self._attach_settings, |
177 | 186 | ) |
178 | 187 |
|
179 | 188 | def _execute_call( |
@@ -213,16 +222,35 @@ class QuerySession(BaseQuerySession): |
213 | 222 |
|
214 | 223 | _stream = None |
215 | 224 |
|
216 | | - def _attach(self, settings: Optional[BaseRequestSettings] = None) -> None: |
217 | | - self._stream = self._attach_call(settings=settings) |
| 225 | + def _attach(self) -> None: |
| 226 | + self._stream = self._attach_call() |
218 | 227 | status_stream = _utilities.SyncResponseIterator( |
219 | 228 | self._stream, |
220 | 229 | lambda response: common_utils.ServerStatus.from_proto(response), |
221 | 230 | ) |
222 | 231 |
|
223 | | - first_response = next(status_stream) |
224 | | - if first_response.status != issues.StatusCode.SUCCESS: |
225 | | - pass |
| 232 | + waiter = _utilities.future() |
| 233 | + |
| 234 | + def get_first_response(waiter): |
| 235 | + first_response = next(status_stream) |
| 236 | + if first_response.status != issues.StatusCode.SUCCESS: |
| 237 | + self._state.reset() |
| 238 | + raise RuntimeError("Failed to attach session") |
| 239 | + waiter.set_result(True) |
| 240 | + |
| 241 | + thread = threading.Thread( |
| 242 | + target=get_first_response, |
| 243 | + args=(waiter,), |
| 244 | + name="first response attach stream thread", |
| 245 | + daemon=True, |
| 246 | + ) |
| 247 | + thread.start() |
| 248 | + |
| 249 | + try: |
| 250 | + waiter.result(timeout=DEFAULT_ATTACH_FIRST_RESP_TIMEOUT) |
| 251 | + except Exception as e: |
| 252 | + status_stream.cancel() |
| 253 | + raise e |
226 | 254 |
|
227 | 255 | self._state.set_attached(True) |
228 | 256 | self._state._change_state(QuerySessionStateEnum.CREATED) |
|
0 commit comments