Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/129367.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 129367
summary: Run `TransportGetStatusAction` on local node
area: ILM+SLM
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,21 @@

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ilm.OperationMode;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

public class GetStatusAction extends ActionType<GetStatusAction.Response> {
Expand All @@ -27,6 +34,29 @@ protected GetStatusAction() {
super(NAME);
}

public static class Request extends LocalClusterStateRequest {

public Request(TimeValue masterTimeout) {
super(masterTimeout);
}

/**
* NB prior to 9.1 this was a TransportMasterNodeAction so for BwC we must remain able to read these requests until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
public Request(StreamInput in) throws IOException {
super(in, false);
// Read and ignore ack timeout.
in.readTimeValue();
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers);
}
}

public static class Response extends ActionResponse implements ToXContentObject {

private final OperationMode mode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
Expand Down Expand Up @@ -442,10 +441,9 @@ public void testCreatePolicyWhenStopped() throws Exception {

assertAcked(client().execute(ILMActions.STOP, new StopILMRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)).get());
assertBusy(() -> {
OperationMode mode = client().execute(
GetStatusAction.INSTANCE,
new AcknowledgedRequest.Plain(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
).get().getMode();
OperationMode mode = client().execute(GetStatusAction.INSTANCE, new GetStatusAction.Request(TEST_REQUEST_TIMEOUT))
.get()
.getMode();
logger.info("--> waiting for STOPPED, currently: {}", mode);
assertThat(mode, equalTo(OperationMode.STOPPED));
});
Expand Down Expand Up @@ -473,7 +471,7 @@ public void testCreatePolicyWhenStopped() throws Exception {
// assert ILM is still stopped
GetStatusAction.Response statusResponse = client().execute(
GetStatusAction.INSTANCE,
new AcknowledgedRequest.Plain(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
new GetStatusAction.Request(TEST_REQUEST_TIMEOUT)
).get();
assertThat(statusResponse.getMode(), equalTo(OperationMode.STOPPED));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,20 @@

package org.elasticsearch.xpack.ilm.action;

import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ilm.action.GetStatusAction;

import java.util.List;

import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestUtils.getAckTimeout;
import static org.elasticsearch.rest.RestUtils.getMasterNodeTimeout;

public class RestGetStatusAction extends BaseRestHandler {
Expand All @@ -34,7 +37,25 @@ public String getName() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
final var request = new AcknowledgedRequest.Plain(getMasterNodeTimeout(restRequest), getAckTimeout(restRequest));
return channel -> client.execute(GetStatusAction.INSTANCE, request, new RestToXContentListener<>(channel));
final var request = new GetStatusAction.Request(getMasterNodeTimeout(restRequest));
if (restRequest.hasParam("timeout")) {
// Consume this param just for validation when in BWC mode.
final var timeout = restRequest.paramAsTime("timeout", null);
if (restRequest.getRestApiVersion() != RestApiVersion.V_8) {
DeprecationLogger.getLogger(TransportLocalClusterStateAction.class)
.critical(
DeprecationCategory.API,
"TransportLocalClusterStateAction-timeout-parameter",
"the [?timeout] query parameter to this API has no effect, is now deprecated, "
+ "and will be removed in a future version"
);
}

}
return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute(
GetStatusAction.INSTANCE,
request,
new RestToXContentListener<>(channel)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -26,10 +26,14 @@

import static org.elasticsearch.xpack.core.ilm.LifecycleOperationMetadata.currentILMMode;

public class TransportGetStatusAction extends TransportMasterNodeAction<AcknowledgedRequest.Plain, Response> {

private final ProjectResolver projectResolver;
public class TransportGetStatusAction extends TransportLocalProjectMetadataAction<GetStatusAction.Request, Response> {

/**
* NB prior to 9.1 this was a TransportMasterNodeAction so for BwC it must be registered with the TransportService until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@SuppressWarnings("this-escape")
@Inject
public TransportGetStatusAction(
TransportService transportService,
Expand All @@ -40,24 +44,36 @@ public TransportGetStatusAction(
) {
super(
GetStatusAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
AcknowledgedRequest.Plain::new,
Response::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
transportService.getTaskManager(),
clusterService,
threadPool.executor(ThreadPool.Names.MANAGEMENT),
projectResolver
);
this.projectResolver = projectResolver;

transportService.registerRequestHandler(
actionName,
executor,
false,
true,
GetStatusAction.Request::new,
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
);

}

@Override
protected void masterOperation(Task task, AcknowledgedRequest.Plain request, ClusterState state, ActionListener<Response> listener) {
listener.onResponse(new Response(currentILMMode(projectResolver.getProjectMetadata(state))));
protected void localClusterStateOperation(
Task task,
GetStatusAction.Request request,
ProjectState state,
ActionListener<Response> listener
) {
listener.onResponse(new Response(currentILMMode(state.metadata())));
}

@Override
protected ClusterBlockException checkBlock(AcknowledgedRequest.Plain request, ClusterState state) {
protected ClusterBlockException checkBlock(GetStatusAction.Request request, ProjectState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,7 @@ public void testModeSnapshotRestore() throws Exception {
}

private OperationMode ilmMode() throws Exception {
return client().execute(GetStatusAction.INSTANCE, new AcknowledgedRequest.Plain(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))
.get()
.getMode();
return client().execute(GetStatusAction.INSTANCE, new GetStatusAction.Request(TEST_REQUEST_TIMEOUT)).get().getMode();
}

private OperationMode slmMode() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
Expand Down Expand Up @@ -139,9 +138,7 @@ public void testILMState() throws Exception {
// stop ILM so source does not change after copying metadata
assertAcked(safeGet(client().execute(ILMActions.STOP, new StopILMRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))));
assertBusy(() -> {
var statusResponse = safeGet(
client().execute(GetStatusAction.INSTANCE, new AcknowledgedRequest.Plain(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))
);
var statusResponse = safeGet(client().execute(GetStatusAction.INSTANCE, new GetStatusAction.Request(TEST_REQUEST_TIMEOUT)));
assertEquals(OperationMode.STOPPED, statusResponse.getMode());
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
Expand Down Expand Up @@ -623,9 +622,7 @@ public void testIndexLifecycleSettingNotCopied() throws Exception {
private void stopILM() throws Exception {
assertAcked(safeGet(client().execute(ILMActions.STOP, new StopILMRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))));
assertBusy(() -> {
var statusResponse = safeGet(
client().execute(GetStatusAction.INSTANCE, new AcknowledgedRequest.Plain(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))
);
var statusResponse = safeGet(client().execute(GetStatusAction.INSTANCE, new GetStatusAction.Request(TEST_REQUEST_TIMEOUT)));
assertEquals(OperationMode.STOPPED, statusResponse.getMode());
});
}
Expand Down
Loading