Skip to content

Commit af6eb8c

Browse files
authored
Run TransportGetDataStreamsAction on local node (#122852)
This action solely needs the cluster state, it can run on any node. Additionally, it needs to be cancellable to avoid doing unnecessary work after a client failure or timeout. Relates #101805
1 parent df7be39 commit af6eb8c

File tree

8 files changed

+214
-260
lines changed

8 files changed

+214
-260
lines changed

docs/changelog/122852.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 122852
2+
summary: Run `TransportGetDataStreamsAction` on local node
3+
area: Data streams
4+
type: enhancement
5+
issues: []

modules/data-streams/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ dependencies {
2222
testImplementation project(path: ':test:test-clusters')
2323
testImplementation project(":modules:mapper-extras")
2424
internalClusterTestImplementation project(":modules:mapper-extras")
25+
internalClusterTestImplementation project(':modules:rest-root')
2526
}
2627

2728
tasks.withType(StandaloneRestIntegTestTask).configureEach {
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.datastreams;
11+
12+
import org.apache.http.client.methods.HttpGet;
13+
import org.apache.http.client.methods.HttpPost;
14+
import org.elasticsearch.action.datastreams.GetDataStreamAction;
15+
import org.elasticsearch.action.support.CancellableActionTestPlugin;
16+
import org.elasticsearch.action.support.PlainActionFuture;
17+
import org.elasticsearch.action.support.RefCountingListener;
18+
import org.elasticsearch.action.support.SubscribableListener;
19+
import org.elasticsearch.client.Request;
20+
import org.elasticsearch.client.Response;
21+
import org.elasticsearch.common.Strings;
22+
import org.elasticsearch.common.network.NetworkModule;
23+
import org.elasticsearch.common.settings.Settings;
24+
import org.elasticsearch.plugins.Plugin;
25+
import org.elasticsearch.rest.root.MainRestPlugin;
26+
import org.elasticsearch.test.ESIntegTestCase;
27+
import org.elasticsearch.test.rest.ObjectPath;
28+
import org.elasticsearch.transport.netty4.Netty4Plugin;
29+
30+
import java.util.Collection;
31+
import java.util.List;
32+
import java.util.concurrent.CancellationException;
33+
import java.util.concurrent.ExecutionException;
34+
import java.util.concurrent.TimeUnit;
35+
36+
import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener;
37+
import static org.elasticsearch.test.TaskAssertions.assertAllTasksHaveFinished;
38+
import static org.hamcrest.Matchers.greaterThan;
39+
import static org.hamcrest.Matchers.oneOf;
40+
41+
public class DataStreamRestActionCancellationIT extends ESIntegTestCase {
42+
43+
@Override
44+
protected boolean addMockHttpTransport() {
45+
return false; // enable http
46+
}
47+
48+
@Override
49+
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
50+
return Settings.builder()
51+
.put(super.nodeSettings(nodeOrdinal, otherSettings))
52+
.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME)
53+
.put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
54+
.build();
55+
}
56+
57+
@Override
58+
protected Collection<Class<? extends Plugin>> nodePlugins() {
59+
return List.of(getTestTransportPlugin(), MainRestPlugin.class, CancellableActionTestPlugin.class, DataStreamsPlugin.class);
60+
}
61+
62+
public void testGetDataStreamCancellation() {
63+
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_data_stream"), GetDataStreamAction.NAME);
64+
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_data_stream?verbose"), GetDataStreamAction.NAME);
65+
}
66+
67+
private void runRestActionCancellationTest(Request request, String actionName) {
68+
final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
69+
70+
try (
71+
var restClient = createRestClient(node);
72+
var capturingAction = CancellableActionTestPlugin.capturingActionOnNode(actionName, node)
73+
) {
74+
final var responseFuture = new PlainActionFuture<Response>();
75+
final var restInvocation = restClient.performRequestAsync(request, wrapAsRestResponseListener(responseFuture));
76+
77+
if (randomBoolean()) {
78+
// cancel by aborting the REST request
79+
capturingAction.captureAndCancel(restInvocation::cancel);
80+
expectThrows(ExecutionException.class, CancellationException.class, () -> responseFuture.get(10, TimeUnit.SECONDS));
81+
} else {
82+
// cancel via the task management API
83+
final var cancelFuture = new PlainActionFuture<Void>();
84+
capturingAction.captureAndCancel(
85+
() -> SubscribableListener
86+
87+
.<ObjectPath>newForked(
88+
l -> restClient.performRequestAsync(
89+
getListTasksRequest(node, actionName),
90+
wrapAsRestResponseListener(l.map(ObjectPath::createFromResponse))
91+
)
92+
)
93+
94+
.<Void>andThen((l, listTasksResponse) -> {
95+
final var taskCount = listTasksResponse.evaluateArraySize("tasks");
96+
assertThat(taskCount, greaterThan(0));
97+
try (var listeners = new RefCountingListener(l)) {
98+
for (int i = 0; i < taskCount; i++) {
99+
final var taskPrefix = "tasks." + i + ".";
100+
assertTrue(listTasksResponse.evaluate(taskPrefix + "cancellable"));
101+
assertFalse(listTasksResponse.evaluate(taskPrefix + "cancelled"));
102+
restClient.performRequestAsync(
103+
getCancelTaskRequest(
104+
listTasksResponse.evaluate(taskPrefix + "node"),
105+
listTasksResponse.evaluate(taskPrefix + "id")
106+
),
107+
wrapAsRestResponseListener(listeners.acquire(DataStreamRestActionCancellationIT::assertOK))
108+
);
109+
}
110+
}
111+
})
112+
113+
.addListener(cancelFuture)
114+
);
115+
cancelFuture.get(10, TimeUnit.SECONDS);
116+
expectThrows(Exception.class, () -> responseFuture.get(10, TimeUnit.SECONDS));
117+
}
118+
119+
assertAllTasksHaveFinished(actionName);
120+
} catch (Exception e) {
121+
fail(e);
122+
}
123+
}
124+
125+
private static Request getListTasksRequest(String taskNode, String actionName) {
126+
final var listTasksRequest = new Request(HttpGet.METHOD_NAME, "/_tasks");
127+
listTasksRequest.addParameter("nodes", taskNode);
128+
listTasksRequest.addParameter("actions", actionName);
129+
listTasksRequest.addParameter("group_by", "none");
130+
return listTasksRequest;
131+
}
132+
133+
private static Request getCancelTaskRequest(String taskNode, int taskId) {
134+
final var cancelTaskRequest = new Request(HttpPost.METHOD_NAME, Strings.format("/_tasks/%s:%d/_cancel", taskNode, taskId));
135+
cancelTaskRequest.addParameter("wait_for_completion", null);
136+
return cancelTaskRequest;
137+
}
138+
139+
public static void assertOK(Response response) {
140+
assertThat(response.getStatusLine().getStatusCode(), oneOf(200, 201));
141+
}
142+
143+
}

modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
import org.elasticsearch.action.datastreams.GetDataStreamAction.Response.IndexProperties;
1818
import org.elasticsearch.action.datastreams.GetDataStreamAction.Response.ManagedBy;
1919
import org.elasticsearch.action.support.ActionFilters;
20-
import org.elasticsearch.action.support.master.TransportMasterNodeReadProjectAction;
20+
import org.elasticsearch.action.support.ChannelActionListener;
21+
import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction;
2122
import org.elasticsearch.client.internal.Client;
2223
import org.elasticsearch.client.internal.OriginSettingClient;
2324
import org.elasticsearch.cluster.ProjectState;
@@ -39,6 +40,7 @@
3940
import org.elasticsearch.common.settings.Settings;
4041
import org.elasticsearch.core.Nullable;
4142
import org.elasticsearch.core.Tuple;
43+
import org.elasticsearch.core.UpdateForV10;
4244
import org.elasticsearch.index.Index;
4345
import org.elasticsearch.index.IndexMode;
4446
import org.elasticsearch.index.IndexSettingProvider;
@@ -47,6 +49,7 @@
4749
import org.elasticsearch.indices.SystemDataStreamDescriptor;
4850
import org.elasticsearch.indices.SystemIndices;
4951
import org.elasticsearch.injection.guice.Inject;
52+
import org.elasticsearch.tasks.CancellableTask;
5053
import org.elasticsearch.tasks.Task;
5154
import org.elasticsearch.threadpool.ThreadPool;
5255
import org.elasticsearch.transport.TransportService;
@@ -63,7 +66,7 @@
6366

