16
16
package reactor .netty .http .client ;
17
17
18
18
import io .netty .channel .Channel ;
19
+ import io .netty .channel .ChannelFutureListener ;
19
20
import io .netty .channel .ChannelHandlerContext ;
20
21
import io .netty .channel .SimpleChannelInboundHandler ;
21
22
import io .netty .handler .codec .http2 .DefaultHttp2PingFrame ;
22
23
import io .netty .handler .codec .http2 .Http2FrameCodec ;
23
24
import io .netty .handler .codec .http2 .Http2FrameCodecBuilder ;
24
25
import io .netty .handler .codec .http2 .Http2PingFrame ;
26
+ import io .netty .handler .ssl .SslContext ;
27
+ import io .netty .handler .ssl .SslContextBuilder ;
25
28
import io .netty .handler .ssl .util .InsecureTrustManagerFactory ;
26
29
import io .netty .handler .ssl .util .SelfSignedCertificate ;
27
30
import org .junit .jupiter .api .BeforeAll ;
30
33
import reactor .netty .BaseHttpTest ;
31
34
import reactor .netty .DisposableServer ;
32
35
import reactor .netty .NettyPipeline ;
33
- import reactor .netty .http . Http2SslContextSpec ;
36
+ import reactor .netty .resources . ConnectionProvider ;
34
37
38
+ import javax .net .ssl .SSLException ;
35
39
import java .security .cert .CertificateException ;
36
40
import java .time .Duration ;
37
41
import java .time .LocalDateTime ;
42
+ import java .time .ZoneId ;
38
43
import java .util .ArrayList ;
39
44
import java .util .List ;
40
45
import java .util .function .BiConsumer ;
52
57
class Http2ConnectionLivenessHandlerTest extends BaseHttpTest {
53
58
54
59
static SelfSignedCertificate ssc ;
60
+ static SslContext sslServer ;
61
+ static SslContext sslClient ;
55
62
56
63
@ BeforeAll
57
- static void createSelfSignedCertificate () throws CertificateException {
64
+ static void createSelfSignedCertificate () throws CertificateException , SSLException {
58
65
ssc = new SelfSignedCertificate ();
66
+ sslServer = SslContextBuilder .forServer (ssc .certificate (), ssc .privateKey ())
67
+ .build ();
68
+ sslClient = SslContextBuilder .forClient ()
69
+ .trustManager (InsecureTrustManagerFactory .INSTANCE )
70
+ .build ();
59
71
}
60
72
61
73
@ Test
62
74
void successReceiveResponse () {
63
75
DisposableServer disposableServer = createServer ()
64
76
.protocol (H2 )
65
- .secure (spec -> spec .sslContext (
66
- Http2SslContextSpec .forServer (ssc .certificate (), ssc .privateKey ())
67
- ))
77
+ .secure (spec -> spec .sslContext (sslServer ))
68
78
.handle ((req , resp ) -> resp .sendString (Mono .just ("Test" )))
69
79
.bindNow ();
70
80
71
81
String result = createClient (disposableServer ::address )
72
82
.protocol (H2 )
73
- .secure (spec -> spec .sslContext (
74
- Http2SslContextSpec .forClient ()
75
- .configure (builder -> builder .trustManager (InsecureTrustManagerFactory .INSTANCE ))
76
- ))
83
+ .secure (spec -> spec .sslContext (sslClient ))
77
84
.get ()
78
85
.uri ("/" )
79
86
.responseSingle ((resp , bytes ) -> bytes .asString ())
@@ -89,9 +96,7 @@ void noPingCheckWhenNotConfigured() {
89
96
DisposableServer disposableServer = createServer ()
90
97
.protocol (H2 )
91
98
.maxKeepAliveRequests (1 )
92
- .secure (spec -> spec .sslContext (
93
- Http2SslContextSpec .forServer (ssc .certificate (), ssc .privateKey ())
94
- ))
99
+ .secure (spec -> spec .sslContext (sslServer ))
95
100
.doOnChannelInit ((connectionObserver , channel , remoteAddress ) -> {
96
101
Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder .forServer ()
97
102
.autoAckPingFrame (false )
@@ -107,10 +112,7 @@ void noPingCheckWhenNotConfigured() {
107
112
Channel channel = createClient (disposableServer ::address )
108
113
.protocol (H2 )
109
114
.keepAlive (true )
110
- .secure (spec -> spec .sslContext (
111
- Http2SslContextSpec .forClient ()
112
- .configure (builder -> builder .trustManager (InsecureTrustManagerFactory .INSTANCE ))
113
- ))
115
+ .secure (spec -> spec .sslContext (sslClient ))
114
116
.get ()
115
117
.uri ("/" )
116
118
.responseConnection ((conn , receiver ) -> Mono .just (receiver .channel ()))
@@ -125,19 +127,20 @@ void noPingCheckWhenNotConfigured() {
125
127
}
126
128
127
129
@ Test
128
- void closePingFrameIfDelayed () {
130
+ void closeConnectionIfPingFrameDelayed () {
129
131
Http2PingFrameHandler handler = new Http2PingFrameHandler (
130
132
(ctx , frame ) -> Mono .delay (Duration .ofMillis (150 ))
131
- .doOnNext (unUsed -> ctx .writeAndFlush (new DefaultHttp2PingFrame (frame .content (), true )))
133
+ .doOnNext (
134
+ unUsed -> ctx .writeAndFlush (new DefaultHttp2PingFrame (frame .content (), true ))
135
+ .addListener (ChannelFutureListener .CLOSE_ON_FAILURE )
136
+ )
132
137
.subscribe ()
133
138
);
134
139
135
140
DisposableServer disposableServer = createServer ()
136
141
.protocol (H2 )
137
142
.maxKeepAliveRequests (1 )
138
- .secure (spec -> spec .sslContext (
139
- Http2SslContextSpec .forServer (ssc .certificate (), ssc .privateKey ())
140
- ))
143
+ .secure (spec -> spec .sslContext (sslServer ))
141
144
.doOnChannelInit ((connectionObserver , channel , remoteAddress ) -> {
142
145
Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder .forServer ()
143
146
.autoAckPingFrame (false )
@@ -153,10 +156,55 @@ void closePingFrameIfDelayed() {
153
156
Channel channel = createClient (disposableServer ::address )
154
157
.protocol (H2 )
155
158
.keepAlive (true )
156
- .secure (spec -> spec .sslContext (
157
- Http2SslContextSpec .forClient ()
158
- .configure (builder -> builder .trustManager (InsecureTrustManagerFactory .INSTANCE ))
159
- ))
159
+ .secure (spec -> spec .sslContext (sslClient ))
160
+ .http2Settings (builder -> {
161
+ builder .pingInterval (Duration .ofMillis (100 ));
162
+ })
163
+ .get ()
164
+ .uri ("/" )
165
+ .responseConnection ((conn , receiver ) -> Mono .just (receiver .channel ()))
166
+ .single ()
167
+ .block ();
168
+
169
+ Mono .delay (Duration .ofMillis (600 ))
170
+ .block ();
171
+
172
+ assertThat (handler .getReceivedPingTimes ()).hasSize (1 );
173
+ assertThat (channel .parent ().isOpen ()).isFalse ();
174
+ }
175
+
176
+ @ Test
177
+ void closeConnectionInPoolIfPingFrameDelayed () {
178
+ Http2PingFrameHandler handler = new Http2PingFrameHandler (
179
+ (ctx , frame ) -> Mono .delay (Duration .ofMillis (150 ))
180
+ .doOnNext (
181
+ unUsed -> ctx .writeAndFlush (new DefaultHttp2PingFrame (frame .content (), true ))
182
+ .addListener (ChannelFutureListener .CLOSE_ON_FAILURE )
183
+ )
184
+ .subscribe ()
185
+ );
186
+
187
+ DisposableServer disposableServer = createServer ()
188
+ .protocol (H2 )
189
+ .maxKeepAliveRequests (1 )
190
+ .secure (spec -> spec .sslContext (sslServer ))
191
+ .doOnChannelInit ((connectionObserver , channel , remoteAddress ) -> {
192
+ Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder .forServer ()
193
+ .autoAckPingFrame (false )
194
+ .autoAckSettingsFrame (true )
195
+ .build ();
196
+
197
+ channel .pipeline ().replace (NettyPipeline .HttpCodec , NettyPipeline .HttpCodec , http2FrameCodec );
198
+ channel .pipeline ().addLast (handler );
199
+ })
200
+ .handle ((req , resp ) -> resp .sendString (Mono .just ("Test" )))
201
+ .bindNow ();
202
+
203
+ ConnectionProvider pool = ConnectionProvider .create ("closeConnectionInPoolIfPingFrameDelayed" , 1 );
204
+ Channel channel = createClient (pool , disposableServer ::address )
205
+ .protocol (H2 )
206
+ .keepAlive (true )
207
+ .secure (spec -> spec .sslContext (sslClient ))
160
208
.http2Settings (builder -> {
161
209
builder .pingInterval (Duration .ofMillis (100 ));
162
210
})
@@ -180,9 +228,7 @@ void ackPingFrameWithinInterval() {
180
228
DisposableServer disposableServer = createServer ()
181
229
.protocol (H2 )
182
230
.maxKeepAliveRequests (1 )
183
- .secure (spec -> spec .sslContext (
184
- Http2SslContextSpec .forServer (ssc .certificate (), ssc .privateKey ())
185
- ))
231
+ .secure (spec -> spec .sslContext (sslServer ))
186
232
.doOnChannelInit ((connectionObserver , channel , remoteAddress ) -> {
187
233
Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder .forServer ()
188
234
.autoAckPingFrame (false )
@@ -198,10 +244,48 @@ void ackPingFrameWithinInterval() {
198
244
Channel channel = createClient (disposableServer ::address )
199
245
.protocol (H2 )
200
246
.keepAlive (true )
201
- .secure (spec -> spec .sslContext (
202
- Http2SslContextSpec .forClient ()
203
- .configure (builder -> builder .trustManager (InsecureTrustManagerFactory .INSTANCE ))
204
- ))
247
+ .secure (spec -> spec .sslContext (sslClient ))
248
+ .http2Settings (builder -> {
249
+ builder .pingInterval (Duration .ofMillis (100 ));
250
+ })
251
+ .get ()
252
+ .uri ("/" )
253
+ .responseConnection ((conn , receiver ) -> Mono .just (receiver .channel ()))
254
+ .single ()
255
+ .block ();
256
+
257
+ Mono .delay (Duration .ofSeconds (1 ))
258
+ .block ();
259
+
260
+ assertThat (handler .getReceivedPingTimes ()).hasSizeGreaterThanOrEqualTo (2 );
261
+ assertThat (channel .parent ().isOpen ()).isTrue ();
262
+ }
263
+
264
+ @ Test
265
+ void connectionRetentionInPoolOnPingFrameAck () {
266
+ Http2PingFrameHandler handler = new Http2PingFrameHandler ();
267
+
268
+ DisposableServer disposableServer = createServer ()
269
+ .protocol (H2 )
270
+ .maxKeepAliveRequests (1 )
271
+ .secure (spec -> spec .sslContext (sslServer ))
272
+ .doOnChannelInit ((connectionObserver , channel , remoteAddress ) -> {
273
+ Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder .forServer ()
274
+ .autoAckPingFrame (false )
275
+ .autoAckSettingsFrame (true )
276
+ .build ();
277
+
278
+ channel .pipeline ().replace (NettyPipeline .HttpCodec , NettyPipeline .HttpCodec , http2FrameCodec );
279
+ channel .pipeline ().addLast (handler );
280
+ })
281
+ .handle ((req , resp ) -> resp .sendString (Mono .just ("Test" )))
282
+ .bindNow ();
283
+
284
+ ConnectionProvider pool = ConnectionProvider .create ("connectionRetentionInPoolOnPingFrameAck" , 1 );
285
+ Channel channel = createClient (pool , disposableServer ::address )
286
+ .protocol (H2 )
287
+ .keepAlive (true )
288
+ .secure (spec -> spec .sslContext (sslClient ))
205
289
.http2Settings (builder -> {
206
290
builder .pingInterval (Duration .ofMillis (100 ));
207
291
})
@@ -211,21 +295,23 @@ void ackPingFrameWithinInterval() {
211
295
.single ()
212
296
.block ();
213
297
214
- Mono .delay (Duration .ofMillis ( 1000 ))
298
+ Mono .delay (Duration .ofSeconds ( 1 ))
215
299
.block ();
216
300
217
301
assertThat (handler .getReceivedPingTimes ()).hasSizeGreaterThanOrEqualTo (2 );
218
302
assertThat (channel .parent ().isOpen ()).isTrue ();
219
303
}
220
304
221
- private static class Http2PingFrameHandler extends SimpleChannelInboundHandler <Http2PingFrame > {
305
+ private static final class Http2PingFrameHandler extends SimpleChannelInboundHandler <Http2PingFrame > {
222
306
223
- private List <LocalDateTime > receivedPingTimes = new ArrayList <>();
307
+ private final List <LocalDateTime > receivedPingTimes = new ArrayList <>();
224
308
225
309
private final BiConsumer <ChannelHandlerContext , Http2PingFrame > consumer ;
226
310
227
311
private Http2PingFrameHandler () {
228
- this .consumer = (ctx , frame ) -> ctx .writeAndFlush (new DefaultHttp2PingFrame (frame .content (), true ));
312
+ this .consumer = (ctx , frame ) ->
313
+ ctx .writeAndFlush (new DefaultHttp2PingFrame (frame .content (), true ))
314
+ .addListener (ChannelFutureListener .CLOSE_ON_FAILURE );
229
315
}
230
316
231
317
private Http2PingFrameHandler (BiConsumer <ChannelHandlerContext , Http2PingFrame > consumer ) {
@@ -234,7 +320,7 @@ private Http2PingFrameHandler(BiConsumer<ChannelHandlerContext, Http2PingFrame>
234
320
235
321
@ Override
236
322
protected void channelRead0 (ChannelHandlerContext ctx , Http2PingFrame frame ) throws InterruptedException {
237
- receivedPingTimes .add (LocalDateTime .now ());
323
+ receivedPingTimes .add (LocalDateTime .now (ZoneId . systemDefault () ));
238
324
consumer .accept (ctx , frame );
239
325
}
240
326
0 commit comments