diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index e57cb485361b6..2113c462877de 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -355,6 +355,7 @@ static TransportVersion def(int id) { public static final TransportVersion PIPELINE_TRACKING_INFO = def(9_131_0_00); public static final TransportVersion COMPONENT_TEMPLATE_TRACKING_INFO = def(9_132_0_00); public static final TransportVersion TO_CHILD_BLOCK_JOIN_QUERY = def(9_133_0_00); + public static final TransportVersion ML_TRANSFORM_SYSTEM_INDEX_MIGRATE_REASON = def(9_134_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/AbstractTransportSetUpgradeModeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/AbstractTransportSetUpgradeModeAction.java index ae94f7c8e5146..5872a3d93fb6b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/AbstractTransportSetUpgradeModeAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/AbstractTransportSetUpgradeModeAction.java @@ -14,7 +14,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.SimpleBatchedExecutor; @@ -33,7 +33,9 @@ import java.util.concurrent.atomic.AtomicBoolean; -public abstract class AbstractTransportSetUpgradeModeAction extends AcknowledgedTransportMasterNodeAction { +public abstract class AbstractTransportSetUpgradeModeAction extends TransportMasterNodeAction< + SetUpgradeModeActionRequest, + SetUpgradeModeActionResponse> { private static final Logger logger = LogManager.getLogger(AbstractTransportSetUpgradeModeAction.class); private final AtomicBoolean isRunning = new AtomicBoolean(false); @@ -54,6 +56,7 @@ public AbstractTransportSetUpgradeModeAction( threadPool, actionFilters, SetUpgradeModeActionRequest::new, + SetUpgradeModeActionResponse::new, EsExecutors.DIRECT_EXECUTOR_SERVICE ); @@ -65,7 +68,7 @@ protected void masterOperation( Task task, SetUpgradeModeActionRequest request, ClusterState state, - ActionListener listener + ActionListener listener ) throws Exception { // Don't want folks spamming this endpoint while it is in progress, only allow one request to be handled at a time if (isRunning.compareAndSet(false, true) == false) { @@ -93,7 +96,7 @@ protected void masterOperation( if (request.enabled() == upgradeMode) { logger.info("Upgrade mode noop"); isRunning.set(false); - listener.onResponse(AcknowledgedResponse.TRUE); + listener.onResponse(new SetUpgradeModeActionResponse(true, upgradeMode)); return; } @@ -107,7 +110,7 @@ protected void masterOperation( ActionListener wrappedListener = ActionListener.wrap(r -> { logger.info("Finished setting [upgrade_mode] for feature name [{}]", featureName()); isRunning.set(false); - listener.onResponse(r); + listener.onResponse(new SetUpgradeModeActionResponse(r.isAcknowledged(), false)); }, e -> { logger.info("Failed to set [upgrade_mode] for feature name [{}]", featureName()); isRunning.set(false); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetUpgradeModeActionResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetUpgradeModeActionResponse.java new file mode 100644 index 0000000000000..d08bb826cf7bc --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetUpgradeModeActionResponse.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.action; + +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +import static org.elasticsearch.TransportVersions.ML_TRANSFORM_SYSTEM_INDEX_MIGRATE_REASON; + +public class SetUpgradeModeActionResponse extends AcknowledgedResponse { + private final boolean alreadyInUpgradeMode; + + public SetUpgradeModeActionResponse(StreamInput in) throws IOException { + super(in); + this.alreadyInUpgradeMode = in.getTransportVersion().onOrAfter(ML_TRANSFORM_SYSTEM_INDEX_MIGRATE_REASON) && in.readBoolean(); + } + + public SetUpgradeModeActionResponse(boolean acknowledged, boolean alreadyInUpgradeMode) { + super(acknowledged); + this.alreadyInUpgradeMode = alreadyInUpgradeMode; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (out.getTransportVersion().onOrAfter(ML_TRANSFORM_SYSTEM_INDEX_MIGRATE_REASON)) { + out.writeBoolean(alreadyInUpgradeMode); + } + } + + public boolean alreadyInUpgradeMode() { + return alreadyInUpgradeMode; + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetUpgradeModeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetUpgradeModeAction.java index a67ae33e85801..c45627daaa6ac 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetUpgradeModeAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetUpgradeModeAction.java @@ -7,15 +7,15 @@ package org.elasticsearch.xpack.core.ml.action; import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xpack.core.action.SetUpgradeModeActionRequest; +import org.elasticsearch.xpack.core.action.SetUpgradeModeActionResponse; import java.io.IOException; -public class SetUpgradeModeAction extends ActionType { +public class SetUpgradeModeAction extends ActionType { public static final SetUpgradeModeAction INSTANCE = new SetUpgradeModeAction(); public static final String NAME = "cluster:admin/xpack/ml/upgrade_mode"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/SetTransformUpgradeModeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/SetTransformUpgradeModeAction.java index b93df8125708f..b6443d3199d4a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/SetTransformUpgradeModeAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/SetTransformUpgradeModeAction.java @@ -8,9 +8,9 @@ package org.elasticsearch.xpack.core.transform.action; import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.xpack.core.action.SetUpgradeModeActionResponse; -public class SetTransformUpgradeModeAction extends ActionType { +public class SetTransformUpgradeModeAction extends ActionType { public static final SetTransformUpgradeModeAction INSTANCE = new SetTransformUpgradeModeAction(); public static final String NAME = "cluster:admin/transform/upgrade_mode"; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/AbstractTransportSetUpgradeModeActionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/AbstractTransportSetUpgradeModeActionTests.java index 09d033cd9de24..6a06d8a5b0913 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/AbstractTransportSetUpgradeModeActionTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/AbstractTransportSetUpgradeModeActionTests.java @@ -26,6 +26,7 @@ import java.util.function.Consumer; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; @@ -108,7 +109,7 @@ public void testUpdateDoesNotRun() throws Exception { var response = action.run(true); - assertThat(response.v1(), is(AcknowledgedResponse.TRUE)); + assertThat(response.v1(), equalTo(new SetUpgradeModeActionResponse(true, true))); assertThat(response.v2(), nullValue()); assertThat(shouldNotChange.get(), is(true)); } @@ -150,7 +151,7 @@ public void testSuccess() throws Exception { var response = action.run(true); - assertThat(response.v1(), is(AcknowledgedResponse.TRUE)); + assertThat(response.v1(), equalTo(new SetUpgradeModeActionResponse(true, false))); assertThat(response.v2(), nullValue()); } @@ -177,8 +178,8 @@ public void runWithoutWaiting(boolean upgrade) throws Exception { masterOperation(mock(), new SetUpgradeModeActionRequest(upgrade), ClusterState.EMPTY_STATE, ActionListener.noop()); } - public Tuple run(boolean upgrade) throws Exception { - AtomicReference> response = new AtomicReference<>(); + public Tuple run(boolean upgrade) throws Exception { + AtomicReference> response = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); masterOperation(mock(), new SetUpgradeModeActionRequest(upgrade), ClusterState.EMPTY_STATE, ActionListener.wrap(r -> { response.set(Tuple.tuple(r, null)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 0c8cec2c8d218..2befe529983d1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -461,7 +461,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -2043,19 +2042,12 @@ public static SystemIndexDescriptor getInferenceIndexSystemIndexDescriptor() { @Override public void prepareForIndicesMigration(ClusterService clusterService, Client client, ActionListener> listener) { - boolean isAlreadyInUpgradeMode = MlMetadata.getMlMetadata(clusterService.state()).isUpgradeMode(); - if (isAlreadyInUpgradeMode) { - // ML is already in upgrade mode, so nothing will write to the ML system indices during their upgrade - listener.onResponse(Collections.singletonMap("already_in_upgrade_mode", true)); - return; - } - // Enable ML upgrade mode before upgrading the ML system indices to ensure nothing writes to them during the upgrade Client originClient = new OriginSettingClient(client, ML_ORIGIN); originClient.execute( SetUpgradeModeAction.INSTANCE, new SetUpgradeModeAction.Request(true), - listener.delegateFailureAndWrap((l, r) -> l.onResponse(Collections.singletonMap("already_in_upgrade_mode", false))) + listener.delegateFailureAndWrap((l, r) -> l.onResponse(Map.of("already_in_upgrade_mode", r.alreadyInUpgradeMode()))) ); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java index 8a05537917abe..74116639446f9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java @@ -9,22 +9,21 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionTestUtils; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.node.Node; import org.elasticsearch.plugins.ExtensiblePlugin; import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.action.SetUpgradeModeActionResponse; import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.GetJobsAction; import org.elasticsearch.xpack.core.ml.action.GetTrainedModelsAction; @@ -43,6 +42,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -55,36 +56,54 @@ import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; public class MachineLearningTests extends ESTestCase { - @SuppressWarnings("unchecked") public void testPrePostSystemIndexUpgrade_givenNotInUpgradeMode() throws IOException { - ThreadPool threadpool = new TestThreadPool("test"); + testUpgradeMode(false); + } + + public void testPrePostSystemIndexUpgrade_givenAlreadyInUpgradeMode() throws IOException { + testUpgradeMode(true); + } + + private static void testUpgradeMode(boolean alreadyInUpgradeMode) throws IOException { + ThreadPool threadpool = new ThreadPool( + Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), MachineLearningTests.class.getSimpleName()).put(Settings.EMPTY).build(), + MeterRegistry.NOOP, + (settings1, allocatedProcessors) -> Map.of() + ) { + @Override + public ExecutorService executor(String name) { + return EsExecutors.DIRECT_EXECUTOR_SERVICE; + } + + @Override + public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor name) { + command.run(); + return null; + } + }; ClusterService clusterService = mock(ClusterService.class); - when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); Client client = mock(Client.class); when(client.threadPool()).thenReturn(threadpool); doAnswer(invocationOnMock -> { - ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; - listener.onResponse(AcknowledgedResponse.TRUE); - return null; - }).when(client).execute(same(SetUpgradeModeAction.INSTANCE), any(SetUpgradeModeAction.Request.class), any(ActionListener.class)); + ActionListener listener = invocationOnMock.getArgument(2); + listener.onResponse(new SetUpgradeModeActionResponse(true, alreadyInUpgradeMode)); + return Void.TYPE; + }).when(client).execute(same(SetUpgradeModeAction.INSTANCE), any(SetUpgradeModeAction.Request.class), any()); try (MachineLearning machineLearning = createTrialLicensedMachineLearning(Settings.EMPTY)) { SetOnce> response = new SetOnce<>(); machineLearning.prepareForIndicesMigration(clusterService, client, ActionTestUtils.assertNoFailureListener(response::set)); - assertThat(response.get(), equalTo(Collections.singletonMap("already_in_upgrade_mode", false))); - verify(client).execute( - same(SetUpgradeModeAction.INSTANCE), - eq(new SetUpgradeModeAction.Request(true)), - any(ActionListener.class) - ); + assertThat(response.get(), equalTo(Collections.singletonMap("already_in_upgrade_mode", alreadyInUpgradeMode))); + verify(client).execute(same(SetUpgradeModeAction.INSTANCE), eq(new SetUpgradeModeAction.Request(true)), any()); machineLearning.indicesMigrationComplete( response.get(), @@ -93,45 +112,13 @@ public void testPrePostSystemIndexUpgrade_givenNotInUpgradeMode() throws IOExcep ActionTestUtils.assertNoFailureListener(ESTestCase::assertTrue) ); - verify(client).execute( - same(SetUpgradeModeAction.INSTANCE), - eq(new SetUpgradeModeAction.Request(false)), - any(ActionListener.class) - ); + var timesCalled = alreadyInUpgradeMode ? never() : times(1); + verify(client, timesCalled).execute(same(SetUpgradeModeAction.INSTANCE), eq(new SetUpgradeModeAction.Request(false)), any()); } finally { threadpool.shutdown(); } } - public void testPrePostSystemIndexUpgrade_givenAlreadyInUpgradeMode() throws IOException { - ClusterService clusterService = mock(ClusterService.class); - when(clusterService.state()).thenReturn( - ClusterState.builder(ClusterName.DEFAULT) - .metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().isUpgradeMode(true).build())) - .build() - ); - Client client = mock(Client.class); - - try (MachineLearning machineLearning = createTrialLicensedMachineLearning(Settings.EMPTY)) { - - SetOnce> response = new SetOnce<>(); - machineLearning.prepareForIndicesMigration(clusterService, client, ActionTestUtils.assertNoFailureListener(response::set)); - - assertThat(response.get(), equalTo(Collections.singletonMap("already_in_upgrade_mode", true))); - verifyNoMoreInteractions(client); - - machineLearning.indicesMigrationComplete( - response.get(), - clusterService, - client, - ActionTestUtils.assertNoFailureListener(ESTestCase::assertTrue) - ); - - // Neither pre nor post should have called any action - verifyNoMoreInteractions(client); - } - } - public void testMaxOpenWorkersSetting_givenDefault() { int maxOpenWorkers = MachineLearning.MAX_OPEN_JOBS_PER_NODE.get(Settings.EMPTY); assertEquals(512, maxOpenWorkers); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 27c715a2649a4..a398ada4c8378 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -53,7 +53,6 @@ import org.elasticsearch.xpack.core.transform.TransformConfigVersion; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; -import org.elasticsearch.xpack.core.transform.TransformMetadata; import org.elasticsearch.xpack.core.transform.TransformNamedXContentProvider; import org.elasticsearch.xpack.core.transform.action.DeleteTransformAction; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; @@ -186,18 +185,12 @@ protected XPackLicenseState getLicenseState() { @Override public void prepareForIndicesMigration(ClusterService clusterService, Client client, ActionListener> listener) { - if (TransformMetadata.upgradeMode(clusterService.state())) { - // Transform is already in upgrade mode, so nothing will write to the Transform system indices during their upgrade - listener.onResponse(Map.of("already_in_upgrade_mode", true)); - return; - } - // Enable Transform upgrade mode before upgrading the system indices to ensure nothing writes to them during the upgrade var originClient = new OriginSettingClient(client, TRANSFORM_ORIGIN); originClient.execute( SetTransformUpgradeModeAction.INSTANCE, new SetUpgradeModeActionRequest(true), - listener.delegateFailureAndWrap((l, r) -> l.onResponse(Map.of("already_in_upgrade_mode", false))) + listener.delegateFailureAndWrap((l, r) -> l.onResponse(Map.of("already_in_upgrade_mode", r.alreadyInUpgradeMode()))) ); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformTests.java index 1e8bcc5b96882..384dcfdc9316d 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformTests.java @@ -10,21 +10,23 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionTestUtils; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.node.Node; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.action.SetUpgradeModeActionRequest; -import org.elasticsearch.xpack.core.transform.TransformMetadata; +import org.elasticsearch.xpack.core.action.SetUpgradeModeActionResponse; import org.elasticsearch.xpack.core.transform.action.SetTransformUpgradeModeAction; import java.util.Collections; import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; @@ -32,29 +34,51 @@ import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; public class TransformTests extends ESTestCase { - public void testSetTransformUpgradeMode() { - var threadPool = new TestThreadPool("testSetTransformUpgradeMode"); + testUpgradeMode(true); + } + + public void testIgnoreSetTransformUpgradeMode() { + testUpgradeMode(false); + } + + private void testUpgradeMode(boolean alreadyInUpgradeMode) { + ThreadPool threadPool = new ThreadPool( + Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), TransformTests.class.getSimpleName()).put(Settings.EMPTY).build(), + MeterRegistry.NOOP, + (settings1, allocatedProcessors) -> Map.of() + ) { + @Override + public ExecutorService executor(String name) { + return EsExecutors.DIRECT_EXECUTOR_SERVICE; + } + + @Override + public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor name) { + command.run(); + return null; + } + }; ClusterService clusterService = mock(); - when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); Client client = mock(); when(client.threadPool()).thenReturn(threadPool); doAnswer(invocationOnMock -> { - ActionListener listener = invocationOnMock.getArgument(2); - listener.onResponse(AcknowledgedResponse.TRUE); - return null; + ActionListener listener = invocationOnMock.getArgument(2); + listener.onResponse(new SetUpgradeModeActionResponse(true, alreadyInUpgradeMode)); + return Void.TYPE; }).when(client).execute(same(SetTransformUpgradeModeAction.INSTANCE), any(), any()); try (var transformPlugin = new Transform(Settings.EMPTY)) { SetOnce> response = new SetOnce<>(); transformPlugin.prepareForIndicesMigration(clusterService, client, ActionTestUtils.assertNoFailureListener(response::set)); - assertThat(response.get(), equalTo(Collections.singletonMap("already_in_upgrade_mode", false))); + assertThat(response.get(), equalTo(Collections.singletonMap("already_in_upgrade_mode", alreadyInUpgradeMode))); verify(client).execute(same(SetTransformUpgradeModeAction.INSTANCE), eq(new SetUpgradeModeActionRequest(true)), any()); transformPlugin.indicesMigrationComplete( @@ -64,36 +88,14 @@ public void testSetTransformUpgradeMode() { ActionTestUtils.assertNoFailureListener(ESTestCase::assertTrue) ); - verify(client).execute(same(SetTransformUpgradeModeAction.INSTANCE), eq(new SetUpgradeModeActionRequest(false)), any()); + var timesCalled = alreadyInUpgradeMode ? never() : times(1); + verify(client, timesCalled).execute( + same(SetTransformUpgradeModeAction.INSTANCE), + eq(new SetUpgradeModeActionRequest(false)), + any() + ); } finally { terminate(threadPool); } } - - public void testIgnoreSetTransformUpgradeMode() { - ClusterService clusterService = mock(); - when(clusterService.state()).thenReturn( - ClusterState.builder(ClusterName.DEFAULT) - .metadata(Metadata.builder().putCustom(TransformMetadata.TYPE, new TransformMetadata.Builder().upgradeMode(true).build())) - .build() - ); - Client client = mock(); - - try (var transformPlugin = new Transform(Settings.EMPTY)) { - SetOnce> response = new SetOnce<>(); - transformPlugin.prepareForIndicesMigration(clusterService, client, ActionTestUtils.assertNoFailureListener(response::set)); - - assertThat(response.get(), equalTo(Collections.singletonMap("already_in_upgrade_mode", true))); - verifyNoMoreInteractions(client); - - transformPlugin.indicesMigrationComplete( - response.get(), - clusterService, - client, - ActionTestUtils.assertNoFailureListener(ESTestCase::assertTrue) - ); - - verifyNoMoreInteractions(client); - } - } }