Skip to content

Commit f72cf56

Browse files
authored
Adding a migration reindex cancel API (elastic#118291) (elastic#118679)
This introduces the migration reindex cancel API, which cancels a migration reindex task for a given data stream name that was started with elastic#118109. For example: ``` POST localhost:9200/_migration/reindex/my-data-stream/_cancel?pretty ``` returns ``` { "acknowledged" : true } ``` This cancels the task, and cancels any ongoing reindexing of backing indices, but does not do any cleanup.
1 parent db0d6ce commit f72cf56

File tree

13 files changed

+371
-45
lines changed

13 files changed

+371
-45
lines changed

docs/changelog/118291.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 118291
2+
summary: Adding a migration reindex cancel API
3+
area: Data streams
4+
type: enhancement
5+
issues: []
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{
2+
"migrate.cancel_reindex":{
3+
"documentation":{
4+
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-stream-reindex.html",
5+
"description":"This API returns the status of a migration reindex attempt for a data stream or index"
6+
},
7+
"stability":"experimental",
8+
"visibility":"private",
9+
"headers":{
10+
"accept": [ "application/json"],
11+
"content_type": ["application/json"]
12+
},
13+
"url":{
14+
"paths":[
15+
{
16+
"path":"/_migration/reindex/{index}/_cancel",
17+
"methods":[
18+
"POST"
19+
],
20+
"parts":{
21+
"index":{
22+
"type":"string",
23+
"description":"The index or data stream name"
24+
}
25+
}
26+
}
27+
]
28+
}
29+
}
30+
}

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportActionIT.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,23 @@ public void testAlreadyUpToDateDataStream() throws Exception {
117117
assertThat(status.totalIndices(), equalTo(backingIndexCount));
118118
assertThat(status.totalIndicesToBeUpgraded(), equalTo(0));
119119
});
120+
AcknowledgedResponse cancelResponse = client().execute(
121+
CancelReindexDataStreamAction.INSTANCE,
122+
new CancelReindexDataStreamAction.Request(dataStreamName)
123+
).actionGet();
124+
assertNotNull(cancelResponse);
125+
assertThrows(
126+
ResourceNotFoundException.class,
127+
() -> client().execute(CancelReindexDataStreamAction.INSTANCE, new CancelReindexDataStreamAction.Request(dataStreamName))
128+
.actionGet()
129+
);
130+
assertThrows(
131+
ResourceNotFoundException.class,
132+
() -> client().execute(
133+
new ActionType<GetMigrationReindexStatusAction.Response>(GetMigrationReindexStatusAction.NAME),
134+
new GetMigrationReindexStatusAction.Request(dataStreamName)
135+
).actionGet()
136+
);
120137
}
121138

