Skip to content

Commit 0e59863

Browse files
committed
Add a handler to check connection status via Ping frame health checks in HTTP/2
Signed-off-by: raccoonback <[email protected]>
1 parent a46afc2 commit 0e59863

File tree

2 files changed

+492
-0
lines changed

2 files changed

+492
-0
lines changed
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
/*
2+
* Copyright (c) 2025 VMware, Inc. or its affiliates, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package reactor.netty.http.client;
17+
18+
import io.netty.channel.Channel;
19+
import io.netty.channel.ChannelFuture;
20+
import io.netty.channel.ChannelFutureListener;
21+
import io.netty.channel.ChannelHandlerContext;
22+
import io.netty.channel.ChannelPromise;
23+
import io.netty.handler.codec.http2.DefaultHttp2PingFrame;
24+
import io.netty.handler.codec.http2.Http2ChannelDuplexHandler;
25+
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
26+
import reactor.util.Logger;
27+
import reactor.util.Loggers;
28+
import reactor.util.annotation.Nullable;
29+
30+
import java.time.Duration;
31+
import java.util.Objects;
32+
import java.util.concurrent.ScheduledFuture;
33+
import java.util.concurrent.ThreadLocalRandom;
34+
35+
import static java.util.concurrent.TimeUnit.NANOSECONDS;
36+
37+
/**
38+
* Handler that supports connection health checks using HTTP/2 Ping Frames.
39+
*
40+
* <p>This Handler sends a ping frame at the specified interval when no frame is being read or written,
41+
* ensuring the connection health is monitored. If a ping ACK frame is not received within the configured interval,
42+
* the connection will be closed.</p>
43+
*
44+
* <p>Ping frame checking will not be performed while a read or write operation is in progress.</p>
45+
*
46+
* <p>Be cautious when setting a very short interval, as it may cause the connection to be closed,
47+
* even if the keep-alive setting is enabled.</p>
48+
*
49+
* <p>If no interval is specified, no ping frame checking will be performed.</p>
50+
*
51+
* @author raccoonback
52+
* @since 1.2.3
53+
*/
54+
public class Http2ConnectionLivenessHandler extends Http2ChannelDuplexHandler {
55+
56+
private static final Logger log = Loggers.getLogger(Http2ConnectionLivenessHandler.class);
57+
58+
private ScheduledFuture<?> pingScheduler;
59+
private final ChannelFutureListener pingWriteListener = new PingWriteListener();
60+
private final Http2ConnectionEncoder encoder;
61+
private final long pingIntervalNanos;
62+
private long lastSentPingData;
63+
private long lastReceivedPingTime;
64+
private long lastIoTime;
65+
private boolean isPingAckPending;
66+
67+
public Http2ConnectionLivenessHandler(Http2ConnectionEncoder encoder, @Nullable Duration pingInterval) {
68+
Objects.requireNonNull(encoder, "encoder");
69+
this.encoder = encoder;
70+
71+
if (pingInterval != null) {
72+
this.pingIntervalNanos = pingInterval.toNanos();
73+
}
74+
else {
75+
this.pingIntervalNanos = 0L;
76+
}
77+
}
78+
79+
@Override
80+
public void channelActive(ChannelHandlerContext ctx) throws Exception {
81+
if (isPingIntervalConfigured()) {
82+
isPingAckPending = false;
83+
pingScheduler = ctx.executor()
84+
.schedule(
85+
new PingChecker(ctx),
86+
pingIntervalNanos,
87+
NANOSECONDS
88+
);
89+
}
90+
91+
ctx.fireChannelActive();
92+
}
93+
94+
@Override
95+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
96+
if (msg instanceof DefaultHttp2PingFrame) {
97+
DefaultHttp2PingFrame frame = (DefaultHttp2PingFrame) msg;
98+
if (frame.ack() && frame.content() == lastSentPingData) {
99+
lastReceivedPingTime = System.nanoTime();
100+
}
101+
}
102+
else {
103+
lastIoTime = System.nanoTime();
104+
}
105+
106+
ctx.fireChannelRead(msg);
107+
}
108+
109+
@Override
110+
@SuppressWarnings("FutureReturnValueIgnored")
111+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
112+
lastIoTime = System.nanoTime();
113+
114+
ctx.write(msg, promise);
115+
}
116+
117+
@Override
118+
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
119+
cancel();
120+
ctx.fireChannelInactive();
121+
}
122+
123+
@Override
124+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
125+
cancel();
126+
ctx.fireExceptionCaught(cause);
127+
}
128+
129+
private boolean isPingIntervalConfigured() {
130+
return pingIntervalNanos > 0;
131+
}
132+
133+
private void cancel() {
134+
if (pingScheduler != null) {
135+
pingScheduler.cancel(false);
136+
}
137+
}
138+
139+
private class PingChecker implements Runnable {
140+
141+
private final ChannelHandlerContext ctx;
142+
143+
PingChecker(ChannelHandlerContext ctx) {
144+
this.ctx = ctx;
145+
}
146+
147+
@Override
148+
public void run() {
149+
Channel channel = ctx.channel();
150+
if (channel == null || !channel.isOpen()) {
151+
152+
return;
153+
}
154+
155+
if (lastIoTime == 0 || isIoInProgress()) {
156+
if (log.isDebugEnabled()) {
157+
log.debug("{} channel is currently reading or writing data.", channel);
158+
}
159+
160+
isPingAckPending = false;
161+
pingScheduler = invokeNextSchedule();
162+
return;
163+
}
164+
165+
if (!isPingAckPending) {
166+
if (log.isDebugEnabled()) {
167+
log.debug("Attempting to send a ping frame to {} channel.", channel);
168+
}
169+
170+
writePing(ctx);
171+
pingScheduler = invokeNextSchedule();
172+
return;
173+
}
174+
175+
if (isOutOfTimeRange()) {
176+
if (log.isInfoEnabled()) {
177+
log.info("Closing {} channel due to delayed ping frame response (timeout: {} ns).", channel, pingIntervalNanos);
178+
}
179+
180+
close(channel);
181+
return;
182+
}
183+
184+
isPingAckPending = false;
185+
pingScheduler = invokeNextSchedule();
186+
}
187+
188+
private void writePing(ChannelHandlerContext ctx) {
189+
lastSentPingData = ThreadLocalRandom.current().nextLong();
190+
191+
encoder.frameWriter()
192+
.writePing(ctx, false, lastSentPingData, ctx.newPromise())
193+
.addListener(pingWriteListener);
194+
ctx.flush();
195+
}
196+
197+
private boolean isIoInProgress() {
198+
return pingIntervalNanos > (System.nanoTime() - lastIoTime);
199+
}
200+
201+
private boolean isOutOfTimeRange() {
202+
return pingIntervalNanos < (System.nanoTime() - lastReceivedPingTime);
203+
}
204+
205+
private ScheduledFuture<?> invokeNextSchedule() {
206+
return ctx.executor()
207+
.schedule(
208+
new PingChecker(ctx),
209+
pingIntervalNanos,
210+
NANOSECONDS
211+
);
212+
}
213+
214+
private void close(Channel channel) {
215+
channel.close()
216+
.addListener(future -> {
217+
if (future.isSuccess()) {
218+
if (log.isDebugEnabled()) {
219+
log.debug("{} channel closed an channel", channel);
220+
}
221+
}
222+
else if (log.isDebugEnabled()) {
223+
log.debug("{} channel failed to close an channel", channel, future.cause());
224+
}
225+
});
226+
}
227+
}
228+
229+
private class PingWriteListener implements ChannelFutureListener {
230+
231+
@Override
232+
public void operationComplete(ChannelFuture future) throws Exception {
233+
if (future.isSuccess()) {
234+
if (log.isDebugEnabled()) {
235+
log.debug("Wrote PING frame to {} channel.", future.channel());
236+
}
237+
238+
isPingAckPending = true;
239+
}
240+
else if (log.isDebugEnabled()) {
241+
log.debug("Failed to wrote PING frame to {} channel.", future.channel());
242+
}
243+
}
244+
}
245+
}

0 commit comments

Comments
 (0)