Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_LOOKUP_JOIN_ON_MANY_FIELDS = def(9_139_0_00);
public static final TransportVersion SIMULATE_INGEST_EFFECTIVE_MAPPING = def(9_140_0_00);
public static final TransportVersion RESOLVE_INDEX_MODE_ADDED = def(9_141_0_00);
public static final TransportVersion SLM_GET_STATS_CHANGE_REQUEST_TYPE = def(9_142_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can also remove the Response(StreamInput in) constructor below, as we'll know for sure that we'll never have to deserialize a response anymore (because any node on this version will run the request locally).

And can you add a @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) line to the Response#writeTo method? When we get to V10, we won't need that method either anymore.

Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,22 @@

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleStats;

import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.TransportVersions.SLM_GET_STATS_CHANGE_REQUEST_TYPE;

/**
* This class represents the action of retriving the stats for snapshot lifecycle management.
* These are retrieved from the master's cluster state and contain numbers related to the count of
Expand All @@ -32,6 +38,33 @@ protected GetSnapshotLifecycleStatsAction() {
super(NAME);
}

public static class Request extends LocalClusterStateRequest {

public Request(TimeValue masterNodeTimeout) {
super(masterNodeTimeout);
}

// private, to avoid non-backwards compatible use
private Request(StreamInput input) throws IOException {
super(input);
}

/**
* Previously this request was an AcknowledgedRequest, which had an ack timeout, and the action was an MasterNodeAction.
* This method only exists for backward compatibility to deserialize request from previous versions.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
public static Request read(StreamInput input) throws IOException {
if (input.getTransportVersion().onOrAfter(SLM_GET_STATS_CHANGE_REQUEST_TYPE)) {
return new Request(input);
Comment on lines +58 to +59
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't actually need this transport version check (or the new transport version), as this branch will never fire. Any node that is on this transport version will execute the request locally, instead of forwarding it to the master node. Therefore, if we receive this request over the transport wire, we already know it's from an old node.

Also, we don't need this separate deserialization method, we can do this in the constructor like we do here:

public Request(StreamInput in) throws IOException {
super(in, false);
// Read and ignore ack timeout.
in.readTimeValue();
}

} else {
var requestBwc = new AcknowledgedRequest.Plain(input);
return new Request(requestBwc.masterNodeTimeout());
}
}

}

public static class Response extends ActionResponse implements ToXContentObject {

private SnapshotLifecycleStats slmStats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
*/
package org.elasticsearch.xpack.slm;

import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
Expand Down Expand Up @@ -63,7 +62,7 @@ public void testSnapshotLifeCycleMetadataEmptyNotChanged() throws Exception {

private GetSnapshotLifecycleStatsAction.Response getPolicyStats() {
try {
final var req = new AcknowledgedRequest.Plain(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT);
final var req = new GetSnapshotLifecycleStatsAction.Request(TEST_REQUEST_TIMEOUT);
return client().execute(GetSnapshotLifecycleStatsAction.INSTANCE, req).get();
} catch (Exception e) {
fail("failed to get stats");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.slm.action;

import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
Expand All @@ -19,7 +18,6 @@
import java.util.List;

import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestUtils.getAckTimeout;
import static org.elasticsearch.rest.RestUtils.getMasterNodeTimeout;

@ServerlessScope(Scope.INTERNAL)
Expand All @@ -37,7 +35,7 @@ public String getName() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
final var req = new AcknowledgedRequest.Plain(getMasterNodeTimeout(request), getAckTimeout(request));
final var req = new GetSnapshotLifecycleStatsAction.Request(getMasterNodeTimeout(request));
return channel -> client.execute(GetSnapshotLifecycleStatsAction.INSTANCE, req, new RestToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,52 +9,64 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleStats;
import org.elasticsearch.xpack.core.slm.action.GetSnapshotLifecycleStatsAction;

public class TransportGetSnapshotLifecycleStatsAction extends TransportMasterNodeAction<
AcknowledgedRequest.Plain,
public class TransportGetSnapshotLifecycleStatsAction extends TransportLocalProjectMetadataAction<
GetSnapshotLifecycleStatsAction.Request,
GetSnapshotLifecycleStatsAction.Response> {

/**
* This was a TransportMasterNodeAction so for BwC it must be registered with the TransportService until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@Inject
@SuppressWarnings("this-escape")
public TransportGetSnapshotLifecycleStatsAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
GetSnapshotLifecycleStatsAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
AcknowledgedRequest.Plain::new,
GetSnapshotLifecycleStatsAction.Response::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
transportService.getTaskManager(),
clusterService,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
projectResolver
);

transportService.registerRequestHandler(
GetSnapshotLifecycleStatsAction.NAME,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
GetSnapshotLifecycleStatsAction.Request::read,
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
);
}

@Override
protected void masterOperation(
protected void localClusterStateOperation(
Task task,
AcknowledgedRequest.Plain request,
ClusterState state,
GetSnapshotLifecycleStatsAction.Request request,
ProjectState projectState,
ActionListener<GetSnapshotLifecycleStatsAction.Response> listener
) {
SnapshotLifecycleMetadata slmMeta = state.metadata().getProject().custom(SnapshotLifecycleMetadata.TYPE);
SnapshotLifecycleMetadata slmMeta = projectState.metadata().custom(SnapshotLifecycleMetadata.TYPE);
if (slmMeta == null) {
listener.onResponse(new GetSnapshotLifecycleStatsAction.Response(new SnapshotLifecycleStats()));
} else {
Expand All @@ -63,7 +75,7 @@ protected void masterOperation(
}

@Override
protected ClusterBlockException checkBlock(AcknowledgedRequest.Plain request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
protected ClusterBlockException checkBlock(GetSnapshotLifecycleStatsAction.Request request, ProjectState state) {
return state.blocks().globalBlockedException(state.projectId(), ClusterBlockLevel.METADATA_READ);
}
}