Skip to content

Commit a9e220e

Browse files
authored
Prevent ThreadContext header leak when sending response (#68649)
We need to stash thread context in DefaultRestChannel before we call channel.sendResponse because when calling this method, our execution might be delayed and the thread be reused for another task - like sending another response. And it would see thread context from the initial “delayed” work. This commit also expands the assertions on the empty thread context to make sure it does not contains response headers. closes #68278
1 parent 02fc46a commit a9e220e

File tree

7 files changed

+29
-4
lines changed

7 files changed

+29
-4
lines changed

docs/changelog/68649.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 68649
2+
summary: Prevent `ThreadContext` header leak when sending response
3+
area: Infra/Core
4+
type: bug
5+
issues:
6+
- 68278

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import org.elasticsearch.ExceptionsHelper;
1616
import org.elasticsearch.http.HttpPipelinedRequest;
17+
import org.elasticsearch.transport.Transports;
1718

1819
@ChannelHandler.Sharable
1920
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest> {
@@ -26,6 +27,8 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
2627

2728
@Override
2829
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest httpRequest) {
30+
assert Transports.assertDefaultThreadContext(serverTransport.getThreadPool().getThreadContext());
31+
assert Transports.assertTransportThread();
2932
final Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
3033
boolean success = false;
3134
try {
@@ -41,6 +44,8 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest http
4144
@Override
4245
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
4346
ExceptionsHelper.maybeDieOnAnotherThread(cause);
47+
assert Transports.assertDefaultThreadContext(serverTransport.getThreadPool().getThreadContext());
48+
4449
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
4550
if (cause instanceof Error) {
4651
serverTransport.onException(channel, new Exception(cause));

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ protected HttpChannelHandler(final Netty4HttpServerTransport transport, final Ht
304304
protected void initChannel(Channel ch) throws Exception {
305305
Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
306306
ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
307-
ch.pipeline().addLast("chunked_writer", new Netty4WriteThrottlingHandler());
307+
ch.pipeline().addLast("chunked_writer", new Netty4WriteThrottlingHandler(transport.getThreadPool().getThreadContext()));
308308
ch.pipeline().addLast("byte_buf_sizer", NettyByteBufSizer.INSTANCE);
309309
ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
310310
final HttpRequestDecoder decoder = new HttpRequestDecoder(

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ private void setupPipeline(Channel ch) {
374374
ch.pipeline()
375375
.addLast("byte_buf_sizer", NettyByteBufSizer.INSTANCE)
376376
.addLast("logging", ESLoggingHandler.INSTANCE)
377-
.addLast("chunked_writer", new Netty4WriteThrottlingHandler())
377+
.addLast("chunked_writer", new Netty4WriteThrottlingHandler(getThreadPool().getThreadContext()))
378378
.addLast("dispatcher", new Netty4MessageInboundHandler(this, recycler));
379379
}
380380

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4WriteThrottlingHandler.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
import io.netty.channel.ChannelHandlerContext;
1616
import io.netty.channel.ChannelPromise;
1717

18+
import org.elasticsearch.common.util.concurrent.ThreadContext;
19+
import org.elasticsearch.transport.Transports;
20+
1821
import java.nio.channels.ClosedChannelException;
1922
import java.util.ArrayDeque;
2023
import java.util.Queue;
@@ -28,13 +31,18 @@ public final class Netty4WriteThrottlingHandler extends ChannelDuplexHandler {
2831

2932
private final Queue<WriteOperation> queuedWrites = new ArrayDeque<>();
3033

34+
private final ThreadContext threadContext;
3135
private WriteOperation currentWrite;
3236

33-
public Netty4WriteThrottlingHandler() {}
37+
public Netty4WriteThrottlingHandler(ThreadContext threadContext) {
38+
this.threadContext = threadContext;
39+
}
3440

3541
@Override
3642
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
3743
assert msg instanceof ByteBuf;
44+
assert Transports.assertDefaultThreadContext(threadContext);
45+
assert Transports.assertTransportThread();
3846
final boolean queued = queuedWrites.offer(new WriteOperation((ByteBuf) msg, promise));
3947
assert queued;
4048
}

server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,4 +492,8 @@ private static ActionListener<Void> earlyResponseListener(HttpRequest request, H
492492
return ActionListener.noop();
493493
}
494494
}
495+
496+
public ThreadPool getThreadPool() {
497+
return threadPool;
498+
}
495499
}

server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,9 @@ public void sendResponse(RestResponse restResponse) {
131131
addCookies(httpResponse);
132132

133133
ActionListener<Void> listener = ActionListener.wrap(() -> Releasables.close(toClose));
134-
httpChannel.sendResponse(httpResponse, listener);
134+
try (ThreadContext.StoredContext existing = threadContext.stashContext()) {
135+
httpChannel.sendResponse(httpResponse, listener);
136+
}
135137
success = true;
136138
} finally {
137139
if (success == false) {

0 commit comments

Comments
 (0)