Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,23 @@ static TransportVersion def(int id) {
public static final TransportVersion INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING = def(8_839_0_00);
public static final TransportVersion ML_INFERENCE_IBM_WATSONX_RERANK_ADDED = def(8_840_0_00);
public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR_BACKPORT_8_18 = def(8_840_0_01);
public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_18 = def(8_840_0_02);
public static final TransportVersion INITIAL_ELASTICSEARCH_8_19 = def(8_841_0_00);
public static final TransportVersion COHERE_BIT_EMBEDDING_TYPE_SUPPORT_ADDED_BACKPORT_8_X = def(8_841_0_01);
public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR_BACKPORT_8_19 = def(8_841_0_02);
public static final TransportVersion ESQL_RETRY_ON_SHARD_LEVEL_FAILURE_BACKPORT_8_19 = def(8_841_0_03);
public static final TransportVersion ESQL_SUPPORT_PARTIAL_RESULTS_BACKPORT_8_19 = def(8_841_0_04);
public static final TransportVersion VOYAGE_AI_INTEGRATION_ADDED_BACKPORT_8_X = def(8_841_0_05);
public static final TransportVersion JINA_AI_EMBEDDING_TYPE_SUPPORT_ADDED_BACKPORT_8_19 = def(8_841_0_06);
public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_19 = def(8_841_0_07);
public static final TransportVersion INITIAL_ELASTICSEARCH_9_0 = def(9_000_0_00);
public static final TransportVersion REMOVE_SNAPSHOT_FAILURES_90 = def(9_000_0_01);
public static final TransportVersion TRANSPORT_STATS_HANDLING_TIME_REQUIRED_90 = def(9_000_0_02);
public static final TransportVersion REMOVE_DESIRED_NODE_VERSION_90 = def(9_000_0_03);
public static final TransportVersion ESQL_DRIVER_TASK_DESCRIPTION_90 = def(9_000_0_04);
public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR_9_0 = def(9_000_0_05);
public static final TransportVersion BYTE_SIZE_VALUE_ALWAYS_USES_BYTES_90 = def(9_000_0_06);
public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_90 = def(9_000_0_07);
public static final TransportVersion COHERE_BIT_EMBEDDING_TYPE_SUPPORT_ADDED = def(9_001_0_00);
public static final TransportVersion REMOVE_SNAPSHOT_FAILURES = def(9_002_0_00);
public static final TransportVersion TRANSPORT_STATS_HANDLING_TIME_REQUIRED = def(9_003_0_00);
Expand All @@ -211,6 +214,7 @@ static TransportVersion def(int id) {
public static final TransportVersion MULTI_PROJECT = def(9_018_0_00);
public static final TransportVersion STORED_SCRIPT_CONTENT_LENGTH = def(9_019_0_00);
public static final TransportVersion JINA_AI_EMBEDDING_TYPE_SUPPORT_ADDED = def(9_020_0_00);
public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR = def(9_021_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.ilm.action;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;

import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;

public class RetryActionRequest extends AcknowledgedRequest<RetryActionRequest> implements IndicesRequest.Replaceable {
private String[] indices;
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
private boolean requireError = false;

public RetryActionRequest(TimeValue masterNodeTimeout, TimeValue ackTimeout, String... indices) {
super(masterNodeTimeout, ackTimeout);
this.indices = indices;
}

public RetryActionRequest(StreamInput in) throws IOException {
super(in);
this.indices = in.readStringArray();
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
if (in.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR)
|| in.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_90)
|| in.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_19)
|| in.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_18)) {
this.requireError = in.readBoolean();
}
}

@Override
public RetryActionRequest indices(String... indices) {
this.indices = indices;
return this;
}

@Override
public String[] indices() {
return indices;
}

@Override
public IndicesOptions indicesOptions() {
return indicesOptions;
}

public RetryActionRequest indicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}

public void requireError(boolean requireError) {
this.requireError = requireError;
}

public boolean requireError() {
return requireError;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR)
|| out.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_90)
|| out.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_19)
|| out.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_18)) {
out.writeBoolean(requireError);
}
}

@Override
public int hashCode() {
return Objects.hash(Arrays.hashCode(indices), indicesOptions, requireError);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj.getClass() != getClass()) {
return false;
}
RetryActionRequest other = (RetryActionRequest) obj;
return Objects.deepEquals(indices, other.indices)
&& Objects.equals(indicesOptions, other.indicesOptions)
&& requireError == other.requireError;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.action.search.TransportSearchScrollAction;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ilm.action.ILMActions;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
import org.elasticsearch.xpack.core.security.support.MetadataUtils;

