Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/124884.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 124884
summary: System data streams are not being upgraded in the feature migration API
area: Infra/Core
type: bug
issues:
- 122949
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
.build(),
Map.of(),
List.of("product"),
"product",
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
.build(),
Map.of(),
Collections.singletonList("test"),
"test",
new ExecutorNames(ThreadPool.Names.SYSTEM_CRITICAL_READ, ThreadPool.Names.SYSTEM_READ, ThreadPool.Names.SYSTEM_WRITE)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
.build(),
Map.of(),
List.of("product"),
"product",
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
.build(),
Map.of(),
List.of(),
"test",
ExecutorNames.DEFAULT_SYSTEM_INDEX_THREAD_POOLS
)
);
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@
exports org.elasticsearch.indices.recovery;
exports org.elasticsearch.indices.recovery.plan;
exports org.elasticsearch.indices.store;
exports org.elasticsearch.indices.system;
exports org.elasticsearch.inference;
exports org.elasticsearch.ingest;
exports org.elasticsearch.internal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ private static void addBackingIndex(
mapperSupplier,
false,
failureStore,
dataStream.isSystem(),
nodeSettings
);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ static void prepareBackingIndex(
Function<IndexMetadata, MapperService> mapperSupplier,
boolean removeAlias
) throws IOException {
prepareBackingIndex(b, im, dataStreamName, mapperSupplier, removeAlias, false, Settings.EMPTY);
prepareBackingIndex(b, im, dataStreamName, mapperSupplier, removeAlias, false, false, Settings.EMPTY);
}

