Skip to content

Commit a9dac43

Browse files
authored
Http stream activity tracker and exceptions handling (#119564) (#119712)
1 parent 561e50c commit a9dac43

File tree

5 files changed

+123
-50
lines changed

5 files changed

+123
-50
lines changed

docs/changelog/119564.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 119564
2+
summary: Http stream activity tracker and exceptions handling
3+
area: Network
4+
type: enhancement
5+
issues: []

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -594,7 +594,7 @@ record Ctx(String testName, String nodeName, Bootstrap clientBootstrap, Channel
594594
@Override
595595
public void close() throws Exception {
596596
safeGet(clientChannel.close());
597-
safeGet(clientBootstrap.config().group().shutdownGracefully());
597+
safeGet(clientBootstrap.config().group().shutdownGracefully(0, 0, TimeUnit.SECONDS));
598598
clientRespQueue.forEach(o -> { if (o instanceof FullHttpResponse resp) resp.release(); });
599599
for (var opaqueId : ControlServerRequestPlugin.handlers.keySet()) {
600600
if (opaqueId.startsWith(testName)) {

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
139139
} else {
140140
var contentStream = new Netty4HttpRequestBodyStream(
141141
ctx.channel(),
142-
serverTransport.getThreadPool().getThreadContext()
142+
serverTransport.getThreadPool().getThreadContext(),
143+
activityTracker
143144
);
144145
currentRequestStream = contentStream;
145146
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.netty.handler.codec.http.HttpContent;
1717
import io.netty.handler.codec.http.LastHttpContent;
1818

19+
import org.elasticsearch.common.network.ThreadWatchdog;
1920
import org.elasticsearch.common.util.concurrent.ThreadContext;
2021
import org.elasticsearch.core.Releasables;
2122
import org.elasticsearch.http.HttpBody;
@@ -36,6 +37,7 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
3637
private final ChannelFutureListener closeListener = future -> doClose();
3738
private final List<ChunkHandler> tracingHandlers = new ArrayList<>(4);
3839
private final ThreadContext threadContext;
40+
private final ThreadWatchdog.ActivityTracker activityTracker;
3941
private ByteBuf buf;
4042
private boolean requested = false;
4143
private boolean closing = false;
@@ -46,10 +48,11 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
4648
private volatile int bufSize = 0;
4749
private volatile boolean hasLast = false;
4850

49-
public Netty4HttpRequestBodyStream(Channel channel, ThreadContext threadContext) {
51+
public Netty4HttpRequestBodyStream(Channel channel, ThreadContext threadContext, ThreadWatchdog.ActivityTracker activityTracker) {
5052
this.channel = channel;
5153
this.threadContext = threadContext;
5254
this.requestContext = threadContext.newStoredContext();
55+
this.activityTracker = activityTracker;
5356
Netty4Utils.addListener(channel.closeFuture(), closeListener);
5457
channel.config().setAutoRead(false);
5558
}
@@ -76,15 +79,18 @@ public void next() {
7679
assert handler != null : "handler must be set before requesting next chunk";
7780
requestContext = threadContext.newStoredContext();
7881
channel.eventLoop().submit(() -> {
82+
activityTracker.startActivity();
7983
requested = true;
80-
if (buf == null) {
81-
channel.read();
82-
} else {
83-
try {
84+
try {
85+
if (buf == null) {
86+
channel.read();
87+
} else {
8488
send();
85-
} catch (Exception e) {
86-
channel.pipeline().fireExceptionCaught(e);
8789
}
90+
} catch (Throwable e) {
91+
channel.pipeline().fireExceptionCaught(e);
92+
} finally {
93+
activityTracker.stopActivity();
8894
}
8995
});
9096
}

modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java

Lines changed: 102 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
import io.netty.buffer.Unpooled;
1313
import io.netty.channel.ChannelHandlerContext;
14+
import io.netty.channel.ChannelInboundHandlerAdapter;
15+
import io.netty.channel.DefaultEventLoop;
1416
import io.netty.channel.SimpleChannelInboundHandler;
1517
import io.netty.channel.embedded.EmbeddedChannel;
1618
import io.netty.handler.codec.http.DefaultHttpContent;
@@ -19,6 +21,7 @@
1921
import io.netty.handler.flow.FlowControlHandler;
2022

2123
import org.elasticsearch.common.bytes.ReleasableBytesReference;
24+
import org.elasticsearch.common.network.ThreadWatchdog;
2225
import org.elasticsearch.common.settings.Settings;
2326
import org.elasticsearch.common.util.concurrent.ThreadContext;
2427
import org.elasticsearch.http.HttpBody;
@@ -27,6 +30,8 @@
2730
import java.util.ArrayList;
2831
import java.util.HashMap;
2932
import java.util.Map;
33+
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.TimeUnit;
3035
import java.util.concurrent.atomic.AtomicBoolean;
3136
import java.util.concurrent.atomic.AtomicInteger;
3237
import java.util.concurrent.atomic.AtomicReference;
@@ -35,17 +40,18 @@
3540

3641
public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
3742

43+
static HttpBody.ChunkHandler discardHandler = (chunk, isLast) -> chunk.close();
3844
private final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
3945
private EmbeddedChannel channel;
4046
private Netty4HttpRequestBodyStream stream;
41-
static HttpBody.ChunkHandler discardHandler = (chunk, isLast) -> chunk.close();
47+
private ThreadWatchdog.ActivityTracker activityTracker;
4248

4349
@Override
4450
public void setUp() throws Exception {
4551
super.setUp();
4652
channel = new EmbeddedChannel();
47-
threadContext.putHeader("header1", "value1");
48-
stream = new Netty4HttpRequestBodyStream(channel, threadContext);
53+
activityTracker = new ThreadWatchdog.ActivityTracker();
54+
stream = new Netty4HttpRequestBodyStream(channel, threadContext, activityTracker);
4955
stream.setHandler(discardHandler); // set default handler, each test might override one
5056
channel.pipeline().addLast(new SimpleChannelInboundHandler<HttpContent>(false) {
5157
@Override
@@ -128,57 +134,112 @@ public void testReadFromChannel() {
128134
}
129135

130136
public void testReadFromHasCorrectThreadContext() throws InterruptedException {
131-
var gotLast = new AtomicBoolean(false);
132137
AtomicReference<Map<String, String>> headers = new AtomicReference<>();
133-
stream.setHandler(new HttpBody.ChunkHandler() {
134-
@Override
135-
public void onNext(ReleasableBytesReference chunk, boolean isLast) {
136-
headers.set(threadContext.getHeaders());
137-
gotLast.set(isLast);
138-
chunk.close();
139-
}
140-
141-
@Override
142-
public void close() {
143-
headers.set(threadContext.getHeaders());
144-
}
145-
});
146-
channel.pipeline().addFirst(new FlowControlHandler()); // block all incoming messages, need explicit channel.read()
138+
var eventLoop = new DefaultEventLoop();
139+
var gotLast = new AtomicBoolean(false);
147140
var chunkSize = 1024;
141+
threadContext.putHeader("header1", "value1");
142+
try {
143+
// the activity tracker requires the stream to execute on the same thread, so the stream is set up inside the event loop
144+
eventLoop.submit(() -> {
145+
channel = new EmbeddedChannel();
146+
stream = new Netty4HttpRequestBodyStream(channel, threadContext, new ThreadWatchdog.ActivityTracker());
147+
channel.pipeline().addLast(new SimpleChannelInboundHandler<HttpContent>(false) {
148+
@Override
149+
protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) {
150+
stream.handleNettyContent(msg);
151+
}
152+
});
153+
stream.setHandler(new HttpBody.ChunkHandler() {
154+
@Override
155+
public void onNext(ReleasableBytesReference chunk, boolean isLast) {
156+
headers.set(threadContext.getHeaders());
157+
gotLast.set(isLast);
158+
chunk.close();
159+
}
160+
161+
@Override
162+
public void close() {
163+
headers.set(threadContext.getHeaders());
164+
}
165+
});
166+
channel.pipeline().addFirst(new FlowControlHandler()); // block all incoming messages, need explicit channel.read()
167+
}).await();
148168

149-
channel.writeInbound(randomContent(chunkSize));
150-
channel.writeInbound(randomLastContent(chunkSize));
169+
channel.writeInbound(randomContent(chunkSize));
170+
channel.writeInbound(randomLastContent(chunkSize));
151171

152-
threadContext.putHeader("header2", "value2");
153-
stream.next();
172+
threadContext.putHeader("header2", "value2");
173+
stream.next();
154174

155-
Thread thread = new Thread(() -> channel.runPendingTasks());
156-
thread.start();
157-
thread.join();
175+
eventLoop.submit(() -> channel.runPendingTasks()).await();
176+
assertThat(headers.get(), hasEntry("header1", "value1"));
177+
assertThat(headers.get(), hasEntry("header2", "value2"));
158178

159-
assertThat(headers.get(), hasEntry("header1", "value1"));
160-
assertThat(headers.get(), hasEntry("header2", "value2"));
179+
threadContext.putHeader("header3", "value3");
180+
stream.next();
161181

162-
threadContext.putHeader("header3", "value3");
163-
stream.next();
182+
eventLoop.submit(() -> channel.runPendingTasks()).await();
183+
assertThat(headers.get(), hasEntry("header1", "value1"));
184+
assertThat(headers.get(), hasEntry("header2", "value2"));
185+
assertThat(headers.get(), hasEntry("header3", "value3"));
164186

165-
thread = new Thread(() -> channel.runPendingTasks());
166-
thread.start();
167-
thread.join();
187+
assertTrue("should receive last content", gotLast.get());
168188

169-
assertThat(headers.get(), hasEntry("header1", "value1"));
170-
assertThat(headers.get(), hasEntry("header2", "value2"));
171-
assertThat(headers.get(), hasEntry("header3", "value3"));
189+
headers.set(new HashMap<>());
172190

173-
assertTrue("should receive last content", gotLast.get());
191+
stream.close();
192+
193+
assertThat(headers.get(), hasEntry("header1", "value1"));
194+
assertThat(headers.get(), hasEntry("header2", "value2"));
195+
assertThat(headers.get(), hasEntry("header3", "value3"));
196+
} finally {
197+
eventLoop.shutdownGracefully(0, 0, TimeUnit.SECONDS);
198+
}
199+
}
174200

175-
headers.set(new HashMap<>());
201+
public void testStreamNextActivityTracker() {
202+
var t0 = activityTracker.get();
203+
var N = between(1, 10);
204+
for (int i = 0; i < N; i++) {
205+
channel.writeInbound(randomContent(1024));
206+
stream.next();
207+
channel.runPendingTasks();
208+
}
209+
var t1 = activityTracker.get();
210+
assertEquals("stream#next() must trigger activity tracker: N*step=" + N + "*2=" + N * 2L + " times", t1, t0 + N * 2L);
211+
}
176212

177-
stream.close();
213+
// ensure that all throwables are caught and propagated into the channel pipeline
214+
public void testCatchExceptions() {
215+
var gotExceptions = new CountDownLatch(3); // number of tests below
216+
217+
channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
218+
@Override
219+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
220+
gotExceptions.countDown();
221+
}
222+
});
223+
224+
// catch the exception for a non-buffered chunk; it will be thrown on channel.fireChannelRead()
225+
stream.setHandler((a, b) -> { throw new RuntimeException(); });
226+
stream.next();
227+
channel.runPendingTasks();
228+
channel.writeInbound(randomContent(1));
229+
230+
// catch the exception for a buffered chunk; it will be thrown from eventLoop.submit()
231+
channel.writeInbound(randomContent(1));
232+
stream.next();
233+
channel.runPendingTasks();
234+
235+
// should catch errors such as OutOfMemoryError too, see DieWithDignity
236+
// swallowing exceptions can result in dangling streams, hanging channels, and delayed shutdowns
237+
stream.setHandler((a, b) -> { throw new OutOfMemoryError(); });
238+
channel.writeInbound(randomContent(1));
239+
stream.next();
240+
channel.runPendingTasks();
178241

179-
assertThat(headers.get(), hasEntry("header1", "value1"));
180-
assertThat(headers.get(), hasEntry("header2", "value2"));
181-
assertThat(headers.get(), hasEntry("header3", "value3"));
242+
safeAwait(gotExceptions);
182243
}
183244

184245
HttpContent randomContent(int size, boolean isLast) {

0 commit comments

Comments
 (0)