Skip to content

Commit ecf9adf

Browse files
[main] System data streams are not being upgraded in the feature migration API (#126409)
This commit adds support for system data streams reindexing. The system data stream migration extends the existing system indices migration task and uses the data stream reindex API. The system index migration task starts a reindex data stream task and tracks its status every second. Only one system index or system data stream is migrated at a time. If a data stream migration fails, the entire system index migration task will also fail. Port of #123926
1 parent 728eb75 commit ecf9adf

File tree

55 files changed

+1634
-285
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+1634
-285
lines changed

docs/changelog/126409.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 126409
2+
summary: System data streams are not being upgraded in the feature migration API
3+
area: Infra/Core
4+
type: bug
5+
issues:
6+
- 122949

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
326326
.build(),
327327
Map.of(),
328328
List.of("product"),
329+
"product",
329330
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
330331
)
331332
);

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
275275
.build(),
276276
Map.of(),
277277
Collections.singletonList("test"),
278+
"test",
278279
new ExecutorNames(ThreadPool.Names.SYSTEM_CRITICAL_READ, ThreadPool.Names.SYSTEM_READ, ThreadPool.Names.SYSTEM_WRITE)
279280
)
280281
);

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudSystemDataStreamLifecycleIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
213213
.build(),
214214
Map.of(),
215215
List.of("product"),
216+
"product",
216217
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
217218
)
218219
);

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1094,6 +1094,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
10941094
.build(),
10951095
Map.of(),
10961096
List.of(),
1097+
"test",
10971098
ExecutorNames.DEFAULT_SYSTEM_INDEX_THREAD_POOLS
10981099
)
10991100
);

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SystemResourceSnapshotIT.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1153,6 +1153,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
11531153
.build(),
11541154
Map.of(),
11551155
List.of("product"),
1156+
"product",
11561157
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
11571158
)
11581159
);
@@ -1192,6 +1193,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
11921193
.build(),
11931194
Map.of(),
11941195
List.of("product"),
1196+
"product",
11951197
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
11961198
)
11971199
);
@@ -1231,6 +1233,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
12311233
.build(),
12321234
Map.of(),
12331235
List.of("product"),
1236+
"product",
12341237
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
12351238
)
12361239
);
@@ -1299,6 +1302,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
12991302
.build(),
13001303
Map.of(),
13011304
List.of("product"),
1305+
"product",
13021306
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
13031307
)
13041308
);

server/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@
282282
exports org.elasticsearch.indices.recovery;
283283
exports org.elasticsearch.indices.recovery.plan;
284284
exports org.elasticsearch.indices.store;
285+
exports org.elasticsearch.indices.system;
285286
exports org.elasticsearch.inference;
286287
exports org.elasticsearch.ingest;
287288
exports org.elasticsearch.internal

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,7 @@ private static void addBackingIndex(
382382
mapperSupplier,
383383
false,
384384
failureStore,
385+
dataStream.isSystem(),
385386
nodeSettings
386387
);
387388
} catch (IOException e) {

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ static ClusterState migrateToDataStream(
156156
ProjectMetadata.Builder mb = ProjectMetadata.builder(project);
157157
for (Index index : alias.getIndices()) {
158158
IndexMetadata im = project.index(index);
159-
prepareBackingIndex(mb, im, request.aliasName, mapperSupplier, true, false, Settings.EMPTY);
159+
prepareBackingIndex(mb, im, request.aliasName, mapperSupplier, true, false, false, Settings.EMPTY);
160160
}
161161
ClusterState updatedState = ClusterState.builder(projectState.cluster()).putProjectMetadata(mb).build();
162162

@@ -212,6 +212,8 @@ static void validateRequest(ProjectMetadata project, MigrateToDataStreamClusterS
212212
* exception should be thrown in that case instead
213213
* @param failureStore <code>true</code> if the index is being migrated into the data stream's failure store, <code>false</code> if it
214214
* is being migrated into the data stream's backing indices
215+
* @param makeSystem <code>true</code> if the index is being migrated into the system data stream, <code>false</code> if it
216+
* is being migrated into non-system data stream
215217
* @param nodeSettings The settings for the current node
216218
*/
217219
static void prepareBackingIndex(
@@ -221,6 +223,7 @@ static void prepareBackingIndex(
221223
Function<IndexMetadata, MapperService> mapperSupplier,
222224
boolean removeAlias,
223225
boolean failureStore,
226+
boolean makeSystem,
224227
Settings nodeSettings
225228
) throws IOException {
226229
MappingMetadata mm = im.mapping();
@@ -251,6 +254,7 @@ static void prepareBackingIndex(
251254
imb.mappingVersion(im.getMappingVersion() + 1)
252255
.mappingsUpdatedVersion(IndexVersion.current())
253256
.putMapping(new MappingMetadata(mapper));
257+
imb.system(makeSystem);
254258
b.put(imb);
255259
}
256260

server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,10 @@ private List<IndexMetadata> updateIndices(ClusterState currentState, List<Index>
241241
final List<IndexMetadata> updatedMetadata = new ArrayList<>();
242242
for (Index index : indices) {
243243
IndexMetadata indexMetadata = metadata.indexMetadata(index);
244+
// this might happen because update is async and the index might have been deleted between task creation and execution
245+
if (indexMetadata == null) {
246+
continue;
247+
}
244248
final boolean shouldBeSystem = shouldBeSystem(indexMetadata);
245249
IndexMetadata updatedIndexMetadata = updateIndexIfNecessary(indexMetadata, shouldBeSystem);
246250
if (updatedIndexMetadata != null) {

0 commit comments

Comments
 (0)