Skip to content

Commit f238d0b

Browse files
committed
last fixes + style fixing
1 parent df0b81e commit f238d0b

File tree

3 files changed

+5
-9
lines changed

3 files changed

+5
-9
lines changed

tests/coordination/test_coordination_client.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,6 @@ async def second_lock_task():
217217

218218
await asyncio.sleep(0)
219219

220-
221220
await asyncio.wait_for(lock2_acquired.wait(), timeout=timeout)
222221

223222
lock2_release.set()
@@ -275,14 +274,11 @@ async def test_coordination_reconnect_async(self, async_coordination_node):
275274

276275
lock = client.lock("test_lock", node_path)
277276

278-
# create semaphore
279277
res = await lock.create(init_limit=1, init_data=b"")
280278
assert res.status == StatusCode.SUCCESS
281279

282-
# break connection (simulate network drop)
283280
await lock._reconnector._stream.close()
284281

285-
# next call must succeed after reconnect
286282
desc = await lock.describe()
287283
assert desc.status == StatusCode.SUCCESS
288284
assert desc.name == "test_lock"

ydb/_grpc/grpcwrapper/common_utils.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ async def _start_sync_driver(self, driver: Driver, stub, method):
220220
self._stream_call = stream_call
221221
self.from_server_grpc = SyncToAsyncIterator(stream_call.__iter__(), self._wait_executor)
222222

223-
async def receive(self, timeout: Optional[int] = None) -> Any:
223+
async def receive(self, timeout: Optional[int] = None, is_coordination_calls=False) -> Any:
224224
# todo handle grpc exceptions and convert it to internal exceptions
225225
try:
226226
if timeout is None:
@@ -235,8 +235,8 @@ async def get_response():
235235
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
236236
raise connection._rpc_error_handler(self._connection_state, e)
237237

238-
# coordination grpc calls dont have status field
239-
# issues._process_response(grpc_message)
238+
if not is_coordination_calls:
239+
issues._process_response(grpc_message)
240240

241241
if self._connection_state != "has_received_messages":
242242
self._connection_state = "has_received_messages"

ydb/aio/coordination/stream.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ async def start_session(self, path: str, timeout_millis: int):
3434
try:
3535
while True:
3636
try:
37-
resp = await self._stream.receive(timeout=3)
37+
resp = await self._stream.receive(timeout=3, is_coordination_calls=True)
3838
except asyncio.TimeoutError:
3939
raise issues.Error("Timeout waiting for SessionStart response")
4040
except StopAsyncIteration:
@@ -73,7 +73,7 @@ async def _reader_loop(self):
7373
try:
7474
while True:
7575
try:
76-
resp = await self._stream.receive(timeout=3)
76+
resp = await self._stream.receive(timeout=3, is_coordination_calls=True)
7777
except asyncio.TimeoutError:
7878
continue
7979
except asyncio.CancelledError:

0 commit comments

Comments
 (0)