Skip to content

Commit c8805b8

Browse files
authored
Add flow-control and remove auto-read in netty4 HTTP pipeline (#126441)
1 parent c1ecafa commit c8805b8

File tree

15 files changed

+485
-1155
lines changed

15 files changed

+485
-1155
lines changed

docs/changelog/126441.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 126441
2+
summary: Add flow-control and remove auto-read in netty4 http pipeline
3+
area: Network
4+
type: enhancement
5+
issues: []

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

Lines changed: 72 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,64 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
9494

9595
private static final int MAX_CONTENT_LENGTH = ByteSizeUnit.MB.toIntBytes(50);
9696

97+
private static long transportStatsRequestBytesSize(Ctx ctx) {
98+
var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName);
99+
var stats = httpTransport.stats().clientStats();
100+
var bytes = 0L;
101+
for (var s : stats) {
102+
bytes += s.requestSizeBytes();
103+
}
104+
return bytes;
105+
}
106+
107+
static int MBytes(int m) {
108+
return m * 1024 * 1024;
109+
}
110+
111+
static <T> T safePoll(BlockingDeque<T> queue) {
112+
try {
113+
var t = queue.poll(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS);
114+
assertNotNull("queue is empty", t);
115+
return t;
116+
} catch (InterruptedException e) {
117+
Thread.currentThread().interrupt();
118+
throw new AssertionError(e);
119+
}
120+
}
121+
122+
private static FullHttpRequest fullHttpRequest(String opaqueId, ByteBuf content) {
123+
var req = new DefaultFullHttpRequest(HTTP_1_1, POST, ControlServerRequestPlugin.ROUTE, Unpooled.wrappedBuffer(content));
124+
req.headers().add(CONTENT_LENGTH, content.readableBytes());
125+
req.headers().add(CONTENT_TYPE, APPLICATION_JSON);
126+
req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId);
127+
return req;
128+
}
129+
130+
private static HttpRequest httpRequest(String opaqueId, int contentLength) {
131+
return httpRequest(ControlServerRequestPlugin.ROUTE, opaqueId, contentLength);
132+
}
133+
134+
private static HttpRequest httpRequest(String uri, String opaqueId, int contentLength) {
135+
var req = new DefaultHttpRequest(HTTP_1_1, POST, uri);
136+
req.headers().add(CONTENT_LENGTH, contentLength);
137+
req.headers().add(CONTENT_TYPE, APPLICATION_JSON);
138+
req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId);
139+
return req;
140+
}
141+
142+
private static HttpContent randomContent(int size, boolean isLast) {
143+
var buf = Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
144+
if (isLast) {
145+
return new DefaultLastHttpContent(buf);
146+
} else {
147+
return new DefaultHttpContent(buf);
148+
}
149+
}
150+
151+
private static ByteBuf randomByteBuf(int size) {
152+
return Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
153+
}
154+
97155
@Override
98156
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
99157
Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
@@ -178,8 +236,6 @@ public void testClientConnectionCloseMidStream() throws Exception {
178236

179237
// await stream handler is ready and request full content
180238
var handler = ctx.awaitRestChannelAccepted(opaqueId);
181-
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
182-
183239
assertFalse(handler.streamClosed);
184240

185241
// terminate client connection
@@ -190,10 +246,7 @@ public void testClientConnectionCloseMidStream() throws Exception {
190246
handler.stream.next();
191247

192248
// wait for resources to be released
193-
assertBusy(() -> {
194-
assertEquals(0, handler.stream.bufSize());
195-
assertTrue(handler.streamClosed);
196-
});
249+
assertBusy(() -> assertTrue(handler.streamClosed));
197250
}
198251
}
199252

@@ -208,15 +261,11 @@ public void testServerCloseConnectionMidStream() throws Exception {
208261

209262
// await stream handler is ready and request full content
210263
var handler = ctx.awaitRestChannelAccepted(opaqueId);
211-
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
212264
assertFalse(handler.streamClosed);
213265

214266
// terminate connection on server and wait resources are released
215267
handler.channel.request().getHttpChannel().close();
216-
assertBusy(() -> {
217-
assertEquals(0, handler.stream.bufSize());
218-
assertTrue(handler.streamClosed);
219-
});
268+
assertBusy(() -> assertTrue(handler.streamClosed));
220269
}
221270
}
222271

@@ -230,16 +279,12 @@ public void testServerExceptionMidStream() throws Exception {
230279

231280
// await stream handler is ready and request full content
232281
var handler = ctx.awaitRestChannelAccepted(opaqueId);
233-
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
234282
assertFalse(handler.streamClosed);
235283

236284
handler.shouldThrowInsideHandleChunk = true;
237285
handler.stream.next();
238286

239-
assertBusy(() -> {
240-
assertEquals(0, handler.stream.bufSize());
241-
assertTrue(handler.streamClosed);
242-
});
287+
assertBusy(() -> assertTrue(handler.streamClosed));
243288
}
244289
}
245290

