Skip to content

Commit 01ea97e

Browse files
committed
Run TransportGetWatcherSettingsAction on local node
This action solely needs the cluster state, it can run on any node. Additionally, it needs to be cancellable to avoid doing unnecessary work after a client failure or timeout. Relates #101805
1 parent 229d392 commit 01ea97e

File tree

3 files changed

+51
-23
lines changed

3 files changed

+51
-23
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/put/GetWatcherSettingsAction.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,20 @@
1111
import org.elasticsearch.action.ActionRequestValidationException;
1212
import org.elasticsearch.action.ActionResponse;
1313
import org.elasticsearch.action.ActionType;
14-
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
14+
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
1515
import org.elasticsearch.common.io.stream.StreamInput;
1616
import org.elasticsearch.common.io.stream.StreamOutput;
1717
import org.elasticsearch.common.settings.Settings;
1818
import org.elasticsearch.core.TimeValue;
19+
import org.elasticsearch.core.UpdateForV10;
20+
import org.elasticsearch.tasks.CancellableTask;
21+
import org.elasticsearch.tasks.Task;
22+
import org.elasticsearch.tasks.TaskId;
1923
import org.elasticsearch.xcontent.ToXContentObject;
2024
import org.elasticsearch.xcontent.XContentBuilder;
2125

2226
import java.io.IOException;
27+
import java.util.Map;
2328

2429
public class GetWatcherSettingsAction extends ActionType<GetWatcherSettingsAction.Response> {
2530

@@ -30,12 +35,17 @@ public GetWatcherSettingsAction() {
3035
super(NAME);
3136
}
3237

33-
public static class Request extends MasterNodeReadRequest<Request> {
38+
public static class Request extends LocalClusterStateRequest {
3439

3540
public Request(TimeValue masterNodeTimeout) {
3641
super(masterNodeTimeout);
3742
}
3843

44+
/**
45+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
46+
* we no longer need to support calling this action remotely.
47+
*/
48+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
3949
public static Request readFrom(StreamInput in) throws IOException {
4050
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
4151
return new Request(in);
@@ -49,15 +59,13 @@ private Request(StreamInput in) throws IOException {
4959
}
5060

5161
@Override
52-
public void writeTo(StreamOutput out) throws IOException {
53-
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
54-
super.writeTo(out);
55-
}
62+
public ActionRequestValidationException validate() {
63+
return null;
5664
}
5765

5866
@Override
59-
public ActionRequestValidationException validate() {
60-
return null;
67+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
68+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
6169
}
6270
}
6371

@@ -69,10 +77,11 @@ public Response(Settings settings) {
6977
this.settings = settings;
7078
}
7179

72-
public Response(StreamInput in) throws IOException {
73-
this.settings = Settings.readSettingsFromStream(in);
74-
}
75-
80+
/**
81+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
82+
* we no longer need to support calling this action remotely.
83+
*/
84+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
7685
@Override
7786
public void writeTo(StreamOutput out) throws IOException {
7887
this.settings.writeTo(out);

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestGetWatcherSettingsAction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.rest.BaseRestHandler;
1212
import org.elasticsearch.rest.RestRequest;
1313
import org.elasticsearch.rest.RestUtils;
14+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
1415
import org.elasticsearch.rest.action.RestToXContentListener;
1516
import org.elasticsearch.xpack.core.watcher.transport.actions.put.GetWatcherSettingsAction;
1617

@@ -37,6 +38,10 @@ public List<Route> routes() {
3738
@Override
3839
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
3940
GetWatcherSettingsAction.Request req = new GetWatcherSettingsAction.Request(RestUtils.getMasterNodeTimeout(request));
40-
return channel -> client.execute(GetWatcherSettingsAction.INSTANCE, req, new RestToXContentListener<>(channel));
41+
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
42+
GetWatcherSettingsAction.INSTANCE,
43+
req,
44+
new RestToXContentListener<>(channel)
45+
);
4146
}
4247
}

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportGetWatcherSettingsAction.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99

1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.support.ActionFilters;
12-
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
12+
import org.elasticsearch.action.support.ChannelActionListener;
13+
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
1314
import org.elasticsearch.cluster.ClusterState;
1415
import org.elasticsearch.cluster.block.ClusterBlockException;
1516
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -18,9 +19,10 @@
1819
import org.elasticsearch.cluster.service.ClusterService;
1920
import org.elasticsearch.common.settings.Settings;
2021
import org.elasticsearch.common.util.concurrent.EsExecutors;
22+
import org.elasticsearch.core.UpdateForV10;
2123
import org.elasticsearch.injection.guice.Inject;
24+
import org.elasticsearch.tasks.CancellableTask;
2225
import org.elasticsearch.tasks.Task;
23-
import org.elasticsearch.threadpool.ThreadPool;
2426
import org.elasticsearch.transport.TransportService;
2527
import org.elasticsearch.xpack.core.watcher.transport.actions.put.GetWatcherSettingsAction;
2628

@@ -30,40 +32,52 @@
3032
import static org.elasticsearch.xpack.watcher.transport.actions.TransportUpdateWatcherSettingsAction.WATCHER_INDEX_NAME;
3133
import static org.elasticsearch.xpack.watcher.transport.actions.TransportUpdateWatcherSettingsAction.WATCHER_INDEX_REQUEST;
3234

33-
public class TransportGetWatcherSettingsAction extends TransportMasterNodeAction<
35+
public class TransportGetWatcherSettingsAction extends TransportLocalClusterStateAction<
3436
GetWatcherSettingsAction.Request,
3537
GetWatcherSettingsAction.Response> {
3638

3739
private final IndexNameExpressionResolver indexNameExpressionResolver;
3840

41+
/**
42+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
43+
* we no longer need to support calling this action remotely.
44+
*/
45+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
46+
@SuppressWarnings("this-escape")
3947
@Inject
4048
public TransportGetWatcherSettingsAction(
4149
TransportService transportService,
4250
ClusterService clusterService,
43-
ThreadPool threadPool,
4451
ActionFilters actionFilters,
4552
IndexNameExpressionResolver indexNameExpressionResolver
4653
) {
4754
super(
4855
GetWatcherSettingsAction.NAME,
49-
transportService,
50-
clusterService,
51-
threadPool,
5256
actionFilters,
53-
GetWatcherSettingsAction.Request::readFrom,
54-
GetWatcherSettingsAction.Response::new,
57+
transportService.getTaskManager(),
58+
clusterService,
5559
EsExecutors.DIRECT_EXECUTOR_SERVICE
5660
);
5761
this.indexNameExpressionResolver = indexNameExpressionResolver;
62+
63+
transportService.registerRequestHandler(
64+
actionName,
65+
executor,
66+
false,
67+
true,
68+
GetWatcherSettingsAction.Request::readFrom,
69+
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
70+
);
5871
}
5972

6073
@Override
61-
protected void masterOperation(
74+
protected void localClusterStateOperation(
6275
Task task,
6376
GetWatcherSettingsAction.Request request,
6477
ClusterState state,
6578
ActionListener<GetWatcherSettingsAction.Response> listener
6679
) {
80+
((CancellableTask) task).ensureNotCancelled();
6781
IndexMetadata metadata = state.metadata().index(WATCHER_INDEX_NAME);
6882
if (metadata == null) {
6983
listener.onResponse(new GetWatcherSettingsAction.Response(Settings.EMPTY));

0 commit comments

Comments
 (0)