6467
import static org.elasticsearch.index.IndexSettings.PREFER_ILM_SETTING;
6568

66-
public class TransportGetDataStreamsAction extends TransportMasterNodeReadProjectAction<
69+
public class TransportGetDataStreamsAction extends TransportLocalProjectMetadataAction<
6770
GetDataStreamAction.Request,
6871
GetDataStreamAction.Response> {
6972

@@ -76,6 +79,12 @@ public class TransportGetDataStreamsAction extends TransportMasterNodeReadProjec
7679
private final IndexSettingProviders indexSettingProviders;
7780
private final Client client;
7881

82+
/**
83+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
84+
* we no longer need to support calling this action remotely.
85+
*/
86+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
87+
@SuppressWarnings("this-escape")
7988
@Inject
8089
public TransportGetDataStreamsAction(
8190
TransportService transportService,
@@ -92,14 +101,11 @@ public TransportGetDataStreamsAction(
92101
) {
93102
super(
94103
GetDataStreamAction.NAME,
95-
transportService,
96-
clusterService,
97-
threadPool,
98104
actionFilters,
99-
GetDataStreamAction.Request::new,
100-
projectResolver,
101-
GetDataStreamAction.Response::new,
102-
transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT)
105+
transportService.getTaskManager(),
106+
clusterService,
107+
threadPool.executor(ThreadPool.Names.MANAGEMENT),
108+
projectResolver
103109
);
104110
this.indexNameExpressionResolver = indexNameExpressionResolver;
105111
this.systemIndices = systemIndices;
@@ -108,21 +114,32 @@ public TransportGetDataStreamsAction(
108114
this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings;
109115
this.indexSettingProviders = indexSettingProviders;
110116
this.client = new OriginSettingClient(client, "stack");
117+
118+
transportService.registerRequestHandler(
119+
actionName,
120+
executor,
121+
false,
122+
true,
123+
GetDataStreamAction.Request::new,
124+
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
125+
);
111126
}
112127

113128
@Override
114-
protected void masterOperation(
129+
protected void localClusterStateOperation(
115130
Task task,
116131
GetDataStreamAction.Request request,
117132
ProjectState state,
118133
ActionListener<GetDataStreamAction.Response> listener
119134
) throws Exception {
135+
((CancellableTask) task).ensureNotCancelled();
120136
if (request.verbose()) {
121137
DataStreamsStatsAction.Request req = new DataStreamsStatsAction.Request();
122138
req.indices(request.indices());
123139
client.execute(DataStreamsStatsAction.INSTANCE, req, new ActionListener<>() {
124140
@Override
125141
public void onResponse(DataStreamsStatsAction.Response response) {
142+
((CancellableTask) task).ensureNotCancelled();
126143
final Map<String, Long> maxTimestamps = Arrays.stream(response.getDataStreams())
127144
.collect(
128145
Collectors.toMap(

modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetDataStreamsAction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.rest.RestUtils;
2020
import org.elasticsearch.rest.Scope;
2121
import org.elasticsearch.rest.ServerlessScope;
22+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
2223
import org.elasticsearch.rest.action.RestToXContentListener;
2324

2425
import java.util.List;
@@ -64,7 +65,11 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
6465
getDataStreamsRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));
6566
getDataStreamsRequest.indicesOptions(IndicesOptions.fromRequest(request, getDataStreamsRequest.indicesOptions()));
6667
getDataStreamsRequest.verbose(request.paramAsBoolean("verbose", false));
67-
return channel -> client.execute(GetDataStreamAction.INSTANCE, getDataStreamsRequest, new RestToXContentListener<>(channel));
68+
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
69+
GetDataStreamAction.INSTANCE,
70+
getDataStreamsRequest,
71+
new RestToXContentListener<>(channel)
72+
);
6873
}
6974

7075
@Override

modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsRequestTests.java

Lines changed: 0 additions & 73 deletions
This file was deleted.

0 commit comments

Comments
 (0)