Skip to content

Commit 3e51ea6

Browse files
[9.0] System data streams are not being upgraded in the feature migration API (elastic#124884)
This commit adds support for system data streams reindexing. The system data stream migration extends the existing system indices migration task and uses the data stream reindex API. The system index migration task starts a reindex data stream task and tracks its status every second. Only one system index or system data stream is migrated at a time. If a data stream migration fails, the entire system index migration task will also fail. Port of elastic#123926
1 parent 0fc2837 commit 3e51ea6

File tree

53 files changed

+1602
-251
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+1602
-251
lines changed

docs/changelog/124884.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 124884
2+
summary: System data streams are not being upgraded in the feature migration API
3+
area: Infra/Core
4+
type: bug
5+
issues:
6+
- 122949

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
323323
.build(),
324324
Map.of(),
325325
List.of("product"),
326+
"product",
326327
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
327328
)
328329
);

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
275275
.build(),
276276
Map.of(),
277277
Collections.singletonList("test"),
278+
"test",
278279
new ExecutorNames(ThreadPool.Names.SYSTEM_CRITICAL_READ, ThreadPool.Names.SYSTEM_READ, ThreadPool.Names.SYSTEM_WRITE)
279280
)
280281
);

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudSystemDataStreamLifecycleIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
210210
.build(),
211211
Map.of(),
212212
List.of("product"),
213+
"product",
213214
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
214215
)
215216
);

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1272,6 +1272,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
12721272
.build(),
12731273
Map.of(),
12741274
List.of(),
1275+
"test",
12751276
ExecutorNames.DEFAULT_SYSTEM_INDEX_THREAD_POOLS
12761277
)
12771278
);

server/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@
283283
exports org.elasticsearch.indices.recovery;
284284
exports org.elasticsearch.indices.recovery.plan;
285285
exports org.elasticsearch.indices.store;
286+
exports org.elasticsearch.indices.system;
286287
exports org.elasticsearch.inference;
287288
exports org.elasticsearch.ingest;
288289
exports org.elasticsearch.internal

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,7 @@ private static void addBackingIndex(
351351
mapperSupplier,
352352
false,
353353
failureStore,
354+
dataStream.isSystem(),
354355
nodeSettings
355356
);
356357
} catch (IOException e) {

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ static void prepareBackingIndex(
205205
Function<IndexMetadata, MapperService> mapperSupplier,
206206
boolean removeAlias
207207
) throws IOException {
208-
prepareBackingIndex(b, im, dataStreamName, mapperSupplier, removeAlias, false, Settings.EMPTY);
208+
prepareBackingIndex(b, im, dataStreamName, mapperSupplier, removeAlias, false, false, Settings.EMPTY);
209209
}
210210

211211
/**
@@ -219,6 +219,8 @@ static void prepareBackingIndex(
219219
* exception should be thrown in that case instead
220220
* @param failureStore <code>true</code> if the index is being migrated into the data stream's failure store, <code>false</code> if it
221221
* is being migrated into the data stream's backing indices
222+
* @param makeSystem <code>true</code> if the index is being migrated into the system data stream, <code>false</code> if it
223+
* is being migrated into non-system data stream
222224
* @param nodeSettings The settings for the current node
223225
*/
224226
static void prepareBackingIndex(
@@ -228,6 +230,7 @@ static void prepareBackingIndex(
228230
Function<IndexMetadata, MapperService> mapperSupplier,
229231
boolean removeAlias,
230232
boolean failureStore,
233+
boolean makeSystem,
231234
Settings nodeSettings
232235
) throws IOException {
233236
MappingMetadata mm = im.mapping();
@@ -258,6 +261,7 @@ static void prepareBackingIndex(
258261
imb.mappingVersion(im.getMappingVersion() + 1)
259262
.mappingsUpdatedVersion(IndexVersion.current())
260263
.putMapping(new MappingMetadata(mapper));
264+
imb.system(makeSystem);
261265
b.put(imb);
262266
}
263267

server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,10 @@ private List<IndexMetadata> updateIndices(ClusterState currentState, List<Index>
239239
final List<IndexMetadata> updatedMetadata = new ArrayList<>();
240240
for (Index index : indices) {
241241
IndexMetadata indexMetadata = metadata.index(index);
242+
// this might happen because update is async and the index might have been deleted between task creation and execution
243+
if (indexMetadata == null) {
244+
continue;
245+
}
242246
final boolean shouldBeSystem = shouldBeSystem(indexMetadata);
243247
IndexMetadata updatedIndexMetadata = updateIndexIfNecessary(indexMetadata, shouldBeSystem);
244248
if (updatedIndexMetadata != null) {

server/src/main/java/org/elasticsearch/cluster/routing/allocation/shards/ShardsAvailabilityHealthIndicatorService.java

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1195,7 +1195,7 @@ static List<Diagnosis.Resource> getRestoreFromSnapshotAffectedResources(
11951195
.collect(
11961196
toMap(
11971197
SystemIndices.Feature::getName,
1198-
feature -> feature.getIndexDescriptors()
1198+
feature -> feature.getSystemResourceDescriptors()
11991199
.stream()
12001200
.flatMap(descriptor -> descriptor.getMatchingIndices(metadata).stream())
12011201
.collect(toSet())
@@ -1211,29 +1211,6 @@ static List<Diagnosis.Resource> getRestoreFromSnapshotAffectedResources(
12111211
}
12121212
}
12131213

1214-
Map<String, Set<String>> featureToDsBackingIndices = systemIndices.getFeatures()
1215-
.stream()
1216-
.collect(
1217-
toMap(
1218-
SystemIndices.Feature::getName,
1219-
feature -> feature.getDataStreamDescriptors()
1220-
.stream()
1221-
.flatMap(descriptor -> descriptor.getBackingIndexNames(metadata).stream())
1222-
.collect(toSet())
1223-
)
1224-
);
1225-
1226-
// the shards_availability indicator works with indices so let's remove the feature states data streams backing indices from
1227-
// the list of affected indices (the feature state will cover the restore of these indices too)
1228-
for (Map.Entry<String, Set<String>> featureToBackingIndices : featureToDsBackingIndices.entrySet()) {
1229-
for (String featureIndex : featureToBackingIndices.getValue()) {
1230-
if (restoreFromSnapshotIndices.contains(featureIndex)) {
1231-
affectedFeatureStates.add(featureToBackingIndices.getKey());
1232-
affectedIndices.remove(featureIndex);
1233-
}
1234-
}
1235-
}
1236-
12371214
if (affectedIndices.isEmpty() == false) {
12381215
affectedResources.add(new Diagnosis.Resource(INDEX, affectedIndices.stream().limit(maxAffectedResourcesCount).toList()));
12391216
}

0 commit comments

Comments
 (0)