Skip to content

Commit 6289eb9

Browse files
Converting an Existing Data Stream to a System DataStream is Broken
1 parent 2e84950 commit 6289eb9

File tree

3 files changed

+97
-14
lines changed

3 files changed

+97
-14
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,13 @@ public boolean isFailureStoreIndex(String indexName) {
301301
return failureIndices.containsIndex(indexName);
302302
}
303303

304+
/**
305+
* Returns true if the index name provided belongs to this data stream.
306+
*/
307+
public boolean containsIndex(String indexName) {
308+
return backingIndices.containsIndex(indexName) || failureIndices.containsIndex(indexName);
309+
}
310+
304311
public DataStreamOptions getDataStreamOptions() {
305312
return dataStreamOptions;
306313
}
@@ -1039,7 +1046,7 @@ private boolean isIndexOlderThan(
10391046
* we return false.
10401047
*/
10411048
public boolean isIndexManagedByDataStreamLifecycle(Index index, Function<String, IndexMetadata> indexMetadataSupplier) {
1042-
if (backingIndices.containsIndex(index.getName()) == false && failureIndices.containsIndex(index.getName()) == false) {
1049+
if (containsIndex(index.getName()) == false) {
10431050
return false;
10441051
}
10451052
IndexMetadata indexMetadata = indexMetadataSupplier.apply(index.getName());

server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -129,14 +129,28 @@ private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String
129129
}
130130

131131
// visible for testing
132-
SystemIndexMetadataUpdateTask getTask() {
133-
return new SystemIndexMetadataUpdateTask();
132+
ClusterState executeMetadataUpdateTask(ClusterState clusterState) {
133+
return new SystemIndexMetadataUpdateTask().execute(clusterState);
134134
}
135135

136-
public class SystemIndexMetadataUpdateTask extends ClusterStateUpdateTask {
136+
private class SystemIndexMetadataUpdateTask extends ClusterStateUpdateTask {
137137

138138
@Override
139-
public ClusterState execute(ClusterState currentState) throws Exception {
139+
public ClusterState execute(ClusterState currentState) {
140+
List<IndexMetadata> updatedMetadata = updateIndices(currentState);
141+
List<DataStream> updatedDataStreams = updateDataStreams(currentState);
142+
143+
if (updatedMetadata.isEmpty() == false || updatedDataStreams.isEmpty() == false) {
144+
Metadata.Builder builder = Metadata.builder(currentState.metadata());
145+
updatedMetadata.forEach(idxMeta -> builder.put(idxMeta, true));
146+
updatedDataStreams.forEach(builder::put);
147+
148+
return ClusterState.builder(currentState).metadata(builder).build();
149+
}
150+
return currentState;
151+
}
152+
153+
private List<IndexMetadata> updateIndices(ClusterState currentState) {
140154
final Map<String, IndexMetadata> indexMetadataMap = currentState.metadata().indices();
141155
final List<IndexMetadata> updatedMetadata = new ArrayList<>();
142156
for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
@@ -172,13 +186,22 @@ public ClusterState execute(ClusterState currentState) throws Exception {
172186
updatedMetadata.add(builder.build());
173187
}
174188
}
189+
return updatedMetadata;
190+
}
175191

176-
if (updatedMetadata.isEmpty() == false) {
177-
final Metadata.Builder builder = Metadata.builder(currentState.metadata());
178-
updatedMetadata.forEach(idxMeta -> builder.put(idxMeta, true));
179-
return ClusterState.builder(currentState).metadata(builder).build();
192+
private List<DataStream> updateDataStreams(ClusterState currentState) {
193+
List<DataStream> updatedDataStreams = new ArrayList<>();
194+
for (DataStream dataStream : currentState.getMetadata().dataStreams().values()) {
195+
if (dataStream.isSystem() == false && systemIndices.isSystemDataStream(dataStream.getName())) {
196+
DataStream updatedDataStream = dataStream.copy()
197+
.setSystem(true)
198+
.setHidden(true)
199+
.build();
200+
201+
updatedDataStreams.add(updatedDataStream);
202+
}
180203
}
181-
return currentState;
204+
return updatedDataStreams;
182205
}
183206

184207
@Override

server/src/test/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeServiceTests.java

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@
1414
import org.elasticsearch.cluster.service.ClusterService;
1515
import org.elasticsearch.common.settings.Settings;
1616
import org.elasticsearch.index.IndexVersion;
17+
import org.elasticsearch.indices.ExecutorNames;
18+
import org.elasticsearch.indices.SystemDataStreamDescriptor;
1719
import org.elasticsearch.indices.SystemIndexDescriptor;
1820
import org.elasticsearch.indices.SystemIndices;
1921
import org.elasticsearch.test.ESTestCase;
2022
import org.junit.Before;
2123

24+
import java.util.Collections;
2225
import java.util.List;
2326
import java.util.Locale;
2427
import java.util.Map;
@@ -49,13 +52,25 @@ public class SystemIndexMetadataUpgradeServiceTests extends ESTestCase {
4952
.setOrigin("FAKE_ORIGIN")
5053
.build();
5154

55+
private static final String SYSTEM_DATA_STREAM_NAME = ".my-ds";
56+
private static final String SYSTEM_DATA_STREAM_INDEX_NAME = DataStream.BACKING_INDEX_PREFIX + SYSTEM_DATA_STREAM_NAME + "-1";
57+
private static final SystemDataStreamDescriptor SYSTEM_DATA_STREAM_DESCRIPTOR = new SystemDataStreamDescriptor(
58+
SYSTEM_DATA_STREAM_NAME, "System datastream for test", SystemDataStreamDescriptor.Type.INTERNAL,
59+
ComposableIndexTemplate.builder().build(), Collections.emptyMap(), Collections.singletonList("FAKE_ORIGIN"),
60+
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
61+
);
62+
5263
private SystemIndexMetadataUpgradeService service;
5364

5465
@Before
5566
public void setUpTest() {
5667
// set up a system index upgrade service
5768
this.service = new SystemIndexMetadataUpgradeService(
58-
new SystemIndices(List.of(new SystemIndices.Feature("foo", "a test feature", List.of(DESCRIPTOR)))),
69+
new SystemIndices(List.of(
70+
new SystemIndices.Feature("foo", "a test feature", List.of(DESCRIPTOR)),
71+
new SystemIndices.Feature("sds", "system data stream feature",
72+
Collections.emptyList(), Collections.singletonList(SYSTEM_DATA_STREAM_DESCRIPTOR))
73+
)),
5974
mock(ClusterService.class)
6075
);
6176
}
@@ -75,6 +90,19 @@ public void testUpgradeVisibleIndexToSystemIndex() throws Exception {
7590
assertSystemUpgradeAppliesHiddenSetting(hiddenIndexMetadata);
7691
}
7792

93+
public void testUpgradeDataStreamToSystemDataStream() {
94+
IndexMetadata dsIndexMetadata = IndexMetadata.builder(SYSTEM_DATA_STREAM_INDEX_NAME)
95+
.system(false)
96+
.settings(getSettingsBuilder().put(IndexMetadata.SETTING_INDEX_HIDDEN, true))
97+
.build();
98+
DataStream dataStream = DataStream.builder(SYSTEM_DATA_STREAM_NAME, Collections.singletonList(dsIndexMetadata.getIndex()))
99+
.setHidden(false)
100+
.setSystem(false)
101+
.build();
102+
103+
assertSystemUpgradeAppliesHiddenSettingForDataStream(dataStream, dsIndexMetadata);
104+
}
105+
78106
/**
79107
* If a system index erroneously is set to visible, we should remedy that situation.
80108
*/
@@ -209,7 +237,7 @@ public void testIsVisible() {
209237
assertThat(service.requiresUpdate(systemVisibleIndex), equalTo(true));
210238
}
211239

212-
private void assertSystemUpgradeAppliesHiddenSetting(IndexMetadata hiddenIndexMetadata) throws Exception {
240+
private void assertSystemUpgradeAppliesHiddenSetting(IndexMetadata hiddenIndexMetadata) {
213241
assertTrue("Metadata should require update but does not", service.requiresUpdate(hiddenIndexMetadata));
214242
Metadata.Builder clusterMetadata = new Metadata.Builder();
215243
clusterMetadata.put(IndexMetadata.builder(hiddenIndexMetadata));
@@ -220,13 +248,38 @@ private void assertSystemUpgradeAppliesHiddenSetting(IndexMetadata hiddenIndexMe
220248
.build();
221249

222250
// Get a metadata upgrade task and execute it on the initial cluster state
223-
ClusterState newState = service.getTask().execute(clusterState);
251+
ClusterState newState = service.executeMetadataUpdateTask(clusterState);
224252

225253
IndexMetadata result = newState.metadata().index(SYSTEM_INDEX_NAME);
226254
assertThat(result.isSystem(), equalTo(true));
227255
assertThat(result.isHidden(), equalTo(true));
228256
}
229257

258+
private void assertSystemUpgradeAppliesHiddenSettingForDataStream(DataStream shouldBeSystemDataStream, IndexMetadata dsIndexMetadata) {
259+
assertTrue(shouldBeSystemDataStream.containsIndex(dsIndexMetadata.getIndex().getName()));
260+
assertTrue("Metadata should require update but does not", service.requiresUpdate(dsIndexMetadata));
261+
262+
Metadata.Builder clusterMetadata = new Metadata.Builder();
263+
clusterMetadata.put(shouldBeSystemDataStream);
264+
clusterMetadata.put(dsIndexMetadata, true);
265+
266+
ClusterState clusterState = ClusterState.builder(new ClusterName("system-index-metadata-upgrade-service-tests"))
267+
.metadata(clusterMetadata.build())
268+
.customs(Map.of())
269+
.build();
270+
271+
// Execute a metadata upgrade task on the initial cluster state
272+
ClusterState newState = service.executeMetadataUpdateTask(clusterState);
273+
274+
DataStream updatedDataStream = newState.metadata().dataStreams().get(shouldBeSystemDataStream.getName());
275+
assertThat(updatedDataStream.isSystem(), equalTo(true));
276+
assertThat(updatedDataStream.isHidden(), equalTo(true));
277+
278+
IndexMetadata updatedIndexMetadata = newState.metadata().index(dsIndexMetadata.getIndex().getName());
279+
assertThat(updatedIndexMetadata.isSystem(), equalTo(true));
280+
assertThat(updatedIndexMetadata.isHidden(), equalTo(true));
281+
}
282+
230283
private void assertSystemUpgradeHidesAlias(IndexMetadata visibleAliasMetadata) throws Exception {
231284
assertTrue("Metadata should require update but does not", service.requiresUpdate(visibleAliasMetadata));
232285
Metadata.Builder clusterMetadata = new Metadata.Builder();
@@ -238,7 +291,7 @@ private void assertSystemUpgradeHidesAlias(IndexMetadata visibleAliasMetadata) t
238291
.build();
239292

240293
// Get a metadata upgrade task and execute it on the initial cluster state
241-
ClusterState newState = service.getTask().execute(clusterState);
294+
ClusterState newState = service.executeMetadataUpdateTask(clusterState);
242295

243296
IndexMetadata result = newState.metadata().index(SYSTEM_INDEX_NAME);
244297
assertThat(result.isSystem(), equalTo(true));

0 commit comments

Comments
 (0)