Skip to content

Commit beb3fee

Browse files
authored
Merge pull request #90 from rsocket/concurrent_multiple_responses
request concurrency fix for reactivex integration
2 parents 524172b + 40df612 commit beb3fee

17 files changed

+197
-15
lines changed

CHANGELOG.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ v0.4.5
99
- Breaking change: ReactiveX clients will remove empty payload from request_response Observable, resulting in an actually empty Observable
1010
- Bug fix: fixed channel stream being released prematurely when canceled by requester, and responder side still working
1111
- Bug fix: removed cyclic references in RSocketBase which caused old sessions not to be released
12+
- Bug fix: fixed ability for rxpy streams and fragmented responses to send payloads concurrently
1213
- CollectorSubscriber : exposed subscription methods directly instead of relying on internal **subscription** variable
1314
- Reactivex server side request_response allowed to return reactivex.empty(). Library code will replace with empty Payload when needed
1415
- Added EmptyStream for use in stream and channel responses

rsocket/async_helpers.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import asyncio
2+
3+
4+
async def async_range(count: int):
5+
for i in range(count):
6+
yield i
7+
await asyncio.sleep(0.0)

rsocket/awaitable/collector_subscriber.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def on_next(self, value, is_complete=False):
4343
else:
4444
if self._received_count == self._limit_rate:
4545
self._received_count = 0
46-
self.subscription.request(self._limit_rate)
46+
self.subscription.request(self._limit_rate)
4747

4848
def on_error(self, exception: Exception):
4949
self.error = exception

rsocket/frame.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -759,6 +759,5 @@ def serialize_with_frame_size_header(frame: Frame) -> bytes:
759759
RequestChannelFrame: 10,
760760
}
761761

762-
763762
def get_header_length(frame: FragmentableFrame) -> int:
764763
return frame_header_length[frame.__class__]

rsocket/reactivex/back_pressure_publisher.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
from reactivestreams.publisher import Publisher
1313
from reactivestreams.subscriber import Subscriber
14+
from rsocket.async_helpers import async_range
1415
from rsocket.helpers import DefaultPublisherSubscription
1516
from rsocket.logger import logger
1617
from rsocket.reactivex.subscriber_adapter import SubscriberAdapter
@@ -66,8 +67,12 @@ async def _aio_next():
6667

6768
try:
6869
while True:
69-
next_n = await request_n_queue.get()
70-
for i in range(next_n):
70+
try:
71+
next_n = await request_n_queue.get()
72+
except RuntimeError:
73+
return
74+
75+
async for i in async_range(next_n):
7176
try:
7277
value = await iterator.__anext__()
7378
observer.on_next(value)
@@ -100,15 +105,23 @@ def cancel_sender():
100105
async def observable_to_async_event_generator(observable: Observable) -> AsyncGenerator[Notification, None]:
101106
queue = asyncio.Queue()
102107

108+
completed = object()
109+
103110
def on_next(i):
104111
queue.put_nowait(i)
105112

106113
observable.pipe(materialize()).subscribe(
107-
on_next=on_next
114+
on_next=on_next,
115+
on_completed=lambda: queue.put_nowait(completed)
108116
)
109117

110118
while True:
111119
value = await queue.get()
120+
121+
if value is completed:
122+
queue.task_done()
123+
return
124+
112125
yield value
113126
queue.task_done()
114127

@@ -128,7 +141,7 @@ async def _aio_next():
128141
try:
129142
while True:
130143
next_n = await request_n_queue.get()
131-
for i in range(next_n):
144+
async for i in async_range(next_n):
132145
event = await iterator.__anext__()
133146

134147
if isinstance(event, OnNext):

rsocket/rsocket_base.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,9 @@ async def _get_next_frame_to_send(self, transport: Transport) -> Frame:
404404
if isinstance(next_frame_source, FrameFragmentMixin):
405405
next_fragment = next_frame_source.get_next_fragment(transport.requires_length_header())
406406

407-
if not next_fragment.flags_follows:
407+
if next_fragment.flags_follows:
408+
self._send_queue.put_nowait(self._send_queue.get_nowait()) # cycle to next frame source in queue
409+
else:
408410
next_frame_source.get_next_fragment(
409411
transport.requires_length_header()) # workaround to clean-up generator.
410412
self._send_queue.get_nowait()

