Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ tests:
- class: org.elasticsearch.search.basic.SearchWithRandomDisconnectsIT
method: testSearchWithRandomDisconnects
issue: https://github.com/elastic/elasticsearch/issues/116175
- class: org.elasticsearch.xpack.deprecation.DeprecationHttpIT
method: testDeprecatedSettingsReturnWarnings
issue: https://github.com/elastic/elasticsearch/issues/108628
- class: org.elasticsearch.xpack.shutdown.NodeShutdownIT
method: testAllocationPreventedForRemoval
issue: https://github.com/elastic/elasticsearch/issues/116363
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsUpdater;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
Expand Down Expand Up @@ -157,7 +159,12 @@ protected void masterOperation(
final ClusterState state,
final ActionListener<ClusterUpdateSettingsResponse> listener
) {
submitUnbatchedTask(UPDATE_TASK_SOURCE, new ClusterUpdateSettingsTask(clusterSettings, Priority.IMMEDIATE, request, listener) {
// Force propagating the X-Opaque-Id to the settings update task if set, so we can correctly track deprecations.
String xOpaqueId = task.getHeader(Task.X_OPAQUE_ID_HTTP_HEADER);
Runnable contextUpdater = Strings.hasText(xOpaqueId)
? () -> threadPool.getThreadContext().putHeader(Task.X_OPAQUE_ID_HTTP_HEADER, xOpaqueId)
: null;
Comment on lines +163 to +166
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm this seems wrong, I think we should (a) propagate the whole thread context not just this one specific header, and (b) clean up the thread context at the end of the task execution rather than letting the polluted context fall through to the caller. This seems like a job for something like ThreadContext#newRestorableContext?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't able to use any of the provided paradigms (newRestorableContext, stash), though haven't spend time to look closer into it yet. Also, "pollution" is somehow necessary so warning headers are correctly returned on the response. Or do I misunderstand?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like this should be sufficient:

diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java
index b5cd2d2c3dc4..3ac5f26ed98f 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java
@@ -31,6 +31,7 @@ import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsUpdater;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.injection.guice.Inject;
 import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
@@ -41,6 +42,7 @@ import org.elasticsearch.transport.TransportService;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Supplier;
 
 import static org.elasticsearch.common.settings.AbstractScopedSettings.ARCHIVED_SETTINGS_PREFIX;
 
@@ -157,7 +159,7 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeAct
         final ClusterState state,
         final ActionListener<ClusterUpdateSettingsResponse> listener
     ) {
-        submitUnbatchedTask(UPDATE_TASK_SOURCE, new ClusterUpdateSettingsTask(clusterSettings, Priority.IMMEDIATE, request, listener) {
+        submitUnbatchedTask(UPDATE_TASK_SOURCE, new ClusterUpdateSettingsTask(clusterSettings, Priority.IMMEDIATE, request, threadPool.getThreadContext().newRestorableContext(true), listener) {
             @Override
             protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
                 return new ClusterUpdateSettingsResponse(acknowledged, updater.getTransientUpdates(), updater.getPersistentUpdate());
@@ -237,20 +239,24 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeAct
         protected volatile boolean reroute = false;
         protected final SettingsUpdater updater;
         protected final ClusterUpdateSettingsRequest request;
+        private final Supplier<ThreadContext.StoredContext> storedContext;
 
         ClusterUpdateSettingsTask(
             final ClusterSettings clusterSettings,
             Priority priority,
             ClusterUpdateSettingsRequest request,
+            Supplier<ThreadContext.StoredContext> storedContext,
             ActionListener<? extends AcknowledgedResponse> listener
         ) {
             super(priority, request, listener);
             this.updater = new SettingsUpdater(clusterSettings);
             this.request = request;
+            this.storedContext = storedContext;
         }
 
         @Override
         public ClusterState execute(final ClusterState currentState) {
+            try (var ignored = storedContext.get()) {
             final ClusterState clusterState = updater.updateSettings(
                 currentState,
                 request.transientSettings(),
@@ -259,6 +265,7 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeAct
             );
             reroute = clusterState != currentState;
             return clusterState;
+            }
         }
     }
 

But you're right, it's not. The point is that newRestorableContext(true) should propagate the response headers to the caller, which are themselves wrapped in a ClusterStateTaskExecutor.TaskContext#captureResponseHeaders that then propagate the response headers on to the listener.

It's not obvious to me why this isn't working, but I think we need to understand that instead of taking the approach in this PR. This won't be the only place where we're not propagating something correctly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that's what I started with. I'll close this and will investigate more thoroughly 👍

Copy link
Contributor Author

@mosche mosche May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow up: #127799

submitUnbatchedTask(UPDATE_TASK_SOURCE, new ClusterUpdateSettingsTask(clusterSettings, request, listener, contextUpdater) {
@Override
protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
return new ClusterUpdateSettingsResponse(acknowledged, updater.getTransientUpdates(), updater.getPersistentUpdate());
Expand Down Expand Up @@ -237,20 +244,25 @@ private static class ClusterUpdateSettingsTask extends AckedClusterStateUpdateTa
protected volatile boolean reroute = false;
protected final SettingsUpdater updater;
protected final ClusterUpdateSettingsRequest request;
protected final @Nullable Runnable threadContextUpdater;

ClusterUpdateSettingsTask(
final ClusterSettings clusterSettings,
Priority priority,
ClusterUpdateSettingsRequest request,
ActionListener<? extends AcknowledgedResponse> listener
ActionListener<? extends AcknowledgedResponse> listener,
@Nullable Runnable threadContextUpdater
) {
super(priority, request, listener);
super(Priority.IMMEDIATE, request, listener);
this.updater = new SettingsUpdater(clusterSettings);
this.request = request;
this.threadContextUpdater = threadContextUpdater;
}

@Override
public ClusterState execute(final ClusterState currentState) {
if (threadContextUpdater != null) {
threadContextUpdater.run();
}
final ClusterState clusterState = updater.updateSettings(
currentState,
request.transientSettings(),
Expand Down