1616
1717package io .rsocket ;
1818
19- import static io .rsocket .util .ExceptionUtil .noStacktrace ;
20-
2119import io .netty .buffer .Unpooled ;
22- import io .netty .util .collection .IntObjectHashMap ;
2320import io .rsocket .exceptions .ConnectionException ;
2421import io .rsocket .exceptions .Exceptions ;
2522import io .rsocket .internal .LimitableRequestPublisher ;
2623import io .rsocket .internal .UnboundedProcessor ;
27- import java .nio .channels .ClosedChannelException ;
24+ import io .rsocket .util .NonBlockingHashMapLong ;
25+ import org .reactivestreams .Publisher ;
26+ import org .reactivestreams .Subscriber ;
27+ import reactor .core .Disposable ;
28+ import reactor .core .publisher .*;
29+
30+ import javax .annotation .Nullable ;
2831import java .time .Duration ;
2932import java .util .Collection ;
3033import java .util .concurrent .atomic .AtomicBoolean ;
3134import java .util .concurrent .atomic .AtomicInteger ;
3235import java .util .function .Consumer ;
3336import java .util .function .Function ;
3437import java .util .function .Supplier ;
35- import javax .annotation .Nullable ;
36- import org .reactivestreams .Publisher ;
37- import org .reactivestreams .Subscriber ;
38- import reactor .core .Disposable ;
39- import reactor .core .publisher .*;
4038
4139/** Client Side of a RSocket socket. Sends {@link Frame}s to a {@link RSocketServer} */
4240class RSocketClient implements RSocket {
4341
44- private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION =
45- noStacktrace (new ClosedChannelException ());
46-
4742 private final DuplexConnection connection ;
4843 private final Function <Frame , ? extends Payload > frameDecoder ;
4944 private final Consumer <Throwable > errorConsumer ;
5045 private final StreamIdSupplier streamIdSupplier ;
5146 private final MonoProcessor <Void > started ;
52- private final IntObjectHashMap <LimitableRequestPublisher > senders ;
53- private final IntObjectHashMap < Subscriber <Payload >> receivers ;
47+ private final NonBlockingHashMapLong <LimitableRequestPublisher > senders ;
48+ private final NonBlockingHashMapLong < UnicastProcessor <Payload >> receivers ;
5449 private final AtomicInteger missedAckCounter ;
5550
5651 private final UnboundedProcessor <Frame > sendProcessor ;
@@ -80,8 +75,8 @@ class RSocketClient implements RSocket {
8075 this .errorConsumer = errorConsumer ;
8176 this .streamIdSupplier = streamIdSupplier ;
8277 this .started = MonoProcessor .create ();
83- this .senders = new IntObjectHashMap <>(256 , 0.9f );
84- this .receivers = new IntObjectHashMap <>(256 , 0.9f );
78+ this .senders = new NonBlockingHashMapLong <>(256 );
79+ this .receivers = new NonBlockingHashMapLong <>(256 );
8580 this .missedAckCounter = new AtomicInteger ();
8681
8782 // DO NOT Change the order here. The Send processor must be subscribed to before receiving
@@ -127,7 +122,7 @@ class RSocketClient implements RSocket {
127122 }
128123
129124 private void handleSendProcessorError (Throwable t ) {
130- Collection <Subscriber <Payload >> values ;
125+ Collection <UnicastProcessor <Payload >> values ;
131126 Collection <LimitableRequestPublisher > values1 ;
132127 synchronized (RSocketClient .this ) {
133128 values = receivers .values ();
@@ -151,7 +146,7 @@ private void handleSendProcessorCancel(SignalType t) {
151146 if (SignalType .ON_ERROR == t ) {
152147 return ;
153148 }
154- Collection <Subscriber <Payload >> values ;
149+ Collection <UnicastProcessor <Payload >> values ;
155150 Collection <LimitableRequestPublisher > values1 ;
156151 synchronized (RSocketClient .this ) {
157152 values = receivers .values ();
@@ -222,10 +217,15 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
222217
223218 @ Override
224219 public Mono <Void > metadataPush (Payload payload ) {
225- final Frame requestFrame = Frame .Request .from (0 , FrameType .METADATA_PUSH , payload , 1 );
226- payload .release ();
227- sendProcessor .onNext (requestFrame );
228- return Mono .empty ();
220+ Mono <Void > defer =
221+ Mono .fromRunnable (
222+ () -> {
223+ final Frame requestFrame = Frame .Request .from (0 , FrameType .METADATA_PUSH , payload , 1 );
224+ payload .release ();
225+ sendProcessor .onNext (requestFrame );
226+ });
227+
228+ return started .then (defer );
229229 }
230230
231231 @ Override
@@ -303,7 +303,7 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
303303 Frame .Request .from (streamId , FrameType .REQUEST_RESPONSE , payload , 1 );
304304 payload .release ();
305305
306- MonoProcessor <Payload > receiver = MonoProcessor .create ();
306+ UnicastProcessor <Payload > receiver = UnicastProcessor .create ();
307307
308308 synchronized (this ) {
309309 receivers .put (streamId , receiver );
@@ -312,6 +312,7 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
312312 sendProcessor .onNext (requestFrame );
313313
314314 return receiver
315+ .singleOrEmpty ()
315316 .doOnError (t -> sendProcessor .onNext (Frame .Error .from (streamId , t )))
316317 .doOnCancel (() -> sendProcessor .onNext (Frame .Cancel .from (streamId )))
317318 .doFinally (
@@ -438,7 +439,7 @@ private boolean contains(int streamId) {
438439
439440 protected void cleanup () {
440441 try {
441- Collection <Subscriber <Payload >> subscribers ;
442+ Collection <UnicastProcessor <Payload >> subscribers ;
442443 Collection <LimitableRequestPublisher > publishers ;
443444 synchronized (RSocketClient .this ) {
444445 subscribers = receivers .values ();
@@ -468,9 +469,9 @@ private synchronized void cleanUpLimitableRequestPublisher(
468469 }
469470 }
470471
471- private synchronized void cleanUpSubscriber (Subscriber <?> subscriber ) {
472+ private synchronized void cleanUpSubscriber (UnicastProcessor <?> subscriber ) {
472473 try {
473- subscriber .onError ( CLOSED_CHANNEL_EXCEPTION );
474+ subscriber .cancel ( );
474475 } catch (Throwable t ) {
475476 errorConsumer .accept (t );
476477 }
0 commit comments