Expand Down Expand Up @@ -222,7 +223,8 @@ public class InternalUsers {
TransportBulkAction.NAME,
TransportIndexAction.NAME,
TransportSearchScrollAction.TYPE.name(),
ModifyDataStreamsAction.NAME
ModifyDataStreamsAction.NAME,
ILMActions.RETRY.name()
)
.allowRestrictedIndices(false)
.build() },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ilm.action.ILMActions;
import org.elasticsearch.xpack.core.ilm.action.RetryActionRequest;

import java.util.List;

Expand All @@ -37,7 +38,7 @@ public String getName() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
final var indices = Strings.splitStringByCommaToArray(restRequest.param("index"));
final var request = new TransportRetryAction.Request(getMasterNodeTimeout(restRequest), getAckTimeout(restRequest), indices);
final var request = new RetryActionRequest(getMasterNodeTimeout(restRequest), getAckTimeout(restRequest), indices);
request.indices(indices);
request.indicesOptions(IndicesOptions.fromRequest(restRequest, IndicesOptions.strictExpandOpen()));
return channel -> client.execute(ILMActions.RETRY, request, new RestToXContentListener<>(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
Expand All @@ -25,24 +21,18 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import org.elasticsearch.xpack.core.ilm.action.ILMActions;
import org.elasticsearch.xpack.core.ilm.action.RetryActionRequest;
import org.elasticsearch.xpack.ilm.IndexLifecycleService;

import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;

public class TransportRetryAction extends TransportMasterNodeAction<TransportRetryAction.Request, AcknowledgedResponse> {
public class TransportRetryAction extends TransportMasterNodeAction<RetryActionRequest, AcknowledgedResponse> {

private static final Logger logger = LogManager.getLogger(TransportRetryAction.class);

Expand All @@ -62,15 +52,20 @@ public TransportRetryAction(
clusterService,
threadPool,
actionFilters,
Request::new,
RetryActionRequest::new,
AcknowledgedResponse::readFrom,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.indexLifecycleService = indexLifecycleService;
}

@Override
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
protected void masterOperation(Task task, RetryActionRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
if (request.requireError() == false) {
maybeRunAsyncAction(state, request.indices());
listener.onResponse(AcknowledgedResponse.TRUE);
return;
}
submitUnbatchedTask("ilm-re-run", new AckedClusterStateUpdateTask(request, listener) {
@Override
public ClusterState execute(ClusterState currentState) {
Expand All @@ -79,101 +74,37 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
for (String index : request.indices()) {
IndexMetadata idxMeta = newState.metadata().getProject().index(index);
LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
StepKey retryStep = new StepKey(lifecycleState.phase(), lifecycleState.action(), lifecycleState.step());
if (idxMeta == null) {
// The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
logger.debug(
"index ["
+ index
+ "] has been deleted after moving to step ["
+ lifecycleState.step()
+ "], skipping async action check"
);
return;
}
indexLifecycleService.maybeRunAsyncAction(newState, idxMeta, retryStep);
}
maybeRunAsyncAction(newState, request.indices());
}
});
}

private void maybeRunAsyncAction(ClusterState state, String[] indices) {
for (String index : indices) {
IndexMetadata idxMeta = state.metadata().getProject().index(index);
if (idxMeta == null) {
// The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
logger.debug(
"index ["
+ index
+ "] has been deleted, skipping async action check"
);
return;
}
LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
StepKey retryStep = new StepKey(lifecycleState.phase(), lifecycleState.action(), lifecycleState.step());
indexLifecycleService.maybeRunAsyncAction(state, idxMeta, retryStep);
}
}

@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
clusterService.submitUnbatchedStateUpdateTask(source, task);
}

@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
protected ClusterBlockException checkBlock(RetryActionRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

public static class Request extends AcknowledgedRequest<Request> implements IndicesRequest.Replaceable {
private String[] indices;
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();

public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String... indices) {
super(masterNodeTimeout, ackTimeout);
this.indices = indices;
}

public Request(StreamInput in) throws IOException {
super(in);
this.indices = in.readStringArray();
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
}

@Override
public Request indices(String... indices) {
this.indices = indices;
return this;
}

@Override
public String[] indices() {
return indices;
}

@Override
public IndicesOptions indicesOptions() {
return indicesOptions;
}

public Request indicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
}

@Override
public int hashCode() {
return Objects.hash(Arrays.hashCode(indices), indicesOptions);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj.getClass() != getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.deepEquals(indices, other.indices) && Objects.equals(indicesOptions, other.indicesOptions);
}

}
}
Loading