-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Converting an Existing Data Stream to a System DataStream is Broken #121392
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
6289eb9
5e92c45
a1b6ceb
251e88b
9a4ee2c
4ef97b4
61f9c4f
1a7a5d5
19f9700
78e8059
d20b021
ead1cc8
4993fdc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -129,14 +129,28 @@ private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String | |
| } | ||
|
|
||
| // visible for testing | ||
| SystemIndexMetadataUpdateTask getTask() { | ||
| return new SystemIndexMetadataUpdateTask(); | ||
| ClusterState executeMetadataUpdateTask(ClusterState clusterState) { | ||
| return new SystemIndexMetadataUpdateTask().execute(clusterState); | ||
| } | ||
|
|
||
| public class SystemIndexMetadataUpdateTask extends ClusterStateUpdateTask { | ||
| private class SystemIndexMetadataUpdateTask extends ClusterStateUpdateTask { | ||
|
|
||
| @Override | ||
| public ClusterState execute(ClusterState currentState) throws Exception { | ||
| public ClusterState execute(ClusterState currentState) { | ||
| List<IndexMetadata> updatedMetadata = updateIndices(currentState); | ||
| List<DataStream> updatedDataStreams = updateDataStreams(currentState); | ||
|
|
||
| if (updatedMetadata.isEmpty() == false || updatedDataStreams.isEmpty() == false) { | ||
| Metadata.Builder builder = Metadata.builder(currentState.metadata()); | ||
| updatedMetadata.forEach(idxMeta -> builder.put(idxMeta, true)); | ||
| updatedDataStreams.forEach(builder::put); | ||
|
|
||
| return ClusterState.builder(currentState).metadata(builder).build(); | ||
| } | ||
| return currentState; | ||
| } | ||
|
|
||
| private List<IndexMetadata> updateIndices(ClusterState currentState) { | ||
| final Map<String, IndexMetadata> indexMetadataMap = currentState.metadata().indices(); | ||
| final List<IndexMetadata> updatedMetadata = new ArrayList<>(); | ||
| for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) { | ||
|
|
@@ -172,13 +186,22 @@ public ClusterState execute(ClusterState currentState) throws Exception { | |
| updatedMetadata.add(builder.build()); | ||
| } | ||
| } | ||
| return updatedMetadata; | ||
| } | ||
|
|
||
| if (updatedMetadata.isEmpty() == false) { | ||
| final Metadata.Builder builder = Metadata.builder(currentState.metadata()); | ||
| updatedMetadata.forEach(idxMeta -> builder.put(idxMeta, true)); | ||
| return ClusterState.builder(currentState).metadata(builder).build(); | ||
| private List<DataStream> updateDataStreams(ClusterState currentState) { | ||
| List<DataStream> updatedDataStreams = new ArrayList<>(); | ||
| for (DataStream dataStream : currentState.getMetadata().dataStreams().values()) { | ||
| if (dataStream.isSystem() == false && systemIndices.isSystemDataStream(dataStream.getName())) { | ||
| DataStream updatedDataStream = dataStream.copy() | ||
| .setSystem(true) | ||
| .setHidden(true) | ||
| .build(); | ||
|
|
||
| updatedDataStreams.add(updatedDataStream); | ||
|
||
| } | ||
| } | ||
| return currentState; | ||
| return updatedDataStreams; | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,11 +14,14 @@ | |
| import org.elasticsearch.cluster.service.ClusterService; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.index.IndexVersion; | ||
| import org.elasticsearch.indices.ExecutorNames; | ||
| import org.elasticsearch.indices.SystemDataStreamDescriptor; | ||
| import org.elasticsearch.indices.SystemIndexDescriptor; | ||
| import org.elasticsearch.indices.SystemIndices; | ||
| import org.elasticsearch.test.ESTestCase; | ||
| import org.junit.Before; | ||
|
|
||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.Locale; | ||
| import java.util.Map; | ||
|
|
@@ -49,13 +52,25 @@ public class SystemIndexMetadataUpgradeServiceTests extends ESTestCase { | |
| .setOrigin("FAKE_ORIGIN") | ||
| .build(); | ||
|
|
||
| private static final String SYSTEM_DATA_STREAM_NAME = ".my-ds"; | ||
| private static final String SYSTEM_DATA_STREAM_INDEX_NAME = DataStream.BACKING_INDEX_PREFIX + SYSTEM_DATA_STREAM_NAME + "-1"; | ||
| private static final SystemDataStreamDescriptor SYSTEM_DATA_STREAM_DESCRIPTOR = new SystemDataStreamDescriptor( | ||
| SYSTEM_DATA_STREAM_NAME, "System datastream for test", SystemDataStreamDescriptor.Type.INTERNAL, | ||
| ComposableIndexTemplate.builder().build(), Collections.emptyMap(), Collections.singletonList("FAKE_ORIGIN"), | ||
| ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS | ||
| ); | ||
|
|
||
| private SystemIndexMetadataUpgradeService service; | ||
|
|
||
| @Before | ||
| public void setUpTest() { | ||
| // set up a system index upgrade service | ||
| this.service = new SystemIndexMetadataUpgradeService( | ||
| new SystemIndices(List.of(new SystemIndices.Feature("foo", "a test feature", List.of(DESCRIPTOR)))), | ||
| new SystemIndices(List.of( | ||
| new SystemIndices.Feature("foo", "a test feature", List.of(DESCRIPTOR)), | ||
| new SystemIndices.Feature("sds", "system data stream feature", | ||
| Collections.emptyList(), Collections.singletonList(SYSTEM_DATA_STREAM_DESCRIPTOR)) | ||
| )), | ||
| mock(ClusterService.class) | ||
| ); | ||
| } | ||
|
|
@@ -75,6 +90,19 @@ public void testUpgradeVisibleIndexToSystemIndex() throws Exception { | |
| assertSystemUpgradeAppliesHiddenSetting(hiddenIndexMetadata); | ||
| } | ||
|
|
||
| public void testUpgradeDataStreamToSystemDataStream() { | ||
| IndexMetadata dsIndexMetadata = IndexMetadata.builder(SYSTEM_DATA_STREAM_INDEX_NAME) | ||
| .system(false) | ||
| .settings(getSettingsBuilder().put(IndexMetadata.SETTING_INDEX_HIDDEN, true)) | ||
| .build(); | ||
| DataStream dataStream = DataStream.builder(SYSTEM_DATA_STREAM_NAME, Collections.singletonList(dsIndexMetadata.getIndex())) | ||
| .setHidden(false) | ||
| .setSystem(false) | ||
| .build(); | ||
|
|
||
| assertSystemUpgradeAppliesHiddenSettingForDataStream(dataStream, dsIndexMetadata); | ||
|
||
| } | ||
|
|
||
| /** | ||
| * If a system index erroneously is set to visible, we should remedy that situation. | ||
| */ | ||
|
|
@@ -209,7 +237,7 @@ public void testIsVisible() { | |
| assertThat(service.requiresUpdate(systemVisibleIndex), equalTo(true)); | ||
| } | ||
|
|
||
| private void assertSystemUpgradeAppliesHiddenSetting(IndexMetadata hiddenIndexMetadata) throws Exception { | ||
| private void assertSystemUpgradeAppliesHiddenSetting(IndexMetadata hiddenIndexMetadata) { | ||
| assertTrue("Metadata should require update but does not", service.requiresUpdate(hiddenIndexMetadata)); | ||
| Metadata.Builder clusterMetadata = new Metadata.Builder(); | ||
| clusterMetadata.put(IndexMetadata.builder(hiddenIndexMetadata)); | ||
|
|
@@ -220,13 +248,38 @@ private void assertSystemUpgradeAppliesHiddenSetting(IndexMetadata hiddenIndexMe | |
| .build(); | ||
|
|
||
| // Get a metadata upgrade task and execute it on the initial cluster state | ||
| ClusterState newState = service.getTask().execute(clusterState); | ||
| ClusterState newState = service.executeMetadataUpdateTask(clusterState); | ||
|
|
||
| IndexMetadata result = newState.metadata().index(SYSTEM_INDEX_NAME); | ||
| assertThat(result.isSystem(), equalTo(true)); | ||
| assertThat(result.isHidden(), equalTo(true)); | ||
| } | ||
|
|
||
| private void assertSystemUpgradeAppliesHiddenSettingForDataStream(DataStream shouldBeSystemDataStream, IndexMetadata dsIndexMetadata) { | ||
| assertTrue(shouldBeSystemDataStream.containsIndex(dsIndexMetadata.getIndex().getName())); | ||
| assertTrue("Metadata should require update but does not", service.requiresUpdate(dsIndexMetadata)); | ||
|
|
||
| Metadata.Builder clusterMetadata = new Metadata.Builder(); | ||
| clusterMetadata.put(shouldBeSystemDataStream); | ||
| clusterMetadata.put(dsIndexMetadata, true); | ||
|
|
||
| ClusterState clusterState = ClusterState.builder(new ClusterName("system-index-metadata-upgrade-service-tests")) | ||
| .metadata(clusterMetadata.build()) | ||
| .customs(Map.of()) | ||
| .build(); | ||
|
|
||
| // Execute a metadata upgrade task on the initial cluster state | ||
| ClusterState newState = service.executeMetadataUpdateTask(clusterState); | ||
|
|
||
| DataStream updatedDataStream = newState.metadata().dataStreams().get(shouldBeSystemDataStream.getName()); | ||
| assertThat(updatedDataStream.isSystem(), equalTo(true)); | ||
| assertThat(updatedDataStream.isHidden(), equalTo(true)); | ||
|
|
||
| IndexMetadata updatedIndexMetadata = newState.metadata().index(dsIndexMetadata.getIndex().getName()); | ||
| assertThat(updatedIndexMetadata.isSystem(), equalTo(true)); | ||
| assertThat(updatedIndexMetadata.isHidden(), equalTo(true)); | ||
| } | ||
|
|
||
| private void assertSystemUpgradeHidesAlias(IndexMetadata visibleAliasMetadata) throws Exception { | ||
| assertTrue("Metadata should require update but does not", service.requiresUpdate(visibleAliasMetadata)); | ||
| Metadata.Builder clusterMetadata = new Metadata.Builder(); | ||
|
|
@@ -238,7 +291,7 @@ private void assertSystemUpgradeHidesAlias(IndexMetadata visibleAliasMetadata) t | |
| .build(); | ||
|
|
||
| // Get a metadata upgrade task and execute it on the initial cluster state | ||
| ClusterState newState = service.getTask().execute(clusterState); | ||
| ClusterState newState = service.executeMetadataUpdateTask(clusterState); | ||
|
|
||
| IndexMetadata result = newState.metadata().index(SYSTEM_INDEX_NAME); | ||
| assertThat(result.isSystem(), equalTo(true)); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At https://github.com/elastic/elasticsearch/pull/121392/files#diff-1123b904a4743231e7321993f73a75ae603c6658b1e6acf6bf8656b69370a410L70 we currently only check for indices that should become system indices. Should we also check for data streams there?
It seems that if a data stream exists, it always has at least one backing index. If the data stream is non-system, the backing index will also be non-system. Then, when the data stream is marked as system, the index will be picked up by the task, and both will be converted to system ones. Does this approach make sense, or should we explicitly check for data streams as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, good question. For now that is the case, but we have talked about changing this one day but there are no concrete plans.
If I am not mistaken, this is just looping over one more data set, so I think it would be better to be "thorough" and avoid surprises in the future. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add check for data streams here as well. Thank you!