From f22f5563d4f0c836a7534564207b8d415ed90d8f Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Wed, 30 Jul 2025 12:10:06 -0400 Subject: [PATCH 1/3] [ML][Transform] Move upgrade mode check into API Transform and ML's System Index Migration code uses deprecated API to access the ClusterState. Rather than refactor all System Index Migration logic to depend on ProjectState, we can move those checks into the API layer and perform them there. Both API now return alreadyInUpgradeMode if the API exits early for that reason. --- .../org/elasticsearch/TransportVersions.java | 1 + ...AbstractTransportSetUpgradeModeAction.java | 13 ++- .../action/SetUpgradeModeActionResponse.java | 42 ++++++++ .../core/ml/action/SetUpgradeModeAction.java | 4 +- .../action/SetTransformUpgradeModeAction.java | 4 +- .../xpack/ml/MachineLearning.java | 10 +- .../xpack/ml/MachineLearningTests.java | 95 ++++++++----------- .../xpack/transform/Transform.java | 8 +- .../xpack/transform/TransformTests.java | 81 +++++++++------- 9 files changed, 143 insertions(+), 115 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetUpgradeModeActionResponse.java diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index cc286522a0b7c..13c9fd161c61c 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -355,6 +355,7 @@ static TransportVersion def(int id) { public static final TransportVersion RERANK_SNIPPETS = def(9_130_0_00); public static final TransportVersion PIPELINE_TRACKING_INFO = def(9_131_0_00); public static final TransportVersion COMPONENT_TEMPLATE_TRACKING_INFO = def(9_132_0_00); + public static final TransportVersion ML_TRANSFORM_SYSTEM_INDEX_MIGRATE_REASON = def(9_133_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/AbstractTransportSetUpgradeModeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/AbstractTransportSetUpgradeModeAction.java index ae94f7c8e5146..5872a3d93fb6b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/AbstractTransportSetUpgradeModeAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/AbstractTransportSetUpgradeModeAction.java @@ -14,7 +14,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.SimpleBatchedExecutor; @@ -33,7 +33,9 @@ import java.util.concurrent.atomic.AtomicBoolean; -public abstract class AbstractTransportSetUpgradeModeAction extends AcknowledgedTransportMasterNodeAction { +public abstract class AbstractTransportSetUpgradeModeAction extends TransportMasterNodeAction< + SetUpgradeModeActionRequest, + SetUpgradeModeActionResponse> { private static final Logger logger = LogManager.getLogger(AbstractTransportSetUpgradeModeAction.class); private final AtomicBoolean isRunning = new AtomicBoolean(false); @@ -54,6 +56,7 @@ public AbstractTransportSetUpgradeModeAction( threadPool, actionFilters, SetUpgradeModeActionRequest::new, + SetUpgradeModeActionResponse::new, EsExecutors.DIRECT_EXECUTOR_SERVICE ); @@ -65,7 +68,7 @@ protected void masterOperation( Task task, SetUpgradeModeActionRequest request, ClusterState state, - ActionListener listener + ActionListener listener ) throws Exception { // Don't want folks spamming this endpoint while it is in progress, only allow one request to be handled at a time if (isRunning.compareAndSet(false, true) == false) { @@ -93,7 +96,7 @@ protected void masterOperation( if (request.enabled() == upgradeMode) { logger.info("Upgrade mode noop"); isRunning.set(false); - listener.onResponse(AcknowledgedResponse.TRUE); + listener.onResponse(new SetUpgradeModeActionResponse(true, upgradeMode)); return; } @@ -107,7 +110,7 @@ protected void masterOperation( ActionListener wrappedListener = ActionListener.wrap(r -> { logger.info("Finished setting [upgrade_mode] for feature name [{}]", featureName()); isRunning.set(false); - listener.onResponse(r); + listener.onResponse(new SetUpgradeModeActionResponse(r.isAcknowledged(), false)); }, e -> { logger.info("Failed to set [upgrade_mode] for feature name [{}]", featureName()); isRunning.set(false); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetUpgradeModeActionResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetUpgradeModeActionResponse.java new file mode 100644 index 0000000000000..e2a586bfc99ed --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetUpgradeModeActionResponse.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.action; + +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +import static org.elasticsearch.TransportVersions.ML_TRANSFORM_SYSTEM_INDEX_MIGRATE_REASON; + +public class SetUpgradeModeActionResponse extends AcknowledgedResponse { + private final boolean alreadyInUpgradeMode; + + public SetUpgradeModeActionResponse(StreamInput in) throws IOException { + super(in); + this.alreadyInUpgradeMode = in.getTransportVersion().onOrAfter(ML_TRANSFORM_SYSTEM_INDEX_MIGRATE_REASON) && in.readBoolean(); + } + + public SetUpgradeModeActionResponse(boolean acknowledged, boolean modified) { + super(acknowledged); + this.alreadyInUpgradeMode = modified; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (out.getTransportVersion().onOrAfter(ML_TRANSFORM_SYSTEM_INDEX_MIGRATE_REASON)) { + out.writeBoolean(alreadyInUpgradeMode); + } + } + + public boolean alreadyInUpgradeMode() { + return alreadyInUpgradeMode; + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetUpgradeModeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetUpgradeModeAction.java index a67ae33e85801..c45627daaa6ac 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetUpgradeModeAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetUpgradeModeAction.java @@ -7,15 +7,15 @@ package org.elasticsearch.xpack.core.ml.action; import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xpack.core.action.SetUpgradeModeActionRequest; +import org.elasticsearch.xpack.core.action.SetUpgradeModeActionResponse; import java.io.IOException; -public class SetUpgradeModeAction extends ActionType { +public class SetUpgradeModeAction extends ActionType { public static final SetUpgradeModeAction INSTANCE = new SetUpgradeModeAction(); public static final String NAME = "cluster:admin/xpack/ml/upgrade_mode"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/SetTransformUpgradeModeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/SetTransformUpgradeModeAction.java index b93df8125708f..b6443d3199d4a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/SetTransformUpgradeModeAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/SetTransformUpgradeModeAction.java @@ -8,9 +8,9 @@ package org.elasticsearch.xpack.core.transform.action; import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.xpack.core.action.SetUpgradeModeActionResponse; -public class SetTransformUpgradeModeAction extends ActionType { +public class SetTransformUpgradeModeAction extends ActionType { public static final SetTransformUpgradeModeAction INSTANCE = new SetTransformUpgradeModeAction(); public static final String NAME = "cluster:admin/transform/upgrade_mode"; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 0c8cec2c8d218..2befe529983d1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -461,7 +461,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -2043,19 +2042,12 @@ public static SystemIndexDescriptor getInferenceIndexSystemIndexDescriptor() { @Override public void prepareForIndicesMigration(ClusterService clusterService, Client client, ActionListener> listener) { - boolean isAlreadyInUpgradeMode = MlMetadata.getMlMetadata(clusterService.state()).isUpgradeMode(); - if (isAlreadyInUpgradeMode) { - // ML is already in upgrade mode, so nothing will write to the ML system indices during their upgrade - listener.onResponse(Collections.singletonMap("already_in_upgrade_mode", true)); - return; - } - // Enable ML upgrade mode before upgrading the ML system indices to ensure nothing writes to them during the upgrade Client originClient = new OriginSettingClient(client, ML_ORIGIN); originClient.execute( SetUpgradeModeAction.INSTANCE, new SetUpgradeModeAction.Request(true), - listener.delegateFailureAndWrap((l, r) -> l.onResponse(Collections.singletonMap("already_in_upgrade_mode", false))) + listener.delegateFailureAndWrap((l, r) -> l.onResponse(Map.of("already_in_upgrade_mode", r.alreadyInUpgradeMode()))) ); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java index 8a05537917abe..74116639446f9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java @@ -9,22 +9,21 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionTestUtils; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.node.Node; import org.elasticsearch.plugins.ExtensiblePlugin; import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.action.SetUpgradeModeActionResponse; import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.GetJobsAction; import org.elasticsearch.xpack.core.ml.action.GetTrainedModelsAction; @@ -43,6 +42,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -55,36 +56,54 @@ import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; public class MachineLearningTests extends ESTestCase { - @SuppressWarnings("unchecked") public void testPrePostSystemIndexUpgrade_givenNotInUpgradeMode() throws IOException { - ThreadPool threadpool = new TestThreadPool("test"); + testUpgradeMode(false); + } + + public void testPrePostSystemIndexUpgrade_givenAlreadyInUpgradeMode() throws IOException { + testUpgradeMode(true); + } + + private static void testUpgradeMode(boolean alreadyInUpgradeMode) throws IOException { + ThreadPool threadpool = new ThreadPool( + Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), MachineLearningTests.class.getSimpleName()).put(Settings.EMPTY).build(), + MeterRegistry.NOOP, + (settings1, allocatedProcessors) -> Map.of() + ) { + @Override + public ExecutorService executor(String name) { + return EsExecutors.DIRECT_EXECUTOR_SERVICE; + } + + @Override + public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor name) { + command.run(); + return null; + } + }; ClusterService clusterService = mock(ClusterService.class); - when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); Client client = mock(Client.class); when(client.threadPool()).thenReturn(threadpool); doAnswer(invocationOnMock -> { - ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; - listener.onResponse(AcknowledgedResponse.TRUE); - return null; - }).when(client).execute(same(SetUpgradeModeAction.INSTANCE), any(SetUpgradeModeAction.Request.class), any(ActionListener.class)); + ActionListener listener = invocationOnMock.getArgument(2); + listener.onResponse(new SetUpgradeModeActionResponse(true, alreadyInUpgradeMode)); + return Void.TYPE; + }).when(client).execute(same(SetUpgradeModeAction.INSTANCE), any(SetUpgradeModeAction.Request.class), any()); try (MachineLearning machineLearning = createTrialLicensedMachineLearning(Settings.EMPTY)) { SetOnce> response = new SetOnce<>(); machineLearning.prepareForIndicesMigration(clusterService, client, ActionTestUtils.assertNoFailureListener(response::set)); - assertThat(response.get(), equalTo(Collections.singletonMap("already_in_upgrade_mode", false))); - verify(client).execute( - same(SetUpgradeModeAction.INSTANCE), - eq(new SetUpgradeModeAction.Request(true)), - any(ActionListener.class) - ); + assertThat(response.get(), equalTo(Collections.singletonMap("already_in_upgrade_mode", alreadyInUpgradeMode))); + verify(client).execute(same(SetUpgradeModeAction.INSTANCE), eq(new SetUpgradeModeAction.Request(true)), any()); machineLearning.indicesMigrationComplete( response.get(), @@ -93,45 +112,13 @@ public void testPrePostSystemIndexUpgrade_givenNotInUpgradeMode() throws IOExcep ActionTestUtils.assertNoFailureListener(ESTestCase::assertTrue) ); - verify(client).execute( - same(SetUpgradeModeAction.INSTANCE), - eq(new SetUpgradeModeAction.Request(false)), - any(ActionListener.class) - ); + var timesCalled = alreadyInUpgradeMode ? never() : times(1); + verify(client, timesCalled).execute(same(SetUpgradeModeAction.INSTANCE), eq(new SetUpgradeModeAction.Request(false)), any()); } finally { threadpool.shutdown(); } } - public void testPrePostSystemIndexUpgrade_givenAlreadyInUpgradeMode() throws IOException { - ClusterService clusterService = mock(ClusterService.class); - when(clusterService.state()).thenReturn( - ClusterState.builder(ClusterName.DEFAULT) - .metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().isUpgradeMode(true).build())) - .build() - ); - Client client = mock(Client.class); - - try (MachineLearning machineLearning = createTrialLicensedMachineLearning(Settings.EMPTY)) { - - SetOnce> response = new SetOnce<>(); - machineLearning.prepareForIndicesMigration(clusterService, client, ActionTestUtils.assertNoFailureListener(response::set)); - - assertThat(response.get(), equalTo(Collections.singletonMap("already_in_upgrade_mode", true))); - verifyNoMoreInteractions(client); - - machineLearning.indicesMigrationComplete( - response.get(), - clusterService, - client, - ActionTestUtils.assertNoFailureListener(ESTestCase::assertTrue) - ); - - // Neither pre nor post should have called any action - verifyNoMoreInteractions(client); - } - } - public void testMaxOpenWorkersSetting_givenDefault() { int maxOpenWorkers = MachineLearning.MAX_OPEN_JOBS_PER_NODE.get(Settings.EMPTY); assertEquals(512, maxOpenWorkers); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 27c715a2649a4..af5b79c25ea34 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -186,18 +186,12 @@ protected XPackLicenseState getLicenseState() { @Override public void prepareForIndicesMigration(ClusterService clusterService, Client client, ActionListener> listener) { - if (TransformMetadata.upgradeMode(clusterService.state())) { - // Transform is already in upgrade mode, so nothing will write to the Transform system indices during their upgrade - listener.onResponse(Map.of("already_in_upgrade_mode", true)); - return; - } - // Enable Transform upgrade mode before upgrading the system indices to ensure nothing writes to them during the upgrade var originClient = new OriginSettingClient(client, TRANSFORM_ORIGIN); originClient.execute( SetTransformUpgradeModeAction.INSTANCE, new SetUpgradeModeActionRequest(true), - listener.delegateFailureAndWrap((l, r) -> l.onResponse(Map.of("already_in_upgrade_mode", false))) + listener.delegateFailureAndWrap((l, r) -> l.onResponse(Map.of("already_in_upgrade_mode", r.alreadyInUpgradeMode()))) ); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformTests.java index 1e8bcc5b96882..e03234bc4eb43 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformTests.java @@ -10,21 +10,29 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionTestUtils; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.node.Node; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.action.SetUpgradeModeActionRequest; +import org.elasticsearch.xpack.core.action.SetUpgradeModeActionResponse; import org.elasticsearch.xpack.core.transform.TransformMetadata; import org.elasticsearch.xpack.core.transform.action.SetTransformUpgradeModeAction; +import org.elasticsearch.xpack.transform.transforms.TransformPersistentTasksExecutorTests; import java.util.Collections; import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; @@ -32,29 +40,52 @@ import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; public class TransformTests extends ESTestCase { - public void testSetTransformUpgradeMode() { - var threadPool = new TestThreadPool("testSetTransformUpgradeMode"); + testUpgradeMode(true); + } + + public void testIgnoreSetTransformUpgradeMode() { + testUpgradeMode(false); + } + + private void testUpgradeMode(boolean alreadyInUpgradeMode) { + ThreadPool threadPool = new ThreadPool( + Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), TransformTests.class.getSimpleName()).put(Settings.EMPTY).build(), + MeterRegistry.NOOP, + (settings1, allocatedProcessors) -> Map.of() + ) { + @Override + public ExecutorService executor(String name) { + return EsExecutors.DIRECT_EXECUTOR_SERVICE; + } + + @Override + public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor name) { + command.run(); + return null; + } + }; ClusterService clusterService = mock(); - when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); Client client = mock(); when(client.threadPool()).thenReturn(threadPool); doAnswer(invocationOnMock -> { - ActionListener listener = invocationOnMock.getArgument(2); - listener.onResponse(AcknowledgedResponse.TRUE); - return null; + ActionListener listener = invocationOnMock.getArgument(2); + listener.onResponse(new SetUpgradeModeActionResponse(true, alreadyInUpgradeMode)); + return Void.TYPE; }).when(client).execute(same(SetTransformUpgradeModeAction.INSTANCE), any(), any()); try (var transformPlugin = new Transform(Settings.EMPTY)) { SetOnce> response = new SetOnce<>(); transformPlugin.prepareForIndicesMigration(clusterService, client, ActionTestUtils.assertNoFailureListener(response::set)); - assertThat(response.get(), equalTo(Collections.singletonMap("already_in_upgrade_mode", false))); + assertThat(response.get(), equalTo(Collections.singletonMap("already_in_upgrade_mode", alreadyInUpgradeMode))); verify(client).execute(same(SetTransformUpgradeModeAction.INSTANCE), eq(new SetUpgradeModeActionRequest(true)), any()); transformPlugin.indicesMigrationComplete( @@ -64,36 +95,14 @@ public void testSetTransformUpgradeMode() { ActionTestUtils.assertNoFailureListener(ESTestCase::assertTrue) ); - verify(client).execute(same(SetTransformUpgradeModeAction.INSTANCE), eq(new SetUpgradeModeActionRequest(false)), any()); + var timesCalled = alreadyInUpgradeMode ? never() : times(1); + verify(client, timesCalled).execute( + same(SetTransformUpgradeModeAction.INSTANCE), + eq(new SetUpgradeModeActionRequest(false)), + any() + ); } finally { terminate(threadPool); } } - - public void testIgnoreSetTransformUpgradeMode() { - ClusterService clusterService = mock(); - when(clusterService.state()).thenReturn( - ClusterState.builder(ClusterName.DEFAULT) - .metadata(Metadata.builder().putCustom(TransformMetadata.TYPE, new TransformMetadata.Builder().upgradeMode(true).build())) - .build() - ); - Client client = mock(); - - try (var transformPlugin = new Transform(Settings.EMPTY)) { - SetOnce> response = new SetOnce<>(); - transformPlugin.prepareForIndicesMigration(clusterService, client, ActionTestUtils.assertNoFailureListener(response::set)); - - assertThat(response.get(), equalTo(Collections.singletonMap("already_in_upgrade_mode", true))); - verifyNoMoreInteractions(client); - - transformPlugin.indicesMigrationComplete( - response.get(), - clusterService, - client, - ActionTestUtils.assertNoFailureListener(ESTestCase::assertTrue) - ); - - verifyNoMoreInteractions(client); - } - } } From 7855214c4d70f8775cce945ade93180b5a9c5d84 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 30 Jul 2025 16:24:21 +0000 Subject: [PATCH 2/3] [CI] Auto commit changes from spotless --- .../java/org/elasticsearch/xpack/transform/Transform.java | 1 - .../org/elasticsearch/xpack/transform/TransformTests.java | 7 ------- 2 files changed, 8 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index af5b79c25ea34..a398ada4c8378 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -53,7 +53,6 @@ import org.elasticsearch.xpack.core.transform.TransformConfigVersion; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; -import org.elasticsearch.xpack.core.transform.TransformMetadata; import org.elasticsearch.xpack.core.transform.TransformNamedXContentProvider; import org.elasticsearch.xpack.core.transform.action.DeleteTransformAction; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformTests.java index e03234bc4eb43..384dcfdc9316d 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformTests.java @@ -11,9 +11,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -21,13 +18,10 @@ import org.elasticsearch.node.Node; import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.action.SetUpgradeModeActionRequest; import org.elasticsearch.xpack.core.action.SetUpgradeModeActionResponse; -import org.elasticsearch.xpack.core.transform.TransformMetadata; import org.elasticsearch.xpack.core.transform.action.SetTransformUpgradeModeAction; -import org.elasticsearch.xpack.transform.transforms.TransformPersistentTasksExecutorTests; import java.util.Collections; import java.util.Map; @@ -43,7 +37,6 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; public class TransformTests extends ESTestCase { From 683a388a48b52a7f766c369e6e37081e3fd79f2e Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Wed, 30 Jul 2025 13:17:13 -0400 Subject: [PATCH 3/3] fix tests --- .../xpack/core/action/SetUpgradeModeActionResponse.java | 4 ++-- .../AbstractTransportSetUpgradeModeActionTests.java | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetUpgradeModeActionResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetUpgradeModeActionResponse.java index e2a586bfc99ed..d08bb826cf7bc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetUpgradeModeActionResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetUpgradeModeActionResponse.java @@ -23,9 +23,9 @@ public SetUpgradeModeActionResponse(StreamInput in) throws IOException { this.alreadyInUpgradeMode = in.getTransportVersion().onOrAfter(ML_TRANSFORM_SYSTEM_INDEX_MIGRATE_REASON) && in.readBoolean(); } - public SetUpgradeModeActionResponse(boolean acknowledged, boolean modified) { + public SetUpgradeModeActionResponse(boolean acknowledged, boolean alreadyInUpgradeMode) { super(acknowledged); - this.alreadyInUpgradeMode = modified; + this.alreadyInUpgradeMode = alreadyInUpgradeMode; } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/AbstractTransportSetUpgradeModeActionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/AbstractTransportSetUpgradeModeActionTests.java index 09d033cd9de24..6a06d8a5b0913 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/AbstractTransportSetUpgradeModeActionTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/AbstractTransportSetUpgradeModeActionTests.java @@ -26,6 +26,7 @@ import java.util.function.Consumer; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; @@ -108,7 +109,7 @@ public void testUpdateDoesNotRun() throws Exception { var response = action.run(true); - assertThat(response.v1(), is(AcknowledgedResponse.TRUE)); + assertThat(response.v1(), equalTo(new SetUpgradeModeActionResponse(true, true))); assertThat(response.v2(), nullValue()); assertThat(shouldNotChange.get(), is(true)); } @@ -150,7 +151,7 @@ public void testSuccess() throws Exception { var response = action.run(true); - assertThat(response.v1(), is(AcknowledgedResponse.TRUE)); + assertThat(response.v1(), equalTo(new SetUpgradeModeActionResponse(true, false))); assertThat(response.v2(), nullValue()); } @@ -177,8 +178,8 @@ public void runWithoutWaiting(boolean upgrade) throws Exception { masterOperation(mock(), new SetUpgradeModeActionRequest(upgrade), ClusterState.EMPTY_STATE, ActionListener.noop()); } - public Tuple run(boolean upgrade) throws Exception { - AtomicReference> response = new AtomicReference<>(); + public Tuple run(boolean upgrade) throws Exception { + AtomicReference> response = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); masterOperation(mock(), new SetUpgradeModeActionRequest(upgrade), ClusterState.EMPTY_STATE, ActionListener.wrap(r -> { response.set(Tuple.tuple(r, null));