Skip to content
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ tests:
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=transform/transforms_start_stop/Test start already started transform}
issue: https://github.com/elastic/elasticsearch/issues/98802
- class: org.elasticsearch.xpack.deprecation.DeprecationHttpIT
method: testDeprecatedSettingsReturnWarnings
issue: https://github.com/elastic/elasticsearch/issues/108628
- class: org.elasticsearch.xpack.shutdown.NodeShutdownIT
method: testAllocationPreventedForRemoval
issue: https://github.com/elastic/elasticsearch/issues/116363
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsUpdater;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
Expand All @@ -40,6 +41,7 @@
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

import static org.elasticsearch.common.settings.AbstractScopedSettings.ARCHIVED_SETTINGS_PREFIX;

Expand Down Expand Up @@ -155,7 +157,8 @@ protected void masterOperation(
final ClusterState state,
final ActionListener<ClusterUpdateSettingsResponse> listener
) {
submitUnbatchedTask(UPDATE_TASK_SOURCE, new ClusterUpdateSettingsTask(clusterSettings, Priority.IMMEDIATE, request, listener) {
var storedContext = threadPool.getThreadContext().newRestorableContext(false, true);
var updateSettingsTask = new ClusterUpdateSettingsTask(clusterSettings, Priority.IMMEDIATE, request, storedContext, listener) {
@Override
protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
return new ClusterUpdateSettingsResponse(acknowledged, updater.getTransientUpdates(), updater.getPersistentUpdate());
Expand Down Expand Up @@ -228,35 +231,41 @@ public void onFailure(Exception e) {
logger.debug(() -> "failed to perform [" + UPDATE_TASK_SOURCE + "]", e);
super.onFailure(e);
}
});
};
submitUnbatchedTask(UPDATE_TASK_SOURCE, updateSettingsTask);
}

private static class ClusterUpdateSettingsTask extends AckedClusterStateUpdateTask {
protected volatile boolean reroute = false;
protected final SettingsUpdater updater;
protected final ClusterUpdateSettingsRequest request;
private final Supplier<ThreadContext.StoredContext> storedContext;

ClusterUpdateSettingsTask(
final ClusterSettings clusterSettings,
Priority priority,
ClusterUpdateSettingsRequest request,
Supplier<ThreadContext.StoredContext> storedContext,
ActionListener<? extends AcknowledgedResponse> listener
) {
super(priority, request, listener);
this.updater = new SettingsUpdater(clusterSettings);
this.request = request;
this.storedContext = storedContext;
}

@Override
public ClusterState execute(final ClusterState currentState) {
final ClusterState clusterState = updater.updateSettings(
currentState,
request.transientSettings(),
request.persistentSettings(),
logger
);
reroute = clusterState != currentState;
return clusterState;
try (var ignored = storedContext.get()) {
final ClusterState clusterState = updater.updateSettings(
currentState,
request.transientSettings(),
request.persistentSettings(),
logger
);
reroute = clusterState != currentState;
return clusterState;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,27 +449,62 @@ private StoredContext newStoredContext(
* new Thread() {
* public void run() {
* try (ThreadContext.StoredContext ctx = restorable.get()) {
* // execute with the parents context and restore the threads context afterwards
* // execute with the parents context and restore the thread's context afterwards
* // response headers of this child context will be propagated back to the parent context
* // if `propagateResponseHeaders` is true
* }
* }
*
* }.start();
* </pre>
*
* @param preserveResponseHeaders if set to <code>true</code> the response headers of the restore thread will be preserved.
* @param propagateResponseHeaders if set to <code>true</code> the response headers of the child context will be propagated back.
* @return a restorable context supplier
*/
public Supplier<StoredContext> newRestorableContext(boolean preserveResponseHeaders) {
return wrapRestorable(preserveResponseHeaders ? newStoredContextPreservingResponseHeaders() : newStoredContext());
public Supplier<StoredContext> newRestorableContext(boolean preserveResponseHeaders, boolean propagateResponseHeaders) {
return wrapRestorable(
preserveResponseHeaders ? newStoredContextPreservingResponseHeaders() : newStoredContext(),
propagateResponseHeaders
);
}

/**
* Returns a supplier that gathers a {@link #newStoredContextPreservingResponseHeaders()} and restores it once the
* returned supplier is invoked. The context returned from the supplier is a stored version of the
* suppliers callers context that should be restored once the originally gathered context is not needed anymore.
* For instance this method should be used like this:
*
* <pre>
* Supplier&lt;ThreadContext.StoredContext&gt; restorable = context.newRestorableContext(true);
* new Thread() {
* public void run() {
* try (ThreadContext.StoredContext ctx = restorable.get()) {
* // execute with the parents context and restore the thread's context afterwards
* }
* }
*
* }.start();
* </pre>
*
* @param preserveExistingResponseHeaders if set to <code>true</code> the response headers of the restore thread will be preserved.
* @return a restorable context supplier
*/
public Supplier<StoredContext> newRestorableContext(boolean preserveExistingResponseHeaders) {
return newRestorableContext(preserveExistingResponseHeaders, false);
}

/**
* Same as {@link #newRestorableContext(boolean)} but wraps an existing context to restore.
* @param storedContext the context to restore
*/
public Supplier<StoredContext> wrapRestorable(StoredContext storedContext) {
return wrapRestorable(storedContext, false);
}

private Supplier<StoredContext> wrapRestorable(StoredContext storedContext, boolean propagateResponseHeaders) {
return () -> {
StoredContext context = newStoredContext();
var context = propagateResponseHeaders ? newStoredContextPreservingResponseHeaders() : newStoredContext();
storedContext.restore();
return context;
};
Expand Down