2727import org .reactivestreams .Subscriber ;
2828import org .reactivestreams .Subscription ;
2929import reactor .core .Disposable ;
30- import reactor .core .publisher .*;
30+ import reactor .core .publisher .Flux ;
31+ import reactor .core .publisher .Mono ;
32+ import reactor .core .publisher .SignalType ;
33+ import reactor .core .publisher .UnicastProcessor ;
3134
3235import java .util .Collections ;
3336import java .util .Map ;
3942import static io .rsocket .frame .FrameHeaderFlyweight .FLAGS_M ;
4043
4144/** Server side RSocket. Receives {@link Frame}s from a {@link RSocketClient} */
42- class RSocketServer implements RSocket {
45+ class RSocketServer implements ResponderRSocket {
4346
4447 private final DuplexConnection connection ;
4548 private final RSocket requestHandler ;
49+ private final ResponderRSocket responderRSocket ;
4650 private final Function <Frame , ? extends Payload > frameDecoder ;
4751 private final Consumer <Throwable > errorConsumer ;
4852
4953 private final Map <Integer , Subscription > sendingSubscriptions ;
50- private final Map <Integer , Processor <Payload ,Payload >> channelProcessors ;
54+ private final Map <Integer , Processor <Payload , Payload >> channelProcessors ;
5155
5256 private final UnboundedProcessor <Frame > sendProcessor ;
5357 private KeepAliveHandler keepAliveHandler ;
@@ -69,12 +73,16 @@ class RSocketServer implements RSocket {
6973 Consumer <Throwable > errorConsumer ,
7074 long tickPeriod ,
7175 long ackTimeout ) {
72- this . connection = connection ;
76+
7377 this .requestHandler = requestHandler ;
78+ this .responderRSocket =
79+ (requestHandler instanceof ResponderRSocket ) ? (ResponderRSocket ) requestHandler : null ;
80+
81+ this .connection = connection ;
7482 this .frameDecoder = frameDecoder ;
7583 this .errorConsumer = errorConsumer ;
7684 this .sendingSubscriptions = Collections .synchronizedMap (new IntObjectHashMap <>());
77- this .channelProcessors = Collections .synchronizedMap (new IntObjectHashMap <>());
85+ this .channelProcessors = Collections .synchronizedMap (new IntObjectHashMap <>());
7886
7987 // DO NOT Change the order here. The Send processor must be subscribed to before receiving
8088 // connections
@@ -116,43 +124,55 @@ class RSocketServer implements RSocket {
116124 }
117125
118126 private void handleSendProcessorError (Throwable t ) {
119- sendingSubscriptions .values ().forEach (subscription -> {
120- try {
121- subscription .cancel ();
122- } catch (Throwable e ) {
123- errorConsumer .accept (e );
124- }
125- });
127+ sendingSubscriptions
128+ .values ()
129+ .forEach (
130+ subscription -> {
131+ try {
132+ subscription .cancel ();
133+ } catch (Throwable e ) {
134+ errorConsumer .accept (e );
135+ }
136+ });
126137
127- channelProcessors .values ().forEach (subscription -> {
128- try {
129- subscription .onError (t );
130- } catch (Throwable e ) {
131- errorConsumer .accept (e );
132- }
133- });
138+ channelProcessors
139+ .values ()
140+ .forEach (
141+ subscription -> {
142+ try {
143+ subscription .onError (t );
144+ } catch (Throwable e ) {
145+ errorConsumer .accept (e );
146+ }
147+ });
134148 }
135149
136150 private void handleSendProcessorCancel (SignalType t ) {
137151 if (SignalType .ON_ERROR == t ) {
138152 return ;
139153 }
140154
141- sendingSubscriptions .values ().forEach (subscription -> {
142- try {
143- subscription .cancel ();
144- } catch (Throwable e ) {
145- errorConsumer .accept (e );
146- }
147- });
155+ sendingSubscriptions
156+ .values ()
157+ .forEach (
158+ subscription -> {
159+ try {
160+ subscription .cancel ();
161+ } catch (Throwable e ) {
162+ errorConsumer .accept (e );
163+ }
164+ });
148165
149- channelProcessors .values ().forEach (subscription -> {
150- try {
151- subscription .onComplete ();
152- } catch (Throwable e ) {
153- errorConsumer .accept (e );
154- }
155- });
166+ channelProcessors
167+ .values ()
168+ .forEach (
169+ subscription -> {
170+ try {
171+ subscription .onComplete ();
172+ } catch (Throwable e ) {
173+ errorConsumer .accept (e );
174+ }
175+ });
156176 }
157177
158178 @ Override
@@ -191,6 +211,15 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
191211 }
192212 }
193213
214+ @ Override
215+ public Flux <Payload > requestChannel (Payload payload , Publisher <Payload > payloads ) {
216+ try {
217+ return responderRSocket .requestChannel (payload , payloads );
218+ } catch (Throwable t ) {
219+ return Flux .error (t );
220+ }
221+ }
222+
194223 @ Override
195224 public Mono <Void > metadataPush (Payload payload ) {
196225 try {
@@ -232,9 +261,7 @@ private synchronized void cleanUpSendingSubscriptions() {
232261 }
233262
234263 private synchronized void cleanUpChannelProcessors () {
235- channelProcessors
236- .values ()
237- .forEach (Processor ::onComplete );
264+ channelProcessors .values ().forEach (Processor ::onComplete );
238265 channelProcessors .clear ();
239266 }
240267
@@ -381,7 +408,11 @@ private void handleChannel(int streamId, Payload payload, int initialRequestN) {
381408 // and any later payload can be processed
382409 frames .onNext (payload );
383410
384- handleStream (streamId , requestChannel (payloads ), initialRequestN );
411+ if (responderRSocket != null ) {
412+ handleStream (streamId , requestChannel (payload , payloads ), initialRequestN );
413+ } else {
414+ handleStream (streamId , requestChannel (payloads ), initialRequestN );
415+ }
385416 }
386417
387418 private void handleKeepAliveFrame (Frame frame ) {
0 commit comments