Skip to content

Commit 6e97ca1

Browse files
committed
add gated auto read
1 parent 1b821e9 commit 6e97ca1

File tree

5 files changed

+176
-10
lines changed

5 files changed

+176
-10
lines changed
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.http.netty4;
11+
12+
import io.netty.channel.Channel;
13+
import io.netty.channel.ChannelConfig;
14+
import io.netty.util.AttributeKey;
15+
16+
import org.elasticsearch.core.Releasable;
17+
18+
import java.util.BitSet;
19+
20+
/**
21+
* This class provides gated toggle to {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)}.
22+
* Channel handlers should get own instance of {@link Gate} to change autoRead config. Setting any
23+
* Gate to false will turn off auto-read. The auto-read will turn-on once all gates are set to true,
24+
* or closed ({@link Releasable#close()}).
25+
* <br><br>
26+
* This class is not thread-safe and should be used inside channels' {@link io.netty.channel.EventLoop}.
27+
* <br><br>
28+
* When channel pipeline has multiple handlers that toggle autoRead it might become problematic.
29+
* For example, we use HTTP pipelining feature, that means we dont block incoming requests and
30+
* several ones can be flow in the pipeline. Some request might wait for authentication and some hit by
31+
* backpressure, abd both controlled by channel autoRead. The backpressure handler might allow unauthenticated
32+
* content keep flowing to the Authentication handler.
33+
*/
34+
public class GatedAutoRead {
35+
36+
private static final AttributeKey<GatedAutoRead> GATED_AUTO_READ = AttributeKey.valueOf("GatedAutoRead");
37+
private final Channel channel;
38+
private final ChannelConfig config;
39+
private final BitSet gateIds = new BitSet(1);
40+
private final BitSet gateStates = new BitSet(1);
41+
42+
GatedAutoRead(Channel channel) {
43+
this.channel = channel;
44+
this.config = channel.config();
45+
}
46+
47+
/**
48+
* Returns a new instance to toggle channel's autoRead.
49+
* AutoRead will be set to true when ALL gates toggled to true. Toggle-off applies immediately.
50+
* Must be closed when goes out of scope, otherwise might leave channel permanently in disabled state.
51+
*/
52+
public static Gate newGate(Channel channel) {
53+
assert channel.eventLoop().inEventLoop();
54+
var gates = channel.attr(GATED_AUTO_READ).get();
55+
if (gates == null) {
56+
gates = new GatedAutoRead(channel);
57+
channel.attr(GATED_AUTO_READ).set(gates);
58+
}
59+
return gates.newGate();
60+
}
61+
62+
Gate newGate() {
63+
return new Gate();
64+
}
65+
66+
public class Gate implements Releasable {
67+
final int id;
68+
69+
Gate() {
70+
this.id = gateIds.nextClearBit(0); // next unused id
71+
gateIds.set(id, true);
72+
}
73+
74+
/**
75+
* Set {@link ChannelConfig#setAutoRead(boolean)}. Setting to false will take immediate effect.
76+
* Setting to true will apply only when ALL gates are set to true.
77+
*/
78+
public void set(boolean flag) {
79+
assert channel.eventLoop().inEventLoop();
80+
gateStates.set(id, flag == false); // gate=true means closed, autoRead=false
81+
config.setAutoRead(gateStates.isEmpty());
82+
}
83+
84+
/**
85+
* @return {@link ChannelConfig#isAutoRead()}.
86+
*/
87+
public boolean get() {
88+
return config.isAutoRead();
89+
}
90+
91+
@Override
92+
public void close() {
93+
set(true);
94+
gateIds.set(id, false); // release id
95+
}
96+
}
97+
98+
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
3737

3838
private final HttpValidator validator;
3939
private final ThreadContext threadContext;
40+
private GatedAutoRead.Gate autoRead;
4041
private ArrayDeque<HttpObject> pending = new ArrayDeque<>(4);
4142
private State state = WAITING_TO_START;
4243

@@ -49,6 +50,12 @@ State getState() {
4950
return state;
5051
}
5152

53+
@Override
54+
public void channelActive(ChannelHandlerContext ctx) throws Exception {
55+
autoRead = GatedAutoRead.newGate(ctx.channel());
56+
super.channelActive(ctx);
57+
}
58+
5259
@SuppressWarnings("fallthrough")
5360
@Override
5461
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
@@ -61,7 +68,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
6168
pending.add(ReferenceCountUtil.retain(httpObject));
6269
requestStart(ctx);
6370
assert state == QUEUEING_DATA;
64-
assert ctx.channel().config().isAutoRead() == false;
71+
assert autoRead.get() == false;
6572
break;
6673
case QUEUEING_DATA:
6774
pending.add(ReferenceCountUtil.retain(httpObject));
@@ -83,7 +90,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
8390
case DROPPING_DATA_PERMANENTLY:
8491
assert pending.isEmpty();
8592
ReferenceCountUtil.release(httpObject); // consume without enqueuing
86-
ctx.channel().config().setAutoRead(false);
93+
autoRead.set(false);
8794
break;
8895
}
8996
}
@@ -106,7 +113,7 @@ private void requestStart(ChannelHandlerContext ctx) {
106113
}
107114

108115
state = QUEUEING_DATA;
109-
ctx.channel().config().setAutoRead(false);
116+
autoRead.set(false);
110117

