Skip to content

Commit a7b78e0

Browse files
committed
Refactor: handler order calculations
1 parent 9c0c27a commit a7b78e0

File tree

6 files changed

+156
-130
lines changed

6 files changed

+156
-130
lines changed

server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java

Lines changed: 27 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -37,24 +37,26 @@
3737

3838
import java.util.ArrayList;
3939
import java.util.HashMap;
40-
import java.util.LinkedHashSet;
4140
import java.util.List;
4241
import java.util.Map;
4342
import java.util.Optional;
43+
import java.util.SequencedCollection;
4444
import java.util.SequencedSet;
4545
import java.util.Set;
4646
import java.util.function.Consumer;
4747
import java.util.function.Function;
4848
import java.util.stream.Collectors;
4949
import java.util.stream.Stream;
5050

51+
import static java.util.Collections.unmodifiableMap;
5152
import static org.elasticsearch.ExceptionsHelper.stackTrace;
5253
import static org.elasticsearch.cluster.metadata.ReservedStateMetadata.EMPTY_VERSION;
5354
import static org.elasticsearch.core.Strings.format;
5455
import static org.elasticsearch.reservedstate.service.ReservedStateErrorTask.checkErrorVersion;
5556
import static org.elasticsearch.reservedstate.service.ReservedStateErrorTask.isNewError;
5657
import static org.elasticsearch.reservedstate.service.ReservedStateUpdateTask.checkMetadataVersion;
5758
import static org.elasticsearch.reservedstate.service.ReservedStateUpdateTask.keysForHandler;
59+
import static org.elasticsearch.reservedstate.service.ReservedStateUpdateTask.orderedStateHandlers;
5860

