Skip to content

Commit 04d01fe

Browse files
authored
[8.x] Adding get migration reindex status (#118267) (#118361)
* Adding get migration reindex status (#118267) This adds a new transport action to get the status of a migration reindex (started via the API at #118109), and a new rest action to use it. The rest action accepts the data stream or index name, and returns the status. For example if the reindex task exists for data stream `my-data-stream`: ``` GET /_migration/reindex/my-data-stream/_status?pretty ``` returns ``` { "start_time" : 1733519098570, "complete" : true, "total_indices" : 1, "total_indices_requiring_upgrade" : 0, "successes" : 0, "in_progress" : 0, "pending" : 0, "errors" : [ ] } ``` If a reindex task does not exist: ``` GET _migration/reindex/my-data-stream/_status?pretty ``` Then a 404 is returned: ``` { "error" : { "root_cause" : [ { "type" : "resource_not_found_exception", "reason" : "No migration reindex status found for [my-data-stream]" } ], "type" : "resource_not_found_exception", "reason" : "No migration reindex status found for [my-data-stream]" }, "status" : 404 } ``` * adding migration reindex actions to OperatorPrivilegesIT
1 parent 18974f5 commit 04d01fe

File tree

13 files changed

+595
-9
lines changed

13 files changed

+595
-9
lines changed

docs/changelog/118267.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 118267
2+
summary: Adding get migration reindex status
3+
area: Data streams
4+
type: enhancement
5+
issues: []
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
{
2+
"migrate.get_reindex_status":{
3+
"documentation":{
4+
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-stream-reindex.html",
5+
"description":"This API returns the status of a migration reindex attempt for a data stream or index"
6+
},
7+
"stability":"experimental",
8+
"visibility":"private",
9+
"headers":{
10+
"accept": [ "application/json"],
11+
"content_type": ["application/json"]
12+
},
13+
"url":{
14+
"paths":[
15+
{
16+
"path":"/_migration/reindex/{index}/_status",
17+
"methods":[
18+
"GET"
19+
],
20+
"parts":{
21+
"index":{
22+
"type":"string",
23+
"description":"The index or data stream name"
24+
}
25+
}
26+
}
27+
]
28+
}
29+
}
30+
}
31+

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportActionIT.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.xpack.migrate.MigratePlugin;
3030
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamRequest;
3131
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamResponse;
32+
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamStatus;
3233
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTask;
3334

3435
import java.util.Collection;
@@ -68,7 +69,7 @@ public void testAlreadyUpToDateDataStream() throws Exception {
6869
ReindexDataStreamAction.Mode.UPGRADE,
6970
dataStreamName
7071
);
71-
createDataStream(dataStreamName);
72+
final int backingIndexCount = createDataStream(dataStreamName);
7273
ReindexDataStreamResponse response = client().execute(
7374
new ActionType<ReindexDataStreamResponse>(ReindexDataStreamAction.NAME),
7475
reindexDataStreamRequest
@@ -78,7 +79,6 @@ public void testAlreadyUpToDateDataStream() throws Exception {
7879
AtomicReference<ReindexDataStreamTask> runningTask = new AtomicReference<>();
7980
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
8081
TaskManager taskManager = transportService.getTaskManager();
81-
Map<Long, CancellableTask> tasksMap = taskManager.getCancellableTasks();
8282
Optional<Map.Entry<Long, CancellableTask>> optionalTask = taskManager.getCancellableTasks()
8383
.entrySet()
8484
.stream()
@@ -99,9 +99,24 @@ public void testAlreadyUpToDateDataStream() throws Exception {
9999
assertThat(task.getStatus().pending(), equalTo(0));
100100
assertThat(task.getStatus().inProgress(), equalTo(0));
101101
assertThat(task.getStatus().errors().size(), equalTo(0));
102+
103+
assertBusy(() -> {
104+
GetMigrationReindexStatusAction.Response statusResponse = client().execute(
105+
new ActionType<GetMigrationReindexStatusAction.Response>(GetMigrationReindexStatusAction.NAME),
106+
new GetMigrationReindexStatusAction.Request(dataStreamName)
107+
).actionGet();
108+
ReindexDataStreamStatus status = (ReindexDataStreamStatus) statusResponse.getTask().getTask().status();
109+
assertThat(status.complete(), equalTo(true));
110+
assertThat(status.errors(), equalTo(List.of()));
111+
assertThat(status.exception(), equalTo(null));
112+
assertThat(status.pending(), equalTo(0));
113+
assertThat(status.inProgress(), equalTo(0));
114+
assertThat(status.totalIndices(), equalTo(backingIndexCount));
115+
assertThat(status.totalIndicesToBeUpgraded(), equalTo(0));
116+
});
102117
}
103118

104-
private void createDataStream(String dataStreamName) {
119+
private int createDataStream(String dataStreamName) {
105120
final TransportPutComposableIndexTemplateAction.Request putComposableTemplateRequest =
106121
new TransportPutComposableIndexTemplateAction.Request("my-template");
107122
putComposableTemplateRequest.indexTemplate(
@@ -125,10 +140,13 @@ private void createDataStream(String dataStreamName) {
125140
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest)
126141
);
127142
assertThat(createDataStreamResponse.isAcknowledged(), is(true));
128-
indexDocs(dataStreamName);
129-
safeGet(new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(false).execute());
130-
indexDocs(dataStreamName);
131-
safeGet(new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(false).execute());
143+
int backingIndices = 1;
144+
for (int i = 0; i < randomIntBetween(2, 5); i++) {
145+
indexDocs(dataStreamName);
146+
safeGet(new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(false).execute());
147+
backingIndices++;
148+
}
149+
return backingIndices;
132150
}
133151

134152
private void indexDocs(String dataStreamName) {

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,11 @@
3232
import org.elasticsearch.threadpool.ThreadPool;
3333
import org.elasticsearch.xcontent.NamedXContentRegistry;
3434
import org.elasticsearch.xcontent.ParseField;
35+
import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusAction;
36+
import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusTransportAction;
3537
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction;
3638
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamTransportAction;
39+
import org.elasticsearch.xpack.migrate.rest.RestGetMigrationReindexStatusAction;
3740
import org.elasticsearch.xpack.migrate.rest.RestMigrationReindexAction;
3841
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor;
3942
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskState;
@@ -65,6 +68,7 @@ public List<RestHandler> getRestHandlers(
6568
List<RestHandler> handlers = new ArrayList<>();
6669
if (REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()) {
6770
handlers.add(new RestMigrationReindexAction());
71+
handlers.add(new RestGetMigrationReindexStatusAction());
6872
}
6973
return handlers;
7074
}
@@ -74,6 +78,7 @@ public List<RestHandler> getRestHandlers(
7478
List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> actions = new ArrayList<>();
7579
if (REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()) {
7680
actions.add(new ActionHandler<>(ReindexDataStreamAction.INSTANCE, ReindexDataStreamTransportAction.class));
81+
actions.add(new ActionHandler<>(GetMigrationReindexStatusAction.INSTANCE, GetMigrationReindexStatusTransportAction.class));
7782
}
7883
return actions;
7984
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.migrate.action;
9+
10+
import org.elasticsearch.action.ActionRequest;
11+
import org.elasticsearch.action.ActionRequestValidationException;
12+
import org.elasticsearch.action.ActionResponse;
13+
import org.elasticsearch.action.ActionType;
14+
import org.elasticsearch.action.IndicesRequest;
15+
import org.elasticsearch.action.support.IndicesOptions;
16+
import org.elasticsearch.common.Strings;
17+
import org.elasticsearch.common.io.stream.StreamInput;
18+
import org.elasticsearch.common.io.stream.StreamOutput;
19+
import org.elasticsearch.tasks.Task;
20+
import org.elasticsearch.tasks.TaskResult;
21+
import org.elasticsearch.xcontent.ToXContentObject;
22+
import org.elasticsearch.xcontent.XContentBuilder;
23+
24+
import java.io.IOException;
25+
import java.util.Objects;
26+
27+
import static java.util.Objects.requireNonNull;
28+
29+
public class GetMigrationReindexStatusAction extends ActionType<GetMigrationReindexStatusAction.Response> {
30+
31+
public static final GetMigrationReindexStatusAction INSTANCE = new GetMigrationReindexStatusAction();
32+
public static final String NAME = "indices:admin/migration/reindex_status";
33+
34+
public GetMigrationReindexStatusAction() {
35+
super(NAME);
36+
}
37+
38+
public static class Response extends ActionResponse implements ToXContentObject {
39+
private final TaskResult task;
40+
41+
public Response(TaskResult task) {
42+
this.task = requireNonNull(task, "task is required");
43+
}
44+
45+
public Response(StreamInput in) throws IOException {
46+
super(in);
47+
task = in.readOptionalWriteable(TaskResult::new);
48+
}
49+
50+
@Override
51+
public void writeTo(StreamOutput out) throws IOException {
52+
out.writeOptionalWriteable(task);
53+
}
54+
55+
/**
56+
* Get the actual result of the fetch.
57+
*/
58+
public TaskResult getTask() {
59+
return task;
60+
}
61+
62+
@Override
63+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
64+
Task.Status status = task.getTask().status();
65+
if (status != null) {
66+
task.getTask().status().toXContent(builder, params);
67+
}
68+
return builder;
69+
}
70+
71+
@Override
72+
public int hashCode() {
73+
return Objects.hashCode(task);
74+
}
75+
76+
@Override
77+
public boolean equals(Object other) {
78+
return other instanceof Response && task.equals(((Response) other).task);
79+
}
80+
81+
@Override
82+
public String toString() {
83+
String toString = Strings.toString(this);
84+
return toString.isEmpty() ? "unavailable" : toString;
85+
}
86+
87+
}
88+
89+
public static class Request extends ActionRequest implements IndicesRequest {
90+
private final String index;
91+
92+
public Request(String index) {
93+
super();
94+
this.index = index;
95+
}
96+
97+
public Request(StreamInput in) throws IOException {
98+
super(in);
99+
this.index = in.readString();
100+
}
101+
102+
@Override
103+
public void writeTo(StreamOutput out) throws IOException {
104+
super.writeTo(out);
105+
out.writeString(index);
106+
}
107+
108+
@Override
109+
public ActionRequestValidationException validate() {
110+
return null;
111+
}
112+
113+
public String getIndex() {
114+
return index;
115+
}
116+
117+
@Override
118+
public int hashCode() {
119+
return Objects.hashCode(index);
120+
}
121+
122+
@Override
123+
public boolean equals(Object other) {
124+
return other instanceof Request && index.equals(((Request) other).index);
125+
}
126+
127+
public Request nodeRequest(String thisNodeId, long thisTaskId) {
128+
Request copy = new Request(index);
129+
copy.setParentTask(thisNodeId, thisTaskId);
130+
return copy;
131+
}
132+
133+
@Override
134+
public String[] indices() {
135+
return new String[] { index };
136+
}
137+
138+
@Override
139+
public IndicesOptions indicesOptions() {
140+
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
141+
}
142+
}
143+
}

0 commit comments

Comments
 (0)