Skip to content

Commit 0dc6df6

Browse files
committed
Fix thread context propagation in TransportClusterUpdateSettingsAction
This fixes the settings API to properly return deprecation warning headers when updating deprecated settings, while properly associating them with the correct correlation id. Previously, the correlation id of the original context wasn't propagated to the cluster state update task. However, using `newRestorableContext(preserveResponseHeaders=true)` isn't enough to also - in case - propagate warning headers back to the task's thread context and hence can't be captured by the task executor in MasterService to return the response headers back to the caller. This PR adds a new variant of `newRestorableContext` that supports propagation of response headers if required. Fixes #108628 (DeprecationHttpIT.testDeprecatedSettingsReturnWarnings)
1 parent 1df4a90 commit 0dc6df6

File tree

3 files changed

+58
-17
lines changed

3 files changed

+58
-17
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,6 @@ tests:
6666
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
6767
method: test {p0=transform/transforms_start_stop/Test start already started transform}
6868
issue: https://github.com/elastic/elasticsearch/issues/98802
69-
- class: org.elasticsearch.xpack.deprecation.DeprecationHttpIT
70-
method: testDeprecatedSettingsReturnWarnings
71-
issue: https://github.com/elastic/elasticsearch/issues/108628
7269
- class: org.elasticsearch.xpack.shutdown.NodeShutdownIT
7370
method: testAllocationPreventedForRemoval
7471
issue: https://github.com/elastic/elasticsearch/issues/116363

server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.settings.Settings;
3131
import org.elasticsearch.common.settings.SettingsUpdater;
3232
import org.elasticsearch.common.util.concurrent.EsExecutors;
33+
import org.elasticsearch.common.util.concurrent.ThreadContext;
3334
import org.elasticsearch.core.SuppressForbidden;
3435
import org.elasticsearch.injection.guice.Inject;
3536
import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
@@ -40,6 +41,7 @@
4041
import java.util.HashSet;
4142
import java.util.Optional;
4243
import java.util.Set;
44+
import java.util.function.Supplier;
4345

4446
import static org.elasticsearch.common.settings.AbstractScopedSettings.ARCHIVED_SETTINGS_PREFIX;
4547

@@ -155,7 +157,8 @@ protected void masterOperation(
155157
final ClusterState state,
156158
final ActionListener<ClusterUpdateSettingsResponse> listener
157159
) {
158-
submitUnbatchedTask(UPDATE_TASK_SOURCE, new ClusterUpdateSettingsTask(clusterSettings, Priority.IMMEDIATE, request, listener) {
160+
var storedContext = threadPool.getThreadContext().newRestorableContext(false, true);
161+
var updateSettingsTask = new ClusterUpdateSettingsTask(clusterSettings, Priority.IMMEDIATE, request, storedContext, listener) {
159162
@Override
160163
protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
161164
return new ClusterUpdateSettingsResponse(acknowledged, updater.getTransientUpdates(), updater.getPersistentUpdate());
@@ -228,35 +231,41 @@ public void onFailure(Exception e) {
228231
logger.debug(() -> "failed to perform [" + UPDATE_TASK_SOURCE + "]", e);
229232
super.onFailure(e);
230233
}
231-
});
234+
};
235+
submitUnbatchedTask(UPDATE_TASK_SOURCE, updateSettingsTask);
232236
}
233237

