Skip to content
Merged
6 changes: 6 additions & 0 deletions docs/changelog/124884.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 124884
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this reference the current PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed

summary: System data streams are not being upgraded in the feature migration API
area: Infra/Core
type: bug
issues:
- 122949
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
.build(),
Map.of(),
List.of("product"),
"product",
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
.build(),
Map.of(),
Collections.singletonList("test"),
"test",
new ExecutorNames(ThreadPool.Names.SYSTEM_CRITICAL_READ, ThreadPool.Names.SYSTEM_READ, ThreadPool.Names.SYSTEM_WRITE)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
.build(),
Map.of(),
List.of("product"),
"product",
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
.build(),
Map.of(),
List.of(),
"test",
ExecutorNames.DEFAULT_SYSTEM_INDEX_THREAD_POOLS
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
.build(),
Map.of(),
List.of("product"),
"product",
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
)
);
Expand Down Expand Up @@ -1192,6 +1193,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
.build(),
Map.of(),
List.of("product"),
"product",
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
)
);
Expand Down Expand Up @@ -1231,6 +1233,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
.build(),
Map.of(),
List.of("product"),
"product",
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
)
);
Expand Down Expand Up @@ -1299,6 +1302,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
.build(),
Map.of(),
List.of("product"),
"product",
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
)
);
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@
exports org.elasticsearch.indices.recovery;
exports org.elasticsearch.indices.recovery.plan;
exports org.elasticsearch.indices.store;
exports org.elasticsearch.indices.system;
exports org.elasticsearch.inference;
exports org.elasticsearch.ingest;
exports org.elasticsearch.internal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ private static void addBackingIndex(
mapperSupplier,
false,
failureStore,
dataStream.isSystem(),
nodeSettings
);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ static ClusterState migrateToDataStream(
ProjectMetadata.Builder mb = ProjectMetadata.builder(project);
for (Index index : alias.getIndices()) {
IndexMetadata im = project.index(index);
prepareBackingIndex(mb, im, request.aliasName, mapperSupplier, true, false, Settings.EMPTY);
prepareBackingIndex(mb, im, request.aliasName, mapperSupplier, true, false, false, Settings.EMPTY);
}
ClusterState updatedState = ClusterState.builder(projectState.cluster()).putProjectMetadata(mb).build();

Expand Down Expand Up @@ -212,6 +212,8 @@ static void validateRequest(ProjectMetadata project, MigrateToDataStreamClusterS
* exception should be thrown in that case instead
* @param failureStore <code>true</code> if the index is being migrated into the data stream's failure store, <code>false</code> if it
* is being migrated into the data stream's backing indices
* @param makeSystem <code>true</code> if the index is being migrated into the system data stream, <code>false</code> if it
* is being migrated into non-system data stream
* @param nodeSettings The settings for the current node
*/
static void prepareBackingIndex(
Expand All @@ -221,6 +223,7 @@ static void prepareBackingIndex(
Function<IndexMetadata, MapperService> mapperSupplier,
boolean removeAlias,
boolean failureStore,
boolean makeSystem,
Settings nodeSettings
) throws IOException {
MappingMetadata mm = im.mapping();
Expand Down Expand Up @@ -251,6 +254,7 @@ static void prepareBackingIndex(
imb.mappingVersion(im.getMappingVersion() + 1)
.mappingsUpdatedVersion(IndexVersion.current())
.putMapping(new MappingMetadata(mapper));
imb.system(makeSystem);
b.put(imb);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ private List<IndexMetadata> updateIndices(ClusterState currentState, List<Index>
final List<IndexMetadata> updatedMetadata = new ArrayList<>();
for (Index index : indices) {
IndexMetadata indexMetadata = metadata.indexMetadata(index);
// this might happen because update is async and the index might have been deleted between task creation and execution
if (indexMetadata == null) {
continue;
}
final boolean shouldBeSystem = shouldBeSystem(indexMetadata);
IndexMetadata updatedIndexMetadata = updateIndexIfNecessary(indexMetadata, shouldBeSystem);
if (updatedIndexMetadata != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import java.util.stream.Stream;

import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;
import static org.elasticsearch.cluster.health.ClusterShardHealth.getInactivePrimaryHealth;
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_PREFIX;
Expand Down Expand Up @@ -1152,7 +1151,8 @@ public List<HealthIndicatorImpact> getImpacts() {

/**
* Returns the diagnosis for unassigned primary and replica shards.
* @param verbose true if the diagnosis should be generated, false if they should be omitted.
*
* @param verbose true if the diagnosis should be generated, false if they should be omitted.
* @param maxAffectedResourcesCount the max number of affected resources to be returned as part of the diagnosis
* @return The diagnoses list the indicator identified. Alternatively, an empty list if none were found or verbose is false.
*/
Expand Down Expand Up @@ -1243,23 +1243,6 @@ static List<Diagnosis.Resource> getRestoreFromSnapshotAffectedResources(
}
}

Map<String, Set<ProjectIndexName>> featureToDsBackingIndices = getSystemDsBackingIndicesForProjects(
systemIndices,
affectedProjects,
metadata
);

// the shards_availability indicator works with indices so let's remove the feature states data streams backing indices from
// the list of affected indices (the feature state will cover the restore of these indices too)
for (Map.Entry<String, Set<ProjectIndexName>> featureToBackingIndices : featureToDsBackingIndices.entrySet()) {
for (ProjectIndexName featureIndex : featureToBackingIndices.getValue()) {
if (restoreFromSnapshotIndices.contains(featureIndex)) {
affectedFeatureStates.add(featureToBackingIndices.getKey());
affectedIndices.remove(featureIndex);
}
}
}

if (affectedIndices.isEmpty() == false) {
affectedResources.add(
new Diagnosis.Resource(
Expand All @@ -1281,7 +1264,7 @@ static List<Diagnosis.Resource> getRestoreFromSnapshotAffectedResources(
}

/**
* Retrieve the system indices for the projects and group them by Feature
* Retrieve the system indices and indices backing system data streams for the projects and group them by Feature
*/
private static Map<String, Set<ProjectIndexName>> getSystemIndicesForProjects(
SystemIndices systemIndices,
Expand All @@ -1293,7 +1276,7 @@ private static Map<String, Set<ProjectIndexName>> getSystemIndicesForProjects(
.collect(
Collectors.toMap(
SystemIndices.Feature::getName,
feature -> feature.getIndexDescriptors()
feature -> feature.getSystemResourceDescriptors()
.stream()
.flatMap(
descriptor -> projects.stream()
Expand All @@ -1307,34 +1290,6 @@ private static Map<String, Set<ProjectIndexName>> getSystemIndicesForProjects(
)
);
}

/**
* Retrieve the backing indices for system data stream for the projects and group them by Feature
*/
private static Map<String, Set<ProjectIndexName>> getSystemDsBackingIndicesForProjects(
SystemIndices systemIndices,
Set<ProjectId> projects,
Metadata metadata
) {
return systemIndices.getFeatures()
.stream()
.collect(
toMap(
SystemIndices.Feature::getName,
feature -> feature.getDataStreamDescriptors()
.stream()
.flatMap(
descriptor -> projects.stream()
.flatMap(
projectId -> descriptor.getBackingIndexNames(metadata.getProject(projectId))
.stream()
.map(index -> new ProjectIndexName(projectId, index))
)
)
.collect(Collectors.toSet())
)
);
}
}

public static class SearchableSnapshotsState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.lucene.util.automaton.CharacterRunAutomaton;
import org.apache.lucene.util.automaton.RegExp;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.indices.system.IndexPatternMatcher;

import java.util.List;
import java.util.Objects;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,15 @@
import java.util.List;

/**
* An IndexPatternMatcher holds an index pattern in a string and, given a
* {@link Metadata} object, can return a list of index names matching that pattern.
* An IndexMatcher given a {@link Metadata} object, can return a list of index names matching that pattern.
*/
public interface IndexPatternMatcher {
/**
* @return A pattern, either with a wildcard or simple regex, describing indices that are
* related to a system feature. Such indices may be system indices or associated
* indices.
*/
String getIndexPattern();

public interface IndexMatcher {
/**
* Retrieves a list of all indices which match this descriptor's pattern. Implementations
* may include other special information when matching indices, such as aliases.
*
* <p>
* This cannot be done via {@link org.elasticsearch.cluster.metadata.IndexNameExpressionResolver} because that class can only handle
* simple wildcard expressions, but system index name patterns may use full Lucene regular expression syntax,
* simple wildcard expressions, but system index name patterns may use full Lucene regular expression syntax.
*
* @param project The current metadata to get the list of matching indices from
* @return A list of index names that match this descriptor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.system.SystemResourceDescriptor;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -45,14 +46,15 @@
* <p>The descriptor also provides names for the thread pools that Elasticsearch should use to read, search, or modify the descriptor’s
* indices.
*/
public class SystemDataStreamDescriptor {
public class SystemDataStreamDescriptor implements SystemResourceDescriptor {

private final String dataStreamName;
private final String description;
private final Type type;
private final ComposableIndexTemplate composableIndexTemplate;
private final Map<String, ComponentTemplate> componentTemplates;
private final List<String> allowedElasticProductOrigins;
private final String origin;
private final ExecutorNames executorNames;

/**
Expand All @@ -66,6 +68,7 @@ public class SystemDataStreamDescriptor {
* {@link ComposableIndexTemplate}
* @param allowedElasticProductOrigins a list of product origin values that are allowed to access this data stream if the
* type is {@link Type#EXTERNAL}. Must not be {@code null}
* @param origin specifies the origin to use when creating or updating the data stream
* @param executorNames thread pools that should be used for operations on the system data stream
*/
public SystemDataStreamDescriptor(
Expand All @@ -75,6 +78,7 @@ public SystemDataStreamDescriptor(
ComposableIndexTemplate composableIndexTemplate,
Map<String, ComponentTemplate> componentTemplates,
List<String> allowedElasticProductOrigins,
String origin,
ExecutorNames executorNames
) {
this.dataStreamName = Objects.requireNonNull(dataStreamName, "dataStreamName must be specified");
Expand All @@ -96,6 +100,7 @@ public SystemDataStreamDescriptor(
throw new IllegalArgumentException("External system data stream without allowed products is not a valid combination");
}
this.executorNames = Objects.nonNull(executorNames) ? executorNames : ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS;
this.origin = origin;
}

public String getDataStreamName() {
Expand Down Expand Up @@ -125,6 +130,11 @@ public List<String> getBackingIndexNames(ProjectMetadata projectMetadata) {
return Stream.concat(dataStream.getIndices().stream(), dataStream.getFailureIndices().stream()).map(Index::getName).toList();
}

@Override
public List<String> getMatchingIndices(ProjectMetadata metadata) {
return getBackingIndexNames(metadata);
}

public String getDescription() {
return description;
}
Expand All @@ -133,6 +143,17 @@ public ComposableIndexTemplate getComposableIndexTemplate() {
return composableIndexTemplate;
}

@Override
public String getOrigin() {
return origin;
}

@Override
public boolean isAutomaticallyManaged() {
return true;
}

@Override
public boolean isExternal() {
return type == Type.EXTERNAL;
}
Expand All @@ -142,9 +163,10 @@ public String getBackingIndexPattern() {
}

private static String backingIndexPatternForDataStream(String dataStream) {
return DataStream.BACKING_INDEX_PREFIX + dataStream + "-*";
return ".(migrated-){0,}[fd]s-" + dataStream + "-*";
}

@Override
public List<String> getAllowedElasticProductOrigins() {
return allowedElasticProductOrigins;
}
Expand All @@ -157,6 +179,7 @@ public Map<String, ComponentTemplate> getComponentTemplates() {
* Get the names of the thread pools that should be used for operations on this data stream.
* @return Names for get, search, and write executors.
*/
@Override
public ExecutorNames getThreadPoolNames() {
return this.executorNames;
}
Expand Down
Loading
Loading