Skip to content

Commit 48436c6

Browse files
committed
Add and test removal logic
1 parent b99c83a commit 48436c6

File tree

2 files changed

+151
-1
lines changed

2 files changed

+151
-1
lines changed

server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.ArrayList;
2828
import java.util.Collection;
2929
import java.util.Collections;
30+
import java.util.HashSet;
3031
import java.util.LinkedHashSet;
3132
import java.util.List;
3233
import java.util.Map;
@@ -131,7 +132,23 @@ protected ClusterState execute(final ClusterState currentState) {
131132
}
132133

133134
// Now, any existing handler not listed in updateSequence must have been removed.
134-
// TODO: Removal logic
135+
// We do removals after updates in case one of the updated handlers depends on one of these,
136+
// to give that handler a chance to clean up before its dependency vanishes.
137+
if (reservedStateMetadata != null) {
138+
Set<String> toRemove = new HashSet<>(reservedStateMetadata.handlers().keySet());
139+
toRemove.removeAll(updateSequence);
140+
var reverseRemovalSequence = List.copyOf(orderedStateHandlers(toRemove, handlers));
141+
for (var iter = reverseRemovalSequence.listIterator(reverseRemovalSequence.size()); iter.hasPrevious();) {
142+
String handlerName = iter.previous();
143+
var handler = handlers.get(handlerName);
144+
try {
145+
Set<String> existingKeys = keysForHandler(reservedStateMetadata, handlerName);
146+
state = handler.remove(new TransformState(state, existingKeys));
147+
} catch (Exception e) {
148+
errors.add(format("Error processing %s state removal: %s", handler.name(), stackTrace(e)));
149+
}
150+
}
151+
}
135152

136153
checkAndThrowOnError(errors, reservedStateVersion, versionCheck);
137154

server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.mockito.ArgumentMatchers;
3939

4040
import java.io.IOException;
41+
import java.util.ArrayList;
4142
import java.util.Collection;
4243
import java.util.LinkedHashSet;
4344
import java.util.List;
@@ -241,6 +242,138 @@ public void testInitEmptyTask() {
241242
);
242243
}
243244

