Skip to content

Commit d8c89c2

Browse files
committed
more reads and relax detector
1 parent fea33ed commit d8c89c2

File tree

5 files changed

+20
-11
lines changed

5 files changed

+20
-11
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,44 +13,49 @@
1313
import io.netty.channel.ChannelHandlerContext;
1414
import io.netty.util.concurrent.ScheduledFuture;
1515

16+
import org.apache.logging.log4j.LogManager;
17+
import org.apache.logging.log4j.Logger;
1618
import org.elasticsearch.common.time.TimeProvider;
1719
import org.elasticsearch.common.util.concurrent.FutureUtils;
1820

1921
import java.util.concurrent.TimeUnit;
2022

2123
/**
2224
* When channel auto-read is disabled handlers are responsible to read from channel.
23-
* But it's hard to detect when read is missing. This helper class throws assertion errors
25+
* But it's hard to detect when read is missing. This helper class print warnings
2426
* when no reads where detected in given time interval. Normally, in tests, 30 seconds is enough
2527
* to avoid test hang for too long, but can be increased if needed.
2628
*/
2729
class MissingReadDetector extends ChannelDuplexHandler {
28-
final long interval;
29-
final TimeProvider timer;
30-
long reqTimeMs;
31-
long respTimeMs;
32-
ScheduledFuture<?> checker;
30+
31+
private static final Logger logger = LogManager.getLogger(MissingReadDetector.class);
32+
33+
private final long interval;
34+
private final TimeProvider timer;
35+
private long reqTimeMs;
36+
private long respTimeMs;
37+
private ScheduledFuture<?> checker;
3338

3439
MissingReadDetector(TimeProvider timer, long missingReadInterval) {
3540
this.interval = missingReadInterval;
3641
this.timer = timer;
3742
}
3843

3944
@Override
40-
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
45+
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
4146
checker = ctx.channel().eventLoop().scheduleAtFixedRate(() -> {
4247
if (respTimeMs >= reqTimeMs) { // stale read
4348
long now = timer.absoluteTimeInMillis();
4449
if (now >= respTimeMs + interval) {
45-
ctx.fireExceptionCaught(new AssertionError("stale channel, no reads for " + (now - respTimeMs) + " ms"));
50+
logger.warn("haven't read from channel for {}", (now - respTimeMs));
4651
}
4752
}
4853
}, interval, interval, TimeUnit.MILLISECONDS);
4954
super.channelRegistered(ctx);
5055
}
5156

5257
@Override
53-
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
58+
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
5459
if (checker != null) {
5560
FutureUtils.cancel(checker);
5661
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) {
123123
isContinueExpected = true;
124124
} else {
125125
ctx.writeAndFlush(EXPECTATION_FAILED_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
126+
ctx.read();
126127
return;
127128
}
128129
}
@@ -136,6 +137,7 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) {
136137
decoder.reset();
137138
}
138139
ctx.writeAndFlush(TOO_LARGE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
140+
ctx.read();
139141
} else {
140142
ignoreContent = false;
141143
currentContentLength = 0;
@@ -155,6 +157,7 @@ private void handleContent(ChannelHandlerContext ctx, HttpContent msg) {
155157
if (currentContentLength > maxContentLength) {
156158
msg.release();
157159
ctx.writeAndFlush(TOO_LARGE_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
160+
ctx.read();
158161
} else {
159162
ctx.fireChannelRead(msg);
160163
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
509509
} else {
510510
serverTransport.onException(channel, (Exception) cause);
511511
}
512+
ctx.read();
512513
}
513514

514515
@Override

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception {
371371
ch.pipeline().addLast(new FlowControlHandler());
372372
if (Assertions.ENABLED) {
373373
// missing reads are hard to catch, but we can detect absence of reads within interval
374-
long missingReadIntervalMs = 30_000;
374+
long missingReadIntervalMs = 10_000;
375375
ch.pipeline().addLast(new MissingReadDetector(transport.threadPool, missingReadIntervalMs));
376376
}
377377
// disable auto-read and issue first read, following reads must come from handlers

x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportCloseNotifyTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ public void close() {
251251
server.dispatcher.reqQueue.forEach(r -> r.request.getHttpRequest().release());
252252
server.netty.stop();
253253
server.threadPool.shutdownNow();
254-
safeAwait(client.netty.config().group().shutdownGracefully());
254+
safeAwait(client.netty.config().group().shutdownGracefully(0,0, TimeUnit.SECONDS));
255255
}
256256
}
257257

0 commit comments

Comments
 (0)