Skip to content

Commit 335a1d4

Browse files
[8.18] Retry ILM async action after reindexing data stream (elastic#124149) (elastic#124270)
* Retry ILM async action after reindexing data stream (elastic#124149)

  When reindexing a data stream, the ILM metadata is copied from the index metadata of the source index to the destination index. But the ILM state of the new index can be stuck if the source index was in an AsyncAction at the time of reindexing. To un-stick the new index, we call TransportRetryAction to retry the AsyncAction. In the past, this action would only run if the index was in the error phase. This change includes an update to TransportRetryAction, which allows it to be run when the index is not in an error phase, if the parameter requireError is set to false.

  (cherry picked from commit 10a8dcf)

  # Conflicts:
  # server/src/main/java/org/elasticsearch/TransportVersions.java
  # x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java
  # x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportRetryAction.java
  # x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java

* index mode cannot be set on v7 indices
1 parent 98e6565 commit 335a1d4

File tree

10 files changed

+270
-134
lines changed

10 files changed

+270
-134
lines changed

docs/changelog/124149.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 124149
2+
summary: Retry ILM async action after reindexing data stream
3+
area: Data streams
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ static TransportVersion def(int id) {
183183
public static final TransportVersion INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING = def(8_839_0_00);
184184
public static final TransportVersion ML_INFERENCE_IBM_WATSONX_RERANK_ADDED = def(8_840_00_0);
185185
public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR_BACKPORT_8_18 = def(8_840_00_1);
186+
public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_18 = def(8_840_0_02);
186187

187188
/*
188189
* STOP! READ THIS FIRST! No, really,
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.ilm.action;
9+
10+
import org.elasticsearch.TransportVersions;
11+
import org.elasticsearch.action.ActionRequestValidationException;
12+
import org.elasticsearch.action.IndicesRequest;
13+
import org.elasticsearch.action.support.IndicesOptions;
14+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
15+
import org.elasticsearch.common.io.stream.StreamInput;
16+
import org.elasticsearch.common.io.stream.StreamOutput;
17+
import org.elasticsearch.core.TimeValue;
18+
19+
import java.io.IOException;
20+
import java.util.Arrays;
21+
import java.util.Objects;
22+
23+
public class RetryActionRequest extends AcknowledgedRequest<RetryActionRequest> implements IndicesRequest.Replaceable {
24+
private String[] indices;
25+
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
26+
private boolean requireError = true;
27+
28+
public RetryActionRequest(TimeValue masterNodeTimeout, TimeValue ackTimeout, String... indices) {
29+
super(masterNodeTimeout, ackTimeout);
30+
this.indices = indices;
31+
}
32+
33+
public RetryActionRequest(StreamInput in) throws IOException {
34+
super(in);
35+
this.indices = in.readStringArray();
36+
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
37+
if (in.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_18)) {
38+
this.requireError = in.readBoolean();
39+
}
40+
}
41+
42+
@Override
43+
public RetryActionRequest indices(String... indices) {
44+
this.indices = indices;
45+
return this;
46+
}
47+
48+
@Override
49+
public String[] indices() {
50+
return indices;
51+
}
52+
53+
@Override
54+
public IndicesOptions indicesOptions() {
55+
return indicesOptions;
56+
}
57+
58+
public RetryActionRequest indicesOptions(IndicesOptions indicesOptions) {
59+
this.indicesOptions = indicesOptions;
60+
return this;
61+
}
62+
63+
public void requireError(boolean requireError) {
64+
this.requireError = requireError;
65+
}
66+
67+
public boolean requireError() {
68+
return requireError;
69+
}
70+
71+
@Override
72+
public ActionRequestValidationException validate() {
73+
return null;
74+
}
75+
76+
@Override
77+
public void writeTo(StreamOutput out) throws IOException {
78+
super.writeTo(out);
79+
out.writeStringArray(indices);
80+
indicesOptions.writeIndicesOptions(out);
81+
if (out.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_18)) {
82+
out.writeBoolean(requireError);
83+
}
84+
}
85+
86+
@Override
87+
public int hashCode() {
88+
return Objects.hash(Arrays.hashCode(indices), indicesOptions, requireError);
89+
}
90+
91+
@Override
92+
public boolean equals(Object obj) {
93+
if (obj == null) {
94+
return false;
95+
}
96+
if (obj.getClass() != getClass()) {
97+
return false;
98+
}
99+
RetryActionRequest other = (RetryActionRequest) obj;
100+
return Objects.deepEquals(indices, other.indices)
101+
&& Objects.equals(indicesOptions, other.indicesOptions)
102+
&& requireError == other.requireError;
103+
}
104+
105+
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.index.reindex.ReindexAction;
3232
import org.elasticsearch.xpack.core.XPackPlugin;
3333
import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction;
34+
import org.elasticsearch.xpack.core.ilm.action.ILMActions;
3435
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
3536
import org.elasticsearch.xpack.core.security.support.MetadataUtils;
3637

@@ -224,7 +225,8 @@ public class InternalUsers {
224225
TransportBulkAction.NAME,
225226
TransportIndexAction.NAME,
226227
TransportSearchScrollAction.TYPE.name(),
227-
ModifyDataStreamsAction.NAME
228+
ModifyDataStreamsAction.NAME,
229+
ILMActions.RETRY.name()
228230
)
229231
.allowRestrictedIndices(false)
230232
.build() },

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,11 @@ void maybeRunAsyncAction(ClusterState currentState, IndexMetadata indexMetadata,
322322
logger.warn("current step [{}] for index [{}] with policy [{}] is not recognized", currentStepKey, index, policy);
323323
return;
324324
}
325-
325+
if (expectedStepKey.phase() == null && expectedStepKey.name() == null && expectedStepKey.action() == null) {
326+
// ILM is stopped, so do not try to run async action
327+
logger.debug("expected step for index [{}] with policy [{}] is [{}], not running async action", index, policy, expectedStepKey);
328+
return;
329+
}
326330
logger.trace(
327331
"[{}] maybe running async action step ({}) with current step {}",
328332
index,

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/RestRetryAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.rest.RestRequest;
1616
import org.elasticsearch.rest.action.RestToXContentListener;
1717
import org.elasticsearch.xpack.core.ilm.action.ILMActions;
18+
import org.elasticsearch.xpack.core.ilm.action.RetryActionRequest;
1819

1920
import java.util.List;
2021

@@ -37,7 +38,7 @@ public String getName() {
3738
@Override
3839
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
3940
final var indices = Strings.splitStringByCommaToArray(restRequest.param("index"));
40-
final var request = new TransportRetryAction.Request(getMasterNodeTimeout(restRequest), getAckTimeout(restRequest), indices);
41+
final var request = new RetryActionRequest(getMasterNodeTimeout(restRequest), getAckTimeout(restRequest), indices);
4142
request.indices(indices);
4243
request.indicesOptions(IndicesOptions.fromRequest(restRequest, IndicesOptions.strictExpandOpen()));
4344
return channel -> client.execute(ILMActions.RETRY, request, new RestToXContentListener<>(channel));

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportRetryAction.java

Lines changed: 30 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,7 @@
1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
1212
import org.elasticsearch.action.ActionListener;
13-
import org.elasticsearch.action.ActionRequestValidationException;
14-
import org.elasticsearch.action.IndicesRequest;
1513
import org.elasticsearch.action.support.ActionFilters;
16-
import org.elasticsearch.action.support.IndicesOptions;
17-
import org.elasticsearch.action.support.master.AcknowledgedRequest;
1814
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1915
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
2016
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
@@ -26,24 +22,18 @@
2622
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2723
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
2824
import org.elasticsearch.cluster.service.ClusterService;
29-
import org.elasticsearch.common.io.stream.StreamInput;
30-
import org.elasticsearch.common.io.stream.StreamOutput;
3125
import org.elasticsearch.common.util.concurrent.EsExecutors;
3226
import org.elasticsearch.core.SuppressForbidden;
33-
import org.elasticsearch.core.TimeValue;
3427
import org.elasticsearch.injection.guice.Inject;
3528
import org.elasticsearch.tasks.Task;
3629
import org.elasticsearch.threadpool.ThreadPool;
3730
import org.elasticsearch.transport.TransportService;
3831
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
3932
import org.elasticsearch.xpack.core.ilm.action.ILMActions;
33+
import org.elasticsearch.xpack.core.ilm.action.RetryActionRequest;
4034
import org.elasticsearch.xpack.ilm.IndexLifecycleService;
4135

42-
import java.io.IOException;
43-
import java.util.Arrays;
44-
import java.util.Objects;
45-
46-
public class TransportRetryAction extends TransportMasterNodeAction<TransportRetryAction.Request, AcknowledgedResponse> {
36+
public class TransportRetryAction extends TransportMasterNodeAction<RetryActionRequest, AcknowledgedResponse> {
4737

4838
private static final Logger logger = LogManager.getLogger(TransportRetryAction.class);
4939

@@ -64,15 +54,25 @@ public TransportRetryAction(
6454
clusterService,
6555
threadPool,
6656
actionFilters,
67-
Request::new,
57+
RetryActionRequest::new,
6858
AcknowledgedResponse::readFrom,
6959
EsExecutors.DIRECT_EXECUTOR_SERVICE
7060
);
7161
this.indexLifecycleService = indexLifecycleService;
7262
}
7363

7464
@Override
75-
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
65+
protected void masterOperation(
66+
Task task,
67+
RetryActionRequest request,
68+
ClusterState state,
69+
ActionListener<AcknowledgedResponse> listener
70+
) {
71+
if (request.requireError() == false) {
72+
maybeRunAsyncAction(state, request.indices());
73+
listener.onResponse(AcknowledgedResponse.TRUE);
74+
return;
75+
}
7676
submitUnbatchedTask("ilm-re-run", new AckedClusterStateUpdateTask(request, listener) {
7777
@Override
7878
public ClusterState execute(ClusterState currentState) {
@@ -81,101 +81,33 @@ public ClusterState execute(ClusterState currentState) {
8181

8282
@Override
8383
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
84-
for (String index : request.indices()) {
85-
IndexMetadata idxMeta = newState.metadata().index(index);
86-
LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
87-
StepKey retryStep = new StepKey(lifecycleState.phase(), lifecycleState.action(), lifecycleState.step());
88-
if (idxMeta == null) {
89-
// The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
90-
logger.debug(
91-
"index ["
92-
+ index
93-
+ "] has been deleted after moving to step ["
94-
+ lifecycleState.step()
95-
+ "], skipping async action check"
96-
);
97-
return;
98-
}
99-
indexLifecycleService.maybeRunAsyncAction(newState, idxMeta, retryStep);
100-
}
84+
maybeRunAsyncAction(newState, request.indices());
10185
}
10286
});
10387
}
10488

89+
private void maybeRunAsyncAction(ClusterState state, String[] indices) {
90+
for (String index : indices) {
91+
IndexMetadata idxMeta = state.metadata().index(index);
92+
if (idxMeta == null) {
93+
// The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
94+
logger.debug("index [" + index + "] has been deleted, skipping async action check");
95+
return;
96+
}
97+
LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
98+
StepKey retryStep = new StepKey(lifecycleState.phase(), lifecycleState.action(), lifecycleState.step());
99+
indexLifecycleService.maybeRunAsyncAction(state, idxMeta, retryStep);
100+
}
101+
}
102+
105103
@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
106104
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
107105
clusterService.submitUnbatchedStateUpdateTask(source, task);
108106
}
109107

110108
@Override
111-
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
109+
protected ClusterBlockException checkBlock(RetryActionRequest request, ClusterState state) {
112110
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
113111
}
114112

115-
public static class Request extends AcknowledgedRequest<Request> implements IndicesRequest.Replaceable {
116-
private String[] indices;
117-
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
118-
119-
public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String... indices) {
120-
super(masterNodeTimeout, ackTimeout);
121-
this.indices = indices;
122-
}
123-
124-
public Request(StreamInput in) throws IOException {
125-
super(in);
126-
this.indices = in.readStringArray();
127-
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
128-
}
129-
130-
@Override
131-
public Request indices(String... indices) {
132-
this.indices = indices;
133-
return this;
134-
}
135-
136-
@Override
137-
public String[] indices() {
138-
return indices;
139-
}
140-
141-
@Override
142-
public IndicesOptions indicesOptions() {
143-
return indicesOptions;
144-
}
145-
146-
public Request indicesOptions(IndicesOptions indicesOptions) {
147-
this.indicesOptions = indicesOptions;
148-
return this;
149-
}
150-
151-
@Override
152-
public ActionRequestValidationException validate() {
153-
return null;
154-
}
155-
156-
@Override
157-
public void writeTo(StreamOutput out) throws IOException {
158-
super.writeTo(out);
159-
out.writeStringArray(indices);
160-
indicesOptions.writeIndicesOptions(out);
161-
}
162-
163-
@Override
164-
public int hashCode() {
165-
return Objects.hash(Arrays.hashCode(indices), indicesOptions);
166-
}
167-
168-
@Override
169-
public boolean equals(Object obj) {
170-
if (obj == null) {
171-
return false;
172-
}
173-
if (obj.getClass() != getClass()) {
174-
return false;
175-
}
176-
Request other = (Request) obj;
177-
return Objects.deepEquals(indices, other.indices) && Objects.equals(indicesOptions, other.indicesOptions);
178-
}
179-
180-
}
181113
}

0 commit comments

Comments
 (0)