Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ static TransportVersion def(int id) {
public static final TransportVersion PIPELINE_TRACKING_INFO = def(9_131_0_00);
public static final TransportVersion COMPONENT_TEMPLATE_TRACKING_INFO = def(9_132_0_00);
public static final TransportVersion TO_CHILD_BLOCK_JOIN_QUERY = def(9_133_0_00);
public static final TransportVersion ML_TRANSFORM_SYSTEM_INDEX_MIGRATE_REASON = def(9_134_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.SimpleBatchedExecutor;
Expand All @@ -33,7 +33,9 @@

import java.util.concurrent.atomic.AtomicBoolean;

public abstract class AbstractTransportSetUpgradeModeAction extends AcknowledgedTransportMasterNodeAction<SetUpgradeModeActionRequest> {
public abstract class AbstractTransportSetUpgradeModeAction extends TransportMasterNodeAction<
SetUpgradeModeActionRequest,
SetUpgradeModeActionResponse> {

private static final Logger logger = LogManager.getLogger(AbstractTransportSetUpgradeModeAction.class);
private final AtomicBoolean isRunning = new AtomicBoolean(false);
Expand All @@ -54,6 +56,7 @@ public AbstractTransportSetUpgradeModeAction(
threadPool,
actionFilters,
SetUpgradeModeActionRequest::new,
SetUpgradeModeActionResponse::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);

Expand All @@ -65,7 +68,7 @@ protected void masterOperation(
Task task,
SetUpgradeModeActionRequest request,
ClusterState state,
ActionListener<AcknowledgedResponse> listener
ActionListener<SetUpgradeModeActionResponse> listener
) throws Exception {
// Don't want folks spamming this endpoint while it is in progress, only allow one request to be handled at a time
if (isRunning.compareAndSet(false, true) == false) {
Expand Down Expand Up @@ -93,7 +96,7 @@ protected void masterOperation(
if (request.enabled() == upgradeMode) {
logger.info("Upgrade mode noop");
isRunning.set(false);
listener.onResponse(AcknowledgedResponse.TRUE);
listener.onResponse(new SetUpgradeModeActionResponse(true, upgradeMode));
return;
}

Expand All @@ -107,7 +110,7 @@ protected void masterOperation(
ActionListener<AcknowledgedResponse> wrappedListener = ActionListener.wrap(r -> {
logger.info("Finished setting [upgrade_mode] for feature name [{}]", featureName());
isRunning.set(false);
listener.onResponse(r);
listener.onResponse(new SetUpgradeModeActionResponse(r.isAcknowledged(), false));
}, e -> {
logger.info("Failed to set [upgrade_mode] for feature name [{}]", featureName());
isRunning.set(false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.action;

import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

import static org.elasticsearch.TransportVersions.ML_TRANSFORM_SYSTEM_INDEX_MIGRATE_REASON;

public class SetUpgradeModeActionResponse extends AcknowledgedResponse {
private final boolean alreadyInUpgradeMode;

public SetUpgradeModeActionResponse(StreamInput in) throws IOException {
super(in);
this.alreadyInUpgradeMode = in.getTransportVersion().onOrAfter(ML_TRANSFORM_SYSTEM_INDEX_MIGRATE_REASON) && in.readBoolean();
}

public SetUpgradeModeActionResponse(boolean acknowledged, boolean alreadyInUpgradeMode) {
super(acknowledged);
this.alreadyInUpgradeMode = alreadyInUpgradeMode;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getTransportVersion().onOrAfter(ML_TRANSFORM_SYSTEM_INDEX_MIGRATE_REASON)) {
out.writeBoolean(alreadyInUpgradeMode);
}
}

public boolean alreadyInUpgradeMode() {
return alreadyInUpgradeMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xpack.core.action.SetUpgradeModeActionRequest;
import org.elasticsearch.xpack.core.action.SetUpgradeModeActionResponse;

import java.io.IOException;

public class SetUpgradeModeAction extends ActionType<AcknowledgedResponse> {
public class SetUpgradeModeAction extends ActionType<SetUpgradeModeActionResponse> {

public static final SetUpgradeModeAction INSTANCE = new SetUpgradeModeAction();
public static final String NAME = "cluster:admin/xpack/ml/upgrade_mode";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.xpack.core.action.SetUpgradeModeActionResponse;

public class SetTransformUpgradeModeAction extends ActionType<AcknowledgedResponse> {
public class SetTransformUpgradeModeAction extends ActionType<SetUpgradeModeActionResponse> {
public static final SetTransformUpgradeModeAction INSTANCE = new SetTransformUpgradeModeAction();
public static final String NAME = "cluster:admin/transform/upgrade_mode";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.function.Consumer;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -108,7 +109,7 @@ public void testUpdateDoesNotRun() throws Exception {

var response = action.run(true);

assertThat(response.v1(), is(AcknowledgedResponse.TRUE));
assertThat(response.v1(), equalTo(new SetUpgradeModeActionResponse(true, true)));
assertThat(response.v2(), nullValue());
assertThat(shouldNotChange.get(), is(true));
}
Expand Down Expand Up @@ -150,7 +151,7 @@ public void testSuccess() throws Exception {

var response = action.run(true);

assertThat(response.v1(), is(AcknowledgedResponse.TRUE));
assertThat(response.v1(), equalTo(new SetUpgradeModeActionResponse(true, false)));
assertThat(response.v2(), nullValue());
}

Expand All @@ -177,8 +178,8 @@ public void runWithoutWaiting(boolean upgrade) throws Exception {
masterOperation(mock(), new SetUpgradeModeActionRequest(upgrade), ClusterState.EMPTY_STATE, ActionListener.noop());
}

public Tuple<AcknowledgedResponse, Exception> run(boolean upgrade) throws Exception {
AtomicReference<Tuple<AcknowledgedResponse, Exception>> response = new AtomicReference<>();
public Tuple<SetUpgradeModeActionResponse, Exception> run(boolean upgrade) throws Exception {
AtomicReference<Tuple<SetUpgradeModeActionResponse, Exception>> response = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
masterOperation(mock(), new SetUpgradeModeActionRequest(upgrade), ClusterState.EMPTY_STATE, ActionListener.wrap(r -> {
response.set(Tuple.tuple(r, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -2043,19 +2042,12 @@ public static SystemIndexDescriptor getInferenceIndexSystemIndexDescriptor() {

@Override
public void prepareForIndicesMigration(ClusterService clusterService, Client client, ActionListener<Map<String, Object>> listener) {
boolean isAlreadyInUpgradeMode = MlMetadata.getMlMetadata(clusterService.state()).isUpgradeMode();
if (isAlreadyInUpgradeMode) {
// ML is already in upgrade mode, so nothing will write to the ML system indices during their upgrade
listener.onResponse(Collections.singletonMap("already_in_upgrade_mode", true));
return;
}

// Enable ML upgrade mode before upgrading the ML system indices to ensure nothing writes to them during the upgrade
Client originClient = new OriginSettingClient(client, ML_ORIGIN);
originClient.execute(
SetUpgradeModeAction.INSTANCE,
new SetUpgradeModeAction.Request(true),
listener.delegateFailureAndWrap((l, r) -> l.onResponse(Collections.singletonMap("already_in_upgrade_mode", false)))
listener.delegateFailureAndWrap((l, r) -> l.onResponse(Map.of("already_in_upgrade_mode", r.alreadyInUpgradeMode())))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,21 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.ExtensiblePlugin;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.action.SetUpgradeModeActionResponse;
import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.GetJobsAction;
import org.elasticsearch.xpack.core.ml.action.GetTrainedModelsAction;
Expand All @@ -43,6 +42,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand All @@ -55,36 +56,54 @@
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

public class MachineLearningTests extends ESTestCase {

@SuppressWarnings("unchecked")
public void testPrePostSystemIndexUpgrade_givenNotInUpgradeMode() throws IOException {
ThreadPool threadpool = new TestThreadPool("test");
testUpgradeMode(false);
}

public void testPrePostSystemIndexUpgrade_givenAlreadyInUpgradeMode() throws IOException {
testUpgradeMode(true);
}

private static void testUpgradeMode(boolean alreadyInUpgradeMode) throws IOException {
ThreadPool threadpool = new ThreadPool(
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), MachineLearningTests.class.getSimpleName()).put(Settings.EMPTY).build(),
MeterRegistry.NOOP,
(settings1, allocatedProcessors) -> Map.of()
) {
@Override
public ExecutorService executor(String name) {
return EsExecutors.DIRECT_EXECUTOR_SERVICE;
}

@Override
public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor name) {
command.run();
return null;
}
};
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
Client client = mock(Client.class);
when(client.threadPool()).thenReturn(threadpool);
doAnswer(invocationOnMock -> {
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[2];
listener.onResponse(AcknowledgedResponse.TRUE);
return null;
}).when(client).execute(same(SetUpgradeModeAction.INSTANCE), any(SetUpgradeModeAction.Request.class), any(ActionListener.class));
ActionListener<SetUpgradeModeActionResponse> listener = invocationOnMock.getArgument(2);
listener.onResponse(new SetUpgradeModeActionResponse(true, alreadyInUpgradeMode));
return Void.TYPE;
}).when(client).execute(same(SetUpgradeModeAction.INSTANCE), any(SetUpgradeModeAction.Request.class), any());

try (MachineLearning machineLearning = createTrialLicensedMachineLearning(Settings.EMPTY)) {

SetOnce<Map<String, Object>> response = new SetOnce<>();
machineLearning.prepareForIndicesMigration(clusterService, client, ActionTestUtils.assertNoFailureListener(response::set));

assertThat(response.get(), equalTo(Collections.singletonMap("already_in_upgrade_mode", false)));
verify(client).execute(
same(SetUpgradeModeAction.INSTANCE),
eq(new SetUpgradeModeAction.Request(true)),
any(ActionListener.class)
);
assertThat(response.get(), equalTo(Collections.singletonMap("already_in_upgrade_mode", alreadyInUpgradeMode)));
verify(client).execute(same(SetUpgradeModeAction.INSTANCE), eq(new SetUpgradeModeAction.Request(true)), any());

machineLearning.indicesMigrationComplete(
response.get(),
Expand All @@ -93,45 +112,13 @@ public void testPrePostSystemIndexUpgrade_givenNotInUpgradeMode() throws IOExcep
ActionTestUtils.assertNoFailureListener(ESTestCase::assertTrue)
);

verify(client).execute(
same(SetUpgradeModeAction.INSTANCE),
eq(new SetUpgradeModeAction.Request(false)),
any(ActionListener.class)
);
var timesCalled = alreadyInUpgradeMode ? never() : times(1);
verify(client, timesCalled).execute(same(SetUpgradeModeAction.INSTANCE), eq(new SetUpgradeModeAction.Request(false)), any());
} finally {
threadpool.shutdown();
}
}

public void testPrePostSystemIndexUpgrade_givenAlreadyInUpgradeMode() throws IOException {
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(
ClusterState.builder(ClusterName.DEFAULT)
.metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().isUpgradeMode(true).build()))
.build()
);
Client client = mock(Client.class);

try (MachineLearning machineLearning = createTrialLicensedMachineLearning(Settings.EMPTY)) {

SetOnce<Map<String, Object>> response = new SetOnce<>();
machineLearning.prepareForIndicesMigration(clusterService, client, ActionTestUtils.assertNoFailureListener(response::set));

assertThat(response.get(), equalTo(Collections.singletonMap("already_in_upgrade_mode", true)));
verifyNoMoreInteractions(client);

machineLearning.indicesMigrationComplete(
response.get(),
clusterService,
client,
ActionTestUtils.assertNoFailureListener(ESTestCase::assertTrue)
);

// Neither pre nor post should have called any action
verifyNoMoreInteractions(client);
}
}

public void testMaxOpenWorkersSetting_givenDefault() {
int maxOpenWorkers = MachineLearning.MAX_OPEN_JOBS_PER_NODE.get(Settings.EMPTY);
assertEquals(512, maxOpenWorkers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.elasticsearch.xpack.core.transform.TransformConfigVersion;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.TransformMessages;
import org.elasticsearch.xpack.core.transform.TransformMetadata;
import org.elasticsearch.xpack.core.transform.TransformNamedXContentProvider;
import org.elasticsearch.xpack.core.transform.action.DeleteTransformAction;
import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction;
Expand Down Expand Up @@ -186,18 +185,12 @@ protected XPackLicenseState getLicenseState() {

@Override
public void prepareForIndicesMigration(ClusterService clusterService, Client client, ActionListener<Map<String, Object>> listener) {
if (TransformMetadata.upgradeMode(clusterService.state())) {
// Transform is already in upgrade mode, so nothing will write to the Transform system indices during their upgrade
listener.onResponse(Map.of("already_in_upgrade_mode", true));
return;
}

// Enable Transform upgrade mode before upgrading the system indices to ensure nothing writes to them during the upgrade
var originClient = new OriginSettingClient(client, TRANSFORM_ORIGIN);
originClient.execute(
SetTransformUpgradeModeAction.INSTANCE,
new SetUpgradeModeActionRequest(true),
listener.delegateFailureAndWrap((l, r) -> l.onResponse(Map.of("already_in_upgrade_mode", false)))
listener.delegateFailureAndWrap((l, r) -> l.onResponse(Map.of("already_in_upgrade_mode", r.alreadyInUpgradeMode())))
);
}

Expand Down
Loading