Skip to content

Commit 3fe635b

Browse files
committed
add flow-control and remove auto-read
1 parent 997a7b8 commit 3fe635b

File tree

8 files changed

+187
-1090
lines changed

8 files changed

+187
-1090
lines changed

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -178,8 +178,6 @@ public void testClientConnectionCloseMidStream() throws Exception {
178178

179179
// await stream handler is ready and request full content
180180
var handler = ctx.awaitRestChannelAccepted(opaqueId);
181-
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
182-
183181
assertFalse(handler.streamClosed);
184182

185183
// terminate client connection
@@ -190,10 +188,7 @@ public void testClientConnectionCloseMidStream() throws Exception {
190188
handler.stream.next();
191189

192190
// wait for resources to be released
193-
assertBusy(() -> {
194-
assertEquals(0, handler.stream.bufSize());
195-
assertTrue(handler.streamClosed);
196-
});
191+
assertBusy(() -> assertTrue(handler.streamClosed));
197192
}
198193
}
199194

@@ -208,15 +203,11 @@ public void testServerCloseConnectionMidStream() throws Exception {
208203

209204
// await stream handler is ready and request full content
210205
var handler = ctx.awaitRestChannelAccepted(opaqueId);
211-
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
212206
assertFalse(handler.streamClosed);
213207

214208
// terminate connection on server and wait resources are released
215209
handler.channel.request().getHttpChannel().close();
216-
assertBusy(() -> {
217-
assertEquals(0, handler.stream.bufSize());
218-
assertTrue(handler.streamClosed);
219-
});
210+
assertBusy(() -> assertTrue(handler.streamClosed));
220211
}
221212
}
222213

@@ -230,16 +221,12 @@ public void testServerExceptionMidStream() throws Exception {
230221

231222
// await stream handler is ready and request full content
232223
var handler = ctx.awaitRestChannelAccepted(opaqueId);
233-
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
234224
assertFalse(handler.streamClosed);
235225

236226
handler.shouldThrowInsideHandleChunk = true;
237227
handler.stream.next();
238228

239-
assertBusy(() -> {
240-
assertEquals(0, handler.stream.bufSize());
241-
assertTrue(handler.streamClosed);
242-
});
229+
assertBusy(() -> assertTrue(handler.streamClosed));
243230
}
244231
}
245232

@@ -280,7 +267,7 @@ public void testClientBackpressure() throws Exception {
280267
});
281268
handler.readBytes(partSize);
282269
}
283-
assertTrue(handler.stream.hasLast());
270+
assertTrue(handler.recvLast);
284271
}
285272
}
286273

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java

Lines changed: 63 additions & 206 deletions
Original file line numberDiff line numberDiff line change
@@ -9,249 +9,106 @@
99

1010
package org.elasticsearch.http.netty4;
1111

12-
import io.netty.buffer.Unpooled;
12+
import io.netty.channel.ChannelDuplexHandler;
1313
import io.netty.channel.ChannelHandlerContext;
14-
import io.netty.channel.ChannelInboundHandlerAdapter;
1514
import io.netty.handler.codec.DecoderResult;
1615
import io.netty.handler.codec.http.HttpContent;
1716
import io.netty.handler.codec.http.HttpObject;
1817
import io.netty.handler.codec.http.HttpRequest;
19-
import io.netty.handler.codec.http.LastHttpContent;
20-
import io.netty.util.ReferenceCountUtil;
2118

2219
import org.elasticsearch.action.ActionListener;
2320
import org.elasticsearch.action.support.ContextPreservingActionListener;
2421
import org.elasticsearch.common.util.concurrent.ThreadContext;
22+
import org.elasticsearch.core.Nullable;
2523
import org.elasticsearch.http.netty4.internal.HttpValidator;
2624
import org.elasticsearch.transport.Transports;
2725

