Skip to content

Commit 34e2a0a

Browse files
committed
add wrapper for asyncio.create_task
1 parent c5e3a64 commit 34e2a0a

File tree

3 files changed

+39
-10
lines changed

3 files changed

+39
-10
lines changed

ydb/_topic_common/common.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import concurrent.futures
3+
import sys
34
import threading
45
import typing
56
from typing import Optional
@@ -29,6 +30,12 @@ def wrapper(rpc_state, response_pb, driver=None):
2930
return wrapper
3031

3132

33+
def wrap_create_asyncio_task(func: typing.Callable, *args, **kwargs, task_name: str):
34+
if sys.hexversion < 0x03080000:
35+
return asyncio.create_task(func(*args, **kwargs))
36+
return asyncio.create_task(func(*args, **kwargs), task_name=loop_name)
37+
38+
3239
_shared_event_loop_lock = threading.Lock()
3340
_shared_event_loop: Optional[asyncio.AbstractEventLoop] = None
3441

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
import asyncio
44
import concurrent.futures
55
import gzip
6+
import sys
67
import typing
78
from asyncio import Task
89
from collections import deque
910
from typing import Optional, Set, Dict, Union, Callable
1011

1112
import ydb
1213
from .. import _apis, issues
14+
from .._topic_common import common as topic_common
1315
from .._utilities import AtomicCounter
1416
from ..aio import Driver
1517
from ..issues import Error as YdbError, _process_response
@@ -87,7 +89,10 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
8789

8890
def __del__(self):
8991
if not self._closed:
90-
self._loop.create_task(self.close(flush=False))
92+
if sys.hexversion < 0x03080000:
93+
self._loop.create_task(self.close(flush=False))
94+
else:
95+
self._loop.create_task(self.close(flush=False), name="close reader")
9196

9297
async def wait_message(self):
9398
"""
@@ -337,11 +342,21 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess
337342

338343
self._update_token_event.set()
339344

340-
self._background_tasks.add(asyncio.create_task(self._read_messages_loop()))
341-
self._background_tasks.add(asyncio.create_task(self._decode_batches_loop()))
345+
self._background_tasks.add(
346+
topic_common.wrap_create_asyncio_task(self._read_messages_loop, task_name="read_messages_loop"),
347+
)
348+
self._background_tasks.add(
349+
topic_common.wrap_create_asyncio_task(self._decode_batches_loop, task_name="decode_batches"),
350+
)
342351
if self._get_token_function:
343-
self._background_tasks.add(asyncio.create_task(self._update_token_loop()))
344-
self._background_tasks.add(asyncio.create_task(self._handle_background_errors()))
352+
self._background_tasks.add(
353+
topic_common.wrap_create_asyncio_task(self._update_token_loop, task_name="update_token_loop"),
354+
)
355+
self._background_tasks.add(
356+
topic_common.wrap_create_asyncio_task(
357+
self._handle_background_errors, task_name="handle_background_errors",
358+
),
359+
)
345360

346361
async def wait_error(self):
347362
raise await self._first_error

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
issues,
2929
)
3030
from .._errors import check_retriable_error
31+
from .._topic_common import common as topic_common
3132
from ..retries import RetrySettings
3233
from .._grpc.grpcwrapper.ydb_topic_public_types import PublicCodec
3334
from .._grpc.grpcwrapper.ydb_topic import (
@@ -231,8 +232,8 @@ def __init__(self, driver: SupportedDriverType, settings: WriterSettings):
231232
self._new_messages = asyncio.Queue()
232233
self._stop_reason = self._loop.create_future()
233234
self._background_tasks = [
234-
asyncio.create_task(self._connection_loop()),
235-
asyncio.create_task(self._encode_loop()),
235+
topic_common.wrap_create_asyncio_task(self._connection_loop, task_name="connection_loop"),
236+
topic_common.wrap_create_asyncio_task(self._encode_loop, task_name="encode_loop"),
236237
]
237238

238239
self._state_changed = asyncio.Event()
@@ -366,8 +367,12 @@ async def _connection_loop(self):
366367

367368
self._stream_connected.set()
368369

369-
send_loop = asyncio.create_task(self._send_loop(stream_writer))
370-
receive_loop = asyncio.create_task(self._read_loop(stream_writer))
370+
send_loop = topic_common.wrap_create_asyncio_task(
371+
self._send_loop, stream_writer, task_name="writer send loop",
372+
)
373+
receive_loop = topic_common.wrap_create_asyncio_task(
374+
self._read_loop, stream_writer, task_name="writer receive loop",
375+
)
371376

372377
tasks = [send_loop, receive_loop]
373378
done, _ = await asyncio.wait([send_loop, receive_loop], return_when=asyncio.FIRST_COMPLETED)
@@ -653,7 +658,9 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMes
653658

654659
if self._update_token_interval is not None:
655660
self._update_token_event.set()
656-
self._update_token_task = asyncio.create_task(self._update_token_loop())
661+
self._update_token_task = topic_common.wrap_create_asyncio_task(
662+
self._update_token_loop, task_name="update_token_loop",
663+
)
657664

658665
@staticmethod
659666
def _ensure_ok(message: WriterMessagesFromServerToClient):

0 commit comments

Comments
 (0)