Skip to content

Commit 4095a58

Browse files
committed
Allow for passing several reserved state chunks in single process
1 parent 0b83425 commit 4095a58

File tree

2 files changed

+253
-4
lines changed

2 files changed

+253
-4
lines changed

server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.cluster.routing.RerouteService;
2323
import org.elasticsearch.cluster.service.ClusterService;
2424
import org.elasticsearch.common.Priority;
25+
import org.elasticsearch.common.util.set.Sets;
2526
import org.elasticsearch.core.FixForMultiProject;
2627
import org.elasticsearch.core.Tuple;
2728
import org.elasticsearch.env.BuildVersion;
@@ -187,7 +188,7 @@ public void process(
187188
process(namespace, stateChunk, versionCheck, errorListener);
188189
}
189190

190-
ReservedStateChunk parse(ProjectId projectId, String namespace, XContentParser parser) {
191+
public ReservedStateChunk parse(ProjectId projectId, String namespace, XContentParser parser) {
191192
try {
192193
return stateChunkParser.apply(parser, null);
193194
} catch (Exception e) {
@@ -377,6 +378,7 @@ public void onFailure(Exception e) {
377378
* @param projectId the project state to modify
378379
* @param namespace the namespace under which we'll store the reserved keys in the cluster state metadata
379380
* @param reservedStateChunk a {@link ReservedStateChunk} composite state object to process
381+
* @param versionCheck Enum representing whether a reserved state should be processed based on the current and new versions
380382
* @param errorListener a consumer called with {@link IllegalStateException} if the content has errors and the
381383
* cluster state cannot be correctly applied, null if successful or the state failed to apply because of incompatible version.
382384
*/
@@ -387,17 +389,43 @@ public void process(
387389
ReservedStateVersionCheck versionCheck,
388390
Consumer<Exception> errorListener
389391
) {
390-
Map<String, Object> reservedState = reservedStateChunk.state();
391-
ReservedStateVersion reservedStateVersion = reservedStateChunk.metadata();
392+
process(projectId, namespace, List.of(reservedStateChunk), versionCheck, errorListener);
393+
}
392394

395+
/**
396+
* Saves and reserves a chunk of the cluster state under a given 'namespace' from {@link XContentParser} by combining several chunks
397+
* into one
398+
*
399+
* @param projectId the project state to modify
400+
* @param namespace the namespace under which we'll store the reserved keys in the cluster state metadata
401+
* @param reservedStateChunks a list of {@link ReservedStateChunk} composite state objects to process
402+
* @param versionCheck Enum representing whether a reserved state should be processed based on the current and new versions
403+
* @param errorListener a consumer called with {@link IllegalStateException} if the content has errors and the
404+
* cluster state cannot be correctly applied, null if successful or the state failed to apply because of incompatible version.
405+
*/
406+
public void process(
407+
ProjectId projectId,
408+
String namespace,
409+
List<ReservedStateChunk> reservedStateChunks,
410+
ReservedStateVersionCheck versionCheck,
411+
Consumer<Exception> errorListener
412+
) {
413+
ReservedStateChunk reservedStateChunk;
414+
ReservedStateVersion reservedStateVersion;
393415
LinkedHashSet<String> orderedHandlers;
416+
394417
try {
418+
reservedStateChunk = reservedStateChunks.size() == 1
419+
? reservedStateChunks.getFirst()
420+
: mergeReservedStateChunks(reservedStateChunks);
421+
Map<String, Object> reservedState = reservedStateChunk.state();
422+
reservedStateVersion = reservedStateChunk.metadata();
395423
orderedHandlers = orderedProjectStateHandlers(reservedState.keySet());
396424
} catch (Exception e) {
397425
ErrorState errorState = new ErrorState(
398426
projectId,
399427
namespace,
400-
reservedStateVersion.version(),
428+
reservedStateChunks.getFirst().metadata().version(),
401429
versionCheck,
402430
e,
403431
ReservedStateErrorMetadata.ErrorKind.PARSING
@@ -476,6 +504,32 @@ public void onFailure(Exception e) {
476504
);
477505
}
478506

507+
private static ReservedStateChunk mergeReservedStateChunks(List<ReservedStateChunk> chunks) {
508+
if (chunks.isEmpty()) {
509+
throw new IllegalArgumentException("No chunks provided");
510+
}
511+
512+
ReservedStateVersion reservedStateVersion = chunks.getFirst().metadata();
513+
Map<String, Object> mergedChunks = new HashMap<>(chunks.size());
514+
for (var chunk : chunks) {
515+
Set<String> duplicateKeys = Sets.intersection(chunk.state().keySet(), mergedChunks.keySet());
516+
if (chunk.metadata().equals(reservedStateVersion) == false) {
517+
throw new IllegalStateException(
518+
"Failed to merge reserved state chunks because of version mismatch: ["
519+
+ reservedStateVersion
520+
+ "] != ["
521+
+ chunk.metadata()
522+
+ "]"
523+
);
524+
} else if (duplicateKeys.isEmpty() == false) {
525+
throw new IllegalStateException("Failed to merge reserved state chunks because of duplicate keys: " + duplicateKeys);
526+
}
527+
mergedChunks.putAll(chunk.state());
528+
}
529+
530+
return new ReservedStateChunk(mergedChunks, reservedStateVersion);
531+
}
532+
479533
// package private for testing
480534
Exception checkAndReportError(
481535
Optional<ProjectId> projectId,
@@ -518,6 +572,11 @@ void updateErrorState(ErrorState errorState) {
518572
return;
519573
}
520574

575+
if (errorState.projectId().isPresent() && clusterService.state().metadata().hasProject(errorState.projectId().get()) == false) {
576+
// Can't update error state for a project that doesn't exist yet
577+
return;
578+
}
579+
521580
submitErrorUpdateTask(errorState);
522581
}
523582

server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.cluster.routing.RerouteService;
2525
import org.elasticsearch.cluster.service.ClusterService;
2626
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
27+
import org.elasticsearch.common.Strings;
2728
import org.elasticsearch.common.settings.ClusterSettings;
2829
import org.elasticsearch.common.settings.Settings;
2930
import org.elasticsearch.core.Releasable;
@@ -41,6 +42,7 @@
4142
import org.mockito.ArgumentMatchers;
4243

4344
import java.io.IOException;
45+
import java.util.ArrayList;
4446
import java.util.Collection;
4547
import java.util.LinkedHashSet;
4648
import java.util.List;
@@ -60,13 +62,15 @@
6062
import static org.hamcrest.Matchers.is;
6163
import static org.hamcrest.Matchers.not;
6264
import static org.hamcrest.Matchers.notNullValue;
65+
import static org.hamcrest.Matchers.nullValue;
6366
import static org.hamcrest.Matchers.sameInstance;
6467
import static org.hamcrest.Matchers.startsWith;
6568
import static org.mockito.ArgumentMatchers.any;
6669
import static org.mockito.ArgumentMatchers.anyString;
6770
import static org.mockito.Mockito.doNothing;
6871
import static org.mockito.Mockito.doReturn;
6972
import static org.mockito.Mockito.mock;
73+
import static org.mockito.Mockito.never;
7074
import static org.mockito.Mockito.spy;
7175
import static org.mockito.Mockito.times;
7276
import static org.mockito.Mockito.verify;
@@ -76,6 +80,20 @@
7680

7781
public class ReservedClusterStateServiceTests extends ESTestCase {
7882

83+
private static final String TEST_CHUNK_TEMPLATE = """
84+
{
85+
"metadata": {
86+
"version": "%s",
87+
"compatibility": "8.4.0"
88+
},
89+
"state": {
90+
"%s": {
91+
"nothing": "useful"
92+
}
93+
}
94+
}
95+
""";
96+
7997
@SuppressWarnings("unchecked")
8098
private static <T extends ClusterStateTaskListener> MasterServiceTaskQueue<T> mockTaskQueue() {
8199
return (MasterServiceTaskQueue<T>) mock(MasterServiceTaskQueue.class);
@@ -349,6 +367,178 @@ public void success(Runnable onPublicationSuccess) {
349367
verify(rerouteService, times(1)).reroute(anyString(), any(), any());
350368
}
351369

370+
public void testUpdateErrorStateNonExistingProject() {
371+
ClusterService clusterService = mock(ClusterService.class);
372+
ClusterState state = ClusterState.builder(new ClusterName("test")).build();
373+
when(clusterService.state()).thenReturn(state);
374+
375+
ReservedClusterStateService service = new ReservedClusterStateService(
376+
clusterService,
377+
mock(RerouteService.class),
378+
List.of(),
379+
List.of()
380+
);
381+
382+
ErrorState error = new ErrorState(
383+
Optional.of(randomUniqueProjectId()),
384+
"namespace",
385+
2L,
386+
ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
387+
List.of("error"),
388+
ReservedStateErrorMetadata.ErrorKind.TRANSIENT
389+
);
390+
service.updateErrorState(error);
391+
verify(clusterService, never()).createTaskQueue(any(), any(), any());
392+
}
393+
394+
public void testProcessMultipleChunks() throws Exception {
395+
ClusterService clusterService = mock(ClusterService.class);
396+
when(clusterService.createTaskQueue(any(), any(), any())).thenReturn(mockTaskQueue());
397+
final ClusterName clusterName = new ClusterName("elasticsearch");
398+
399+
ClusterState state = ClusterState.builder(clusterName).build();
400+
ProjectId projectId = randomProjectIdOrDefault();
401+
state = setupProject(state, Optional.of(projectId));
402+
when(clusterService.state()).thenReturn(state);
403+
404+
AtomicReference<Exception> exceptionRef = new AtomicReference<>();
405+
List<ReservedStateChunk> chunks = new ArrayList<>();
406+
407+
String[] randomStateKeys = generateRandomStringArray(randomIntBetween(5, 10), randomIntBetween(10, 15), false);
408+
409+
List<ReservedClusterStateHandler<ProjectMetadata, ?>> projectHandlers = new ArrayList<>();
410+
for (var key : randomStateKeys) {
411+
projectHandlers.add(spy(new TestProjectStateHandler(key)));
412+
}
413+
414+
ReservedClusterStateService controller = new ReservedClusterStateService(
415+
clusterService,
416+
mock(RerouteService.class),
417+
List.of(),
418+
projectHandlers
419+
);
420+
421+
for (var testHandler : randomStateKeys) {
422+
String testChunkJSON = Strings.format(TEST_CHUNK_TEMPLATE, 1, testHandler);
423+
try (
424+
XContentParser chunkParser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testChunkJSON);
425+
) {
426+
chunks.add(controller.parse(projectId, "test", chunkParser));
427+
}
428+
}
429+
430+
controller.process(
431+
projectId,
432+
"test",
433+
chunks,
434+
randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION),
435+
exceptionRef::set
436+
);
437+
438+
assertThat(exceptionRef.get(), nullValue());
439+
440+
for (var projectHandler : projectHandlers) {
441+
verify(projectHandler, times(1)).transform(any(), any());
442+
}
443+
}
444+
445+
public void testProcessMultipleChunksVersionMismatch() throws Exception {
446+
ClusterService clusterService = mock(ClusterService.class);
447+
when(clusterService.createTaskQueue(any(), any(), any())).thenReturn(mockTaskQueue());
448+
final ClusterName clusterName = new ClusterName("elasticsearch");
449+
450+
ClusterState state = ClusterState.builder(clusterName).build();
451+
ProjectId projectId = randomProjectIdOrDefault();
452+
state = setupProject(state, Optional.of(projectId));
453+
when(clusterService.state()).thenReturn(state);
454+
455+
String testJSON1 = Strings.format(TEST_CHUNK_TEMPLATE, 1, "test1");
456+
String testJSON2 = Strings.format(TEST_CHUNK_TEMPLATE, 2, "test2");
457+
458+
AtomicReference<Exception> exceptionRef = new AtomicReference<>();
459+
List<ReservedStateChunk> chunks = new ArrayList<>();
460+
461+
TestProjectStateHandler projectStateHandler1 = spy(new TestProjectStateHandler("test1"));
462+
TestProjectStateHandler projectStateHandler2 = spy(new TestProjectStateHandler("test2"));
463+
464+
ReservedClusterStateService controller = new ReservedClusterStateService(
465+
clusterService,
466+
mock(RerouteService.class),
467+
List.of(),
468+
List.of(projectStateHandler1, projectStateHandler2)
469+
);
470+
471+
try (
472+
XContentParser chunkParser1 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON1);
473+
XContentParser chunkParser2 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON2)
474+
) {
475+
chunks.add(controller.parse(projectId, "test", chunkParser1));
476+
chunks.add(controller.parse(projectId, "test", chunkParser2));
477+
}
478+
479+
controller.process(
480+
projectId,
481+
"test",
482+
chunks,
483+
randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION),
484+
exceptionRef::set
485+
);
486+
487+
assertThat(exceptionRef.get(), notNullValue());
488+
assertThat(exceptionRef.get().getMessage(), containsString("Failed to merge reserved state chunks because of version mismatch: ["));
489+
verify(projectStateHandler1, times(0)).transform(any(), any());
490+
verify(projectStateHandler2, times(0)).transform(any(), any());
491+
}
492+
493+
public void testProcessMultipleChunksDuplicateKeys() throws Exception {
494+
ClusterService clusterService = mock(ClusterService.class);
495+
when(clusterService.createTaskQueue(any(), any(), any())).thenReturn(mockTaskQueue());
496+
final ClusterName clusterName = new ClusterName("elasticsearch");
497+
498+
ClusterState state = ClusterState.builder(clusterName).build();
499+
ProjectId projectId = randomProjectIdOrDefault();
500+
state = setupProject(state, Optional.of(projectId));
501+
when(clusterService.state()).thenReturn(state);
502+
503+
String testJSON1 = Strings.format(TEST_CHUNK_TEMPLATE, 1, "test");
504+
String testJSON2 = Strings.format(TEST_CHUNK_TEMPLATE, 1, "test");
505+
506+
AtomicReference<Exception> exceptionRef = new AtomicReference<>();
507+
List<ReservedStateChunk> chunks = new ArrayList<>();
508+
509+
TestProjectStateHandler projectStateHandler1 = spy(new TestProjectStateHandler("test"));
510+
511+
ReservedClusterStateService controller = new ReservedClusterStateService(
512+
clusterService,
513+
mock(RerouteService.class),
514+
List.of(),
515+
List.of(projectStateHandler1)
516+
);
517+
518+
try (
519+
XContentParser chunkParser1 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON1);
520+
XContentParser chunkParser2 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON2)
521+
) {
522+
chunks.add(controller.parse(projectId, "test", chunkParser1));
523+
chunks.add(controller.parse(projectId, "test", chunkParser2));
524+
}
525+
526+
controller.process(
527+
projectId,
528+
"test",
529+
chunks,
530+
randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION),
531+
exceptionRef::set
532+
);
533+
534+
assertThat(exceptionRef.get(), notNullValue());
535+
assertThat(
536+
exceptionRef.get().getMessage(),
537+
containsString("Failed to merge reserved state chunks because of duplicate keys: [test]")
538+
);
539+
verify(projectStateHandler1, times(0)).transform(any(), any());
540+
}
541+
352542
public void testUpdateErrorState() {
353543
ClusterService clusterService = mock(ClusterService.class);
354544
ClusterState state = ClusterState.builder(new ClusterName("test")).build();

0 commit comments

Comments
 (0)