rsocket/rx_support/back_pressure_publisher.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
from reactivestreams.publisher import Publisher
1414
from reactivestreams.subscriber import Subscriber
15+
from rsocket.async_helpers import async_range
1516
from rsocket.helpers import DefaultPublisherSubscription
1617
from rsocket.logger import logger
1718
from rsocket.rx_support.subscriber_adapter import SubscriberAdapter
@@ -68,7 +69,7 @@ async def _aio_next():
6869
try:
6970
while True:
7071
next_n = await request_n_queue.get()
71-
for i in range(next_n):
72+
async for i in async_range(next_n):
7273
try:
7374
value = await iterator.__anext__()
7475
observer.on_next(value)
@@ -101,15 +102,23 @@ def cancel_sender():
101102
async def observable_to_async_event_generator(observable: Observable) -> AsyncGenerator[Notification, None]:
102103
queue = asyncio.Queue()
103104

105+
completed = object()
106+
104107
def on_next(i):
105108
queue.put_nowait(i)
106109

107110
observable.pipe(materialize()).subscribe(
108-
on_next=on_next
111+
on_next=on_next,
112+
on_completed=lambda: queue.put_nowait(completed)
109113
)
110114

111115
while True:
112116
value = await queue.get()
117+
118+
if value is completed:
119+
queue.task_done()
120+
return
121+
113122
yield value
114123
queue.task_done()
115124

@@ -129,7 +138,7 @@ async def _aio_next():
129138
try:
130139
while True:
131140
next_n = await request_n_queue.get()
132-
for i in range(next_n):
141+
async for i in async_range(next_n):
133142
event = await iterator.__anext__()
134143

135144
if isinstance(event, OnNext):

rsocket/stream_control.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from rsocket.error_codes import ErrorCode
44
from rsocket.exceptions import RSocketStreamAllocationFailure, RSocketStreamIdInUse
55
from rsocket.frame import CONNECTION_STREAM_ID, Frame, ErrorFrame
6+
from rsocket.logger import logger
67
from rsocket.streams.stream_handler import StreamHandler
78

89
MAX_STREAM_ID = 0x7FFFFFFF
@@ -33,6 +34,7 @@ def _increment_stream_id(self):
3334
self._current_stream_id = (self._current_stream_id + 2) & self._maximum_stream_id
3435

3536
def finish_stream(self, stream_id: int):
37+
logger().debug('Finishing stream: %s', stream_id)
3638
self._streams.pop(stream_id, None)
3739

3840
def register_stream(self, stream_id: int, handler: StreamHandler):
@@ -54,6 +56,7 @@ def handle_stream(self, frame: Frame) -> bool:
5456
return False
5557

5658
def stop_all_streams(self, error_code=ErrorCode.CANCELED, data=b''):
59+
logger().debug('Stopping all streams')
5760
for stream_id, stream in list(self._streams.items()):
5861
frame = ErrorFrame()
5962
frame.stream_id = stream_id

rsocket/streams/stream_from_async_generator.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from typing import AsyncGenerator, Tuple
22

3+
from rsocket.async_helpers import async_range
34
from rsocket.payload import Payload
45
from rsocket.streams.exceptions import FinishedIterator
56
from rsocket.streams.stream_from_generator import StreamFromGenerator
@@ -11,7 +12,7 @@ async def _start_generator(self):
1112

1213
async def _generate_next_n(self, n: int) -> AsyncGenerator[Tuple[Payload, bool], None]:
1314
is_complete_sent = False
14-
for i in range(n):
15+
async for i in async_range(n):
1516
try:
1617
next_value = await self._iteration.__anext__()
1718
is_complete_sent = next_value[1]

rsocket/streams/stream_from_generator.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from typing import AsyncGenerator, Tuple, Optional, Callable, Generator
55

66
from reactivestreams.subscriber import Subscriber
7+
from rsocket.async_helpers import async_range
78
from rsocket.helpers import DefaultPublisherSubscription
89
from rsocket.logger import logger
910
from rsocket.payload import Payload
@@ -71,7 +72,7 @@ async def queue_next_n(self):
7172

7273
async def _generate_next_n(self, n: int) -> AsyncGenerator[Tuple[Payload, bool], None]:
7374
is_complete_sent = False
74-
for i in range(n):
75+
async for i in async_range(n):
7576
next_value = next(self._iteration, _finished_iterator)
7677

7778
if next_value is _finished_iterator:

0 commit comments

Comments
 (0)