Skip to content

Commit 1fbb4ab

Browse files
committed
auto reads sync
1 parent 485a553 commit 1fbb4ab

File tree

8 files changed

+129
-64
lines changed

8 files changed

+129
-64
lines changed

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,10 @@ public void testEmptyContent() throws Exception {
117117
assertTrue(recvChunk.isLast);
118118
assertEquals(0, recvChunk.chunk.length());
119119
recvChunk.chunk.close();
120-
assertFalse(handler.streamClosed);
120+
assertTrue(handler.streamClosed);
121121

122122
// send response to process following request
123123
handler.sendResponse(new RestResponse(RestStatus.OK, ""));
124-
assertBusy(() -> assertTrue(handler.streamClosed));
125124
}
126125
assertBusy(() -> assertEquals("should receive all server responses", totalRequests, ctx.clientRespQueue.size()));
127126
}
@@ -154,10 +153,9 @@ public void testReceiveAllChunks() throws Exception {
154153
}
155154
}
156155

157-
assertFalse(handler.streamClosed);
156+
assertTrue(handler.streamClosed);
158157
assertEquals("sent and received payloads are not the same", sendData, recvData);
159158
handler.sendResponse(new RestResponse(RestStatus.OK, ""));
160-
assertBusy(() -> assertTrue(handler.streamClosed));
161159
}
162160
assertBusy(() -> assertEquals("should receive all server responses", totalRequests, ctx.clientRespQueue.size()));
163161
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.http.netty4;
11+
12+
import io.netty.channel.Channel;
13+
import io.netty.channel.ChannelConfig;
14+
import io.netty.util.AttributeKey;
15+
16+
import java.util.BitSet;
17+
18+
class AutoReadSync {
19+
20+
private static final AttributeKey<AutoReadSync> AUTO_READ_SYNC_KEY = AttributeKey.valueOf("AutoReadSync");
21+
private final Channel channel;
22+
private final ChannelConfig config;
23+
private final BitSet ids;
24+
private final BitSet toggles;
25+
26+
AutoReadSync(Channel channel) {
27+
this.channel = channel;
28+
this.config = channel.config();
29+
this.ids = new BitSet();
30+
this.toggles = new BitSet();
31+
}
32+
33+
static Handle from(Channel channel) {
34+
assert channel.eventLoop().inEventLoop();
35+
var autoRead = channel.attr(AUTO_READ_SYNC_KEY).get();
36+
if (autoRead == null) {
37+
autoRead = new AutoReadSync(channel);
38+
channel.attr(AUTO_READ_SYNC_KEY).set(autoRead);
39+
}
40+
return autoRead.getHandle();
41+
}
42+
43+
Handle getHandle() {
44+
var nextId = ids.nextClearBit(0);
45+
ids.set(nextId, true);
46+
return new Handle(nextId);
47+
}
48+
49+
class Handle {
50+
private final int id;
51+
private boolean released;
52+
53+
Handle(int id) {
54+
this.id = id;
55+
}
56+
57+
private void assertState() {
58+
assert channel.eventLoop().inEventLoop();
59+
assert released == false;
60+
}
61+
62+
boolean isEnabled() {
63+
assertState();
64+
return toggles.get(id) == false;
65+
}
66+
67+
void enable() {
68+
assertState();
69+
toggles.set(id, false);
70+
config.setAutoRead(toggles.isEmpty());
71+
}
72+
73+
void disable() {
74+
assertState();
75+
toggles.set(id, true);
76+
config.setAutoRead(false);
77+
}
78+
79+
void release() {
80+
assertState();
81+
enable();
82+
ids.set(id, false);
83+
released = true;
84+
}
85+
}
86+
87+
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
3737

3838
private final HttpValidator validator;
3939
private final ThreadContext threadContext;
40+
private AutoReadSync.Handle autoRead;
4041
private ArrayDeque<HttpObject> pending = new ArrayDeque<>(4);
4142
private State state = WAITING_TO_START;
4243

@@ -45,12 +46,14 @@ public Netty4HttpHeaderValidator(HttpValidator validator, ThreadContext threadCo
4546
this.threadContext = threadContext;
4647
}
4748

48-
State getState() {
49-
return state;
49+
@Override
50+
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
51+
autoRead = AutoReadSync.from(ctx.channel());
52+
super.channelRegistered(ctx);
5053
}
5154

52-
boolean canAutoRead() {
53-
return state != QUEUEING_DATA && state != DROPPING_DATA_PERMANENTLY;
55+
State getState() {
56+
return state;
5457
}
5558

5659
@SuppressWarnings("fallthrough")
@@ -65,7 +68,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
6568
pending.add(ReferenceCountUtil.retain(httpObject));
6669
requestStart(ctx);
6770
assert state == QUEUEING_DATA;
68-
assert ctx.channel().config().isAutoRead() == false;
71+
assert autoRead.isEnabled() == false;
6972
break;
7073
case QUEUEING_DATA:
7174
pending.add(ReferenceCountUtil.retain(httpObject));
@@ -87,7 +90,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
8790
case DROPPING_DATA_PERMANENTLY:
8891
assert pending.isEmpty();
8992
ReferenceCountUtil.release(httpObject); // consume without enqueuing
90-
ctx.channel().config().setAutoRead(false);
93+
autoRead.disable();
9194
break;
9295
}
9396
}
@@ -110,7 +113,7 @@ private void requestStart(ChannelHandlerContext ctx) {
110113
}
111114

