Skip to content

Commit e99c545

Browse files
committed
Return topic desctuctors
1 parent 4943aec commit e99c545

File tree

8 files changed

+23
-10
lines changed

8 files changed

+23
-10
lines changed

tests/topics/test_topic_reader.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ async def test_read_batch(self, driver, topic_with_messages, topic_consumer):
1717
await reader.close()
1818

1919
async def test_link_to_client(self, driver, topic_path, topic_consumer):
20-
reader = driver.topic_client.reader(topic_path, topic_consumer)
21-
assert reader._parent is driver.topic_client
20+
async with driver.topic_client.reader(topic_path, topic_consumer) as reader:
21+
assert reader._parent is driver.topic_client
2222

2323
async def test_read_message(self, driver, topic_with_messages, topic_consumer):
2424
reader = driver.topic_client.reader(topic_with_messages, topic_consumer)
@@ -138,8 +138,8 @@ def test_read_batch(self, driver_sync, topic_with_messages, topic_consumer):
138138
reader.close()
139139

140140
def test_link_to_client(self, driver_sync, topic_path, topic_consumer):
141-
reader = driver_sync.topic_client.reader(topic_path, topic_consumer)
142-
assert reader._parent is driver_sync.topic_client
141+
with driver_sync.topic_client.reader(topic_path, topic_consumer) as reader:
142+
assert reader._parent is driver_sync.topic_client
143143

144144
def test_read_message(self, driver_sync, topic_with_messages, topic_consumer):
145145
reader = driver_sync.topic_client.reader(topic_with_messages, topic_consumer)

tests/topics/test_topic_writer.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path):
3737
assert init_info.last_seqno == 5
3838

3939
async def test_link_to_client(self, driver, topic_path, topic_consumer):
40-
writer = driver.topic_client.writer(topic_path)
41-
assert writer._parent is driver.topic_client
40+
async with driver.topic_client.writer(topic_path) as writer:
41+
assert writer._parent is driver.topic_client
4242

4343
async def test_random_producer_id(self, driver: ydb.aio.Driver, topic_path, topic_reader: ydb.TopicReaderAsyncIO):
4444
async with driver.topic_client.writer(topic_path) as writer:
@@ -180,8 +180,8 @@ def test_auto_flush_on_close(self, driver_sync: ydb.Driver, topic_path):
180180
assert init_info.last_seqno == last_seqno
181181

182182
def test_link_to_client(self, driver_sync, topic_path, topic_consumer):
183-
writer = driver_sync.topic_client.writer(topic_path)
184-
assert writer._parent is driver_sync.topic_client
183+
with driver_sync.topic_client.writer(topic_path) as writer:
184+
assert writer._parent is driver_sync.topic_client
185185

186186
def test_random_producer_id(
187187
self,

ydb/_grpc/grpcwrapper/common_utils.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,9 @@ def __init__(self, convert_server_grpc_to_wrapper):
161161
self._stream_call = None
162162
self._wait_executor = None
163163

164+
def __del__(self):
165+
self._clean_executor(wait=False)
166+
164167
async def start(self, driver: SupportedDriverType, stub, method):
165168
if asyncio.iscoroutinefunction(driver.__call__):
166169
await self._start_asyncio_driver(driver, stub, method)

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
9696
def __del__(self):
9797
if not self._closed:
9898
logger.warning("Topic reader was not closed properly. Consider using method close().")
99+
task = self._loop.create_task(self.close(flush=False))
100+
topic_common.wrap_set_name_for_asyncio_task(task, task_name="close reader")
99101

100102
async def wait_message(self):
101103
"""

ydb/_topic_reader/topic_reader_sync.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ async def create_reader():
6060
def __del__(self):
6161
if not self._closed:
6262
logger.warning("Topic reader was not closed properly. Consider using method close().")
63+
self.close(flush=False)
6364

6465
def __enter__(self):
6566
return self

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import concurrent.futures
33
import datetime
4+
import functools
45
import gzip
56
import typing
67
from collections import deque
@@ -79,8 +80,11 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
7980
raise
8081

8182
def __del__(self):
82-
if not self._closed:
83-
logger.warning("Topic writer was not closed properly. Consider using method close().")
83+
if self._closed or self._loop.is_closed():
84+
return
85+
86+
logger.warning("Topic writer was not closed properly. Consider using method close().")
87+
self._loop.call_soon(functools.partial(self.close, flush=False))
8488

8589
async def close(self, *, flush: bool = True):
8690
if self._closed:

ydb/_topic_writer/topic_writer_sync.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
7676
def __del__(self):
7777
if not self._closed:
7878
logger.warning("Topic writer was not closed properly. Consider using method close().")
79+
self.close(flush=False)
7980

8081
def close(self, *, flush: bool = True, timeout: TimeoutType = None):
8182
if self._closed:

ydb/topic.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ def __init__(self, driver: aio.Driver, settings: Optional[TopicClientSettings] =
117117
def __del__(self):
118118
if not self._closed:
119119
logger.warning("Topic client was not closed properly. Consider using method close().")
120+
self.close()
120121

121122
async def create_topic(
122123
self,
@@ -349,6 +350,7 @@ def __init__(self, driver: driver.Driver, settings: Optional[TopicClientSettings
349350
def __del__(self):
350351
if not self._closed:
351352
logger.warning("Topic client was not closed properly. Consider using method close().")
353+
self.close()
352354

353355
def create_topic(
354356
self,

0 commit comments

Comments
 (0)