/**
Expand All @@ -219,6 +219,8 @@ static void prepareBackingIndex(
* exception should be thrown in that case instead
* @param failureStore <code>true</code> if the index is being migrated into the data stream's failure store, <code>false</code> if it
* is being migrated into the data stream's backing indices
* @param makeSystem <code>true</code> if the index is being migrated into the system data stream, <code>false</code> if it
* is being migrated into non-system data stream
* @param nodeSettings The settings for the current node
*/
static void prepareBackingIndex(
Expand All @@ -228,6 +230,7 @@ static void prepareBackingIndex(
Function<IndexMetadata, MapperService> mapperSupplier,
boolean removeAlias,
boolean failureStore,
boolean makeSystem,
Settings nodeSettings
) throws IOException {
MappingMetadata mm = im.mapping();
Expand Down Expand Up @@ -258,6 +261,7 @@ static void prepareBackingIndex(
imb.mappingVersion(im.getMappingVersion() + 1)
.mappingsUpdatedVersion(IndexVersion.current())
.putMapping(new MappingMetadata(mapper));
imb.system(makeSystem);
b.put(imb);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ private List<IndexMetadata> updateIndices(ClusterState currentState, List<Index>
final List<IndexMetadata> updatedMetadata = new ArrayList<>();
for (Index index : indices) {
IndexMetadata indexMetadata = metadata.index(index);
// this might happen because update is async and the index might have been deleted between task creation and execution
if (indexMetadata == null) {
continue;
}
final boolean shouldBeSystem = shouldBeSystem(indexMetadata);
IndexMetadata updatedIndexMetadata = updateIndexIfNecessary(indexMetadata, shouldBeSystem);
if (updatedIndexMetadata != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1195,7 +1195,7 @@ static List<Diagnosis.Resource> getRestoreFromSnapshotAffectedResources(
.collect(
toMap(
SystemIndices.Feature::getName,
feature -> feature.getIndexDescriptors()
feature -> feature.getSystemResourceDescriptors()
.stream()
.flatMap(descriptor -> descriptor.getMatchingIndices(metadata).stream())
.collect(toSet())
Expand All @@ -1211,29 +1211,6 @@ static List<Diagnosis.Resource> getRestoreFromSnapshotAffectedResources(
}
}

Map<String, Set<String>> featureToDsBackingIndices = systemIndices.getFeatures()
.stream()
.collect(
toMap(
SystemIndices.Feature::getName,
feature -> feature.getDataStreamDescriptors()
.stream()
.flatMap(descriptor -> descriptor.getBackingIndexNames(metadata).stream())
.collect(toSet())
)
);

// the shards_availability indicator works with indices so let's remove the feature states data streams backing indices from
// the list of affected indices (the feature state will cover the restore of these indices too)
for (Map.Entry<String, Set<String>> featureToBackingIndices : featureToDsBackingIndices.entrySet()) {
for (String featureIndex : featureToBackingIndices.getValue()) {
if (restoreFromSnapshotIndices.contains(featureIndex)) {
affectedFeatureStates.add(featureToBackingIndices.getKey());
affectedIndices.remove(featureIndex);
}
}
}

if (affectedIndices.isEmpty() == false) {
affectedResources.add(new Diagnosis.Resource(INDEX, affectedIndices.stream().limit(maxAffectedResourcesCount).toList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.lucene.util.automaton.CharacterRunAutomaton;
import org.apache.lucene.util.automaton.RegExp;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.indices.system.IndexPatternMatcher;

import java.util.List;
import java.util.Objects;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,15 @@
import java.util.List;

/**
* An IndexPatternMatcher holds an index pattern in a string and, given a
* {@link Metadata} object, can return a list of index names matching that pattern.
* An IndexMatcher given a {@link Metadata} object, can return a list of index names matching that pattern.
*/
public interface IndexPatternMatcher {
/**
* @return A pattern, either with a wildcard or simple regex, describing indices that are
* related to a system feature. Such indices may be system indices or associated
* indices.
*/
String getIndexPattern();

public interface IndexMatcher {
/**
* Retrieves a list of all indices which match this descriptor's pattern. Implementations
* may include other special information when matching indices, such as aliases.
*
* <p>
* This cannot be done via {@link org.elasticsearch.cluster.metadata.IndexNameExpressionResolver} because that class can only handle
* simple wildcard expressions, but system index name patterns may use full Lucene regular expression syntax,
* simple wildcard expressions, but system index name patterns may use full Lucene regular expression syntax.
*
* @param metadata The current metadata to get the list of matching indices from
* @return A list of index names that match this descriptor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.system.SystemResourceDescriptor;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -44,14 +45,15 @@
* <p>The descriptor also provides names for the thread pools that Elasticsearch should use to read, search, or modify the descriptor’s
* indices.
*/
public class SystemDataStreamDescriptor {
public class SystemDataStreamDescriptor implements SystemResourceDescriptor {

private final String dataStreamName;
private final String description;
private final Type type;
private final ComposableIndexTemplate composableIndexTemplate;
private final Map<String, ComponentTemplate> componentTemplates;
private final List<String> allowedElasticProductOrigins;
private final String origin;
private final ExecutorNames executorNames;

/**
Expand All @@ -65,6 +67,7 @@ public class SystemDataStreamDescriptor {
* {@link ComposableIndexTemplate}
* @param allowedElasticProductOrigins a list of product origin values that are allowed to access this data stream if the
* type is {@link Type#EXTERNAL}. Must not be {@code null}
* @param origin specifies the origin to use when creating or updating the data stream
* @param executorNames thread pools that should be used for operations on the system data stream
*/
public SystemDataStreamDescriptor(
Expand All @@ -74,6 +77,7 @@ public SystemDataStreamDescriptor(
ComposableIndexTemplate composableIndexTemplate,
Map<String, ComponentTemplate> componentTemplates,
List<String> allowedElasticProductOrigins,
String origin,
ExecutorNames executorNames
) {
this.dataStreamName = Objects.requireNonNull(dataStreamName, "dataStreamName must be specified");
Expand All @@ -95,6 +99,7 @@ public SystemDataStreamDescriptor(
throw new IllegalArgumentException("External system data stream without allowed products is not a valid combination");
}
this.executorNames = Objects.nonNull(executorNames) ? executorNames : ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS;
this.origin = origin;
}

public String getDataStreamName() {
Expand All @@ -114,6 +119,11 @@ public List<String> getBackingIndexNames(Metadata metadata) {
return Stream.concat(dataStream.getIndices().stream(), dataStream.getFailureIndices().stream()).map(Index::getName).toList();
}

@Override
public List<String> getMatchingIndices(Metadata metadata) {
return getBackingIndexNames(metadata);
}

public String getDescription() {
return description;
}
Expand All @@ -122,6 +132,17 @@ public ComposableIndexTemplate getComposableIndexTemplate() {
return composableIndexTemplate;
}

@Override
public String getOrigin() {
return origin;
}

@Override
public boolean isAutomaticallyManaged() {
return true;
}

@Override
public boolean isExternal() {
return type == Type.EXTERNAL;
}
Expand All @@ -131,9 +152,10 @@ public String getBackingIndexPattern() {
}

private static String backingIndexPatternForDataStream(String dataStream) {
return DataStream.BACKING_INDEX_PREFIX + dataStream + "-*";
return ".(migrated-)?[fd]s-" + dataStream + "-*";
}

@Override
public List<String> getAllowedElasticProductOrigins() {
return allowedElasticProductOrigins;
}
Expand All @@ -146,6 +168,7 @@ public Map<String, ComponentTemplate> getComponentTemplates() {
* Get the names of the thread pools that should be used for operations on this data stream.
* @return Names for get, search, and write executors.
*/
@Override
public ExecutorNames getThreadPoolNames() {
return this.executorNames;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.indices.system.IndexPatternMatcher;
import org.elasticsearch.indices.system.SystemResourceDescriptor;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
Expand Down Expand Up @@ -103,7 +105,7 @@
* A system index that is fully internal to Elasticsearch will not allow any product origins; such an index is fully "locked down,"
* and in general can only be changed by restoring feature states from snapshots.
*/
public class SystemIndexDescriptor implements IndexPatternMatcher, Comparable<SystemIndexDescriptor> {
public class SystemIndexDescriptor implements IndexPatternMatcher, SystemResourceDescriptor, Comparable<SystemIndexDescriptor> {

public static final Settings DEFAULT_SETTINGS = Settings.builder().put(IndexMetadata.SETTING_INDEX_HIDDEN, true).build();

Expand Down Expand Up @@ -297,7 +299,7 @@ protected SystemIndexDescriptor(
}

Objects.requireNonNull(allowedElasticProductOrigins, "allowedProductOrigins must not be null");
if (type.isInternal() && allowedElasticProductOrigins.isEmpty() == false) {
if (type.isExternal() == false && allowedElasticProductOrigins.isEmpty() == false) {
throw new IllegalArgumentException("Allowed origins are not valid for internal system indices");
} else if (type.isExternal() && allowedElasticProductOrigins.isEmpty()) {
throw new IllegalArgumentException("External system indices without allowed products is not a valid combination");
Expand Down Expand Up @@ -442,9 +444,7 @@ public List<String> getMatchingIndices(Metadata metadata) {
return metadata.indices().keySet().stream().filter(this::matchesIndexPattern).toList();
}

/**
* @return A short description of the purpose of this system index.
*/
@Override
public String getDescription() {
return description;
}
Expand Down Expand Up @@ -473,16 +473,12 @@ public int getIndexFormat() {
return this.indexFormat;
}

@Override
public boolean isAutomaticallyManaged() {
return type.isManaged();
}

/**
* Get an origin string suitable for use in an {@link org.elasticsearch.client.internal.OriginSettingClient}. See
* {@link Builder#setOrigin(String)} for more information.
*
* @return an origin string to use for sub-requests
*/
@Override
public String getOrigin() {
// TODO[wrb]: most unmanaged system indices do not set origins; could we assert on that here?
return this.origin;
Expand All @@ -493,20 +489,12 @@ public boolean hasDynamicMappings() {
return this.hasDynamicMappings;
}

@Override
public boolean isExternal() {
return type.isExternal();
}

public boolean isInternal() {
return type.isInternal();
}

/**
* Requests from these products, if made with the proper security credentials, are allowed non-deprecated access to this descriptor's
* indices. (Product names may be specified in requests with the
* {@link org.elasticsearch.tasks.Task#X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER}).
* @return A list of product names.
*/
@Override
public List<String> getAllowedElasticProductOrigins() {
return allowedElasticProductOrigins;
}
Expand Down Expand Up @@ -574,6 +562,7 @@ public SystemIndexDescriptor getDescriptorCompatibleWith(MappingsVersion version
/**
* @return The names of thread pools that should be used for operations on this system index.
*/
@Override
public ExecutorNames getThreadPoolNames() {
return this.executorNames;
}
Expand Down Expand Up @@ -626,10 +615,6 @@ public boolean isExternal() {
public boolean isManaged() {
return managed;
}

public boolean isInternal() {
return external == false;
}
}

/**
Expand Down
Loading