Skip to content

Commit 23e1116

Browse files
authored
Ensure thread context set for streaming (#115683)
Currently the thread context is lost between streaming context switches. This commit ensures that each time the thread context is properly set before providing new data to the stream.
1 parent 6182921 commit 23e1116

File tree

11 files changed

+148
-96
lines changed

11 files changed

+148
-96
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,10 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
137137
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
138138
currentRequestStream = null;
139139
} else {
140-
var contentStream = new Netty4HttpRequestBodyStream(ctx.channel());
140+
var contentStream = new Netty4HttpRequestBodyStream(
141+
ctx.channel(),
142+
serverTransport.getThreadPool().getThreadContext()
143+
);
141144
currentRequestStream = contentStream;
142145
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);
143146
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.netty.handler.codec.http.HttpContent;
1717
import io.netty.handler.codec.http.LastHttpContent;
1818

19+
import org.elasticsearch.common.util.concurrent.ThreadContext;
1920
import org.elasticsearch.core.Releasables;
2021
import org.elasticsearch.http.HttpBody;
2122
import org.elasticsearch.transport.netty4.Netty4Utils;
@@ -34,14 +35,18 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
3435
private final Channel channel;
3536
private final ChannelFutureListener closeListener = future -> doClose();
3637
private final List<ChunkHandler> tracingHandlers = new ArrayList<>(4);
38+
private final ThreadContext threadContext;
3739
private ByteBuf buf;
3840
private boolean hasLast = false;
3941
private boolean requested = false;
4042
private boolean closing = false;
4143
private HttpBody.ChunkHandler handler;
44+
private ThreadContext.StoredContext requestContext;
4245

