Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/124149.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 124149
summary: Retry ILM async action after reindexing data stream
area: Data streams
type: enhancement
issues: []
2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,15 @@ static TransportVersion def(int id) {
public static final TransportVersion INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING = def(8_839_0_00);
public static final TransportVersion ML_INFERENCE_IBM_WATSONX_RERANK_ADDED = def(8_840_0_00);
public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR_BACKPORT_8_18 = def(8_840_0_01);
public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_18 = def(8_840_0_02);
public static final TransportVersion INITIAL_ELASTICSEARCH_8_19 = def(8_841_0_00);
public static final TransportVersion COHERE_BIT_EMBEDDING_TYPE_SUPPORT_ADDED_BACKPORT_8_X = def(8_841_0_01);
public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR_BACKPORT_8_19 = def(8_841_0_02);
public static final TransportVersion ESQL_RETRY_ON_SHARD_LEVEL_FAILURE_BACKPORT_8_19 = def(8_841_0_03);
public static final TransportVersion ESQL_SUPPORT_PARTIAL_RESULTS_BACKPORT_8_19 = def(8_841_0_04);
public static final TransportVersion VOYAGE_AI_INTEGRATION_ADDED_BACKPORT_8_X = def(8_841_0_05);
public static final TransportVersion JINA_AI_EMBEDDING_TYPE_SUPPORT_ADDED_BACKPORT_8_19 = def(8_841_0_06);
public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_19 = def(8_841_0_07);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.ilm.action;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;

import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;

public class RetryActionRequest extends AcknowledgedRequest<RetryActionRequest> implements IndicesRequest.Replaceable {
private String[] indices;
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
private boolean requireError = true;

public RetryActionRequest(TimeValue masterNodeTimeout, TimeValue ackTimeout, String... indices) {
super(masterNodeTimeout, ackTimeout);
this.indices = indices;
}

public RetryActionRequest(StreamInput in) throws IOException {
super(in);
this.indices = in.readStringArray();
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
if (in.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_19)
|| in.getTransportVersion().isPatchFrom(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_18)) {
this.requireError = in.readBoolean();
}
}

@Override
public RetryActionRequest indices(String... indices) {
this.indices = indices;
return this;
}

@Override
public String[] indices() {
return indices;
}

@Override
public IndicesOptions indicesOptions() {
return indicesOptions;
}

public RetryActionRequest indicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}

public void requireError(boolean requireError) {
this.requireError = requireError;
}

public boolean requireError() {
return requireError;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_19)
|| out.getTransportVersion().isPatchFrom(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_18)) {
out.writeBoolean(requireError);
}
}

@Override
public int hashCode() {
return Objects.hash(Arrays.hashCode(indices), indicesOptions, requireError);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj.getClass() != getClass()) {
return false;
}
RetryActionRequest other = (RetryActionRequest) obj;
return Objects.deepEquals(indices, other.indices)
&& Objects.equals(indicesOptions, other.indicesOptions)
&& requireError == other.requireError;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction;
import org.elasticsearch.xpack.core.ilm.action.ILMActions;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
import org.elasticsearch.xpack.core.security.support.MetadataUtils;

Expand Down Expand Up @@ -224,7 +225,8 @@ public class InternalUsers {
TransportBulkAction.NAME,
TransportIndexAction.NAME,
TransportSearchScrollAction.TYPE.name(),
ModifyDataStreamsAction.NAME
ModifyDataStreamsAction.NAME,
ILMActions.RETRY.name()
)
.allowRestrictedIndices(false)
.build() },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,11 @@ void maybeRunAsyncAction(ClusterState currentState, IndexMetadata indexMetadata,
logger.warn("current step [{}] for index [{}] with policy [{}] is not recognized", currentStepKey, index, policy);
return;
}

