From 01ea97eea8a76da4b0de4f42a28e11dc93146dcb Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Sat, 15 Feb 2025 18:22:44 +1000 Subject: [PATCH 1/2] Run `TransportGetWatcherSettingsAction` on local node This action solely needs the cluster state, it can run on any node. Additionally, it needs to be cancellable to avoid doing unnecessary work after a client failure or timeout. Relates #101805 --- .../actions/put/GetWatcherSettingsAction.java | 33 +++++++++++------- .../action/RestGetWatcherSettingsAction.java | 7 +++- .../TransportGetWatcherSettingsAction.java | 34 +++++++++++++------ 3 files changed, 51 insertions(+), 23 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/put/GetWatcherSettingsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/put/GetWatcherSettingsAction.java index c6a22ec289be9..8b96ca44b5f93 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/put/GetWatcherSettingsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/put/GetWatcherSettingsAction.java @@ -11,15 +11,20 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.action.support.local.LocalClusterStateRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.UpdateForV10; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; +import java.util.Map; public class GetWatcherSettingsAction extends ActionType { @@ -30,12 +35,17 @@ public GetWatcherSettingsAction() { super(NAME); } - public static class Request extends MasterNodeReadRequest { + public static class Request extends LocalClusterStateRequest { public Request(TimeValue masterNodeTimeout) { super(masterNodeTimeout); } + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) public static Request readFrom(StreamInput in) throws IOException { if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) { return new Request(in); @@ -49,15 +59,13 @@ private Request(StreamInput in) throws IOException { } @Override - public void writeTo(StreamOutput out) throws IOException { - if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) { - super.writeTo(out); - } + public ActionRequestValidationException validate() { + return null; } @Override - public ActionRequestValidationException validate() { - return null; + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "", parentTaskId, headers); } } @@ -69,10 +77,11 @@ public Response(Settings settings) { this.settings = settings; } - public Response(StreamInput in) throws IOException { - this.settings = Settings.readSettingsFromStream(in); - } - + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) @Override public void writeTo(StreamOutput out) throws IOException { this.settings.writeTo(out); diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestGetWatcherSettingsAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestGetWatcherSettingsAction.java index ff9e8c45d72f3..c9cb22ffc0ae4 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestGetWatcherSettingsAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestGetWatcherSettingsAction.java @@ -11,6 +11,7 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestUtils; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.watcher.transport.actions.put.GetWatcherSettingsAction; @@ -37,6 +38,10 @@ public List routes() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { GetWatcherSettingsAction.Request req = new GetWatcherSettingsAction.Request(RestUtils.getMasterNodeTimeout(request)); - return channel -> client.execute(GetWatcherSettingsAction.INSTANCE, req, new RestToXContentListener<>(channel)); + return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute( + GetWatcherSettingsAction.INSTANCE, + req, + new RestToXContentListener<>(channel) + ); } } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportGetWatcherSettingsAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportGetWatcherSettingsAction.java index 73daf47fcac04..d138e0719ce64 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportGetWatcherSettingsAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportGetWatcherSettingsAction.java @@ -9,7 +9,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.action.support.ChannelActionListener; +import org.elasticsearch.action.support.local.TransportLocalClusterStateAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -18,9 +19,10 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.UpdateForV10; import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.watcher.transport.actions.put.GetWatcherSettingsAction; @@ -30,40 +32,52 @@ import static org.elasticsearch.xpack.watcher.transport.actions.TransportUpdateWatcherSettingsAction.WATCHER_INDEX_NAME; import static org.elasticsearch.xpack.watcher.transport.actions.TransportUpdateWatcherSettingsAction.WATCHER_INDEX_REQUEST; -public class TransportGetWatcherSettingsAction extends TransportMasterNodeAction< +public class TransportGetWatcherSettingsAction extends TransportLocalClusterStateAction< GetWatcherSettingsAction.Request, GetWatcherSettingsAction.Response> { private final IndexNameExpressionResolver indexNameExpressionResolver; + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) + @SuppressWarnings("this-escape") @Inject public TransportGetWatcherSettingsAction( TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver ) { super( GetWatcherSettingsAction.NAME, - transportService, - clusterService, - threadPool, actionFilters, - GetWatcherSettingsAction.Request::readFrom, - GetWatcherSettingsAction.Response::new, + transportService.getTaskManager(), + clusterService, EsExecutors.DIRECT_EXECUTOR_SERVICE ); this.indexNameExpressionResolver = indexNameExpressionResolver; + + transportService.registerRequestHandler( + actionName, + executor, + false, + true, + GetWatcherSettingsAction.Request::readFrom, + (request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel)) + ); } @Override - protected void masterOperation( + protected void localClusterStateOperation( Task task, GetWatcherSettingsAction.Request request, ClusterState state, ActionListener listener ) { + ((CancellableTask) task).ensureNotCancelled(); IndexMetadata metadata = state.metadata().index(WATCHER_INDEX_NAME); if (metadata == null) { listener.onResponse(new GetWatcherSettingsAction.Response(Settings.EMPTY)); From 83d7a77f3d540deac088e1498c6d81f4a6b901df Mon Sep 17 00:00:00 2001 From: Niels Bauman <33722607+nielsbauman@users.noreply.github.com> Date: Tue, 18 Feb 2025 15:02:24 +0100 Subject: [PATCH 2/2] Update docs/changelog/122857.yaml --- docs/changelog/122857.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/122857.yaml diff --git a/docs/changelog/122857.yaml b/docs/changelog/122857.yaml new file mode 100644 index 0000000000000..6d14acc02f2fe --- /dev/null +++ b/docs/changelog/122857.yaml @@ -0,0 +1,5 @@ +pr: 122857 +summary: Run `TransportGetWatcherSettingsAction` on local node +area: Watcher +type: enhancement +issues: []