@@ -280,7 +325,7 @@ public void testClientBackpressure() throws Exception {
280325
});
281326
handler.readBytes(partSize);
282327
}
283-
assertTrue(handler.stream.hasLast());
328+
assertTrue(handler.recvLast);
284329
}
285330
}
286331

@@ -385,16 +430,6 @@ public void testBadRequestReleaseQueuedChunks() throws Exception {
385430
}
386431
}
387432

388-
private static long transportStatsRequestBytesSize(Ctx ctx) {
389-
var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName);
390-
var stats = httpTransport.stats().clientStats();
391-
var bytes = 0L;
392-
for (var s : stats) {
393-
bytes += s.requestSizeBytes();
394-
}
395-
return bytes;
396-
}
397-
398433
/**
399434
* ensures that {@link org.elasticsearch.http.HttpClientStatsTracker} counts streamed content bytes
400435
*/
@@ -489,63 +524,15 @@ private String opaqueId(int reqNo) {
489524
return getTestName() + "-" + reqNo;
490525
}
491526

492-
static int MBytes(int m) {
493-
return m * 1024 * 1024;
494-
}
495-
496-
static <T> T safePoll(BlockingDeque<T> queue) {
497-
try {
498-
var t = queue.poll(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS);
499-
assertNotNull("queue is empty", t);
500-
return t;
501-
} catch (InterruptedException e) {
502-
Thread.currentThread().interrupt();
503-
throw new AssertionError(e);
504-
}
505-
}
506-
507-
static FullHttpRequest fullHttpRequest(String opaqueId, ByteBuf content) {
508-
var req = new DefaultFullHttpRequest(HTTP_1_1, POST, ControlServerRequestPlugin.ROUTE, Unpooled.wrappedBuffer(content));
509-
req.headers().add(CONTENT_LENGTH, content.readableBytes());
510-
req.headers().add(CONTENT_TYPE, APPLICATION_JSON);
511-
req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId);
512-
return req;
513-
}
514-
515-
static HttpRequest httpRequest(String opaqueId, int contentLength) {
516-
return httpRequest(ControlServerRequestPlugin.ROUTE, opaqueId, contentLength);
517-
}
518-
519-
static HttpRequest httpRequest(String uri, String opaqueId, int contentLength) {
520-
var req = new DefaultHttpRequest(HTTP_1_1, POST, uri);
521-
req.headers().add(CONTENT_LENGTH, contentLength);
522-
req.headers().add(CONTENT_TYPE, APPLICATION_JSON);
523-
req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId);
524-
return req;
525-
}
526-
527-
static HttpContent randomContent(int size, boolean isLast) {
528-
var buf = Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
529-
if (isLast) {
530-
return new DefaultLastHttpContent(buf);
531-
} else {
532-
return new DefaultHttpContent(buf);
533-
}
534-
}
535-
536-
static ByteBuf randomByteBuf(int size) {
537-
return Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
538-
}
539-
540-
Ctx setupClientCtx() throws Exception {
527+
private Ctx setupClientCtx() throws Exception {
541528
var nodeName = internalCluster().getRandomNodeName();
542529
var clientRespQueue = new LinkedBlockingDeque<>(16);
543530
var bootstrap = bootstrapClient(nodeName, clientRespQueue);
544531
var channel = bootstrap.connect().sync().channel();
545532
return new Ctx(getTestName(), nodeName, bootstrap, channel, clientRespQueue);
546533
}
547534

548-
Bootstrap bootstrapClient(String node, BlockingQueue<Object> queue) {
535+
private Bootstrap bootstrapClient(String node, BlockingQueue<Object> queue) {
549536
var httpServer = internalCluster().getInstance(HttpServerTransport.class, node);
550537
var remoteAddr = randomFrom(httpServer.boundAddress().boundAddresses());
551538
return new Bootstrap().group(new NioEventLoopGroup(1))
@@ -583,9 +570,13 @@ protected boolean addMockHttpTransport() {
583570
return false; // enable http
584571
}
585572

586-
record Ctx(String testName, String nodeName, Bootstrap clientBootstrap, Channel clientChannel, BlockingDeque<Object> clientRespQueue)
587-
implements
588-
AutoCloseable {
573+
private record Ctx(
574+
String testName,
575+
String nodeName,
576+
Bootstrap clientBootstrap,
577+
Channel clientChannel,
578+
BlockingDeque<Object> clientRespQueue
579+
) implements AutoCloseable {
589580

590581
@Override
591582
public void close() throws Exception {
@@ -610,7 +601,7 @@ ServerRequestHandler awaitRestChannelAccepted(String opaqueId) throws Exception
610601
}
611602
}
612603

613-
static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer {
604+
private static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer {
614605
final SubscribableListener<Void> channelAccepted = new SubscribableListener<>();
615606
final String opaqueId;
616607
final BlockingDeque<Chunk> recvChunks = new LinkedBlockingDeque<>();
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.http.netty4;
11+
12+
import io.netty.channel.ChannelDuplexHandler;
13+
import io.netty.channel.ChannelHandlerContext;
14+
import io.netty.util.concurrent.ScheduledFuture;
15+
16+
import org.apache.logging.log4j.LogManager;
17+
import org.apache.logging.log4j.Logger;
18+
import org.elasticsearch.common.time.TimeProvider;
19+
import org.elasticsearch.common.util.concurrent.FutureUtils;
20+
21+
import java.util.concurrent.TimeUnit;
22+
23+
/**
24+
* When channel auto-read is disabled, handlers are responsible for reading from the channel.
25+
* But it's hard to detect when a read is missing. This helper class prints warnings
26+
* when no reads were detected in the given time interval. Normally, in tests, 10 seconds is enough
27+
* to avoid tests hanging for too long, but it can be increased if needed.
28+
*/
29+
class MissingReadDetector extends ChannelDuplexHandler {
30+
31+
private static final Logger logger = LogManager.getLogger(MissingReadDetector.class);
32+
33+
private final long interval;
34+
private final TimeProvider timer;
35+
private boolean pendingRead;
36+
private long lastRead;
37+
private ScheduledFuture<?> checker;
38+
39+
MissingReadDetector(TimeProvider timer, long missingReadIntervalMillis) {
40+
this.interval = missingReadIntervalMillis;
41+
this.timer = timer;
42+
}
43+
44+
@Override
45+
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
46+
checker = ctx.channel().eventLoop().scheduleAtFixedRate(() -> {
47+
if (pendingRead == false) {
48+
long now = timer.absoluteTimeInMillis();
49+
if (now >= lastRead + interval) {
50+
logger.warn("chan-id={} haven't read from channel for [{}ms]", ctx.channel().id(), (now - lastRead));
51+
}
52+
}
53+
}, interval, interval, TimeUnit.MILLISECONDS);
54+
super.handlerAdded(ctx);
55+
}
56+
57+
@Override
58+
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
59+
if (checker != null) {
60+
FutureUtils.cancel(checker);
61+
}
62+
super.handlerRemoved(ctx);
63+
}
64+
65+
@Override
66+
public void read(ChannelHandlerContext ctx) throws Exception {
67+
pendingRead = true;
68+
ctx.read();
69+
}
70+
71+
@Override
72+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
73+
assert ctx.channel().config().isAutoRead() == false : "auto-read must be always disabled";
74+
pendingRead = false;
75+
lastRead = timer.absoluteTimeInMillis();
76+
ctx.fireChannelRead(msg);
77+
}
78+
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.netty.handler.codec.http.HttpObjectAggregator;
1616
import io.netty.handler.codec.http.HttpRequest;
1717
import io.netty.handler.codec.http.HttpRequestDecoder;
18+
import io.netty.handler.codec.http.LastHttpContent;
1819

1920
import org.elasticsearch.http.HttpPreRequest;
2021
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;
@@ -48,6 +49,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
4849
}
4950
if (aggregating || msg instanceof FullHttpRequest) {
5051
super.channelRead(ctx, msg);
52+
if (msg instanceof LastHttpContent == false) {
53+
ctx.read(); // HttpObjectAggregator is tricky with auto-read off, it might not call read again, calling on its behalf
54+
}
5155
} else {
5256
streamContentSizeHandler.channelRead(ctx, msg);
5357
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) {
123123
isContinueExpected = true;
124124
} else {
125125
ctx.writeAndFlush(EXPECTATION_FAILED_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
126+
ctx.read();
126127
return;
127128
}
128129
}
@@ -136,6 +137,7 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) {
136137
decoder.reset();
137138
}
138139
ctx.writeAndFlush(TOO_LARGE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
140+
ctx.read();
139141
} else {
140142
ignoreContent = false;
141143
currentContentLength = 0;
@@ -150,11 +152,13 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) {
150152
private void handleContent(ChannelHandlerContext ctx, HttpContent msg) {
151153
if (ignoreContent) {
152154
msg.release();
155+
ctx.read();
153156
} else {
154157
currentContentLength += msg.content().readableBytes();
155158
if (currentContentLength > maxContentLength) {
156159
msg.release();
157160
ctx.writeAndFlush(TOO_LARGE_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
161+
ctx.read();
158162
} else {
159163
ctx.fireChannelRead(msg);
160164
}

0 commit comments

Comments
 (0)