Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/122852.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 122852
summary: Run `TransportGetDataStreamsAction` on local node
area: Data streams
type: enhancement
issues: []
1 change: 1 addition & 0 deletions modules/data-streams/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies {
testImplementation project(path: ':test:test-clusters')
testImplementation project(":modules:mapper-extras")
internalClusterTestImplementation project(":modules:mapper-extras")
internalClusterTestImplementation project(':modules:rest-root')
}

tasks.withType(StandaloneRestIntegTestTask).configureEach {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.datastreams;

import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.support.CancellableActionTestPlugin;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.root.MainRestPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.rest.ObjectPath;
import org.elasticsearch.transport.netty4.Netty4Plugin;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener;
import static org.elasticsearch.test.TaskAssertions.assertAllTasksHaveFinished;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.oneOf;

public class DataStreamRestActionCancellationIT extends ESIntegTestCase {

@Override
protected boolean addMockHttpTransport() {
return false; // enable http
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME)
.put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
.build();
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(getTestTransportPlugin(), MainRestPlugin.class, CancellableActionTestPlugin.class, DataStreamsPlugin.class);
}

public void testGetDataStreamCancellation() {
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_data_stream"), GetDataStreamAction.NAME);
}

private void runRestActionCancellationTest(Request request, String actionName) {
final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);

try (
var restClient = createRestClient(node);
var capturingAction = CancellableActionTestPlugin.capturingActionOnNode(actionName, node)
) {
final var responseFuture = new PlainActionFuture<Response>();
final var restInvocation = restClient.performRequestAsync(request, wrapAsRestResponseListener(responseFuture));

if (randomBoolean()) {
// cancel by aborting the REST request
capturingAction.captureAndCancel(restInvocation::cancel);
expectThrows(ExecutionException.class, CancellationException.class, () -> responseFuture.get(10, TimeUnit.SECONDS));
} else {
// cancel via the task management API
final var cancelFuture = new PlainActionFuture<Void>();
capturingAction.captureAndCancel(
() -> SubscribableListener

.<ObjectPath>newForked(
l -> restClient.performRequestAsync(
getListTasksRequest(node, actionName),
wrapAsRestResponseListener(l.map(ObjectPath::createFromResponse))
)
)

.<Void>andThen((l, listTasksResponse) -> {
final var taskCount = listTasksResponse.evaluateArraySize("tasks");
assertThat(taskCount, greaterThan(0));
try (var listeners = new RefCountingListener(l)) {
for (int i = 0; i < taskCount; i++) {
final var taskPrefix = "tasks." + i + ".";
assertTrue(listTasksResponse.evaluate(taskPrefix + "cancellable"));
assertFalse(listTasksResponse.evaluate(taskPrefix + "cancelled"));
restClient.performRequestAsync(
getCancelTaskRequest(
listTasksResponse.evaluate(taskPrefix + "node"),
listTasksResponse.evaluate(taskPrefix + "id")
),
wrapAsRestResponseListener(listeners.acquire(DataStreamRestActionCancellationIT::assertOK))
);
}
}
})

.addListener(cancelFuture)
);
cancelFuture.get(10, TimeUnit.SECONDS);
expectThrows(Exception.class, () -> responseFuture.get(10, TimeUnit.SECONDS));
}

assertAllTasksHaveFinished(actionName);
} catch (Exception e) {
fail(e);
}
}

private static Request getListTasksRequest(String taskNode, String actionName) {
final var listTasksRequest = new Request(HttpGet.METHOD_NAME, "/_tasks");
listTasksRequest.addParameter("nodes", taskNode);
listTasksRequest.addParameter("actions", actionName);
listTasksRequest.addParameter("group_by", "none");
return listTasksRequest;
}

private static Request getCancelTaskRequest(String taskNode, int taskId) {
final var cancelTaskRequest = new Request(HttpPost.METHOD_NAME, Strings.format("/_tasks/%s:%d/_cancel", taskNode, taskId));
cancelTaskRequest.addParameter("wait_for_completion", null);
return cancelTaskRequest;
}

