Skip to content

Commit 4da7299

Browse files
authored
Use single-task queues in ReservedClusterStateService (#118351)
* Refactor: submitUpdateTask method * Test for one task per reserved state udate; currently fails * Separate queue per task * Spotless
1 parent d745315 commit 4da7299

File tree

2 files changed

+92
-17
lines changed

2 files changed

+92
-17
lines changed

server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
1919
import org.elasticsearch.cluster.routing.RerouteService;
2020
import org.elasticsearch.cluster.service.ClusterService;
21-
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
2221
import org.elasticsearch.common.Priority;
2322
import org.elasticsearch.core.Tuple;
2423
import org.elasticsearch.env.BuildVersion;
@@ -61,8 +60,6 @@ public class ReservedClusterStateService {
6160

6261
final Map<String, ReservedClusterStateHandler<?>> handlers;
6362
final ClusterService clusterService;
64-
private final MasterServiceTaskQueue<ReservedStateUpdateTask> updateTaskQueue;
65-
private final MasterServiceTaskQueue<ReservedStateErrorTask> errorTaskQueue;
6663

6764
@SuppressWarnings("unchecked")
6865
private final ConstructingObjectParser<ReservedStateChunk, Void> stateChunkParser = new ConstructingObjectParser<>(
@@ -77,6 +74,8 @@ public class ReservedClusterStateService {
7774
return new ReservedStateChunk(stateMap, (ReservedStateVersion) a[1]);
7875
}
7976
);
77+
private final ReservedStateUpdateTaskExecutor updateTaskExecutor;
78+
private final ReservedStateErrorTaskExecutor errorTaskExecutor;
8079

8180
/**
8281
* Controller class for saving and reserving {@link ClusterState}.
@@ -89,12 +88,8 @@ public ReservedClusterStateService(
8988
List<ReservedClusterStateHandler<?>> handlerList
9089
) {
9190
this.clusterService = clusterService;
92-
this.updateTaskQueue = clusterService.createTaskQueue(
93-
"reserved state update",
94-
Priority.URGENT,
95-
new ReservedStateUpdateTaskExecutor(rerouteService)
96-
);
97-
this.errorTaskQueue = clusterService.createTaskQueue("reserved state error", Priority.URGENT, new ReservedStateErrorTaskExecutor());
91+
this.updateTaskExecutor = new ReservedStateUpdateTaskExecutor(rerouteService);
92+
this.errorTaskExecutor = new ReservedStateErrorTaskExecutor();
9893
this.handlers = handlerList.stream().collect(Collectors.toMap(ReservedClusterStateHandler::name, Function.identity()));
9994
stateChunkParser.declareNamedObjects(ConstructingObjectParser.constructorArg(), (p, c, name) -> {
10095
if (handlers.containsKey(name) == false) {
@@ -160,7 +155,7 @@ public void process(
160155
public void initEmpty(String namespace, ActionListener<ActionResponse.Empty> listener) {
161156
var missingVersion = new ReservedStateVersion(EMPTY_VERSION, BuildVersion.current());
162157
var emptyState = new ReservedStateChunk(Map.of(), missingVersion);
163-
updateTaskQueue.submitTask(
158+
submitUpdateTask(
164159
"empty initial cluster state [" + namespace + "]",
165160
new ReservedStateUpdateTask(
166161
namespace,
@@ -171,10 +166,8 @@ public void initEmpty(String namespace, ActionListener<ActionResponse.Empty> lis
171166
// error state should not be possible since there is no metadata being parsed or processed
172167
errorState -> { throw new AssertionError(); },
173168
listener
174-
),
175-
null
169+
)
176170
);
177-
178171
}
179172

180173
/**
@@ -234,15 +227,15 @@ public void process(
234227
errorListener.accept(error);
235228
return;
236229
}
237-
updateTaskQueue.submitTask(
230+
submitUpdateTask(
238231
"reserved cluster state [" + namespace + "]",
239232
new ReservedStateUpdateTask(
240233
namespace,
241234
reservedStateChunk,
242235
versionCheck,
243236
handlers,
244237
orderedHandlers,
245-
ReservedClusterStateService.this::updateErrorState,
238+
this::updateErrorState,
246239
new ActionListener<>() {
247240
@Override
248241
public void onResponse(ActionResponse.Empty empty) {
@@ -261,8 +254,7 @@ public void onFailure(Exception e) {
261254
}
262255
}
263256
}
264-
),
265-
null
257+
)
266258
);
267259
}
268260

@@ -293,6 +285,11 @@ Exception checkAndReportError(
293285
return null;
294286
}
295287

288+
void submitUpdateTask(String source, ReservedStateUpdateTask task) {
289+
var updateTaskQueue = clusterService.createTaskQueue("reserved state update", Priority.URGENT, updateTaskExecutor);
290+
updateTaskQueue.submitTask(source, task, null);
291+
}
292+
296293
// package private for testing
297294
void updateErrorState(ErrorState errorState) {
298295
// optimistic check here - the cluster state might change after this, so also need to re-check later
@@ -305,6 +302,7 @@ void updateErrorState(ErrorState errorState) {
305302
}
306303

307304
private void submitErrorUpdateTask(ErrorState errorState) {
305+
var errorTaskQueue = clusterService.createTaskQueue("reserved state error", Priority.URGENT, errorTaskExecutor);
308306
errorTaskQueue.submitTask(
309307
"reserved cluster state update error for [ " + errorState.namespace() + "]",
310308
new ReservedStateErrorTask(errorState, new ActionListener<>() {

server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.concurrent.atomic.AtomicBoolean;
4848
import java.util.concurrent.atomic.AtomicReference;
4949
import java.util.function.Consumer;
50+
import java.util.function.LongFunction;
5051

5152
import static org.hamcrest.Matchers.anyOf;
5253
import static org.hamcrest.Matchers.contains;
@@ -67,6 +68,7 @@
6768
import static org.mockito.Mockito.spy;
6869
import static org.mockito.Mockito.times;
6970
import static org.mockito.Mockito.verify;
71+
import static org.mockito.Mockito.verifyNoInteractions;
7072
import static org.mockito.Mockito.verifyNoMoreInteractions;
7173
import static org.mockito.Mockito.when;
7274

@@ -332,6 +334,81 @@ public void testUpdateErrorState() {
332334
verifyNoMoreInteractions(errorQueue);
333335
}
334336

337+
@SuppressWarnings("unchecked")
338+
public void testOneUpdateTaskPerQueue() {
339+
ClusterState state = ClusterState.builder(new ClusterName("test")).build();
340+
MasterServiceTaskQueue<ReservedStateErrorTask> queue1 = mockTaskQueue();
341+
MasterServiceTaskQueue<ReservedStateErrorTask> queue2 = mockTaskQueue();
342+
MasterServiceTaskQueue<ReservedStateErrorTask> unusedQueue = mockTaskQueue();
343+
344+
ClusterService clusterService = mock(ClusterService.class);
345+
when(clusterService.<ReservedStateErrorTask>createTaskQueue(anyString(), any(), any())) // For non-update tasks
346+
.thenReturn(unusedQueue);
347+
when(clusterService.<ReservedStateErrorTask>createTaskQueue(ArgumentMatchers.contains("reserved state update"), any(), any()))
348+
.thenReturn(queue1, queue2, unusedQueue);
349+
when(clusterService.state()).thenReturn(state);
350+
351+
ReservedClusterStateService service = new ReservedClusterStateService(clusterService, mock(RerouteService.class), List.of());
352+
LongFunction<ReservedStateUpdateTask> update = version -> {
353+
ReservedStateUpdateTask task = spy(
354+
new ReservedStateUpdateTask(
355+
"test",
356+
new ReservedStateChunk(Map.of(), new ReservedStateVersion(version, BuildVersion.current())),
357+
ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
358+
Map.of(),
359+
Set.of(),
360+
errorState -> {},
361+
ActionListener.noop()
362+
)
363+
);
364+
doReturn(state).when(task).execute(any());
365+
return task;
366+
};
367+
368+
service.submitUpdateTask("test", update.apply(2L));
369+
service.submitUpdateTask("test", update.apply(3L));
370+
371+
// One task to each queue
372+
verify(queue1).submitTask(any(), any(), any());
373+
verify(queue2).submitTask(any(), any(), any());
374+
375+
// No additional unexpected tasks
376+
verifyNoInteractions(unusedQueue);
377+
}
378+
379+
@SuppressWarnings("unchecked")
380+
public void testOneErrorTaskPerQueue() {
381+
ClusterState state = ClusterState.builder(new ClusterName("test")).build();
382+
MasterServiceTaskQueue<ReservedStateErrorTask> queue1 = mockTaskQueue();
383+
MasterServiceTaskQueue<ReservedStateErrorTask> queue2 = mockTaskQueue();
384+
MasterServiceTaskQueue<ReservedStateErrorTask> unusedQueue = mockTaskQueue();
385+
386+
ClusterService clusterService = mock(ClusterService.class);
387+
when(clusterService.<ReservedStateErrorTask>createTaskQueue(anyString(), any(), any())) // For non-error tasks
388+
.thenReturn(unusedQueue);
389+
when(clusterService.<ReservedStateErrorTask>createTaskQueue(ArgumentMatchers.contains("reserved state error"), any(), any()))
390+
.thenReturn(queue1, queue2, unusedQueue);
391+
when(clusterService.state()).thenReturn(state);
392+
393+
ReservedClusterStateService service = new ReservedClusterStateService(clusterService, mock(RerouteService.class), List.of());
394+
LongFunction<ErrorState> error = version -> new ErrorState(
395+
"namespace",
396+
version,
397+
ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
398+
List.of("error"),
399+
ReservedStateErrorMetadata.ErrorKind.TRANSIENT
400+
);
401+
service.updateErrorState(error.apply(2));
402+
service.updateErrorState(error.apply(3));
403+
404+
// One task to each queue
405+
verify(queue1).submitTask(any(), any(), any());
406+
verify(queue2).submitTask(any(), any(), any());
407+
408+
// No additional unexpected tasks
409+
verifyNoInteractions(unusedQueue);
410+
}
411+
335412
public void testErrorStateTask() throws Exception {
336413
ClusterState state = ClusterState.builder(new ClusterName("test")).build();
337414

0 commit comments

Comments
 (0)