3636import io .rsocket .lease .ResponderLeaseHandler ;
3737import java .util .function .Consumer ;
3838import java .util .function .LongConsumer ;
39+ import javax .annotation .Nullable ;
3940import org .reactivestreams .Processor ;
4041import org .reactivestreams .Publisher ;
4142import org .reactivestreams .Subscriber ;
@@ -302,7 +303,7 @@ private void handleFrame(ByteBuf frame) {
302303 case REQUEST_STREAM :
303304 int streamInitialRequestN = RequestStreamFrameFlyweight .initialRequestN (frame );
304305 Payload streamPayload = payloadDecoder .apply (frame );
305- handleStream (streamId , requestStream (streamPayload ), streamInitialRequestN );
306+ handleStream (streamId , requestStream (streamPayload ), streamInitialRequestN , null );
306307 break ;
307308 case REQUEST_CHANNEL :
308309 int channelInitialRequestN = RequestChannelFrameFlyweight .initialRequestN (frame );
@@ -433,7 +434,11 @@ protected void hookFinally(SignalType type) {
433434 response .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER ).subscribe (subscriber );
434435 }
435436
436- private void handleStream (int streamId , Flux <Payload > response , int initialRequestN ) {
437+ private void handleStream (
438+ int streamId ,
439+ Flux <Payload > response ,
440+ int initialRequestN ,
441+ @ Nullable UnicastProcessor <Payload > requestChannel ) {
437442 final BaseSubscriber <Payload > subscriber =
438443 new BaseSubscriber <Payload >() {
439444
@@ -446,6 +451,17 @@ protected void hookOnSubscribe(Subscription s) {
446451 protected void hookOnNext (Payload payload ) {
447452 if (!PayloadValidationUtils .isValid (mtu , payload )) {
448453 payload .release ();
454+ // specifically for requestChannel case so when Payload is invalid we will not be
455+ // sending CancelFrame and ErrorFrame
456+ // Note: CancelFrame is redundant and due to spec
457+ // (https://github.com/rsocket/rsocket/blob/master/Protocol.md#request-channel)
458+ // Upon receiving an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream is
459+ // terminated on both Requester and Responder.
460+ // Upon sending an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream is
461+ // terminated on both the Requester and Responder.
462+ if (requestChannel != null ) {
463+ channelProcessors .remove (streamId , requestChannel );
464+ }
449465 cancel ();
450466 final IllegalArgumentException t =
451467 new IllegalArgumentException (INVALID_PAYLOAD_ERROR_MESSAGE );
@@ -495,9 +511,6 @@ private void handleChannel(int streamId, Payload payload, int initialRequestN) {
495511
496512 Flux <Payload > payloads =
497513 frames
498- .doOnCancel (
499- () -> sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId )))
500- .doOnError (t -> handleError (streamId , t ))
501514 .doOnRequest (
502515 new LongConsumer () {
503516 boolean first = true ;
@@ -511,10 +524,19 @@ public void accept(long l) {
511524 } else {
512525 n = l ;
513526 }
514- sendProcessor .onNext (RequestNFrameFlyweight .encode (allocator , streamId , n ));
527+ if (n > 0 ) {
528+ sendProcessor .onNext (RequestNFrameFlyweight .encode (allocator , streamId , n ));
529+ }
530+ }
531+ })
532+ .doFinally (
533+ signalType -> {
534+ if (channelProcessors .remove (streamId , frames )) {
535+ if (signalType == SignalType .CANCEL ) {
536+ sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
537+ }
515538 }
516539 })
517- .doFinally (signalType -> channelProcessors .remove (streamId ))
518540 .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER );
519541
520542 // not chained, as the payload should be enqueued in the Unicast processor before this method
@@ -523,9 +545,9 @@ public void accept(long l) {
523545 frames .onNext (payload );
524546
525547 if (responderRSocket != null ) {
526- handleStream (streamId , requestChannel (payload , payloads ), initialRequestN );
548+ handleStream (streamId , requestChannel (payload , payloads ), initialRequestN , frames );
527549 } else {
528- handleStream (streamId , requestChannel (payloads ), initialRequestN );
550+ handleStream (streamId , requestChannel (payloads ), initialRequestN , frames );
529551 }
530552 }
531553
0 commit comments