public static void assertOK(Response response) {
assertThat(response.getStatusLine().getStatusCode(), oneOf(200, 201));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
import org.elasticsearch.action.datastreams.GetDataStreamAction.Response.IndexProperties;
import org.elasticsearch.action.datastreams.GetDataStreamAction.Response.ManagedBy;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -37,11 +38,13 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.indices.SystemDataStreamDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -57,7 +60,7 @@

import static org.elasticsearch.index.IndexSettings.PREFER_ILM_SETTING;

public class TransportGetDataStreamsAction extends TransportMasterNodeReadAction<
public class TransportGetDataStreamsAction extends TransportLocalClusterStateAction<
GetDataStreamAction.Request,
GetDataStreamAction.Response> {

Expand All @@ -69,6 +72,12 @@ public class TransportGetDataStreamsAction extends TransportMasterNodeReadAction
private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings;
private final Client client;

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@SuppressWarnings("this-escape")
@Inject
public TransportGetDataStreamsAction(
TransportService transportService,
Expand All @@ -83,29 +92,36 @@ public TransportGetDataStreamsAction(
) {
super(
GetDataStreamAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
GetDataStreamAction.Request::new,
GetDataStreamAction.Response::new,
transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT)
transportService.getTaskManager(),
clusterService,
threadPool.executor(ThreadPool.Names.MANAGEMENT)
);
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.systemIndices = systemIndices;
this.globalRetentionSettings = globalRetentionSettings;
clusterSettings = clusterService.getClusterSettings();
this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings;
this.client = new OriginSettingClient(client, "stack");

transportService.registerRequestHandler(
actionName,
executor,
false,
true,
GetDataStreamAction.Request::new,
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
);
}

@Override
protected void masterOperation(
protected void localClusterStateOperation(
Task task,
GetDataStreamAction.Request request,
ClusterState state,
ActionListener<GetDataStreamAction.Response> listener
) throws Exception {
((CancellableTask) task).ensureNotCancelled();
if (request.verbose()) {
DataStreamsStatsAction.Request req = new DataStreamsStatsAction.Request();
req.indices(request.indices());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestToXContentListener;

import java.util.List;
Expand Down Expand Up @@ -64,7 +65,11 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
getDataStreamsRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));
getDataStreamsRequest.indicesOptions(IndicesOptions.fromRequest(request, getDataStreamsRequest.indicesOptions()));
getDataStreamsRequest.verbose(request.paramAsBoolean("verbose", false));
return channel -> client.execute(GetDataStreamAction.INSTANCE, getDataStreamsRequest, new RestToXContentListener<>(channel));
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
GetDataStreamAction.INSTANCE,
getDataStreamsRequest,
new RestToXContentListener<>(channel)
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,26 @@
*/
package org.elasticsearch.datastreams.action;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.datastreams.GetDataStreamAction.Request;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.local.LocalClusterStateHelper;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;

public class GetDataStreamsRequestTests extends AbstractWireSerializingTestCase<Request> {
import java.io.IOException;

public class GetDataStreamsRequestTests extends AbstractWireSerializingTestCase<GetDataStreamsRequestTests.RequestWrapper> {

@Override
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
protected Writeable.Reader<RequestWrapper> instanceReader() {
return RequestWrapper::new;
}

@Override
protected Request createTestInstance() {
protected RequestWrapper createTestInstance() {
var req = new Request(TEST_REQUEST_TIMEOUT, switch (randomIntBetween(1, 4)) {
case 1 -> generateRandomStringArray(3, 8, false, false);
case 2 -> {
Expand All @@ -35,11 +41,12 @@ protected Request createTestInstance() {
default -> null;
});
req.verbose(randomBoolean());
return req;
return new RequestWrapper(req);
}

@Override
protected Request mutateInstance(Request instance) {
protected RequestWrapper mutateInstance(RequestWrapper requestWrapper) {
var instance = requestWrapper.request();
var indices = instance.indices();
var indicesOpts = instance.indicesOptions();
var includeDefaults = instance.includeDefaults();
Expand All @@ -63,11 +70,39 @@ protected Request mutateInstance(Request instance) {
case 2 -> includeDefaults = includeDefaults == false;
case 3 -> verbose = verbose == false;
}
var newReq = new Request(instance.masterNodeTimeout(), indices);
var newReq = new Request(instance.masterTimeout(), indices);
newReq.includeDefaults(includeDefaults);
newReq.indicesOptions(indicesOpts);
newReq.verbose(verbose);
return newReq;
return new RequestWrapper(newReq);
}

/**
* We need to wrap the request class because the request itself doesn't need to be able to serialize to the wire because
* a new node will never forward the request to a different node. Therefore, we moved the serialization here to still be able to test
* the deserialization.
*/
public static class RequestWrapper extends LocalClusterStateHelper.RequestSerializationWrapper<Request> {

RequestWrapper(Request request) {
super(request);
}

RequestWrapper(StreamInput in) throws IOException {
this(new Request(in));
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalStringArray(request().getNames());
request().indicesOptions().writeIndicesOptions(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)) {
out.writeBoolean(request().includeDefaults());
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
out.writeBoolean(request().verbose());
}
}
}
}
Loading