Skip to content

Commit 4ef97b4

Browse files
Change way how indices backing data streams are handled
1 parent 9a4ee2c commit 4ef97b4

File tree

3 files changed

+136
-54
lines changed

3 files changed

+136
-54
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java

Lines changed: 110 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,11 @@
2424
import org.elasticsearch.threadpool.ThreadPool;
2525

2626
import java.util.ArrayList;
27+
import java.util.Collections;
2728
import java.util.List;
2829
import java.util.Map;
30+
import java.util.Objects;
31+
import java.util.stream.Stream;
2932

3033
/**
3134
* A service responsible for updating the metadata used by system indices.
@@ -41,47 +44,57 @@ public class SystemIndexMetadataUpgradeService implements ClusterStateListener {
4144

4245
private volatile boolean updateTaskPending = false;
4346

44-
private volatile long triggeredVersion = -1L;
45-
4647
public SystemIndexMetadataUpgradeService(SystemIndices systemIndices, ClusterService clusterService) {
4748
this.systemIndices = systemIndices;
4849
this.clusterService = clusterService;
4950
}
5051

5152
@Override
5253
public void clusterChanged(ClusterChangedEvent event) {
54+
Metadata currentMetadata = event.state().metadata();
55+
Metadata previousMetadata = event.previousState().metadata();
5356
if (updateTaskPending == false
5457
&& event.localNodeMaster()
5558
&& (event.previousState().nodes().isLocalNodeElectedMaster() == false
56-
|| event.state().metadata().indices() != event.previousState().metadata().indices())) {
57-
final Map<String, IndexMetadata> indexMetadataMap = event.state().metadata().indices();
58-
final var previousIndices = event.previousState().metadata().indices();
59-
final long triggerV = event.state().version();
60-
triggeredVersion = triggerV;
59+
|| currentMetadata.indices() != previousMetadata.indices()
60+
|| currentMetadata.dataStreams() != previousMetadata.dataStreams())) {
61+
final Map<String, IndexMetadata> indexMetadataMap = currentMetadata.indices();
62+
final var previousIndices = previousMetadata.indices();
63+
Map<String, DataStream> dataStreams = currentMetadata.dataStreams();
64+
Map<String, DataStream> previousDataStreams = previousMetadata.dataStreams();
6165
// Fork to the management pool to avoid blocking the cluster applier thread unnecessarily for very large index counts
6266
// TODO: we should have a more efficient way of getting just the changed indices so that we don't have to fork here
6367
clusterService.threadPool().executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
6468
@Override
6569
protected void doRun() {
66-
if (triggeredVersion != triggerV) {
67-
// don't run if another newer check task was triggered already
68-
return;
69-
}
7070
for (Map.Entry<String, IndexMetadata> cursor : indexMetadataMap.entrySet()) {
7171
if (cursor.getValue() != previousIndices.get(cursor.getKey())) {
7272
IndexMetadata indexMetadata = cursor.getValue();
7373
if (requiresUpdate(indexMetadata)) {
74-
updateTaskPending = true;
75-
submitUnbatchedTask(
76-
"system_index_metadata_upgrade_service {system metadata change}",
77-
new SystemIndexMetadataUpdateTask()
78-
);
74+
submitUpdateTask();
75+
break;
76+
}
77+
}
78+
}
79+
for (Map.Entry<String, DataStream> cursor : dataStreams.entrySet()) {
80+
if (cursor.getValue() != previousDataStreams.get(cursor.getKey())) {
81+
DataStream dataStream = cursor.getValue();
82+
if (requiresUpdate(dataStream)) {
83+
submitUpdateTask();
7984
break;
8085
}
8186
}
8287
}
8388
}
8489

90+
private void submitUpdateTask() {
91+
updateTaskPending = true;
92+
submitUnbatchedTask(
93+
"system_index_metadata_upgrade_service {system metadata change}",
94+
new SystemIndexMetadataUpdateTask()
95+
);
96+
}
97+
8598
@Override
8699
public void onFailure(Exception e) {
87100
logger.error("unexpected exception on checking for metadata upgrades", e);
@@ -107,15 +120,34 @@ boolean requiresUpdate(IndexMetadata indexMetadata) {
107120
return false;
108121
}
109122

123+
// package-private for testing
124+
boolean requiresUpdate(DataStream dataStream) {
125+
final boolean shouldBeSystem = shouldBeSystem(dataStream);
126+
127+
// should toggle system index status
128+
if (shouldBeSystem != dataStream.isSystem()) {
129+
return true;
130+
}
131+
132+
if (shouldBeSystem) {
133+
return dataStream.isHidden() == false;
134+
}
135+
136+
return false;
137+
}
138+
139+
private boolean shouldBeSystem(DataStream dataStream) {
140+
return systemIndices.isSystemDataStream(dataStream.getName());
141+
}
142+
110143
// package-private for testing
111144
static boolean isVisible(IndexMetadata indexMetadata) {
112145
return indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_INDEX_HIDDEN, false) == false;
113146
}
114147

115148
// package-private for testing
116149
boolean shouldBeSystem(IndexMetadata indexMetadata) {
117-
return systemIndices.isSystemIndex(indexMetadata.getIndex())
118-
|| systemIndices.isSystemIndexBackingDataStream(indexMetadata.getIndex().getName());
150+
return systemIndices.isSystemIndex(indexMetadata.getIndex());
119151
}
120152

121153
// package-private for testing
@@ -139,11 +171,13 @@ private class SystemIndexMetadataUpdateTask extends ClusterStateUpdateTask {
139171
public ClusterState execute(ClusterState currentState) {
140172
List<IndexMetadata> updatedMetadata = updateIndices(currentState);
141173
List<DataStream> updatedDataStreams = updateDataStreams(currentState);
174+
List<IndexMetadata> updatedBackingIndices = updateIndicesBackingDataStreams(currentState, updatedDataStreams);
142175

143176
if (updatedMetadata.isEmpty() == false || updatedDataStreams.isEmpty() == false) {
144177
Metadata.Builder builder = Metadata.builder(currentState.metadata());
145178
updatedMetadata.forEach(idxMeta -> builder.put(idxMeta, true));
146179
updatedDataStreams.forEach(builder::put);
180+
updatedBackingIndices.forEach(idxMeta -> builder.put(idxMeta, true));
147181

148182
return ClusterState.builder(currentState).metadata(builder).build();
149183
}
@@ -156,43 +190,49 @@ private List<IndexMetadata> updateIndices(ClusterState currentState) {
156190
for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
157191
final IndexMetadata indexMetadata = entry.getValue();
158192
final boolean shouldBeSystem = shouldBeSystem(indexMetadata);
159-
IndexMetadata.Builder builder = IndexMetadata.builder(indexMetadata);
160-
boolean updated = false;
161-
if (shouldBeSystem != indexMetadata.isSystem()) {
162-
builder.system(indexMetadata.isSystem() == false);
163-
updated = true;
164-
}
165-
if (shouldBeSystem && isVisible(indexMetadata)) {
166-
builder.settings(Settings.builder().put(indexMetadata.getSettings()).put(IndexMetadata.SETTING_INDEX_HIDDEN, true));
167-
builder.settingsVersion(builder.settingsVersion() + 1);
168-
updated = true;
193+
IndexMetadata updatedIndexMetadata = updateIndexIfNecessary(indexMetadata, shouldBeSystem);
194+
if (updatedIndexMetadata != null) {
195+
updatedMetadata.add(updatedIndexMetadata);
169196
}
170-
if (shouldBeSystem && hasVisibleAlias(indexMetadata)) {
171-
for (AliasMetadata aliasMetadata : indexMetadata.getAliases().values()) {
172-
if (Boolean.FALSE.equals(aliasMetadata.isHidden())) {
173-
builder.removeAlias(aliasMetadata.alias());
174-
builder.putAlias(
175-
AliasMetadata.builder(aliasMetadata.alias())
176-
.filter(aliasMetadata.filter())
177-
.indexRouting(aliasMetadata.indexRouting())
178-
.isHidden(true)
179-
.searchRouting(aliasMetadata.searchRouting())
180-
.writeIndex(aliasMetadata.writeIndex())
181-
);
182-
}
197+
}
198+
return updatedMetadata;
199+
}
200+
201+
private IndexMetadata updateIndexIfNecessary(IndexMetadata indexMetadata, boolean shouldBeSystem) {
202+
IndexMetadata.Builder builder = IndexMetadata.builder(indexMetadata);
203+
boolean updated = false;
204+
if (shouldBeSystem != indexMetadata.isSystem()) {
205+
builder.system(indexMetadata.isSystem() == false);
206+
updated = true;
207+
}
208+
if (shouldBeSystem && isVisible(indexMetadata)) {
209+
builder.settings(Settings.builder().put(indexMetadata.getSettings()).put(IndexMetadata.SETTING_INDEX_HIDDEN, true));
210+
builder.settingsVersion(builder.settingsVersion() + 1);
211+
updated = true;
212+
}
213+
if (shouldBeSystem && hasVisibleAlias(indexMetadata)) {
214+
for (AliasMetadata aliasMetadata : indexMetadata.getAliases().values()) {
215+
if (Boolean.FALSE.equals(aliasMetadata.isHidden())) {
216+
builder.removeAlias(aliasMetadata.alias());
217+
builder.putAlias(
218+
AliasMetadata.builder(aliasMetadata.alias())
219+
.filter(aliasMetadata.filter())
220+
.indexRouting(aliasMetadata.indexRouting())
221+
.isHidden(true)
222+
.searchRouting(aliasMetadata.searchRouting())
223+
.writeIndex(aliasMetadata.writeIndex())
224+
);
225+
updated = true;
183226
}
184227
}
185-
if (updated) {
186-
updatedMetadata.add(builder.build());
187-
}
188228
}
189-
return updatedMetadata;
229+
return updated ? builder.build() : null;
190230
}
191231

192232
private List<DataStream> updateDataStreams(ClusterState currentState) {
193233
List<DataStream> updatedDataStreams = new ArrayList<>();
194234
for (DataStream dataStream : currentState.getMetadata().dataStreams().values()) {
195-
boolean shouldBeSystem = systemIndices.isSystemDataStream(dataStream.getName());
235+
boolean shouldBeSystem = shouldBeSystem(dataStream);
196236
if (dataStream.isSystem() != shouldBeSystem) {
197237
DataStream.Builder dataStreamBuilder = dataStream.copy().setSystem(shouldBeSystem);
198238
if (shouldBeSystem) {
@@ -205,6 +245,30 @@ private List<DataStream> updateDataStreams(ClusterState currentState) {
205245
return updatedDataStreams;
206246
}
207247

248+
private List<IndexMetadata> updateIndicesBackingDataStreams(ClusterState currentState, List<DataStream> updatedDataStreams) {
249+
if (updatedDataStreams.isEmpty()) {
250+
return Collections.emptyList();
251+
}
252+
Metadata metadata = currentState.metadata();
253+
final List<IndexMetadata> updatedMetadata = new ArrayList<>();
254+
255+
for (DataStream updatedDataStream : updatedDataStreams) {
256+
boolean shouldBeSystem = updatedDataStream.isSystem();
257+
List<IndexMetadata> updatedIndicesMetadata = getIndicesBackingDataStreamMetadata(metadata, updatedDataStream)
258+
.map(idx -> updateIndexIfNecessary(idx, shouldBeSystem))
259+
.filter(Objects::nonNull)
260+
.toList();
261+
262+
updatedMetadata.addAll(updatedIndicesMetadata);
263+
}
264+
return updatedMetadata;
265+
}
266+
267+
private Stream<IndexMetadata> getIndicesBackingDataStreamMetadata(Metadata metadata, DataStream dataStream) {
268+
return Stream.concat(dataStream.getIndices().stream(), dataStream.getFailureIndices().stream())
269+
.map(metadata::index);
270+
}
271+
208272
@Override
209273
public void onFailure(Exception e) {
210274
updateTaskPending = false;

server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,18 @@
99

1010
package org.elasticsearch.indices;
1111

12-
import org.apache.lucene.util.automaton.CharacterRunAutomaton;
1312
import org.elasticsearch.cluster.metadata.ComponentTemplate;
1413
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
1514
import org.elasticsearch.cluster.metadata.DataStream;
15+
import org.elasticsearch.cluster.metadata.IndexAbstraction;
1616
import org.elasticsearch.cluster.metadata.Metadata;
17+
import org.elasticsearch.index.Index;
1718

19+
import java.util.Collections;
1820
import java.util.List;
1921
import java.util.Map;
2022
import java.util.Objects;
21-
22-
import static org.elasticsearch.indices.AssociatedIndexDescriptor.buildAutomaton;
23+
import java.util.stream.Stream;
2324

2425
/**
2526
* Describes a {@link DataStream} that is reserved for use by a system feature.
@@ -53,7 +54,6 @@ public class SystemDataStreamDescriptor {
5354
private final Map<String, ComponentTemplate> componentTemplates;
5455
private final List<String> allowedElasticProductOrigins;
5556
private final ExecutorNames executorNames;
56-
private final CharacterRunAutomaton characterRunAutomaton;
5757

5858
/**
5959
* Creates a new descriptor for a system data descriptor
@@ -96,8 +96,6 @@ public SystemDataStreamDescriptor(
9696
throw new IllegalArgumentException("External system data stream without allowed products is not a valid combination");
9797
}
9898
this.executorNames = Objects.nonNull(executorNames) ? executorNames : ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS;
99-
100-
this.characterRunAutomaton = new CharacterRunAutomaton(buildAutomaton(backingIndexPatternForDataStream(this.dataStreamName)));
10199
}
102100

103101
public String getDataStreamName() {
@@ -110,7 +108,13 @@ public String getDataStreamName() {
110108
* @return List of names of backing indices
111109
*/
112110
public List<String> getBackingIndexNames(Metadata metadata) {
113-
return metadata.indices().keySet().stream().filter(this.characterRunAutomaton::run).toList();
111+
DataStream dataStream = metadata.dataStreams().get(dataStreamName);
112+
if (dataStream == null) {
113+
return Collections.emptyList();
114+
}
115+
return Stream.concat(dataStream.getIndices().stream(), dataStream.getFailureIndices().stream())
116+
.map(Index::getName)
117+
.toList();
114118
}
115119