if (expectedStepKey.phase() == null && expectedStepKey.name() == null && expectedStepKey.action() == null) {
// ILM is stopped, so do not try to run async action
logger.debug("expected step for index [{}] with policy [{}] is [{}], not running async action", index, policy, expectedStepKey);
return;
}
logger.trace(
"[{}] maybe running async action step ({}) with current step {}",
index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ilm.action.ILMActions;
import org.elasticsearch.xpack.core.ilm.action.RetryActionRequest;

import java.util.List;

Expand All @@ -37,7 +38,7 @@ public String getName() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
final var indices = Strings.splitStringByCommaToArray(restRequest.param("index"));
final var request = new TransportRetryAction.Request(getMasterNodeTimeout(restRequest), getAckTimeout(restRequest), indices);
final var request = new RetryActionRequest(getMasterNodeTimeout(restRequest), getAckTimeout(restRequest), indices);
request.indices(indices);
request.indicesOptions(IndicesOptions.fromRequest(restRequest, IndicesOptions.strictExpandOpen()));
return channel -> client.execute(ILMActions.RETRY, request, new RestToXContentListener<>(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
Expand All @@ -25,24 +21,18 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import org.elasticsearch.xpack.core.ilm.action.ILMActions;
import org.elasticsearch.xpack.core.ilm.action.RetryActionRequest;
import org.elasticsearch.xpack.ilm.IndexLifecycleService;

import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;

public class TransportRetryAction extends TransportMasterNodeAction<TransportRetryAction.Request, AcknowledgedResponse> {
public class TransportRetryAction extends TransportMasterNodeAction<RetryActionRequest, AcknowledgedResponse> {

private static final Logger logger = LogManager.getLogger(TransportRetryAction.class);

Expand All @@ -62,15 +52,25 @@ public TransportRetryAction(
clusterService,
threadPool,
actionFilters,
Request::new,
RetryActionRequest::new,
AcknowledgedResponse::readFrom,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.indexLifecycleService = indexLifecycleService;
}

@Override
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
protected void masterOperation(
Task task,
RetryActionRequest request,
ClusterState state,
ActionListener<AcknowledgedResponse> listener
) {
if (request.requireError() == false) {
maybeRunAsyncAction(state, request.indices());
listener.onResponse(AcknowledgedResponse.TRUE);
return;
}
submitUnbatchedTask("ilm-re-run", new AckedClusterStateUpdateTask(request, listener) {
@Override
public ClusterState execute(ClusterState currentState) {
Expand All @@ -79,101 +79,33 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
for (String index : request.indices()) {
IndexMetadata idxMeta = newState.metadata().index(index);
LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
StepKey retryStep = new StepKey(lifecycleState.phase(), lifecycleState.action(), lifecycleState.step());
if (idxMeta == null) {
// The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
logger.debug(
"index ["
+ index
+ "] has been deleted after moving to step ["
+ lifecycleState.step()
+ "], skipping async action check"
);
return;
}
indexLifecycleService.maybeRunAsyncAction(newState, idxMeta, retryStep);
}
maybeRunAsyncAction(newState, request.indices());
}
});
}

private void maybeRunAsyncAction(ClusterState state, String[] indices) {
for (String index : indices) {
IndexMetadata idxMeta = state.metadata().index(index);
if (idxMeta == null) {
// The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
logger.debug("index [" + index + "] has been deleted, skipping async action check");
return;
}
LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
StepKey retryStep = new StepKey(lifecycleState.phase(), lifecycleState.action(), lifecycleState.step());
indexLifecycleService.maybeRunAsyncAction(state, idxMeta, retryStep);
}
}

@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
clusterService.submitUnbatchedStateUpdateTask(source, task);
}

@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
protected ClusterBlockException checkBlock(RetryActionRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

public static class Request extends AcknowledgedRequest<Request> implements IndicesRequest.Replaceable {
private String[] indices;
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();

public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String... indices) {
super(masterNodeTimeout, ackTimeout);
this.indices = indices;
}

public Request(StreamInput in) throws IOException {
super(in);
this.indices = in.readStringArray();
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
}

@Override
public Request indices(String... indices) {
this.indices = indices;
return this;
}

@Override
public String[] indices() {
return indices;
}

@Override
public IndicesOptions indicesOptions() {
return indicesOptions;
}

public Request indicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
}

@Override
public int hashCode() {
return Objects.hash(Arrays.hashCode(indices), indicesOptions);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj.getClass() != getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.deepEquals(indices, other.indices) && Objects.equals(indicesOptions, other.indicesOptions);
}

}
}
Loading