Skip to content

Commit 49c85b3

Browse files
authored
Merge pull request #298 Fix hang while write messages to closed driver.
2 parents c9f389d + 2a4141f commit 49c85b3

File tree

5 files changed

+74
-5
lines changed

5 files changed

+74
-5
lines changed

tests/topics/test_topic_writer.py

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
from __future__ import annotations
2+
3+
import asyncio
24
from typing import List
35

46
import pytest
@@ -96,12 +98,39 @@ async def test_write_multi_message_with_ack(
9698
None,
9799
],
98100
)
99-
async def test_write_encoded(self, driver: ydb.Driver, topic_path: str, codec):
101+
async def test_write_encoded(self, driver: ydb.aio.Driver, topic_path: str, codec):
100102
async with driver.topic_client.writer(topic_path, codec=codec) as writer:
101103
await writer.write("a" * 1000)
102104
await writer.write("b" * 1000)
103105
await writer.write("c" * 1000)
104106

107+
async def test_create_writer_after_stop(self, driver: ydb.aio.Driver, topic_path: str):
108+
await driver.stop()
109+
with pytest.raises(ydb.Error):
110+
async with driver.topic_client.writer(topic_path) as writer:
111+
await writer.write_with_ack("123")
112+
113+
async def test_send_message_after_stop(self, driver: ydb.aio.Driver, topic_path: str):
114+
writer = driver.topic_client.writer(topic_path)
115+
await driver.stop()
116+
with pytest.raises(ydb.Error):
117+
await writer.write_with_ack("123")
118+
119+
async def test_preserve_exception_on_cm_close(self, driver: ydb.aio.Driver, topic_path: str):
120+
class TestException(Exception):
121+
pass
122+
123+
with pytest.raises(TestException):
124+
async with driver.topic_client.writer(topic_path) as writer:
125+
await writer.wait_init()
126+
await driver.stop() # will raise exception on topic writer __exit__
127+
128+
# ensure writer has exception internally
129+
with pytest.raises((ydb.Error, asyncio.CancelledError)):
130+
await writer.write_with_ack("123")
131+
132+
raise TestException()
133+
105134

106135
class TestTopicWriterSync:
107136
def test_send_message(self, driver_sync: ydb.Driver, topic_path):
@@ -212,3 +241,30 @@ def test_start_many_sync_writers_in_parallel(self, driver_sync: ydb.Driver, topi
212241

213242
for writer in writers:
214243
writer.close()
244+
245+
def test_create_writer_after_stop(self, driver_sync: ydb.Driver, topic_path: str):
246+
driver_sync.stop()
247+
with pytest.raises(ydb.Error):
248+
with driver_sync.topic_client.writer(topic_path) as writer:
249+
writer.write_with_ack("123")
250+
251+
def test_send_message_after_stop(self, driver_sync: ydb.Driver, topic_path: str):
252+
writer = driver_sync.topic_client.writer(topic_path)
253+
driver_sync.stop()
254+
with pytest.raises(ydb.Error):
255+
writer.write_with_ack("123")
256+
257+
def test_preserve_exception_on_cm_close(self, driver_sync: ydb.Driver, topic_path: str):
258+
class TestException(Exception):
259+
pass
260+
261+
with pytest.raises(TestException):
262+
with driver_sync.topic_client.writer(topic_path) as writer:
263+
writer.wait_init()
264+
driver_sync.stop() # will raise exception on topic writer __exit__
265+
266+
# ensure writer has exception internally
267+
with pytest.raises(ydb.Error):
268+
writer.write_with_ack("123")
269+
270+
raise TestException()

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,11 @@ async def __aenter__(self) -> "WriterAsyncIO":
6565
return self
6666

6767
async def __aexit__(self, exc_type, exc_val, exc_tb):
68-
await self.close()
68+
try:
69+
await self.close()
70+
except BaseException:
71+
if exc_val is None:
72+
raise
6973

7074
def __del__(self):
7175
if self._closed or self._loop.is_closed():
@@ -330,7 +334,7 @@ def _prepare_internal_messages(self, messages: List[PublicMessage]) -> List[Inte
330334

331335
def _check_stop(self):
332336
if self._stop_reason.done():
333-
raise self._stop_reason.result()
337+
raise self._stop_reason.exception()
334338

335339
async def _connection_loop(self):
336340
retry_settings = RetrySettings() # todo
@@ -543,7 +547,7 @@ def _stop(self, reason: BaseException):
543547
if self._stop_reason.done():
544548
return
545549

546-
self._stop_reason.set_result(reason)
550+
self._stop_reason.set_exception(reason)
547551

548552
for f in self._messages_future:
549553
f.set_exception(reason)

ydb/_topic_writer/topic_writer_sync.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,11 @@ def __enter__(self):
5656
return self
5757

5858
def __exit__(self, exc_type, exc_val, exc_tb):
59-
self.close()
59+
try:
60+
self.close()
61+
except BaseException:
62+
if exc_val is None:
63+
raise
6064

6165
def __del__(self):
6266
self.close(flush=False)

ydb/aio/pool.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,8 @@ async def __call__(
241241
preferred_endpoint=None,
242242
fast_fail=False,
243243
):
244+
if self._stopped:
245+
raise issues.Error("Driver was stopped")
244246
wait_timeout = settings.timeout if settings else 10
245247
try:
246248
connection = await self._store.get(preferred_endpoint, fast_fail=fast_fail, wait_timeout=wait_timeout)

ydb/pool.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,9 @@ def __call__(
429429
430430
:return: A result of computation
431431
"""
432+
if self._stopped:
433+
raise issues.Error("Driver was stopped")
434+
432435
tracing.trace(self.tracer, {"request": request, "stub": stub, "rpc_name": rpc_name})
433436
try:
434437
connection = self._store.get(preferred_endpoint)

0 commit comments

Comments
 (0)