Skip to content

Commit 78e8059

Browse files
Migrate SystemIndexMetadataUpgradeService from unbatched tasks to ClusterStateTaskExecutor and fix tests
1 parent 19f9700 commit 78e8059

File tree

3 files changed

+143
-67
lines changed

3 files changed

+143
-67
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -789,8 +789,9 @@ public DataStream addBackingIndex(Metadata clusterMetadata, Index index) {
789789
// ensure that no aliases reference index
790790
ensureNoAliasesOnIndex(clusterMetadata, index);
791791

792-
List<Index> backingIndices = new ArrayList<>(this.backingIndices.indices);
793-
backingIndices.add(0, index);
792+
List<Index> backingIndices = new ArrayList<>(this.backingIndices.indices.size() + 1);
793+
backingIndices.add(index);
794+
backingIndices.addAll(this.backingIndices.indices);
794795
assert backingIndices.size() == this.backingIndices.indices.size() + 1;
795796
return copy().setBackingIndices(this.backingIndices.copy().setIndices(backingIndices).build())
796797
.setGeneration(generation + 1)
@@ -815,8 +816,9 @@ public DataStream addFailureStoreIndex(Metadata clusterMetadata, Index index) {
815816

816817
ensureNoAliasesOnIndex(clusterMetadata, index);
817818

818-
List<Index> updatedFailureIndices = new ArrayList<>(failureIndices.indices);
819-
updatedFailureIndices.add(0, index);
819+
List<Index> updatedFailureIndices = new ArrayList<>(failureIndices.indices.size() + 1);
820+
updatedFailureIndices.add(index);
821+
updatedFailureIndices.addAll(failureIndices.indices);
820822
assert updatedFailureIndices.size() == failureIndices.indices.size() + 1;
821823
return copy().setFailureIndices(failureIndices.copy().setIndices(updatedFailureIndices).build())
822824
.setGeneration(generation + 1)

server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java

Lines changed: 97 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,27 @@
1414
import org.elasticsearch.cluster.ClusterChangedEvent;
1515
import org.elasticsearch.cluster.ClusterState;
1616
import org.elasticsearch.cluster.ClusterStateListener;
17-
import org.elasticsearch.cluster.ClusterStateUpdateTask;
17+
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
18+
import org.elasticsearch.cluster.ClusterStateTaskListener;
1819
import org.elasticsearch.cluster.service.ClusterService;
20+
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
21+
import org.elasticsearch.common.Priority;
1922
import org.elasticsearch.common.settings.Settings;
2023
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
21-
import org.elasticsearch.core.SuppressForbidden;
24+
import org.elasticsearch.index.Index;
2225
import org.elasticsearch.indices.SystemIndexMappingUpdateService;
2326
import org.elasticsearch.indices.SystemIndices;
2427
import org.elasticsearch.threadpool.ThreadPool;
2528

2629
import java.util.ArrayList;
30+
import java.util.Collection;
2731
import java.util.Collections;
32+
import java.util.HashSet;
2833
import java.util.List;
2934
import java.util.Map;
3035
import java.util.Objects;
36+
import java.util.Set;
37+
import java.util.stream.Collectors;
3138
import java.util.stream.Stream;
3239

3340
/**
@@ -41,20 +48,20 @@ public class SystemIndexMetadataUpgradeService implements ClusterStateListener {
4148

4249
private final SystemIndices systemIndices;
4350
private final ClusterService clusterService;
44-
45-
private volatile boolean updateTaskPending = false;
51+
private final MasterServiceTaskQueue<SystemIndexMetadataUpgradeTask> taskQueue;
4652

4753
public SystemIndexMetadataUpgradeService(SystemIndices systemIndices, ClusterService clusterService) {
4854
this.systemIndices = systemIndices;
4955
this.clusterService = clusterService;
56+
this.taskQueue = clusterService.createTaskQueue("system-indices-metadata-upgrade", Priority.NORMAL,
57+
new SystemIndexMetadataUpgradeExecutor());
5058
}
5159

5260
@Override
5361
public void clusterChanged(ClusterChangedEvent event) {
5462
Metadata currentMetadata = event.state().metadata();
5563
Metadata previousMetadata = event.previousState().metadata();
56-
if (updateTaskPending == false
57-
&& event.localNodeMaster()
64+
if (event.localNodeMaster()
5865
&& (event.previousState().nodes().isLocalNodeElectedMaster() == false
5966
|| currentMetadata.indices() != previousMetadata.indices()
6067
|| currentMetadata.dataStreams() != previousMetadata.dataStreams())) {
@@ -67,32 +74,33 @@ public void clusterChanged(ClusterChangedEvent event) {
6774
clusterService.threadPool().executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
6875
@Override
6976
protected void doRun() {
70-
for (Map.Entry<String, IndexMetadata> cursor : indexMetadataMap.entrySet()) {
71-
if (cursor.getValue() != previousIndices.get(cursor.getKey())) {
72-
IndexMetadata indexMetadata = cursor.getValue();
73-
if (requiresUpdate(indexMetadata)) {
74-
submitUpdateTask();
75-
return;
77+
Collection<DataStream> changedDataStreams = new ArrayList<>();
78+
Set<Index> dataStreamIndices = new HashSet<>();
79+
for (Map.Entry<String, DataStream> cursor : dataStreams.entrySet()) {
80+
DataStream dataStream = cursor.getValue();
81+
if (dataStream != previousDataStreams.get(cursor.getKey())) {
82+
if (requiresUpdate(dataStream)) {
83+
changedDataStreams.add(dataStream);
7684
}
7785
}
86+
87+
getIndicesBackingDataStream(dataStream).forEach(dataStreamIndices::add);
7888
}
79-
for (Map.Entry<String, DataStream> cursor : dataStreams.entrySet()) {
80-
if (cursor.getValue() != previousDataStreams.get(cursor.getKey())) {
81-
DataStream dataStream = cursor.getValue();
82-
if (requiresUpdate(dataStream)) {
83-
submitUpdateTask();
84-
return;
89+
90+
Collection<Index> changedIndices = new ArrayList<>();
91+
for (Map.Entry<String, IndexMetadata> cursor : indexMetadataMap.entrySet()) {
92+
IndexMetadata indexMetadata = cursor.getValue();
93+
Index index = indexMetadata.getIndex();
94+
if (cursor.getValue() != previousIndices.get(cursor.getKey()) && dataStreamIndices.contains(index) == false) {
95+
if (requiresUpdate(indexMetadata)) {
96+
changedIndices.add(index);
8597
}
8698
}
8799
}
88-
}
89100

90-
private void submitUpdateTask() {
91-
updateTaskPending = true;
92-
submitUnbatchedTask(
93-
"system_index_metadata_upgrade_service {system metadata change}",
94-
new SystemIndexMetadataUpdateTask()
95-
);
101+
if (changedIndices.isEmpty() == false || changedDataStreams.isEmpty() == false) {
102+
submitUpdateTask(changedIndices, changedDataStreams);
103+
}
96104
}
97105

98106
@Override
@@ -104,6 +112,12 @@ public void onFailure(Exception e) {
104112
}
105113
}
106114

115+
// visible for testing
116+
void submitUpdateTask(Collection<Index> changedIndices, Collection<DataStream> changedDataStreams) {
117+
SystemIndexMetadataUpgradeTask task = new SystemIndexMetadataUpgradeTask(changedIndices, changedDataStreams);
118+
taskQueue.submitTask("system-index-metadata-upgrade-service", task, null);
119+
}
120+
107121
// package-private for testing
108122
boolean requiresUpdate(IndexMetadata indexMetadata) {
109123
final boolean shouldBeSystem = shouldBeSystem(indexMetadata);
@@ -140,6 +154,10 @@ private boolean shouldBeSystem(DataStream dataStream) {
140154
return systemIndices.isSystemDataStream(dataStream.getName());
141155
}
142156

157+
private static Stream<Index> getIndicesBackingDataStream(DataStream dataStream) {
158+
return Stream.concat(dataStream.getIndices().stream(), dataStream.getFailureIndices().stream());
159+
}
160+
143161
// package-private for testing
144162
static boolean isVisible(IndexMetadata indexMetadata) {
145163
return indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_INDEX_HIDDEN, false) == false;
@@ -155,40 +173,68 @@ static boolean hasVisibleAlias(IndexMetadata indexMetadata) {
155173
return indexMetadata.getAliases().values().stream().anyMatch(a -> Boolean.FALSE.equals(a.isHidden()));
156174
}
157175

158-
@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
159-
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
160-
clusterService.submitUnbatchedStateUpdateTask(source, task);
161-
}
176+
private record SystemIndexMetadataUpgradeTask(Collection<Index> changedIndices,
177+
Collection<DataStream> changedDataStreams) implements ClusterStateTaskListener {
162178

163-
// visible for testing
164-
ClusterState executeMetadataUpdateTask(ClusterState clusterState) {
165-
return new SystemIndexMetadataUpdateTask().execute(clusterState);
166-
}
179+
@Override
180+
public void onFailure(Exception e) {
181+
logger.error("System index metadata upgrade failed", e);
182+
}
167183

168-
private class SystemIndexMetadataUpdateTask extends ClusterStateUpdateTask {
184+
@Override
185+
public String toString() {
186+
return "SystemIndexMetadataUpgradeTask[changedIndices="
187+
+ changedIndices.stream().map(Index::getName).collect(Collectors.joining(","))
188+
+ ";changedDataStreams="
189+
+ changedDataStreams.stream().map(DataStream::getName).collect(Collectors.joining(","))
190+
+ "]";
191+
}
192+
}
169193

194+
private class SystemIndexMetadataUpgradeExecutor implements ClusterStateTaskExecutor<SystemIndexMetadataUpgradeTask> {
170195
@Override
171-
public ClusterState execute(ClusterState currentState) {
172-
List<IndexMetadata> updatedMetadata = updateIndices(currentState);
173-
List<DataStream> updatedDataStreams = updateDataStreams(currentState);
174-
List<IndexMetadata> updatedBackingIndices = updateIndicesBackingDataStreams(currentState, updatedDataStreams);
196+
public ClusterState execute(BatchExecutionContext<SystemIndexMetadataUpgradeTask> batchExecutionContext) {
197+
ClusterState initialState = batchExecutionContext.initialState();
198+
199+
List<? extends TaskContext<SystemIndexMetadataUpgradeTask>> taskContexts = batchExecutionContext.taskContexts();
200+
List<Index> indices = taskContexts.stream()
201+
.map(TaskContext::getTask)
202+
.map(SystemIndexMetadataUpgradeTask::changedIndices)
203+
.flatMap(Collection::stream)
204+
.toList();
205+
List<IndexMetadata> updatedMetadata = updateIndices(initialState, indices);
206+
207+
List<DataStream> dataStreams = taskContexts.stream()
208+
.map(TaskContext::getTask)
209+
.map(SystemIndexMetadataUpgradeTask::changedDataStreams)
210+
.flatMap(Collection::stream)
211+
.toList();
212+
List<DataStream> updatedDataStreams = updateDataStreams(dataStreams);
213+
List<IndexMetadata> updatedBackingIndices = updateIndicesBackingDataStreams(initialState, updatedDataStreams);
214+
215+
for (TaskContext<SystemIndexMetadataUpgradeTask> taskContext : taskContexts) {
216+
taskContext.success(() -> {});
217+
}
175218

176219
if (updatedMetadata.isEmpty() == false || updatedDataStreams.isEmpty() == false) {
177-
Metadata.Builder builder = Metadata.builder(currentState.metadata());
220+
Metadata.Builder builder = Metadata.builder(initialState.metadata());
178221
updatedMetadata.forEach(idxMeta -> builder.put(idxMeta, true));
179222
updatedDataStreams.forEach(builder::put);
180223
updatedBackingIndices.forEach(idxMeta -> builder.put(idxMeta, true));
181224

182-
return ClusterState.builder(currentState).metadata(builder).build();
225+
return ClusterState.builder(initialState).metadata(builder).build();
183226
}
184-
return currentState;
227+
return initialState;
185228
}
186229

187-
private List<IndexMetadata> updateIndices(ClusterState currentState) {
188-
final Map<String, IndexMetadata> indexMetadataMap = currentState.metadata().indices();
230+
private List<IndexMetadata> updateIndices(ClusterState currentState, List<Index> indices) {
231+
if (indices.isEmpty()) {
232+
return Collections.emptyList();
233+
}
234+
Metadata metadata = currentState.metadata();
189235
final List<IndexMetadata> updatedMetadata = new ArrayList<>();
190-
for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
191-
final IndexMetadata indexMetadata = entry.getValue();
236+
for (Index index : indices) {
237+
IndexMetadata indexMetadata = metadata.index(index);
192238
final boolean shouldBeSystem = shouldBeSystem(indexMetadata);
193239
IndexMetadata updatedIndexMetadata = updateIndexIfNecessary(indexMetadata, shouldBeSystem);
194240
if (updatedIndexMetadata != null) {
@@ -229,9 +275,12 @@ private IndexMetadata updateIndexIfNecessary(IndexMetadata indexMetadata, boolea
229275
return updated ? builder.build() : null;
230276
}
231277

232-
private List<DataStream> updateDataStreams(ClusterState currentState) {
278+
private List<DataStream> updateDataStreams(List<DataStream> dataStreams) {
279+
if (dataStreams.isEmpty()) {
280+
return Collections.emptyList();
281+
}
233282
List<DataStream> updatedDataStreams = new ArrayList<>();
234-
for (DataStream dataStream : currentState.getMetadata().dataStreams().values()) {
283+
for (DataStream dataStream : dataStreams) {
235284
boolean shouldBeSystem = shouldBeSystem(dataStream);
236285
if (dataStream.isSystem() != shouldBeSystem) {
237286
DataStream.Builder dataStreamBuilder = dataStream.copy().setSystem(shouldBeSystem);
@@ -264,18 +313,7 @@ private List<IndexMetadata> updateIndicesBackingDataStreams(ClusterState current
264313
}
265314

266315
private Stream<IndexMetadata> getIndicesBackingDataStreamMetadata(Metadata metadata, DataStream dataStream) {
267-
return Stream.concat(dataStream.getIndices().stream(), dataStream.getFailureIndices().stream()).map(metadata::index);
268-
}
269-
270-
@Override
271-
public void onFailure(Exception e) {
272-
updateTaskPending = false;
273-
logger.error("failed to update system index metadata", e);
274-
}
275-
276-
@Override
277-
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
278-
updateTaskPending = false;
316+
return getIndicesBackingDataStream(dataStream).map(metadata::index);
279317
}
280318
}
281319
}

server/src/test/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeServiceTests.java

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,12 @@
1111

1212
import org.elasticsearch.cluster.ClusterName;
1313
import org.elasticsearch.cluster.ClusterState;
14+
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
15+
import org.elasticsearch.cluster.ClusterStateTaskListener;
1416
import org.elasticsearch.cluster.service.ClusterService;
17+
import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils;
18+
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
19+
import org.elasticsearch.common.Priority;
1520
import org.elasticsearch.common.settings.Settings;
1621
import org.elasticsearch.index.IndexVersion;
1722
import org.elasticsearch.indices.ExecutorNames;
@@ -27,7 +32,11 @@
2732
import java.util.Map;
2833

2934
import static org.hamcrest.Matchers.equalTo;
35+
import static org.mockito.ArgumentMatchers.any;
36+
import static org.mockito.ArgumentMatchers.eq;
37+
import static org.mockito.Mockito.doAnswer;
3038
import static org.mockito.Mockito.mock;
39+
import static org.mockito.Mockito.when;
3140

3241
public class SystemIndexMetadataUpgradeServiceTests extends ESTestCase {
3342

@@ -66,10 +75,26 @@ public class SystemIndexMetadataUpgradeServiceTests extends ESTestCase {
6675
);
6776

6877
private SystemIndexMetadataUpgradeService service;
78+
private ClusterStateTaskListener task;
79+
private ClusterStateTaskExecutor<ClusterStateTaskListener> executor;
6980

81+
@SuppressWarnings("unchecked")
7082
@Before
7183
public void setUpTest() {
7284
// set up a system index upgrade service
85+
ClusterService clusterService = mock(ClusterService.class);
86+
MasterServiceTaskQueue<ClusterStateTaskListener> queue = mock(MasterServiceTaskQueue.class);
87+
when(clusterService.createTaskQueue(eq("system-indices-metadata-upgrade"), eq(Priority.NORMAL), any()))
88+
.thenAnswer(invocation -> {
89+
executor = invocation.getArgument(2, ClusterStateTaskExecutor.class);
90+
return queue;
91+
});
92+
doAnswer(invocation -> {
93+
task = invocation.getArgument(1, ClusterStateTaskListener.class);
94+
return null;
95+
}).when(queue)
96+
.submitTask(any(), any(), any());
97+
7398
this.service = new SystemIndexMetadataUpgradeService(
7499
new SystemIndices(
75100
List.of(
@@ -82,10 +107,18 @@ public void setUpTest() {
82107
)
83108
)
84109
),
85-
mock(ClusterService.class)
110+
clusterService
86111
);
87112
}
88113

114+
private ClusterState executeTask(ClusterState clusterState) {
115+
try {
116+
return ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(clusterState, executor, Collections.singletonList(task));
117+
} catch (Exception e) {
118+
throw new RuntimeException(e);
119+
}
120+
}
121+
89122
/**
90123
* When we upgrade Elasticsearch versions, existing indices may be newly
91124
* defined as system indices. If such indices are set without "hidden," we need
@@ -132,8 +165,9 @@ public void testUpgradeDataStreamToSystemDataStream() {
132165
.customs(Map.of())
133166
.build();
134167

168+
service.submitUpdateTask(Collections.emptyList(), Collections.singletonList(dataStream));
135169
// Execute a metadata upgrade task on the initial cluster state
136-
ClusterState newState = service.executeMetadataUpdateTask(clusterState);
170+
ClusterState newState = executeTask(clusterState);
137171

138172
DataStream updatedDataStream = newState.metadata().dataStreams().get(dataStream.getName());
139173
assertThat(updatedDataStream.isSystem(), equalTo(true));
@@ -292,8 +326,9 @@ private void assertSystemUpgradeAppliesHiddenSetting(IndexMetadata hiddenIndexMe
292326
.customs(Map.of())
293327
.build();
294328

329+
service.submitUpdateTask(Collections.singletonList(hiddenIndexMetadata.getIndex()), Collections.emptyList());
295330
// Get a metadata upgrade task and execute it on the initial cluster state
296-
ClusterState newState = service.executeMetadataUpdateTask(clusterState);
331+
ClusterState newState = executeTask(clusterState);
297332

298333
IndexMetadata result = newState.metadata().index(SYSTEM_INDEX_NAME);
299334
assertThat(result.isSystem(), equalTo(true));
@@ -310,8 +345,9 @@ private void assertSystemUpgradeHidesAlias(IndexMetadata visibleAliasMetadata) t
310345
.customs(Map.of())
311346
.build();
312347

348+
service.submitUpdateTask(Collections.singletonList(visibleAliasMetadata.getIndex()), Collections.emptyList());
313349
// Get a metadata upgrade task and execute it on the initial cluster state
314-
ClusterState newState = service.executeMetadataUpdateTask(clusterState);
350+
ClusterState newState = executeTask(clusterState);
315351

316352
IndexMetadata result = newState.metadata().index(SYSTEM_INDEX_NAME);
317353
assertThat(result.isSystem(), equalTo(true));

0 commit comments

Comments
 (0)