111118
if (httpRequest == null) {
112119
// this looks like a malformed request and will forward without validation
@@ -149,10 +156,10 @@ public void onFailure(Exception e) {
149156
private void forwardFullRequest(ChannelHandlerContext ctx) {
150157
Transports.assertDefaultThreadContext(threadContext);
151158
assert ctx.channel().eventLoop().inEventLoop();
152-
assert ctx.channel().config().isAutoRead() == false;
159+
assert autoRead.get() == false;
153160
assert state == QUEUEING_DATA;
154161

155-
ctx.channel().config().setAutoRead(true);
162+
autoRead.set(true);
156163
boolean fullRequestForwarded = forwardData(ctx, pending);
157164

158165
assert fullRequestForwarded || pending.isEmpty();
@@ -169,7 +176,7 @@ private void forwardFullRequest(ChannelHandlerContext ctx) {
169176
private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) {
170177
Transports.assertDefaultThreadContext(threadContext);
171178
assert ctx.channel().eventLoop().inEventLoop();
172-
assert ctx.channel().config().isAutoRead() == false;
179+
assert autoRead.get() == false;
173180
assert state == QUEUEING_DATA;
174181

175182
HttpObject messageToForward = pending.getFirst();
@@ -180,7 +187,7 @@ private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContex
180187
}
181188
messageToForward.setDecoderResult(DecoderResult.failure(e));
182189

183-
ctx.channel().config().setAutoRead(true);
190+
autoRead.set(true);
184191
ctx.fireChannelRead(messageToForward);
185192

186193
assert fullRequestDropped || pending.isEmpty();

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
3434

3535
private final Channel channel;
36+
private final GatedAutoRead.Gate autoRead;
3637
private final ChannelFutureListener closeListener = future -> doClose();
3738
private final List<ChunkHandler> tracingHandlers = new ArrayList<>(4);
3839
private final ThreadContext threadContext;
@@ -48,10 +49,11 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
4849

4950
public Netty4HttpRequestBodyStream(Channel channel, ThreadContext threadContext) {
5051
this.channel = channel;
52+
this.autoRead = GatedAutoRead.newGate(channel);
5153
this.threadContext = threadContext;
5254
this.requestContext = threadContext.newStoredContext();
5355
Netty4Utils.addListener(channel.closeFuture(), closeListener);
54-
channel.config().setAutoRead(false);
56+
autoRead.set(false);
5557
}
5658

5759
@Override
@@ -142,7 +144,7 @@ private void send() {
142144
handler.onNext(bytesRef, hasLast);
143145
}
144146
if (hasLast) {
145-
channel.config().setAutoRead(true);
147+
autoRead.close();
146148
channel.closeFuture().removeListener(closeListener);
147149
}
148150
}
@@ -171,6 +173,6 @@ private void doClose() {
171173
buf = null;
172174
bufSize = 0;
173175
}
174-
channel.config().setAutoRead(true);
176+
autoRead.close();
175177
}
176178
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.http.netty4;
11+
12+
import io.netty.channel.embedded.EmbeddedChannel;
13+
14+
import org.elasticsearch.core.Releasables;
15+
import org.elasticsearch.test.ESTestCase;
16+
17+
import java.util.stream.IntStream;
18+
19+
public class GatedAutoReadTests extends ESTestCase {
20+
21+
public void testToggleGates() {
22+
var chan = new EmbeddedChannel();
23+
var config = chan.config();
24+
config.setAutoRead(true);
25+
26+
var n = between(10, 20);
27+
var gates = IntStream.range(0, n).mapToObj(i -> GatedAutoRead.newGate(chan)).toList();
28+
assertTrue("new gate should preserve current value", config.isAutoRead());
29+
30+
var gateId1 = between(0, n);
31+
var gateId2 = (gateId1 + 1) % n;
32+
33+
var autoRead1 = gates.get(gateId1);
34+
var autoRead2 = gates.get(gateId2);
35+
36+
autoRead1.set(false);
37+
assertFalse("any gate toggle-off should disable reads", config.isAutoRead());
38+
39+
autoRead2.set(false);
40+
autoRead1.set(true);
41+
assertFalse("toggle-on should not enable reads while other gate is off", config.isAutoRead());
42+
43+
autoRead2.close();
44+
assertTrue("removing toggled-off gate should enable reads", config.isAutoRead());
45+
}
46+
47+
public void testGateIdReuse() {
48+
var chan = new EmbeddedChannel();
49+
for (int i = 0; i < between(1, 100); i++) {
50+
var n = between(1, 10);
51+
var gates = IntStream.range(0, n).mapToObj(j -> GatedAutoRead.newGate(chan)).toList();
52+
for (int j = 0; j < gates.size(); j++) {
53+
assertEquals(j, gates.get(j).id);
54+
}
55+
Releasables.close(gates);
56+
}
57+
}
58+
}

modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ private void reset() {
7676
};
7777
netty4HttpHeaderValidator = new Netty4HttpHeaderValidator(validator, new ThreadContext(Settings.EMPTY));
7878
channel.pipeline().addLast(netty4HttpHeaderValidator);
79+
channel.pipeline().fireChannelActive();
7980
}
8081

8182
public void testValidationPausesAndResumesData() {

0 commit comments

Comments
 (0)