From b8ee38f65efe39217b29810e608cc7f12ea1a5e5 Mon Sep 17 00:00:00 2001 From: Joshua Adams Date: Wed, 3 Sep 2025 14:56:11 +0100 Subject: [PATCH] Revert "Master node disconnect (#132023)" This reverts commit 656a7a99cf81586f64323f9151323d00d96121ae. --- .../cluster/coordination/NodeJoiningIT.java | 258 ------------------ .../cluster/coordination/Coordinator.java | 64 +---- .../discovery/DiscoveryModule.java | 7 +- .../elasticsearch/node/NodeConstruction.java | 3 +- .../cluster/coordination/NodeJoinTests.java | 14 +- .../discovery/DiscoveryModuleTests.java | 6 +- .../snapshots/SnapshotResiliencyTests.java | 3 +- .../AbstractCoordinatorTestCase.java | 3 +- 8 files changed, 14 insertions(+), 344 deletions(-) delete mode 100644 server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/NodeJoiningIT.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/NodeJoiningIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/NodeJoiningIT.java deleted file mode 100644 index 9fdde19ac3c33..0000000000000 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/NodeJoiningIT.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.cluster.coordination; - -import org.apache.logging.log4j.Level; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.service.ClusterApplierService; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.cluster.service.MasterService; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.CollectionUtils; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.ClusterServiceUtils; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.MockLog; -import org.elasticsearch.test.junit.annotations.TestLogging; -import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.transport.Transport; -import org.elasticsearch.transport.TransportService; - -import java.util.Collection; -import java.util.List; - -@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) -public class NodeJoiningIT extends ESIntegTestCase { - - @Override - protected Collection> nodePlugins() { - return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), MockTransportService.TestPlugin.class); - } - - @Override - protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal, otherSettings)) - // detect leader failover quickly - .put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), 1) - .put(LeaderChecker.LEADER_CHECK_INTERVAL_SETTING.getKey(), "100ms") - .build(); - } - - public void testNodeJoinsCluster() { - internalCluster().startNodes(3); - String masterNodeName = internalCluster().getMasterName(); - int numberOfNodesOriginallyInCluster = internalCluster().clusterService(masterNodeName).state().getNodes().size(); - int numberOfMasterNodesOriginallyInCluster = internalCluster().clusterService(masterNodeName) - .state() - .nodes() - .getMasterNodes() - .size(); - List namesOfDataNodesInOriginalCluster = getListOfDataNodeNamesFromCluster(masterNodeName); - - // Attempt to add new node - String newNodeName = internalCluster().startDataOnlyNode(); - ensureStableCluster(4); - - // Assert the new data node was added - ClusterState state = internalCluster().clusterService(masterNodeName).state(); - assertEquals(numberOfNodesOriginallyInCluster + 1, state.nodes().getSize()); - assertEquals(namesOfDataNodesInOriginalCluster.size() + 1, state.nodes().getDataNodes().size()); - assertEquals(numberOfMasterNodesOriginallyInCluster, state.nodes().getMasterNodes().size()); - - List namesOfDataNodesInNewCluster = getListOfDataNodeNamesFromCluster(masterNodeName); - assertTrue(namesOfDataNodesInNewCluster.contains(newNodeName)); - for (String nodeName : namesOfDataNodesInOriginalCluster) { - assertTrue(namesOfDataNodesInNewCluster.contains(nodeName)); - } - } - - @TestLogging(reason = "test includes assertions about logging", value = "org.elasticsearch.cluster.coordination.NodeJoinExecutor:INFO") - public void testNodeTriesToJoinClusterAndThenDifferentMasterIsElected() { - List nodeNames = internalCluster().startNodes(3); - ensureStableCluster(3); - String originalMasterNodeName = internalCluster().getMasterName(); - int numberOfNodesOriginallyInCluster = internalCluster().clusterService(originalMasterNodeName).state().getNodes().size(); - // Determine upfront who we want the next master to be - final var newMasterNodeName = randomValueOtherThan(originalMasterNodeName, () -> randomFrom(nodeNames)); - - // Ensure the logging is as expected - try (var mockLog = MockLog.capture(NodeJoinExecutor.class)) { - - // Sets MockTransportService behaviour - for (final var transportService : internalCluster().getInstances(TransportService.class)) { - final var mockTransportService = asInstanceOf(MockTransportService.class, transportService); - - if (mockTransportService.getLocalNode().getName().equals(newMasterNodeName) == false) { - List listOfActionsToBlock = List.of( - // This forces the current master node to fail - PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME, - // This disables pre-voting on all nodes except the new master, forcing it to win the election - StatefulPreVoteCollector.REQUEST_PRE_VOTE_ACTION_NAME - ); - blockActionNameOnMockTransportService(mockTransportService, listOfActionsToBlock); - } - } - - // We do not expect to see a WARN log about a node disconnecting (#ES-11449) - addJoiningNodeDisconnectedWarnLogFalseExpectation(mockLog); - - // We haven't changed master nodes yet - assertEquals(originalMasterNodeName, internalCluster().getMasterName()); - - // Sends a node join request to the original master node. This will fail, and cause a master failover - // startDataOnlyNode waits for the new node to be added, and this can only occur after a re-election - String newNodeName = internalCluster().startDataOnlyNode(); - assertNotEquals(originalMasterNodeName, internalCluster().getMasterName()); - logger.info("New master is elected"); - - // Assert all nodes have accepted N into their cluster state - assertNewNodeIsInAllClusterStates(newNodeName); - - mockLog.assertAllExpectationsMatched(); - - // Assert the new data node was added - DiscoveryNodes discoveryNodes = internalCluster().clusterService().state().nodes(); - assertEquals(numberOfNodesOriginallyInCluster + 1, discoveryNodes.getSize()); - assertTrue(getListOfDataNodeNamesFromCluster(newMasterNodeName).contains(newNodeName)); - } - } - - /* - In this scenario, node N attempts to join a cluster, there is an election and the original master is re-elected. - Node N should join the cluster, but it should not be disconnected (#ES-11449) - */ - @TestLogging(reason = "test includes assertions about logging", value = "org.elasticsearch.cluster.coordination:INFO") - public void testNodeTriesToJoinClusterAndThenSameMasterIsElected() { - internalCluster().startNodes(3); - ensureStableCluster(3); - String masterNodeName = internalCluster().getMasterName(); - - long originalTerm = getTerm(masterNodeName); - int numberOfNodesOriginallyInCluster = internalCluster().clusterService(masterNodeName).state().getNodes().size(); - - try (var mockLog = MockLog.capture(NodeJoinExecutor.class, MasterService.class, ClusterApplierService.class)) { - for (String nodeName : internalCluster().getNodeNames()) { - final var mockTransportService = MockTransportService.getInstance(nodeName); - - if (nodeName.equals(masterNodeName)) { - // This makes the master fail, forcing a re-election - blockActionNameOnMockTransportService( - mockTransportService, - List.of(PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME) - ); - - // Wait until the master has stepped down before removing the publishing ban - // This allows the master to be re-elected - ClusterServiceUtils.addTemporaryStateListener(internalCluster().clusterService(masterNodeName), clusterState -> { - DiscoveryNode currentMasterNode = clusterState.nodes().getMasterNode(); - boolean hasMasterSteppedDown = currentMasterNode == null - || currentMasterNode.getName().equals(masterNodeName) == false; - if (hasMasterSteppedDown) { - logger.info("Master publishing ban removed"); - mockTransportService.addSendBehavior(Transport.Connection::sendRequest); - } - return hasMasterSteppedDown; - }); - - } else { - // This disables pre-voting on all nodes except the master, forcing it to win the election - blockActionNameOnMockTransportService( - mockTransportService, - List.of(StatefulPreVoteCollector.REQUEST_PRE_VOTE_ACTION_NAME) - ); - } - } - - // We expect the node join request to fail with a FailedToCommitClusterStateException - mockLog.addExpectation( - new MockLog.SeenEventExpectation( - "failed to commit cluster state", - MasterService.class.getCanonicalName(), - Level.WARN, - "failed to commit cluster state" - ) - ); - - /* - We expect the cluster to reuse the connection to N and not disconnect it - Therefore, this WARN log should not be thrown (#ES-11449) - */ - addJoiningNodeDisconnectedWarnLogFalseExpectation(mockLog); - - // Before we add the new node, assert we haven't changed master nodes yet - assertEquals(masterNodeName, internalCluster().getMasterName()); - - // Sends a node join request to the original master node. This will fail, and cause a master failover - logger.info("Sending node join request"); - String newNodeName = internalCluster().startDataOnlyNode(); - - // Assert the master was re-elected - assertEquals(masterNodeName, internalCluster().getMasterName()); - assertTrue(originalTerm < getTerm(masterNodeName)); - - // Assert all nodes have accepted N into their cluster state - assertNewNodeIsInAllClusterStates(newNodeName); - - // If the WARN log was thrown, then the connection to N was disconnected so fail the test - mockLog.assertAllExpectationsMatched(); - - // Assert the new data node was added - DiscoveryNodes discoveryNodes = internalCluster().clusterService().state().nodes(); - assertEquals(numberOfNodesOriginallyInCluster + 1, discoveryNodes.getSize()); - assertTrue(getListOfDataNodeNamesFromCluster(masterNodeName).contains(newNodeName)); - } - } - - private long getTerm(String masterNodeName) { - return internalCluster().clusterService(masterNodeName).state().coordinationMetadata().term(); - } - - private void assertNewNodeIsInAllClusterStates(String newNodeName) { - for (ClusterService clusterService : internalCluster().getInstances(ClusterService.class)) { - assertTrue(clusterService.state().nodes().getAllNodes().stream().map(DiscoveryNode::getName).toList().contains(newNodeName)); - } - } - - private List getListOfDataNodeNamesFromCluster(String nodeName) { - return internalCluster().clusterService(nodeName) - .state() - .getNodes() - .getDataNodes() - .values() - .stream() - .map(DiscoveryNode::getName) - .toList(); - } - - private void addJoiningNodeDisconnectedWarnLogFalseExpectation(MockLog mockLog) { - mockLog.addExpectation( - new MockLog.UnseenEventExpectation( - "warn message with troubleshooting link", - "org.elasticsearch.cluster.coordination.NodeJoinExecutor", - Level.WARN, - "*" - ) - ); - } - - private void blockActionNameOnMockTransportService(MockTransportService mockTransportService, List actionNamesToBlock) { - mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { - if (actionNamesToBlock.contains(action)) { - throw new ElasticsearchException("[{}] for [{}] denied", action, connection.getNode()); - } else { - connection.sendRequest(requestId, action, request, options); - } - }); - } -} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 23bc583a14b44..1976bda6c6aba 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -19,10 +19,8 @@ import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStatePublicationEvent; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.LocalMasterServiceTask; @@ -41,7 +39,6 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.cluster.service.ClusterApplierService; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.cluster.version.CompatibilityVersions; @@ -193,7 +190,6 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt private final NodeHealthService nodeHealthService; private final List peerFinderListeners; private final LeaderHeartbeatService leaderHeartbeatService; - private final ClusterService clusterService; /** * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}. @@ -222,8 +218,7 @@ public Coordinator( LeaderHeartbeatService leaderHeartbeatService, PreVoteCollector.Factory preVoteCollectorFactory, CompatibilityVersions compatibilityVersions, - FeatureService featureService, - ClusterService clusterService + FeatureService featureService ) { this.settings = settings; this.transportService = transportService; @@ -333,7 +328,6 @@ public Coordinator( this.peerFinderListeners.add(clusterBootstrapService); this.leaderHeartbeatService = leaderHeartbeatService; this.compatibilityVersions = compatibilityVersions; - this.clusterService = clusterService; } /** @@ -668,57 +662,11 @@ private void handleJoinRequest(JoinRequest joinRequest, ActionListener joi transportService.connectToNode(joinRequest.getSourceNode(), new ActionListener<>() { @Override public void onResponse(Releasable response) { - SubscribableListener - // Validates the join request: can the remote node deserialize our cluster state and does it respond to pings? - .newForked(l -> validateJoinRequest(joinRequest, l)) - - // Adds the joining node to the cluster state - .andThen(l -> processJoinRequest(joinRequest, l.delegateResponse((ll, e) -> { - // #ES-11449 - if (e instanceof FailedToCommitClusterStateException) { - // The commit failed (i.e. master is failing over) but this does not imply that the join has actually failed: - // the next master may have already accepted the state that we just published and will therefore include the - // joining node in its future states too. Thus we need to wait for the next committed state before we know the - // eventual outcome, and we need to wait for that before we can release (our ref to) the connection and complete - // the listener. - - // NB we are on the master update thread here at the end of processing the failed cluster state update, so this - // all happens before any cluster state update that re-elects a master - assert ThreadPool.assertCurrentThreadPool(MasterService.MASTER_UPDATE_THREAD_NAME); - - final ClusterStateListener clusterStateListener = new ClusterStateListener() { - @Override - public void clusterChanged(ClusterChangedEvent event) { - final var discoveryNodes = event.state().nodes(); - // Keep the connection open until the next committed state - if (discoveryNodes.getMasterNode() != null) { - // Remove this listener to avoid memory leaks - clusterService.removeListener(this); - if (discoveryNodes.nodeExists(joinRequest.getSourceNode().getId())) { - ll.onResponse(null); - } else { - ll.onFailure(e); - } - } - } - }; - clusterService.addListener(clusterStateListener); - clusterStateListener.clusterChanged( - new ClusterChangedEvent( - "Checking if another master has been elected since " - + joinRequest.getSourceNode().getName() - + " attempted to join cluster", - clusterService.state(), - clusterService.state() - ) - ); - } else { - ll.onFailure(e); - } - }))) - - // Whatever the outcome, release (our ref to) the connection we just opened and notify the joining node. - .addListener(ActionListener.runBefore(joinListener, () -> Releasables.close(response))); + validateJoinRequest( + joinRequest, + ActionListener.runBefore(joinListener, () -> Releasables.close(response)) + .delegateFailure((l, ignored) -> processJoinRequest(joinRequest, l)) + ); } @Override diff --git a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index 7277a5caaf229..a60044076234e 100644 --- a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -23,7 +23,6 @@ import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplier; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.cluster.version.CompatibilityVersions; import org.elasticsearch.common.Randomness; @@ -108,8 +107,7 @@ public DiscoveryModule( NodeHealthService nodeHealthService, CircuitBreakerService circuitBreakerService, CompatibilityVersions compatibilityVersions, - FeatureService featureService, - ClusterService clusterService + FeatureService featureService ) { final Collection> joinValidators = new ArrayList<>(); final Map> hostProviders = new HashMap<>(); @@ -196,8 +194,7 @@ public DiscoveryModule( leaderHeartbeatService, preVoteCollectorFactory, compatibilityVersions, - featureService, - clusterService + featureService ); } else { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 0e3124ed07c9f..c909c9a25e5b8 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -1722,8 +1722,7 @@ private DiscoveryModule createDiscoveryModule( fsHealthService, circuitBreakerService, compatibilityVersions, - featureService, - clusterService + featureService ); modules.add(module, b -> { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index 377b91c7320a8..4ff247ff835fe 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.FakeThreadPoolMasterService; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.cluster.service.MasterServiceTests; @@ -213,16 +212,8 @@ protected void onSendRequest(long requestId, String action, TransportRequest req clusterSettings, Collections.emptySet() ); - String nodeName = "test_node"; - Settings settings = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), nodeName).build(); - ClusterService clusterService = new ClusterService( - settings, - clusterSettings, - threadPool, - new TaskManager(settings, threadPool, Set.of()) - ); coordinator = new Coordinator( - nodeName, + "test_node", Settings.EMPTY, clusterSettings, transportService, @@ -243,8 +234,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req LeaderHeartbeatService.NO_OP, StatefulPreVoteCollector::new, CompatibilityVersionsUtils.staticCurrent(), - new FeatureService(List.of()), - clusterService + new FeatureService(List.of()) ); transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java index ce27b2499a190..94f8700c922e8 100644 --- a/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java @@ -19,7 +19,6 @@ import org.elasticsearch.cluster.node.VersionInformation; import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.service.ClusterApplier; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.cluster.version.CompatibilityVersionsUtils; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -62,7 +61,6 @@ public class DiscoveryModuleTests extends ESTestCase { private ClusterApplier clusterApplier; private ClusterSettings clusterSettings; private GatewayMetaState gatewayMetaState; - private ClusterService clusterService; public interface DummyHostsProviderPlugin extends DiscoveryPlugin { Map> impl(); @@ -90,7 +88,6 @@ public void setupDummyServices() { clusterApplier = mock(ClusterApplier.class); clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); gatewayMetaState = mock(GatewayMetaState.class); - clusterService = mock(ClusterService.class); } @After @@ -121,8 +118,7 @@ private DiscoveryModule newModule( null, new NoneCircuitBreakerService(), CompatibilityVersionsUtils.staticCurrent(), - new FeatureService(List.of()), - clusterService + new FeatureService(List.of()) ); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 92cb2452ccbe6..6a459cab07328 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -2981,8 +2981,7 @@ public void start(ClusterState initialState) { LeaderHeartbeatService.NO_OP, StatefulPreVoteCollector::new, CompatibilityVersionsUtils.staticCurrent(), - new FeatureService(List.of()), - this.clusterService + new FeatureService(List.of()) ); masterService.setClusterStatePublisher(coordinator); coordinator.start(); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index 3e431eab4a15f..b8f225aedc0de 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -1171,8 +1171,7 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { coordinationServices.getLeaderHeartbeatService(), coordinationServices.getPreVoteCollectorFactory(), CompatibilityVersionsUtils.staticCurrent(), - new FeatureService(List.of()), - clusterService + new FeatureService(List.of()) ); coordinationDiagnosticsService = new CoordinationDiagnosticsService( clusterService,