diff --git a/docs/changelog/121392.yaml b/docs/changelog/121392.yaml new file mode 100644 index 0000000000000..6323789f071d8 --- /dev/null +++ b/docs/changelog/121392.yaml @@ -0,0 +1,5 @@ +pr: 121392 +summary: Include data streams when converting an existing resource to a system resource +area: Infra/Core +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 1313060936f63..ced580717c2fb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -301,6 +301,13 @@ public boolean isFailureStoreIndex(String indexName) { return failureIndices.containsIndex(indexName); } + /** + * Returns true if the index name provided belongs to this data stream. + */ + public boolean containsIndex(String indexName) { + return backingIndices.containsIndex(indexName) || failureIndices.containsIndex(indexName); + } + public DataStreamOptions getDataStreamOptions() { return dataStreamOptions; } @@ -782,8 +789,9 @@ public DataStream addBackingIndex(Metadata clusterMetadata, Index index) { // ensure that no aliases reference index ensureNoAliasesOnIndex(clusterMetadata, index); - List backingIndices = new ArrayList<>(this.backingIndices.indices); - backingIndices.add(0, index); + List backingIndices = new ArrayList<>(this.backingIndices.indices.size() + 1); + backingIndices.add(index); + backingIndices.addAll(this.backingIndices.indices); assert backingIndices.size() == this.backingIndices.indices.size() + 1; return copy().setBackingIndices(this.backingIndices.copy().setIndices(backingIndices).build()) .setGeneration(generation + 1) @@ -808,8 +816,9 @@ public DataStream addFailureStoreIndex(Metadata clusterMetadata, Index index) { ensureNoAliasesOnIndex(clusterMetadata, index); - List updatedFailureIndices = new ArrayList<>(failureIndices.indices); - updatedFailureIndices.add(0, index); + List updatedFailureIndices = new ArrayList<>(failureIndices.indices.size() + 1); + updatedFailureIndices.add(index); + updatedFailureIndices.addAll(failureIndices.indices); assert updatedFailureIndices.size() == failureIndices.indices.size() + 1; return copy().setFailureIndices(failureIndices.copy().setIndices(updatedFailureIndices).build()) .setGeneration(generation + 1) @@ -1039,7 +1048,7 @@ private boolean isIndexOlderThan( * we return false. */ public boolean isIndexManagedByDataStreamLifecycle(Index index, Function indexMetadataSupplier) { - if (backingIndices.containsIndex(index.getName()) == false && failureIndices.containsIndex(index.getName()) == false) { + if (containsIndex(index.getName()) == false) { return false; } IndexMetadata indexMetadata = indexMetadataSupplier.apply(index.getName()); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java index 228bb3b222a57..a79923921d37e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java @@ -14,18 +14,28 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.MasterServiceTaskQueue; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.index.Index; import org.elasticsearch.indices.SystemIndexMappingUpdateService; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * A service responsible for updating the metadata used by system indices. @@ -38,48 +48,62 @@ public class SystemIndexMetadataUpgradeService implements ClusterStateListener { private final SystemIndices systemIndices; private final ClusterService clusterService; - - private volatile boolean updateTaskPending = false; - - private volatile long triggeredVersion = -1L; + private final MasterServiceTaskQueue taskQueue; public SystemIndexMetadataUpgradeService(SystemIndices systemIndices, ClusterService clusterService) { this.systemIndices = systemIndices; this.clusterService = clusterService; + this.taskQueue = clusterService.createTaskQueue( + "system-indices-metadata-upgrade", + Priority.NORMAL, + new SystemIndexMetadataUpgradeExecutor() + ); } @Override public void clusterChanged(ClusterChangedEvent event) { - if (updateTaskPending == false - && event.localNodeMaster() + Metadata currentMetadata = event.state().metadata(); + Metadata previousMetadata = event.previousState().metadata(); + if (event.localNodeMaster() && (event.previousState().nodes().isLocalNodeElectedMaster() == false - || event.state().metadata().indices() != event.previousState().metadata().indices())) { - final Map indexMetadataMap = event.state().metadata().indices(); - final var previousIndices = event.previousState().metadata().indices(); - final long triggerV = event.state().version(); - triggeredVersion = triggerV; + || currentMetadata.indices() != previousMetadata.indices() + || currentMetadata.dataStreams() != previousMetadata.dataStreams())) { + final Map indexMetadataMap = currentMetadata.indices(); + final var previousIndices = previousMetadata.indices(); + Map dataStreams = currentMetadata.dataStreams(); + Map previousDataStreams = previousMetadata.dataStreams(); // Fork to the management pool to avoid blocking the cluster applier thread unnecessarily for very large index counts // TODO: we should have a more efficient way of getting just the changed indices so that we don't have to fork here clusterService.threadPool().executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() { @Override protected void doRun() { - if (triggeredVersion != triggerV) { - // don't run if another newer check task was triggered already - return; + Collection changedDataStreams = new ArrayList<>(); + Set dataStreamIndices = new HashSet<>(); + for (Map.Entry cursor : dataStreams.entrySet()) { + DataStream dataStream = cursor.getValue(); + if (dataStream != previousDataStreams.get(cursor.getKey())) { + if (requiresUpdate(dataStream)) { + changedDataStreams.add(dataStream); + } + } + + getIndicesBackingDataStream(dataStream).forEach(dataStreamIndices::add); } + + Collection changedIndices = new ArrayList<>(); for (Map.Entry cursor : indexMetadataMap.entrySet()) { - if (cursor.getValue() != previousIndices.get(cursor.getKey())) { - IndexMetadata indexMetadata = cursor.getValue(); + IndexMetadata indexMetadata = cursor.getValue(); + Index index = indexMetadata.getIndex(); + if (cursor.getValue() != previousIndices.get(cursor.getKey()) && dataStreamIndices.contains(index) == false) { if (requiresUpdate(indexMetadata)) { - updateTaskPending = true; - submitUnbatchedTask( - "system_index_metadata_upgrade_service {system metadata change}", - new SystemIndexMetadataUpdateTask() - ); - break; + changedIndices.add(index); } } } + + if (changedIndices.isEmpty() == false || changedDataStreams.isEmpty() == false) { + submitUpdateTask(changedIndices, changedDataStreams); + } } @Override @@ -91,6 +115,12 @@ public void onFailure(Exception e) { } } + // visible for testing + void submitUpdateTask(Collection changedIndices, Collection changedDataStreams) { + SystemIndexMetadataUpgradeTask task = new SystemIndexMetadataUpgradeTask(changedIndices, changedDataStreams); + taskQueue.submitTask("system-index-metadata-upgrade-service", task, null); + } + // package-private for testing boolean requiresUpdate(IndexMetadata indexMetadata) { final boolean shouldBeSystem = shouldBeSystem(indexMetadata); @@ -107,6 +137,30 @@ boolean requiresUpdate(IndexMetadata indexMetadata) { return false; } + // package-private for testing + boolean requiresUpdate(DataStream dataStream) { + final boolean shouldBeSystem = shouldBeSystem(dataStream); + + // should toggle system index status + if (shouldBeSystem != dataStream.isSystem()) { + return true; + } + + if (shouldBeSystem) { + return dataStream.isHidden() == false; + } + + return false; + } + + private boolean shouldBeSystem(DataStream dataStream) { + return systemIndices.isSystemDataStream(dataStream.getName()); + } + + private static Stream getIndicesBackingDataStream(DataStream dataStream) { + return Stream.concat(dataStream.getIndices().stream(), dataStream.getFailureIndices().stream()); + } + // package-private for testing static boolean isVisible(IndexMetadata indexMetadata) { return indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_INDEX_HIDDEN, false) == false; @@ -114,8 +168,7 @@ static boolean isVisible(IndexMetadata indexMetadata) { // package-private for testing boolean shouldBeSystem(IndexMetadata indexMetadata) { - return systemIndices.isSystemIndex(indexMetadata.getIndex()) - || systemIndices.isSystemIndexBackingDataStream(indexMetadata.getIndex().getName()); + return systemIndices.isSystemIndex(indexMetadata.getIndex()); } // package-private for testing @@ -123,73 +176,148 @@ static boolean hasVisibleAlias(IndexMetadata indexMetadata) { return indexMetadata.getAliases().values().stream().anyMatch(a -> Boolean.FALSE.equals(a.isHidden())); } - @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here - private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) { - clusterService.submitUnbatchedStateUpdateTask(source, task); - } + private record SystemIndexMetadataUpgradeTask(Collection changedIndices, Collection changedDataStreams) + implements + ClusterStateTaskListener { - // visible for testing - SystemIndexMetadataUpdateTask getTask() { - return new SystemIndexMetadataUpdateTask(); - } + @Override + public void onFailure(Exception e) { + logger.error("System index metadata upgrade failed", e); + } - public class SystemIndexMetadataUpdateTask extends ClusterStateUpdateTask { + @Override + public String toString() { + return "SystemIndexMetadataUpgradeTask[changedIndices=" + + changedIndices.stream().map(Index::getName).collect(Collectors.joining(",")) + + ";changedDataStreams=" + + changedDataStreams.stream().map(DataStream::getName).collect(Collectors.joining(",")) + + "]"; + } + } + private class SystemIndexMetadataUpgradeExecutor implements ClusterStateTaskExecutor { @Override - public ClusterState execute(ClusterState currentState) throws Exception { - final Map indexMetadataMap = currentState.metadata().indices(); + public ClusterState execute(BatchExecutionContext batchExecutionContext) { + ClusterState initialState = batchExecutionContext.initialState(); + + List> taskContexts = batchExecutionContext.taskContexts(); + List indices = taskContexts.stream() + .map(TaskContext::getTask) + .map(SystemIndexMetadataUpgradeTask::changedIndices) + .flatMap(Collection::stream) + .toList(); + List updatedMetadata = updateIndices(initialState, indices); + + List dataStreams = taskContexts.stream() + .map(TaskContext::getTask) + .map(SystemIndexMetadataUpgradeTask::changedDataStreams) + .flatMap(Collection::stream) + .toList(); + List updatedDataStreams = updateDataStreams(dataStreams); + List updatedBackingIndices = updateIndicesBackingDataStreams(initialState, updatedDataStreams); + + for (TaskContext taskContext : taskContexts) { + taskContext.success(() -> {}); + } + + if (updatedMetadata.isEmpty() == false || updatedDataStreams.isEmpty() == false) { + Metadata.Builder builder = Metadata.builder(initialState.metadata()); + updatedMetadata.forEach(idxMeta -> builder.put(idxMeta, true)); + updatedDataStreams.forEach(builder::put); + updatedBackingIndices.forEach(idxMeta -> builder.put(idxMeta, true)); + + return ClusterState.builder(initialState).metadata(builder).build(); + } + return initialState; + } + + private List updateIndices(ClusterState currentState, List indices) { + if (indices.isEmpty()) { + return Collections.emptyList(); + } + Metadata metadata = currentState.metadata(); final List updatedMetadata = new ArrayList<>(); - for (Map.Entry entry : indexMetadataMap.entrySet()) { - final IndexMetadata indexMetadata = entry.getValue(); + for (Index index : indices) { + IndexMetadata indexMetadata = metadata.index(index); final boolean shouldBeSystem = shouldBeSystem(indexMetadata); - IndexMetadata.Builder builder = IndexMetadata.builder(indexMetadata); - boolean updated = false; - if (shouldBeSystem != indexMetadata.isSystem()) { - builder.system(indexMetadata.isSystem() == false); - updated = true; - } - if (shouldBeSystem && isVisible(indexMetadata)) { - builder.settings(Settings.builder().put(indexMetadata.getSettings()).put(IndexMetadata.SETTING_INDEX_HIDDEN, true)); - builder.settingsVersion(builder.settingsVersion() + 1); - updated = true; + IndexMetadata updatedIndexMetadata = updateIndexIfNecessary(indexMetadata, shouldBeSystem); + if (updatedIndexMetadata != null) { + updatedMetadata.add(updatedIndexMetadata); } - if (shouldBeSystem && hasVisibleAlias(indexMetadata)) { - for (AliasMetadata aliasMetadata : indexMetadata.getAliases().values()) { - if (Boolean.FALSE.equals(aliasMetadata.isHidden())) { - builder.removeAlias(aliasMetadata.alias()); - builder.putAlias( - AliasMetadata.builder(aliasMetadata.alias()) - .filter(aliasMetadata.filter()) - .indexRouting(aliasMetadata.indexRouting()) - .isHidden(true) - .searchRouting(aliasMetadata.searchRouting()) - .writeIndex(aliasMetadata.writeIndex()) - ); - } + } + return updatedMetadata; + } + + private IndexMetadata updateIndexIfNecessary(IndexMetadata indexMetadata, boolean shouldBeSystem) { + IndexMetadata.Builder builder = IndexMetadata.builder(indexMetadata); + boolean updated = false; + if (shouldBeSystem != indexMetadata.isSystem()) { + builder.system(indexMetadata.isSystem() == false); + updated = true; + } + if (shouldBeSystem && isVisible(indexMetadata)) { + builder.settings(Settings.builder().put(indexMetadata.getSettings()).put(IndexMetadata.SETTING_INDEX_HIDDEN, true)); + builder.settingsVersion(builder.settingsVersion() + 1); + updated = true; + } + if (shouldBeSystem && hasVisibleAlias(indexMetadata)) { + for (AliasMetadata aliasMetadata : indexMetadata.getAliases().values()) { + if (Boolean.FALSE.equals(aliasMetadata.isHidden())) { + builder.removeAlias(aliasMetadata.alias()); + builder.putAlias( + AliasMetadata.builder(aliasMetadata.alias()) + .filter(aliasMetadata.filter()) + .indexRouting(aliasMetadata.indexRouting()) + .isHidden(true) + .searchRouting(aliasMetadata.searchRouting()) + .writeIndex(aliasMetadata.writeIndex()) + ); + updated = true; } } - if (updated) { - updatedMetadata.add(builder.build()); - } } + return updated ? builder.build() : null; + } - if (updatedMetadata.isEmpty() == false) { - final Metadata.Builder builder = Metadata.builder(currentState.metadata()); - updatedMetadata.forEach(idxMeta -> builder.put(idxMeta, true)); - return ClusterState.builder(currentState).metadata(builder).build(); + private List updateDataStreams(List dataStreams) { + if (dataStreams.isEmpty()) { + return Collections.emptyList(); + } + List updatedDataStreams = new ArrayList<>(); + for (DataStream dataStream : dataStreams) { + boolean shouldBeSystem = shouldBeSystem(dataStream); + if (dataStream.isSystem() != shouldBeSystem) { + DataStream.Builder dataStreamBuilder = dataStream.copy().setSystem(shouldBeSystem); + if (shouldBeSystem) { + dataStreamBuilder.setHidden(true); + } + + updatedDataStreams.add(dataStreamBuilder.build()); + } } - return currentState; + return updatedDataStreams; } - @Override - public void onFailure(Exception e) { - updateTaskPending = false; - logger.error("failed to update system index metadata", e); + private List updateIndicesBackingDataStreams(ClusterState currentState, List updatedDataStreams) { + if (updatedDataStreams.isEmpty()) { + return Collections.emptyList(); + } + Metadata metadata = currentState.metadata(); + final List updatedMetadata = new ArrayList<>(); + + for (DataStream updatedDataStream : updatedDataStreams) { + boolean shouldBeSystem = updatedDataStream.isSystem(); + List updatedIndicesMetadata = getIndicesBackingDataStreamMetadata(metadata, updatedDataStream).map( + idx -> updateIndexIfNecessary(idx, shouldBeSystem) + ).filter(Objects::nonNull).toList(); + + updatedMetadata.addAll(updatedIndicesMetadata); + } + return updatedMetadata; } - @Override - public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { - updateTaskPending = false; + private Stream getIndicesBackingDataStreamMetadata(Metadata metadata, DataStream dataStream) { + return getIndicesBackingDataStream(dataStream).map(metadata::index); } } } diff --git a/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java b/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java index 9a78556f9239b..349e21af98b9e 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java @@ -9,17 +9,17 @@ package org.elasticsearch.indices; -import org.apache.lucene.util.automaton.CharacterRunAutomaton; import org.elasticsearch.cluster.metadata.ComponentTemplate; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.index.Index; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; - -import static org.elasticsearch.indices.AssociatedIndexDescriptor.buildAutomaton; +import java.util.stream.Stream; /** * Describes a {@link DataStream} that is reserved for use by a system feature. @@ -53,7 +53,6 @@ public class SystemDataStreamDescriptor { private final Map componentTemplates; private final List allowedElasticProductOrigins; private final ExecutorNames executorNames; - private final CharacterRunAutomaton characterRunAutomaton; /** * Creates a new descriptor for a system data descriptor @@ -96,8 +95,6 @@ public SystemDataStreamDescriptor( throw new IllegalArgumentException("External system data stream without allowed products is not a valid combination"); } this.executorNames = Objects.nonNull(executorNames) ? executorNames : ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS; - - this.characterRunAutomaton = new CharacterRunAutomaton(buildAutomaton(backingIndexPatternForDataStream(this.dataStreamName))); } public String getDataStreamName() { @@ -110,7 +107,11 @@ public String getDataStreamName() { * @return List of names of backing indices */ public List getBackingIndexNames(Metadata metadata) { - return metadata.indices().keySet().stream().filter(this.characterRunAutomaton::run).toList(); + DataStream dataStream = metadata.dataStreams().get(dataStreamName); + if (dataStream == null) { + return Collections.emptyList(); + } + return Stream.concat(dataStream.getIndices().stream(), dataStream.getFailureIndices().stream()).map(Index::getName).toList(); } public String getDescription() { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeServiceTests.java index 0dcea706e7f94..f19cfce264d20 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeServiceTests.java @@ -11,20 +11,32 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils; +import org.elasticsearch.cluster.service.MasterServiceTaskQueue; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.indices.ExecutorNames; +import org.elasticsearch.indices.SystemDataStreamDescriptor; import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.test.ESTestCase; import org.junit.Before; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Map; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class SystemIndexMetadataUpgradeServiceTests extends ESTestCase { @@ -49,17 +61,62 @@ public class SystemIndexMetadataUpgradeServiceTests extends ESTestCase { .setOrigin("FAKE_ORIGIN") .build(); + private static final String SYSTEM_DATA_STREAM_NAME = ".my-ds"; + private static final String SYSTEM_DATA_STREAM_INDEX_NAME = DataStream.BACKING_INDEX_PREFIX + SYSTEM_DATA_STREAM_NAME + "-1"; + private static final String SYSTEM_DATA_STREAM_FAILSTORE_NAME = DataStream.FAILURE_STORE_PREFIX + SYSTEM_DATA_STREAM_NAME; + private static final SystemDataStreamDescriptor SYSTEM_DATA_STREAM_DESCRIPTOR = new SystemDataStreamDescriptor( + SYSTEM_DATA_STREAM_NAME, + "System datastream for test", + SystemDataStreamDescriptor.Type.INTERNAL, + ComposableIndexTemplate.builder().build(), + Collections.emptyMap(), + Collections.singletonList("FAKE_ORIGIN"), + ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS + ); + private SystemIndexMetadataUpgradeService service; + private ClusterStateTaskListener task; + private ClusterStateTaskExecutor executor; + @SuppressWarnings("unchecked") @Before public void setUpTest() { // set up a system index upgrade service + ClusterService clusterService = mock(ClusterService.class); + MasterServiceTaskQueue queue = mock(MasterServiceTaskQueue.class); + when(clusterService.createTaskQueue(eq("system-indices-metadata-upgrade"), eq(Priority.NORMAL), any())).thenAnswer(invocation -> { + executor = invocation.getArgument(2, ClusterStateTaskExecutor.class); + return queue; + }); + doAnswer(invocation -> { + task = invocation.getArgument(1, ClusterStateTaskListener.class); + return null; + }).when(queue).submitTask(any(), any(), any()); + this.service = new SystemIndexMetadataUpgradeService( - new SystemIndices(List.of(new SystemIndices.Feature("foo", "a test feature", List.of(DESCRIPTOR)))), - mock(ClusterService.class) + new SystemIndices( + List.of( + new SystemIndices.Feature("foo", "a test feature", List.of(DESCRIPTOR)), + new SystemIndices.Feature( + "sds", + "system data stream feature", + Collections.emptyList(), + Collections.singletonList(SYSTEM_DATA_STREAM_DESCRIPTOR) + ) + ) + ), + clusterService ); } + private ClusterState executeTask(ClusterState clusterState) { + try { + return ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(clusterState, executor, Collections.singletonList(task)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + /** * When we upgrade Elasticsearch versions, existing indices may be newly * defined as system indices. If such indices are set without "hidden," we need @@ -75,6 +132,54 @@ public void testUpgradeVisibleIndexToSystemIndex() throws Exception { assertSystemUpgradeAppliesHiddenSetting(hiddenIndexMetadata); } + public void testUpgradeDataStreamToSystemDataStream() { + IndexMetadata dsIndexMetadata = IndexMetadata.builder(SYSTEM_DATA_STREAM_INDEX_NAME) + .system(false) + .settings(getSettingsBuilder().put(IndexMetadata.SETTING_INDEX_HIDDEN, true)) + .build(); + IndexMetadata fsIndexMetadata = IndexMetadata.builder(SYSTEM_DATA_STREAM_FAILSTORE_NAME) + .system(false) + .settings(getSettingsBuilder().put(IndexMetadata.SETTING_INDEX_HIDDEN, true)) + .build(); + DataStream.DataStreamIndices failureIndices = DataStream.DataStreamIndices.failureIndicesBuilder( + Collections.singletonList(fsIndexMetadata.getIndex()) + ).build(); + DataStream dataStream = DataStream.builder(SYSTEM_DATA_STREAM_NAME, Collections.singletonList(dsIndexMetadata.getIndex())) + .setFailureIndices(failureIndices) + .setHidden(false) + .setSystem(false) + .build(); + + assertTrue(dataStream.containsIndex(dsIndexMetadata.getIndex().getName())); + assertTrue(dataStream.containsIndex(fsIndexMetadata.getIndex().getName())); + + Metadata.Builder clusterMetadata = new Metadata.Builder(); + clusterMetadata.put(dataStream); + clusterMetadata.put(dsIndexMetadata, true); + clusterMetadata.put(fsIndexMetadata, true); + + ClusterState clusterState = ClusterState.builder(new ClusterName("system-index-metadata-upgrade-service-tests")) + .metadata(clusterMetadata.build()) + .customs(Map.of()) + .build(); + + service.submitUpdateTask(Collections.emptyList(), Collections.singletonList(dataStream)); + // Execute a metadata upgrade task on the initial cluster state + ClusterState newState = executeTask(clusterState); + + DataStream updatedDataStream = newState.metadata().dataStreams().get(dataStream.getName()); + assertThat(updatedDataStream.isSystem(), equalTo(true)); + assertThat(updatedDataStream.isHidden(), equalTo(true)); + + IndexMetadata updatedIndexMetadata = newState.metadata().index(dsIndexMetadata.getIndex().getName()); + assertThat(updatedIndexMetadata.isSystem(), equalTo(true)); + assertThat(updatedIndexMetadata.isHidden(), equalTo(true)); + + IndexMetadata updatedFailstoreMetadata = newState.metadata().index(fsIndexMetadata.getIndex().getName()); + assertThat(updatedFailstoreMetadata.isSystem(), equalTo(true)); + assertThat(updatedFailstoreMetadata.isHidden(), equalTo(true)); + } + /** * If a system index erroneously is set to visible, we should remedy that situation. */ @@ -209,7 +314,7 @@ public void testIsVisible() { assertThat(service.requiresUpdate(systemVisibleIndex), equalTo(true)); } - private void assertSystemUpgradeAppliesHiddenSetting(IndexMetadata hiddenIndexMetadata) throws Exception { + private void assertSystemUpgradeAppliesHiddenSetting(IndexMetadata hiddenIndexMetadata) { assertTrue("Metadata should require update but does not", service.requiresUpdate(hiddenIndexMetadata)); Metadata.Builder clusterMetadata = new Metadata.Builder(); clusterMetadata.put(IndexMetadata.builder(hiddenIndexMetadata)); @@ -219,8 +324,9 @@ private void assertSystemUpgradeAppliesHiddenSetting(IndexMetadata hiddenIndexMe .customs(Map.of()) .build(); + service.submitUpdateTask(Collections.singletonList(hiddenIndexMetadata.getIndex()), Collections.emptyList()); // Get a metadata upgrade task and execute it on the initial cluster state - ClusterState newState = service.getTask().execute(clusterState); + ClusterState newState = executeTask(clusterState); IndexMetadata result = newState.metadata().index(SYSTEM_INDEX_NAME); assertThat(result.isSystem(), equalTo(true)); @@ -237,8 +343,9 @@ private void assertSystemUpgradeHidesAlias(IndexMetadata visibleAliasMetadata) t .customs(Map.of()) .build(); + service.submitUpdateTask(Collections.singletonList(visibleAliasMetadata.getIndex()), Collections.emptyList()); // Get a metadata upgrade task and execute it on the initial cluster state - ClusterState newState = service.getTask().execute(clusterState); + ClusterState newState = executeTask(clusterState); IndexMetadata result = newState.metadata().index(SYSTEM_INDEX_NAME); assertThat(result.isSystem(), equalTo(true));