Merged

Changes from 1 commit
@@ -22,6 +22,7 @@
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.env.BuildVersion;
@@ -187,7 +188,7 @@ public void process(
process(namespace, stateChunk, versionCheck, errorListener);
}

ReservedStateChunk parse(ProjectId projectId, String namespace, XContentParser parser) {
public ReservedStateChunk parse(ProjectId projectId, String namespace, XContentParser parser) {
Contributor Author

This can be used by a caller to parse several state chunks before passing them to process.
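For illustration, a minimal sketch of that caller pattern (the settingsFiles list and the openParser helper are hypothetical, not part of this PR):

// Hypothetical caller sketch: parse each settings file into a chunk first,
// then hand all chunks to process() so they are applied in a single cluster state update.
List<ReservedStateChunk> chunks = new ArrayList<>();
for (Path file : settingsFiles) {                        // hypothetical list of settings files
    try (XContentParser parser = openParser(file)) {     // hypothetical helper that creates a JSON parser
        chunks.add(reservedClusterStateService.parse(projectId, namespace, parser));
    }
}
reservedClusterStateService.process(projectId, namespace, chunks, versionCheck, errorListener);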

try {
return stateChunkParser.apply(parser, null);
} catch (Exception e) {
@@ -377,6 +378,7 @@ public void onFailure(Exception e) {
* @param projectId the project state to modify
* @param namespace the namespace under which we'll store the reserved keys in the cluster state metadata
* @param reservedStateChunk a {@link ReservedStateChunk} composite state object to process
* @param versionCheck Enum representing whether a reserved state should be processed based on the current and new versions
* @param errorListener a consumer called with {@link IllegalStateException} if the content has errors and the
* cluster state cannot be correctly applied, null if successful or the state failed to apply because of incompatible version.
*/
@@ -387,17 +389,43 @@ public void process(
ReservedStateVersionCheck versionCheck,
Consumer<Exception> errorListener
) {
Map<String, Object> reservedState = reservedStateChunk.state();
ReservedStateVersion reservedStateVersion = reservedStateChunk.metadata();
process(projectId, namespace, List.of(reservedStateChunk), versionCheck, errorListener);
}

/**
* Saves and reserves a chunk of the cluster state under a given 'namespace' by combining several
* {@link ReservedStateChunk}s into one
*
* @param projectId the project state to modify
* @param namespace the namespace under which we'll store the reserved keys in the cluster state metadata
* @param reservedStateChunks a list of {@link ReservedStateChunk} composite state objects to process
* @param versionCheck Enum representing whether a reserved state should be processed based on the current and new versions
* @param errorListener a consumer called with {@link IllegalStateException} if the content has errors and the
* cluster state cannot be correctly applied, null if successful or the state failed to apply because of incompatible version.
*/
public void process(
ProjectId projectId,
String namespace,
List<ReservedStateChunk> reservedStateChunks,
ReservedStateVersionCheck versionCheck,
Consumer<Exception> errorListener
) {
ReservedStateChunk reservedStateChunk;
ReservedStateVersion reservedStateVersion;
LinkedHashSet<String> orderedHandlers;

try {
reservedStateChunk = reservedStateChunks.size() == 1
? reservedStateChunks.getFirst()
: mergeReservedStateChunks(reservedStateChunks);
Contributor

Minor: it seems to me mergeReservedStateChunks could handle this case for you; then the caller doesn't need a ternary.

Contributor Author

Yes, that's nicer! Thanks!
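For reference, a minimal sketch of the suggested simplification (a sketch only, assuming the helper keeps its current name; not necessarily the final code in this commit):

// In process(): no ternary needed once the merge helper handles the single-chunk case itself.
ReservedStateChunk reservedStateChunk = mergeReservedStateChunks(reservedStateChunks);

// In mergeReservedStateChunks(): short-circuit before validating versions and merging keys.
if (chunks.size() == 1) {
    return chunks.getFirst();
}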

Map<String, Object> reservedState = reservedStateChunk.state();
reservedStateVersion = reservedStateChunk.metadata();
orderedHandlers = orderedProjectStateHandlers(reservedState.keySet());
} catch (Exception e) {
ErrorState errorState = new ErrorState(
projectId,
namespace,
reservedStateVersion.version(),
reservedStateChunks.getFirst().metadata().version(),
versionCheck,
e,
ReservedStateErrorMetadata.ErrorKind.PARSING
@@ -476,6 +504,32 @@ public void onFailure(Exception e) {
);
}

private static ReservedStateChunk mergeReservedStateChunks(List<ReservedStateChunk> chunks) {
Member

Since we are merging the list of chunks into a single ReservedStateChunk, why do we also need to change the process method signature? Maybe I am missing something?

Member

If we do want to change the process method signature, I think it would probably be more useful to accept a Map<String, ReservedStateChunk> so that each chunk can be stored in its own namespace rather than a single one. That seems useful for separating regular settings and secrets. It does lead to more code changes, though, so maybe that is something you are trying to avoid?

Contributor Author

Thanks for pointing this out! I've spent some time thinking about this.

Since we are merging the list of chunks into a single ReservedStateChunk, why do we also need to change the process method signature? Maybe I am missing something?

Before this change, process was called with an XContentParser instance (the parser for the JSON files), once for each file. This is still supported.

With this change we call process once per project, to trigger a single cluster state update instead of two. That means we either need to call process with one parser per file (I find this approach a little awkward), or call the parse method from the caller, MultiProjectFileSettingsService, and then pass the resulting ReservedStateChunk instances to process (this is the approach I went with).

If we do want to change the process method signature, I'd think it is probably more useful to accept a Map<String, ReservedStateChunk> so that each chunk can be stored in its own namespace rather than a single one. This seems useful to separate regular settings and secrets. It does lead to more code changes. So maybe that is something you are trying to avoid?

The purpose of this change is to be able to spread the state for a single namespace across several chunks. Since reserved state is versioned per namespace, I think it makes the most sense to keep a single namespace for both settings and secrets.

The alternative I think you're proposing is to have several reserved state namespaces in a single cluster state update. We'd then have to add version dependencies between namespaces, which I think is out of scope for the ReservedClusterStateService but could definitely be implemented in the MultiProjectFileSettingsService. I'm leaning towards sticking with a single namespace, but happy to discuss this further.

Member

Before this change process was called with an XContentParser instance

Thanks for explaining. I see the issue now.

The alternative I think you're proposing is to have several reserved state namespaces in a single cluster state update.

After commenting on the other PR about passing down both project settings and secrets at the same time, I now think it might be better to use a single namespace as you have proposed. Sorry for the back and forth.
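For clarity, the two shapes of process under discussion, as a sketch (only the first is what this PR implements; the Map-based variant is the discussed alternative, not code from this commit):

// Chosen in this PR: several chunks merged into one reserved state under a single namespace.
public void process(ProjectId projectId, String namespace, List<ReservedStateChunk> reservedStateChunks,
    ReservedStateVersionCheck versionCheck, Consumer<Exception> errorListener) { /* ... */ }

// Discussed alternative: one chunk per namespace (e.g. settings vs. secrets), applied in one update.
public void process(ProjectId projectId, Map<String, ReservedStateChunk> chunksByNamespace,
    ReservedStateVersionCheck versionCheck, Consumer<Exception> errorListener) { /* ... */ }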

if (chunks.isEmpty()) {
throw new IllegalArgumentException("No chunks provided");
}

ReservedStateVersion reservedStateVersion = chunks.getFirst().metadata();
Map<String, Object> mergedChunks = new HashMap<>(chunks.size());
for (var chunk : chunks) {
Set<String> duplicateKeys = Sets.intersection(chunk.state().keySet(), mergedChunks.keySet());
if (chunk.metadata().equals(reservedStateVersion) == false) {
throw new IllegalStateException(
"Failed to merge reserved state chunks because of version mismatch: ["
+ reservedStateVersion
+ "] != ["
+ chunk.metadata()
+ "]"
);
} else if (duplicateKeys.isEmpty() == false) {
throw new IllegalStateException("Failed to merge reserved state chunks because of duplicate keys: " + duplicateKeys);
}
mergedChunks.putAll(chunk.state());
}

return new ReservedStateChunk(mergedChunks, reservedStateVersion);
}

// package private for testing
Exception checkAndReportError(
Optional<ProjectId> projectId,
@@ -518,6 +572,11 @@ void updateErrorState(ErrorState errorState) {
return;
}

if (errorState.projectId().isPresent() && clusterService.state().metadata().hasProject(errorState.projectId().get()) == false) {
Contributor Author

This was a bug. If parsing fails for a project that doesn't exist yet, error state reporting won't work, since the error state is persisted in project metadata that doesn't exist yet. This case needs to be handled in the caller (the multi-project file settings service).

// Can't update error state for a project that doesn't exist yet
return;
}

submitErrorUpdateTask(errorState);
}

@@ -24,6 +24,7 @@
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Releasable;
@@ -41,6 +42,7 @@
import org.mockito.ArgumentMatchers;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
@@ -60,13 +62,15 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -76,6 +80,20 @@

public class ReservedClusterStateServiceTests extends ESTestCase {

private static final String TEST_CHUNK_TEMPLATE = """
{
"metadata": {
"version": "%s",
"compatibility": "8.4.0"
},
"state": {
"%s": {
"nothing": "useful"
}
}
}
""";

@SuppressWarnings("unchecked")
private static <T extends ClusterStateTaskListener> MasterServiceTaskQueue<T> mockTaskQueue() {
return (MasterServiceTaskQueue<T>) mock(MasterServiceTaskQueue.class);
@@ -349,6 +367,178 @@ public void success(Runnable onPublicationSuccess) {
verify(rerouteService, times(1)).reroute(anyString(), any(), any());
}

public void testUpdateErrorStateNonExistingProject() {
ClusterService clusterService = mock(ClusterService.class);
ClusterState state = ClusterState.builder(new ClusterName("test")).build();
when(clusterService.state()).thenReturn(state);

ReservedClusterStateService service = new ReservedClusterStateService(
clusterService,
mock(RerouteService.class),
List.of(),
List.of()
);

ErrorState error = new ErrorState(
Optional.of(randomUniqueProjectId()),
"namespace",
2L,
ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
List.of("error"),
ReservedStateErrorMetadata.ErrorKind.TRANSIENT
);
service.updateErrorState(error);
verify(clusterService, never()).createTaskQueue(any(), any(), any());
}

public void testProcessMultipleChunks() throws Exception {
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.createTaskQueue(any(), any(), any())).thenReturn(mockTaskQueue());
final ClusterName clusterName = new ClusterName("elasticsearch");

ClusterState state = ClusterState.builder(clusterName).build();
ProjectId projectId = randomProjectIdOrDefault();
state = setupProject(state, Optional.of(projectId));
when(clusterService.state()).thenReturn(state);

AtomicReference<Exception> exceptionRef = new AtomicReference<>();
List<ReservedStateChunk> chunks = new ArrayList<>();

String[] randomStateKeys = generateRandomStringArray(randomIntBetween(5, 10), randomIntBetween(10, 15), false);

List<ReservedClusterStateHandler<ProjectMetadata, ?>> projectHandlers = new ArrayList<>();
for (var key : randomStateKeys) {
projectHandlers.add(spy(new TestProjectStateHandler(key)));
}

ReservedClusterStateService controller = new ReservedClusterStateService(
clusterService,
mock(RerouteService.class),
List.of(),
projectHandlers
);

for (var testHandler : randomStateKeys) {
String testChunkJSON = Strings.format(TEST_CHUNK_TEMPLATE, 1, testHandler);
try (
XContentParser chunkParser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testChunkJSON);
) {
chunks.add(controller.parse(projectId, "test", chunkParser));
}
}

controller.process(
projectId,
"test",
chunks,
randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION),
exceptionRef::set
);

assertThat(exceptionRef.get(), nullValue());

for (var projectHandler : projectHandlers) {
verify(projectHandler, times(1)).transform(any(), any());
}
}

public void testProcessMultipleChunksVersionMismatch() throws Exception {
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.createTaskQueue(any(), any(), any())).thenReturn(mockTaskQueue());
final ClusterName clusterName = new ClusterName("elasticsearch");

ClusterState state = ClusterState.builder(clusterName).build();
ProjectId projectId = randomProjectIdOrDefault();
state = setupProject(state, Optional.of(projectId));
when(clusterService.state()).thenReturn(state);

String testJSON1 = Strings.format(TEST_CHUNK_TEMPLATE, 1, "test1");
String testJSON2 = Strings.format(TEST_CHUNK_TEMPLATE, 2, "test2");

AtomicReference<Exception> exceptionRef = new AtomicReference<>();
List<ReservedStateChunk> chunks = new ArrayList<>();

TestProjectStateHandler projectStateHandler1 = spy(new TestProjectStateHandler("test1"));
TestProjectStateHandler projectStateHandler2 = spy(new TestProjectStateHandler("test2"));

ReservedClusterStateService controller = new ReservedClusterStateService(
clusterService,
mock(RerouteService.class),
List.of(),
List.of(projectStateHandler1, projectStateHandler2)
);

try (
XContentParser chunkParser1 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON1);
XContentParser chunkParser2 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON2)
) {
chunks.add(controller.parse(projectId, "test", chunkParser1));
chunks.add(controller.parse(projectId, "test", chunkParser2));
}

controller.process(
projectId,
"test",
chunks,
randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION),
exceptionRef::set
);

assertThat(exceptionRef.get(), notNullValue());
assertThat(exceptionRef.get().getMessage(), containsString("Failed to merge reserved state chunks because of version mismatch: ["));
verify(projectStateHandler1, times(0)).transform(any(), any());
verify(projectStateHandler2, times(0)).transform(any(), any());
}

public void testProcessMultipleChunksDuplicateKeys() throws Exception {
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.createTaskQueue(any(), any(), any())).thenReturn(mockTaskQueue());
final ClusterName clusterName = new ClusterName("elasticsearch");

ClusterState state = ClusterState.builder(clusterName).build();
ProjectId projectId = randomProjectIdOrDefault();
state = setupProject(state, Optional.of(projectId));
when(clusterService.state()).thenReturn(state);

String testJSON1 = Strings.format(TEST_CHUNK_TEMPLATE, 1, "test");
String testJSON2 = Strings.format(TEST_CHUNK_TEMPLATE, 1, "test");

AtomicReference<Exception> exceptionRef = new AtomicReference<>();
List<ReservedStateChunk> chunks = new ArrayList<>();

TestProjectStateHandler projectStateHandler1 = spy(new TestProjectStateHandler("test"));

ReservedClusterStateService controller = new ReservedClusterStateService(
clusterService,
mock(RerouteService.class),
List.of(),
List.of(projectStateHandler1)
);

try (
XContentParser chunkParser1 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON1);
XContentParser chunkParser2 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON2)
) {
chunks.add(controller.parse(projectId, "test", chunkParser1));
chunks.add(controller.parse(projectId, "test", chunkParser2));
}

controller.process(
projectId,
"test",
chunks,
randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION),
exceptionRef::set
);

assertThat(exceptionRef.get(), notNullValue());
assertThat(
exceptionRef.get().getMessage(),
containsString("Failed to merge reserved state chunks because of duplicate keys: [test]")
);
verify(projectStateHandler1, times(0)).transform(any(), any());
}

public void testUpdateErrorState() {
ClusterService clusterService = mock(ClusterService.class);
ClusterState state = ClusterState.builder(new ClusterName("test")).build();