234238
private static class ClusterUpdateSettingsTask extends AckedClusterStateUpdateTask {
235239
protected volatile boolean reroute = false;
236240
protected final SettingsUpdater updater;
237241
protected final ClusterUpdateSettingsRequest request;
242+
private final Supplier<ThreadContext.StoredContext> storedContext;
238243

239244
ClusterUpdateSettingsTask(
240245
final ClusterSettings clusterSettings,
241246
Priority priority,
242247
ClusterUpdateSettingsRequest request,
248+
Supplier<ThreadContext.StoredContext> storedContext,
243249
ActionListener<? extends AcknowledgedResponse> listener
244250
) {
245251
super(priority, request, listener);
246252
this.updater = new SettingsUpdater(clusterSettings);
247253
this.request = request;
254+
this.storedContext = storedContext;
248255
}
249256

250257
@Override
251258
public ClusterState execute(final ClusterState currentState) {
252-
final ClusterState clusterState = updater.updateSettings(
253-
currentState,
254-
request.transientSettings(),
255-
request.persistentSettings(),
256-
logger
257-
);
258-
reroute = clusterState != currentState;
259-
return clusterState;
259+
try (var ignored = storedContext.get()) {
260+
final ClusterState clusterState = updater.updateSettings(
261+
currentState,
262+
request.transientSettings(),
263+
request.persistentSettings(),
264+
logger
265+
);
266+
reroute = clusterState != currentState;
267+
return clusterState;
268+
}
260269
}
261270
}
262271

server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -449,27 +449,62 @@ private StoredContext newStoredContext(
449449
* new Thread() {
450450
* public void run() {
451451
* try (ThreadContext.StoredContext ctx = restorable.get()) {
452-
* // execute with the parents context and restore the threads context afterwards
452+
* // execute with the parents context and restore the thread's context afterwards
453+
* // response headers of this child context will be propagated back to the parent context
454+
* // if `propagateResponseHeaders` is true
453455
* }
454456
* }
455457
*
456458
* }.start();
457459
* </pre>
458460
*
459461
* @param preserveResponseHeaders if set to <code>true</code> the response headers of the restore thread will be preserved.
462+
* @param propagateResponseHeaders if set to <code>true</code> the response headers of the child context will be propagated back.
460463
* @return a restorable context supplier
461464
*/
462-
public Supplier<StoredContext> newRestorableContext(boolean preserveResponseHeaders) {
463-
return wrapRestorable(preserveResponseHeaders ? newStoredContextPreservingResponseHeaders() : newStoredContext());
465+
public Supplier<StoredContext> newRestorableContext(boolean preserveResponseHeaders, boolean propagateResponseHeaders) {
466+
return wrapRestorable(
467+
preserveResponseHeaders ? newStoredContextPreservingResponseHeaders() : newStoredContext(),
468+
propagateResponseHeaders
469+
);
470+
}
471+
472+
/**
473+
* Returns a supplier that gathers a {@link #newStoredContextPreservingResponseHeaders()} and restores it once the
474+
* returned supplier is invoked. The context returned from the supplier is a stored version of the
475+
* suppliers callers context that should be restored once the originally gathered context is not needed anymore.
476+
* For instance this method should be used like this:
477+
*
478+
* <pre>
479+
* Supplier&lt;ThreadContext.StoredContext&gt; restorable = context.newRestorableContext(true);
480+
* new Thread() {
481+
* public void run() {
482+
* try (ThreadContext.StoredContext ctx = restorable.get()) {
483+
* // execute with the parents context and restore the thread's context afterwards
484+
* }
485+
* }
486+
*
487+
* }.start();
488+
* </pre>
489+
*
490+
* @param preserveExistingResponseHeaders if set to <code>true</code> the response headers of the restore thread will be preserved.
491+
* @return a restorable context supplier
492+
*/
493+
public Supplier<StoredContext> newRestorableContext(boolean preserveExistingResponseHeaders) {
494+
return newRestorableContext(preserveExistingResponseHeaders, false);
464495
}
465496

466497
/**
467498
* Same as {@link #newRestorableContext(boolean)} but wraps an existing context to restore.
468499
* @param storedContext the context to restore
469500
*/
470501
public Supplier<StoredContext> wrapRestorable(StoredContext storedContext) {
502+
return wrapRestorable(storedContext, false);
503+
}
504+
505+
private Supplier<StoredContext> wrapRestorable(StoredContext storedContext, boolean propagateResponseHeaders) {
471506
return () -> {
472-
StoredContext context = newStoredContext();
507+
var context = propagateResponseHeaders ? newStoredContextPreservingResponseHeaders() : newStoredContext();
473508
storedContext.restore();
474509
return context;
475510
};

0 commit comments

Comments
 (0)