1616
1717package io .rsocket ;
1818
19+ import static io .rsocket .keepalive .KeepAliveSupport .ClientKeepAliveSupport ;
20+ import static io .rsocket .keepalive .KeepAliveSupport .KeepAlive ;
21+
1922import io .netty .buffer .ByteBuf ;
2023import io .netty .buffer .ByteBufAllocator ;
2124import io .netty .util .ReferenceCountUtil ;
2225import io .netty .util .collection .IntObjectHashMap ;
26+ import io .rsocket .exceptions .ConnectionErrorException ;
2327import io .rsocket .exceptions .Exceptions ;
2428import io .rsocket .frame .*;
2529import io .rsocket .frame .decoder .PayloadDecoder ;
2630import io .rsocket .internal .LimitableRequestPublisher ;
2731import io .rsocket .internal .UnboundedProcessor ;
2832import io .rsocket .internal .UnicastMonoProcessor ;
33+ import io .rsocket .keepalive .KeepAliveFramesAcceptor ;
34+ import io .rsocket .keepalive .KeepAliveHandler ;
35+ import io .rsocket .keepalive .KeepAliveSupport ;
2936import java .nio .channels .ClosedChannelException ;
3037import java .util .Collections ;
3138import java .util .Map ;
3643import org .reactivestreams .Processor ;
3744import org .reactivestreams .Publisher ;
3845import org .reactivestreams .Subscriber ;
39- import reactor .core .publisher .BaseSubscriber ;
40- import reactor .core .publisher .Flux ;
41- import reactor .core .publisher .Mono ;
42- import reactor .core .publisher .SignalType ;
43- import reactor .core .publisher .UnicastProcessor ;
46+ import reactor .core .publisher .*;
4447
4548/** Client Side of a RSocket socket. Sends {@link ByteBuf}s to a {@link RSocketServer} */
4649class RSocketClient implements RSocket {
@@ -54,14 +57,18 @@ class RSocketClient implements RSocket {
5457 private final UnboundedProcessor <ByteBuf > sendProcessor ;
5558 private final Lifecycle lifecycle = new Lifecycle ();
5659 private final ByteBufAllocator allocator ;
60+ private final KeepAliveFramesAcceptor keepAliveFramesAcceptor ;
5761
5862 /*client requester*/
5963 RSocketClient (
6064 ByteBufAllocator allocator ,
6165 DuplexConnection connection ,
6266 PayloadDecoder payloadDecoder ,
6367 Consumer <Throwable > errorConsumer ,
64- StreamIdSupplier streamIdSupplier ) {
68+ StreamIdSupplier streamIdSupplier ,
69+ int keepAliveTickPeriod ,
70+ int keepAliveAckTimeout ,
71+ KeepAliveHandler keepAliveHandler ) {
6572 this .allocator = allocator ;
6673 this .connection = connection ;
6774 this .payloadDecoder = payloadDecoder ;
@@ -74,19 +81,40 @@ class RSocketClient implements RSocket {
7481 this .sendProcessor = new UnboundedProcessor <>();
7582
7683 connection .onClose ().doFinally (signalType -> terminate ()).subscribe (null , errorConsumer );
77-
78- sendProcessor
79- .doOnRequest (
80- r -> {
81- for (LimitableRequestPublisher lrp : senders .values ()) {
82- lrp .increaseInternalLimit (r );
83- }
84- })
85- .transform (connection ::send )
84+ connection
85+ .send (sendProcessor )
8686 .doFinally (this ::handleSendProcessorCancel )
8787 .subscribe (null , this ::handleSendProcessorError );
8888
8989 connection .receive ().subscribe (this ::handleIncomingFrames , errorConsumer );
90+
91+ if (keepAliveTickPeriod != 0 && keepAliveHandler != null ) {
92+ KeepAliveSupport keepAliveSupport =
93+ new ClientKeepAliveSupport (allocator , keepAliveTickPeriod , keepAliveAckTimeout );
94+ this .keepAliveFramesAcceptor =
95+ keepAliveHandler .start (keepAliveSupport , sendProcessor ::onNext , this ::terminate );
96+ } else {
97+ keepAliveFramesAcceptor = null ;
98+ }
99+ }
100+
101+ /*server requester*/
102+ RSocketClient (
103+ ByteBufAllocator allocator ,
104+ DuplexConnection connection ,
105+ PayloadDecoder payloadDecoder ,
106+ Consumer <Throwable > errorConsumer ,
107+ StreamIdSupplier streamIdSupplier ) {
108+ this (allocator , connection , payloadDecoder , errorConsumer , streamIdSupplier , 0 , 0 , null );
109+ }
110+
111+ private void terminate (KeepAlive keepAlive ) {
112+ String message =
113+ String .format ("No keep-alive acks for %d ms" , keepAlive .getTimeout ().toMillis ());
114+ ConnectionErrorException err = new ConnectionErrorException (message );
115+ lifecycle .setTerminationError (err );
116+ errorConsumer .accept (err );
117+ connection .dispose ();
90118 }
91119
92120 private void handleSendProcessorError (Throwable t ) {
@@ -294,7 +322,7 @@ public void accept(long n) {
294322 .transform (
295323 f -> {
296324 LimitableRequestPublisher <Payload > wrapped =
297- LimitableRequestPublisher .wrap (f , sendProcessor . available () );
325+ LimitableRequestPublisher .wrap (f );
298326 // Need to set this to one for first the frame
299327 wrapped .request (1 );
300328 senders .put (streamId , wrapped );
@@ -452,8 +480,9 @@ private void handleStreamZero(FrameType type, ByteBuf frame) {
452480 case LEASE :
453481 break ;
454482 case KEEPALIVE :
455- // KeepAlive is handled by corresponding connection interceptor,
456- // just release its frame here
483+ if (keepAliveFramesAcceptor != null ) {
484+ keepAliveFramesAcceptor .receive (frame );
485+ }
457486 break ;
458487 default :
459488 // Ignore unknown frames. Throwing an error will close the socket.
0 commit comments