122139
private int createDataStream(String dataStreamName) {

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,13 @@
3232
import org.elasticsearch.threadpool.ThreadPool;
3333
import org.elasticsearch.xcontent.NamedXContentRegistry;
3434
import org.elasticsearch.xcontent.ParseField;
35+
import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamAction;
36+
import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamTransportAction;
3537
import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusAction;
3638
import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusTransportAction;
3739
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction;
3840
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamTransportAction;
41+
import org.elasticsearch.xpack.migrate.rest.RestCancelReindexDataStreamAction;
3942
import org.elasticsearch.xpack.migrate.rest.RestGetMigrationReindexStatusAction;
4043
import org.elasticsearch.xpack.migrate.rest.RestMigrationReindexAction;
4144
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor;
@@ -69,6 +72,7 @@ public List<RestHandler> getRestHandlers(
6972
if (REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()) {
7073
handlers.add(new RestMigrationReindexAction());
7174
handlers.add(new RestGetMigrationReindexStatusAction());
75+
handlers.add(new RestCancelReindexDataStreamAction());
7276
}
7377
return handlers;
7478
}
@@ -79,6 +83,7 @@ public List<RestHandler> getRestHandlers(
7983
if (REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()) {
8084
actions.add(new ActionHandler<>(ReindexDataStreamAction.INSTANCE, ReindexDataStreamTransportAction.class));
8185
actions.add(new ActionHandler<>(GetMigrationReindexStatusAction.INSTANCE, GetMigrationReindexStatusTransportAction.class));
86+
actions.add(new ActionHandler<>(CancelReindexDataStreamAction.INSTANCE, CancelReindexDataStreamTransportAction.class));
8287
}
8388
return actions;
8489
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.migrate.action;
9+
10+
import org.elasticsearch.action.ActionRequest;
11+
import org.elasticsearch.action.ActionRequestValidationException;
12+
import org.elasticsearch.action.ActionType;
13+
import org.elasticsearch.action.IndicesRequest;
14+
import org.elasticsearch.action.support.IndicesOptions;
15+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
16+
import org.elasticsearch.common.io.stream.StreamInput;
17+
import org.elasticsearch.common.io.stream.StreamOutput;
18+
19+
import java.io.IOException;
20+
import java.util.Objects;
21+
22+
public class CancelReindexDataStreamAction extends ActionType<AcknowledgedResponse> {
23+
24+
public static final CancelReindexDataStreamAction INSTANCE = new CancelReindexDataStreamAction();
25+
public static final String NAME = "indices:admin/data_stream/reindex_cancel";
26+
27+
public CancelReindexDataStreamAction() {
28+
super(NAME);
29+
}
30+
31+
public static class Request extends ActionRequest implements IndicesRequest {
32+
private final String index;
33+
34+
public Request(String index) {
35+
super();
36+
this.index = index;
37+
}
38+
39+
public Request(StreamInput in) throws IOException {
40+
super(in);
41+
this.index = in.readString();
42+
}
43+
44+
@Override
45+
public void writeTo(StreamOutput out) throws IOException {
46+
super.writeTo(out);
47+
out.writeString(index);
48+
}
49+
50+
@Override
51+
public ActionRequestValidationException validate() {
52+
return null;
53+
}
54+
55+
@Override
56+
public boolean getShouldStoreResult() {
57+
return true;
58+
}
59+
60+
public String getIndex() {
61+
return index;
62+
}
63+
64+
@Override
65+
public int hashCode() {
66+
return Objects.hashCode(index);
67+
}
68+
69+
@Override
70+
public boolean equals(Object other) {
71+
return other instanceof Request && index.equals(((Request) other).index);
72+
}
73+
74+
public Request nodeRequest(String thisNodeId, long thisTaskId) {
75+
Request copy = new Request(index);
76+
copy.setParentTask(thisNodeId, thisTaskId);
77+
return copy;
78+
}
79+
80+
@Override
81+
public String[] indices() {
82+
return new String[] { index };
83+
}
84+
85+
@Override
86+
public IndicesOptions indicesOptions() {
87+
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
88+
}
89+
}
90+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.migrate.action;
9+
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.support.ActionFilters;
12+
import org.elasticsearch.action.support.HandledTransportAction;
13+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
14+
import org.elasticsearch.common.util.concurrent.EsExecutors;
15+
import org.elasticsearch.core.TimeValue;
16+
import org.elasticsearch.injection.guice.Inject;
17+
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
18+
import org.elasticsearch.persistent.PersistentTasksService;
19+
import org.elasticsearch.tasks.Task;
20+
import org.elasticsearch.transport.TransportService;
21+
import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamAction.Request;
22+
23+
public class CancelReindexDataStreamTransportAction extends HandledTransportAction<Request, AcknowledgedResponse> {
24+
private final PersistentTasksService persistentTasksService;
25+
26+
@Inject
27+
public CancelReindexDataStreamTransportAction(
28+
TransportService transportService,
29+
ActionFilters actionFilters,
30+
PersistentTasksService persistentTasksService
31+
) {
32+
super(CancelReindexDataStreamAction.NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
33+
this.persistentTasksService = persistentTasksService;
34+
}
35+
36+
@Override
37+
protected void doExecute(Task task, Request request, ActionListener<AcknowledgedResponse> listener) {
38+
String index = request.getIndex();
39+
String persistentTaskId = ReindexDataStreamAction.TASK_ID_PREFIX + index;
40+
/*
41+
* This removes the persistent task from the cluster state and results in the running task being cancelled (but not removed from
42+
* the task manager). The running task is removed from the task manager in ReindexDataStreamTask::onCancelled, which is called as
43+
* as result of this.
44+
*/
45+
persistentTasksService.sendRemoveRequest(persistentTaskId, TimeValue.MAX_VALUE, new ActionListener<>() {
46+
@Override
47+
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
48+
listener.onResponse(AcknowledgedResponse.TRUE);
49+
}
50+
51+
@Override
52+
public void onFailure(Exception e) {
53+
listener.onFailure(e);
54+
}
55+
});
56+
}
57+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.migrate.rest;
9+
10+
import org.elasticsearch.client.internal.node.NodeClient;
11+
import org.elasticsearch.rest.BaseRestHandler;
12+
import org.elasticsearch.rest.RestRequest;
13+
import org.elasticsearch.rest.action.RestToXContentListener;
14+
import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamAction;
15+
16+
import java.io.IOException;
17+
import java.util.List;
18+
19+
import static org.elasticsearch.rest.RestRequest.Method.POST;
20+
21+
public class RestCancelReindexDataStreamAction extends BaseRestHandler {
22+
23+
@Override
24+
public String getName() {
25+
return "cancel_reindex_data_stream_action";
26+
}
27+
28+
@Override
29+
public List<Route> routes() {
30+
return List.of(new Route(POST, "/_migration/reindex/{index}/_cancel"));
31+
}
32+
33+
@Override
34+
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
35+
String index = request.param("index");
36+
CancelReindexDataStreamAction.Request cancelTaskRequest = new CancelReindexDataStreamAction.Request(index);
37+
return channel -> client.execute(CancelReindexDataStreamAction.INSTANCE, cancelTaskRequest, new RestToXContentListener<>(channel));
38+
}
39+
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,11 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
9191
}
9292

9393
private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) {
94-
persistentTask.allReindexesCompleted();
95-
threadPool.schedule(persistentTask::markAsCompleted, getTimeToLive(persistentTask), threadPool.generic());
94+
persistentTask.allReindexesCompleted(threadPool, getTimeToLive(persistentTask));
9695
}
9796

9897
private void completeFailedPersistentTask(ReindexDataStreamTask persistentTask, Exception e) {
99-
persistentTask.taskFailed(e);
100-
threadPool.schedule(() -> persistentTask.markAsFailed(e), getTimeToLive(persistentTask), threadPool.generic());
98+
persistentTask.taskFailed(threadPool, getTimeToLive(persistentTask), e);
10199
}
102100

103101
private TimeValue getTimeToLive(ReindexDataStreamTask reindexDataStreamTask) {

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77

88
package org.elasticsearch.xpack.migrate.task;
99

10+
import org.elasticsearch.common.util.concurrent.RunOnce;
11+
import org.elasticsearch.core.TimeValue;
1012
import org.elasticsearch.core.Tuple;
1113
import org.elasticsearch.persistent.AllocatedPersistentTask;
1214
import org.elasticsearch.tasks.TaskId;
15+
import org.elasticsearch.threadpool.ThreadPool;
1316

1417
import java.util.ArrayList;
1518
import java.util.List;
@@ -21,12 +24,14 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
2124
private final long persistentTaskStartTime;
2225
private final int totalIndices;
2326
private final int totalIndicesToBeUpgraded;
24-
private boolean complete = false;
25-
private Exception exception;
26-
private AtomicInteger inProgress = new AtomicInteger(0);
27-
private AtomicInteger pending = new AtomicInteger();
28-
private List<Tuple<String, Exception>> errors = new ArrayList<>();
27+
private volatile boolean complete = false;
28+
private volatile Exception exception;
29+
private final AtomicInteger inProgress = new AtomicInteger(0);
30+
private final AtomicInteger pending = new AtomicInteger();
31+
private final List<Tuple<String, Exception>> errors = new ArrayList<>();
32+
private final RunOnce completeTask;
2933

34+
@SuppressWarnings("this-escape")
3035
public ReindexDataStreamTask(
3136
long persistentTaskStartTime,
3237
int totalIndices,
@@ -42,6 +47,13 @@ public ReindexDataStreamTask(
4247
this.persistentTaskStartTime = persistentTaskStartTime;
4348
this.totalIndices = totalIndices;
4449
this.totalIndicesToBeUpgraded = totalIndicesToBeUpgraded;
50+
this.completeTask = new RunOnce(() -> {
51+
if (exception == null) {
52+
markAsCompleted();
53+
} else {
54+
markAsFailed(exception);
55+
}
56+
});
4557
}
4658

4759
@Override
@@ -58,13 +70,18 @@ public ReindexDataStreamStatus getStatus() {
5870
);
5971
}
6072

61-
public void allReindexesCompleted() {
73+
public void allReindexesCompleted(ThreadPool threadPool, TimeValue timeToLive) {
6274
this.complete = true;
75+
if (isCancelled()) {
76+
completeTask.run();
77+
} else {
78+
threadPool.schedule(completeTask, timeToLive, threadPool.generic());
79+
}
6380
}
6481

65-
public void taskFailed(Exception e) {
66-
this.complete = true;
82+
public void taskFailed(ThreadPool threadPool, TimeValue timeToLive, Exception e) {
6783
this.exception = e;
84+
allReindexesCompleted(threadPool, timeToLive);
6885
}
6986

7087
public void reindexSucceeded() {
@@ -84,4 +101,16 @@ public void incrementInProgressIndicesCount() {
84101
public void setPendingIndicesCount(int size) {
85102
pending.set(size);
86103
}
104+
105+
@Override
106+
public void onCancelled() {
107+
/*
108+
* If the task is complete, but just waiting for its scheduled removal, we go ahead and call markAsCompleted/markAsFailed
109+
* immediately. This results in the running task being removed from the task manager. If the task is not complete, then one of
110+
* allReindexesCompleted or taskFailed will be called in the future, resulting in the same thing.
111+
*/
112+
if (complete) {
113+
completeTask.run();
114+
}
115+
}
87116
}

0 commit comments

Comments
 (0)