15
15
*/
16
16
package reactor .netty .http .server ;
17
17
18
- import java .net .SocketAddress ;
19
- import java .time .Duration ;
20
- import java .time .ZonedDateTime ;
21
- import java .util .Optional ;
22
- import java .util .Queue ;
23
- import java .util .function .BiFunction ;
24
- import java .util .function .BiPredicate ;
18
+ import static io .netty .handler .codec .http .HttpUtil .isContentLengthSet ;
19
+ import static io .netty .handler .codec .http .HttpUtil .isKeepAlive ;
20
+ import static io .netty .handler .codec .http .HttpUtil .isTransferEncodingChunked ;
21
+ import static io .netty .handler .codec .http .HttpUtil .setKeepAlive ;
22
+ import static io .netty .handler .codec .http .LastHttpContent .EMPTY_LAST_CONTENT ;
23
+ import static reactor .netty .ReactorNetty .format ;
25
24
26
25
import io .netty .channel .ChannelDuplexHandler ;
27
26
import io .netty .channel .ChannelFutureListener ;
28
27
import io .netty .channel .ChannelHandlerContext ;
28
+ import io .netty .channel .ChannelPipeline ;
29
29
import io .netty .channel .ChannelPromise ;
30
30
import io .netty .handler .codec .DecoderResult ;
31
31
import io .netty .handler .codec .DecoderResultProvider ;
43
43
import io .netty .handler .codec .http .LastHttpContent ;
44
44
import io .netty .handler .codec .http .cookie .ServerCookieDecoder ;
45
45
import io .netty .handler .codec .http .cookie .ServerCookieEncoder ;
46
+ import io .netty .handler .codec .http2 .Http2FrameCodec ;
46
47
import io .netty .handler .ssl .SslHandler ;
47
48
import io .netty .util .ReferenceCountUtil ;
49
+ import java .net .SocketAddress ;
50
+ import java .time .Duration ;
51
+ import java .time .ZonedDateTime ;
52
+ import java .util .Optional ;
53
+ import java .util .Queue ;
54
+ import java .util .function .BiFunction ;
55
+ import java .util .function .BiPredicate ;
48
56
import reactor .core .Exceptions ;
49
57
import reactor .core .publisher .Mono ;
50
58
import reactor .netty .Connection ;
51
59
import reactor .netty .ConnectionObserver ;
52
60
import reactor .netty .ReactorNetty ;
53
61
import reactor .netty .channel .ChannelOperations ;
62
+ import reactor .netty .http .Http2ConnectionLiveness ;
54
63
import reactor .netty .http .Http2SettingsSpec ;
64
+ import reactor .netty .http .HttpConnectionImmediateClose ;
65
+ import reactor .netty .http .IdleTimeoutHandler ;
55
66
import reactor .netty .http .logging .HttpMessageArgProviderFactory ;
56
67
import reactor .netty .http .logging .HttpMessageLogFactory ;
57
68
import reactor .netty .http .server .compression .HttpCompressionOptionsSpec ;
58
69
import reactor .util .annotation .Nullable ;
59
70
import reactor .util .concurrent .Queues ;
60
71
61
- import static io .netty .handler .codec .http .HttpUtil .isContentLengthSet ;
62
- import static io .netty .handler .codec .http .HttpUtil .isKeepAlive ;
63
- import static io .netty .handler .codec .http .HttpUtil .isTransferEncodingChunked ;
64
- import static io .netty .handler .codec .http .HttpUtil .setKeepAlive ;
65
- import static io .netty .handler .codec .http .LastHttpContent .EMPTY_LAST_CONTENT ;
66
- import static reactor .netty .ReactorNetty .format ;
67
-
68
72
/**
69
73
* Replace {@link io.netty.handler.codec.http.HttpServerKeepAliveHandler} with extra
70
74
* handler management.
@@ -157,6 +161,12 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
157
161
ctx .read ();
158
162
}
159
163
164
+ @ Override
165
+ public void channelActive (ChannelHandlerContext ctx ) {
166
+ setupIdleTimeoutHandler (ctx .pipeline ());
167
+ ctx .fireChannelActive ();
168
+ }
169
+
160
170
@ Override
161
171
public void channelRead (ChannelHandlerContext ctx , Object msg ) {
162
172
read = true ;
@@ -172,6 +182,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
172
182
if (msg instanceof HttpRequest ) {
173
183
finalizingResponse = false ;
174
184
185
+ if (idleTimeout != null ) {
186
+ IdleTimeoutHandler .removeIdleTimeoutHandler (ctx .pipeline ());
187
+ }
188
+
175
189
final HttpRequest request = (HttpRequest ) msg ;
176
190
177
191
if (H2 .equals (request .protocolVersion ())) {
@@ -539,6 +553,7 @@ void handleLastHttpContent(Object msg, ChannelPromise promise) {
539
553
ctx .executor ().execute (this );
540
554
}
541
555
else {
556
+ setupIdleTimeoutHandler (ctx .pipeline ());
542
557
ctx .read ();
543
558
}
544
559
}
@@ -682,6 +697,29 @@ static boolean isMultipart(HttpResponse response) {
682
697
MULTIPART_PREFIX .length ());
683
698
}
684
699
700
+ private void setupIdleTimeoutHandler (ChannelPipeline pipeline ) {
701
+ Http2FrameCodec httpCodec = pipeline .get (Http2FrameCodec .class );
702
+ if (httpCodec != null ) {
703
+ IdleTimeoutHandler .addIdleTimeoutServerHandler (
704
+ pipeline ,
705
+ idleTimeout ,
706
+ new Http2ConnectionLiveness (
707
+ httpCodec ,
708
+ http2SettingsSpec != null ? http2SettingsSpec .pingAckTimeout () : null ,
709
+ http2SettingsSpec != null ? http2SettingsSpec .pingScheduleInterval () : null ,
710
+ http2SettingsSpec != null ? http2SettingsSpec .pingAckDropThreshold () : null
711
+ )
712
+ );
713
+ return ;
714
+ }
715
+
716
+ IdleTimeoutHandler .addIdleTimeoutServerHandler (
717
+ pipeline ,
718
+ idleTimeout ,
719
+ new HttpConnectionImmediateClose ()
720
+ );
721
+ }
722
+
685
723
static final class HttpRequestHolder {
686
724
final HttpRequest request ;
687
725
final ZonedDateTime timestamp ;
0 commit comments