From 570845afac7e8764b86ad00d82857b3e2aec604c Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Tue, 15 Jul 2025 19:19:58 -0300 Subject: [PATCH 1/2] Make system index migration project-aware Updates all code related to system index migration to work properly in a multi-project context. At the time of writing, this module is not enabled in Serverless and will thus not be run in multi-project mode for now. --- .../elasticsearch/indices/SystemIndices.java | 20 +--- .../plugins/SystemIndexPlugin.java | 23 ++--- .../xpack/core/ml/MlMetadata.java | 9 ++ .../core/transform/TransformMetadata.java | 26 +++++- .../AbstractFeatureMigrationIntegTest.java | 32 +++---- .../action/FeatureMigrationIT.java | 8 +- .../action/MultiFeatureMigrationIT.java | 46 +++++----- ...fecycleIndexMetadataTransportActionIT.java | 21 +++-- ...ransportGetFeatureUpgradeStatusAction.java | 33 ++++--- .../TransportPostFeatureUpgradeAction.java | 12 ++- .../task/MigrationResultsUpdateTask.java | 16 +++- .../task/SystemIndexMigrationExecutor.java | 6 +- .../task/SystemIndexMigrationInfo.java | 92 ++----------------- .../task/SystemIndexMigrationTaskState.java | 6 +- .../task/SystemIndexMigrator.java | 74 +++++++-------- .../task/SystemResourceMigrationInfo.java | 22 ++--- ...ortGetFeatureUpgradeStatusActionTests.java | 35 +++---- .../SystemIndexMigrationMetadataTests.java | 5 +- .../xpack/ml/MachineLearning.java | 12 +-- .../xpack/ml/MachineLearningTests.java | 22 ++--- .../xpack/transform/Transform.java | 12 +-- .../transform/TransformMetadataTests.java | 41 --------- .../xpack/transform/TransformTests.java | 22 ++--- .../elasticsearch/xpack/watcher/Watcher.java | 19 ++-- 24 files changed, 250 insertions(+), 364 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/SystemIndices.java b/server/src/main/java/org/elasticsearch/indices/SystemIndices.java index 6860728e6eaf0..3a3d141a32ede 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemIndices.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemIndices.java @@ -1008,18 +1008,13 @@ public static void cleanUpFeature( } // No-op pre-migration function to be used as the default in case none are provided. - private static void noopPreMigrationFunction( - ClusterService clusterService, - Client client, - ActionListener> listener - ) { + private static void noopPreMigrationFunction(ProjectMetadata project, Client client, ActionListener> listener) { listener.onResponse(Collections.emptyMap()); } // No-op pre-migration function to be used as the default in case none are provided. private static void noopPostMigrationFunction( Map preUpgradeMetadata, - ClusterService clusterService, Client client, ActionListener listener ) { @@ -1028,25 +1023,20 @@ private static void noopPostMigrationFunction( /** * Type for the handler that's invoked prior to migrating a Feature's system indices. - * See {@link SystemIndexPlugin#prepareForIndicesMigration(ClusterService, Client, ActionListener)}. + * See {@link SystemIndexPlugin#prepareForIndicesMigration(ProjectMetadata, Client, ActionListener)}. */ @FunctionalInterface public interface MigrationPreparationHandler { - void prepareForIndicesMigration(ClusterService clusterService, Client client, ActionListener> listener); + void prepareForIndicesMigration(ProjectMetadata project, Client client, ActionListener> listener); } /** * Type for the handler that's invoked when all of a feature's system indices have been migrated. - * See {@link SystemIndexPlugin#indicesMigrationComplete(Map, ClusterService, Client, ActionListener)}. + * See {@link SystemIndexPlugin#indicesMigrationComplete(Map, Client, ActionListener)}. */ @FunctionalInterface public interface MigrationCompletionHandler { - void indicesMigrationComplete( - Map preUpgradeMetadata, - ClusterService clusterService, - Client client, - ActionListener listener - ); + void indicesMigrationComplete(Map preUpgradeMetadata, Client client, ActionListener listener); } public interface CleanupFunction { diff --git a/server/src/main/java/org/elasticsearch/plugins/SystemIndexPlugin.java b/server/src/main/java/org/elasticsearch/plugins/SystemIndexPlugin.java index 4efea68683635..ff2fc515ce2e0 100644 --- a/server/src/main/java/org/elasticsearch/plugins/SystemIndexPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/SystemIndexPlugin.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateResponse; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; @@ -45,8 +46,8 @@ * in order to provide a “factory reset” of the plugin state. This can be useful for testing. The default method will simply retrieve a list * of system and associated indices and delete them. * - *

