Skip to content

Commit 12b402c

Browse files
Retry ILM async action after reindexing data stream (#124149) (#124280)
When reindexing a data stream, the ILM metadata is copied from the index metadata of the source index to the destination index. However, the ILM state of the new index can get stuck if the source index was in an AsyncAction at the time of reindexing. To un-stick the new index, we call TransportRetryAction to retry the AsyncAction. Previously, this action would only run if the index was in the ILM error step. This change updates TransportRetryAction so that it can also run when the index is not in the error step, provided the request parameter requireError is set to false.
(cherry picked from commit 10a8dcf)
Conflicts:
- server/src/main/java/org/elasticsearch/TransportVersions.java
- x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportRetryAction.java
1 parent 099dbfa commit 12b402c

File tree

10 files changed

+274
-135
lines changed

10 files changed

+274
-135
lines changed

docs/changelog/124149.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 124149
2+
summary: Retry ILM async action after reindexing data stream
3+
area: Data streams
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ static TransportVersion def(int id) {
176176
public static final TransportVersion INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING = def(8_839_0_00);
177177
public static final TransportVersion ML_INFERENCE_IBM_WATSONX_RERANK_ADDED = def(8_840_0_00);
178178
public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR_BACKPORT_8_18 = def(8_840_0_01);
179+
public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_18 = def(8_840_0_02);
179180
public static final TransportVersion INITIAL_ELASTICSEARCH_8_19 = def(8_841_0_00);
180181
public static final TransportVersion INITIAL_ELASTICSEARCH_9_0 = def(9_000_0_00);
181182
public static final TransportVersion REMOVE_SNAPSHOT_FAILURES_90 = def(9_000_0_01);
@@ -184,6 +185,7 @@ static TransportVersion def(int id) {
184185
public static final TransportVersion ESQL_DRIVER_TASK_DESCRIPTION_90 = def(9_000_0_04);
185186
public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR_9_0 = def(9_000_0_05);
186187
public static final TransportVersion BYTE_SIZE_VALUE_ALWAYS_USES_BYTES_90 = def(9_000_0_06);
188+
public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_90 = def(9_000_0_07);
187189

188190
/*
189191
* STOP! READ THIS FIRST! No, really,
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.ilm.action;
9+
10+
import org.elasticsearch.TransportVersions;
11+
import org.elasticsearch.action.ActionRequestValidationException;
12+
import org.elasticsearch.action.IndicesRequest;
13+
import org.elasticsearch.action.support.IndicesOptions;
14+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
15+
import org.elasticsearch.common.io.stream.StreamInput;
16+
import org.elasticsearch.common.io.stream.StreamOutput;
17+
import org.elasticsearch.core.TimeValue;
18+
19+
import java.io.IOException;
20+
import java.util.Arrays;
21+
import java.util.Objects;
22+
23+
public class RetryActionRequest extends AcknowledgedRequest<RetryActionRequest> implements IndicesRequest.Replaceable {
24+
private String[] indices;
25+
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
26+
private boolean requireError = true;
27+
28+
public RetryActionRequest(TimeValue masterNodeTimeout, TimeValue ackTimeout, String... indices) {
29+
super(masterNodeTimeout, ackTimeout);
30+
this.indices = indices;
31+
}
32+
33+
public RetryActionRequest(StreamInput in) throws IOException {
34+
super(in);
35+
this.indices = in.readStringArray();
36+
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
37+
if (in.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_90)
38+
|| in.getTransportVersion().isPatchFrom(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_18)) {
39+
this.requireError = in.readBoolean();
40+
}
41+
}
42+
43+
@Override
44+
public RetryActionRequest indices(String... indices) {
45+
this.indices = indices;
46+
return this;
47+
}
48+
49+
@Override
50+
public String[] indices() {
51+
return indices;
52+
}
53+
54+
@Override
55+
public IndicesOptions indicesOptions() {
56+
return indicesOptions;
57+
}
58+
59+
public RetryActionRequest indicesOptions(IndicesOptions indicesOptions) {
60+
this.indicesOptions = indicesOptions;
61+
return this;
62+
}
63+
64+
public void requireError(boolean requireError) {
65+
this.requireError = requireError;
66+
}
67+
68+
public boolean requireError() {
69+
return requireError;
70+
}
71+
72+
@Override
73+
public ActionRequestValidationException validate() {
74+
return null;
75+
}
76+
77+
@Override
78+
public void writeTo(StreamOutput out) throws IOException {
79+
super.writeTo(out);
80+
out.writeStringArray(indices);
81+
indicesOptions.writeIndicesOptions(out);
82+
if (out.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_90)
83+
|| out.getTransportVersion().isPatchFrom(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_18)) {
84+
out.writeBoolean(requireError);
85+
}
86+
}
87+
88+
@Override
89+
public int hashCode() {
90+
return Objects.hash(Arrays.hashCode(indices), indicesOptions, requireError);
91+
}
92+
93+
@Override
94+
public boolean equals(Object obj) {
95+
if (obj == null) {
96+
return false;
97+
}
98+
if (obj.getClass() != getClass()) {
99+
return false;
100+
}
101+
RetryActionRequest other = (RetryActionRequest) obj;
102+
return Objects.deepEquals(indices, other.indices)
103+
&& Objects.equals(indicesOptions, other.indicesOptions)
104+
&& requireError == other.requireError;
105+
}
106+
107+
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.action.search.TransportSearchScrollAction;
3131
import org.elasticsearch.index.reindex.ReindexAction;
3232
import org.elasticsearch.xpack.core.XPackPlugin;
33+
import org.elasticsearch.xpack.core.ilm.action.ILMActions;
3334
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
3435
import org.elasticsearch.xpack.core.security.support.MetadataUtils;
3536

@@ -222,7 +223,8 @@ public class InternalUsers {
222223
TransportBulkAction.NAME,
223224
TransportIndexAction.NAME,
224225
TransportSearchScrollAction.TYPE.name(),
225-
ModifyDataStreamsAction.NAME
226+
ModifyDataStreamsAction.NAME,
227+
ILMActions.RETRY.name()
226228
)
227229
.allowRestrictedIndices(false)
228230
.build() },

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,11 @@ void maybeRunAsyncAction(ClusterState currentState, IndexMetadata indexMetadata,
322322
logger.warn("current step [{}] for index [{}] with policy [{}] is not recognized", currentStepKey, index, policy);
323323
return;
324324
}
325-
325+
if (expectedStepKey.phase() == null && expectedStepKey.name() == null && expectedStepKey.action() == null) {
326+
// ILM is stopped, so do not try to run async action
327+
logger.debug("expected step for index [{}] with policy [{}] is [{}], not running async action", index, policy, expectedStepKey);
328+
return;
329+
}
326330
logger.trace(
327331
"[{}] maybe running async action step ({}) with current step {}",
328332
index,

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/RestRetryAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.rest.RestRequest;
1616
import org.elasticsearch.rest.action.RestToXContentListener;
1717
import org.elasticsearch.xpack.core.ilm.action.ILMActions;
18+
import org.elasticsearch.xpack.core.ilm.action.RetryActionRequest;
1819

1920
import java.util.List;
2021

@@ -37,7 +38,7 @@ public String getName() {
3738
@Override
3839
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
3940
final var indices = Strings.splitStringByCommaToArray(restRequest.param("index"));
40-
final var request = new TransportRetryAction.Request(getMasterNodeTimeout(restRequest), getAckTimeout(restRequest), indices);
41+
final var request = new RetryActionRequest(getMasterNodeTimeout(restRequest), getAckTimeout(restRequest), indices);
4142
request.indices(indices);
4243
request.indicesOptions(IndicesOptions.fromRequest(restRequest, IndicesOptions.strictExpandOpen()));
4344
return channel -> client.execute(ILMActions.RETRY, request, new RestToXContentListener<>(channel));

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportRetryAction.java

Lines changed: 30 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,7 @@
1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
1212
import org.elasticsearch.action.ActionListener;
13-
import org.elasticsearch.action.ActionRequestValidationException;
14-
import org.elasticsearch.action.IndicesRequest;
1513
import org.elasticsearch.action.support.ActionFilters;
16-
import org.elasticsearch.action.support.IndicesOptions;
17-
import org.elasticsearch.action.support.master.AcknowledgedRequest;
1814
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1915
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
2016
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
@@ -25,24 +21,18 @@
2521
import org.elasticsearch.cluster.metadata.IndexMetadata;
2622
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
2723
import org.elasticsearch.cluster.service.ClusterService;
28-
import org.elasticsearch.common.io.stream.StreamInput;
29-
import org.elasticsearch.common.io.stream.StreamOutput;
3024
import org.elasticsearch.common.util.concurrent.EsExecutors;
3125
import org.elasticsearch.core.SuppressForbidden;
32-
import org.elasticsearch.core.TimeValue;
3326
import org.elasticsearch.injection.guice.Inject;
3427
import org.elasticsearch.tasks.Task;
3528
import org.elasticsearch.threadpool.ThreadPool;
3629
import org.elasticsearch.transport.TransportService;
3730
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
3831
import org.elasticsearch.xpack.core.ilm.action.ILMActions;
32+
import org.elasticsearch.xpack.core.ilm.action.RetryActionRequest;
3933
import org.elasticsearch.xpack.ilm.IndexLifecycleService;
4034

41-
import java.io.IOException;
42-
import java.util.Arrays;
43-
import java.util.Objects;
44-
45-
public class TransportRetryAction extends TransportMasterNodeAction<TransportRetryAction.Request, AcknowledgedResponse> {
35+
public class TransportRetryAction extends TransportMasterNodeAction<RetryActionRequest, AcknowledgedResponse> {
4636

4737
private static final Logger logger = LogManager.getLogger(TransportRetryAction.class);
4838

@@ -62,15 +52,25 @@ public TransportRetryAction(
6252
clusterService,
6353
threadPool,
6454
actionFilters,
65-
Request::new,
55+
RetryActionRequest::new,
6656
AcknowledgedResponse::readFrom,
6757
EsExecutors.DIRECT_EXECUTOR_SERVICE
6858
);
6959
this.indexLifecycleService = indexLifecycleService;
7060
}
7161

7262
@Override
73-
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
63+
protected void masterOperation(
64+
Task task,
65+
RetryActionRequest request,
66+
ClusterState state,
67+
ActionListener<AcknowledgedResponse> listener
68+
) {
69+
if (request.requireError() == false) {
70+
maybeRunAsyncAction(state, request.indices());
71+
listener.onResponse(AcknowledgedResponse.TRUE);
72+
return;
73+
}
7474
submitUnbatchedTask("ilm-re-run", new AckedClusterStateUpdateTask(request, listener) {
7575
@Override
7676
public ClusterState execute(ClusterState currentState) {
@@ -79,101 +79,33 @@ public ClusterState execute(ClusterState currentState) {
7979

8080
@Override
8181
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
82-
for (String index : request.indices()) {
83-
IndexMetadata idxMeta = newState.metadata().index(index);
84-
LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
85-
StepKey retryStep = new StepKey(lifecycleState.phase(), lifecycleState.action(), lifecycleState.step());
86-
if (idxMeta == null) {
87-
// The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
88-
logger.debug(
89-
"index ["
90-
+ index
91-
+ "] has been deleted after moving to step ["
92-
+ lifecycleState.step()
93-
+ "], skipping async action check"
94-
);
95-
return;
96-
}
97-
indexLifecycleService.maybeRunAsyncAction(newState, idxMeta, retryStep);
98-
}
82+
maybeRunAsyncAction(newState, request.indices());
9983
}
10084
});
10185
}
10286

87+
private void maybeRunAsyncAction(ClusterState state, String[] indices) {
88+
for (String index : indices) {
89+
IndexMetadata idxMeta = state.metadata().index(index);
90+
if (idxMeta == null) {
91+
// The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
92+
logger.debug("index [" + index + "] has been deleted, skipping async action check");
93+
return;
94+
}
95+
LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
96+
StepKey retryStep = new StepKey(lifecycleState.phase(), lifecycleState.action(), lifecycleState.step());
97+
indexLifecycleService.maybeRunAsyncAction(state, idxMeta, retryStep);
98+
}
99+
}
100+
103101
@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
104102
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
105103
clusterService.submitUnbatchedStateUpdateTask(source, task);
106104
}
107105

108106
@Override
109-
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
107+
protected ClusterBlockException checkBlock(RetryActionRequest request, ClusterState state) {
110108
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
111109
}
112110

113-
public static class Request extends AcknowledgedRequest<Request> implements IndicesRequest.Replaceable {
114-
private String[] indices;
115-
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
116-
117-
public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String... indices) {
118-
super(masterNodeTimeout, ackTimeout);
119-
this.indices = indices;
120-
}
121-
122-
public Request(StreamInput in) throws IOException {
123-
super(in);
124-
this.indices = in.readStringArray();
125-
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
126-
}
127-
128-
@Override
129-
public Request indices(String... indices) {
130-
this.indices = indices;
131-
return this;
132-
}
133-
134-
@Override
135-
public String[] indices() {
136-
return indices;
137-
}
138-
139-
@Override
140-
public IndicesOptions indicesOptions() {
141-
return indicesOptions;
142-
}
143-
144-
public Request indicesOptions(IndicesOptions indicesOptions) {
145-
this.indicesOptions = indicesOptions;
146-
return this;
147-
}
148-
149-
@Override
150-
public ActionRequestValidationException validate() {
151-
return null;
152-
}
153-
154-
@Override
155-
public void writeTo(StreamOutput out) throws IOException {
156-
super.writeTo(out);
157-
out.writeStringArray(indices);
158-
indicesOptions.writeIndicesOptions(out);
159-
}
160-
161-
@Override
162-
public int hashCode() {
163-
return Objects.hash(Arrays.hashCode(indices), indicesOptions);
164-
}
165-
166-
@Override
167-
public boolean equals(Object obj) {
168-
if (obj == null) {
169-
return false;
170-
}
171-
if (obj.getClass() != getClass()) {
172-
return false;
173-
}
174-
Request other = (Request) obj;
175-
return Objects.deepEquals(indices, other.indices) && Objects.equals(indicesOptions, other.indicesOptions);
176-
}
177-
178-
}
179111
}

0 commit comments

Comments
 (0)