Skip to content

Commit 7183f18

Browse files
jfredenomricohenn
authored andcommitted
Allow passing several reserved state chunks in single process call (elastic#124574)
This PR overloads the `process` method and allows it to be used with several `ReservedStateChunks`. The purpose is to allow several state chunks to be spread across several files but handled as a single cluster state update by validating and merging them into a single representation of the `ReservedStateChunk`.
1 parent c01bf85 commit 7183f18

File tree

3 files changed

+260
-4
lines changed

3 files changed

+260
-4
lines changed

docs/changelog/124574.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 124574
2+
summary: Allow passing several reserved state chunks in single process call
3+
area: Infra/Settings
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.cluster.routing.RerouteService;
2323
import org.elasticsearch.cluster.service.ClusterService;
2424
import org.elasticsearch.common.Priority;
25+
import org.elasticsearch.common.util.set.Sets;
2526
import org.elasticsearch.core.FixForMultiProject;
2627
import org.elasticsearch.core.Tuple;
2728
import org.elasticsearch.env.BuildVersion;
@@ -187,7 +188,7 @@ public void process(
187188
process(namespace, stateChunk, versionCheck, errorListener);
188189
}
189190

190-
ReservedStateChunk parse(ProjectId projectId, String namespace, XContentParser parser) {
191+
public ReservedStateChunk parse(ProjectId projectId, String namespace, XContentParser parser) {
191192
try {
192193
return stateChunkParser.apply(parser, null);
193194
} catch (Exception e) {
@@ -377,6 +378,7 @@ public void onFailure(Exception e) {
377378
* @param projectId the project state to modify
378379
* @param namespace the namespace under which we'll store the reserved keys in the cluster state metadata
379380
* @param reservedStateChunk a {@link ReservedStateChunk} composite state object to process
381+
* @param versionCheck Enum representing whether a reserved state should be processed based on the current and new versions
380382
* @param errorListener a consumer called with {@link IllegalStateException} if the content has errors and the
381383
* cluster state cannot be correctly applied, null if successful or the state failed to apply because of incompatible version.
382384
*/
@@ -387,17 +389,41 @@ public void process(
387389
ReservedStateVersionCheck versionCheck,
388390
Consumer<Exception> errorListener
389391
) {
390-
Map<String, Object> reservedState = reservedStateChunk.state();
391-
ReservedStateVersion reservedStateVersion = reservedStateChunk.metadata();
392+
process(projectId, namespace, List.of(reservedStateChunk), versionCheck, errorListener);
393+
}
392394

395+
/**
396+
* Saves and reserves a chunk of the cluster state under a given 'namespace' from {@link XContentParser} by combining several chunks
397+
* into one
398+
*
399+
* @param projectId the project state to modify
400+
* @param namespace the namespace under which we'll store the reserved keys in the cluster state metadata
401+
* @param reservedStateChunks a list of {@link ReservedStateChunk} composite state objects to process
402+
* @param versionCheck Enum representing whether a reserved state should be processed based on the current and new versions
403+
* @param errorListener a consumer called with {@link IllegalStateException} if the content has errors and the
404+
* cluster state cannot be correctly applied, null if successful or the state failed to apply because of incompatible version.
405+
*/
406+
public void process(
407+
ProjectId projectId,
408+
String namespace,
409+
List<ReservedStateChunk> reservedStateChunks,
410+
ReservedStateVersionCheck versionCheck,
411+
Consumer<Exception> errorListener
412+
) {
413+
ReservedStateChunk reservedStateChunk;
414+
ReservedStateVersion reservedStateVersion;
393415
LinkedHashSet<String> orderedHandlers;
416+
394417
try {
418+
reservedStateChunk = mergeReservedStateChunks(reservedStateChunks);
419+
Map<String, Object> reservedState = reservedStateChunk.state();
420+
reservedStateVersion = reservedStateChunk.metadata();
395421
orderedHandlers = orderedProjectStateHandlers(reservedState.keySet());
396422
} catch (Exception e) {
397423
ErrorState errorState = new ErrorState(
398424
projectId,
399425
namespace,
400-
reservedStateVersion.version(),
426+
reservedStateChunks.getFirst().metadata().version(),
401427
versionCheck,
402428
e,
403429
ReservedStateErrorMetadata.ErrorKind.PARSING
@@ -476,6 +502,36 @@ public void onFailure(Exception e) {
476502
);
477503
}
478504

505+
private static ReservedStateChunk mergeReservedStateChunks(List<ReservedStateChunk> chunks) {
506+
if (chunks.isEmpty()) {
507+
throw new IllegalArgumentException("No chunks provided");
508+
}
509+
510+
if (chunks.size() == 1) {
511+
return chunks.getFirst();
512+
}
513+
514+
ReservedStateVersion reservedStateVersion = chunks.getFirst().metadata();
515+
Map<String, Object> mergedChunks = new HashMap<>(chunks.size());
516+
for (var chunk : chunks) {
517+
Set<String> duplicateKeys = Sets.intersection(chunk.state().keySet(), mergedChunks.keySet());
518+
if (chunk.metadata().equals(reservedStateVersion) == false) {
519+
throw new IllegalStateException(
520+
"Failed to merge reserved state chunks because of version mismatch: ["
521+
+ reservedStateVersion
522+
+ "] != ["
523+
+ chunk.metadata()
524+
+ "]"
525+
);
526+
} else if (duplicateKeys.isEmpty() == false) {
527+
throw new IllegalStateException("Failed to merge reserved state chunks because of duplicate keys: " + duplicateKeys);
528+
}
529+
mergedChunks.putAll(chunk.state());
530+
}
531+
532+
return new ReservedStateChunk(mergedChunks, reservedStateVersion);
533+
}
534+
479535
// package private for testing
480536
Exception checkAndReportError(
481537
Optional<ProjectId> projectId,
@@ -518,6 +574,11 @@ void updateErrorState(ErrorState errorState) {
518574
return;
519575
}
520576

577+
if (errorState.projectId().isPresent() && clusterService.state().metadata().hasProject(errorState.projectId().get()) == false) {
578+
// Can't update error state for a project that doesn't exist yet
579+
return;
580+
}
581+
521582
submitErrorUpdateTask(errorState);
522583
}
523584

server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.cluster.routing.RerouteService;
2525
import org.elasticsearch.cluster.service.ClusterService;
2626
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
27+
import org.elasticsearch.common.Strings;
2728
import org.elasticsearch.common.settings.ClusterSettings;
2829
import org.elasticsearch.common.settings.Settings;
2930
import org.elasticsearch.core.Releasable;
@@ -41,6 +42,7 @@
4142
import org.mockito.ArgumentMatchers;
4243

4344
import java.io.IOException;
45+
import java.util.ArrayList;
4446
import java.util.Collection;
4547
import java.util.LinkedHashSet;
4648
import java.util.List;
@@ -60,13 +62,15 @@
6062
import static org.hamcrest.Matchers.is;
6163
import static org.hamcrest.Matchers.not;
6264
import static org.hamcrest.Matchers.notNullValue;
65+
import static org.hamcrest.Matchers.nullValue;
6366
import static org.hamcrest.Matchers.sameInstance;
6467
import static org.hamcrest.Matchers.startsWith;
6568
import static org.mockito.ArgumentMatchers.any;
6669
import static org.mockito.ArgumentMatchers.anyString;
6770
import static org.mockito.Mockito.doNothing;
6871
import static org.mockito.Mockito.doReturn;
6972
import static org.mockito.Mockito.mock;
73+
import static org.mockito.Mockito.never;
7074
import static org.mockito.Mockito.spy;
7175
import static org.mockito.Mockito.times;
7276
import static org.mockito.Mockito.verify;
@@ -76,6 +80,20 @@
7680

7781
public class ReservedClusterStateServiceTests extends ESTestCase {
7882

83+
private static final String TEST_CHUNK_TEMPLATE = """
84+
{
85+
"metadata": {
86+
"version": "%s",
87+
"compatibility": "8.4.0"
88+
},
89+
"state": {
90+
"%s": {
91+
"nothing": "useful"
92+
}
93+
}
94+
}
95+
""";
96+
7997
@SuppressWarnings("unchecked")
8098
private static <T extends ClusterStateTaskListener> MasterServiceTaskQueue<T> mockTaskQueue() {
8199
return (MasterServiceTaskQueue<T>) mock(MasterServiceTaskQueue.class);
@@ -349,6 +367,178 @@ public void success(Runnable onPublicationSuccess) {
349367
verify(rerouteService, times(1)).reroute(anyString(), any(), any());
350368
}
351369

370+
public void testUpdateErrorStateNonExistingProject() {
371+
ClusterService clusterService = mock(ClusterService.class);
372+
ClusterState state = ClusterState.builder(new ClusterName("test")).build();
373+
when(clusterService.state()).thenReturn(state);
374+
375+
ReservedClusterStateService service = new ReservedClusterStateService(
376+
clusterService,
377+
mock(RerouteService.class),
378+
List.of(),
379+
List.of()
380+
);
381+
382+
ErrorState error = new ErrorState(
383+
Optional.of(randomUniqueProjectId()),
384+
"namespace",
385+
2L,
386+
ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
387+
List.of("error"),
388+
ReservedStateErrorMetadata.ErrorKind.TRANSIENT
389+
);
390+
service.updateErrorState(error);
391+
verify(clusterService, never()).createTaskQueue(any(), any(), any());
392+
}
393+
394+
public void testProcessMultipleChunks() throws Exception {
395+
ClusterService clusterService = mock(ClusterService.class);
396+
when(clusterService.createTaskQueue(any(), any(), any())).thenReturn(mockTaskQueue());
397+
final ClusterName clusterName = new ClusterName("elasticsearch");
398+
399+
ClusterState state = ClusterState.builder(clusterName).build();
400+
ProjectId projectId = randomProjectIdOrDefault();
401+
state = setupProject(state, Optional.of(projectId));
402+
when(clusterService.state()).thenReturn(state);
403+
404+
AtomicReference<Exception> exceptionRef = new AtomicReference<>();
405+
List<ReservedStateChunk> chunks = new ArrayList<>();
406+
407+
String[] randomStateKeys = generateRandomStringArray(randomIntBetween(5, 10), randomIntBetween(10, 15), false);
408+
409+
List<ReservedClusterStateHandler<ProjectMetadata, ?>> projectHandlers = new ArrayList<>();
410+
for (var key : randomStateKeys) {
411+
projectHandlers.add(spy(new TestProjectStateHandler(key)));
412+
}
413+
414+
ReservedClusterStateService controller = new ReservedClusterStateService(
415+
clusterService,
416+
mock(RerouteService.class),
417+
List.of(),
418+
projectHandlers
419+
);
420+
421+
for (var testHandler : randomStateKeys) {
422+
String testChunkJSON = Strings.format(TEST_CHUNK_TEMPLATE, 1, testHandler);
423+
try (
424+
XContentParser chunkParser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testChunkJSON);
425+
) {
426+
chunks.add(controller.parse(projectId, "test", chunkParser));
427+
}
428+
}
429+
430+
controller.process(
431+
projectId,
432+
"test",
433+
chunks,
434+
randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION),
435+
exceptionRef::set
436+
);
437+
438+
assertThat(exceptionRef.get(), nullValue());
439+
440+
for (var projectHandler : projectHandlers) {
441+
verify(projectHandler, times(1)).transform(any(), any());
442+
}
443+
}
444+
445+
public void testProcessMultipleChunksVersionMismatch() throws Exception {
446+
ClusterService clusterService = mock(ClusterService.class);
447+
when(clusterService.createTaskQueue(any(), any(), any())).thenReturn(mockTaskQueue());
448+
final ClusterName clusterName = new ClusterName("elasticsearch");
449+
450+
ClusterState state = ClusterState.builder(clusterName).build();
451+
ProjectId projectId = randomProjectIdOrDefault();
452+
state = setupProject(state, Optional.of(projectId));
453+
when(clusterService.state()).thenReturn(state);
454+
455+
String testJSON1 = Strings.format(TEST_CHUNK_TEMPLATE, 1, "test1");
456+
String testJSON2 = Strings.format(TEST_CHUNK_TEMPLATE, 2, "test2");
457+
458+
AtomicReference<Exception> exceptionRef = new AtomicReference<>();
459+
List<ReservedStateChunk> chunks = new ArrayList<>();
460+
461+
TestProjectStateHandler projectStateHandler1 = spy(new TestProjectStateHandler("test1"));
462+
TestProjectStateHandler projectStateHandler2 = spy(new TestProjectStateHandler("test2"));
463+
464+
ReservedClusterStateService controller = new ReservedClusterStateService(
465+
clusterService,
466+
mock(RerouteService.class),
467+
List.of(),
468+
List.of(projectStateHandler1, projectStateHandler2)
469+
);
470+
471+
try (
472+
XContentParser chunkParser1 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON1);
473+
XContentParser chunkParser2 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON2)
474+
) {
475+
chunks.add(controller.parse(projectId, "test", chunkParser1));
476+
chunks.add(controller.parse(projectId, "test", chunkParser2));
477+
}
478+
479+
controller.process(
480+
projectId,
481+
"test",
482+
chunks,
483+
randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION),
484+
exceptionRef::set
485+
);
486+
487+
assertThat(exceptionRef.get(), notNullValue());
488+
assertThat(exceptionRef.get().getMessage(), containsString("Failed to merge reserved state chunks because of version mismatch: ["));
489+
verify(projectStateHandler1, times(0)).transform(any(), any());
490+
verify(projectStateHandler2, times(0)).transform(any(), any());
491+
}
492+
493+
public void testProcessMultipleChunksDuplicateKeys() throws Exception {
494+
ClusterService clusterService = mock(ClusterService.class);
495+
when(clusterService.createTaskQueue(any(), any(), any())).thenReturn(mockTaskQueue());
496+
final ClusterName clusterName = new ClusterName("elasticsearch");
497+
498+
ClusterState state = ClusterState.builder(clusterName).build();
499+
ProjectId projectId = randomProjectIdOrDefault();
500+
state = setupProject(state, Optional.of(projectId));
501+
when(clusterService.state()).thenReturn(state);
502+
503+
String testJSON1 = Strings.format(TEST_CHUNK_TEMPLATE, 1, "test");
504+
String testJSON2 = Strings.format(TEST_CHUNK_TEMPLATE, 1, "test");
505+
506+
AtomicReference<Exception> exceptionRef = new AtomicReference<>();
507+
List<ReservedStateChunk> chunks = new ArrayList<>();
508+
509+
TestProjectStateHandler projectStateHandler1 = spy(new TestProjectStateHandler("test"));
510+
511+
ReservedClusterStateService controller = new ReservedClusterStateService(
512+
clusterService,
513+
mock(RerouteService.class),
514+
List.of(),
515+
List.of(projectStateHandler1)
516+
);
517+
518+
try (
519+
XContentParser chunkParser1 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON1);
520+
XContentParser chunkParser2 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON2)
521+
) {
522+
chunks.add(controller.parse(projectId, "test", chunkParser1));
523+
chunks.add(controller.parse(projectId, "test", chunkParser2));
524+
}
525+
526+
controller.process(
527+
projectId,
528+
"test",
529+
chunks,
530+
randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION),
531+
exceptionRef::set
532+
);
533+
534+
assertThat(exceptionRef.get(), notNullValue());
535+
assertThat(
536+
exceptionRef.get().getMessage(),
537+
containsString("Failed to merge reserved state chunks because of duplicate keys: [test]")
538+
);
539+
verify(projectStateHandler1, times(0)).transform(any(), any());
540+
}
541+
352542
public void testUpdateErrorState() {
353543
ClusterService clusterService = mock(ClusterService.class);
354544
ClusterState state = ClusterState.builder(new ClusterName("test")).build();

0 commit comments

Comments
 (0)