33
33
import java .util .Collections ;
34
34
import java .util .List ;
35
35
import java .util .concurrent .CompletableFuture ;
36
+ import java .util .concurrent .Executors ;
37
+ import java .util .concurrent .ScheduledExecutorService ;
36
38
import java .util .concurrent .TimeUnit ;
39
+ import java .util .logging .Level ;
40
+ import java .util .logging .Logger ;
37
41
import org .junit .jupiter .api .BeforeEach ;
38
42
import org .junit .jupiter .api .Test ;
39
43
import org .mockito .Mockito ;
40
44
import software .amazon .awssdk .http .Protocol ;
41
45
import software .amazon .awssdk .http .nio .netty .internal .ChannelAttributeKey ;
46
+ import software .amazon .awssdk .http .nio .netty .internal .utils .NettyClientLogger ;
42
47
43
48
public class Http2PingHandlerTest {
49
+ private static final NettyClientLogger log = NettyClientLogger .getLogger (Http2PingHandler .class );
50
+
44
51
private static final int FAST_CHECKER_DURATION_MILLIS = 100 ;
45
52
46
53
private Http2PingHandler fastChecker ;
@@ -109,15 +116,21 @@ public void unregister_stopsRunning() throws InterruptedException {
109
116
}
110
117
111
118
@ Test
112
- public void ignoredPingsResultInOneChannelException () throws InterruptedException {
119
+ public void schedulingDelayDoesNotCausePingTimeout () throws InterruptedException {
113
120
PipelineExceptionCatcher catcher = new PipelineExceptionCatcher ();
114
- EmbeddedChannel channel = createHttp2Channel (fastChecker , catcher );
115
-
116
- Thread .sleep (FAST_CHECKER_DURATION_MILLIS );
121
+ PingResponder pingResponder = new PingResponder ();
122
+ EmbeddedChannel channel = createHttp2Channel (fastChecker , catcher , pingResponder );
123
+
124
+ pingResponder .setCallback (() -> channel .writeInbound (new DefaultHttp2PingFrame (0 , true )),
125
+ (long )(FAST_CHECKER_DURATION_MILLIS / 10 ) /* send ack 10ms after getting ping */ );
126
+
117
127
channel .runPendingTasks ();
118
128
119
- assertThat (catcher .caughtExceptions ).hasSize (1 );
120
- assertThat (catcher .caughtExceptions .get (0 )).isInstanceOf (IOException .class );
129
+ // cause a scheduling delay for the timer to run
130
+ Thread .sleep (FAST_CHECKER_DURATION_MILLIS * 2 );
131
+ channel .runPendingTasks ();
132
+
133
+ assertThat (catcher .caughtExceptions ).hasSize (0 );
121
134
}
122
135
123
136
@ Test
@@ -214,6 +227,51 @@ public void channelInactive_shouldCancelTaskAndForwardToOtherHandlers() {
214
227
assertThat (channel .runScheduledPendingTasks ()).isEqualTo (-1L );
215
228
}
216
229
230
+ @ Test
231
+ public void delayedPingFlushDoesntTerminateConnectionPrematurely () {
232
+ Logger .getLogger ("" ).setLevel (Level .ALL );
233
+
234
+ PipelineExceptionCatcher catcher = new PipelineExceptionCatcher ();
235
+ PingResponder pingResponder = new PingResponder ();
236
+ DelayingWriter delayingWriter = new DelayingWriter ((long )(FAST_CHECKER_DURATION_MILLIS * 1.5 ));
237
+
238
+ EmbeddedChannel channel = createHttp2Channel (fastChecker , catcher , pingResponder , delayingWriter );
239
+
240
+ pingResponder .setCallback (() -> channel .writeInbound (new DefaultHttp2PingFrame (0 , true )),
241
+ FAST_CHECKER_DURATION_MILLIS / 10 /* send ack in 10 ms after getting ping*/ );
242
+
243
+ Instant runEnd = Instant .now ().plus (1 , SECONDS );
244
+ while (Instant .now ().isBefore (runEnd )) {
245
+ channel .runPendingTasks ();
246
+ assertThat (catcher .caughtExceptions ).isEmpty ();
247
+ }
248
+ }
249
+
250
+ @ Test
251
+ public void delayedPingAckTerminatesConnection () {
252
+ Logger .getLogger ("" ).setLevel (Level .ALL );
253
+
254
+ PipelineExceptionCatcher catcher = new PipelineExceptionCatcher ();
255
+ PingResponder pingResponder = new PingResponder ();
256
+ DelayingWriter delayingWriter = new DelayingWriter ((long )(FAST_CHECKER_DURATION_MILLIS * 1.5 ));
257
+
258
+ EmbeddedChannel channel = createHttp2Channel (fastChecker , catcher , pingResponder , delayingWriter );
259
+
260
+ pingResponder .setCallback (() -> channel .writeInbound (new DefaultHttp2PingFrame (0 , true )),
261
+ (long )(FAST_CHECKER_DURATION_MILLIS * 1.5 ) /* send a late ack to trigger timeout */ );
262
+
263
+ Instant runEnd = Instant .now ().plus (1 , SECONDS );
264
+ while (Instant .now ().isBefore (runEnd )) {
265
+ channel .runPendingTasks ();
266
+ if (!catcher .caughtExceptions .isEmpty ()) {
267
+ break ;
268
+ }
269
+ }
270
+
271
+ assertThat (catcher .caughtExceptions ).hasSize (1 );
272
+ assertThat (catcher .caughtExceptions .get (0 )).isInstanceOf (PingFailedException .class );
273
+ }
274
+
217
275
private static final class PingReadCatcher extends SimpleChannelInboundHandler <Http2PingFrame > {
218
276
private final List <Http2PingFrame > caughtPings = Collections .synchronizedList (new ArrayList <>());
219
277
@@ -239,4 +297,45 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
239
297
promise .setFailure (new IOException ("Failed!" ));
240
298
}
241
299
}
300
+
301
+ private static final class PingResponder extends ChannelOutboundHandlerAdapter {
302
+ private final ScheduledExecutorService scheduler = Executors .newSingleThreadScheduledExecutor ();
303
+ private Runnable respondCallback ;
304
+ private long callbackDelayMillis ;
305
+
306
+ void setCallback (Runnable respondCallback , long delay ) {
307
+ this .respondCallback = respondCallback ;
308
+ this .callbackDelayMillis = delay ;
309
+ }
310
+
311
+ @ Override
312
+ public void write (ChannelHandlerContext ctx , Object msg , ChannelPromise promise ) {
313
+ if (msg instanceof Http2PingFrame ) {
314
+ log .debug (ctx .channel (), () -> "OutgoingPingCatcher Received ping " + msg );
315
+ scheduler .schedule (respondCallback , callbackDelayMillis , TimeUnit .MILLISECONDS );
316
+ }
317
+ ctx .write (msg , promise );
318
+ }
319
+ }
320
+
321
+ private static final class DelayingWriter extends ChannelOutboundHandlerAdapter {
322
+ private final long sleepMillis ;
323
+
324
+ DelayingWriter (long sleepMillis ) {
325
+ this .sleepMillis = sleepMillis ;
326
+ }
327
+
328
+ @ Override
329
+ public void write (ChannelHandlerContext ctx , Object msg , ChannelPromise promise ) {
330
+ log .debug (ctx .channel (), () -> " Writing " + msg + " delayed by " + sleepMillis );
331
+ try {
332
+ Thread .sleep (sleepMillis );
333
+ } catch (InterruptedException e ) {
334
+
335
+ }
336
+ log .debug (ctx .channel (), () -> " Continuing write of " + msg );
337
+
338
+ ctx .write (msg , promise );
339
+ }
340
+ }
242
341
}
0 commit comments