Skip to content

Commit fea33ed

Browse files
committed
missing read detector
1 parent e9df377 commit fea33ed

File tree

3 files changed

+82
-0
lines changed

3 files changed

+82
-0
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.http.netty4;
11+
12+
import io.netty.channel.ChannelDuplexHandler;
13+
import io.netty.channel.ChannelHandlerContext;
14+
import io.netty.util.concurrent.ScheduledFuture;
15+
16+
import org.elasticsearch.common.time.TimeProvider;
17+
import org.elasticsearch.common.util.concurrent.FutureUtils;
18+
19+
import java.util.concurrent.TimeUnit;
20+
21+
/**
22+
* When channel auto-read is disabled handlers are responsible to read from channel.
23+
* But it's hard to detect when read is missing. This helper class throws assertion errors
24+
* when no reads where detected in given time interval. Normally, in tests, 30 seconds is enough
25+
* to avoid test hang for too long, but can be increased if needed.
26+
*/
27+
class MissingReadDetector extends ChannelDuplexHandler {
28+
final long interval;
29+
final TimeProvider timer;
30+
long reqTimeMs;
31+
long respTimeMs;
32+
ScheduledFuture<?> checker;
33+
34+
MissingReadDetector(TimeProvider timer, long missingReadInterval) {
35+
this.interval = missingReadInterval;
36+
this.timer = timer;
37+
}
38+
39+
@Override
40+
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
41+
checker = ctx.channel().eventLoop().scheduleAtFixedRate(() -> {
42+
if (respTimeMs >= reqTimeMs) { // stale read
43+
long now = timer.absoluteTimeInMillis();
44+
if (now >= respTimeMs + interval) {
45+
ctx.fireExceptionCaught(new AssertionError("stale channel, no reads for " + (now - respTimeMs) + " ms"));
46+
}
47+
}
48+
}, interval, interval, TimeUnit.MILLISECONDS);
49+
super.channelRegistered(ctx);
50+
}
51+
52+
@Override
53+
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
54+
if (checker != null) {
55+
FutureUtils.cancel(checker);
56+
}
57+
super.channelUnregistered(ctx);
58+
}
59+
60+
@Override
61+
public void read(ChannelHandlerContext ctx) throws Exception {
62+
reqTimeMs = timer.absoluteTimeInMillis();
63+
super.read(ctx);
64+
}
65+
66+
@Override
67+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
68+
respTimeMs = timer.absoluteTimeInMillis();
69+
super.channelRead(ctx, msg);
70+
}
71+
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
4848
}
4949
if (aggregating || msg instanceof FullHttpRequest) {
5050
super.channelRead(ctx, msg);
51+
ctx.read();
5152
} else {
5253
streamContentSizeHandler.channelRead(ctx, msg);
5354
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.elasticsearch.common.settings.Settings;
4848
import org.elasticsearch.common.unit.ByteSizeValue;
4949
import org.elasticsearch.common.util.concurrent.EsExecutors;
50+
import org.elasticsearch.core.Assertions;
5051
import org.elasticsearch.core.IOUtils;
5152
import org.elasticsearch.core.Nullable;
5253
import org.elasticsearch.http.AbstractHttpServerTransport;
@@ -365,9 +366,18 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception {
365366
}
366367
decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
367368
ch.pipeline().addLast("decoder", decoder); // parses the HTTP bytes request into HTTP message pieces
369+
370+
// from this point in pipeline every handler must call ctx or channel #read() when ready to process next HTTP part
368371
ch.pipeline().addLast(new FlowControlHandler());
372+
if (Assertions.ENABLED) {
373+
// missing reads are hard to catch, but we can detect absence of reads within interval
374+
long missingReadIntervalMs = 30_000;
375+
ch.pipeline().addLast(new MissingReadDetector(transport.threadPool, missingReadIntervalMs));
376+
}
377+
// disable auto-read and issue first read, following reads must come from handlers
369378
ch.config().setAutoRead(false);
370379
ch.read();
380+
371381
if (httpValidator != null) {
372382
// runs a validation function on the first HTTP message piece which contains all the headers
373383
// if validation passes, the pieces of that particular request are forwarded, otherwise they are discarded

0 commit comments

Comments
 (0)