Skip to content

Commit b097524

Browse files
refactoring canceling tasks to helpermethod
1 parent 3b713ba commit b097524

File tree

5 files changed

+24
-22
lines changed

5 files changed

+24
-22
lines changed

rsocket/helpers.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import struct
3+
from asyncio import Task
34
from typing import TypeVar
45
from contextlib import contextmanager
56
from typing import Any
@@ -11,6 +12,7 @@
1112
from rsocket.exceptions import RSocketMimetypeTooLong
1213
from rsocket.exceptions import RSocketTransportError
1314
from rsocket.frame import Frame
15+
from rsocket.logger import logger
1416
from rsocket.payload import Payload
1517

1618
_default = object()
@@ -111,3 +113,15 @@ def parse_well_known_encoding(buffer: bytes, encoding_name_provider: Callable[[W
111113
offset = 1 + real_mime_type_length
112114

113115
return metadata_encoding, offset
116+
117+
118+
async def cancel_if_task_exists(task: Optional[Task]):
119+
if task is not None:
120+
task.cancel()
121+
122+
try:
123+
await task
124+
except asyncio.CancelledError:
125+
logger().debug('Asyncio task cancellation error: %s', str(task))
126+
except RuntimeError:
127+
logger().error('Runtime error', exc_info=True)

rsocket/rsocket_base.py

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from rsocket.handlers.request_response_responder import RequestResponseResponder
3030
from rsocket.handlers.request_stream_requester import RequestStreamRequester
3131
from rsocket.handlers.request_stream_responder import RequestStreamResponder
32-
from rsocket.helpers import payload_from_frame, async_noop
32+
from rsocket.helpers import payload_from_frame, async_noop, cancel_if_task_exists
3333
from rsocket.lease import DefinedLease, NullLease, Lease
3434
from rsocket.logger import logger
3535
from rsocket.payload import Payload
@@ -413,8 +413,8 @@ async def close(self):
413413
logger().debug('%s: Closing', self._log_identifier())
414414

415415
self._is_closing = True
416-
await self._cancel_if_task_exists(self._sender_task)
417-
await self._cancel_if_task_exists(self._receiver_task)
416+
await cancel_if_task_exists(self._sender_task)
417+
await cancel_if_task_exists(self._receiver_task)
418418

419419
await self._close_transport()
420420

@@ -430,16 +430,6 @@ async def _close_transport(self):
430430
logger().debug('Transport already closed or failed to close: %s', str(exception))
431431
pass
432432

433-
async def _cancel_if_task_exists(self, task):
434-
if task is not None:
435-
task.cancel()
436-
try:
437-
await task
438-
except asyncio.CancelledError:
439-
logger().debug('%s: Asyncio task cancellation error: %s', self._log_identifier(), str(task))
440-
except RuntimeError:
441-
logger().error('Runtime error', exc_info=True)
442-
443433
async def __aenter__(self) -> 'RSocketBase':
444434
return self
445435

rsocket/rsocket_client.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from reactivestreams.publisher import Publisher
88
from rsocket.exceptions import RSocketNoAvailableTransport
99
from rsocket.extensions.mimetypes import WellKnownMimeTypes
10-
from rsocket.helpers import create_future
10+
from rsocket.helpers import create_future, cancel_if_task_exists
1111
from rsocket.logger import logger
1212
from rsocket.payload import Payload
1313
from rsocket.request_handler import BaseRequestHandler
@@ -93,7 +93,7 @@ async def close(self):
9393

9494
async def _close(self, reconnect=False):
9595
if not reconnect:
96-
await self._cancel_if_task_exists(self._reconnect_task)
96+
await cancel_if_task_exists(self._reconnect_task)
9797
else:
9898
logger().debug('%s: Closing before reconnect', self._log_identifier())
9999

@@ -141,7 +141,7 @@ def _before_sender(self):
141141
self._keepalive_task = self._start_task_if_not_closing(self._keepalive_send_task)
142142

143143
async def _finally_sender(self):
144-
await self._cancel_if_task_exists(self._keepalive_task)
144+
await cancel_if_task_exists(self._keepalive_task)
145145

146146
def _update_last_keepalive(self):
147147
self._last_server_keepalive = datetime.now()
@@ -171,4 +171,4 @@ async def _receiver_listen(self):
171171
try:
172172
await super()._receiver_listen()
173173
finally:
174-
await self._cancel_if_task_exists(keepalive_timeout_task)
174+
await cancel_if_task_exists(keepalive_timeout_task)

rsocket/transports/abstract_messaging.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@ def __init__(self):
1212
async def next_frame_generator(self):
1313
frame = await self._incoming_frame_queue.get()
1414

15-
self._incoming_frame_queue.task_done()
16-
1715
if isinstance(frame, Exception):
1816
raise frame
1917

rsocket/transports/aioquic_transport.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from rsocket.exceptions import RSocketTransportError
99
from rsocket.frame import Frame
10-
from rsocket.helpers import wrap_transport_exception
10+
from rsocket.helpers import wrap_transport_exception, cancel_if_task_exists
1111
from rsocket.logger import logger
1212
from rsocket.rsocket_server import RSocketServer
1313
from rsocket.transports.abstract_messaging import AbstractMessagingTransport
@@ -95,16 +95,16 @@ async def incoming_data_listener(self):
9595

9696
if isinstance(data, Exception):
9797
self._incoming_frame_queue.put_nowait(data)
98+
return
9899
else:
99100
async for frame in self._frame_parser.receive_data(data, 0):
100101
self._incoming_frame_queue.put_nowait(frame)
101102

102-
self._incoming_bytes_queue.task_done()
103103
except asyncio.CancelledError:
104104
logger().debug('Asyncio task canceled: incoming_data_listener')
105105
except Exception:
106106
self._incoming_frame_queue.put_nowait(RSocketTransportError())
107107

108108
async def close(self):
109-
self._listener.cancel()
109+
await cancel_if_task_exists(self._listener)
110110
self._quic_protocol.close()

0 commit comments

Comments
 (0)