1818
1919import io .netty .buffer .ByteBuf ;
2020import io .netty .buffer .ByteBufAllocator ;
21+ import io .netty .util .CharsetUtil ;
2122import io .rsocket .DuplexConnection ;
2223import io .rsocket .exceptions .ConnectionErrorException ;
2324import io .rsocket .exceptions .Exceptions ;
@@ -54,6 +55,7 @@ public class ClientRSocketSession
5455 final Retry retry ;
5556 final boolean cleanupStoreOnKeepAlive ;
5657 final ByteBuf resumeToken ;
58+ final String session ;
5759
5860 volatile Subscription s ;
5961 static final AtomicReferenceFieldUpdater <ClientRSocketSession , Subscription > S =
@@ -71,20 +73,30 @@ public ClientRSocketSession(
7173 Retry retry ,
7274 boolean cleanupStoreOnKeepAlive ) {
7375 this .resumeToken = resumeToken ;
76+ this .session = resumeToken .toString (CharsetUtil .UTF_8 );
7477 this .connectionFactory =
7578 connectionFactory .flatMap (
7679 dc -> {
80+ final long impliedPosition = resumableFramesStore .frameImpliedPosition ();
81+ final long position = resumableFramesStore .framePosition ();
7782 dc .sendFrame (
7883 0 ,
7984 ResumeFrameCodec .encode (
8085 dc .alloc (),
8186 resumeToken .retain (),
8287 // server uses this to release its cache
83- resumableFramesStore . frameImpliedPosition () , // observed on the client side
88+ impliedPosition , // observed on the client side
8489 // server uses this to check whether there is no mismatch
85- resumableFramesStore . framePosition () // sent from the client sent
90+ position // sent from the client sent
8691 ));
87- logger .debug ("Resume Frame has been sent" );
92+
93+ if (logger .isDebugEnabled ()) {
94+ logger .debug (
95+ "Side[client]|Session[{}]. ResumeFrame[impliedPosition[{}], position[{}]] has been sent." ,
96+ session ,
97+ impliedPosition ,
98+ position );
99+ }
88100
89101 return connectionTransformer .apply (dc );
90102 });
@@ -105,7 +117,12 @@ void reconnect(int index) {
105117 if (this .s == Operators .cancelledSubscription ()
106118 && S .compareAndSet (this , Operators .cancelledSubscription (), null )) {
107119 keepAliveSupport .stop ();
108- logger .debug ("Connection[" + index + "] is lost. Reconnecting to resume..." );
120+ if (logger .isDebugEnabled ()) {
121+ logger .debug (
122+ "Side[client]|Session[{}]. Connection[{}] is lost. Reconnecting to resume..." ,
123+ session ,
124+ index );
125+ }
109126 connectionFactory .retryWhen (retry ).timeout (resumeSessionDuration ).subscribe (this );
110127 }
111128 }
@@ -155,21 +172,30 @@ public void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
155172 DuplexConnection nextDuplexConnection = tuple2 .getT2 ();
156173
157174 if (!Operators .terminate (S , this )) {
158- logger .debug ("Session has already been expired. Terminating received connection" );
175+ if (logger .isDebugEnabled ()) {
176+ logger .debug (
177+ "Side[client]|Session[{}]. Session has already been expired. Terminating received connection" ,
178+ session );
179+ }
159180 final ConnectionErrorException connectionErrorException =
160181 new ConnectionErrorException ("resumption_server=[Session Expired]" );
161182 nextDuplexConnection .sendErrorAndClose (connectionErrorException );
183+ nextDuplexConnection .receive ().subscribe ().dispose ();
162184 return ;
163185 }
164186
165187 final int streamId = FrameHeaderCodec .streamId (shouldBeResumeOKFrame );
166188 if (streamId != 0 ) {
167- logger .debug (
168- "Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection" );
169- resumableConnection .dispose ();
189+ if (logger .isDebugEnabled ()) {
190+ logger .debug (
191+ "Side[client]|Session[{}]. Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection" ,
192+ session );
193+ }
170194 final ConnectionErrorException connectionErrorException =
171195 new ConnectionErrorException ("RESUME_OK frame must be received before any others" );
196+ resumableConnection .dispose (connectionErrorException );
172197 nextDuplexConnection .sendErrorAndClose (connectionErrorException );
198+ nextDuplexConnection .receive ().subscribe ().dispose ();
173199 return ;
174200 }
175201
@@ -183,7 +209,8 @@ public void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
183209 final long position = resumableFramesStore .framePosition ();
184210 final long impliedPosition = resumableFramesStore .frameImpliedPosition ();
185211 logger .debug (
186- "ResumeOK FRAME received. ServerResumeState{observedFramesPosition[{}]}. ClientResumeState{observedFramesPosition[{}], sentFramesPosition[{}]}" ,
212+ "Side[client]|Session[{}]. ResumeOK FRAME received. ServerResumeState[remoteImpliedPosition[{}]]. ClientResumeState[impliedPosition[{}], position[{}]]" ,
213+ session ,
187214 remoteImpliedPos ,
188215 impliedPosition ,
189216 position );
@@ -194,42 +221,54 @@ public void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
194221 }
195222 } catch (IllegalStateException e ) {
196223 logger .debug ("Exception occurred while releasing frames in the frameStore" , e );
197- resumableConnection .dispose ();
224+ resumableConnection .dispose (e );
198225 final ConnectionErrorException t = new ConnectionErrorException (e .getMessage (), e );
199226 nextDuplexConnection .sendErrorAndClose (t );
227+ nextDuplexConnection .receive ().subscribe ().dispose ();
200228 return ;
201229 }
202230
203231 if (resumableConnection .connect (nextDuplexConnection )) {
204232 keepAliveSupport .start ();
205- logger .debug ("Session has been resumed successfully" );
233+ if (logger .isDebugEnabled ()) {
234+ logger .debug (
235+ "Side[client]|Session[{}]. Session has been resumed successfully" , session );
236+ }
206237 } else {
207- logger .debug ("Session has already been expired. Terminating received connection" );
238+ if (logger .isDebugEnabled ()) {
239+ logger .debug (
240+ "Side[client]|Session[{}]. Session has already been expired. Terminating received connection" ,
241+ session );
242+ }
208243 final ConnectionErrorException connectionErrorException =
209244 new ConnectionErrorException ("resumption_server_pos=[Session Expired]" );
210245 nextDuplexConnection .sendErrorAndClose (connectionErrorException );
246+ nextDuplexConnection .receive ().subscribe ().dispose ();
211247 }
212248 } else {
213249 logger .debug (
214- "Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}]. Terminating received connection" ,
250+ "Side[client]|Session[{}]. Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}]. Terminating received connection" ,
251+ session ,
215252 remoteImpliedPos ,
216253 position );
217- resumableConnection .dispose ();
218254 final ConnectionErrorException connectionErrorException =
219255 new ConnectionErrorException ("resumption_server_pos=[" + remoteImpliedPos + "]" );
256+ resumableConnection .dispose (connectionErrorException );
220257 nextDuplexConnection .sendErrorAndClose (connectionErrorException );
258+ nextDuplexConnection .receive ().subscribe ().dispose ();
221259 }
222260 } else if (frameType == FrameType .ERROR ) {
223261 final RuntimeException exception = Exceptions .from (0 , shouldBeResumeOKFrame );
224262 logger .debug ("Received error frame. Terminating received connection" , exception );
225- resumableConnection .dispose ();
263+ resumableConnection .dispose (exception );
226264 } else {
227265 logger .debug (
228266 "Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection" );
229- resumableConnection .dispose ();
230267 final ConnectionErrorException connectionErrorException =
231268 new ConnectionErrorException ("RESUME_OK frame must be received before any others" );
269+ resumableConnection .dispose (connectionErrorException );
232270 nextDuplexConnection .sendErrorAndClose (connectionErrorException );
271+ nextDuplexConnection .receive ().subscribe ().dispose ();
233272 }
234273 }
235274
@@ -239,7 +278,7 @@ public void onError(Throwable t) {
239278 Operators .onErrorDropped (t , currentContext ());
240279 }
241280
242- resumableConnection .dispose ();
281+ resumableConnection .dispose (t );
243282 }
244283
245284 @ Override
0 commit comments