112115
state = QUEUEING_DATA;
113-
ctx.channel().config().setAutoRead(false);
116+
autoRead.disable();
114117

115118
if (httpRequest == null) {
116119
// this looks like a malformed request and will forward without validation
@@ -153,10 +156,10 @@ public void onFailure(Exception e) {
153156
private void forwardFullRequest(ChannelHandlerContext ctx) {
154157
Transports.assertDefaultThreadContext(threadContext);
155158
assert ctx.channel().eventLoop().inEventLoop();
156-
assert ctx.channel().config().isAutoRead() == false;
159+
assert autoRead.isEnabled() == false;
157160
assert state == QUEUEING_DATA;
158161

159-
ctx.channel().config().setAutoRead(true);
162+
autoRead.enable();
160163
boolean fullRequestForwarded = forwardData(ctx, pending);
161164

162165
assert fullRequestForwarded || pending.isEmpty();
@@ -173,7 +176,7 @@ private void forwardFullRequest(ChannelHandlerContext ctx) {
173176
private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) {
174177
Transports.assertDefaultThreadContext(threadContext);
175178
assert ctx.channel().eventLoop().inEventLoop();
176-
assert ctx.channel().config().isAutoRead() == false;
179+
assert autoRead.isEnabled() == false;
177180
assert state == QUEUEING_DATA;
178181

179182
HttpObject messageToForward = pending.getFirst();
@@ -184,7 +187,7 @@ private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContex
184187
}
185188
messageToForward.setDecoderResult(DecoderResult.failure(e));
186189

187-
ctx.channel().config().setAutoRead(true);
190+
autoRead.enable();
188191
ctx.fireChannelRead(messageToForward);
189192

190193
assert fullRequestDropped || pending.isEmpty();

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java

Lines changed: 14 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424

2525
import java.util.ArrayList;
2626
import java.util.List;
27-
import java.util.function.Supplier;
2827

2928
/**
3029
* Netty based implementation of {@link HttpBody.Stream}.
@@ -37,15 +36,16 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
3736
private final Channel channel;
3837
private final List<ChunkHandler> tracingHandlers = new ArrayList<>(4);
3938
private final ThreadContext threadContext;
40-
private final Supplier<Boolean> canAutoRead;
39+
private final AutoReadSync.Handle autoRead;
4140
private final ThreadWatchdog.ActivityTracker activityTracker;
4241
private ByteBuf buf;
4342
private boolean requested = false;
4443
private final ChannelFutureListener closeListener = future -> doClose();
45-
private boolean closed = false;
4644
private HttpBody.ChunkHandler handler;
4745
private ThreadContext.StoredContext requestContext;
4846
private boolean hasLast = false;
47+
private boolean closed = false;
48+
4949
// used in tests
5050
private volatile int bufSize = 0;
5151

@@ -54,25 +54,9 @@ public Netty4HttpRequestBodyStream(Channel channel, ThreadContext threadContext,
5454
this.threadContext = threadContext;
5555
this.activityTracker = activityTracker;
5656
this.requestContext = threadContext.newStoredContext();
57-
this.canAutoRead = httpValidatorAutoRead(channel);
57+
this.autoRead = AutoReadSync.from(channel);
5858
Netty4Utils.addListener(channel.closeFuture(), closeListener);
59-
channel.config().setAutoRead(false);
60-
}
61-
62-
// HTTP validator has priority on enabling autoRead
63-
private Supplier<Boolean> httpValidatorAutoRead(Channel channel) {
64-
var validator = channel.pipeline().get(Netty4HttpHeaderValidator.class);
65-
if (validator == null) {
66-
return () -> true;
67-
} else {
68-
return validator::canAutoRead;
69-
}
70-
}
71-
72-
private void turnOnAutoRead() {
73-
if (canAutoRead.get()) {
74-
channel.config().setAutoRead(true);
75-
}
59+
autoRead.disable();
7660
}
7761

7862
@Override
@@ -168,8 +152,7 @@ private void send() throws Exception {
168152
handler.onNext(bytesRef, hasLast);
169153
}
170154
if (hasLast) {
171-
turnOnAutoRead();
172-
channel.closeFuture().removeListener(closeListener);
155+
doClose();
173156
}
174157
}
175158

@@ -194,14 +177,15 @@ private void doClose() {
194177
if (handler != null) {
195178
handler.close();
196179
}
180+
} finally {
181+
if (buf != null) {
182+
buf.release();
183+
buf = null;
184+
bufSize = 0;
185+
}
186+
autoRead.release();
187+
channel.closeFuture().removeListener(closeListener);
197188
}
198-
if (buf != null) {
199-
buf.release();
200-
buf = null;
201-
bufSize = 0;
202-
}
203-
turnOnAutoRead();
204-
channel.closeFuture().removeListener(closeListener);
205189
}
206190
}
207191

modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ public void setUp() throws Exception {
6262
}
6363

6464
private void reset() {
65-
channel = new EmbeddedChannel();
6665
header.set(null);
6766
listener.set(null);
6867
validationException.set(null);
@@ -75,7 +74,7 @@ private void reset() {
7574
listener.set(validationCompleteListener);
7675
};
7776
netty4HttpHeaderValidator = new Netty4HttpHeaderValidator(validator, new ThreadContext(Settings.EMPTY));
78-
channel.pipeline().addLast(netty4HttpHeaderValidator);
77+
channel = new EmbeddedChannel(true, false, netty4HttpHeaderValidator);
7978
}
8079

8180
public void testValidationPausesAndResumesData() {

modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.elasticsearch.test.ESTestCase;
2727

2828
import java.util.ArrayList;
29-
import java.util.HashMap;
3029
import java.util.Map;
3130
import java.util.concurrent.atomic.AtomicBoolean;
3231
import java.util.concurrent.atomic.AtomicInteger;
@@ -170,16 +169,7 @@ public void close() {
170169
assertThat(headers.get(), hasEntry("header1", "value1"));
171170
assertThat(headers.get(), hasEntry("header2", "value2"));
172171
assertThat(headers.get(), hasEntry("header3", "value3"));
173-
174172
assertTrue("should receive last content", gotLast.get());
175-
176-
headers.set(new HashMap<>());
177-
178-
stream.close();
179-
180-
assertThat(headers.get(), hasEntry("header1", "value1"));
181-
assertThat(headers.get(), hasEntry("header2", "value2"));
182-
assertThat(headers.get(), hasEntry("header3", "value3"));
183173
}
184174

185175
HttpContent randomContent(int size, boolean isLast) {

modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -970,9 +970,9 @@ public void testMultipleValidationsOnTheSameChannel() throws InterruptedExceptio
970970
final Set<String> okURIs = ConcurrentHashMap.newKeySet();
971971
final Set<String> nokURIs = ConcurrentHashMap.newKeySet();
972972
final SetOnce<Channel> channelSetOnce = new SetOnce<>();
973-
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
973+
final HttpServerTransport.Dispatcher dispatcher = new AggregatingDispatcher() {
974974
@Override
975-
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
975+
public void dispatchAggregatedRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
976976
assertThat(okURIs.contains(request.uri()), is(true));
977977
// assert validated request is dispatched
978978
okURIs.remove(request.uri());

server/src/test/java/org/elasticsearch/rest/RestContentAggregatorTests.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727

2828
import static org.elasticsearch.rest.RestRequest.Method.GET;
2929
import static org.elasticsearch.test.rest.FakeRestRequest.FakeHttpRequest;
30-
import static org.hamcrest.Matchers.isA;
3130

3231
public class RestContentAggregatorTests extends ESTestCase {
3332

@@ -39,7 +38,7 @@ static RestRequest restRequest(HttpStream stream) {
3938
return restRequest(new FakeHttpRequest(GET, "/", stream, Map.of("content-length", List.of("" + stream.contentLength))));
4039
}
4140

42-
static RestChannel restChan(RestRequest request) {
41+
static FakeRestChannel restChan(RestRequest request) {
4342
return new FakeRestChannel(request, false, 1);
4443
}
4544

@@ -71,12 +70,17 @@ public void testNoContent() {
7170
}
7271
}
7372

74-
public void testUnexpectedContent() {
73+
public void testUnexpectedContent() throws Exception {
7574
try (var stream = HttpStream.of(new byte[] { 1 })) {
7675
var httpReq = new FakeHttpRequest(GET, "/", stream, Map.of()); // no content-length header, assumes empty
7776
var request = restRequest(httpReq);
78-
RestContentAggregator.aggregate(request, restChan(request), (r, c) -> {});
79-
assertThat(safeAwait(stream.err), isA(IllegalStateException.class));
77+
var chan = restChan(request);
78+
RestContentAggregator.aggregate(request, chan, (r, c) -> {});
79+
assertBusy(() -> {
80+
var resp = chan.capturedResponse();
81+
assertNotNull(resp);
82+
assertEquals(RestStatus.BAD_REQUEST, resp.status());
83+
});
8084
}
8185
}
8286

0 commit comments

Comments
 (0)