Skip to content

Commit 6ec7a9a

Browse files
authored
Introduce utils for _really_ stashing the thread context (#114786)
`ThreadContext#stashContext` does not yield a completely fresh context: it preserves headers related to tracing the original request. That may be appropriate in many situations, but sometimes we really do want to detach processing entirely from the original task. This commit introduces new utilities to do that.
1 parent 24b26f2 commit 6ec7a9a

File tree

19 files changed

+140
-90
lines changed

19 files changed

+140
-90
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteClusterNodesAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
115115
}
116116

117117
private void executeWithSystemContext(Request request, ThreadContext threadContext, ActionListener<Response> listener) {
118-
try (var ignore = threadContext.stashContext()) {
119-
threadContext.markAsSystemContext();
118+
try (var ignore = threadContext.newEmptySystemContext()) {
120119
if (request.remoteClusterServer) {
121120
final NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().clear()
122121
.addMetrics(NodesInfoMetrics.Metric.REMOTE_CLUSTER_SERVER.metricName());

server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,7 @@ public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runn
141141
if (shouldBackOff()) {
142142
final boolean isFirstRequest = incrementalRequestSubmitted == false;
143143
incrementalRequestSubmitted = true;
144-
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
145-
requestContext.restore();
144+
try (var ignored = threadContext.restoreExistingContext(requestContext)) {
146145
final ArrayList<Releasable> toRelease = new ArrayList<>(releasables);
147146
releasables.clear();
148147
bulkInProgress = true;
@@ -188,8 +187,7 @@ public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, Act
188187
} else {
189188
assert bulkRequest != null;
190189
if (internalAddItems(items, releasable)) {
191-
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
192-
requestContext.restore();
190+
try (var ignored = threadContext.restoreExistingContext(requestContext)) {
193191
final ArrayList<Releasable> toRelease = new ArrayList<>(releasables);
194192
releasables.clear();
195193
// We do not need to set this back to false as this will be the last request.

server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.elasticsearch.common.settings.Setting;
3131
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
3232
import org.elasticsearch.common.util.concurrent.ListenableFuture;
33-
import org.elasticsearch.common.util.concurrent.ThreadContext;
3433
import org.elasticsearch.core.Nullable;
3534
import org.elasticsearch.core.Releasable;
3635
import org.elasticsearch.core.Releasables;
@@ -194,9 +193,7 @@ public void start() {
194193
* system context.
195194
*/
196195
if (clusterService.localNode().isMasterNode() == false) {
197-
final ThreadContext threadContext = transportService.getThreadPool().getThreadContext();
198-
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
199-
threadContext.markAsSystemContext();
196+
try (var ignored = transportService.getThreadPool().getThreadContext().newEmptySystemContext()) {
200197
beginPollingRemoteMasterStabilityDiagnostic();
201198
}
202199
}

server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.common.settings.Settings;
2525
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2626
import org.elasticsearch.common.util.concurrent.PrioritizedThrottledTaskRunner;
27-
import org.elasticsearch.common.util.concurrent.ThreadContext;
2827
import org.elasticsearch.core.Releasable;
2928
import org.elasticsearch.core.Releasables;
3029
import org.elasticsearch.core.TimeValue;
@@ -271,9 +270,7 @@ public void onFailure(Exception e) {
271270
@Override
272271
public void onResponse(Releasable releasable) {
273272
boolean success = false;
274-
final ThreadContext threadContext = transportService.getThreadPool().getThreadContext();
275-
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
276-
threadContext.markAsSystemContext();
273+
try (var ignored = transportService.getThreadPool().getThreadContext().newEmptySystemContext()) {
277274
client.execute(
278275
TransportNodesHotThreadsAction.TYPE,
279276
new NodesHotThreadsRequest(

server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,8 +334,7 @@ private void submitStateUpdateTask(
334334
final ThreadContext threadContext = threadPool.getThreadContext();
335335
final Supplier<ThreadContext.StoredContext> storedContextSupplier = threadContext.newRestorableContext(true);
336336

337-
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
338-
threadContext.markAsSystemContext();
337+
try (var ignore = threadContext.newEmptySystemContext()) {
339338
threadPoolExecutor.execute(
340339
new UpdateTask(
341340
priority,

server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public ThreadContext(Settings settings) {
101101
}
102102

103103
/**
104-
* Removes the current context and resets a default context. The removed context can be
104+
* Removes the current context and resets a default context except for headers involved in task tracing. The removed context can be
105105
* restored by closing the returned {@link StoredContext}.
106106
* @return a stored context that will restore the current context to its state at the point this method was called
107107
*/
@@ -159,6 +159,28 @@ public StoredContext stashContextPreservingRequestHeaders(final String... reques
159159
return stashContextPreservingRequestHeaders(Set.of(requestHeaders));
160160
}
161161

162+
/**
163+
* Removes the current context and replaces it with a completely empty default context, detaching execution entirely from the calling
164+
* context. The calling context can be restored by closing the returned {@link StoredContext}. Similar to {@link #stashContext()} except
165+
* that this method does not even preserve tracing-related headers.
166+
*/
167+
public StoredContext newEmptyContext() {
168+
final var callingContext = threadLocal.get();
169+
threadLocal.set(DEFAULT_CONTEXT);
170+
return storedOriginalContext(callingContext);
171+
}
172+
173+
/**
174+
* Removes the current context and replaces it with a completely empty system context, detaching execution entirely from the calling
175+
* context. The calling context can be restored by closing the returned {@link StoredContext}. Similar to {@link #stashContext()} except
176+
* that this method does not even preserve tracing-related headers.
177+
*/
178+
public StoredContext newEmptySystemContext() {
179+
final var callingContext = threadLocal.get();
180+
threadLocal.set(DEFAULT_CONTEXT.setSystemContext());
181+
return storedOriginalContext(callingContext);
182+
}
183+
162184
/**
163185
* When using a {@link org.elasticsearch.telemetry.tracing.Tracer} to capture activity in Elasticsearch, when a parent span is already
164186
* in progress, it is necessary to start a new context before beginning a child span. This method creates a context,
@@ -330,6 +352,16 @@ public StoredContext newStoredContextPreservingResponseHeaders() {
330352
};
331353
}
332354

355+
/**
356+
* Capture the current context and then restore the given context, returning a {@link StoredContext} that reverts back to the current
357+
* context again. Equivalent to using {@link #newStoredContext()} and then calling {@code existingContext.restore()}.
358+
*/
359+
public StoredContext restoreExistingContext(StoredContext existingContext) {
360+
final var originalContext = threadLocal.get();
361+
existingContext.restore();
362+
return storedOriginalContext(originalContext);
363+
}
364+
333365
/**
334366
* Just like {@link #stashContext()} but no default context is set.
335367
*/
@@ -914,14 +946,13 @@ private class ContextPreservingRunnable implements WrappedRunnable {
914946
private final ThreadContext.StoredContext ctx;
915947

916948
private ContextPreservingRunnable(Runnable in) {
917-
ctx = newStoredContext();
949+
this.ctx = newStoredContext();
918950
this.in = in;
919951
}
920952

921953
@Override
922954
public void run() {
923-
try (ThreadContext.StoredContext ignore = stashContext()) {
924-
ctx.restore();
955+
try (var ignore = restoreExistingContext(ctx)) {
925956
in.run();
926957
}
927958
}

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.common.io.stream.StreamInput;
2424
import org.elasticsearch.common.io.stream.StreamOutput;
2525
import org.elasticsearch.common.settings.Settings;
26-
import org.elasticsearch.common.util.concurrent.ThreadContext;
2726
import org.elasticsearch.index.shard.IndexShard;
2827
import org.elasticsearch.index.shard.ShardId;
2928
import org.elasticsearch.indices.IndicesService;
@@ -95,14 +94,9 @@ protected void doExecute(Task task, Request request, ActionListener<ReplicationR
9594
}
9695

9796
final void backgroundSync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases) {
98-
final ThreadContext threadContext = threadPool.getThreadContext();
99-
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
97+
try (var ignore = threadPool.getThreadContext().newEmptySystemContext()) {
10098
// we have to execute under the system context so that if security is enabled the sync is authorized
101-
threadContext.markAsSystemContext();
102-
final Request request = new Request(shardId, retentionLeases);
103-
try (var ignored = threadContext.newTraceContext()) {
104-
sendRetentionLeaseSyncAction(shardId, primaryAllocationId, primaryTerm, request);
105-
}
99+
sendRetentionLeaseSyncAction(shardId, primaryAllocationId, primaryTerm, new Request(shardId, retentionLeases));
106100
}
107101
}
108102

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java

Lines changed: 34 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.elasticsearch.common.io.stream.StreamInput;
3030
import org.elasticsearch.common.io.stream.StreamOutput;
3131
import org.elasticsearch.common.settings.Settings;
32-
import org.elasticsearch.common.util.concurrent.ThreadContext;
3332
import org.elasticsearch.index.IndexNotFoundException;
3433
import org.elasticsearch.index.IndexingPressure;
3534
import org.elasticsearch.index.shard.IndexShard;
@@ -111,47 +110,43 @@ final void sync(
111110
RetentionLeases retentionLeases,
112111
ActionListener<ReplicationResponse> listener
113112
) {
114-
final ThreadContext threadContext = threadPool.getThreadContext();
115-
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
113+
try (var ignore = threadPool.getThreadContext().newEmptySystemContext()) {
116114
// we have to execute under the system context so that if security is enabled the sync is authorized
117-
threadContext.markAsSystemContext();
118115
final Request request = new Request(shardId, retentionLeases);
119-
try (var ignored = threadContext.newTraceContext()) {
120-
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_sync", request);
121-
transportService.sendChildRequest(
122-
clusterService.localNode(),
123-
transportPrimaryAction,
124-
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
125-
task,
126-
transportOptions,
127-
new TransportResponseHandler<ReplicationResponse>() {
128-
@Override
129-
public ReplicationResponse read(StreamInput in) throws IOException {
130-
return newResponseInstance(in);
131-
}
132-
133-
@Override
134-
public Executor executor() {
135-
return TransportResponseHandler.TRANSPORT_WORKER;
136-
}
137-
138-
@Override
139-
public void handleResponse(ReplicationResponse response) {
140-
task.setPhase("finished");
141-
taskManager.unregister(task);
142-
listener.onResponse(response);
143-
}
144-
145-
@Override
146-
public void handleException(TransportException e) {
147-
LOGGER.log(getExceptionLogLevel(e), () -> format("%s retention lease sync failed", shardId), e);
148-
task.setPhase("finished");
149-
taskManager.unregister(task);
150-
listener.onFailure(e);
151-
}
116+
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_sync", request);
117+
transportService.sendChildRequest(
118+
clusterService.localNode(),
119+
transportPrimaryAction,
120+
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
121+
task,
122+
transportOptions,
123+
new TransportResponseHandler<ReplicationResponse>() {
124+
@Override
125+
public ReplicationResponse read(StreamInput in) throws IOException {
126+
return newResponseInstance(in);
152127
}
153-
);
154-
}
128+
129+
@Override
130+
public Executor executor() {
131+
return TransportResponseHandler.TRANSPORT_WORKER;
132+
}
133+
134+
@Override
135+
public void handleResponse(ReplicationResponse response) {
136+
task.setPhase("finished");
137+
taskManager.unregister(task);
138+
listener.onResponse(response);
139+
}
140+
141+
@Override
142+
public void handleException(TransportException e) {
143+
LOGGER.log(getExceptionLogLevel(e), () -> format("%s retention lease sync failed", shardId), e);
144+
task.setPhase("finished");
145+
taskManager.unregister(task);
146+
listener.onFailure(e);
147+
}
148+
}
149+
);
155150
}
156151
}
157152

server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,7 @@ public boolean addOrNotify(Translog.Location location, Consumer<Boolean> listene
147147
if (refreshForcers == 0 && roomForListener(maxRefreshes, listeners, checkpointRefreshListeners)) {
148148
ThreadContext.StoredContext storedContext = threadContext.newStoredContextPreservingResponseHeaders();
149149
Consumer<Boolean> contextPreservingListener = forced -> {
150-
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
151-
storedContext.restore();
150+
try (var ignore = threadContext.restoreExistingContext(storedContext)) {
152151
listener.accept(forced);
153152
}
154153
};

server/src/main/java/org/elasticsearch/transport/InboundHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st
109109

110110
TransportResponseHandler<?> responseHandler = null;
111111
ThreadContext threadContext = threadPool.getThreadContext();
112-
try (ThreadContext.StoredContext existing = threadContext.stashContext()) {
112+
assert threadContext.isDefaultContext();
113+
try (var ignored = threadContext.newStoredContext()) {
113114
// Place the context with the headers from the message
114115
threadContext.setHeaders(header.getHeaders());
115116
threadContext.putTransient("_remote_address", remoteAddress);

0 commit comments

Comments
 (0)