Skip to content

Commit 1e7eb8e

Browse files
committed
code cleanup
1 parent ffd54e4 commit 1e7eb8e

File tree

8 files changed

+47
-42
lines changed

8 files changed

+47
-42
lines changed

rsocket/frame.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
from rsocket.exceptions import RSocketProtocolError, ParseError, RSocketUnknownFrameType
1010
from rsocket.fragment import Fragment
1111
from rsocket.frame_fragmenter import data_to_fragments_if_required
12-
from rsocket.frame_helpers import unpack_position, pack_position, unpack_24bit, pack_24bit, unpack_32bit, \
13-
ensure_bytes, pack_string, unpack_string
12+
from rsocket.frame_helpers import (is_flag_set, unpack_position, pack_position, unpack_24bit, pack_24bit, unpack_32bit,
13+
ensure_bytes, pack_string, unpack_string)
1414
from rsocket.logger import logger
1515

1616
PROTOCOL_MAJOR_VERSION = 1
@@ -96,9 +96,6 @@ class ParseHelper:
9696
parse_header: Callable = None
9797

9898

99-
from rsocket.frame_helpers import is_flag_set
100-
101-
10299
def parse_header_native(frame: Header, buffer: bytes, offset: int) -> Flags:
103100
frame.length = len(buffer)
104101
frame.stream_id, frame.frame_type, flag_bits = struct.unpack_from('>IBB', buffer, offset)
@@ -780,11 +777,13 @@ def is_fragmentable_frame(frame: Frame) -> bool:
780777
))
781778

782779

783-
FragmentableFrame = Union[PayloadFrame,
784-
RequestResponseFrame,
785-
RequestChannelFrame,
786-
RequestStreamFrame,
787-
RequestFireAndForgetFrame]
780+
FragmentableFrame = Union[
781+
PayloadFrame,
782+
RequestResponseFrame,
783+
RequestChannelFrame,
784+
RequestStreamFrame,
785+
RequestFireAndForgetFrame
786+
]
788787

789788

790789
def new_frame_fragment(base_frame: FragmentableFrame, fragment: Fragment) -> Frame:

rsocket/frame_fragment_cache.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def _frame_fragment_builder(self, next_fragment: FragmentableFrame) -> Fragmenta
2424

2525
current_frame_from_fragments = self._frames_by_stream_id.get(next_fragment.stream_id)
2626

27-
if current_frame_from_fragments is not None and type(next_fragment) != PayloadFrame:
27+
if current_frame_from_fragments is not None and not isinstance(next_fragment, PayloadFrame):
2828
raise RSocketFrameFragmentDifferentType()
2929

3030
if current_frame_from_fragments is None:

rsocket/handlers/interfaces.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,10 @@
1-
class Requester:
2-
pass
1+
import abc
2+
3+
from rsocket.frame import Frame
4+
5+
6+
class Requester(metaclass=abc.ABCMeta):
7+
8+
@abc.abstractmethod
9+
def frame_received(self, frame: Frame):
10+
...

rsocket/reactivex/from_rsocket_publisher.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import asyncio
21
import functools
2+
from asyncio import Event, CancelledError, get_event_loop, create_task
33

44
import reactivex
55
from reactivex import Observable, Observer
@@ -17,8 +17,8 @@ def __init__(self, observer: Observer, limit_rate: int = MAX_REQUEST_N):
1717
self.limit_rate = limit_rate
1818
self.observer = observer
1919
self._received_messages = 0
20-
self.done = asyncio.Event()
21-
self.get_next_n = asyncio.Event()
20+
self.done = Event()
21+
self.get_next_n = Event()
2222
self.subscription = None
2323