43-
public Netty4HttpRequestBodyStream(Channel channel) {
46+
public Netty4HttpRequestBodyStream(Channel channel, ThreadContext threadContext) {
4447
this.channel = channel;
48+
this.threadContext = threadContext;
49+
this.requestContext = threadContext.newStoredContext();
4550
Netty4Utils.addListener(channel.closeFuture(), closeListener);
4651
channel.config().setAutoRead(false);
4752
}
@@ -66,6 +71,7 @@ public void addTracingHandler(ChunkHandler chunkHandler) {
6671
public void next() {
6772
assert closing == false : "cannot request next chunk on closing stream";
6873
assert handler != null : "handler must be set before requesting next chunk";
74+
requestContext = threadContext.newStoredContext();
6975
channel.eventLoop().submit(() -> {
7076
requested = true;
7177
if (buf == null) {
@@ -108,11 +114,6 @@ private void addChunk(ByteBuf chunk) {
108114
}
109115
}
110116

111-
// visible for test
112-
Channel channel() {
113-
return channel;
114-
}
115-
116117
// visible for test
117118
ByteBuf buf() {
118119
return buf;
@@ -129,10 +130,12 @@ private void send() {
129130
var bytesRef = Netty4Utils.toReleasableBytesReference(buf);
130131
requested = false;
131132
buf = null;
132-
for (var tracer : tracingHandlers) {
133-
tracer.onNext(bytesRef, hasLast);
133+
try (var ignored = threadContext.restoreExistingContext(requestContext)) {
134+
for (var tracer : tracingHandlers) {
135+
tracer.onNext(bytesRef, hasLast);
136+
}
137+
handler.onNext(bytesRef, hasLast);
134138
}
135-
handler.onNext(bytesRef, hasLast);
136139
if (hasLast) {
137140
channel.config().setAutoRead(true);
138141
channel.closeFuture().removeListener(closeListener);
@@ -150,11 +153,13 @@ public void close() {
150153

151154
private void doClose() {
152155
closing = true;
153-
for (var tracer : tracingHandlers) {
154-
Releasables.closeExpectNoException(tracer);
155-
}
156-
if (handler != null) {
157-
handler.close();
156+
try (var ignored = threadContext.restoreExistingContext(requestContext)) {
157+
for (var tracer : tracingHandlers) {
158+
Releasables.closeExpectNoException(tracer);
159+
}
160+
if (handler != null) {
161+
handler.close();
162+
}
158163
}
159164
if (buf != null) {
160165
buf.release();

modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,33 @@
1919
import io.netty.handler.flow.FlowControlHandler;
2020

2121
import org.elasticsearch.common.bytes.ReleasableBytesReference;
22+
import org.elasticsearch.common.settings.Settings;
23+
import org.elasticsearch.common.util.concurrent.ThreadContext;
2224
import org.elasticsearch.http.HttpBody;
2325
import org.elasticsearch.test.ESTestCase;
2426

2527
import java.util.ArrayList;
28+
import java.util.HashMap;
29+
import java.util.Map;
2630
import java.util.concurrent.atomic.AtomicBoolean;
2731
import java.util.concurrent.atomic.AtomicInteger;
32+
import java.util.concurrent.atomic.AtomicReference;
33+
34+
import static org.hamcrest.Matchers.hasEntry;
2835

2936
public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
3037

31-
EmbeddedChannel channel;
32-
Netty4HttpRequestBodyStream stream;
38+
private final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
39+
private EmbeddedChannel channel;
40+
private Netty4HttpRequestBodyStream stream;
3341
static HttpBody.ChunkHandler discardHandler = (chunk, isLast) -> chunk.close();
3442

3543
@Override
3644
public void setUp() throws Exception {
3745
super.setUp();
3846
channel = new EmbeddedChannel();
39-
stream = new Netty4HttpRequestBodyStream(channel);
47+
threadContext.putHeader("header1", "value1");
48+
stream = new Netty4HttpRequestBodyStream(channel, threadContext);
4049
stream.setHandler(discardHandler); // set default handler, each test might override one
4150
channel.pipeline().addLast(new SimpleChannelInboundHandler<HttpContent>(false) {
4251
@Override
@@ -118,6 +127,60 @@ public void testReadFromChannel() {
118127
assertTrue("should receive last content", gotLast.get());
119128
}
120129

130+
public void testReadFromHasCorrectThreadContext() throws InterruptedException {
131+
var gotLast = new AtomicBoolean(false);
132+
AtomicReference<Map<String, String>> headers = new AtomicReference<>();
133+
stream.setHandler(new HttpBody.ChunkHandler() {
134+
@Override
135+
public void onNext(ReleasableBytesReference chunk, boolean isLast) {
136+
headers.set(threadContext.getHeaders());
137+
gotLast.set(isLast);
138+
chunk.close();
139+
}
140+
141+
@Override
142+
public void close() {
143+
headers.set(threadContext.getHeaders());
144+
}
145+
});
146+
channel.pipeline().addFirst(new FlowControlHandler()); // block all incoming messages, need explicit channel.read()
147+
var chunkSize = 1024;
148+
149+
channel.writeInbound(randomContent(chunkSize));
150+
channel.writeInbound(randomLastContent(chunkSize));
151+
152+
threadContext.putHeader("header2", "value2");
153+
stream.next();
154+
155+
Thread thread = new Thread(() -> channel.runPendingTasks());
156+
thread.start();
157+
thread.join();
158+
159+
assertThat(headers.get(), hasEntry("header1", "value1"));
160+
assertThat(headers.get(), hasEntry("header2", "value2"));
161+
162+
threadContext.putHeader("header3", "value3");
163+
stream.next();
164+
165+
thread = new Thread(() -> channel.runPendingTasks());
166+
thread.start();
167+
thread.join();
168+
169+
assertThat(headers.get(), hasEntry("header1", "value1"));
170+
assertThat(headers.get(), hasEntry("header2", "value2"));
171+
assertThat(headers.get(), hasEntry("header3", "value3"));
172+
173+
assertTrue("should receive last content", gotLast.get());
174+
175+
headers.set(new HashMap<>());
176+
177+
stream.close();
178+
179+
assertThat(headers.get(), hasEntry("header1", "value1"));
180+
assertThat(headers.get(), hasEntry("header2", "value2"));
181+
assertThat(headers.get(), hasEntry("header3", "value3"));
182+
}
183+
121184
HttpContent randomContent(int size, boolean isLast) {
122185
var buf = Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
123186
if (isLast) {

server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java

Lines changed: 43 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import org.elasticsearch.common.settings.ClusterSettings;
1818
import org.elasticsearch.common.settings.Setting;
1919
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
20-
import org.elasticsearch.common.util.concurrent.ThreadContext;
2120
import org.elasticsearch.core.Nullable;
2221
import org.elasticsearch.core.Releasable;
2322
import org.elasticsearch.core.Releasables;
@@ -43,12 +42,10 @@ public class IncrementalBulkService {
4342
private final Client client;
4443
private final AtomicBoolean enabledForTests = new AtomicBoolean(true);
4544
private final IndexingPressure indexingPressure;
46-
private final ThreadContext threadContext;
4745

48-
public IncrementalBulkService(Client client, IndexingPressure indexingPressure, ThreadContext threadContext) {
46+
public IncrementalBulkService(Client client, IndexingPressure indexingPressure) {
4947
this.client = client;
5048
this.indexingPressure = indexingPressure;
51-
this.threadContext = threadContext;
5249
}
5350

5451
public Handler newBulkRequest() {
@@ -58,7 +55,7 @@ public Handler newBulkRequest() {
5855

5956
public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
6057
ensureEnabled();
61-
return new Handler(client, threadContext, indexingPressure, waitForActiveShards, timeout, refresh);
58+
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh);
6259
}
6360

6461
private void ensureEnabled() {
@@ -94,7 +91,6 @@ public static class Handler implements Releasable {
9491
public static final BulkRequest.IncrementalState EMPTY_STATE = new BulkRequest.IncrementalState(Collections.emptyMap(), true);
9592

9693
private final Client client;
97-
private final ThreadContext threadContext;
9894
private final IndexingPressure indexingPressure;
9995
private final ActiveShardCount waitForActiveShards;
10096
private final TimeValue timeout;
@@ -106,22 +102,18 @@ public static class Handler implements Releasable {
106102
private boolean globalFailure = false;
107103
private boolean incrementalRequestSubmitted = false;
108104
private boolean bulkInProgress = false;
109-
private ThreadContext.StoredContext requestContext;
110105
private Exception bulkActionLevelFailure = null;
111106
private long currentBulkSize = 0L;
112107
private BulkRequest bulkRequest = null;
113108

114109
protected Handler(
115110
Client client,
116-
ThreadContext threadContext,
117111
IndexingPressure indexingPressure,
118112
@Nullable String waitForActiveShards,
119113
@Nullable TimeValue timeout,
120114
@Nullable String refresh
121115
) {
122116
this.client = client;
123-
this.threadContext = threadContext;
124-
this.requestContext = threadContext.newStoredContext();
125117
this.indexingPressure = indexingPressure;
126118
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
127119
this.timeout = timeout;
@@ -141,31 +133,28 @@ public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runn
141133
if (shouldBackOff()) {
142134
final boolean isFirstRequest = incrementalRequestSubmitted == false;
143135
incrementalRequestSubmitted = true;
144-
try (var ignored = threadContext.restoreExistingContext(requestContext)) {
145-
final ArrayList<Releasable> toRelease = new ArrayList<>(releasables);
146-
releasables.clear();
147-
bulkInProgress = true;
148-
client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {
149-
150-
@Override
151-
public void onResponse(BulkResponse bulkResponse) {
152-
handleBulkSuccess(bulkResponse);
153-
createNewBulkRequest(
154-
new BulkRequest.IncrementalState(bulkResponse.getIncrementalState().shardLevelFailures(), true)
155-
);
156-
}
157-
158-
@Override
159-
public void onFailure(Exception e) {
160-
handleBulkFailure(isFirstRequest, e);
161-
}
162-
}, () -> {
163-
bulkInProgress = false;
164-
requestContext = threadContext.newStoredContext();
165-
toRelease.forEach(Releasable::close);
166-
nextItems.run();
167-
}));
168-
}
136+
final ArrayList<Releasable> toRelease = new ArrayList<>(releasables);
137+
releasables.clear();
138+
bulkInProgress = true;
139+
client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {
140+
141+
@Override
142+
public void onResponse(BulkResponse bulkResponse) {
143+
handleBulkSuccess(bulkResponse);
144+
createNewBulkRequest(
145+
new BulkRequest.IncrementalState(bulkResponse.getIncrementalState().shardLevelFailures(), true)
146+
);
147+
}
148+
149+
@Override
150+
public void onFailure(Exception e) {
151+
handleBulkFailure(isFirstRequest, e);
152+
}
153+
}, () -> {
154+
bulkInProgress = false;
155+
toRelease.forEach(Releasable::close);
156+
nextItems.run();
157+
}));
169158
} else {
170159
nextItems.run();
171160
}
@@ -187,28 +176,26 @@ public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, Act
187176
} else {
188177
assert bulkRequest != null;
189178
if (internalAddItems(items, releasable)) {
190-
try (var ignored = threadContext.restoreExistingContext(requestContext)) {
191-
final ArrayList<Releasable> toRelease = new ArrayList<>(releasables);
192-
releasables.clear();
193-
// We do not need to set this back to false as this will be the last request.
194-
bulkInProgress = true;
195-
client.bulk(bulkRequest, ActionListener.runBefore(new ActionListener<>() {
196-
197-
private final boolean isFirstRequest = incrementalRequestSubmitted == false;
198-
199-
@Override
200-
public void onResponse(BulkResponse bulkResponse) {
201-
handleBulkSuccess(bulkResponse);
202-
listener.onResponse(combineResponses());
203-
}
179+
final ArrayList<Releasable> toRelease = new ArrayList<>(releasables);
180+
releasables.clear();
181+
// We do not need to set this back to false as this will be the last request.
182+
bulkInProgress = true;
183+
client.bulk(bulkRequest, ActionListener.runBefore(new ActionListener<>() {
184+
185+
private final boolean isFirstRequest = incrementalRequestSubmitted == false;
186+
187+
@Override
188+
public void onResponse(BulkResponse bulkResponse) {
189+
handleBulkSuccess(bulkResponse);
190+
listener.onResponse(combineResponses());
191+
}
204192

205-
@Override
206-
public void onFailure(Exception e) {
207-
handleBulkFailure(isFirstRequest, e);
208-
errorResponse(listener);
209-
}
210-
}, () -> toRelease.forEach(Releasable::close)));
211-
}
193+
@Override
194+
public void onFailure(Exception e) {
195+
handleBulkFailure(isFirstRequest, e);
196+
errorResponse(listener);
197+
}
198+
}, () -> toRelease.forEach(Releasable::close)));
212199
} else {
213200
errorResponse(listener);
214201
}

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -915,11 +915,7 @@ private void construct(
915915
terminationHandler = getSinglePlugin(terminationHandlers, TerminationHandler.class).orElse(null);
916916

917917
final IndexingPressure indexingLimits = new IndexingPressure(settings);
918-
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(
919-
client,
920-
indexingLimits,
921-
threadPool.getThreadContext()
922-
);
918+
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, indexingLimits);
923919

924920
ActionModule actionModule = new ActionModule(
925921
settings,

server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ public final void handleRequest(RestRequest request, RestChannel channel, NodeCl
125125
if (request.isStreamedContent()) {
126126
assert action instanceof RequestBodyChunkConsumer;
127127
var chunkConsumer = (RequestBodyChunkConsumer) action;
128+
128129
request.contentStream().setHandler(new HttpBody.ChunkHandler() {
129130
@Override
130131
public void onNext(ReleasableBytesReference chunk, boolean isLast) {

server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,7 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
173173
this.defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false);
174174
this.defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false);
175175
this.defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
176-
// TODO: Fix type deprecation logging
177-
this.parser = new BulkRequestParser(false, request.getRestApiVersion());
176+
this.parser = new BulkRequestParser(true, request.getRestApiVersion());
178177
this.handlerSupplier = handlerSupplier;
179178
}
180179

0 commit comments

Comments
 (0)