5961
/**
6062
* Controller class for storing and reserving a portion of the {@link ClusterState}
@@ -302,9 +304,9 @@ public void process(
302304
Map<String, Object> reservedState = reservedStateChunk.state();
303305
ReservedStateVersion reservedStateVersion = reservedStateChunk.metadata();
304306

305-
SequencedSet<String> orderedHandlers;
307+
SequencedSet<String> updateSequence;
306308
try {
307-
orderedHandlers = orderedClusterStateHandlers(reservedState.keySet());
309+
updateSequence = orderedStateHandlers(reservedState.keySet(), clusterHandlers);
308310
} catch (Exception e) {
309311
ErrorState errorState = new ErrorState(
310312
namespace,
@@ -334,7 +336,7 @@ public void process(
334336
}
335337

336338
// We trial run all handler validations to ensure that we can process all of the cluster state error free.
337-
var trialRunErrors = trialRun(namespace, state, reservedStateChunk, orderedHandlers);
339+
var trialRunErrors = trialRun(namespace, state, reservedStateChunk, updateSequence);
338340
// this is not using the modified trial state above, but that doesn't matter, we're just setting errors here
339341
var error = checkAndReportError(Optional.empty(), namespace, trialRunErrors, reservedStateVersion, versionCheck);
340342

@@ -349,7 +351,7 @@ public void process(
349351
reservedStateChunk,
350352
versionCheck,
351353
clusterHandlers,
352-
orderedHandlers,
354+
updateSequence,
353355
this::updateErrorState,
354356
new ActionListener<>() {
355357
@Override
@@ -413,13 +415,13 @@ public void process(
413415
) {
414416
ReservedStateChunk reservedStateChunk;
415417
ReservedStateVersion reservedStateVersion;
416-
LinkedHashSet<String> orderedHandlers;
418+
SequencedSet<String> updateSequence;
417419

418420
try {
419421
reservedStateChunk = mergeReservedStateChunks(reservedStateChunks);
420422
Map<String, Object> reservedState = reservedStateChunk.state();
421423
reservedStateVersion = reservedStateChunk.metadata();
422-
orderedHandlers = orderedProjectStateHandlers(reservedState.keySet());
424+
updateSequence = orderedStateHandlers(reservedState.keySet(), projectHandlers);
423425
} catch (Exception e) {
424426
ErrorState errorState = new ErrorState(
425427
projectId,
@@ -462,7 +464,7 @@ public void process(
462464
}
463465

464466
// We trial run all handler validations to ensure that we can process all of the cluster state error free.
465-
var trialRunErrors = trialRun(projectId, namespace, state, reservedStateChunk, orderedHandlers);
467+
var trialRunErrors = trialRun(projectId, namespace, state, reservedStateChunk, updateSequence);
466468
// this is not using the modified trial state above, but that doesn't matter, we're just setting errors here
467469
var error = checkAndReportError(Optional.of(projectId), namespace, trialRunErrors, reservedStateVersion, versionCheck);
468470

@@ -478,7 +480,7 @@ public void process(
478480
reservedStateChunk,
479481
versionCheck,
480482
projectHandlers,
481-
orderedHandlers,
483+
updateSequence,
482484
this::updateErrorState,
483485
new ActionListener<>() {
484486
@Override
@@ -618,14 +620,14 @@ List<String> trialRun(
618620
String namespace,
619621
ClusterState currentState,
620622
ReservedStateChunk stateChunk,
621-
SequencedSet<String> orderedHandlers
623+
SequencedCollection<String> updateSequence
622624
) {
623625
return trialRun(
624626
currentState.metadata().reservedStateMetadata().get(namespace),
625627
currentState,
626628
stateChunk,
627629
clusterHandlers,
628-
orderedHandlers
630+
updateSequence
629631
);
630632
}
631633

@@ -643,15 +645,15 @@ List<String> trialRun(
643645
String namespace,
644646
ClusterState currentState,
645647
ReservedStateChunk stateChunk,
646-
SequencedSet<String> orderedHandlers
648+
SequencedCollection<String> updateSequence
647649
) {
648650
return trialRun(
649651
projectId,
650652
currentState.metadata().reservedStateMetadata().get(namespace),
651653
currentState,
652654
stateChunk,
653655
projectHandlers,
654-
orderedHandlers
656+
updateSequence
655657
);
656658
}
657659

@@ -661,13 +663,13 @@ private static List<String> trialRun(
661663
ClusterState currentState,
662664
ReservedStateChunk stateChunk,
663665
Map<String, ReservedProjectStateHandler<?>> handlers,
664-
SequencedSet<String> orderedHandlers
666+
SequencedCollection<String> updateSequence
665667
) {
666668
Map<String, Object> reservedState = stateChunk.state();
667669

668670
List<String> errors = new ArrayList<>();
669671

670-
for (var handlerName : orderedHandlers) {
672+
for (var handlerName : updateSequence) {
671673
ReservedProjectStateHandler<?> handler = handlers.get(handlerName);
672674
try {
673675
Set<String> existingKeys = keysForHandler(existingMetadata, handlerName);
@@ -691,13 +693,13 @@ private static List<String> trialRun(
691693
ClusterState currentState,
692694
ReservedStateChunk stateChunk,
693695
Map<String, ReservedClusterStateHandler<?>> handlers,
694-
SequencedSet<String> orderedHandlers
696+
SequencedCollection<String> updateSequence
695697
) {
696698
Map<String, Object> reservedState = stateChunk.state();
697699

698700
List<String> errors = new ArrayList<>();
699701

700-
for (var handlerName : orderedHandlers) {
702+
for (var handlerName : updateSequence) {
701703
ReservedClusterStateHandler<?> handler = handlers.get(handlerName);
702704
try {
703705
Set<String> existingKeys = keysForHandler(existingMetadata, handlerName);
@@ -740,84 +742,6 @@ static ClusterState remove(ReservedProjectStateHandler<?> handler, ProjectId pro
740742
return handler.remove(projectId, transformState);
741743
}
742744

743-
/**
744-
* Returns an ordered set ({@link LinkedHashSet}) of the cluster state handlers that need to
745-
* execute for a given list of handler names supplied through the {@link ReservedStateChunk}.
746-
* @param handlerNames Names of handlers found in the {@link ReservedStateChunk}
747-
*/
748-
SequencedSet<String> orderedClusterStateHandlers(Set<String> handlerNames) {
749-
LinkedHashSet<String> orderedHandlers = new LinkedHashSet<>();
750-
LinkedHashSet<String> dependencyStack = new LinkedHashSet<>();
751-
752-
for (String key : handlerNames) {
753-
addStateHandler(clusterHandlers, key, handlerNames, orderedHandlers, dependencyStack);
754-
}
755-
756-
return orderedHandlers;
757-
}
758-
759-
/**
760-
* Returns an ordered set ({@link LinkedHashSet}) of the cluster state handlers that need to
761-
* execute for a given list of handler names supplied through the {@link ReservedStateChunk}.
762-
* @param handlerNames Names of handlers found in the {@link ReservedStateChunk}
763-
*/
764-
LinkedHashSet<String> orderedProjectStateHandlers(Set<String> handlerNames) {
765-
LinkedHashSet<String> orderedHandlers = new LinkedHashSet<>();
766-
LinkedHashSet<String> dependencyStack = new LinkedHashSet<>();
767-
768-
for (String key : handlerNames) {
769-
addStateHandler(projectHandlers, key, handlerNames, orderedHandlers, dependencyStack);
770-
}
771-
772-
return orderedHandlers;
773-
}
774-
775-
private void addStateHandler(
776-
Map<String, ? extends ReservedStateHandler<?>> handlers,
777-
String key,
778-
Set<String> keys,
779-
SequencedSet<String> ordered,
780-
SequencedSet<String> visited
781-
) {
782-
if (visited.contains(key)) {
783-
StringBuilder msg = new StringBuilder("Cycle found in settings dependencies: ");
784-
visited.forEach(s -> {
785-
msg.append(s);
786-
msg.append(" -> ");
787-
});
788-
msg.append(key);
789-
throw new IllegalStateException(msg.toString());
790-
}
791-
792-
if (ordered.contains(key)) {
793-
// already added by another dependent handler
794-
return;
795-
}
796-
797-
visited.add(key);
798-
ReservedStateHandler<?> handler = handlers.get(key);
799-
800-
if (handler == null) {
801-
throw new IllegalStateException("Unknown handler type: " + key);
802-
}
803-
804-
for (String dependency : handler.dependencies()) {
805-
if (keys.contains(dependency) == false) {
806-
throw new IllegalStateException("Missing handler dependency definition: " + key + " -> " + dependency);
807-
}
808-
addStateHandler(handlers, dependency, keys, ordered, visited);
809-
}
810-
811-
for (String dependency : handler.optionalDependencies()) {
812-
if (keys.contains(dependency)) {
813-
addStateHandler(handlers, dependency, keys, ordered, visited);
814-
}
815-
}
816-
817-
visited.remove(key);
818-
ordered.add(key);
819-
}
820-
821745
/**
822746
* Adds additional {@link ReservedClusterStateHandler} to the handler registry
823747
* @param handler an additional reserved state handler to be added
@@ -836,4 +760,12 @@ public void installProjectStateHandler(ReservedProjectStateHandler<?> handler) {
836760
projectHandlers.put(handler.name(), handler);
837761
clusterHandlers.put(handler.name(), adaptForDefaultProject(handler));
838762
}
763+
764+
Map<String, ReservedClusterStateHandler<?>> clusterHandlers() {
765+
return unmodifiableMap(clusterHandlers);
766+
}
767+
768+
Map<String, ReservedProjectStateHandler<?>> projectHandlers() {
769+
return unmodifiableMap(projectHandlers);
770+
}
839771
}

server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateUpdateTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
1919
import org.elasticsearch.reservedstate.TransformState;
2020

21-
import java.util.Collection;
2221
import java.util.Map;
2322
import java.util.Optional;
23+
import java.util.SequencedCollection;
2424
import java.util.function.Consumer;
2525

2626
public class ReservedClusterStateUpdateTask extends ReservedStateUpdateTask<ReservedClusterStateHandler<?>> {
@@ -29,11 +29,11 @@ public ReservedClusterStateUpdateTask(
2929
ReservedStateChunk stateChunk,
3030
ReservedStateVersionCheck versionCheck,
3131
Map<String, ReservedClusterStateHandler<?>> handlers,
32-
Collection<String> orderedHandlers,
32+
SequencedCollection<String> updateSequence,
3333
Consumer<ErrorState> errorReporter,
3434
ActionListener<ActionResponse.Empty> listener
3535
) {
36-
super(namespace, stateChunk, versionCheck, handlers, orderedHandlers, errorReporter, listener);
36+
super(namespace, stateChunk, versionCheck, handlers, updateSequence, errorReporter, listener);
3737
}
3838

3939
@Override

server/src/main/java/org/elasticsearch/reservedstate/service/ReservedProjectStateUpdateTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
import org.elasticsearch.reservedstate.ReservedProjectStateHandler;
2020
import org.elasticsearch.reservedstate.TransformState;
2121

22-
import java.util.Collection;
2322
import java.util.Map;
2423
import java.util.Optional;
24+
import java.util.SequencedCollection;
2525
import java.util.function.Consumer;
2626

2727
public class ReservedProjectStateUpdateTask extends ReservedStateUpdateTask<ReservedProjectStateHandler<?>> {
@@ -33,11 +33,11 @@ public ReservedProjectStateUpdateTask(
3333
ReservedStateChunk stateChunk,
3434
ReservedStateVersionCheck versionCheck,
3535
Map<String, ReservedProjectStateHandler<?>> handlers,
36-
Collection<String> orderedHandlers,
36+
SequencedCollection<String> updateSequence,
3737
Consumer<ErrorState> errorReporter,
3838
ActionListener<ActionResponse.Empty> listener
3939
) {
40-
super(namespace, stateChunk, versionCheck, handlers, orderedHandlers, errorReporter, listener);
40+
super(namespace, stateChunk, versionCheck, handlers, updateSequence, errorReporter, listener);
4141
this.projectId = projectId;
4242
}
4343

0 commit comments

Comments
 (0)