An implementation may also override {@link #prepareForIndicesMigration(ClusterService, Client, ActionListener)} and - * {@link #indicesMigrationComplete(Map, ClusterService, Client, ActionListener)} in order to take special action before and after a + *

An implementation may also override {@link #prepareForIndicesMigration(ProjectMetadata, Client, ActionListener)} and + * {@link #indicesMigrationComplete(Map, Client, ActionListener)} in order to take special action before and after a * feature migration, which will temporarily block access to system indices. For example, a plugin might want to enter a safe mode and * reject certain requests while the migration is in progress. See org.elasticsearch.upgrades.SystemIndexMigrationExecutor for * more details. @@ -126,19 +127,19 @@ default void cleanUpFeature( * very rare. * * This method can also store metadata to be passed to - * {@link SystemIndexPlugin#indicesMigrationComplete(Map, ClusterService, Client, ActionListener)} when it is called; see the + * {@link SystemIndexPlugin#indicesMigrationComplete(Map, Client, ActionListener)} when it is called; see the * {@code listener} parameter for details. * - * @param clusterService The cluster service. + * @param project The project metadata. * @param client A {@link org.elasticsearch.client.internal.ParentTaskAssigningClient} with the parent task set to the upgrade task. * Does not set the origin header, so implementors of this method will likely want to wrap it in an * {@link org.elasticsearch.client.internal.OriginSettingClient}. * @param listener A listener that should have {@link ActionListener#onResponse(Object)} called once all necessary preparations for the * upgrade of indices owned by this plugin have been completed. The {@link Map} passed to the listener will be stored - * and passed to {@link #indicesMigrationComplete(Map, ClusterService, Client, ActionListener)}. Note the contents of + * and passed to {@link #indicesMigrationComplete(Map, Client, ActionListener)}. Note the contents of * the map *must* be writeable using {@link org.elasticsearch.common.io.stream.StreamOutput#writeGenericValue(Object)}. */ - default void prepareForIndicesMigration(ClusterService clusterService, Client client, ActionListener> listener) { + default void prepareForIndicesMigration(ProjectMetadata project, Client client, ActionListener> listener) { listener.onResponse(Collections.emptyMap()); } @@ -155,20 +156,14 @@ default void prepareForIndicesMigration(ClusterService clusterService, Client cl * with no data format changes allowed). * * @param preUpgradeMetadata The metadata that was given to the listener by - * {@link #prepareForIndicesMigration(ClusterService, Client, ActionListener)}. - * @param clusterService The cluster service. + * {@link #prepareForIndicesMigration(ProjectMetadata, Client, ActionListener)}. * @param client A {@link org.elasticsearch.client.internal.ParentTaskAssigningClient} with the parent task set to the upgrade task. * Does not set the origin header, so implementors of this method will likely want to wrap it in an * {@link org.elasticsearch.client.internal.OriginSettingClient}. * @param listener A listener that should have {@code ActionListener.onResponse(true)} called once all actions following the upgrade * of this plugin's system indices have been completed. */ - default void indicesMigrationComplete( - Map preUpgradeMetadata, - ClusterService clusterService, - Client client, - ActionListener listener - ) { + default void indicesMigrationComplete(Map preUpgradeMetadata, Client client, ActionListener listener) { listener.onResponse(true); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index 723b821abd17f..38ceca998db70 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -14,6 +14,7 @@ import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.SimpleDiffable; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; @@ -252,4 +253,12 @@ public static MlMetadata getMlMetadata(ClusterState state) { } return mlMetadata; } + + public static MlMetadata getMlMetadata(ProjectMetadata project) { + MlMetadata mlMetadata = project == null ? null : project.custom(TYPE); + if (mlMetadata == null) { + return EMPTY_METADATA; + } + return mlMetadata; + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMetadata.java index f76e14c66a57c..0e23358d1329c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMetadata.java @@ -15,10 +15,12 @@ import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.Nullable; import org.elasticsearch.xcontent.ObjectParser; import org.elasticsearch.xcontent.ParseField; @@ -212,8 +214,9 @@ public TransformMetadata build() { } /** - * @deprecated use {@link #transformMetadata(ClusterState, ProjectId)} + * @deprecated use {@link #transformMetadata(ProjectMetadata)} */ + @FixForMultiProject @Deprecated(forRemoval = true) public static TransformMetadata getTransformMetadata(ClusterState state) { TransformMetadata TransformMetadata = (state == null) ? null : state.metadata().getSingleProjectCustom(TYPE); @@ -223,6 +226,11 @@ public static TransformMetadata getTransformMetadata(ClusterState state) { return TransformMetadata; } + /** + * @deprecated use {@link #transformMetadata(ProjectMetadata)} + */ + @FixForMultiProject + @Deprecated(forRemoval = true) public static TransformMetadata transformMetadata(@Nullable ClusterState state, @Nullable ProjectId projectId) { if (state == null || projectId == null) { return EMPTY_METADATA; @@ -230,18 +238,32 @@ public static TransformMetadata transformMetadata(@Nullable ClusterState state, return transformMetadata(state.projectState(projectId)); } + /** + * @deprecated use {@link #transformMetadata(ProjectMetadata)} + */ + @FixForMultiProject + @Deprecated(forRemoval = true) public static TransformMetadata transformMetadata(@Nullable ProjectState projectState) { if (projectState == null) { return EMPTY_METADATA; } - TransformMetadata transformMetadata = projectState.metadata().custom(TYPE); + return transformMetadata(projectState.metadata()); + } + + public static TransformMetadata transformMetadata(ProjectMetadata project) { + TransformMetadata transformMetadata = project == null ? null : project.custom(TYPE); if (transformMetadata == null) { return EMPTY_METADATA; } return transformMetadata; } + @Deprecated(forRemoval = true) public static boolean upgradeMode(ClusterState state) { return getTransformMetadata(state).upgradeMode(); } + + public static boolean upgradeMode(ProjectMetadata project) { + return transformMetadata(project).upgradeMode(); + } } diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/AbstractFeatureMigrationIntegTest.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/AbstractFeatureMigrationIntegTest.java index 8ef0027256082..6589e3b907f19 100644 --- a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/AbstractFeatureMigrationIntegTest.java +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/AbstractFeatureMigrationIntegTest.java @@ -20,10 +20,9 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexVersion; @@ -57,7 +56,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -151,7 +150,11 @@ public abstract class AbstractFeatureMigrationIntegTest extends ESIntegTestCase protected String masterName; protected static ProjectMetadata assertMetadataAfterMigration(String featureName) { - ProjectMetadata finalMetadata = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState().metadata().getProject(); + ProjectMetadata finalMetadata = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT) + .get() + .getState() + .metadata() + .getProject(ProjectId.DEFAULT); // Check that the results metadata is what we expect. FeatureMigrationResults currentResults = finalMetadata.custom(FeatureMigrationResults.TYPE); assertThat(currentResults, notNullValue()); @@ -169,8 +172,8 @@ public void setup() { masterAndDataNode = internalCluster().startNode(); TestPlugin testPlugin = getPlugin(TestPlugin.class); - testPlugin.preMigrationHook.set((state) -> Collections.emptyMap()); - testPlugin.postMigrationHook.set((state, metadata) -> {}); + testPlugin.preMigrationHook.set((project) -> Collections.emptyMap()); + testPlugin.postMigrationHook.set((metadata) -> {}); } protected T getPlugin(Class type) { @@ -340,8 +343,8 @@ protected static TestPlugin.BlockingActionFilter blockAction(String actionTypeNa } public static class TestPlugin extends Plugin implements SystemIndexPlugin, ActionPlugin { - public final AtomicReference>> preMigrationHook = new AtomicReference<>(); - public final AtomicReference>> postMigrationHook = new AtomicReference<>(); + public final AtomicReference>> preMigrationHook = new AtomicReference<>(); + public final AtomicReference>> postMigrationHook = new AtomicReference<>(); private final BlockingActionFilter blockingActionFilter; public TestPlugin() { @@ -375,18 +378,13 @@ public Collection getAssociatedIndexDescriptors() { } @Override - public void prepareForIndicesMigration(ClusterService clusterService, Client client, ActionListener> listener) { - listener.onResponse(preMigrationHook.get().apply(clusterService.state())); + public void prepareForIndicesMigration(ProjectMetadata project, Client client, ActionListener> listener) { + listener.onResponse(preMigrationHook.get().apply(project)); } @Override - public void indicesMigrationComplete( - Map preUpgradeMetadata, - ClusterService clusterService, - Client client, - ActionListener listener - ) { - postMigrationHook.get().accept(clusterService.state(), preUpgradeMetadata); + public void indicesMigrationComplete(Map preUpgradeMetadata, Client client, ActionListener listener) { + postMigrationHook.get().accept(preUpgradeMetadata); listener.onResponse(true); } diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/FeatureMigrationIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/FeatureMigrationIT.java index cb42300bb31ff..bdefca8520592 100644 --- a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/FeatureMigrationIT.java +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/FeatureMigrationIT.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.cluster.service.ClusterService; @@ -153,7 +154,7 @@ public void testMigrateSystemIndex() throws Exception { SetOnce preUpgradeHookCalled = new SetOnce<>(); SetOnce postUpgradeHookCalled = new SetOnce<>(); - getPlugin(TestPlugin.class).preMigrationHook.set(clusterState -> { + getPlugin(TestPlugin.class).preMigrationHook.set(project -> { // Check that the ordering of these calls is correct. assertThat(postUpgradeHookCalled.get(), nullValue()); Map metadata = new HashMap<>(); @@ -170,7 +171,7 @@ public void testMigrateSystemIndex() throws Exception { return metadata; }); - getPlugin(TestPlugin.class).postMigrationHook.set((clusterState, metadata) -> { + getPlugin(TestPlugin.class).postMigrationHook.set((metadata) -> { assertThat(preUpgradeHookCalled.get(), is(true)); assertThat(metadata, hasEntry("stringKey", "stringValue")); @@ -182,7 +183,8 @@ public void testMigrateSystemIndex() throws Exception { assertThat(innerMap, hasEntry("innerKey", "innerValue")); // We shouldn't have any results in the cluster state as no features have fully finished yet. - FeatureMigrationResults currentResults = clusterState.metadata().getProject().custom(FeatureMigrationResults.TYPE); + final var project = clusterService().state().metadata().getProject(ProjectId.DEFAULT); + FeatureMigrationResults currentResults = project.custom(FeatureMigrationResults.TYPE); assertThat(currentResults, nullValue()); postUpgradeHookCalled.set(true); }); diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/MultiFeatureMigrationIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/MultiFeatureMigrationIT.java index 11aa44fe32337..81dd27aa35392 100644 --- a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/MultiFeatureMigrationIT.java +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/MultiFeatureMigrationIT.java @@ -10,9 +10,8 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexVersions; @@ -32,7 +31,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -91,7 +90,7 @@ public void testMultipleFeatureMigration() throws Exception { SetOnce secondPluginPreMigrationHookCalled = new SetOnce<>(); SetOnce secondPluginPostMigrationHookCalled = new SetOnce<>(); - getPlugin(TestPlugin.class).preMigrationHook.set(clusterState -> { + getPlugin(TestPlugin.class).preMigrationHook.set(project -> { // None of the other hooks should have been called yet. assertThat(postMigrationHookCalled.get(), nullValue()); assertThat(secondPluginPreMigrationHookCalled.get(), nullValue()); @@ -100,7 +99,7 @@ public void testMultipleFeatureMigration() throws Exception { metadata.put("stringKey", "first plugin value"); // We shouldn't have any results in the cluster state given no features have finished yet. - FeatureMigrationResults currentResults = clusterState.metadata().getProject().custom(FeatureMigrationResults.TYPE); + FeatureMigrationResults currentResults = project.custom(FeatureMigrationResults.TYPE); assertThat(currentResults, nullValue()); preMigrationHookCalled.set(true); @@ -108,7 +107,7 @@ public void testMultipleFeatureMigration() throws Exception { return metadata; }); - getPlugin(TestPlugin.class).postMigrationHook.set((clusterState, metadata) -> { + getPlugin(TestPlugin.class).postMigrationHook.set(metadata -> { // Check that the hooks have been called or not as expected. assertThat(preMigrationHookCalled.get(), is(true)); assertThat(secondPluginPreMigrationHookCalled.get(), nullValue()); @@ -117,14 +116,15 @@ public void testMultipleFeatureMigration() throws Exception { assertThat(metadata, hasEntry("stringKey", "first plugin value")); // We shouldn't have any results in the cluster state given no features have finished yet. - FeatureMigrationResults currentResults = clusterState.metadata().getProject().custom(FeatureMigrationResults.TYPE); + final var project = clusterService().state().metadata().getProject(ProjectId.DEFAULT); + FeatureMigrationResults currentResults = project.custom(FeatureMigrationResults.TYPE); assertThat(currentResults, nullValue()); postMigrationHookCalled.set(true); hooksCalled.countDown(); }); - getPlugin(SecondPlugin.class).preMigrationHook.set(clusterState -> { + getPlugin(SecondPlugin.class).preMigrationHook.set(project -> { // Check that the hooks have been called or not as expected. assertThat(preMigrationHookCalled.get(), is(true)); assertThat(postMigrationHookCalled.get(), is(true)); @@ -134,7 +134,7 @@ public void testMultipleFeatureMigration() throws Exception { metadata.put("stringKey", "second plugin value"); // But now, we should have results, as we're in a new feature! - FeatureMigrationResults currentResults = clusterState.metadata().getProject().custom(FeatureMigrationResults.TYPE); + FeatureMigrationResults currentResults = project.custom(FeatureMigrationResults.TYPE); assertThat(currentResults, notNullValue()); assertThat(currentResults.getFeatureStatuses(), allOf(aMapWithSize(1), hasKey(FEATURE_NAME))); assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).succeeded(), is(true)); @@ -146,7 +146,7 @@ public void testMultipleFeatureMigration() throws Exception { return metadata; }); - getPlugin(SecondPlugin.class).postMigrationHook.set((clusterState, metadata) -> { + getPlugin(SecondPlugin.class).postMigrationHook.set(metadata -> { // Check that the hooks have been called or not as expected. assertThat(preMigrationHookCalled.get(), is(true)); assertThat(postMigrationHookCalled.get(), is(true)); @@ -155,7 +155,8 @@ public void testMultipleFeatureMigration() throws Exception { assertThat(metadata, hasEntry("stringKey", "second plugin value")); // And here, the results should be the same, as we haven't updated the state with this feature's status yet. - FeatureMigrationResults currentResults = clusterState.metadata().getProject().custom(FeatureMigrationResults.TYPE); + final var project = clusterService().state().metadata().getProject(ProjectId.DEFAULT); + FeatureMigrationResults currentResults = project.custom(FeatureMigrationResults.TYPE); assertThat(currentResults, notNullValue()); assertThat(currentResults.getFeatureStatuses(), allOf(aMapWithSize(1), hasKey(FEATURE_NAME))); assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).succeeded(), is(true)); @@ -193,7 +194,11 @@ public void testMultipleFeatureMigration() throws Exception { assertTrue("the second plugin's pre-migration hook wasn't actually called", secondPluginPreMigrationHookCalled.get()); assertTrue("the second plugin's post-migration hook wasn't actually called", secondPluginPostMigrationHookCalled.get()); - ProjectMetadata finalMetadata = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState().metadata().getProject(); + ProjectMetadata finalMetadata = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT) + .get() + .getState() + .metadata() + .getProject(ProjectId.DEFAULT); // Check that the results metadata is what we expect FeatureMigrationResults currentResults = finalMetadata.custom(FeatureMigrationResults.TYPE); assertThat(currentResults, notNullValue()); @@ -263,8 +268,8 @@ public void testMultipleFeatureMigration() throws Exception { public static class SecondPlugin extends Plugin implements SystemIndexPlugin { - private final AtomicReference>> preMigrationHook = new AtomicReference<>(); - private final AtomicReference>> postMigrationHook = new AtomicReference<>(); + private final AtomicReference>> preMigrationHook = new AtomicReference<>(); + private final AtomicReference>> postMigrationHook = new AtomicReference<>(); public SecondPlugin() {} @@ -284,18 +289,13 @@ public Collection getSystemIndexDescriptors(Settings sett } @Override - public void prepareForIndicesMigration(ClusterService clusterService, Client client, ActionListener> listener) { - listener.onResponse(preMigrationHook.get().apply(clusterService.state())); + public void prepareForIndicesMigration(ProjectMetadata project, Client client, ActionListener> listener) { + listener.onResponse(preMigrationHook.get().apply(project)); } @Override - public void indicesMigrationComplete( - Map preUpgradeMetadata, - ClusterService clusterService, - Client client, - ActionListener listener - ) { - postMigrationHook.get().accept(clusterService.state(), preUpgradeMetadata); + public void indicesMigrationComplete(Map preUpgradeMetadata, Client client, ActionListener listener) { + postMigrationHook.get().accept(preUpgradeMetadata); listener.onResponse(true); } } diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/CopyLifecycleIndexMetadataTransportActionIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/CopyLifecycleIndexMetadataTransportActionIT.java index 63b3bafee9eae..79fc569a49cc8 100644 --- a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/CopyLifecycleIndexMetadataTransportActionIT.java +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/CopyLifecycleIndexMetadataTransportActionIT.java @@ -21,7 +21,8 @@ import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.LifecycleExecutionState; -import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; @@ -147,15 +148,15 @@ public void testILMState() throws Exception { var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); safeGet(indicesAdmin().create(new CreateIndexRequest(destIndex))); - IndexMetadata destBefore = getClusterMetadata(destIndex).getProject().index(destIndex); + IndexMetadata destBefore = getClusterMetadata(destIndex).index(destIndex); assertNull(destBefore.getCustomData(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY)); // copy over the metadata copyMetadata(backingIndex, destIndex); var metadataAfter = getClusterMetadata(backingIndex, destIndex); - IndexMetadata sourceAfter = metadataAfter.getProject().index(backingIndex); - IndexMetadata destAfter = metadataAfter.getProject().index(destIndex); + IndexMetadata sourceAfter = metadataAfter.index(backingIndex); + IndexMetadata destAfter = metadataAfter.index(destIndex); assertNotNull(destAfter.getCustomData(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY)); assertEquals( sourceAfter.getCustomData(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY), @@ -183,8 +184,8 @@ public void testRolloverInfos() throws Exception { safeGet(indicesAdmin().create(new CreateIndexRequest(destIndex))); var metadataBefore = getClusterMetadata(backingIndex, destIndex); - IndexMetadata source = metadataBefore.getProject().index(backingIndex); - IndexMetadata destBefore = metadataBefore.getProject().index(destIndex); + IndexMetadata source = metadataBefore.index(backingIndex); + IndexMetadata destBefore = metadataBefore.index(destIndex); // sanity check not equal before the copy if (backingIndex.equals(writeIndex)) { @@ -198,7 +199,7 @@ public void testRolloverInfos() throws Exception { copyMetadata(backingIndex, destIndex); // now rollover info should be equal - IndexMetadata destAfter = getClusterMetadata(destIndex).getProject().index(destIndex); + IndexMetadata destAfter = getClusterMetadata(destIndex).index(destIndex); assertEquals(source.getRolloverInfos(), destAfter.getRolloverInfos()); } } @@ -282,7 +283,9 @@ private String rollover(String dataStream) { return rolloverResponse.getNewIndex(); } - private Metadata getClusterMetadata(String... indices) { - return safeGet(clusterAdmin().state(new ClusterStateRequest(TEST_REQUEST_TIMEOUT).indices(indices))).getState().metadata(); + private ProjectMetadata getClusterMetadata(String... indices) { + return safeGet(clusterAdmin().state(new ClusterStateRequest(TEST_REQUEST_TIMEOUT).indices(indices))).getState() + .metadata() + .getProject(ProjectId.DEFAULT); } } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/action/TransportGetFeatureUpgradeStatusAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/action/TransportGetFeatureUpgradeStatusAction.java index ff26eb000e405..884594f0dd438 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/action/TransportGetFeatureUpgradeStatusAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/action/TransportGetFeatureUpgradeStatusAction.java @@ -13,6 +13,8 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.IndexVersion; @@ -48,6 +50,7 @@ public class TransportGetFeatureUpgradeStatusAction extends TransportMasterNodeA GetFeatureUpgradeStatusResponse> { private final SystemIndices systemIndices; + private final ProjectResolver projectResolver; @Inject public TransportGetFeatureUpgradeStatusAction( @@ -55,7 +58,8 @@ public TransportGetFeatureUpgradeStatusAction( ThreadPool threadPool, ActionFilters actionFilters, ClusterService clusterService, - SystemIndices systemIndices + SystemIndices systemIndices, + ProjectResolver projectResolver ) { super( GetFeatureUpgradeStatusAction.NAME, @@ -69,6 +73,7 @@ public TransportGetFeatureUpgradeStatusAction( ); this.systemIndices = systemIndices; + this.projectResolver = projectResolver; } @Override @@ -79,13 +84,14 @@ protected void masterOperation( ActionListener listener ) throws Exception { + final var project = projectResolver.getProjectMetadata(state); List features = systemIndices.getFeatures() .stream() .sorted(Comparator.comparing(SystemIndices.Feature::getName)) - .map(feature -> getFeatureUpgradeStatus(state, feature)) + .map(feature -> getFeatureUpgradeStatus(project, feature)) .toList(); - boolean migrationTaskExists = PersistentTasksCustomMetadata.getTaskWithId(state, SYSTEM_INDEX_UPGRADE_TASK_NAME) != null; + boolean migrationTaskExists = PersistentTasksCustomMetadata.getTaskWithId(project, SYSTEM_INDEX_UPGRADE_TASK_NAME) != null; GetFeatureUpgradeStatusResponse.UpgradeStatus initalStatus = migrationTaskExists ? IN_PROGRESS : NO_MIGRATION_NEEDED; GetFeatureUpgradeStatusResponse.UpgradeStatus status = Stream.concat( @@ -99,17 +105,20 @@ protected void masterOperation( listener.onResponse(new GetFeatureUpgradeStatusResponse(features, status)); } - static GetFeatureUpgradeStatusResponse.FeatureUpgradeStatus getFeatureUpgradeStatus(ClusterState state, SystemIndices.Feature feature) { + static GetFeatureUpgradeStatusResponse.FeatureUpgradeStatus getFeatureUpgradeStatus( + ProjectMetadata project, + SystemIndices.Feature feature + ) { String featureName = feature.getName(); PersistentTasksCustomMetadata.PersistentTask migrationTask = PersistentTasksCustomMetadata - .getTaskWithId(state, SYSTEM_INDEX_UPGRADE_TASK_NAME); + .getTaskWithId(project, SYSTEM_INDEX_UPGRADE_TASK_NAME); final String currentFeature = Optional.ofNullable(migrationTask) .map(task -> task.getState()) .map(taskState -> ((SystemIndexMigrationTaskState) taskState).getCurrentFeature()) .orElse(null); - List indexInfos = getIndexInfos(state, feature); + List indexInfos = getIndexInfos(project, feature); IndexVersion minimumVersion = indexInfos.stream() .map(GetFeatureUpgradeStatusResponse.IndexInfo::getVersion) @@ -134,9 +143,9 @@ static GetFeatureUpgradeStatusResponse.FeatureUpgradeStatus getFeatureUpgradeSta } // visible for testing - static List getIndexInfos(ClusterState state, SystemIndices.Feature feature) { + static List getIndexInfos(ProjectMetadata project, SystemIndices.Feature feature) { final SingleFeatureMigrationResult featureStatus = Optional.ofNullable( - (FeatureMigrationResults) state.metadata().getProject().custom(FeatureMigrationResults.TYPE) + (FeatureMigrationResults) project.custom(FeatureMigrationResults.TYPE) ).map(FeatureMigrationResults::getFeatureStatuses).map(results -> results.get(feature.getName())).orElse(null); final String failedResourceName = featureStatus == null ? null : featureStatus.getFailedResourceName(); @@ -145,9 +154,9 @@ static List getIndexInfos(ClusterStat Stream indexInfoStream = feature.getIndexDescriptors() .stream() - .flatMap(descriptor -> descriptor.getMatchingIndices(state.metadata().getProject()).stream()) + .flatMap(descriptor -> descriptor.getMatchingIndices(project).stream()) .sorted(String::compareTo) - .map(index -> state.metadata().getProject().index(index)) + .map(project::index) .map( indexMetadata -> new GetFeatureUpgradeStatusResponse.IndexInfo( indexMetadata.getIndex().getName(), @@ -164,10 +173,10 @@ static List getIndexInfos(ClusterStat // we don't know migration of which backing index has failed, // so, unfortunately, have to report exception for all indices for now - return descriptor.getMatchingIndices(state.metadata().getProject()) + return descriptor.getMatchingIndices(project) .stream() .sorted(String::compareTo) - .map(index -> state.metadata().getProject().index(index)) + .map(project::index) .map( indexMetadata -> new GetFeatureUpgradeStatusResponse.IndexInfo( indexMetadata.getIndex().getName(), diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/action/TransportPostFeatureUpgradeAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/action/TransportPostFeatureUpgradeAction.java index ee4ee93568aa6..85c28e46786dd 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/action/TransportPostFeatureUpgradeAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/action/TransportPostFeatureUpgradeAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.TimeValue; @@ -43,6 +44,7 @@ public class TransportPostFeatureUpgradeAction extends TransportMasterNodeAction private final SystemIndices systemIndices; private final PersistentTasksService persistentTasksService; + private final ProjectResolver projectResolver; @Inject public TransportPostFeatureUpgradeAction( @@ -51,7 +53,8 @@ public TransportPostFeatureUpgradeAction( ActionFilters actionFilters, ClusterService clusterService, SystemIndices systemIndices, - PersistentTasksService persistentTasksService + PersistentTasksService persistentTasksService, + ProjectResolver projectResolver ) { super( PostFeatureUpgradeAction.NAME, @@ -65,6 +68,7 @@ public TransportPostFeatureUpgradeAction( ); this.systemIndices = systemIndices; this.persistentTasksService = persistentTasksService; + this.projectResolver = projectResolver; } @Override @@ -78,9 +82,10 @@ protected void masterOperation( GetFeatureUpgradeStatusResponse.UpgradeStatus.MIGRATION_NEEDED, GetFeatureUpgradeStatusResponse.UpgradeStatus.ERROR ); + final var project = projectResolver.getProjectMetadata(state); List featuresToMigrate = systemIndices.getFeatures() .stream() - .map(feature -> getFeatureUpgradeStatus(state, feature)) + .map(feature -> getFeatureUpgradeStatus(project, feature)) .filter(status -> upgradableStatuses.contains(status.getUpgradeStatus())) .map(GetFeatureUpgradeStatusResponse.FeatureUpgradeStatus::getFeatureName) .map(PostFeatureUpgradeResponse.Feature::new) @@ -88,7 +93,8 @@ protected void masterOperation( .toList(); if (featuresToMigrate.isEmpty() == false) { - persistentTasksService.sendStartRequest( + persistentTasksService.sendProjectStartRequest( + project.id(), SYSTEM_INDEX_UPGRADE_TASK_NAME, SYSTEM_INDEX_UPGRADE_TASK_NAME, new SystemIndexMigrationTaskParams(), diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/MigrationResultsUpdateTask.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/MigrationResultsUpdateTask.java index 80060ec4908f1..cc02ac29112b1 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/MigrationResultsUpdateTask.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/MigrationResultsUpdateTask.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.core.SuppressForbidden; @@ -26,11 +27,18 @@ public class MigrationResultsUpdateTask extends ClusterStateUpdateTask { private static final Logger logger = LogManager.getLogger(MigrationResultsUpdateTask.class); private final String featureName; + private final ProjectId projectId; private final SingleFeatureMigrationResult status; private final ActionListener listener; - private MigrationResultsUpdateTask(String featureName, SingleFeatureMigrationResult status, ActionListener listener) { + private MigrationResultsUpdateTask( + String featureName, + ProjectId projectId, + SingleFeatureMigrationResult status, + ActionListener listener + ) { this.featureName = featureName; + this.projectId = projectId; this.status = status; this.listener = listener; } @@ -38,15 +46,17 @@ private MigrationResultsUpdateTask(String featureName, SingleFeatureMigrationRes /** * Creates a task that will update the status of a feature migration. * @param featureName The name of the feature whose status should be updated. + * @param projectId The project ID * @param status The status to be associated with the given feature. * @param listener A listener that will be called upon successfully updating the cluster state. */ public static MigrationResultsUpdateTask upsert( String featureName, + ProjectId projectId, SingleFeatureMigrationResult status, ActionListener listener ) { - return new MigrationResultsUpdateTask(featureName, status, listener); + return new MigrationResultsUpdateTask(featureName, projectId, status, listener); } /** @@ -69,7 +79,7 @@ private static void submitUnbatchedTask( @Override public ClusterState execute(ClusterState currentState) throws Exception { - final var project = currentState.metadata().getProject(); + final var project = currentState.metadata().getProject(projectId); FeatureMigrationResults currentResults = project.custom(FeatureMigrationResults.TYPE); if (currentResults == null) { currentResults = new FeatureMigrationResults(new HashMap<>()); diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationExecutor.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationExecutor.java index dad16d3cfa83b..83e4b80cb7351 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationExecutor.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationExecutor.java @@ -11,6 +11,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -41,6 +42,7 @@ public class SystemIndexMigrationExecutor extends PersistentTasksExecutor> listener) { - owningFeature.getPreMigrationFunction().prepareForIndicesMigration(clusterService, client, listener); + void prepareForIndicesMigration(ProjectMetadata project, Client client, ActionListener> listener) { + owningFeature.getPreMigrationFunction().prepareForIndicesMigration(project, client, listener); } /** * Invokes the post-migration hooks for the feature that owns this index. - * See {@link SystemIndexPlugin#indicesMigrationComplete(Map, ClusterService, Client, ActionListener)}. + * See {@link SystemIndexPlugin#indicesMigrationComplete(Map, Client, ActionListener)}. + * * @param metadata The metadata that was passed into the listener by the pre-migration hook. - * @param clusterService For retrieving the state. * @param client For performing any update operations necessary to prepare for the upgrade. * @param listener Call {@link ActionListener#onResponse(Object)} when the hook is finished. */ - void indicesMigrationComplete( - Map metadata, - ClusterService clusterService, - Client client, - ActionListener listener - ) { - owningFeature.getPostMigrationFunction().indicesMigrationComplete(metadata, clusterService, client, listener); + void indicesMigrationComplete(Map metadata, Client client, ActionListener listener) { + owningFeature.getPostMigrationFunction().indicesMigrationComplete(metadata, client, listener); } /** @@ -245,72 +237,4 @@ private static Settings copySettingsForNewIndex(Settings currentIndexSettings, I }); return newIndexSettings.build(); } - - /** - * Convenience factory method holding the logic for creating instances from a Feature object. - * @param feature The feature that - * @param metadata The current metadata, as index migration depends on the current state of the cluster. - * @param indexScopedSettings This is necessary to make adjustments to the indices settings for unmanaged indices. - * @return A {@link Stream} of {@link SystemIndexMigrationInfo}s that represent all the indices the given feature currently owns. - */ - static Stream fromFeature( - SystemIndices.Feature feature, - Metadata metadata, - IndexScopedSettings indexScopedSettings - ) { - return feature.getIndexDescriptors() - .stream() - .flatMap( - descriptor -> descriptor.getMatchingIndices(metadata.getDefaultProject()) - .stream() - .map(metadata.getProject()::index) - .filter(imd -> { - assert imd != null - : "got null IndexMetadata for index in system index descriptor [" + descriptor.getIndexPattern() + "]"; - return Objects.nonNull(imd); - }) - .map(imd -> SystemIndexMigrationInfo.build(imd, descriptor, feature, indexScopedSettings)) - ); - } - - static SystemIndexMigrationInfo fromTaskState( - SystemIndexMigrationTaskState taskState, - SystemIndices systemIndices, - Metadata metadata, - IndexScopedSettings indexScopedSettings - ) { - SystemIndexDescriptor descriptor = systemIndices.findMatchingDescriptor(taskState.getCurrentIndex()); - SystemIndices.Feature feature = systemIndices.getFeature(taskState.getCurrentFeature()); - IndexMetadata imd = metadata.getProject().index(taskState.getCurrentIndex()); - - // It's possible for one or both of these to happen if the executing node fails during execution and: - // 1. The task gets assigned to a node with a different set of plugins installed. - // 2. The index in question is somehow deleted before we got to it. - // The first case shouldn't happen, master nodes must have all `SystemIndexPlugins` installed. - // In the second case, we should just start over. - if (descriptor == null) { - String errorMsg = format( - "couldn't find system index descriptor for index [%s] from feature [%s], which likely means this node is missing a plugin", - taskState.getCurrentIndex(), - taskState.getCurrentFeature() - ); - logger.warn(errorMsg); - assert false : errorMsg; - throw new IllegalStateException(errorMsg); - } - - if (imd == null) { - String errorMsg = format( - "couldn't find index [%s] from feature [%s] with descriptor pattern [%s]", - taskState.getCurrentIndex(), - taskState.getCurrentFeature(), - descriptor.getIndexPattern() - ); - logger.warn(errorMsg); - assert false : errorMsg; - throw new IllegalStateException(errorMsg); - } - - return build(imd, descriptor, feature, indexScopedSettings); - } } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationTaskState.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationTaskState.java index cb7ac812bb8b1..08ae777bc4671 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationTaskState.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationTaskState.java @@ -9,7 +9,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.persistent.PersistentTaskState; @@ -80,8 +80,8 @@ public String getCurrentFeature() { /** * Retrieves metadata stored by the pre-upgrade hook, intended for consumption by the post-migration hook. - * See {@link org.elasticsearch.plugins.SystemIndexPlugin#prepareForIndicesMigration(ClusterService, Client, ActionListener)} and - * {@link org.elasticsearch.plugins.SystemIndexPlugin#indicesMigrationComplete(Map, ClusterService, Client, ActionListener)} for + * See {@link org.elasticsearch.plugins.SystemIndexPlugin#prepareForIndicesMigration(ProjectMetadata, Client, ActionListener)} and + * {@link org.elasticsearch.plugins.SystemIndexPlugin#indicesMigrationComplete(Map, Client, ActionListener)} for * details on the pre- and post-migration hooks. */ public Map getFeatureCallbackMetadata() { diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrator.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrator.java index 82d9d190ffc20..f630065423998 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrator.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrator.java @@ -30,12 +30,12 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; @@ -88,6 +88,7 @@ public class SystemIndexMigrator extends AllocatedPersistentTask { private final SystemIndices systemIndices; private final IndexScopedSettings indexScopedSettings; private final ThreadPool threadPool; + private final ProjectId projectId; // In-memory state // NOTE: This queue is not a thread-safe class. Use `synchronized (migrationQueue)` whenever you access this. I chose this rather than @@ -106,9 +107,11 @@ public SystemIndexMigrator( ClusterService clusterService, SystemIndices systemIndices, IndexScopedSettings indexScopedSettings, - ThreadPool threadPool + ThreadPool threadPool, + ProjectId projectId ) { super(id, type, action, "system-index-migrator", parentTask, headers); + this.projectId = projectId; this.baseClient = new ParentTaskAssigningClient(client, parentTask); this.clusterService = clusterService; this.systemIndices = systemIndices; @@ -117,8 +120,7 @@ public SystemIndexMigrator( } public void run(SystemIndexMigrationTaskState taskState) { - ClusterState clusterState = clusterService.state(); - ProjectMetadata projectMetadata = clusterState.metadata().getProject(); + ProjectMetadata projectMetadata = clusterService.state().metadata().getProject(projectId); final String stateIndexName; final String stateFeatureName; @@ -215,7 +217,7 @@ public void run(SystemIndexMigrationTaskState taskState) { // Kick off our callback "loop" - finishIndexAndLoop calls back into startFeatureMigration logger.debug("cleaning up previous migration, task state: [{}]", taskState == null ? "null" : Strings.toString(taskState)); - clearResults(clusterService, ActionListener.wrap(state -> startFeatureMigration(stateFeatureName), this::markAsFailed)); + clearResults(ActionListener.wrap(state -> startFeatureMigration(stateFeatureName), this::markAsFailed)); } private void finishIndexAndLoop(SystemIndexMigrationInfo migrationInfo, BulkByScrollResponse bulkResponse) { @@ -252,29 +254,23 @@ private void finishResourceAndLoop(SystemResourceMigrationInfo lastMigrationInfo SystemResourceMigrationInfo nextMigrationInfo = currentMigrationInfo(); if (nextMigrationInfo == null || nextMigrationInfo.getFeatureName().equals(lastMigrationInfo.getFeatureName()) == false) { // The next feature name is different than the last one, so we just finished a feature - time to invoke its post-migration hook - lastMigrationInfo.indicesMigrationComplete( - currentFeatureCallbackMetadata.get(), - clusterService, - baseClient, - ActionListener.wrap(successful -> { - if (successful == false) { - // GWB> Should we actually fail in this case instead of plugging along? - logger.warn( - "post-migration hook for feature [{}] indicated failure;" - + " feature migration metadata prior to failure was [{}]", - lastMigrationInfo.getFeatureName(), - currentFeatureCallbackMetadata.get() - ); - } - recordIndexMigrationSuccess(lastMigrationInfo); - }, this::markAsFailed) - ); + lastMigrationInfo.indicesMigrationComplete(currentFeatureCallbackMetadata.get(), baseClient, ActionListener.wrap(successful -> { + if (successful == false) { + // GWB> Should we actually fail in this case instead of plugging along? + logger.warn( + "post-migration hook for feature [{}] indicated failure;" + " feature migration metadata prior to failure was [{}]", + lastMigrationInfo.getFeatureName(), + currentFeatureCallbackMetadata.get() + ); + } + recordIndexMigrationSuccess(lastMigrationInfo); + }, this::markAsFailed)); } else { startFeatureMigration(lastMigrationInfo.getFeatureName()); } } - private void migrateResource(SystemResourceMigrationInfo migrationInfo, ClusterState clusterState) { + private void migrateResource(SystemResourceMigrationInfo migrationInfo, ProjectMetadata project) { if (migrationInfo instanceof SystemIndexMigrationInfo systemIndexMigrationInfo) { logger.info( "preparing to migrate old index [{}] from feature [{}] to new index [{}]", @@ -282,7 +278,7 @@ private void migrateResource(SystemResourceMigrationInfo migrationInfo, ClusterS migrationInfo.getFeatureName(), systemIndexMigrationInfo.getNextIndexName() ); - migrateSingleIndex(systemIndexMigrationInfo, clusterState, this::finishIndexAndLoop); + migrateSingleIndex(systemIndexMigrationInfo, project, this::finishIndexAndLoop); } else if (migrationInfo instanceof SystemDataStreamMigrationInfo systemDataStreamMigrationInfo) { logger.info( "preparing to migrate old indices from data stream [{}] from feature [{}] to new indices", @@ -298,6 +294,7 @@ private void migrateResource(SystemResourceMigrationInfo migrationInfo, ClusterS private void recordIndexMigrationSuccess(SystemResourceMigrationInfo lastMigrationInfo) { MigrationResultsUpdateTask updateTask = MigrationResultsUpdateTask.upsert( lastMigrationInfo.getFeatureName(), + projectId, SingleFeatureMigrationResult.success(), ActionListener.wrap(state -> { startFeatureMigration(lastMigrationInfo.getFeatureName()); @@ -320,17 +317,22 @@ private void startFeatureMigration(String lastFeatureName) { assert migrationInfo != null : "the queue of indices to migrate should have been checked for emptiness before calling this method"; if (migrationInfo.getFeatureName().equals(lastFeatureName) == false) { // And then invoke the pre-migration hook for the next one. - migrationInfo.prepareForIndicesMigration(clusterService, baseClient, ActionListener.wrap(newMetadata -> { + final var project = clusterService.state().metadata().getProject(projectId); + migrationInfo.prepareForIndicesMigration(project, baseClient, ActionListener.wrap(newMetadata -> { currentFeatureCallbackMetadata.set(newMetadata); - updateTaskState(migrationInfo, state -> migrateResource(migrationInfo, state), newMetadata); + updateTaskState(migrationInfo, newProject -> migrateResource(migrationInfo, newProject), newMetadata); }, this::markAsFailed)); } else { // Otherwise, just re-use what we already have. - updateTaskState(migrationInfo, state -> migrateResource(migrationInfo, state), currentFeatureCallbackMetadata.get()); + updateTaskState(migrationInfo, newProject -> migrateResource(migrationInfo, newProject), currentFeatureCallbackMetadata.get()); } } - private void updateTaskState(SystemResourceMigrationInfo migrationInfo, Consumer listener, Map metadata) { + private void updateTaskState( + SystemResourceMigrationInfo migrationInfo, + Consumer listener, + Map metadata + ) { final SystemIndexMigrationTaskState newTaskState = new SystemIndexMigrationTaskState( migrationInfo.getCurrentResourceName(), migrationInfo.getFeatureName(), @@ -338,10 +340,10 @@ private void updateTaskState(SystemResourceMigrationInfo migrationInfo, Consumer ); logger.debug("updating task state to [{}]", Strings.toString(newTaskState)); currentFeatureCallbackMetadata.set(metadata); - updatePersistentTaskState(newTaskState, ActionListener.wrap(task -> { + updateProjectPersistentTaskState(projectId, newTaskState, ActionListener.wrap(task -> { assert newTaskState.equals(task.getState()) : "task state returned by update method did not match submitted task state"; logger.debug("new task state [{}] accepted", Strings.toString(newTaskState)); - listener.accept(clusterService.state()); + listener.accept(clusterService.state().metadata().getProject(projectId)); }, this::markAsFailed)); } @@ -357,11 +359,10 @@ private static boolean needToBeMigrated(Stream indicesMetadata) { private void migrateSingleIndex( SystemIndexMigrationInfo migrationInfo, - ClusterState clusterState, + ProjectMetadata projectMetadata, BiConsumer listener ) { String oldIndexName = migrationInfo.getCurrentIndexName(); - final ProjectMetadata projectMetadata = clusterState.metadata().getProject(); final IndexMetadata imd = projectMetadata.index(oldIndexName); if (imd.getState().equals(CLOSE)) { logger.error( @@ -570,7 +571,7 @@ private void setAliasAndRemoveOldIndex(SystemIndexMigrationInfo migrationInfo, A aliasesRequest.addAlias(migrationInfo.getNextIndexName(), migrationInfo.getCurrentIndexName()); // Copy all the aliases from the old index - IndexMetadata imd = clusterService.state().metadata().getProject().index(migrationInfo.getCurrentIndexName()); + IndexMetadata imd = clusterService.state().metadata().getProject(projectId).index(migrationInfo.getCurrentIndexName()); imd.getAliases().values().forEach(aliasToAdd -> { aliasesRequest.addAliasAction( IndicesAliasesRequest.AliasActions.add() @@ -589,7 +590,6 @@ private void setAliasAndRemoveOldIndex(SystemIndexMigrationInfo migrationInfo, A /** * Sets the write block on the index to the given value. */ - @FixForMultiProject(description = "Don't use default project id to update settings") private void setWriteBlock(Index index, boolean readOnlyValue, ActionListener listener) { if (readOnlyValue) { // Setting the Block with an AddIndexBlockRequest ensures all shards have accounted for the block and all @@ -847,6 +847,7 @@ public void markAsFailed(Exception e) { MigrationResultsUpdateTask.upsert( featureName, + projectId, SingleFeatureMigrationResult.failure(indexName, e), ActionListener.wrap(state -> super.markAsFailed(e), exception -> super.markAsFailed(e)) ).submit(clusterService); @@ -855,14 +856,13 @@ public void markAsFailed(Exception e) { /** * Creates a task that will clear the results of previous migration attempts. - * @param clusterService The cluster service. * @param listener A listener that will be called upon successfully updating the cluster state. */ - private static void clearResults(ClusterService clusterService, ActionListener listener) { + private void clearResults(ActionListener listener) { submitUnbatchedTask(clusterService, "clear migration results", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { - final var project = currentState.metadata().getProject(); + final var project = currentState.metadata().getProject(projectId); if (project.custom(FeatureMigrationResults.TYPE) != null) { return ClusterState.builder(currentState) .putProjectMetadata(ProjectMetadata.builder(project).removeCustom(FeatureMigrationResults.TYPE)) diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemResourceMigrationInfo.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemResourceMigrationInfo.java index 366a463b878ba..70ae101e9b973 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemResourceMigrationInfo.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemResourceMigrationInfo.java @@ -12,7 +12,6 @@ import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.ProjectMetadata; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.plugins.SystemIndexPlugin; @@ -73,29 +72,24 @@ public int compareTo(SystemResourceMigrationInfo o) { /** * Invokes the pre-migration hook for the feature that owns this index. - * See {@link SystemIndexPlugin#prepareForIndicesMigration(ClusterService, Client, ActionListener)}. - * @param clusterService For retrieving the state. + * See {@link SystemIndexPlugin#prepareForIndicesMigration(ProjectMetadata, Client, ActionListener)}. + * @param project The project metadata * @param client For performing any update operations necessary to prepare for the upgrade. * @param listener Call {@link ActionListener#onResponse(Object)} when preparation for migration is complete. */ - void prepareForIndicesMigration(ClusterService clusterService, Client client, ActionListener> listener) { - owningFeature.getPreMigrationFunction().prepareForIndicesMigration(clusterService, client, listener); + void prepareForIndicesMigration(ProjectMetadata project, Client client, ActionListener> listener) { + owningFeature.getPreMigrationFunction().prepareForIndicesMigration(project, client, listener); } /** * Invokes the post-migration hooks for the feature that owns this index. - * See {@link SystemIndexPlugin#indicesMigrationComplete(Map, ClusterService, Client, ActionListener)}. + * See {@link SystemIndexPlugin#indicesMigrationComplete(Map, Client, ActionListener)}. + * * @param metadata The metadata that was passed into the listener by the pre-migration hook. - * @param clusterService For retrieving the state. * @param client For performing any update operations necessary to prepare for the upgrade. * @param listener Call {@link ActionListener#onResponse(Object)} when the hook is finished. */ - void indicesMigrationComplete( - Map metadata, - ClusterService clusterService, - Client client, - ActionListener listener - ) { - owningFeature.getPostMigrationFunction().indicesMigrationComplete(metadata, clusterService, client, listener); + void indicesMigrationComplete(Map metadata, Client client, ActionListener listener) { + owningFeature.getPostMigrationFunction().indicesMigrationComplete(metadata, client, listener); } } diff --git a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/system_indices/action/TransportGetFeatureUpgradeStatusActionTests.java b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/system_indices/action/TransportGetFeatureUpgradeStatusActionTests.java index 355ce90a14c80..c72bbd0bc88ef 100644 --- a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/system_indices/action/TransportGetFeatureUpgradeStatusActionTests.java +++ b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/system_indices/action/TransportGetFeatureUpgradeStatusActionTests.java @@ -7,11 +7,10 @@ package org.elasticsearch.system_indices.action; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersions; @@ -41,14 +40,14 @@ public class TransportGetFeatureUpgradeStatusActionTests extends ESTestCase { private static String TEST_SYSTEM_INDEX_PATTERN = ".test*"; // Version just before MINIMUM_COMPATIBLE in order to check that UpgradeStatus.MIGRATION_NEEDED is set correctly private static final IndexVersion TEST_OLD_VERSION = IndexVersion.fromId(IndexVersions.MINIMUM_COMPATIBLE.id() - 1); - private static final ClusterState CLUSTER_STATE = getClusterState(); private static final String TEST_INDEX_1_NAME = ".test-index-1"; - private static final SystemIndices.Feature FEATURE = getFeature(); + private final ProjectMetadata projectMetadata = getProjectMetadata(); + public void testGetFeatureStatus() { GetFeatureUpgradeStatusResponse.FeatureUpgradeStatus status = TransportGetFeatureUpgradeStatusAction.getFeatureUpgradeStatus( - CLUSTER_STATE, + projectMetadata, FEATURE ); @@ -60,7 +59,7 @@ public void testGetFeatureStatus() { public void testGetIndexInfos() { List versions = TransportGetFeatureUpgradeStatusAction.getIndexInfos( - CLUSTER_STATE, + projectMetadata, FEATURE ); @@ -85,7 +84,7 @@ public void testGetIndexInfos() { public void testGetIndexInfosWithErrors() { List versions = TransportGetFeatureUpgradeStatusAction.getIndexInfos( - getClusterStateWithFailedMigration(TEST_INDEX_1_NAME), + getProjectWithFailedMigration(TEST_INDEX_1_NAME), FEATURE ); @@ -113,7 +112,7 @@ public void testGetIndexInfosWithErrors() { public void testGetIndexInfosWithDataStreamErrors() { List versions = TransportGetFeatureUpgradeStatusAction.getIndexInfos( - getClusterStateWithFailedMigration(DATA_STREAM_NAME), + getProjectWithFailedMigration(DATA_STREAM_NAME), FEATURE ); @@ -165,7 +164,7 @@ private static SystemIndices.Feature getFeature() { return feature; } - private static ClusterState getClusterState() { + private static ProjectMetadata getProjectMetadata() { IndexMetadata indexMetadata1 = IndexMetadata.builder(TEST_INDEX_1_NAME) .settings(Settings.builder().put("index.version.created", IndexVersion.current()).build()) .numberOfShards(1) @@ -191,21 +190,17 @@ private static ClusterState getClusterState() { .setHidden(true) .build(); - ClusterState clusterState = new ClusterState.Builder(ClusterState.EMPTY_STATE).metadata( - new Metadata.Builder().dataStreams(Map.of(DATA_STREAM_NAME, dataStream), Collections.emptyMap()) - .indices(Map.of(TEST_INDEX_1_NAME, indexMetadata1, ".test-index-2", indexMetadata2, BACKING_INDEX_NAME, dsIndexMetadata)) - .build() - ).build(); - return clusterState; + return ProjectMetadata.builder(randomProjectIdOrDefault()) + .dataStreams(Map.of(DATA_STREAM_NAME, dataStream), Collections.emptyMap()) + .indices(Map.of(TEST_INDEX_1_NAME, indexMetadata1, ".test-index-2", indexMetadata2, BACKING_INDEX_NAME, dsIndexMetadata)) + .build(); } - private static ClusterState getClusterStateWithFailedMigration(String failedIndexName) { + private static ProjectMetadata getProjectWithFailedMigration(String failedIndexName) { SingleFeatureMigrationResult migrationResult = SingleFeatureMigrationResult.failure(failedIndexName, new Exception()); FeatureMigrationResults featureMigrationResults = new FeatureMigrationResults(Map.of(FEATURE_NAME, migrationResult)); - ClusterState initialState = getClusterState(); - return ClusterState.builder(initialState) - .metadata(Metadata.builder(initialState.metadata()).putCustom(FeatureMigrationResults.TYPE, featureMigrationResults).build()) - .build(); + ProjectMetadata initialProject = getProjectMetadata(); + return ProjectMetadata.builder(initialProject).putCustom(FeatureMigrationResults.TYPE, featureMigrationResults).build(); } } diff --git a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/system_indices/task/SystemIndexMigrationMetadataTests.java b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/system_indices/task/SystemIndexMigrationMetadataTests.java index bea8ebf606a25..264a525fd8378 100644 --- a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/system_indices/task/SystemIndexMigrationMetadataTests.java +++ b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/system_indices/task/SystemIndexMigrationMetadataTests.java @@ -96,8 +96,9 @@ public void testParseXContentFormatBeforeMultiProject() throws IOException { clusterTasks.tasks().stream().map(PersistentTasksCustomMetadata.PersistentTask::getTaskName).toList(), containsInAnyOrder("health-node") ); - assertThat(metadata.getProject().customs().keySet(), containsInAnyOrder("persistent_tasks", "index-graveyard")); - final var projectTasks = PersistentTasksCustomMetadata.get(metadata.getProject()); + final var project = metadata.getProject(ProjectId.DEFAULT); + assertThat(project.customs().keySet(), containsInAnyOrder("persistent_tasks", "index-graveyard")); + final var projectTasks = PersistentTasksCustomMetadata.get(project); assertThat( projectTasks.tasks().stream().map(PersistentTasksCustomMetadata.PersistentTask::getTaskName).toList(), containsInAnyOrder("upgrade-system-indices") diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 0c8cec2c8d218..83adee27248be 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; @@ -2042,8 +2043,8 @@ public static SystemIndexDescriptor getInferenceIndexSystemIndexDescriptor() { } @Override - public void prepareForIndicesMigration(ClusterService clusterService, Client client, ActionListener> listener) { - boolean isAlreadyInUpgradeMode = MlMetadata.getMlMetadata(clusterService.state()).isUpgradeMode(); + public void prepareForIndicesMigration(ProjectMetadata project, Client client, ActionListener> listener) { + boolean isAlreadyInUpgradeMode = MlMetadata.getMlMetadata(project).isUpgradeMode(); if (isAlreadyInUpgradeMode) { // ML is already in upgrade mode, so nothing will write to the ML system indices during their upgrade listener.onResponse(Collections.singletonMap("already_in_upgrade_mode", true)); @@ -2060,12 +2061,7 @@ public void prepareForIndicesMigration(ClusterService clusterService, Client cli } @Override - public void indicesMigrationComplete( - Map preUpgradeMetadata, - ClusterService clusterService, - Client client, - ActionListener listener - ) { + public void indicesMigrationComplete(Map preUpgradeMetadata, Client client, ActionListener listener) { boolean wasAlreadyInUpgradeMode = (boolean) preUpgradeMetadata.getOrDefault("already_in_upgrade_mode", false); if (wasAlreadyInUpgradeMode) { // ML was already in upgrade mode before system indices upgrade started - we shouldn't disable it diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java index 8a05537917abe..9d77afb69209c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java @@ -11,10 +11,7 @@ import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.XPackLicenseState; @@ -64,8 +61,6 @@ public class MachineLearningTests extends ESTestCase { @SuppressWarnings("unchecked") public void testPrePostSystemIndexUpgrade_givenNotInUpgradeMode() throws IOException { ThreadPool threadpool = new TestThreadPool("test"); - ClusterService clusterService = mock(ClusterService.class); - when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); Client client = mock(Client.class); when(client.threadPool()).thenReturn(threadpool); doAnswer(invocationOnMock -> { @@ -77,7 +72,7 @@ public void testPrePostSystemIndexUpgrade_givenNotInUpgradeMode() throws IOExcep try (MachineLearning machineLearning = createTrialLicensedMachineLearning(Settings.EMPTY)) { SetOnce> response = new SetOnce<>(); - machineLearning.prepareForIndicesMigration(clusterService, client, ActionTestUtils.assertNoFailureListener(response::set)); + machineLearning.prepareForIndicesMigration(emptyProject(), client, ActionTestUtils.assertNoFailureListener(response::set)); assertThat(response.get(), equalTo(Collections.singletonMap("already_in_upgrade_mode", false))); verify(client).execute( @@ -88,7 +83,6 @@ public void testPrePostSystemIndexUpgrade_givenNotInUpgradeMode() throws IOExcep machineLearning.indicesMigrationComplete( response.get(), - clusterService, client, ActionTestUtils.assertNoFailureListener(ESTestCase::assertTrue) ); @@ -104,25 +98,21 @@ public void testPrePostSystemIndexUpgrade_givenNotInUpgradeMode() throws IOExcep } public void testPrePostSystemIndexUpgrade_givenAlreadyInUpgradeMode() throws IOException { - ClusterService clusterService = mock(ClusterService.class); - when(clusterService.state()).thenReturn( - ClusterState.builder(ClusterName.DEFAULT) - .metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().isUpgradeMode(true).build())) - .build() - ); + final var project = ProjectMetadata.builder(randomProjectIdOrDefault()) + .putCustom(MlMetadata.TYPE, new MlMetadata.Builder().isUpgradeMode(true).build()) + .build(); Client client = mock(Client.class); try (MachineLearning machineLearning = createTrialLicensedMachineLearning(Settings.EMPTY)) { SetOnce> response = new SetOnce<>(); - machineLearning.prepareForIndicesMigration(clusterService, client, ActionTestUtils.assertNoFailureListener(response::set)); + machineLearning.prepareForIndicesMigration(project, client, ActionTestUtils.assertNoFailureListener(response::set)); assertThat(response.get(), equalTo(Collections.singletonMap("already_in_upgrade_mode", true))); verifyNoMoreInteractions(client); machineLearning.indicesMigrationComplete( response.get(), - clusterService, client, ActionTestUtils.assertNoFailureListener(ESTestCase::assertTrue) ); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 27c715a2649a4..ef59f057c1319 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; @@ -185,8 +186,8 @@ protected XPackLicenseState getLicenseState() { } @Override - public void prepareForIndicesMigration(ClusterService clusterService, Client client, ActionListener> listener) { - if (TransformMetadata.upgradeMode(clusterService.state())) { + public void prepareForIndicesMigration(ProjectMetadata project, Client client, ActionListener> listener) { + if (TransformMetadata.upgradeMode(project)) { // Transform is already in upgrade mode, so nothing will write to the Transform system indices during their upgrade listener.onResponse(Map.of("already_in_upgrade_mode", true)); return; @@ -202,12 +203,7 @@ public void prepareForIndicesMigration(ClusterService clusterService, Client cli } @Override - public void indicesMigrationComplete( - Map preUpgradeMetadata, - ClusterService clusterService, - Client client, - ActionListener listener - ) { + public void indicesMigrationComplete(Map preUpgradeMetadata, Client client, ActionListener listener) { var wasAlreadyInUpgradeMode = (boolean) preUpgradeMetadata.getOrDefault("already_in_upgrade_mode", false); if (wasAlreadyInUpgradeMode) { // Transform was already in upgrade mode before system indices upgrade started - we shouldn't disable it diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformMetadataTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformMetadataTests.java index 8f2008dab55db..108bbab85935e 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformMetadataTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformMetadataTests.java @@ -7,17 +7,11 @@ package org.elasticsearch.xpack.transform; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractChunkedSerializingTestCase; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xpack.core.transform.TransformMetadata; -import static org.hamcrest.Matchers.equalTo; - public class TransformMetadataTests extends AbstractChunkedSerializingTestCase { @Override @@ -41,39 +35,4 @@ protected TransformMetadata mutateInstance(TransformMetadata instance) { .upgradeMode(instance.upgradeMode() == false) .build(); } - - public void testTransformMetadataFromClusterState() { - var expectedTransformMetadata = new TransformMetadata.Builder().resetMode(true).upgradeMode(true).build(); - var projectId = randomUniqueProjectId(); - var clusterState = ClusterState.builder(new ClusterName("_name")) - .metadata( - Metadata.builder().put(ProjectMetadata.builder(projectId).putCustom(TransformMetadata.TYPE, expectedTransformMetadata)) - ) - .build(); - - assertThat(TransformMetadata.transformMetadata(clusterState, projectId), equalTo(expectedTransformMetadata)); - assertThat(TransformMetadata.getTransformMetadata(clusterState), equalTo(expectedTransformMetadata)); - } - - public void testTransformMetadataFromMissingClusterState() { - assertThat(TransformMetadata.transformMetadata(null, randomUniqueProjectId()), equalTo(TransformMetadata.EMPTY_METADATA)); - assertThat(TransformMetadata.getTransformMetadata(null), equalTo(TransformMetadata.EMPTY_METADATA)); - } - - public void testTransformMetadataFromMissingProjectId() { - assertThat( - TransformMetadata.transformMetadata(ClusterState.builder(new ClusterName("_name")).build(), null), - equalTo(TransformMetadata.EMPTY_METADATA) - ); - } - - public void testTransformMetadataWhenAbsentFromClusterState() { - var projectId = randomUniqueProjectId(); - var clusterState = ClusterState.builder(new ClusterName("_name")) - .metadata(Metadata.builder().put(ProjectMetadata.builder(projectId))) - .build(); - - assertThat(TransformMetadata.transformMetadata(clusterState, projectId), equalTo(TransformMetadata.EMPTY_METADATA)); - assertThat(TransformMetadata.getTransformMetadata(clusterState), equalTo(TransformMetadata.EMPTY_METADATA)); - } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformTests.java index 1e8bcc5b96882..5d04f026d9ef2 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformTests.java @@ -12,10 +12,7 @@ import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -40,8 +37,6 @@ public class TransformTests extends ESTestCase { public void testSetTransformUpgradeMode() { var threadPool = new TestThreadPool("testSetTransformUpgradeMode"); - ClusterService clusterService = mock(); - when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); Client client = mock(); when(client.threadPool()).thenReturn(threadPool); doAnswer(invocationOnMock -> { @@ -52,14 +47,13 @@ public void testSetTransformUpgradeMode() { try (var transformPlugin = new Transform(Settings.EMPTY)) { SetOnce> response = new SetOnce<>(); - transformPlugin.prepareForIndicesMigration(clusterService, client, ActionTestUtils.assertNoFailureListener(response::set)); + transformPlugin.prepareForIndicesMigration(emptyProject(), client, ActionTestUtils.assertNoFailureListener(response::set)); assertThat(response.get(), equalTo(Collections.singletonMap("already_in_upgrade_mode", false))); verify(client).execute(same(SetTransformUpgradeModeAction.INSTANCE), eq(new SetUpgradeModeActionRequest(true)), any()); transformPlugin.indicesMigrationComplete( response.get(), - clusterService, client, ActionTestUtils.assertNoFailureListener(ESTestCase::assertTrue) ); @@ -71,24 +65,20 @@ public void testSetTransformUpgradeMode() { } public void testIgnoreSetTransformUpgradeMode() { - ClusterService clusterService = mock(); - when(clusterService.state()).thenReturn( - ClusterState.builder(ClusterName.DEFAULT) - .metadata(Metadata.builder().putCustom(TransformMetadata.TYPE, new TransformMetadata.Builder().upgradeMode(true).build())) - .build() - ); + final var project = ProjectMetadata.builder(randomProjectIdOrDefault()) + .putCustom(TransformMetadata.TYPE, new TransformMetadata.Builder().upgradeMode(true).build()) + .build(); Client client = mock(); try (var transformPlugin = new Transform(Settings.EMPTY)) { SetOnce> response = new SetOnce<>(); - transformPlugin.prepareForIndicesMigration(clusterService, client, ActionTestUtils.assertNoFailureListener(response::set)); + transformPlugin.prepareForIndicesMigration(project, client, ActionTestUtils.assertNoFailureListener(response::set)); assertThat(response.get(), equalTo(Collections.singletonMap("already_in_upgrade_mode", true))); verifyNoMoreInteractions(client); transformPlugin.indicesMigrationComplete( response.get(), - clusterService, client, ActionTestUtils.assertNoFailureListener(ESTestCase::assertTrue) ); diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index 657c307897425..799942ce2124b 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -21,7 +21,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; -import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; @@ -36,7 +36,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.IOUtils; -import org.elasticsearch.core.NotMultiProjectCapable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.UpdateForV10; import org.elasticsearch.env.Environment; @@ -825,12 +824,11 @@ public String getFeatureName() { } @Override - @NotMultiProjectCapable(description = "Watcher is not available in serverless") - public void prepareForIndicesMigration(ClusterService clusterService, Client client, ActionListener> listener) { + public void prepareForIndicesMigration(ProjectMetadata project, Client client, ActionListener> listener) { Client originClient = new OriginSettingClient(client, WATCHER_ORIGIN); - boolean manuallyStopped = Optional.ofNullable( - clusterService.state().metadata().getProject(ProjectId.DEFAULT).custom(WatcherMetadata.TYPE) - ).map(WatcherMetadata::manuallyStopped).orElse(false); + boolean manuallyStopped = Optional.ofNullable(project.custom(WatcherMetadata.TYPE)) + .map(WatcherMetadata::manuallyStopped) + .orElse(false); if (manuallyStopped == false) { WatcherServiceRequest serviceRequest = new WatcherServiceRequest(TimeValue.THIRTY_SECONDS /* TODO should this be longer? */); @@ -845,12 +843,7 @@ public void prepareForIndicesMigration(ClusterService clusterService, Client cli } @Override - public void indicesMigrationComplete( - Map preUpgradeMetadata, - ClusterService clusterService, - Client client, - ActionListener listener - ) { + public void indicesMigrationComplete(Map preUpgradeMetadata, Client client, ActionListener listener) { Client originClient = new OriginSettingClient(client, WATCHER_ORIGIN); boolean manuallyStopped = (boolean) preUpgradeMetadata.getOrDefault("manually_stopped", false); if (manuallyStopped == false) { From 31fe5fd571067895f91ecbf267276d504d55388a Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Tue, 12 Aug 2025 08:15:48 +0200 Subject: [PATCH 2/2] Rename test method --- .../CopyLifecycleIndexMetadataTransportActionIT.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/CopyLifecycleIndexMetadataTransportActionIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/CopyLifecycleIndexMetadataTransportActionIT.java index 79fc569a49cc8..ad7ea0279fa56 100644 --- a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/CopyLifecycleIndexMetadataTransportActionIT.java +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/CopyLifecycleIndexMetadataTransportActionIT.java @@ -148,13 +148,13 @@ public void testILMState() throws Exception { var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); safeGet(indicesAdmin().create(new CreateIndexRequest(destIndex))); - IndexMetadata destBefore = getClusterMetadata(destIndex).index(destIndex); + IndexMetadata destBefore = getProjectMetadata(destIndex).index(destIndex); assertNull(destBefore.getCustomData(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY)); // copy over the metadata copyMetadata(backingIndex, destIndex); - var metadataAfter = getClusterMetadata(backingIndex, destIndex); + var metadataAfter = getProjectMetadata(backingIndex, destIndex); IndexMetadata sourceAfter = metadataAfter.index(backingIndex); IndexMetadata destAfter = metadataAfter.index(destIndex); assertNotNull(destAfter.getCustomData(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY)); @@ -183,7 +183,7 @@ public void testRolloverInfos() throws Exception { var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); safeGet(indicesAdmin().create(new CreateIndexRequest(destIndex))); - var metadataBefore = getClusterMetadata(backingIndex, destIndex); + var metadataBefore = getProjectMetadata(backingIndex, destIndex); IndexMetadata source = metadataBefore.index(backingIndex); IndexMetadata destBefore = metadataBefore.index(destIndex); @@ -199,7 +199,7 @@ public void testRolloverInfos() throws Exception { copyMetadata(backingIndex, destIndex); // now rollover info should be equal - IndexMetadata destAfter = getClusterMetadata(destIndex).index(destIndex); + IndexMetadata destAfter = getProjectMetadata(destIndex).index(destIndex); assertEquals(source.getRolloverInfos(), destAfter.getRolloverInfos()); } } @@ -283,7 +283,7 @@ private String rollover(String dataStream) { return rolloverResponse.getNewIndex(); } - private ProjectMetadata getClusterMetadata(String... indices) { + private ProjectMetadata getProjectMetadata(String... indices) { return safeGet(clusterAdmin().state(new ClusterStateRequest(TEST_REQUEST_TIMEOUT).indices(indices))).getState() .metadata() .getProject(ProjectId.DEFAULT);