|
9 | 9 |
|
10 | 10 | import org.elasticsearch.action.ActionListener; |
11 | 11 | import org.elasticsearch.action.support.ActionFilters; |
12 | | -import org.elasticsearch.action.support.master.TransportMasterNodeAction; |
| 12 | +import org.elasticsearch.action.support.ChannelActionListener; |
| 13 | +import org.elasticsearch.action.support.local.TransportLocalClusterStateAction; |
13 | 14 | import org.elasticsearch.cluster.ClusterState; |
14 | 15 | import org.elasticsearch.cluster.block.ClusterBlockException; |
15 | 16 | import org.elasticsearch.cluster.block.ClusterBlockLevel; |
|
18 | 19 | import org.elasticsearch.cluster.service.ClusterService; |
19 | 20 | import org.elasticsearch.common.settings.Settings; |
20 | 21 | import org.elasticsearch.common.util.concurrent.EsExecutors; |
| 22 | +import org.elasticsearch.core.UpdateForV10; |
21 | 23 | import org.elasticsearch.injection.guice.Inject; |
| 24 | +import org.elasticsearch.tasks.CancellableTask; |
22 | 25 | import org.elasticsearch.tasks.Task; |
23 | | -import org.elasticsearch.threadpool.ThreadPool; |
24 | 26 | import org.elasticsearch.transport.TransportService; |
25 | 27 | import org.elasticsearch.xpack.core.watcher.transport.actions.put.GetWatcherSettingsAction; |
26 | 28 |
|
|
30 | 32 | import static org.elasticsearch.xpack.watcher.transport.actions.TransportUpdateWatcherSettingsAction.WATCHER_INDEX_NAME; |
31 | 33 | import static org.elasticsearch.xpack.watcher.transport.actions.TransportUpdateWatcherSettingsAction.WATCHER_INDEX_REQUEST; |
32 | 34 |
|
33 | | -public class TransportGetWatcherSettingsAction extends TransportMasterNodeAction< |
| 35 | +public class TransportGetWatcherSettingsAction extends TransportLocalClusterStateAction< |
34 | 36 | GetWatcherSettingsAction.Request, |
35 | 37 | GetWatcherSettingsAction.Response> { |
36 | 38 |
|
37 | 39 | private final IndexNameExpressionResolver indexNameExpressionResolver; |
38 | 40 |
|
| 41 | + /** |
| 42 | + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until |
| 43 | + * we no longer need to support calling this action remotely. |
| 44 | + */ |
| 45 | + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) |
| 46 | + @SuppressWarnings("this-escape") |
39 | 47 | @Inject |
40 | 48 | public TransportGetWatcherSettingsAction( |
41 | 49 | TransportService transportService, |
42 | 50 | ClusterService clusterService, |
43 | | - ThreadPool threadPool, |
44 | 51 | ActionFilters actionFilters, |
45 | 52 | IndexNameExpressionResolver indexNameExpressionResolver |
46 | 53 | ) { |
47 | 54 | super( |
48 | 55 | GetWatcherSettingsAction.NAME, |
49 | | - transportService, |
50 | | - clusterService, |
51 | | - threadPool, |
52 | 56 | actionFilters, |
53 | | - GetWatcherSettingsAction.Request::readFrom, |
54 | | - GetWatcherSettingsAction.Response::new, |
| 57 | + transportService.getTaskManager(), |
| 58 | + clusterService, |
55 | 59 | EsExecutors.DIRECT_EXECUTOR_SERVICE |
56 | 60 | ); |
57 | 61 | this.indexNameExpressionResolver = indexNameExpressionResolver; |
| 62 | + |
| 63 | + transportService.registerRequestHandler( |
| 64 | + actionName, |
| 65 | + executor, |
| 66 | + false, |
| 67 | + true, |
| 68 | + GetWatcherSettingsAction.Request::readFrom, |
| 69 | + (request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel)) |
| 70 | + ); |
58 | 71 | } |
59 | 72 |
|
60 | 73 | @Override |
61 | | - protected void masterOperation( |
| 74 | + protected void localClusterStateOperation( |
62 | 75 | Task task, |
63 | 76 | GetWatcherSettingsAction.Request request, |
64 | 77 | ClusterState state, |
65 | 78 | ActionListener<GetWatcherSettingsAction.Response> listener |
66 | 79 | ) { |
| 80 | + ((CancellableTask) task).ensureNotCancelled(); |
67 | 81 | IndexMetadata metadata = state.metadata().index(WATCHER_INDEX_NAME); |
68 | 82 | if (metadata == null) { |
69 | 83 | listener.onResponse(new GetWatcherSettingsAction.Response(Settings.EMPTY)); |
|
0 commit comments