2424
def on_subscribe(self, subscription: Subscription):
@@ -54,7 +54,7 @@ async def _aio_sub(publisher: Publisher, subscriber: RxSubscriber, observer: Obs
5454
publisher.subscribe(subscriber)
5555
await subscriber.done.wait()
5656

57-
except asyncio.CancelledError:
57+
except CancelledError:
5858
if not subscriber.done.is_set():
5959
subscriber.subscription.cancel()
6060
except Exception as exception:
@@ -67,21 +67,21 @@ async def _trigger_next_request_n(subscriber: RxSubscriber, limit_rate):
6767
await subscriber.get_next_n.wait()
6868
subscriber.subscription.request(limit_rate)
6969
subscriber.get_next_n.clear()
70-
except asyncio.CancelledError:
70+
except CancelledError:
7171
logger().debug('Asyncio task canceled: trigger_next_request_n')
7272

7373

7474
def from_rsocket_publisher(publisher: Publisher, limit_rate: int = MAX_REQUEST_N) -> Observable:
75-
loop = asyncio.get_event_loop()
75+
loop = get_event_loop()
7676

7777
# noinspection PyUnusedLocal
7878
def on_subscribe(observer: Observer, scheduler):
7979
subscriber = RxSubscriber(observer, limit_rate)
8080

81-
get_next_task = asyncio.create_task(
81+
get_next_task = create_task(
8282
_trigger_next_request_n(subscriber, limit_rate)
8383
)
84-
task = asyncio.create_task(
84+
task = create_task(
8585
_aio_sub(publisher, subscriber, observer, loop)
8686
)
8787

rsocket/reactivex/reactivex_client.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
import asyncio
2-
from asyncio import Future
1+
from asyncio import Future, Event
32
from typing import Optional, cast, Union, Callable
43

54
import reactivex
@@ -30,7 +29,7 @@ def request_channel(self,
3029
request: Payload,
3130
request_limit: int = MAX_REQUEST_N,
3231
observable: Optional[Union[Observable, Callable[[Subject], Observable]]] = None,
33-
sending_done: Optional[asyncio.Event] = None) -> Observable:
32+
sending_done: Optional[Event] = None) -> Observable:
3433
requester_publisher = observable_to_publisher(observable)
3534

3635
response_publisher = self._rsocket.request_channel(

rsocket/rsocket_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class RSocketClient(RSocketBase):
2121
"""
2222
Client side instance of an RSocket connection.
2323
24-
:param transport_provider: Async generator which returns `Transport` to use with this instance. See `Transport` class implementations.
24+
:param transport_provider: Async generator which returns `Transport` to use with this instance.
2525
:param request_queue_size: Number of frames which can be queued while waiting for a lease.
2626
:param fragment_size_bytes: Minimum 64, Maximum depends on transport.
2727
"""

rsocket/rx_support/back_pressure_publisher.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
import asyncio
2-
from asyncio import Queue
1+
from asyncio import Queue, create_task, sleep
32
from dataclasses import dataclass
43
from typing import Optional, Callable, AsyncGenerator, Union
54

@@ -50,16 +49,16 @@ async def task_from_awaitable(future):
5049
async def coroutine_from_awaitable(awaitable):
5150
return await awaitable
5251

53-
task = asyncio.create_task(coroutine_from_awaitable(future))
54-
await asyncio.sleep(0) # allow awaitable to be accessed at least once
52+
task = create_task(coroutine_from_awaitable(future))
53+
await sleep(0) # allow awaitable to be accessed at least once
5554
return task
5655

5756

5857
def observable_from_async_generator(iterator, backpressure: Subject) -> Observable:
5958
# noinspection PyUnusedLocal
6059
def on_subscribe(observer: Observer, scheduler):
6160

62-
request_n_queue = asyncio.Queue()
61+
request_n_queue = Queue()
6362

6463
def request_next_n(n):
6564
request_n_queue.put_nowait(n)
@@ -92,15 +91,15 @@ def cancel_sender():
9291
on_completed=cancel_sender
9392
)
9493

95-
sender = asyncio.create_task(_aio_next())
94+
sender = create_task(_aio_next())
9695

9796
return result
9897

9998
return rx.create(on_subscribe)
10099

101100

102101
async def observable_to_async_event_generator(observable: Observable) -> AsyncGenerator[Notification, None]:
103-
queue = asyncio.Queue()
102+
queue = Queue()
104103

105104
completed = object()
106105

@@ -131,7 +130,7 @@ def from_async_event_iterator(iterator, backpressure: Subject) -> Observable:
131130
# noinspection PyUnusedLocal
132131
def on_subscribe(observer: Observer, scheduler):
133132

134-
request_n_queue = asyncio.Queue()
133+
request_n_queue = Queue()
135134

136135
async def _aio_next():
137136

@@ -156,7 +155,7 @@ async def _aio_next():
156155
logger().error(str(exception), exc_info=True)
157156
observer.on_error(exception)
158157

159-
sender = asyncio.create_task(_aio_next())
158+
sender = create_task(_aio_next())
160159

161160
def cancel_sender():
162161
sender.cancel()

rsocket/rx_support/from_rsocket_publisher.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import asyncio
21
import functools
2+
from asyncio import Event, CancelledError, get_event_loop, create_task
33

44
import rx
55
from rx import Observable
@@ -18,8 +18,8 @@ def __init__(self, observer: Observer, limit_rate: int = MAX_REQUEST_N):
1818
self.limit_rate = limit_rate
1919
self.observer = observer
2020
self._received_messages = 0
21-
self.done = asyncio.Event()
22-
self.get_next_n = asyncio.Event()
21+
self.done = Event()
22+
self.get_next_n = Event()
2323
self.subscription = None
2424

2525
def on_subscribe(self, subscription: Subscription):
@@ -55,7 +55,7 @@ async def _aio_sub(publisher: Publisher, subscriber: RxSubscriber, observer: Obs
5555
publisher.subscribe(subscriber)
5656
await subscriber.done.wait()
5757

58-
except asyncio.CancelledError:
58+
except CancelledError:
5959
if not subscriber.done.is_set():
6060
subscriber.subscription.cancel()
6161
except Exception as exception:
@@ -68,21 +68,21 @@ async def _trigger_next_request_n(subscriber: RxSubscriber, limit_rate):
6868
await subscriber.get_next_n.wait()
6969
subscriber.subscription.request(limit_rate)
7070
subscriber.get_next_n.clear()
71-
except asyncio.CancelledError:
71+
except CancelledError:
7272
logger().debug('Asyncio task canceled: trigger_next_request_n')
7373

7474

7575
def from_rsocket_publisher(publisher: Publisher, limit_rate: int = MAX_REQUEST_N) -> Observable:
76-
loop = asyncio.get_event_loop()
76+
loop = get_event_loop()
7777

7878
# noinspection PyUnusedLocal
7979
def on_subscribe(observer: Observer, scheduler):
8080
subscriber = RxSubscriber(observer, limit_rate)
8181

82-
get_next_task = asyncio.create_task(
82+
get_next_task = create_task(
8383
_trigger_next_request_n(subscriber, limit_rate)
8484
)
85-
task = asyncio.create_task(
85+
task = create_task(
8686
_aio_sub(publisher, subscriber, observer, loop)
8787
)
8888

0 commit comments

Comments
 (0)