Skip to content

Commit 9fe5bdf

Browse files
committed
Return topic desctuctors
1 parent 4943aec commit 9fe5bdf

File tree

5 files changed

+13
-2
lines changed

5 files changed

+13
-2
lines changed

ydb/_grpc/grpcwrapper/common_utils.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,9 @@ def __init__(self, convert_server_grpc_to_wrapper):
161161
self._stream_call = None
162162
self._wait_executor = None
163163

164+
def __del__(self):
165+
self._clean_executor(wait=False)
166+
164167
async def start(self, driver: SupportedDriverType, stub, method):
165168
if asyncio.iscoroutinefunction(driver.__call__):
166169
await self._start_asyncio_driver(driver, stub, method)

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
9696
def __del__(self):
9797
if not self._closed:
9898
logger.warning("Topic reader was not closed properly. Consider using method close().")
99+
task = self._loop.create_task(self.close(flush=False))
100+
topic_common.wrap_set_name_for_asyncio_task(task, task_name="close reader")
99101

100102
async def wait_message(self):
101103
"""

ydb/_topic_reader/topic_reader_sync.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ async def create_reader():
6060
def __del__(self):
6161
if not self._closed:
6262
logger.warning("Topic reader was not closed properly. Consider using method close().")
63+
self.close(flush=False)
6364

6465
def __enter__(self):
6566
return self

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import concurrent.futures
33
import datetime
4+
import functools
45
import gzip
56
import typing
67
from collections import deque
@@ -79,8 +80,11 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
7980
raise
8081

8182
def __del__(self):
82-
if not self._closed:
83-
logger.warning("Topic writer was not closed properly. Consider using method close().")
83+
if self._closed or self._loop.is_closed():
84+
return
85+
86+
logger.warning("Topic writer was not closed properly. Consider using method close().")
87+
self._loop.call_soon(functools.partial(self.close, flush=False))
8488

8589
async def close(self, *, flush: bool = True):
8690
if self._closed:

ydb/_topic_writer/topic_writer_sync.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
7676
def __del__(self):
7777
if not self._closed:
7878
logger.warning("Topic writer was not closed properly. Consider using method close().")
79+
self.close(flush=False)
7980

8081
def close(self, *, flush: bool = True, timeout: TimeoutType = None):
8182
if self._closed:

0 commit comments

Comments
 (0)