245+
public void testTransformAndRemoveGetCalled() throws Exception {
246+
// TODO: Ought to do this for project state updates too.
247+
248+
// This records the calls made to the handler
249+
ArrayList<Operation> operations = new ArrayList<>();
250+
var handler = new TestStateHandler("test_cluster_state_handler") {
251+
@Override
252+
public TransformState transform(Object sourceObj, TransformState prevState) throws Exception {
253+
@SuppressWarnings("unchecked")
254+
Map<String, Object> source = (Map<String, Object>) sourceObj;
255+
operations.add(new Operation.Transform(source, prevState.keys()));
256+
return new TransformState(prevState.state(), source.keySet());
257+
}
258+
259+
@Override
260+
public ClusterState remove(TransformState prevState) {
261+
operations.add(new Operation.Remove(prevState.keys()));
262+
return prevState.state();
263+
}
264+
};
265+
266+
ClusterState state1 = ClusterState.EMPTY_STATE;
267+
268+
// 1. Add our section to the reserved state chunk
269+
ReservedStateChunk initialStateChunk = new ReservedStateChunk(
270+
Map.of("test_handler_name", Map.of("key1", "value1")),
271+
new ReservedStateVersion(1L, Version.CURRENT)
272+
);
273+
ReservedStateUpdateTask addTask = new ReservedStateUpdateTask(
274+
"test_namespace",
275+
initialStateChunk,
276+
ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
277+
Map.of("test_handler_name", handler),
278+
List.copyOf(initialStateChunk.state().keySet()),
279+
Assert::assertNull,
280+
ActionListener.noop()
281+
);
282+
ClusterState state2 = addTask.execute(state1);
283+
284+
assertEquals(List.of(new Operation.Transform(Map.of("key1", "value1"), Set.of())), operations);
285+
var expected2 = Map.of(
286+
"test_namespace",
287+
new ReservedStateMetadata(
288+
"test_namespace",
289+
initialStateChunk.metadata().version(),
290+
Map.of("test_handler_name", new ReservedStateHandlerMetadata("test_handler_name", Set.of("key1"))),
291+
null
292+
)
293+
);
294+
assertEquals("Our section of the reserved state has been added", expected2, state2.metadata().reservedStateMetadata());
295+
296+
operations.clear();
297+
298+
// 2. Change our section of the reserved state
299+
ReservedStateChunk changedStateChunk = new ReservedStateChunk(
300+
Map.of("test_handler_name", Map.of("key2", "value2")),
301+
new ReservedStateVersion(2L, Version.CURRENT)
302+
);
303+
ReservedStateUpdateTask changeTask = new ReservedStateUpdateTask(
304+
"test_namespace",
305+
changedStateChunk,
306+
ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
307+
Map.of("test_handler_name", handler),
308+
List.copyOf(changedStateChunk.state().keySet()),
309+
Assert::assertNull,
310+
ActionListener.noop()
311+
);
312+
ClusterState state3 = changeTask.execute(state2);
313+
314+
assertEquals(List.of(new Operation.Transform(Map.of("key2", "value2"), Set.of("key1"))), operations);
315+
var expected3 = Map.of(
316+
"test_namespace",
317+
new ReservedStateMetadata(
318+
"test_namespace",
319+
changedStateChunk.metadata().version(),
320+
Map.of("test_handler_name", new ReservedStateHandlerMetadata("test_handler_name", Set.of("key2"))),
321+
null
322+
)
323+
);
324+
assertEquals("Our section of the removed state is updated", expected3, state3.metadata().reservedStateMetadata());
325+
326+
operations.clear();
327+
328+
// 3. Remove our section of the state chunk
329+
ReservedStateChunk removedStateChunk = new ReservedStateChunk(Map.of(), new ReservedStateVersion(3L, Version.CURRENT));
330+
ReservedStateUpdateTask removeTask = new ReservedStateUpdateTask(
331+
"test_namespace",
332+
removedStateChunk,
333+
ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
334+
Map.of("test_handler_name", handler),
335+
List.copyOf(removedStateChunk.state().keySet()),
336+
Assert::assertNull,
337+
ActionListener.noop()
338+
);
339+
var state4 = removeTask.execute(state3);
340+
341+
assertEquals(List.of(new Operation.Remove(Set.of("key2"))), operations);
342+
var expected4 = Map.of(
343+
"test_namespace",
344+
new ReservedStateMetadata("test_namespace", removedStateChunk.metadata().version(), Map.of(), null)
345+
);
346+
assertEquals("Our section of the removed state is gone", expected4, state4.metadata().reservedStateMetadata());
347+
348+
operations.clear();
349+
350+
// 4. Resubmit without our section and make sure it's a no-op
351+
ReservedStateChunk stillGoneStateChunk = new ReservedStateChunk(Map.of(), new ReservedStateVersion(4L, Version.CURRENT));
352+
ReservedStateUpdateTask noopTask = new ReservedStateUpdateTask(
353+
"test_namespace",
354+
stillGoneStateChunk,
355+
ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
356+
Map.of("test_handler_name", handler),
357+
List.copyOf(stillGoneStateChunk.state().keySet()),
358+
Assert::assertNull,
359+
ActionListener.noop()
360+
);
361+
var state5 = noopTask.execute(state4);
362+
363+
assertEquals(List.of(), operations);
364+
var expected5 = Map.of(
365+
"test_namespace",
366+
new ReservedStateMetadata("test_namespace", stillGoneStateChunk.metadata().version(), Map.of(), null)
367+
);
368+
assertEquals("Our section of the removed state is still gone", expected5, state5.metadata().reservedStateMetadata());
369+
}
370+
371+
private sealed interface Operation {
372+
record Transform(Object source, Set<String> prevKeys) implements Operation {}
373+
374+
record Remove(Set<String> prevKeys) implements Operation {}
375+
}
376+
244377
public void testUpdateStateTasks() throws Exception {
245378
RerouteService rerouteService = mock(RerouteService.class);
246379

0 commit comments

Comments
 (0)