Skip to content

Commit 2d34177

Browse files
committed
Support reserved state to be spread accross several files
1 parent 2026ce3 commit 2d34177

File tree

1 file changed

+60
-11
lines changed

1 file changed

+60
-11
lines changed

server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java

Lines changed: 60 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ public void process(
187187
process(namespace, stateChunk, versionCheck, errorListener);
188188
}
189189

190-
ReservedStateChunk parse(ProjectId projectId, String namespace, XContentParser parser) {
190+
public ReservedStateChunk parse(ProjectId projectId, String namespace, XContentParser parser) {
191191
try {
192192
return stateChunkParser.apply(parser, null);
193193
} catch (Exception e) {
@@ -377,6 +377,7 @@ public void onFailure(Exception e) {
377377
* @param projectId the project state to modify
378378
* @param namespace the namespace under which we'll store the reserved keys in the cluster state metadata
379379
* @param reservedStateChunk a {@link ReservedStateChunk} composite state object to process
380+
* @param versionCheck Enum representing whether a reserved state should be processed based on the current and new versions
380381
* @param errorListener a consumer called with {@link IllegalStateException} if the content has errors and the
381382
* cluster state cannot be correctly applied, null if successful or the state failed to apply because of incompatible version.
382383
*/
@@ -387,29 +388,56 @@ public void process(
387388
ReservedStateVersionCheck versionCheck,
388389
Consumer<Exception> errorListener
389390
) {
390-
Map<String, Object> reservedState = reservedStateChunk.state();
391-
ReservedStateVersion reservedStateVersion = reservedStateChunk.metadata();
391+
process(projectId, namespace, List.of(reservedStateChunk), versionCheck, errorListener);
392+
}
392393

394+
/**
395+
* Saves and reserves a chunk of the cluster state under a given 'namespace' from {@link XContentParser}
396+
*
397+
* @param projectId the project state to modify
398+
* @param namespace the namespace under which we'll store the reserved keys in the cluster state metadata
399+
* @param reservedStateChunks a {@link ReservedStateChunk} composite state object to process
400+
* @param versionCheck Enum representing whether a reserved state should be processed based on the current and new versions
401+
* @param errorListener a consumer called with {@link IllegalStateException} if the content has errors and the
402+
* cluster state cannot be correctly applied, null if successful or the state failed to apply because of incompatible version.
403+
*/
404+
public void process(
405+
ProjectId projectId,
406+
String namespace,
407+
List<ReservedStateChunk> reservedStateChunks,
408+
ReservedStateVersionCheck versionCheck,
409+
Consumer<Exception> errorListener
410+
) {
411+
ReservedStateChunk reservedStateChunk;
412+
ReservedStateVersion reservedStateVersion;
393413
LinkedHashSet<String> orderedHandlers;
414+
394415
try {
416+
reservedStateChunk = mergeReservedStateChunks(reservedStateChunks);
417+
Map<String, Object> reservedState = reservedStateChunk.state();
418+
reservedStateVersion = reservedStateChunk.metadata();
395419
orderedHandlers = orderedProjectStateHandlers(reservedState.keySet());
396420
} catch (Exception e) {
397421
ErrorState errorState = new ErrorState(
398422
projectId,
399423
namespace,
400-
reservedStateVersion.version(),
424+
reservedStateChunks.getFirst().metadata().version(),
401425
versionCheck,
402426
e,
403427
ReservedStateErrorMetadata.ErrorKind.PARSING
404428
);
405429

406-
updateErrorState(errorState);
407-
logger.debug(
408-
"error processing project [{}] change request for [{}] with the following errors [{}]",
409-
projectId,
410-
namespace,
411-
errorState
412-
);
430+
if (clusterService.state().metadata().hasProject(projectId)) {
431+
updateErrorState(errorState);
432+
logger.debug(
433+
"error processing project [{}] change request for [{}] with the following errors [{}]",
434+
projectId,
435+
namespace,
436+
errorState
437+
);
438+
} else {
439+
logger.info("Error processing project [{}] create request for [{}].", projectId, namespace);
440+
}
413441

414442
errorListener.accept(
415443
new IllegalStateException(
@@ -476,6 +504,27 @@ public void onFailure(Exception e) {
476504
);
477505
}
478506

507+
private static ReservedStateChunk mergeReservedStateChunks(List<ReservedStateChunk> chunks) {
508+
ReservedStateVersion reservedStateVersion = null;
509+
Map<String, Object> mergedChunks = new HashMap<>();
510+
for (var chunk : chunks) {
511+
if (reservedStateVersion == null) {
512+
reservedStateVersion = chunk.metadata();
513+
} else if (chunk.metadata().equals(reservedStateVersion) == false) {
514+
throw new IllegalStateException(
515+
"Failed to merge reserved state chunks because of version mismatch: ["
516+
+ reservedStateVersion
517+
+ "] != ["
518+
+ chunk.metadata()
519+
+ "]"
520+
);
521+
}
522+
mergedChunks.putAll(chunk.state());
523+
}
524+
525+
return new ReservedStateChunk(mergedChunks, reservedStateVersion);
526+
}
527+
479528
// package private for testing
480529
Exception checkAndReportError(
481530
Optional<ProjectId> projectId,

0 commit comments

Comments
 (0)