Skip to content

Commit 9097cf4

Browse files
committed
Fix hang while write messages to closed driver.
Close #296
1 parent c9f389d commit 9097cf4

File tree

6 files changed

+68
-5
lines changed

6 files changed

+68
-5
lines changed

tests/topics/test_topic_writer.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,38 @@ async def test_write_multi_message_with_ack(
9696
None,
9797
],
9898
)
99-
async def test_write_encoded(self, driver: ydb.Driver, topic_path: str, codec):
99+
async def test_write_encoded(self, driver: ydb.aio.Driver, topic_path: str, codec):
100100
async with driver.topic_client.writer(topic_path, codec=codec) as writer:
101101
await writer.write("a" * 1000)
102102
await writer.write("b" * 1000)
103103
await writer.write("c" * 1000)
104104

105+
async def test_create_writer_after_stop(self, driver: ydb.aio.Driver, topic_path: str):
106+
await driver.stop()
107+
with pytest.raises(ydb.Error):
108+
async with driver.topic_client.writer(topic_path) as writer:
109+
await writer.write_with_ack("123")
110+
111+
async def test_send_message_after_stop(self, driver: ydb.aio.Driver, topic_path: str):
112+
writer = driver.topic_client.writer(topic_path)
113+
await driver.stop()
114+
with pytest.raises(ydb.Error):
115+
await writer.write_with_ack("123")
116+
117+
async def test_preserve_exception_on_cm_close(self, driver: ydb.aio.Driver, topic_path: str):
118+
class TestException(Exception):
119+
pass
120+
121+
with pytest.raises(TestException):
122+
async with driver.topic_client.writer(topic_path) as writer:
123+
driver.stop() # will raise exception on topic writer __exit__
124+
try:
125+
writer.write("123")
126+
except ydb.Error:
127+
pass
128+
129+
raise TestException()
130+
105131

106132
class TestTopicWriterSync:
107133
def test_send_message(self, driver_sync: ydb.Driver, topic_path):
@@ -212,3 +238,29 @@ def test_start_many_sync_writers_in_parallel(self, driver_sync: ydb.Driver, topi
212238

213239
for writer in writers:
214240
writer.close()
241+
242+
def test_create_writer_after_stop(self, driver_sync: ydb.Driver, topic_path: str):
243+
driver_sync.stop()
244+
with pytest.raises(ydb.Error):
245+
with driver_sync.topic_client.writer(topic_path) as writer:
246+
writer.write_with_ack("123")
247+
248+
def test_send_message_after_stop(self, driver_sync: ydb.Driver, topic_path: str):
249+
writer = driver_sync.topic_client.writer(topic_path)
250+
driver_sync.stop()
251+
with pytest.raises(ydb.Error):
252+
writer.write_with_ack("123")
253+
254+
def test_preserve_exception_on_cm_close(self, driver_sync: ydb.Driver, topic_path: str):
255+
class TestException(Exception):
256+
pass
257+
258+
with pytest.raises(TestException):
259+
with driver_sync.topic_client.writer(topic_path) as writer:
260+
driver_sync.stop() # will raise exception on topic writer __exit__
261+
try:
262+
writer.write("123")
263+
except ydb.Error:
264+
pass
265+
266+
raise TestException()

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ def _prepare_internal_messages(self, messages: List[PublicMessage]) -> List[Inte
330330

331331
def _check_stop(self):
332332
if self._stop_reason.done():
333-
raise self._stop_reason.result()
333+
raise self._stop_reason.exception()
334334

335335
async def _connection_loop(self):
336336
retry_settings = RetrySettings() # todo
@@ -543,7 +543,7 @@ def _stop(self, reason: BaseException):
543543
if self._stop_reason.done():
544544
return
545545

546-
self._stop_reason.set_result(reason)
546+
self._stop_reason.set_exception(reason)
547547

548548
for f in self._messages_future:
549549
f.set_exception(reason)

ydb/_topic_writer/topic_writer_sync.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from concurrent.futures import Future
66
from typing import Union, List, Optional
77

8+
import ydb
89
from .._grpc.grpcwrapper.common_utils import SupportedDriverType
910
from .topic_writer import (
1011
PublicWriterSettings,
@@ -56,7 +57,12 @@ def __enter__(self):
5657
return self
5758

5859
def __exit__(self, exc_type, exc_val, exc_tb):
59-
self.close()
60+
try:
61+
self.close()
62+
except ydb.Error:
63+
if exc_val:
64+
raise exc_val
65+
raise
6066

6167
def __del__(self):
6268
self.close(flush=False)

ydb/aio/pool.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,8 @@ async def __call__(
241241
preferred_endpoint=None,
242242
fast_fail=False,
243243
):
244+
if self._stopped:
245+
raise issues.Error("Driver was stopped")
244246
wait_timeout = settings.timeout if settings else 10
245247
try:
246248
connection = await self._store.get(preferred_endpoint, fast_fail=fast_fail, wait_timeout=wait_timeout)

ydb/pool.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,9 @@ def __call__(
429429
430430
:return: A result of computation
431431
"""
432+
if self._stopped:
433+
raise issues.Error("Driver was stopped")
434+
432435
tracing.trace(self.tracer, {"request": request, "stub": stub, "rpc_name": rpc_name})
433436
try:
434437
connection = self._store.get(preferred_endpoint)

ydb/topic.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ def writer(
337337
encoders: Optional[Mapping[_ydb_topic_public_types.PublicCodec, Callable[[bytes], bytes]]] = None,
338338
encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
339339
) -> TopicWriter:
340-
args = locals()
340+
args = locals().copy()
341341
del args["self"]
342342
self._check_closed()
343343

0 commit comments

Comments
 (0)