Skip to content

Commit a1f0b89

Browse files
committed
Retry ILM async action after reindexing data stream
1 parent 7731316 commit a1f0b89

File tree

8 files changed

+239
-126
lines changed

8 files changed

+239
-126
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,20 +177,23 @@ static TransportVersion def(int id) {
177177
public static final TransportVersion INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING = def(8_839_0_00);
178178
public static final TransportVersion ML_INFERENCE_IBM_WATSONX_RERANK_ADDED = def(8_840_0_00);
179179
public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR_BACKPORT_8_18 = def(8_840_0_01);
180+
public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_18 = def(8_840_0_02);
180181
public static final TransportVersion INITIAL_ELASTICSEARCH_8_19 = def(8_841_0_00);
181182
public static final TransportVersion COHERE_BIT_EMBEDDING_TYPE_SUPPORT_ADDED_BACKPORT_8_X = def(8_841_0_01);
182183
public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR_BACKPORT_8_19 = def(8_841_0_02);
183184
public static final TransportVersion ESQL_RETRY_ON_SHARD_LEVEL_FAILURE_BACKPORT_8_19 = def(8_841_0_03);
184185
public static final TransportVersion ESQL_SUPPORT_PARTIAL_RESULTS_BACKPORT_8_19 = def(8_841_0_04);
185186
public static final TransportVersion VOYAGE_AI_INTEGRATION_ADDED_BACKPORT_8_X = def(8_841_0_05);
186187
public static final TransportVersion JINA_AI_EMBEDDING_TYPE_SUPPORT_ADDED_BACKPORT_8_19 = def(8_841_0_06);
188+
public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_19 = def(8_841_0_07);
187189
public static final TransportVersion INITIAL_ELASTICSEARCH_9_0 = def(9_000_0_00);
188190
public static final TransportVersion REMOVE_SNAPSHOT_FAILURES_90 = def(9_000_0_01);
189191
public static final TransportVersion TRANSPORT_STATS_HANDLING_TIME_REQUIRED_90 = def(9_000_0_02);
190192
public static final TransportVersion REMOVE_DESIRED_NODE_VERSION_90 = def(9_000_0_03);
191193
public static final TransportVersion ESQL_DRIVER_TASK_DESCRIPTION_90 = def(9_000_0_04);
192194
public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR_9_0 = def(9_000_0_05);
193195
public static final TransportVersion BYTE_SIZE_VALUE_ALWAYS_USES_BYTES_90 = def(9_000_0_06);
196+
public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_90 = def(9_000_0_07);
194197
public static final TransportVersion COHERE_BIT_EMBEDDING_TYPE_SUPPORT_ADDED = def(9_001_0_00);
195198
public static final TransportVersion REMOVE_SNAPSHOT_FAILURES = def(9_002_0_00);
196199
public static final TransportVersion TRANSPORT_STATS_HANDLING_TIME_REQUIRED = def(9_003_0_00);
@@ -211,6 +214,7 @@ static TransportVersion def(int id) {
211214
public static final TransportVersion MULTI_PROJECT = def(9_018_0_00);
212215
public static final TransportVersion STORED_SCRIPT_CONTENT_LENGTH = def(9_019_0_00);
213216
public static final TransportVersion JINA_AI_EMBEDDING_TYPE_SUPPORT_ADDED = def(9_020_0_00);
217+
public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR = def(9_021_0_00);
214218

215219
/*
216220
* STOP! READ THIS FIRST! No, really,
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.ilm.action;
9+
10+
import org.elasticsearch.TransportVersions;
11+
import org.elasticsearch.action.ActionRequestValidationException;
12+
import org.elasticsearch.action.IndicesRequest;
13+
import org.elasticsearch.action.support.IndicesOptions;
14+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
15+
import org.elasticsearch.common.io.stream.StreamInput;
16+
import org.elasticsearch.common.io.stream.StreamOutput;
17+
import org.elasticsearch.core.TimeValue;
18+
19+
import java.io.IOException;
20+
import java.util.Arrays;
21+
import java.util.Objects;
22+
23+
public class RetryActionRequest extends AcknowledgedRequest<RetryActionRequest> implements IndicesRequest.Replaceable {
24+
private String[] indices;
25+
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
26+
private boolean requireError = false;
27+
28+
public RetryActionRequest(TimeValue masterNodeTimeout, TimeValue ackTimeout, String... indices) {
29+
super(masterNodeTimeout, ackTimeout);
30+
this.indices = indices;
31+
}
32+
33+
public RetryActionRequest(StreamInput in) throws IOException {
34+
super(in);
35+
this.indices = in.readStringArray();
36+
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
37+
if (in.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR)
38+
|| in.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_90)
39+
|| in.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_19)
40+
|| in.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_18)) {
41+
this.requireError = in.readBoolean();
42+
}
43+
}
44+
45+
@Override
46+
public RetryActionRequest indices(String... indices) {
47+
this.indices = indices;
48+
return this;
49+
}
50+
51+
@Override
52+
public String[] indices() {
53+
return indices;
54+
}
55+
56+
@Override
57+
public IndicesOptions indicesOptions() {
58+
return indicesOptions;
59+
}
60+
61+
public RetryActionRequest indicesOptions(IndicesOptions indicesOptions) {
62+
this.indicesOptions = indicesOptions;
63+
return this;
64+
}
65+
66+
public void requireError(boolean requireError) {
67+
this.requireError = requireError;
68+
}
69+
70+
public boolean requireError() {
71+
return requireError;
72+
}
73+
74+
@Override
75+
public ActionRequestValidationException validate() {
76+
return null;
77+
}
78+
79+
@Override
80+
public void writeTo(StreamOutput out) throws IOException {
81+
super.writeTo(out);
82+
out.writeStringArray(indices);
83+
indicesOptions.writeIndicesOptions(out);
84+
if (out.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR)
85+
|| out.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_90)
86+
|| out.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_19)
87+
|| out.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_18)) {
88+
out.writeBoolean(requireError);
89+
}
90+
}
91+
92+
@Override
93+
public int hashCode() {
94+
return Objects.hash(Arrays.hashCode(indices), indicesOptions, requireError);
95+
}
96+
97+
@Override
98+
public boolean equals(Object obj) {
99+
if (obj == null) {
100+
return false;
101+
}
102+
if (obj.getClass() != getClass()) {
103+
return false;
104+
}
105+
RetryActionRequest other = (RetryActionRequest) obj;
106+
return Objects.deepEquals(indices, other.indices)
107+
&& Objects.equals(indicesOptions, other.indicesOptions)
108+
&& requireError == other.requireError;
109+
}
110+
111+
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.action.search.TransportSearchScrollAction;
3131
import org.elasticsearch.index.reindex.ReindexAction;
3232
import org.elasticsearch.xpack.core.XPackPlugin;
33+
import org.elasticsearch.xpack.core.ilm.action.ILMActions;
3334
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
3435
import org.elasticsearch.xpack.core.security.support.MetadataUtils;
3536

@@ -222,7 +223,8 @@ public class InternalUsers {
222223
TransportBulkAction.NAME,
223224
TransportIndexAction.NAME,
224225
TransportSearchScrollAction.TYPE.name(),
225-
ModifyDataStreamsAction.NAME
226+
ModifyDataStreamsAction.NAME,
227+
ILMActions.RETRY.name()
226228
)
227229
.allowRestrictedIndices(false)
228230
.build() },

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/RestRetryAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.rest.RestRequest;
1616
import org.elasticsearch.rest.action.RestToXContentListener;
1717
import org.elasticsearch.xpack.core.ilm.action.ILMActions;
18+
import org.elasticsearch.xpack.core.ilm.action.RetryActionRequest;
1819

1920
import java.util.List;
2021

@@ -37,7 +38,7 @@ public String getName() {
3738
@Override
3839
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
3940
final var indices = Strings.splitStringByCommaToArray(restRequest.param("index"));
40-
final var request = new TransportRetryAction.Request(getMasterNodeTimeout(restRequest), getAckTimeout(restRequest), indices);
41+
final var request = new RetryActionRequest(getMasterNodeTimeout(restRequest), getAckTimeout(restRequest), indices);
4142
request.indices(indices);
4243
request.indicesOptions(IndicesOptions.fromRequest(restRequest, IndicesOptions.strictExpandOpen()));
4344
return channel -> client.execute(ILMActions.RETRY, request, new RestToXContentListener<>(channel));

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportRetryAction.java

Lines changed: 29 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,7 @@
1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
1212
import org.elasticsearch.action.ActionListener;
13-
import org.elasticsearch.action.ActionRequestValidationException;
14-
import org.elasticsearch.action.IndicesRequest;
1513
import org.elasticsearch.action.support.ActionFilters;
16-
import org.elasticsearch.action.support.IndicesOptions;
17-
import org.elasticsearch.action.support.master.AcknowledgedRequest;
1814
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1915
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
2016
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
@@ -25,24 +21,18 @@
2521
import org.elasticsearch.cluster.metadata.IndexMetadata;
2622
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
2723
import org.elasticsearch.cluster.service.ClusterService;
28-
import org.elasticsearch.common.io.stream.StreamInput;
29-
import org.elasticsearch.common.io.stream.StreamOutput;
3024
import org.elasticsearch.common.util.concurrent.EsExecutors;
3125
import org.elasticsearch.core.SuppressForbidden;
32-
import org.elasticsearch.core.TimeValue;
3326
import org.elasticsearch.injection.guice.Inject;
3427
import org.elasticsearch.tasks.Task;
3528
import org.elasticsearch.threadpool.ThreadPool;
3629
import org.elasticsearch.transport.TransportService;
3730
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
3831
import org.elasticsearch.xpack.core.ilm.action.ILMActions;
32+
import org.elasticsearch.xpack.core.ilm.action.RetryActionRequest;
3933
import org.elasticsearch.xpack.ilm.IndexLifecycleService;
4034

41-
import java.io.IOException;
42-
import java.util.Arrays;
43-
import java.util.Objects;
44-
45-
public class TransportRetryAction extends TransportMasterNodeAction<TransportRetryAction.Request, AcknowledgedResponse> {
35+
public class TransportRetryAction extends TransportMasterNodeAction<RetryActionRequest, AcknowledgedResponse> {
4636

4737
private static final Logger logger = LogManager.getLogger(TransportRetryAction.class);
4838

@@ -62,15 +52,20 @@ public TransportRetryAction(
6252
clusterService,
6353
threadPool,
6454
actionFilters,
65-
Request::new,
55+
RetryActionRequest::new,
6656
AcknowledgedResponse::readFrom,
6757
EsExecutors.DIRECT_EXECUTOR_SERVICE
6858
);
6959
this.indexLifecycleService = indexLifecycleService;
7060
}
7161

7262
@Override
73-
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
63+
protected void masterOperation(Task task, RetryActionRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
64+
if (request.requireError() == false) {
65+
maybeRunAsyncAction(state, request.indices());
66+
listener.onResponse(AcknowledgedResponse.TRUE);
67+
return;
68+
}
7469
submitUnbatchedTask("ilm-re-run", new AckedClusterStateUpdateTask(request, listener) {
7570
@Override
7671
public ClusterState execute(ClusterState currentState) {
@@ -79,101 +74,37 @@ public ClusterState execute(ClusterState currentState) {
7974

8075
@Override
8176
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
82-
for (String index : request.indices()) {
83-
IndexMetadata idxMeta = newState.metadata().getProject().index(index);
84-
LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
85-
StepKey retryStep = new StepKey(lifecycleState.phase(), lifecycleState.action(), lifecycleState.step());
86-
if (idxMeta == null) {
87-
// The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
88-
logger.debug(
89-
"index ["
90-
+ index
91-
+ "] has been deleted after moving to step ["
92-
+ lifecycleState.step()
93-
+ "], skipping async action check"
94-
);
95-
return;
96-
}
97-
indexLifecycleService.maybeRunAsyncAction(newState, idxMeta, retryStep);
98-
}
77+
maybeRunAsyncAction(newState, request.indices());
9978
}
10079
});
10180
}
10281

82+
private void maybeRunAsyncAction(ClusterState state, String[] indices) {
83+
for (String index : indices) {
84+
IndexMetadata idxMeta = state.metadata().getProject().index(index);
85+
if (idxMeta == null) {
86+
// The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
87+
logger.debug(
88+
"index ["
89+
+ index
90+
+ "] has been deleted, skipping async action check"
91+
);
92+
return;
93+
}
94+
LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
95+
StepKey retryStep = new StepKey(lifecycleState.phase(), lifecycleState.action(), lifecycleState.step());
96+
indexLifecycleService.maybeRunAsyncAction(state, idxMeta, retryStep);
97+
}
98+
}
99+
103100
@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
104101
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
105102
clusterService.submitUnbatchedStateUpdateTask(source, task);
106103
}
107104

108105
@Override
109-
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
106+
protected ClusterBlockException checkBlock(RetryActionRequest request, ClusterState state) {
110107
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
111108
}
112109

113-
public static class Request extends AcknowledgedRequest<Request> implements IndicesRequest.Replaceable {
114-
private String[] indices;
115-
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
116-
117-
public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String... indices) {
118-
super(masterNodeTimeout, ackTimeout);
119-
this.indices = indices;
120-
}
121-
122-
public Request(StreamInput in) throws IOException {
123-
super(in);
124-
this.indices = in.readStringArray();
125-
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
126-
}
127-
128-
@Override
129-
public Request indices(String... indices) {
130-
this.indices = indices;
131-
return this;
132-
}
133-
134-
@Override
135-
public String[] indices() {
136-
return indices;
137-
}
138-
139-
@Override
140-
public IndicesOptions indicesOptions() {
141-
return indicesOptions;
142-
}
143-
144-
public Request indicesOptions(IndicesOptions indicesOptions) {
145-
this.indicesOptions = indicesOptions;
146-
return this;
147-
}
148-
149-
@Override
150-
public ActionRequestValidationException validate() {
151-
return null;
152-
}
153-
154-
@Override
155-
public void writeTo(StreamOutput out) throws IOException {
156-
super.writeTo(out);
157-
out.writeStringArray(indices);
158-
indicesOptions.writeIndicesOptions(out);
159-
}
160-
161-
@Override
162-
public int hashCode() {
163-
return Objects.hash(Arrays.hashCode(indices), indicesOptions);
164-
}
165-
166-
@Override
167-
public boolean equals(Object obj) {
168-
if (obj == null) {
169-
return false;
170-
}
171-
if (obj.getClass() != getClass()) {
172-
return false;
173-
}
174-
Request other = (Request) obj;
175-
return Objects.deepEquals(indices, other.indices) && Objects.equals(indicesOptions, other.indicesOptions);
176-
}
177-
178-
}
179110
}

0 commit comments

Comments
 (0)