From f6b911672cf91dd84548ef408ba8c18486eb8ee6 Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Sat, 7 Jun 2025 16:35:19 +0200 Subject: [PATCH 1/4] Remove test dependencies on cluster state API master waiting As preparation for running the cluster state API on the local node, we need to update these tests that currently depend on that API running on (and waiting for) the master node. Relates #127212 --- .../azure/classic/AzureSimpleTests.java | 23 +--- .../classic/AzureTwoStartedNodesTests.java | 23 +--- .../discovery/DiscoveryDisruptionIT.java | 1 + .../readiness/ReadinessClusterIT.java | 46 +++---- .../ComponentTemplatesFileSettingsIT.java | 14 +-- .../service/FileSettingsServiceIT.java | 20 +-- .../service/RepositoriesFileSettingsIT.java | 14 +-- .../elasticsearch/test/ESIntegTestCase.java | 18 ++- .../ml/integration/MlNativeIntegTestCase.java | 118 +----------------- .../xpack/slm/SLMFileSettingsIT.java | 12 +- .../votingonly/VotingOnlyNodePluginTests.java | 25 +--- 11 files changed, 55 insertions(+), 259 deletions(-) diff --git a/plugins/discovery-azure-classic/src/internalClusterTest/java/org/elasticsearch/discovery/azure/classic/AzureSimpleTests.java b/plugins/discovery-azure-classic/src/internalClusterTest/java/org/elasticsearch/discovery/azure/classic/AzureSimpleTests.java index c8447bc9757df..fad3fad4da077 100644 --- a/plugins/discovery-azure-classic/src/internalClusterTest/java/org/elasticsearch/discovery/azure/classic/AzureSimpleTests.java +++ b/plugins/discovery-azure-classic/src/internalClusterTest/java/org/elasticsearch/discovery/azure/classic/AzureSimpleTests.java @@ -13,7 +13,6 @@ import org.elasticsearch.cloud.azure.classic.management.AzureComputeService.Discovery; import org.elasticsearch.cloud.azure.classic.management.AzureComputeService.Management; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESIntegTestCase; import static org.hamcrest.Matchers.containsString; @@ -28,16 +27,7 @@ public void testOneNodeShouldRunUsingPrivateIp() { final String node1 = internalCluster().startNode(settings); registerAzureNode(node1); - assertNotNull( - client().admin() - .cluster() - .prepareState(TEST_REQUEST_TIMEOUT) - .setMasterNodeTimeout(TimeValue.timeValueSeconds(1)) - .get() - .getState() - .nodes() - .getMasterNodeId() - ); + awaitMasterNode(); // We expect having 1 node as part of the cluster, let's test that assertNumberOfNodes(1); @@ -50,16 +40,7 @@ public void testOneNodeShouldRunUsingPublicIp() { final String node1 = internalCluster().startNode(settings); registerAzureNode(node1); - assertNotNull( - client().admin() - .cluster() - .prepareState(TEST_REQUEST_TIMEOUT) - .setMasterNodeTimeout(TimeValue.timeValueSeconds(1)) - .get() - .getState() - .nodes() - .getMasterNodeId() - ); + awaitMasterNode(); // We expect having 1 node as part of the cluster, let's test that assertNumberOfNodes(1); diff --git a/plugins/discovery-azure-classic/src/internalClusterTest/java/org/elasticsearch/discovery/azure/classic/AzureTwoStartedNodesTests.java b/plugins/discovery-azure-classic/src/internalClusterTest/java/org/elasticsearch/discovery/azure/classic/AzureTwoStartedNodesTests.java index a4ba0338010b1..7383704debd20 100644 --- a/plugins/discovery-azure-classic/src/internalClusterTest/java/org/elasticsearch/discovery/azure/classic/AzureTwoStartedNodesTests.java +++ b/plugins/discovery-azure-classic/src/internalClusterTest/java/org/elasticsearch/discovery/azure/classic/AzureTwoStartedNodesTests.java @@ -13,7 +13,6 @@ import org.elasticsearch.cloud.azure.classic.management.AzureComputeService.Discovery; import org.elasticsearch.cloud.azure.classic.management.AzureComputeService.Management; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESIntegTestCase; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) @@ -31,30 +30,12 @@ public void testTwoNodesShouldRunUsingPrivateOrPublicIp() { logger.info("--> start first node"); final String node1 = internalCluster().startNode(settings); registerAzureNode(node1); - assertNotNull( - client().admin() - .cluster() - .prepareState(TEST_REQUEST_TIMEOUT) - .setMasterNodeTimeout(TimeValue.timeValueSeconds(1)) - .get() - .getState() - .nodes() - .getMasterNodeId() - ); + awaitMasterNode(); logger.info("--> start another node"); final String node2 = internalCluster().startNode(settings); registerAzureNode(node2); - assertNotNull( - client().admin() - .cluster() - .prepareState(TEST_REQUEST_TIMEOUT) - .setMasterNodeTimeout(TimeValue.timeValueSeconds(1)) - .get() - .getState() - .nodes() - .getMasterNodeId() - ); + awaitMasterNode(); // We expect having 2 nodes as part of the cluster, let's test that assertNumberOfNodes(2); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/discovery/DiscoveryDisruptionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/discovery/DiscoveryDisruptionIT.java index 711ad1c21ce59..c5ab9a4def526 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/discovery/DiscoveryDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/discovery/DiscoveryDisruptionIT.java @@ -146,6 +146,7 @@ public void testElectMasterWithLatestVersion() throws Exception { isolateAllNodes.stopDisrupting(); + awaitMasterNode(); final ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState(); if (state.metadata().getProject().hasIndex("test") == false) { fail("index 'test' was lost. current cluster state: " + state); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/readiness/ReadinessClusterIT.java b/server/src/internalClusterTest/java/org/elasticsearch/readiness/ReadinessClusterIT.java index cf8c377ad3410..7204f84a32718 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/readiness/ReadinessClusterIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/readiness/ReadinessClusterIT.java @@ -8,7 +8,6 @@ */ package org.elasticsearch.readiness; -import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; @@ -18,9 +17,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Strings; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; -import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.reservedstate.service.FileSettingsService; import org.elasticsearch.test.ESIntegTestCase; @@ -48,7 +45,6 @@ import static org.elasticsearch.test.NodeRoles.nonDataNode; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.notNullValue; @@ -110,20 +106,6 @@ protected Collection> getMockPlugins() { return Collections.unmodifiableList(plugins); } - private void assertMasterNode(Client client, String node) { - assertThat( - client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState().nodes().getMasterNode().getName(), - equalTo(node) - ); - } - - private void expectMasterNotFound() { - expectThrows( - MasterNotDiscoveredException.class, - clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setMasterNodeTimeout(TimeValue.timeValueMillis(100)) - ); - } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/108613") public void testReadinessDuringRestarts() throws Exception { internalCluster().setBootstrapMasterNodeIndex(0); @@ -131,23 +113,23 @@ public void testReadinessDuringRestarts() throws Exception { logger.info("--> start data node / non master node"); String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s")); - expectMasterNotFound(); + awaitMasterNotFound(); assertFalse(internalCluster().getInstance(ReadinessService.class, dataNode).ready()); logger.info("--> start master node"); final String masterNode = internalCluster().startMasterOnlyNode(); - assertMasterNode(internalCluster().nonMasterClient(), masterNode); + awaitMasterNode(internalCluster().getNonMasterNodeName(), masterNode); tcpReadinessProbeTrue(internalCluster().getInstance(ReadinessService.class, dataNode)); tcpReadinessProbeTrue(internalCluster().getInstance(ReadinessService.class, masterNode)); final var masterReadinessService = internalCluster().getInstance(ReadinessService.class, masterNode); - assertMasterNode(internalCluster().nonMasterClient(), masterNode); + awaitMasterNode(internalCluster().getNonMasterNodeName(), masterNode); logger.info("--> stop master node"); Settings masterDataPathSettings = internalCluster().dataPathSettings(internalCluster().getMasterName()); internalCluster().stopCurrentMasterNode(); - expectMasterNotFound(); + awaitMasterNotFound(); tcpReadinessProbeFalse(masterReadinessService); @@ -156,8 +138,8 @@ public void testReadinessDuringRestarts() throws Exception { Settings.builder().put(nonDataNode(masterNode())).put(masterDataPathSettings) ); - assertMasterNode(internalCluster().nonMasterClient(), nextMasterEligibleNodeName); - assertMasterNode(internalCluster().masterClient(), nextMasterEligibleNodeName); + awaitMasterNode(internalCluster().getNonMasterNodeName(), nextMasterEligibleNodeName); + awaitMasterNode(internalCluster().getMasterName(), nextMasterEligibleNodeName); tcpReadinessProbeTrue(internalCluster().getInstance(ReadinessService.class, nextMasterEligibleNodeName)); } @@ -168,7 +150,7 @@ public void testReadinessDuringRestartsNormalOrder() throws Exception { String masterNode = internalCluster().startMasterOnlyNode(); internalCluster().validateClusterFormed(); - assertMasterNode(internalCluster().masterClient(), masterNode); + awaitMasterNode(internalCluster().getMasterName(), masterNode); logger.info("--> start 2 data nodes"); List dataNodes = internalCluster().startDataOnlyNodes(2); @@ -196,7 +178,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { internalCluster().restartNode(masterNode, new InternalTestCluster.RestartCallback() { @Override public Settings onNodeStopped(String nodeName) throws Exception { - expectMasterNotFound(); + awaitMasterNotFound(); logger.info("--> master node [{}] stopped", nodeName); @@ -271,7 +253,7 @@ public void testNotReadyOnBadFileSettings() throws Exception { final String masterNode = internalCluster().startMasterOnlyNode( Settings.builder().put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s").build() ); - assertMasterNode(internalCluster().nonMasterClient(), masterNode); + awaitMasterNode(internalCluster().getNonMasterNodeName(), masterNode); var savedClusterState = setupClusterStateListenerForError(masterNode); FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode); @@ -294,14 +276,14 @@ public void testReadyAfterRestartWithBadFileSettings() throws Exception { String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s")); String masterNode = internalCluster().startMasterOnlyNode(); - assertMasterNode(internalCluster().nonMasterClient(), masterNode); + awaitMasterNode(internalCluster().getNonMasterNodeName(), masterNode); assertBusy(() -> assertTrue("master node ready", internalCluster().getInstance(ReadinessService.class, masterNode).ready())); assertBusy(() -> assertTrue("data node ready", internalCluster().getInstance(ReadinessService.class, dataNode).ready())); logger.info("--> stop master node"); Settings masterDataPathSettings = internalCluster().dataPathSettings(internalCluster().getMasterName()); internalCluster().stopCurrentMasterNode(); - expectMasterNotFound(); + awaitMasterNotFound(); logger.info("--> write bad file settings before restarting master node"); writeFileSettings(testErrorJSON); @@ -309,7 +291,7 @@ public void testReadyAfterRestartWithBadFileSettings() throws Exception { logger.info("--> restart master node"); String nextMasterNode = internalCluster().startNode(Settings.builder().put(nonDataNode(masterNode())).put(masterDataPathSettings)); - assertMasterNode(internalCluster().nonMasterClient(), nextMasterNode); + awaitMasterNode(internalCluster().getNonMasterNodeName(), nextMasterNode); var savedClusterState = setupClusterStateListenerForError(nextMasterNode); assertTrue(savedClusterState.await(20, TimeUnit.SECONDS)); @@ -324,7 +306,7 @@ public void testReadyWhenMissingFileSettings() throws Exception { final String masterNode = internalCluster().startMasterOnlyNode( Settings.builder().put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s").build() ); - assertMasterNode(internalCluster().nonMasterClient(), masterNode); + awaitMasterNode(internalCluster().getNonMasterNodeName(), masterNode); var savedClusterState = setupClusterStateListener(masterNode); // we need this after we setup the listener above, in case the node started and processed @@ -382,7 +364,7 @@ public void testReadyAfterCorrectFileSettings() throws Exception { logger.info("--> start master node"); final String masterNode = internalCluster().startMasterOnlyNode(); - assertMasterNode(internalCluster().nonMasterClient(), masterNode); + awaitMasterNode(internalCluster().getNonMasterNodeName(), masterNode); var readinessProbeListening = setupReadinessProbeListener(masterNode); FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/ComponentTemplatesFileSettingsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/ComponentTemplatesFileSettingsIT.java index ac21bf176e201..9264c3ba1af66 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/ComponentTemplatesFileSettingsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/ComponentTemplatesFileSettingsIT.java @@ -17,7 +17,6 @@ import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction; import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; import org.elasticsearch.action.admin.indices.template.reservedstate.ReservedComposableIndexTemplateAction; -import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.ComponentTemplate; @@ -35,7 +34,6 @@ import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -46,7 +44,6 @@ import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.notNullValue; @@ -355,13 +352,6 @@ public class ComponentTemplatesFileSettingsIT extends ESIntegTestCase { } }"""; - private void assertMasterNode(Client client, String node) throws ExecutionException, InterruptedException { - assertThat( - client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).execute().get().getState().nodes().getMasterNode().getName(), - equalTo(node) - ); - } - private void writeJSONFile(String node, String json) throws Exception { FileSettingsServiceIT.writeJSONFile(node, json, logger, versionCounter.incrementAndGet()); } @@ -543,7 +533,7 @@ public void testSettingsApplied() throws Exception { logger.info("--> start master node"); final String masterNode = internalCluster().startMasterOnlyNode(); - assertMasterNode(internalCluster().nonMasterClient(), masterNode); + awaitMasterNode(internalCluster().getNonMasterNodeName(), masterNode); assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2()); @@ -611,7 +601,7 @@ public void testErrorSaved() throws Exception { logger.info("--> start master node"); final String masterNode = internalCluster().startMasterOnlyNode(); - assertMasterNode(internalCluster().nonMasterClient(), masterNode); + awaitMasterNode(internalCluster().getNonMasterNodeName(), masterNode); var savedClusterState = setupClusterStateListenerForError(masterNode); writeJSONFile(masterNode, testErrorJSON); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java index 5778f4ea9f7f0..b86031ce96bf3 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java @@ -14,7 +14,6 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata; @@ -129,13 +128,6 @@ public void resetVersionCounter() { } }"""; - private void assertMasterNode(Client client, String node) { - assertThat( - client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState().nodes().getMasterNode().getName(), - equalTo(node) - ); - } - public static void writeJSONFile(String node, String json, Logger logger, Long version) throws Exception { FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node); writeJSONFile(node, json, logger, version, fileSettingsService.watchedFile()); @@ -254,7 +246,7 @@ public void testSettingsApplied() throws Exception { logger.info("--> start master node"); final String masterNode = internalCluster().startMasterOnlyNode(); - assertMasterNode(internalCluster().nonMasterClient(), masterNode); + awaitMasterNode(internalCluster().getNonMasterNodeName(), masterNode); var savedClusterState = setupClusterStateListener(masterNode, versionCounter.incrementAndGet()); FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode); @@ -281,7 +273,7 @@ public void testSettingsAppliedOnStart() throws Exception { logger.info("--> start master node"); final String masterNode = internalCluster().startMasterOnlyNode(); - assertMasterNode(internalCluster().nonMasterClient(), masterNode); + awaitMasterNode(internalCluster().getNonMasterNodeName(), masterNode); FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode); @@ -297,7 +289,7 @@ public void testReservedStatePersistsOnRestart() throws Exception { final String masterNode = internalCluster().startMasterOnlyNode( Settings.builder().put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s").build() ); - assertMasterNode(internalCluster().masterClient(), masterNode); + awaitMasterNode(internalCluster().getMasterName(), masterNode); var savedClusterState = setupClusterStateListener(masterNode, versionCounter.incrementAndGet()); FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode); @@ -375,7 +367,7 @@ public void testErrorSaved() throws Exception { final String masterNode = internalCluster().startMasterOnlyNode( Settings.builder().put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s").build() ); - assertMasterNode(internalCluster().nonMasterClient(), masterNode); + awaitMasterNode(internalCluster().getNonMasterNodeName(), masterNode); var savedClusterState = setupClusterStateListenerForError(masterNode); FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode); @@ -399,7 +391,7 @@ public void testErrorCanRecoverOnRestart() throws Exception { final String masterNode = internalCluster().startMasterOnlyNode( Settings.builder().put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s").build() ); - assertMasterNode(internalCluster().nonMasterClient(), masterNode); + awaitMasterNode(internalCluster().getNonMasterNodeName(), masterNode); var savedClusterState = setupClusterStateListenerForError(masterNode); FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode); @@ -435,7 +427,7 @@ public void testNewErrorOnRestartReprocessing() throws Exception { final String masterNode = internalCluster().startMasterOnlyNode( Settings.builder().put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s").build() ); - assertMasterNode(internalCluster().nonMasterClient(), masterNode); + awaitMasterNode(internalCluster().getNonMasterNodeName(), masterNode); var savedClusterState = setupClusterStateListenerForError(masterNode); FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/RepositoriesFileSettingsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/RepositoriesFileSettingsIT.java index 2f73f1e32c739..2ec4dae9193c4 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/RepositoriesFileSettingsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/RepositoriesFileSettingsIT.java @@ -16,7 +16,6 @@ import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.reservedstate.ReservedRepositoryAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; -import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.RepositoryMetadata; @@ -33,7 +32,6 @@ import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -43,7 +41,6 @@ import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.notNullValue; @@ -94,13 +91,6 @@ public class RepositoriesFileSettingsIT extends ESIntegTestCase { } }"""; - private void assertMasterNode(Client client, String node) throws ExecutionException, InterruptedException { - assertThat( - client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).execute().get().getState().nodes().getMasterNode().getName(), - equalTo(node) - ); - } - private void writeJSONFile(String node, String json) throws Exception { FileSettingsServiceIT.writeJSONFile(node, json, logger, versionCounter.incrementAndGet()); } @@ -164,7 +154,7 @@ public void testSettingsApplied() throws Exception { logger.info("--> start master node"); final String masterNode = internalCluster().startMasterOnlyNode(); - assertMasterNode(internalCluster().nonMasterClient(), masterNode); + awaitMasterNode(internalCluster().getNonMasterNodeName(), masterNode); assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2()); } @@ -220,7 +210,7 @@ public void testErrorSaved() throws Exception { logger.info("--> start master node"); final String masterNode = internalCluster().startMasterOnlyNode(); - assertMasterNode(internalCluster().nonMasterClient(), masterNode); + awaitMasterNode(internalCluster().getNonMasterNodeName(), masterNode); var savedClusterState = setupClusterStateListenerForError(masterNode); writeJSONFile(masterNode, testErrorJSON); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 509475531f3ef..cc64382a4dba0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -962,6 +962,16 @@ public void awaitMasterNode(String viaNode, String masterNodeName) { safeAwait(listener, TEST_REQUEST_TIMEOUT); } + /** + * Waits for all nodes in the cluster to have a consistent view of which node is currently the master. + */ + public void awaitMasterNode() { + // The cluster health API always runs on the master node, and the master only completes cluster state publication when all nodes + // in the cluster have accepted the new cluster state. By waiting for all events to have finished on the master node, we ensure + // that the whole cluster has a consistent view of which node is the master. + clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT).setTimeout(TEST_REQUEST_TIMEOUT).setWaitForEvents(Priority.LANGUID).get(); + } + /** * Waits for a random node in the cluster to not see a master node in the cluster state. * Note that this does not guarantee that all other nodes in the cluster are on the same cluster state version already. @@ -1360,8 +1370,14 @@ private void setMultiProjectParams(Map xContentParams) { } protected final void doEnsureClusterStateConsistency(NamedWriteableRegistry namedWriteableRegistry) { + // This check has very little value in external test clusters and there is no guaranteed method of obtaining the master cluster + // state in those clusters. + if (isInternalCluster() == false) { + return; + } final PlainActionFuture future = new PlainActionFuture<>(); final List> localStates = new ArrayList<>(cluster().size()); + final var masterName = internalCluster().getMasterName(); for (Client client : cluster().getClients()) { localStates.add( SubscribableListener.newForked( @@ -1371,7 +1387,7 @@ protected final void doEnsureClusterStateConsistency(NamedWriteableRegistry name } try (RefCountingListener refCountingListener = new RefCountingListener(future)) { SubscribableListener.newForked( - l -> client().admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).all().execute(l) + l -> client(masterName).admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).all().execute(l) ).andThenAccept(masterStateResponse -> { byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterStateResponse.getState()); // remove local node reference diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java index a8cfa6714a7e8..63cb40753dd23 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java @@ -19,15 +19,11 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.node.NodeClient; -import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.common.Strings; import org.elasticsearch.common.compress.CompressedXContent; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -38,8 +34,6 @@ import org.elasticsearch.ingest.common.IngestCommonPlugin; import org.elasticsearch.license.LicenseSettings; import org.elasticsearch.multiproject.TestOnlyMultiProjectPlugin; -import org.elasticsearch.persistent.PersistentTaskParams; -import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.reindex.ReindexPlugin; import org.elasticsearch.script.IngestScript; @@ -58,48 +52,25 @@ import org.elasticsearch.transport.netty4.Netty4Plugin; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xpack.autoscaling.Autoscaling; -import org.elasticsearch.xpack.autoscaling.AutoscalingMetadata; -import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult; import org.elasticsearch.xpack.core.XPackSettings; -import org.elasticsearch.xpack.core.ilm.DeleteAction; -import org.elasticsearch.xpack.core.ilm.ForceMergeAction; -import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata; -import org.elasticsearch.xpack.core.ilm.LifecycleAction; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; -import org.elasticsearch.xpack.core.ilm.LifecycleType; -import org.elasticsearch.xpack.core.ilm.RolloverAction; -import org.elasticsearch.xpack.core.ilm.ShrinkAction; -import org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType; import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.MlMetaIndex; import org.elasticsearch.xpack.core.ml.MlMetadata; -import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; -import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.PutFilterAction; import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction; -import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; -import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.UpdateFilterAction; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; -import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; -import org.elasticsearch.xpack.core.ml.inference.ModelAliasMetadata; -import org.elasticsearch.xpack.core.ml.inference.TrainedModelCacheMetadata; -import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignmentMetadata; import org.elasticsearch.xpack.core.ml.inference.persistence.InferenceIndexConstants; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex; import org.elasticsearch.xpack.core.security.SecurityField; -import org.elasticsearch.xpack.core.security.authc.TokenMetadata; import org.elasticsearch.xpack.esql.core.plugin.EsqlCorePlugin; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.elasticsearch.xpack.ilm.IndexLifecycle; -import org.elasticsearch.xpack.inference.registry.ModelRegistryMetadata; import org.elasticsearch.xpack.ml.LocalStateMachineLearning; -import org.elasticsearch.xpack.ml.autoscaling.MlScalingReason; import org.elasticsearch.xpack.slm.SnapshotLifecycle; import org.elasticsearch.xpack.slm.history.SnapshotLifecycleTemplateRegistry; import org.elasticsearch.xpack.transform.Transform; @@ -345,94 +316,7 @@ protected static List fetchAllAuditMessages(String jobId) throws Excepti @Override protected void ensureClusterStateConsistency() throws IOException { - if (cluster() != null && cluster().size() > 0) { - List entries = new ArrayList<>(ClusterModule.getNamedWriteables()); - entries.addAll(new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedWriteables()); - entries.add( - new NamedWriteableRegistry.Entry( - Metadata.ProjectCustom.class, - TrainedModelAssignmentMetadata.NAME, - TrainedModelAssignmentMetadata::fromStream - ) - ); - entries.add( - new NamedWriteableRegistry.Entry( - NamedDiff.class, - TrainedModelAssignmentMetadata.NAME, - TrainedModelAssignmentMetadata::readDiffFrom - ) - ); - entries.add(new NamedWriteableRegistry.Entry(Metadata.ProjectCustom.class, ModelAliasMetadata.NAME, ModelAliasMetadata::new)); - entries.add(new NamedWriteableRegistry.Entry(NamedDiff.class, ModelAliasMetadata.NAME, ModelAliasMetadata::readDiffFrom)); - entries.add( - new NamedWriteableRegistry.Entry( - Metadata.ProjectCustom.class, - TrainedModelCacheMetadata.NAME, - TrainedModelCacheMetadata::new - ) - ); - entries.add( - new NamedWriteableRegistry.Entry(NamedDiff.class, TrainedModelCacheMetadata.NAME, TrainedModelCacheMetadata::readDiffFrom) - ); - entries.add(new NamedWriteableRegistry.Entry(Metadata.ProjectCustom.class, "ml", MlMetadata::new)); - entries.add( - new NamedWriteableRegistry.Entry(Metadata.ProjectCustom.class, IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata::new) - ); - entries.add( - new NamedWriteableRegistry.Entry( - LifecycleType.class, - TimeseriesLifecycleType.TYPE, - (in) -> TimeseriesLifecycleType.INSTANCE - ) - ); - entries.add(new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::readFrom)); - entries.add(new NamedWriteableRegistry.Entry(LifecycleAction.class, ForceMergeAction.NAME, ForceMergeAction::new)); - entries.add(new NamedWriteableRegistry.Entry(LifecycleAction.class, RolloverAction.NAME, RolloverAction::read)); - entries.add(new NamedWriteableRegistry.Entry(LifecycleAction.class, ShrinkAction.NAME, ShrinkAction::new)); - entries.add( - new NamedWriteableRegistry.Entry( - PersistentTaskParams.class, - MlTasks.DATAFEED_TASK_NAME, - StartDatafeedAction.DatafeedParams::new - ) - ); - entries.add( - new NamedWriteableRegistry.Entry( - PersistentTaskParams.class, - MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, - StartDataFrameAnalyticsAction.TaskParams::new - ) - ); - entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME, OpenJobAction.JobParams::new)); - entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new)); - entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream)); - entries.add( - new NamedWriteableRegistry.Entry( - PersistentTaskState.class, - DataFrameAnalyticsTaskState.NAME, - DataFrameAnalyticsTaskState::new - ) - ); - entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TokenMetadata.TYPE, TokenMetadata::new)); - entries.add(new NamedWriteableRegistry.Entry(Metadata.ClusterCustom.class, AutoscalingMetadata.NAME, AutoscalingMetadata::new)); - entries.add( - new NamedWriteableRegistry.Entry( - NamedDiff.class, - AutoscalingMetadata.NAME, - AutoscalingMetadata.AutoscalingMetadataDiff::new - ) - ); - entries.add( - new NamedWriteableRegistry.Entry(AutoscalingDeciderResult.Reason.class, MlScalingReason.NAME, MlScalingReason::new) - ); - - entries.add( - new NamedWriteableRegistry.Entry(Metadata.ProjectCustom.class, ModelRegistryMetadata.TYPE, ModelRegistryMetadata::new) - ); - entries.add(new NamedWriteableRegistry.Entry(NamedDiff.class, ModelRegistryMetadata.TYPE, ModelRegistryMetadata::readDiffFrom)); - - doEnsureClusterStateConsistency(new NamedWriteableRegistry(entries)); - } + // Ensuring cluster state consistency is not supported for external test clusters. } protected static void createDataStreamAndTemplate(String dataStreamName, String mapping) throws IOException { diff --git a/x-pack/plugin/slm/src/internalClusterTest/java/org/elasticsearch/xpack/slm/SLMFileSettingsIT.java b/x-pack/plugin/slm/src/internalClusterTest/java/org/elasticsearch/xpack/slm/SLMFileSettingsIT.java index 66c8a7d4337d6..54a390f55cc35 100644 --- a/x-pack/plugin/slm/src/internalClusterTest/java/org/elasticsearch/xpack/slm/SLMFileSettingsIT.java +++ b/x-pack/plugin/slm/src/internalClusterTest/java/org/elasticsearch/xpack/slm/SLMFileSettingsIT.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata; @@ -149,13 +148,6 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { return Settings.builder().put(LifecycleSettings.SLM_HISTORY_INDEX_ENABLED, false).build(); } - private void assertMasterNode(Client client, String node) { - assertThat( - client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState().nodes().getMasterNode().getName(), - equalTo(node) - ); - } - private void writeJSONFile(String node, String json) throws Exception { long version = versionCounter.incrementAndGet(); @@ -238,7 +230,7 @@ public void testSettingsApplied() throws Exception { logger.info("--> start master node"); final String masterNode = internalCluster().startMasterOnlyNode(); - assertMasterNode(internalCluster().nonMasterClient(), masterNode); + awaitMasterNode(internalCluster().getNonMasterNodeName(), masterNode); assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2()); @@ -352,7 +344,7 @@ public void testErrorSaved() throws Exception { logger.info("--> start master node"); final String masterNode = internalCluster().startMasterOnlyNode(); - assertMasterNode(internalCluster().nonMasterClient(), masterNode); + awaitMasterNode(internalCluster().getMasterName(), masterNode); var savedClusterState = setupClusterStateListenerForError(masterNode); writeJSONFile(masterNode, testErrorJSON); diff --git a/x-pack/plugin/voting-only-node/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/votingonly/VotingOnlyNodePluginTests.java b/x-pack/plugin/voting-only-node/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/votingonly/VotingOnlyNodePluginTests.java index bff42d689fb62..92297f7585128 100644 --- a/x-pack/plugin/voting-only-node/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/votingonly/VotingOnlyNodePluginTests.java +++ b/x-pack/plugin/voting-only-node/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/votingonly/VotingOnlyNodePluginTests.java @@ -15,12 +15,9 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.env.Environment; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.indices.recovery.RecoverySettings; @@ -53,7 +50,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.nullValue; @ESIntegTestCase.ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false) public class VotingOnlyNodePluginTests extends ESIntegTestCase { @@ -110,7 +106,7 @@ public void testPreferFullMasterOverVotingOnlyNodes() throws Exception { final String originalMaster = internalCluster().getMasterName(); internalCluster().stopCurrentMasterNode(); - clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT).setWaitForEvents(Priority.LANGUID).get(); + awaitMasterNode(); assertNotEquals(originalMaster, internalCluster().getMasterName()); assertThat( VotingOnlyNodePlugin.isVotingOnlyNode( @@ -129,6 +125,7 @@ public void testBootstrapOnlyVotingOnlyNodes() throws Exception { equalTo(3) ) ); + awaitMasterNode(); assertThat( VotingOnlyNodePlugin.isVotingOnlyNode( clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState().nodes().getMasterNode() @@ -146,7 +143,8 @@ public void testBootstrapOnlySingleVotingOnlyNode() throws Exception { .build() ); internalCluster().startNode(); - assertBusy(() -> assertThat(clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState().getNodes().getSize(), equalTo(2))); + ensureStableCluster(2); + awaitMasterNode(); assertThat( VotingOnlyNodePlugin.isVotingOnlyNode( clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState().nodes().getMasterNode() @@ -166,22 +164,11 @@ public void testVotingOnlyNodesCannotBeMasterWithoutFullMasterNodes() throws Exc equalTo(3) ) ); + awaitMasterNode(); final String oldMasterId = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState().nodes().getMasterNodeId(); internalCluster().stopCurrentMasterNode(); - - expectThrows( - MasterNotDiscoveredException.class, - () -> assertThat( - clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT) - .setMasterNodeTimeout(TimeValue.timeValueMillis(100)) - .get() - .getState() - .nodes() - .getMasterNodeId(), - nullValue() - ) - ); + awaitMasterNotFound(); // start a fresh full master node, which will be brought into the cluster as master by the voting-only nodes final String newMaster = internalCluster().startNode(); From 1b2345de15779efaffddaef839f575707780d90d Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Fri, 13 Jun 2025 07:10:11 -0300 Subject: [PATCH 2/4] Readd roundtrip for ML tests --- .../ml/integration/MlNativeIntegTestCase.java | 140 +++++++++++++++++- 1 file changed, 137 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java index 63cb40753dd23..2518fa0518ac8 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java @@ -8,6 +8,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateAction; import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; @@ -19,11 +20,15 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.common.Strings; import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -34,6 +39,8 @@ import org.elasticsearch.ingest.common.IngestCommonPlugin; import org.elasticsearch.license.LicenseSettings; import org.elasticsearch.multiproject.TestOnlyMultiProjectPlugin; +import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.reindex.ReindexPlugin; import org.elasticsearch.script.IngestScript; @@ -52,28 +59,52 @@ import org.elasticsearch.transport.netty4.Netty4Plugin; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xpack.autoscaling.Autoscaling; +import org.elasticsearch.xpack.autoscaling.AutoscalingMetadata; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.ilm.DeleteAction; +import org.elasticsearch.xpack.core.ilm.ForceMergeAction; +import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata; +import org.elasticsearch.xpack.core.ilm.LifecycleAction; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.elasticsearch.xpack.core.ilm.LifecycleType; +import org.elasticsearch.xpack.core.ilm.RolloverAction; +import org.elasticsearch.xpack.core.ilm.ShrinkAction; +import org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType; import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.MlMetaIndex; import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; +import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.PutFilterAction; import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction; +import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.UpdateFilterAction; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; +import org.elasticsearch.xpack.core.ml.inference.ModelAliasMetadata; +import org.elasticsearch.xpack.core.ml.inference.TrainedModelCacheMetadata; +import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignmentMetadata; import org.elasticsearch.xpack.core.ml.inference.persistence.InferenceIndexConstants; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex; import org.elasticsearch.xpack.core.security.SecurityField; +import org.elasticsearch.xpack.core.security.authc.TokenMetadata; import org.elasticsearch.xpack.esql.core.plugin.EsqlCorePlugin; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.elasticsearch.xpack.ilm.IndexLifecycle; +import org.elasticsearch.xpack.inference.registry.ModelRegistryMetadata; import org.elasticsearch.xpack.ml.LocalStateMachineLearning; +import org.elasticsearch.xpack.ml.autoscaling.MlScalingReason; import org.elasticsearch.xpack.slm.SnapshotLifecycle; import org.elasticsearch.xpack.slm.history.SnapshotLifecycleTemplateRegistry; import org.elasticsearch.xpack.transform.Transform; +import org.junit.After; import java.io.IOException; import java.io.UncheckedIOException; @@ -93,6 +124,7 @@ import java.util.Set; import java.util.function.Function; +import static org.elasticsearch.test.XContentTestUtils.convertToMap; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.elasticsearch.xpack.monitoring.MonitoringService.ELASTICSEARCH_COLLECTION_ENABLED; @@ -314,9 +346,111 @@ protected static List fetchAllAuditMessages(String jobId) throws Excepti return messages; } - @Override - protected void ensureClusterStateConsistency() throws IOException { - // Ensuring cluster state consistency is not supported for external test clusters. + /** + * Asserts that all ML named writeables pass a cluster state round-trip (de)serialization. + */ + @After + protected void assertClusterRoundTrip() throws IOException { + if (cluster() != null && cluster().size() > 0) { + List entries = new ArrayList<>(ClusterModule.getNamedWriteables()); + entries.addAll(new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedWriteables()); + entries.add( + new NamedWriteableRegistry.Entry( + Metadata.ProjectCustom.class, + TrainedModelAssignmentMetadata.NAME, + TrainedModelAssignmentMetadata::fromStream + ) + ); + entries.add( + new NamedWriteableRegistry.Entry( + NamedDiff.class, + TrainedModelAssignmentMetadata.NAME, + TrainedModelAssignmentMetadata::readDiffFrom + ) + ); + entries.add(new NamedWriteableRegistry.Entry(Metadata.ProjectCustom.class, ModelAliasMetadata.NAME, ModelAliasMetadata::new)); + entries.add(new NamedWriteableRegistry.Entry(NamedDiff.class, ModelAliasMetadata.NAME, ModelAliasMetadata::readDiffFrom)); + entries.add( + new NamedWriteableRegistry.Entry( + Metadata.ProjectCustom.class, + TrainedModelCacheMetadata.NAME, + TrainedModelCacheMetadata::new + ) + ); + entries.add( + new NamedWriteableRegistry.Entry(NamedDiff.class, TrainedModelCacheMetadata.NAME, TrainedModelCacheMetadata::readDiffFrom) + ); + entries.add(new NamedWriteableRegistry.Entry(Metadata.ProjectCustom.class, "ml", MlMetadata::new)); + entries.add( + new NamedWriteableRegistry.Entry(Metadata.ProjectCustom.class, IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata::new) + ); + entries.add( + new NamedWriteableRegistry.Entry( + LifecycleType.class, + TimeseriesLifecycleType.TYPE, + (in) -> TimeseriesLifecycleType.INSTANCE + ) + ); + entries.add(new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::readFrom)); + entries.add(new NamedWriteableRegistry.Entry(LifecycleAction.class, ForceMergeAction.NAME, ForceMergeAction::new)); + entries.add(new NamedWriteableRegistry.Entry(LifecycleAction.class, RolloverAction.NAME, RolloverAction::read)); + entries.add(new NamedWriteableRegistry.Entry(LifecycleAction.class, ShrinkAction.NAME, ShrinkAction::new)); + entries.add( + new NamedWriteableRegistry.Entry( + PersistentTaskParams.class, + MlTasks.DATAFEED_TASK_NAME, + StartDatafeedAction.DatafeedParams::new + ) + ); + entries.add( + new NamedWriteableRegistry.Entry( + PersistentTaskParams.class, + MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, + StartDataFrameAnalyticsAction.TaskParams::new + ) + ); + entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME, OpenJobAction.JobParams::new)); + entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new)); + entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream)); + entries.add( + new NamedWriteableRegistry.Entry( + PersistentTaskState.class, + DataFrameAnalyticsTaskState.NAME, + DataFrameAnalyticsTaskState::new + ) + ); + entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TokenMetadata.TYPE, TokenMetadata::new)); + entries.add(new NamedWriteableRegistry.Entry(Metadata.ClusterCustom.class, AutoscalingMetadata.NAME, AutoscalingMetadata::new)); + entries.add( + new NamedWriteableRegistry.Entry( + NamedDiff.class, + AutoscalingMetadata.NAME, + AutoscalingMetadata.AutoscalingMetadataDiff::new + ) + ); + entries.add( + new NamedWriteableRegistry.Entry(AutoscalingDeciderResult.Reason.class, MlScalingReason.NAME, MlScalingReason::new) + ); + + entries.add( + new NamedWriteableRegistry.Entry(Metadata.ProjectCustom.class, ModelRegistryMetadata.TYPE, ModelRegistryMetadata::new) + ); + entries.add(new NamedWriteableRegistry.Entry(NamedDiff.class, ModelRegistryMetadata.TYPE, ModelRegistryMetadata::readDiffFrom)); + + // + final ClusterStateResponse clusterStateResponse = client().admin() + .cluster() + .prepareState(TEST_REQUEST_TIMEOUT) + .all() + .get(TEST_REQUEST_TIMEOUT); + byte[] clusterStateBytes = ClusterState.Builder.toBytes(clusterStateResponse.getState()); + final ClusterState clusterState = ClusterState.Builder.fromBytes( + clusterStateBytes, + clusterStateResponse.getState().nodes().getLocalNode(), + new NamedWriteableRegistry(entries) + ); + assertEquals(convertToMap(clusterStateResponse.getState()), convertToMap(clusterState)); + } } protected static void createDataStreamAndTemplate(String dataStreamName, String mapping) throws IOException { From c702b51ca5eefa256e03fb89fa8799f3d87dd438 Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Fri, 13 Jun 2025 10:57:15 -0300 Subject: [PATCH 3/4] Fix test --- .../ml/integration/MlNativeIntegTestCase.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java index 2518fa0518ac8..5c9c8093e2a9f 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java @@ -56,6 +56,7 @@ import org.elasticsearch.test.ExternalTestCluster; import org.elasticsearch.test.SecuritySettingsSourceField; import org.elasticsearch.test.TestCluster; +import org.elasticsearch.test.XContentTestUtils; import org.elasticsearch.transport.netty4.Netty4Plugin; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xpack.autoscaling.Autoscaling; @@ -124,7 +125,6 @@ import java.util.Set; import java.util.function.Function; -import static org.elasticsearch.test.XContentTestUtils.convertToMap; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.elasticsearch.xpack.monitoring.MonitoringService.ELASTICSEARCH_COLLECTION_ENABLED; @@ -444,12 +444,22 @@ protected void assertClusterRoundTrip() throws IOException { .all() .get(TEST_REQUEST_TIMEOUT); byte[] clusterStateBytes = ClusterState.Builder.toBytes(clusterStateResponse.getState()); - final ClusterState clusterState = ClusterState.Builder.fromBytes( + final ClusterState parsedClusterState = ClusterState.Builder.fromBytes( clusterStateBytes, clusterStateResponse.getState().nodes().getLocalNode(), new NamedWriteableRegistry(entries) ); - assertEquals(convertToMap(clusterStateResponse.getState()), convertToMap(clusterState)); + final var responseMap = XContentTestUtils.convertToMap(clusterStateResponse.getState()); + final var parsedMap = XContentTestUtils.convertToMap(parsedClusterState); + final var diff = XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder(responseMap, parsedMap); + if (diff != null) { + logger.error( + "Cluster state response:\n{}\nParsed cluster state:\n{}", + clusterStateResponse.getState().toString(), + parsedClusterState.toString() + ); + assertNull("cluster state JSON serialization does not match", diff); + } } } From 9a223c6d3dbaadfb4a367c8d77751142f3bf6abd Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Mon, 16 Jun 2025 09:51:18 -0300 Subject: [PATCH 4/4] Add comment --- .../xpack/ml/integration/MlNativeIntegTestCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java index 5c9c8093e2a9f..86aadcb0ec1d4 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java @@ -437,7 +437,7 @@ protected void assertClusterRoundTrip() throws IOException { ); entries.add(new NamedWriteableRegistry.Entry(NamedDiff.class, ModelRegistryMetadata.TYPE, ModelRegistryMetadata::readDiffFrom)); - // + // Retrieve the cluster state from a random node, and serialize and deserialize it. final ClusterStateResponse clusterStateResponse = client().admin() .cluster() .prepareState(TEST_REQUEST_TIMEOUT)