-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Force propagating the X-Opaque-Id to the settings update task if set #120877
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Force propagating the X-Opaque-Id to the settings update task if set #120877
Conversation
… so we can correctly track deprecations (closes elastic#108628).
|
Pinging @elastic/es-core-infra (Team:Core/Infra) |
| String xOpaqueId = task.getHeader(Task.X_OPAQUE_ID_HTTP_HEADER); | ||
| Runnable contextUpdater = Strings.hasText(xOpaqueId) | ||
| ? () -> threadPool.getThreadContext().putHeader(Task.X_OPAQUE_ID_HTTP_HEADER, xOpaqueId) | ||
| : null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm this seems wrong, I think we should (a) propagate the whole thread context not just this one specific header, and (b) clean up the thread context at the end of the task execution rather than letting the polluted context fall through to the caller. This seems like a job for something like ThreadContext#newRestorableContext?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wasn't able to use any of the provided paradigms (newRestorableContext, stash), though haven't spend time to look closer into it yet. Also, "pollution" is somehow necessary so warning headers are correctly returned on the response. Or do I misunderstand?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something like this should be sufficient:
diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java
index b5cd2d2c3dc4..3ac5f26ed98f 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java
@@ -31,6 +31,7 @@ import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsUpdater;
import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
@@ -41,6 +42,7 @@ import org.elasticsearch.transport.TransportService;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Supplier;
import static org.elasticsearch.common.settings.AbstractScopedSettings.ARCHIVED_SETTINGS_PREFIX;
@@ -157,7 +159,7 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeAct
final ClusterState state,
final ActionListener<ClusterUpdateSettingsResponse> listener
) {
- submitUnbatchedTask(UPDATE_TASK_SOURCE, new ClusterUpdateSettingsTask(clusterSettings, Priority.IMMEDIATE, request, listener) {
+ submitUnbatchedTask(UPDATE_TASK_SOURCE, new ClusterUpdateSettingsTask(clusterSettings, Priority.IMMEDIATE, request, threadPool.getThreadContext().newRestorableContext(true), listener) {
@Override
protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
return new ClusterUpdateSettingsResponse(acknowledged, updater.getTransientUpdates(), updater.getPersistentUpdate());
@@ -237,20 +239,24 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeAct
protected volatile boolean reroute = false;
protected final SettingsUpdater updater;
protected final ClusterUpdateSettingsRequest request;
+ private final Supplier<ThreadContext.StoredContext> storedContext;
ClusterUpdateSettingsTask(
final ClusterSettings clusterSettings,
Priority priority,
ClusterUpdateSettingsRequest request,
+ Supplier<ThreadContext.StoredContext> storedContext,
ActionListener<? extends AcknowledgedResponse> listener
) {
super(priority, request, listener);
this.updater = new SettingsUpdater(clusterSettings);
this.request = request;
+ this.storedContext = storedContext;
}
@Override
public ClusterState execute(final ClusterState currentState) {
+ try (var ignored = storedContext.get()) {
final ClusterState clusterState = updater.updateSettings(
currentState,
request.transientSettings(),
@@ -259,6 +265,7 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeAct
);
reroute = clusterState != currentState;
return clusterState;
+ }
}
}
But you're right, it's not. The point is that newRestorableContext(true) should propagate the response headers to the caller, which are themselves wrapped in a ClusterStateTaskExecutor.TaskContext#captureResponseHeaders that then propagate the response headers on to the listener.
It's not obvious to me why this isn't working, but I think we need to understand that instead of taking the approach in this PR. This won't be the only place where we're not propagating something correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, that's what I started with. I'll close this and will investigate more thoroughly 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Follow up: #127799
Force propagating the X-Opaque-Id to the settings update task if set, so deprecation warnings are correctly tracked.
Interestingly, it looks like the behavior here changed. This test used to fail randomly, on current main this is failing consistently. Issue is that headers are not propagated into the update task.
(closes #108628)