116120
public String getDescription() {

server/src/test/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeServiceTests.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public class SystemIndexMetadataUpgradeServiceTests extends ESTestCase {
5454

5555
private static final String SYSTEM_DATA_STREAM_NAME = ".my-ds";
5656
private static final String SYSTEM_DATA_STREAM_INDEX_NAME = DataStream.BACKING_INDEX_PREFIX + SYSTEM_DATA_STREAM_NAME + "-1";
57+
private static final String SYSTEM_DATA_STREAM_FAILSTORE_NAME = DataStream.FAILURE_STORE_PREFIX + SYSTEM_DATA_STREAM_NAME;
5758
private static final SystemDataStreamDescriptor SYSTEM_DATA_STREAM_DESCRIPTOR = new SystemDataStreamDescriptor(
5859
SYSTEM_DATA_STREAM_NAME,
5960
"System datastream for test",
@@ -105,17 +106,26 @@ public void testUpgradeDataStreamToSystemDataStream() {
105106
.system(false)
106107
.settings(getSettingsBuilder().put(IndexMetadata.SETTING_INDEX_HIDDEN, true))
107108
.build();
109+
IndexMetadata fsIndexMetadata = IndexMetadata.builder(SYSTEM_DATA_STREAM_FAILSTORE_NAME)
110+
.system(false)
111+
.settings(getSettingsBuilder().put(IndexMetadata.SETTING_INDEX_HIDDEN, true))
112+
.build();
113+
DataStream.DataStreamIndices failureIndices = DataStream.DataStreamIndices
114+
.failureIndicesBuilder(Collections.singletonList(fsIndexMetadata.getIndex()))
115+
.build();
108116
DataStream dataStream = DataStream.builder(SYSTEM_DATA_STREAM_NAME, Collections.singletonList(dsIndexMetadata.getIndex()))
117+
.setFailureIndices(failureIndices)
109118
.setHidden(false)
110119
.setSystem(false)
111120
.build();
112121

113122
assertTrue(dataStream.containsIndex(dsIndexMetadata.getIndex().getName()));
114-
assertTrue("Metadata should require update but does not", service.requiresUpdate(dsIndexMetadata));
123+
assertTrue(dataStream.containsIndex(fsIndexMetadata.getIndex().getName()));
115124

116125
Metadata.Builder clusterMetadata = new Metadata.Builder();
117126
clusterMetadata.put(dataStream);
118127
clusterMetadata.put(dsIndexMetadata, true);
128+
clusterMetadata.put(fsIndexMetadata, true);
119129

120130
ClusterState clusterState = ClusterState.builder(new ClusterName("system-index-metadata-upgrade-service-tests"))
121131
.metadata(clusterMetadata.build())
@@ -132,6 +142,10 @@ public void testUpgradeDataStreamToSystemDataStream() {
132142
IndexMetadata updatedIndexMetadata = newState.metadata().index(dsIndexMetadata.getIndex().getName());
133143
assertThat(updatedIndexMetadata.isSystem(), equalTo(true));
134144
assertThat(updatedIndexMetadata.isHidden(), equalTo(true));
145+
146+
IndexMetadata updatedFailstoreMetadata = newState.metadata().index(fsIndexMetadata.getIndex().getName());
147+
assertThat(updatedFailstoreMetadata.isSystem(), equalTo(true));
148+
assertThat(updatedFailstoreMetadata.isHidden(), equalTo(true));
135149
}
136150

137151
/**

0 commit comments

Comments
 (0)