|
27 | 27 | import io.reactivesocket.reactivestreams.extensions.DefaultSubscriber; |
28 | 28 | import io.reactivesocket.reactivestreams.extensions.Px; |
29 | 29 | import io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription; |
30 | | -import io.reactivesocket.reactivestreams.extensions.internal.processors.ConnectableUnicastProcessor; |
31 | 30 | import io.reactivesocket.reactivestreams.extensions.internal.subscribers.CancellableSubscriber; |
32 | 31 | import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers; |
33 | 32 | import org.agrona.collections.Int2ObjectHashMap; |
34 | | -import org.reactivestreams.Processor; |
35 | 33 | import org.reactivestreams.Publisher; |
36 | 34 | import org.reactivestreams.Subscriber; |
37 | 35 | import org.reactivestreams.Subscription; |
|
40 | 38 | import java.nio.charset.StandardCharsets; |
41 | 39 | import java.util.function.Consumer; |
42 | 40 |
|
43 | | -import static io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers.doOnError; |
| 41 | +import static io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers.*; |
44 | 42 |
|
45 | 43 | /** |
46 | 44 | * Client Side of a ReactiveSocket socket. Sends {@link Frame}s |
@@ -75,46 +73,31 @@ public ClientReactiveSocket(DuplexConnection connection, Consumer<Throwable> err |
75 | 73 |
|
76 | 74 | @Override |
77 | 75 | public Publisher<Void> fireAndForget(Payload payload) { |
78 | | - try { |
| 76 | + return Px.defer(() -> { |
79 | 77 | final int streamId = nextStreamId(); |
80 | 78 | final Frame requestFrame = Frame.Request.from(streamId, FrameType.FIRE_AND_FORGET, payload, 0); |
81 | 79 | return connection.sendOne(requestFrame); |
82 | | - } catch (Throwable t) { |
83 | | - return Px.error(t); |
84 | | - } |
| 80 | + }); |
85 | 81 | } |
86 | 82 |
|
87 | 83 | @Override |
88 | 84 | public Publisher<Payload> requestResponse(Payload payload) { |
89 | | - final int streamId = nextStreamId(); |
90 | | - final Frame requestFrame = Frame.Request.from(streamId, FrameType.REQUEST_RESPONSE, payload, 1); |
91 | | - |
92 | | - return handleRequestResponse(Px.just(requestFrame), streamId, 1, false); |
| 85 | + return handleRequestResponse(payload); |
93 | 86 | } |
94 | 87 |
|
95 | 88 | @Override |
96 | 89 | public Publisher<Payload> requestStream(Payload payload) { |
97 | | - final int streamId = nextStreamId(); |
98 | | - final Frame requestFrame = Frame.Request.from(streamId, FrameType.REQUEST_STREAM, payload, 1); |
99 | | - |
100 | | - return handleStreamResponse(Px.just(requestFrame), streamId); |
| 90 | + return handleStreamResponse(Px.just(payload), FrameType.REQUEST_STREAM); |
101 | 91 | } |
102 | 92 |
|
103 | 93 | @Override |
104 | 94 | public Publisher<Payload> requestSubscription(Payload payload) { |
105 | | - final int streamId = nextStreamId(); |
106 | | - final Frame requestFrame = Frame.Request.from(streamId, FrameType.REQUEST_SUBSCRIPTION, payload, 1); |
107 | | - |
108 | | - return handleStreamResponse(Px.just(requestFrame), streamId); |
| 95 | + return handleStreamResponse(Px.just(payload), FrameType.REQUEST_SUBSCRIPTION); |
109 | 96 | } |
110 | 97 |
|
111 | 98 | @Override |
112 | 99 | public Publisher<Payload> requestChannel(Publisher<Payload> payloads) { |
113 | | - final int streamId = nextStreamId(); |
114 | | - return handleStreamResponse(Px.from(payloads) |
115 | | - .map(payload -> { |
116 | | - return Frame.Request.from(streamId, FrameType.REQUEST_CHANNEL, payload, 1); |
117 | | - }), streamId); |
| 100 | + return handleStreamResponse(Px.from(payloads), FrameType.REQUEST_CHANNEL); |
118 | 101 | } |
119 | 102 |
|
120 | 103 | @Override |
@@ -148,77 +131,53 @@ public ClientReactiveSocket start(Consumer<Lease> leaseConsumer) { |
148 | 131 | return this; |
149 | 132 | } |
150 | 133 |
|
151 | | - private Publisher<Payload> handleRequestResponse(final Publisher<Frame> payload, final int streamId, |
152 | | - final int initialRequestN, final boolean sendRequestN) { |
153 | | - ConnectableUnicastProcessor<Frame> sender = new ConnectableUnicastProcessor<>(); |
154 | | - |
155 | | - synchronized (this) { |
156 | | - senders.put(streamId, sender); |
157 | | - } |
158 | | - |
159 | | - final Runnable cleanup = () -> { |
| 134 | + private Publisher<Payload> handleRequestResponse(final Payload payload) { |
| 135 | + return Px.create(subscriber -> { |
| 136 | + int streamId = nextStreamId(); |
| 137 | + final Frame requestFrame = Frame.Request.from(streamId, FrameType.REQUEST_RESPONSE, payload, 1); |
160 | 138 | synchronized (this) { |
161 | | - receivers.remove(streamId); |
162 | | - senders.remove(streamId); |
| 139 | + @SuppressWarnings("rawtypes") |
| 140 | + Subscriber raw = subscriber; |
| 141 | + @SuppressWarnings("unchecked") |
| 142 | + Subscriber<Frame> fs = raw; |
| 143 | + receivers.put(streamId, fs); |
163 | 144 | } |
164 | | - }; |
165 | | - |
166 | | - return Px |
167 | | - .<Payload>create(subscriber -> { |
168 | | - @SuppressWarnings("rawtypes") |
169 | | - Subscriber raw = subscriber; |
170 | | - @SuppressWarnings("unchecked") |
171 | | - Subscriber<Frame> fs = raw; |
172 | | - synchronized (this) { |
173 | | - receivers.put(streamId, fs); |
174 | | - } |
175 | | - |
176 | | - payload.subscribe(sender); |
177 | | - |
178 | | - subscriber.onSubscribe(new Subscription() { |
179 | | - |
180 | | - @Override |
181 | | - public void request(long n) { |
182 | | - if (sendRequestN) { |
183 | | - sender.onNext(Frame.RequestN.from(streamId, n)); |
184 | | - } |
185 | | - } |
186 | | - |
187 | | - @Override |
188 | | - public void cancel() { |
189 | | - sender.onNext(Frame.Cancel.from(streamId)); |
190 | | - sender.cancel(); |
191 | | - } |
192 | | - }); |
193 | | - |
194 | | - Px.from(connection.send(sender)) |
195 | | - .doOnError(th -> subscriber.onError(th)) |
196 | | - .subscribe(DefaultSubscriber.defaultInstance()); |
197 | | - |
198 | | - }) |
199 | | - .doOnRequest(subscription -> sender.start(initialRequestN)) |
200 | | - .doOnTerminate(cleanup); |
| 145 | + Px.concatEmpty(connection.sendOne(requestFrame), Px.never()) |
| 146 | + .cast(Payload.class) |
| 147 | + .doOnCancel(() -> { |
| 148 | + if (connection.availability() > 0.0) { |
| 149 | + connection.sendOne(Frame.Cancel.from(streamId)) |
| 150 | + .subscribe(DefaultSubscriber.defaultInstance()); |
| 151 | + } |
| 152 | + }) |
| 153 | + .subscribe(subscriber); |
| 154 | + }); |
201 | 155 | } |
202 | 156 |
|
203 | | - private Publisher<Payload> handleStreamResponse(Publisher<Frame> request, final int streamId) { |
204 | | - RemoteSender sender = new RemoteSender(request, () -> senders.remove(streamId), streamId, 1); |
205 | | - Publisher<Frame> src = s -> { |
206 | | - CancellableSubscriber<Void> sendSub = doOnError(throwable -> { |
207 | | - s.onError(throwable); |
208 | | - }); |
209 | | - ValidatingSubscription<? super Frame> sub = ValidatingSubscription.create(s, () -> { |
210 | | - sendSub.cancel(); |
211 | | - }, requestN -> { |
212 | | - transportReceiveSubscription.request(requestN); |
213 | | - }); |
214 | | - connection.send(sender).subscribe(sendSub); |
215 | | - s.onSubscribe(sub); |
216 | | - }; |
217 | | - |
218 | | - RemoteReceiver receiver = new RemoteReceiver(src, connection, streamId, () -> receivers.remove(streamId), true); |
219 | | - senders.put(streamId, sender); |
220 | | - receivers.put(streamId, receiver); |
221 | | - return receiver; |
| 157 | + private Publisher<Payload> handleStreamResponse(Px<Payload> request, FrameType requestType) { |
| 158 | + return Px.defer(() -> { |
| 159 | + int streamId = nextStreamId(); |
| 160 | + RemoteSender sender = new RemoteSender(request.map(payload -> Frame.Request.from(streamId, requestType, |
| 161 | + payload, 1)), |
| 162 | + removeSenderLambda(streamId), 1); |
| 163 | + Publisher<Frame> src = s -> { |
| 164 | + CancellableSubscriber<Void> sendSub = doOnError(throwable -> { |
| 165 | + s.onError(throwable); |
| 166 | + }); |
| 167 | + ValidatingSubscription<? super Frame> sub = ValidatingSubscription.create(s, () -> { |
| 168 | + sendSub.cancel(); |
| 169 | + }, requestN -> { |
| 170 | + transportReceiveSubscription.request(requestN); |
| 171 | + }); |
| 172 | + connection.send(sender).subscribe(sendSub); |
| 173 | + s.onSubscribe(sub); |
| 174 | + }; |
| 175 | + |
| 176 | + RemoteReceiver receiver = new RemoteReceiver(src, connection, streamId, removeReceiverLambda(streamId), |
| 177 | + true); |
| 178 | + registerSenderReceiver(streamId, sender, receiver); |
| 179 | + return receiver; |
| 180 | + }); |
222 | 181 | } |
223 | 182 |
|
224 | 183 | private void startKeepAlive() { |
@@ -291,10 +250,16 @@ private void handleFrame(int streamId, FrameType type, Frame frame) { |
291 | 250 | switch (type) { |
292 | 251 | case ERROR: |
293 | 252 | receiver.onError(Exceptions.from(frame)); |
| 253 | + synchronized (this) { |
| 254 | + receivers.remove(streamId); |
| 255 | + } |
294 | 256 | break; |
295 | 257 | case NEXT_COMPLETE: |
296 | 258 | receiver.onNext(frame); |
297 | 259 | receiver.onComplete(); |
| 260 | + synchronized (this) { |
| 261 | + receivers.remove(streamId); |
| 262 | + } |
298 | 263 | break; |
299 | 264 | case CANCEL: { |
300 | 265 | Subscription sender; |
@@ -324,6 +289,9 @@ private void handleFrame(int streamId, FrameType type, Frame frame) { |
324 | 289 | } |
325 | 290 | case COMPLETE: |
326 | 291 | receiver.onComplete(); |
| 292 | + synchronized (this) { |
| 293 | + receivers.remove(streamId); |
| 294 | + } |
327 | 295 | break; |
328 | 296 | default: |
329 | 297 | throw new IllegalStateException( |
@@ -360,5 +328,28 @@ private static String getByteBufferAsString(ByteBuffer bb) { |
360 | 328 | return new String(bytes, StandardCharsets.UTF_8); |
361 | 329 | } |
362 | 330 |
|
| 331 | + private Runnable removeReceiverLambda(int streamId) { |
| 332 | + return () -> { |
| 333 | + removeReceiver(streamId); |
| 334 | + }; |
| 335 | + } |
363 | 336 |
|
| 337 | + private synchronized void removeReceiver(int streamId) { |
| 338 | + receivers.remove(streamId); |
| 339 | + } |
| 340 | + |
| 341 | + private Runnable removeSenderLambda(int streamId) { |
| 342 | + return () -> { |
| 343 | + removeSender(streamId); |
| 344 | + }; |
| 345 | + } |
| 346 | + |
| 347 | + private synchronized void removeSender(int streamId) { |
| 348 | + senders.remove(streamId); |
| 349 | + } |
| 350 | + |
| 351 | + private synchronized void registerSenderReceiver(int streamId, Subscription sender, Subscriber<Frame> receiver) { |
| 352 | + senders.put(streamId, sender); |
| 353 | + receivers.put(streamId, receiver); |
| 354 | + } |
364 | 355 | } |
0 commit comments