28-
import java.util.ArrayDeque;
29-
30-
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_PERMANENTLY;
31-
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_UNTIL_NEXT_REQUEST;
32-
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.FORWARDING_DATA_UNTIL_NEXT_REQUEST;
33-
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.QUEUEING_DATA;
34-
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.WAITING_TO_START;
35-
36-
public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
26+
public class Netty4HttpHeaderValidator extends ChannelDuplexHandler {
3727

3828
private final HttpValidator validator;
3929
private final ThreadContext threadContext;
40-
private ArrayDeque<HttpObject> pending = new ArrayDeque<>(4);
41-
private State state = WAITING_TO_START;
30+
private boolean droppingContent;
31+
private boolean validatingRequest;
4232

4333
public Netty4HttpHeaderValidator(HttpValidator validator, ThreadContext threadContext) {
4434
this.validator = validator;
4535
this.threadContext = threadContext;
4636
}
4737

48-
State getState() {
49-
return state;
50-
}
51-
52-
@SuppressWarnings("fallthrough")
5338
@Override
5439
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
55-
assert msg instanceof HttpObject;
56-
final HttpObject httpObject = (HttpObject) msg;
57-
58-
switch (state) {
59-
case WAITING_TO_START:
60-
assert pending.isEmpty();
61-
pending.add(ReferenceCountUtil.retain(httpObject));
62-
requestStart(ctx);
63-
assert state == QUEUEING_DATA;
64-
assert ctx.channel().config().isAutoRead() == false;
65-
break;
66-
case QUEUEING_DATA:
67-
pending.add(ReferenceCountUtil.retain(httpObject));
68-
break;
69-
case FORWARDING_DATA_UNTIL_NEXT_REQUEST:
70-
assert pending.isEmpty();
71-
if (httpObject instanceof LastHttpContent) {
72-
state = WAITING_TO_START;
73-
}
74-
ctx.fireChannelRead(httpObject);
75-
break;
76-
case DROPPING_DATA_UNTIL_NEXT_REQUEST:
77-
assert pending.isEmpty();
78-
if (httpObject instanceof LastHttpContent) {
79-
state = WAITING_TO_START;
80-
}
81-
ReferenceCountUtil.release(httpObject);
82-
break;
83-
case DROPPING_DATA_PERMANENTLY:
84-
assert pending.isEmpty();
85-
ReferenceCountUtil.release(httpObject); // consume without enqueuing
86-
ctx.channel().config().setAutoRead(false);
87-
break;
88-
}
89-
}
90-
91-
private void requestStart(ChannelHandlerContext ctx) {
92-
assert state == WAITING_TO_START;
93-
94-
if (pending.isEmpty()) {
95-
return;
96-
}
97-
98-
final HttpObject httpObject = pending.getFirst();
99-
final HttpRequest httpRequest;
100-
if (httpObject instanceof HttpRequest && httpObject.decoderResult().isSuccess()) {
101-
// a properly decoded HTTP start message is expected to begin validation
102-
// anything else is probably an error that the downstream HTTP message aggregator will have to handle
103-
httpRequest = (HttpRequest) httpObject;
104-
} else {
105-
httpRequest = null;
106-
}
107-
108-
state = QUEUEING_DATA;
109-
ctx.channel().config().setAutoRead(false);
110-
111-
if (httpRequest == null) {
112-
// this looks like a malformed request and will forward without validation
113-
ctx.channel().eventLoop().execute(() -> forwardFullRequest(ctx));
114-
} else {
115-
assert Transports.assertDefaultThreadContext(threadContext);
116-
ActionListener.run(
117-
// this prevents thread-context changes to propagate to the validation listener
118-
// atm, the validation listener submits to the event loop executor, which doesn't know about the ES thread-context,
119-
// so this is just a defensive play, in case the code inside the listener changes to not use the event loop executor
120-
ActionListener.assertOnce(
121-
new ContextPreservingActionListener<Void>(
122-
threadContext.wrapRestorable(threadContext.newStoredContext()),
123-
// Always explicitly dispatch back to the event loop to prevent reentrancy concerns if we are still on event loop
124-
new ActionListener<>() {
125-
@Override
126-
public void onResponse(Void unused) {
127-
assert Transports.assertDefaultThreadContext(threadContext);
128-
ctx.channel().eventLoop().execute(() -> forwardFullRequest(ctx));
129-
}
130-
131-
@Override
132-
public void onFailure(Exception e) {
133-
assert Transports.assertDefaultThreadContext(threadContext);
134-
ctx.channel().eventLoop().execute(() -> forwardRequestWithDecoderExceptionAndNoContent(ctx, e));
135-
}
136-
}
137-
)
138-
),
139-
listener -> {
140-
// this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused
141-
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) {
142-
validator.validate(httpRequest, ctx.channel(), listener);
40+
assert ctx.channel().config().isAutoRead() == false : "auto-read should be always disabled";
41+
if (msg instanceof HttpObject httpObject) {
42+
if (httpObject.decoderResult().isFailure()) {
43+
ctx.fireChannelRead(httpObject); // pass-through for decoding failures
44+
} else {
45+
if (msg instanceof HttpRequest request) {
46+
validate(ctx, request);
47+
} else if (msg instanceof HttpContent content) {
48+
if (droppingContent) {
49+
content.release();
50+
} else {
51+
assert validatingRequest == false : "unexpected content before validation completed";
52+
ctx.fireChannelRead(content);
14353
}
14454
}
145-
);
146-
}
147-
}
148-
149-
private void forwardFullRequest(ChannelHandlerContext ctx) {
150-
Transports.assertDefaultThreadContext(threadContext);
151-
assert ctx.channel().eventLoop().inEventLoop();
152-
assert ctx.channel().config().isAutoRead() == false;
153-
assert state == QUEUEING_DATA;
154-
155-
ctx.channel().config().setAutoRead(true);
156-
boolean fullRequestForwarded = forwardData(ctx, pending);
157-
158-
assert fullRequestForwarded || pending.isEmpty();
159-
if (fullRequestForwarded) {
160-
state = WAITING_TO_START;
161-
requestStart(ctx);
162-
} else {
163-
state = FORWARDING_DATA_UNTIL_NEXT_REQUEST;
164-
}
165-
166-
assert state == WAITING_TO_START || state == QUEUEING_DATA || state == FORWARDING_DATA_UNTIL_NEXT_REQUEST;
167-
}
168-
169-
private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) {
170-
Transports.assertDefaultThreadContext(threadContext);
171-
assert ctx.channel().eventLoop().inEventLoop();
172-
assert ctx.channel().config().isAutoRead() == false;
173-
assert state == QUEUEING_DATA;
174-
175-
HttpObject messageToForward = pending.getFirst();
176-
boolean fullRequestDropped = dropData(pending);
177-
if (messageToForward instanceof HttpContent toReplace) {
178-
// if the request to forward contained data (which got dropped), replace with empty data
179-
messageToForward = toReplace.replace(Unpooled.EMPTY_BUFFER);
180-
}
181-
messageToForward.setDecoderResult(DecoderResult.failure(e));
182-
183-
ctx.channel().config().setAutoRead(true);
184-
ctx.fireChannelRead(messageToForward);
185-
186-
assert fullRequestDropped || pending.isEmpty();
187-
if (fullRequestDropped) {
188-
state = WAITING_TO_START;
189-
requestStart(ctx);
190-
} else {
191-
state = DROPPING_DATA_UNTIL_NEXT_REQUEST;
55+
}
19256
}
193-
194-
assert state == WAITING_TO_START || state == QUEUEING_DATA || state == DROPPING_DATA_UNTIL_NEXT_REQUEST;
19557
}
19658

19759
@Override
198-
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
199-
state = DROPPING_DATA_PERMANENTLY;
200-
while (true) {
201-
if (dropData(pending) == false) {
202-
break;
203-
}
60+
public void read(ChannelHandlerContext ctx) throws Exception {
61+
// until validation is completed we can ignore read calls,
62+
// once validation is finished HttpRequest will be fired and downstream can read from there
63+
if (validatingRequest == false) {
64+
ctx.read();
20465
}
205-
super.channelInactive(ctx);
20666
}
20767

208-
private static boolean forwardData(ChannelHandlerContext ctx, ArrayDeque<HttpObject> pending) {
209-
final int pendingMessages = pending.size();
210-
try {
211-
HttpObject toForward;
212-
while ((toForward = pending.poll()) != null) {
213-
ctx.fireChannelRead(toForward);
214-
ReferenceCountUtil.release(toForward); // reference cnt incremented when enqueued
215-
if (toForward instanceof LastHttpContent) {
216-
return true;
217-
}
218-
}
219-
return false;
220-
} finally {
221-
maybeResizePendingDown(pendingMessages, pending);
222-
}
223-
}
68+
void validate(ChannelHandlerContext ctx, HttpRequest request) {
69+
assert Transports.assertDefaultThreadContext(threadContext);
70+
droppingContent = false;
71+
validatingRequest = true;
72+
ActionListener.run(
73+
// this prevents thread-context changes to propagate to the validation listener
74+
// atm, the validation listener submits to the event loop executor, which doesn't know about the ES thread-context,
75+
// so this is just a defensive play, in case the code inside the listener changes to not use the event loop executor
76+
ActionListener.assertOnce(
77+
new ContextPreservingActionListener<Void>(
78+
threadContext.wrapRestorable(threadContext.newStoredContext()),
79+
new ActionListener<>() {
80+
@Override
81+
public void onResponse(Void unused) {
82+
handleValidationResult(ctx, request, null);
83+
}
22484

225-
private static boolean dropData(ArrayDeque<HttpObject> pending) {
226-
final int pendingMessages = pending.size();
227-
try {
228-
HttpObject toDrop;
229-
while ((toDrop = pending.poll()) != null) {
230-
ReferenceCountUtil.release(toDrop, 2); // 1 for enqueuing, 1 for consuming
231-
if (toDrop instanceof LastHttpContent) {
232-
return true;
85+
@Override
86+
public void onFailure(Exception e) {
87+
handleValidationResult(ctx, request, e);
88+
}
89+
}
90+
)
91+
),
92+
listener -> {
93+
// this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused
94+
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) {
95+
validator.validate(request, ctx.channel(), listener);
23396
}
23497
}
235-
return false;
236-
} finally {
237-
maybeResizePendingDown(pendingMessages, pending);
238-
}
98+
);
23999
}
240100

241-
private static void maybeResizePendingDown(int largeSize, ArrayDeque<HttpObject> pending) {
242-
if (pending.size() <= 4 && largeSize > 32) {
243-
// Prevent the ArrayDeque from becoming forever large due to a single large message.
244-
ArrayDeque<HttpObject> old = pending;
245-
pending = new ArrayDeque<>(4);
246-
pending.addAll(old);
247-
}
101+
void handleValidationResult(ChannelHandlerContext ctx, HttpRequest request, @Nullable Exception validationError) {
102+
assert Transports.assertDefaultThreadContext(threadContext);
103+
// Always explicitly dispatch back to the event loop to prevent reentrancy concerns if we are still on event loop
104+
ctx.channel().eventLoop().execute(() -> {
105+
if (validationError != null) {
106+
request.setDecoderResult(DecoderResult.failure(validationError));
107+
droppingContent = true;
108+
}
109+
validatingRequest = false;
110+
ctx.fireChannelRead(request);
111+
});
248112
}
249113

250-
enum State {
251-
WAITING_TO_START,
252-
QUEUEING_DATA,
253-
FORWARDING_DATA_UNTIL_NEXT_REQUEST,
254-
DROPPING_DATA_UNTIL_NEXT_REQUEST,
255-
DROPPING_DATA_PERMANENTLY
256-
}
257114
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,14 +139,16 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
139139
} else {
140140
var contentStream = new Netty4HttpRequestBodyStream(
141141
ctx.channel(),
142-
serverTransport.getThreadPool().getThreadContext(),
143-
activityTracker
142+
serverTransport.getThreadPool().getThreadContext()
144143
);
145144
currentRequestStream = contentStream;
146145
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);
147146
}
148147
}
149148
handlePipelinedRequest(ctx, netty4HttpRequest);
149+
if (request instanceof FullHttpRequest) {
150+
ctx.read();
151+
}
150152
} else {
151153
assert msg instanceof HttpContent : "expect HttpContent got " + msg;
152154
assert currentRequestStream != null : "current stream must exists before handling http content";

0 commit comments

Comments
 (0)