Skip to content

Commit 8107cc9

Browse files
authored
Adding reindex data stream rest action (#118109)
* Adding a _migration/reindex endpoint * Adding rest api spec and test * Adding a feature flag for reindex data streams * updating json spec * fixing a typo * Changing mode to an enum * Moving ParseFields into public static finals * Commenting out test that leaves task running, until we add a cancel API * Removing persistent task id from output * replacing a string with a variable
1 parent c09c4f4 commit 8107cc9

File tree

8 files changed

+387
-37
lines changed

8 files changed

+387
-37
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
{
2+
"migrate.reindex":{
3+
"documentation":{
4+
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-stream-reindex.html",
5+
"description":"This API reindexes all legacy backing indices for a data stream. It does this in a persistent task. The persistent task id is returned immediately, and the reindexing work is completed in that task"
6+
},
7+
"stability":"experimental",
8+
"visibility":"private",
9+
"headers":{
10+
"accept": [ "application/json"],
11+
"content_type": ["application/json"]
12+
},
13+
"url":{
14+
"paths":[
15+
{
16+
"path":"/_migration/reindex",
17+
"methods":[
18+
"POST"
19+
]
20+
}
21+
]
22+
},
23+
"body":{
24+
"description":"The body contains the fields `mode` and `source.index, where the only mode currently supported is `upgrade`, and the `source.index` must be a data stream name",
25+
"required":true
26+
}
27+
}
28+
}
29+

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportActionIT.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,10 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
5151

5252
public void testNonExistentDataStream() {
5353
String nonExistentDataStreamName = randomAlphaOfLength(50);
54-
ReindexDataStreamRequest reindexDataStreamRequest = new ReindexDataStreamRequest(nonExistentDataStreamName);
54+
ReindexDataStreamRequest reindexDataStreamRequest = new ReindexDataStreamRequest(
55+
ReindexDataStreamAction.Mode.UPGRADE,
56+
nonExistentDataStreamName
57+
);
5558
assertThrows(
5659
ResourceNotFoundException.class,
5760
() -> client().execute(new ActionType<ReindexDataStreamResponse>(ReindexDataStreamAction.NAME), reindexDataStreamRequest)
@@ -61,7 +64,10 @@ public void testNonExistentDataStream() {
6164

6265
public void testAlreadyUpToDateDataStream() throws Exception {
6366
String dataStreamName = randomAlphaOfLength(50).toLowerCase(Locale.ROOT);
64-
ReindexDataStreamRequest reindexDataStreamRequest = new ReindexDataStreamRequest(dataStreamName);
67+
ReindexDataStreamRequest reindexDataStreamRequest = new ReindexDataStreamRequest(
68+
ReindexDataStreamAction.Mode.UPGRADE,
69+
dataStreamName
70+
);
6571
createDataStream(dataStreamName);
6672
ReindexDataStreamResponse response = client().execute(
6773
new ActionType<ReindexDataStreamResponse>(ReindexDataStreamAction.NAME),

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java

Lines changed: 75 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,30 @@
1111
import org.elasticsearch.action.ActionResponse;
1212
import org.elasticsearch.client.internal.Client;
1313
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
14+
import org.elasticsearch.cluster.node.DiscoveryNodes;
1415
import org.elasticsearch.cluster.service.ClusterService;
1516
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
17+
import org.elasticsearch.common.settings.ClusterSettings;
18+
import org.elasticsearch.common.settings.IndexScopedSettings;
19+
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.common.settings.SettingsFilter;
1621
import org.elasticsearch.common.settings.SettingsModule;
22+
import org.elasticsearch.features.NodeFeature;
1723
import org.elasticsearch.persistent.PersistentTaskParams;
1824
import org.elasticsearch.persistent.PersistentTaskState;
1925
import org.elasticsearch.persistent.PersistentTasksExecutor;
2026
import org.elasticsearch.plugins.ActionPlugin;
2127
import org.elasticsearch.plugins.PersistentTaskPlugin;
2228
import org.elasticsearch.plugins.Plugin;
29+
import org.elasticsearch.rest.RestController;
30+
import org.elasticsearch.rest.RestHandler;
2331
import org.elasticsearch.tasks.Task;
2432
import org.elasticsearch.threadpool.ThreadPool;
2533
import org.elasticsearch.xcontent.NamedXContentRegistry;
2634
import org.elasticsearch.xcontent.ParseField;
2735
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction;
2836
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamTransportAction;
37+
import org.elasticsearch.xpack.migrate.rest.RestMigrationReindexAction;
2938
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor;
3039
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskState;
3140
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamStatus;
@@ -34,47 +43,80 @@
3443

3544
import java.util.ArrayList;
3645
import java.util.List;
46+
import java.util.function.Predicate;
47+
import java.util.function.Supplier;
48+
49+
import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.REINDEX_DATA_STREAM_FEATURE_FLAG;
3750

3851
public class MigratePlugin extends Plugin implements ActionPlugin, PersistentTaskPlugin {
3952

53+
@Override
54+
public List<RestHandler> getRestHandlers(
55+
Settings unused,
56+
NamedWriteableRegistry namedWriteableRegistry,
57+
RestController restController,
58+
ClusterSettings clusterSettings,
59+
IndexScopedSettings indexScopedSettings,
60+
SettingsFilter settingsFilter,
61+
IndexNameExpressionResolver indexNameExpressionResolver,
62+
Supplier<DiscoveryNodes> nodesInCluster,
63+
Predicate<NodeFeature> clusterSupportsFeature
64+
) {
65+
List<RestHandler> handlers = new ArrayList<>();
66+
if (REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()) {
67+
handlers.add(new RestMigrationReindexAction());
68+
}
69+
return handlers;
70+
}
71+
4072
@Override
4173
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
4274
List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> actions = new ArrayList<>();
43-
actions.add(new ActionHandler<>(ReindexDataStreamAction.INSTANCE, ReindexDataStreamTransportAction.class));
75+
if (REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()) {
76+
actions.add(new ActionHandler<>(ReindexDataStreamAction.INSTANCE, ReindexDataStreamTransportAction.class));
77+
}
4478
return actions;
4579
}
4680

4781
@Override
4882
public List<NamedXContentRegistry.Entry> getNamedXContent() {
49-
return List.of(
50-
new NamedXContentRegistry.Entry(
51-
PersistentTaskState.class,
52-
new ParseField(ReindexDataStreamPersistentTaskState.NAME),
53-
ReindexDataStreamPersistentTaskState::fromXContent
54-
),
55-
new NamedXContentRegistry.Entry(
56-
PersistentTaskParams.class,
57-
new ParseField(ReindexDataStreamTaskParams.NAME),
58-
ReindexDataStreamTaskParams::fromXContent
59-
)
60-
);
83+
if (REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()) {
84+
return List.of(
85+
new NamedXContentRegistry.Entry(
86+
PersistentTaskState.class,
87+
new ParseField(ReindexDataStreamPersistentTaskState.NAME),
88+
ReindexDataStreamPersistentTaskState::fromXContent
89+
),
90+
new NamedXContentRegistry.Entry(
91+
PersistentTaskParams.class,
92+
new ParseField(ReindexDataStreamTaskParams.NAME),
93+
ReindexDataStreamTaskParams::fromXContent
94+
)
95+
);
96+
} else {
97+
return List.of();
98+
}
6199
}
62100

63101
@Override
64102
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
65-
return List.of(
66-
new NamedWriteableRegistry.Entry(
67-
PersistentTaskState.class,
68-
ReindexDataStreamPersistentTaskState.NAME,
69-
ReindexDataStreamPersistentTaskState::new
70-
),
71-
new NamedWriteableRegistry.Entry(
72-
PersistentTaskParams.class,
73-
ReindexDataStreamTaskParams.NAME,
74-
ReindexDataStreamTaskParams::new
75-
),
76-
new NamedWriteableRegistry.Entry(Task.Status.class, ReindexDataStreamStatus.NAME, ReindexDataStreamStatus::new)
77-
);
103+
if (REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()) {
104+
return List.of(
105+
new NamedWriteableRegistry.Entry(
106+
PersistentTaskState.class,
107+
ReindexDataStreamPersistentTaskState.NAME,
108+
ReindexDataStreamPersistentTaskState::new
109+
),
110+
new NamedWriteableRegistry.Entry(
111+
PersistentTaskParams.class,
112+
ReindexDataStreamTaskParams.NAME,
113+
ReindexDataStreamTaskParams::new
114+
),
115+
new NamedWriteableRegistry.Entry(Task.Status.class, ReindexDataStreamStatus.NAME, ReindexDataStreamStatus::new)
116+
);
117+
} else {
118+
return List.of();
119+
}
78120
}
79121

80122
@Override
@@ -85,6 +127,12 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
85127
SettingsModule settingsModule,
86128
IndexNameExpressionResolver expressionResolver
87129
) {
88-
return List.of(new ReindexDataStreamPersistentTaskExecutor(client, clusterService, ReindexDataStreamTask.TASK_NAME, threadPool));
130+
if (REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()) {
131+
return List.of(
132+
new ReindexDataStreamPersistentTaskExecutor(client, clusterService, ReindexDataStreamTask.TASK_NAME, threadPool)
133+
);
134+
} else {
135+
return List.of();
136+
}
89137
}
90138
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java

Lines changed: 82 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,41 @@
1111
import org.elasticsearch.action.ActionRequestValidationException;
1212
import org.elasticsearch.action.ActionResponse;
1313
import org.elasticsearch.action.ActionType;
14+
import org.elasticsearch.action.IndicesRequest;
15+
import org.elasticsearch.action.support.IndicesOptions;
1416
import org.elasticsearch.common.io.stream.StreamInput;
1517
import org.elasticsearch.common.io.stream.StreamOutput;
18+
import org.elasticsearch.common.util.FeatureFlag;
19+
import org.elasticsearch.features.NodeFeature;
20+
import org.elasticsearch.xcontent.ConstructingObjectParser;
21+
import org.elasticsearch.xcontent.ParseField;
22+
import org.elasticsearch.xcontent.ToXContent;
1623
import org.elasticsearch.xcontent.ToXContentObject;
1724
import org.elasticsearch.xcontent.XContentBuilder;
25+
import org.elasticsearch.xcontent.XContentParser;
1826

1927
import java.io.IOException;
28+
import java.util.Locale;
2029
import java.util.Objects;
30+
import java.util.function.Predicate;
2131

2232
public class ReindexDataStreamAction extends ActionType<ReindexDataStreamAction.ReindexDataStreamResponse> {
33+
public static final FeatureFlag REINDEX_DATA_STREAM_FEATURE_FLAG = new FeatureFlag("reindex_data_stream");
2334

2435
public static final ReindexDataStreamAction INSTANCE = new ReindexDataStreamAction();
2536
public static final String NAME = "indices:admin/data_stream/reindex";
37+
public static final ParseField MODE_FIELD = new ParseField("mode");
38+
public static final ParseField SOURCE_FIELD = new ParseField("source");
39+
public static final ParseField INDEX_FIELD = new ParseField("index");
2640

2741
public ReindexDataStreamAction() {
2842
super(NAME);
2943
}
3044

45+
public enum Mode {
46+
UPGRADE
47+
}
48+
3149
public static class ReindexDataStreamResponse extends ActionResponse implements ToXContentObject {
3250
private final String taskId;
3351

@@ -49,7 +67,7 @@ public void writeTo(StreamOutput out) throws IOException {
4967
@Override
5068
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
5169
builder.startObject();
52-
builder.field("task", getTaskId());
70+
builder.field("acknowledged", true);
5371
builder.endObject();
5472
return builder;
5573
}
@@ -70,22 +88,52 @@ public boolean equals(Object other) {
7088

7189
}
7290

73-
public static class ReindexDataStreamRequest extends ActionRequest {
91+
public static class ReindexDataStreamRequest extends ActionRequest implements IndicesRequest, ToXContent {
92+
private final Mode mode;
7493
private final String sourceDataStream;
7594

76-
public ReindexDataStreamRequest(String sourceDataStream) {
77-
super();
95+
public ReindexDataStreamRequest(Mode mode, String sourceDataStream) {
96+
this.mode = mode;
7897
this.sourceDataStream = sourceDataStream;
7998
}
8099

81100
public ReindexDataStreamRequest(StreamInput in) throws IOException {
82101
super(in);
102+
this.mode = Mode.valueOf(in.readString());
83103
this.sourceDataStream = in.readString();
84104
}
85105

106+
private static final ConstructingObjectParser<ReindexDataStreamRequest, Predicate<NodeFeature>> PARSER =
107+
new ConstructingObjectParser<>("migration_reindex", objects -> {
108+
Mode mode = Mode.valueOf(((String) objects[0]).toUpperCase(Locale.ROOT));
109+
String source = (String) objects[1];
110+
return new ReindexDataStreamRequest(mode, source);
111+
});
112+
113+
private static final ConstructingObjectParser<String, Void> SOURCE_PARSER = new ConstructingObjectParser<>(
114+
SOURCE_FIELD.getPreferredName(),
115+
false,
116+
(a, id) -> (String) a[0]
117+
);
118+
119+
static {
120+
SOURCE_PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_FIELD);
121+
PARSER.declareString(ConstructingObjectParser.constructorArg(), MODE_FIELD);
122+
PARSER.declareObject(
123+
ConstructingObjectParser.constructorArg(),
124+
(parser, id) -> SOURCE_PARSER.apply(parser, null),
125+
SOURCE_FIELD
126+
);
127+
}
128+
129+
public static ReindexDataStreamRequest fromXContent(XContentParser parser) {
130+
return PARSER.apply(parser, null);
131+
}
132+
86133
@Override
87134
public void writeTo(StreamOutput out) throws IOException {
88135
super.writeTo(out);
136+
out.writeString(mode.name());
89137
out.writeString(sourceDataStream);
90138
}
91139

@@ -103,15 +151,42 @@ public String getSourceDataStream() {
103151
return sourceDataStream;
104152
}
105153

154+
public Mode getMode() {
155+
return mode;
156+
}
157+
106158
@Override
107159
public int hashCode() {
108-
return Objects.hashCode(sourceDataStream);
160+
return Objects.hash(mode, sourceDataStream);
109161
}
110162

111163
@Override
112164
public boolean equals(Object other) {
113-
return other instanceof ReindexDataStreamRequest
114-
&& sourceDataStream.equals(((ReindexDataStreamRequest) other).sourceDataStream);
165+
return other instanceof ReindexDataStreamRequest otherRequest
166+
&& mode.equals(otherRequest.mode)
167+
&& sourceDataStream.equals(otherRequest.sourceDataStream);
168+
}
169+
170+
@Override
171+
public String[] indices() {
172+
return new String[] { sourceDataStream };
173+
}
174+
175+
@Override
176+
public IndicesOptions indicesOptions() {
177+
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
178+
}
179+
180+
/*
181+
* This only exists for the sake of testing the xcontent parser
182+
*/
183+
@Override
184+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
185+
builder.field(MODE_FIELD.getPreferredName(), mode);
186+
builder.startObject(SOURCE_FIELD.getPreferredName());
187+
builder.field(INDEX_FIELD.getPreferredName(), sourceDataStream);
188+
builder.endObject();
189+
return builder;
115190
}
116191
}
117192
}

0 commit comments

Comments
 (0)