diff --git a/docs/changelog/124574.yaml b/docs/changelog/124574.yaml new file mode 100644 index 0000000000000..32e49b4cfd9c4 --- /dev/null +++ b/docs/changelog/124574.yaml @@ -0,0 +1,5 @@ +pr: 124574 +summary: Allow passing several reserved state chunks in single process call +area: Infra/Settings +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java index a05fc4cbd834b..d33a61e1a3bfd 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.Tuple; import org.elasticsearch.env.BuildVersion; @@ -187,7 +188,7 @@ public void process( process(namespace, stateChunk, versionCheck, errorListener); } - ReservedStateChunk parse(ProjectId projectId, String namespace, XContentParser parser) { + public ReservedStateChunk parse(ProjectId projectId, String namespace, XContentParser parser) { try { return stateChunkParser.apply(parser, null); } catch (Exception e) { @@ -377,6 +378,7 @@ public void onFailure(Exception e) { * @param projectId the project state to modify * @param namespace the namespace under which we'll store the reserved keys in the cluster state metadata * @param reservedStateChunk a {@link ReservedStateChunk} composite state object to process + * @param versionCheck Enum representing whether a reserved state should be processed based on the current and new versions * @param errorListener a consumer called with {@link IllegalStateException} if the content has errors and the * cluster state cannot be correctly applied, null if successful or the state failed to apply because of incompatible version. */ @@ -387,17 +389,41 @@ public void process( ReservedStateVersionCheck versionCheck, Consumer errorListener ) { - Map reservedState = reservedStateChunk.state(); - ReservedStateVersion reservedStateVersion = reservedStateChunk.metadata(); + process(projectId, namespace, List.of(reservedStateChunk), versionCheck, errorListener); + } + /** + * Saves and reserves a chunk of the cluster state under a given 'namespace' from {@link XContentParser} by combining several chunks + * into one + * + * @param projectId the project state to modify + * @param namespace the namespace under which we'll store the reserved keys in the cluster state metadata + * @param reservedStateChunks a list of {@link ReservedStateChunk} composite state objects to process + * @param versionCheck Enum representing whether a reserved state should be processed based on the current and new versions + * @param errorListener a consumer called with {@link IllegalStateException} if the content has errors and the + * cluster state cannot be correctly applied, null if successful or the state failed to apply because of incompatible version. + */ + public void process( + ProjectId projectId, + String namespace, + List reservedStateChunks, + ReservedStateVersionCheck versionCheck, + Consumer errorListener + ) { + ReservedStateChunk reservedStateChunk; + ReservedStateVersion reservedStateVersion; LinkedHashSet orderedHandlers; + try { + reservedStateChunk = mergeReservedStateChunks(reservedStateChunks); + Map reservedState = reservedStateChunk.state(); + reservedStateVersion = reservedStateChunk.metadata(); orderedHandlers = orderedProjectStateHandlers(reservedState.keySet()); } catch (Exception e) { ErrorState errorState = new ErrorState( projectId, namespace, - reservedStateVersion.version(), + reservedStateChunks.getFirst().metadata().version(), versionCheck, e, ReservedStateErrorMetadata.ErrorKind.PARSING @@ -476,6 +502,36 @@ public void onFailure(Exception e) { ); } + private static ReservedStateChunk mergeReservedStateChunks(List chunks) { + if (chunks.isEmpty()) { + throw new IllegalArgumentException("No chunks provided"); + } + + if (chunks.size() == 1) { + return chunks.getFirst(); + } + + ReservedStateVersion reservedStateVersion = chunks.getFirst().metadata(); + Map mergedChunks = new HashMap<>(chunks.size()); + for (var chunk : chunks) { + Set duplicateKeys = Sets.intersection(chunk.state().keySet(), mergedChunks.keySet()); + if (chunk.metadata().equals(reservedStateVersion) == false) { + throw new IllegalStateException( + "Failed to merge reserved state chunks because of version mismatch: [" + + reservedStateVersion + + "] != [" + + chunk.metadata() + + "]" + ); + } else if (duplicateKeys.isEmpty() == false) { + throw new IllegalStateException("Failed to merge reserved state chunks because of duplicate keys: " + duplicateKeys); + } + mergedChunks.putAll(chunk.state()); + } + + return new ReservedStateChunk(mergedChunks, reservedStateVersion); + } + // package private for testing Exception checkAndReportError( Optional projectId, @@ -518,6 +574,11 @@ void updateErrorState(ErrorState errorState) { return; } + if (errorState.projectId().isPresent() && clusterService.state().metadata().hasProject(errorState.projectId().get()) == false) { + // Can't update error state for a project that doesn't exist yet + return; + } + submitErrorUpdateTask(errorState); } diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java index 9b8ad3a4ad9d7..1cb2190850ed9 100644 --- a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Releasable; @@ -41,6 +42,7 @@ import org.mockito.ArgumentMatchers; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; @@ -60,6 +62,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.startsWith; import static org.mockito.ArgumentMatchers.any; @@ -67,6 +70,7 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -76,6 +80,20 @@ public class ReservedClusterStateServiceTests extends ESTestCase { + private static final String TEST_CHUNK_TEMPLATE = """ + { + "metadata": { + "version": "%s", + "compatibility": "8.4.0" + }, + "state": { + "%s": { + "nothing": "useful" + } + } + } + """; + @SuppressWarnings("unchecked") private static MasterServiceTaskQueue mockTaskQueue() { return (MasterServiceTaskQueue) mock(MasterServiceTaskQueue.class); @@ -349,6 +367,178 @@ public void success(Runnable onPublicationSuccess) { verify(rerouteService, times(1)).reroute(anyString(), any(), any()); } + public void testUpdateErrorStateNonExistingProject() { + ClusterService clusterService = mock(ClusterService.class); + ClusterState state = ClusterState.builder(new ClusterName("test")).build(); + when(clusterService.state()).thenReturn(state); + + ReservedClusterStateService service = new ReservedClusterStateService( + clusterService, + mock(RerouteService.class), + List.of(), + List.of() + ); + + ErrorState error = new ErrorState( + Optional.of(randomUniqueProjectId()), + "namespace", + 2L, + ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + List.of("error"), + ReservedStateErrorMetadata.ErrorKind.TRANSIENT + ); + service.updateErrorState(error); + verify(clusterService, never()).createTaskQueue(any(), any(), any()); + } + + public void testProcessMultipleChunks() throws Exception { + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.createTaskQueue(any(), any(), any())).thenReturn(mockTaskQueue()); + final ClusterName clusterName = new ClusterName("elasticsearch"); + + ClusterState state = ClusterState.builder(clusterName).build(); + ProjectId projectId = randomProjectIdOrDefault(); + state = setupProject(state, Optional.of(projectId)); + when(clusterService.state()).thenReturn(state); + + AtomicReference exceptionRef = new AtomicReference<>(); + List chunks = new ArrayList<>(); + + String[] randomStateKeys = generateRandomStringArray(randomIntBetween(5, 10), randomIntBetween(10, 15), false); + + List> projectHandlers = new ArrayList<>(); + for (var key : randomStateKeys) { + projectHandlers.add(spy(new TestProjectStateHandler(key))); + } + + ReservedClusterStateService controller = new ReservedClusterStateService( + clusterService, + mock(RerouteService.class), + List.of(), + projectHandlers + ); + + for (var testHandler : randomStateKeys) { + String testChunkJSON = Strings.format(TEST_CHUNK_TEMPLATE, 1, testHandler); + try ( + XContentParser chunkParser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testChunkJSON); + ) { + chunks.add(controller.parse(projectId, "test", chunkParser)); + } + } + + controller.process( + projectId, + "test", + chunks, + randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION), + exceptionRef::set + ); + + assertThat(exceptionRef.get(), nullValue()); + + for (var projectHandler : projectHandlers) { + verify(projectHandler, times(1)).transform(any(), any()); + } + } + + public void testProcessMultipleChunksVersionMismatch() throws Exception { + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.createTaskQueue(any(), any(), any())).thenReturn(mockTaskQueue()); + final ClusterName clusterName = new ClusterName("elasticsearch"); + + ClusterState state = ClusterState.builder(clusterName).build(); + ProjectId projectId = randomProjectIdOrDefault(); + state = setupProject(state, Optional.of(projectId)); + when(clusterService.state()).thenReturn(state); + + String testJSON1 = Strings.format(TEST_CHUNK_TEMPLATE, 1, "test1"); + String testJSON2 = Strings.format(TEST_CHUNK_TEMPLATE, 2, "test2"); + + AtomicReference exceptionRef = new AtomicReference<>(); + List chunks = new ArrayList<>(); + + TestProjectStateHandler projectStateHandler1 = spy(new TestProjectStateHandler("test1")); + TestProjectStateHandler projectStateHandler2 = spy(new TestProjectStateHandler("test2")); + + ReservedClusterStateService controller = new ReservedClusterStateService( + clusterService, + mock(RerouteService.class), + List.of(), + List.of(projectStateHandler1, projectStateHandler2) + ); + + try ( + XContentParser chunkParser1 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON1); + XContentParser chunkParser2 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON2) + ) { + chunks.add(controller.parse(projectId, "test", chunkParser1)); + chunks.add(controller.parse(projectId, "test", chunkParser2)); + } + + controller.process( + projectId, + "test", + chunks, + randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION), + exceptionRef::set + ); + + assertThat(exceptionRef.get(), notNullValue()); + assertThat(exceptionRef.get().getMessage(), containsString("Failed to merge reserved state chunks because of version mismatch: [")); + verify(projectStateHandler1, times(0)).transform(any(), any()); + verify(projectStateHandler2, times(0)).transform(any(), any()); + } + + public void testProcessMultipleChunksDuplicateKeys() throws Exception { + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.createTaskQueue(any(), any(), any())).thenReturn(mockTaskQueue()); + final ClusterName clusterName = new ClusterName("elasticsearch"); + + ClusterState state = ClusterState.builder(clusterName).build(); + ProjectId projectId = randomProjectIdOrDefault(); + state = setupProject(state, Optional.of(projectId)); + when(clusterService.state()).thenReturn(state); + + String testJSON1 = Strings.format(TEST_CHUNK_TEMPLATE, 1, "test"); + String testJSON2 = Strings.format(TEST_CHUNK_TEMPLATE, 1, "test"); + + AtomicReference exceptionRef = new AtomicReference<>(); + List chunks = new ArrayList<>(); + + TestProjectStateHandler projectStateHandler1 = spy(new TestProjectStateHandler("test")); + + ReservedClusterStateService controller = new ReservedClusterStateService( + clusterService, + mock(RerouteService.class), + List.of(), + List.of(projectStateHandler1) + ); + + try ( + XContentParser chunkParser1 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON1); + XContentParser chunkParser2 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON2) + ) { + chunks.add(controller.parse(projectId, "test", chunkParser1)); + chunks.add(controller.parse(projectId, "test", chunkParser2)); + } + + controller.process( + projectId, + "test", + chunks, + randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION), + exceptionRef::set + ); + + assertThat(exceptionRef.get(), notNullValue()); + assertThat( + exceptionRef.get().getMessage(), + containsString("Failed to merge reserved state chunks because of duplicate keys: [test]") + ); + verify(projectStateHandler1, times(0)).transform(any(), any()); + } + public void testUpdateErrorState() { ClusterService clusterService = mock(ClusterService.class); ClusterState state = ClusterState.builder(new ClusterName("test")).build();