diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java index 8e4d1e33b9a10..e0753102780ee 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java @@ -286,10 +286,20 @@ protected void doStart(ClusterState clusterState) { } private void retryOnMasterChange(ClusterState state, Throwable failure) { - retry(state, failure, ClusterManagerNodeChangePredicate.build(state)); + retryOnMasterChange(state.version(), state.nodes().getClusterManagerNode(), failure); } - private void retry(ClusterState state, final Throwable failure, final Predicate statePredicate) { + private void retryOnMasterChange(long stateVersion, DiscoveryNode clusterManagerNode, Throwable failure) { + final String ephemeralNodeId = clusterManagerNode != null ? clusterManagerNode.getEphemeralId() : null; + retry(stateVersion, clusterManagerNode, failure, ClusterManagerNodeChangePredicate.build(stateVersion, ephemeralNodeId)); + } + + private void retry( + final long stateVersion, + final DiscoveryNode clusterManagerNode, + final Throwable failure, + final Predicate statePredicate + ) { if (observer == null) { final long remainingTimeoutMS = request.clusterManagerNodeTimeout().millis() - (threadPool.relativeTimeInMillis() - startTime); @@ -298,9 +308,11 @@ private void retry(ClusterState state, final Throwable failure, final Predicate< listener.onFailure(new ClusterManagerNotDiscoveredException(failure)); return; } + final String persistentNodeId = clusterManagerNode != null ? clusterManagerNode.getId() : null; this.observer = new ClusterStateObserver( - state, - clusterService, + persistentNodeId, + stateVersion, + clusterService.getClusterApplierService(), TimeValue.timeValueMillis(remainingTimeoutMS), logger, threadPool.getThreadContext() @@ -329,6 +341,10 @@ public void onTimeout(TimeValue timeout) { } private ActionListener getDelegateForLocalExecute(ClusterState clusterState) { + // Extract version and cluster manager node before creating closure to avoid retaining full ClusterState + final long stateVersion = clusterState.version(); + final DiscoveryNode clusterManagerNode = clusterState.nodes().getClusterManagerNode(); + return ActionListener.delegateResponse(listener, (delegatedListener, t) -> { if (t instanceof FailedToCommitClusterStateException || t instanceof NotClusterManagerException) { logger.debug( @@ -340,7 +356,7 @@ private ActionListener getDelegateForLocalExecute(ClusterState cluster t ); - retryOnMasterChange(clusterState, t); + retryOnMasterChange(stateVersion, clusterManagerNode, t); } else { delegatedListener.onFailure(t); } @@ -455,7 +471,9 @@ private boolean checkForBlock(Request request, ClusterState localClusterState) { listener.onFailure(blockException); } else { logger.debug("can't execute due to a cluster block, retrying", blockException); - retry(localClusterState, blockException, newState -> { + final long blockStateVersion = localClusterState.version(); + final DiscoveryNode blockClusterManagerNode = localClusterState.nodes().getClusterManagerNode(); + retry(blockStateVersion, blockClusterManagerNode, blockException, newState -> { try { ClusterBlockException newException = checkBlock(request, newState); return (newException == null || !newException.retryable()); diff --git a/server/src/main/java/org/opensearch/cluster/ClusterManagerNodeChangePredicate.java b/server/src/main/java/org/opensearch/cluster/ClusterManagerNodeChangePredicate.java index b5c65dacb9542..4bcd545df3c6e 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterManagerNodeChangePredicate.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterManagerNodeChangePredicate.java @@ -48,13 +48,27 @@ private ClusterManagerNodeChangePredicate() { } /** - * builds a predicate that will accept a cluster state only if it was generated after the current has - * (re-)joined the master + * Builds a predicate that accepts a cluster state only if it was generated after the current node + * (re-)joined the cluster manager. Extracts version and cluster-manager ID from the given state + * and delegates to {@link #build(long, String)}. */ public static Predicate build(ClusterState currentState) { final long currentVersion = currentState.version(); final DiscoveryNode clusterManagerNode = currentState.nodes().getClusterManagerNode(); final String currentMasterId = clusterManagerNode == null ? null : clusterManagerNode.getEphemeralId(); + return build(currentVersion, currentMasterId); + } + + /** + * Builds a predicate that accepts a cluster state only if the cluster manager has changed + * or the state version has increased beyond the provided version. Accepts pre-extracted + * values to avoid retaining the full {@link ClusterState} in lambda closures. + * + * @param currentVersion the cluster state version to compare against + * @param currentMasterId the ephemeral ID of the current cluster manager node, or null if none + * @return predicate that returns true when cluster manager changes or version increases + */ + public static Predicate build(long currentVersion, String currentMasterId) { return newState -> { final DiscoveryNode newClusterManager = newState.nodes().getClusterManagerNode(); final boolean accept; diff --git a/server/src/main/java/org/opensearch/cluster/ClusterStateObserver.java b/server/src/main/java/org/opensearch/cluster/ClusterStateObserver.java index 7945afd120350..49d63513a54e6 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterStateObserver.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterStateObserver.java @@ -117,6 +117,25 @@ public ClusterStateObserver( this.contextHolder = contextHolder; } + public ClusterStateObserver( + String clusterManagerNodeId, + long version, + ClusterApplierService clusterApplierService, + @Nullable TimeValue timeout, + Logger logger, + ThreadContext contextHolder + ) { + this.clusterApplierService = clusterApplierService; + this.threadPool = clusterApplierService.threadPool(); + this.lastObservedState = new AtomicReference<>(new StoredState(clusterManagerNodeId, version)); + this.timeOutValue = timeout; + if (timeOutValue != null) { + this.startTimeMS = threadPool.relativeTimeInMillis(); + } + this.logger = logger; + this.contextHolder = contextHolder; + } + /** sets the last observed state to the currently applied cluster state and returns it */ public ClusterState setAndGetObservedState() { if (observingContext.get() != null) { @@ -311,8 +330,12 @@ private static class StoredState { private final long version; StoredState(ClusterState clusterState) { - this.clusterManagerNodeId = clusterState.nodes().getClusterManagerNodeId(); - this.version = clusterState.version(); + this(clusterState.nodes().getClusterManagerNodeId(), clusterState.version()); + } + + StoredState(String clusterManagerNodeId, long version) { + this.clusterManagerNodeId = clusterManagerNodeId; + this.version = version; } /** diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java index 5075f64937508..a3253d5bd3357 100644 --- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java @@ -610,6 +610,83 @@ protected void clusterManagerOperation(Request request, ClusterState state, Acti assertThat(listener.get(), equalTo(response)); } + public void testStepDownToNoMasterThenNewMasterElected() throws ExecutionException, InterruptedException { + Request request = new Request().clusterManagerNodeTimeout(TimeValue.timeValueHours(1)); + PlainActionFuture listener = new PlainActionFuture<>(); + + final Response response = new Response(); + + setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, allNodes)); + + new Action("internal:testAction", transportService, clusterService, threadPool) { + @Override + protected void clusterManagerOperation(Request request, ClusterState state, ActionListener listener) + throws Exception { + // Master steps down but no new master elected yet (master = null) + setState(clusterService, ClusterStateCreationUtils.state(localNode, null, allNodes)); + listener.onFailure(new NotClusterManagerException("Fake error")); + } + }.execute(request, listener); + + // No transport request yet, waiting for a master to appear + assertThat(transport.capturedRequests().length, equalTo(0)); + assertFalse(listener.isDone()); + + // Now a new master is elected (remoteNode) + setState(clusterService, ClusterStateCreationUtils.state(localNode, remoteNode, allNodes)); + + // The request should now be forwarded to the new master + assertThat(transport.capturedRequests().length, equalTo(1)); + CapturingTransport.CapturedRequest capturedRequest = transport.capturedRequests()[0]; + assertTrue(capturedRequest.node.isClusterManagerNode()); + assertThat(capturedRequest.request, equalTo(request)); + assertThat(capturedRequest.action, equalTo("internal:testAction")); + + transport.handleResponse(capturedRequest.requestId, response); + assertTrue(listener.isDone()); + assertThat(listener.get(), equalTo(response)); + } + + public void testStepDownToNoMasterThenSameNodeReelected() throws ExecutionException, InterruptedException { + Request request = new Request().clusterManagerNodeTimeout(TimeValue.timeValueHours(1)); + PlainActionFuture listener = new PlainActionFuture<>(); + + final Response response = new Response(); + final int[] callCount = { 0 }; + + // Set initial state with a higher base version so re-election produces a version + // that the predicate recognizes as newer than the original + setState(clusterService, ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes)).version(10)); + + new Action("internal:testAction", transportService, clusterService, threadPool) { + @Override + protected void clusterManagerOperation(Request request, ClusterState state, ActionListener listener) + throws Exception { + callCount[0]++; + if (callCount[0] == 1) { + // First call: master steps down, no new master yet + setState(clusterService, ClusterStateCreationUtils.state(localNode, null, allNodes)); + listener.onFailure(new FailedToCommitClusterStateException("Fake error")); + } else { + // Second call: we're master again, succeed + listener.onResponse(response); + } + } + }.execute(request, listener); + + // Waiting for a master, no transport request, not done yet + assertFalse(listener.isDone()); + assertThat(transport.capturedRequests().length, equalTo(0)); + + // Same node becomes master again with higher version + setState(clusterService, ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes)).version(11)); + + // Should have re-executed locally and succeeded + assertTrue(listener.isDone()); + assertThat(listener.get(), equalTo(response)); + assertThat(callCount[0], equalTo(2)); + } + // Validate TransportMasterNodeAction.testDelegateToClusterManager() works correctly on node with the deprecated MASTER_ROLE. public void testDelegateToClusterManagerOnNodeWithDeprecatedMasterRole() throws ExecutionException, InterruptedException { DiscoveryNode localNode = new DiscoveryNode( @@ -807,6 +884,87 @@ private ClusterState buildClusterState(ClusterState state, long term, long versi return ClusterState.builder(state).version(version).metadata(newMetadata).build(); } + /** + * Tests that when the clusterStateLatestChecker's GetTermVersion request fails with a + * ConnectTransportException, the handleTransportException path correctly triggers a retry + * and the request is eventually forwarded to a new master. + */ + public void testClusterStateLatestCheckerHandlesTransportException() throws ExecutionException, InterruptedException { + Map attributes = new HashMap<>(); + attributes.put(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "repo1"); + attributes.put(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "repo2"); + + localNode = new DiscoveryNode( + "local_node", + buildNewFakeTransportAddress(), + attributes, + Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), + Version.CURRENT + ); + remoteNode = new DiscoveryNode( + "remote_node", + buildNewFakeTransportAddress(), + attributes, + Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), + Version.CURRENT + ); + allNodes = new DiscoveryNode[] { localNode, remoteNode }; + + setState(clusterService, ClusterStateCreationUtils.state(localNode, remoteNode, allNodes)); + + RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); + + Request request = new Request().clusterManagerNodeTimeout(TimeValue.timeValueSeconds(60)); + PlainActionFuture listener = new PlainActionFuture<>(); + Action action = new Action("internal:testAction", transportService, clusterService, threadPool, remoteClusterStateService); + action.execute(request, listener); + + // The first captured request is the GetTermVersion request to the remote master + CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + assertThat(capturedRequests.length, equalTo(1)); + CapturingTransport.CapturedRequest capturedRequest = capturedRequests[0]; + + // Simulate a ConnectTransportException on the GetTermVersion request + transport.handleRemoteError(capturedRequest.requestId, new ConnectTransportException(remoteNode, "Fake connection error")); + + // The action should now be waiting for a new master via the observer + assertFalse(listener.isDone()); + + // Elect localNode as the new master — this triggers the observer and re-executes locally + setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, allNodes)); + + assertTrue(listener.isDone()); + assertNotNull(listener.get()); + } + + /** + * Tests that when executeOnClusterManager's transport request fails with a NodeClosedException + * wrapped in a RemoteTransportException, the handleTransportException path correctly retries. + */ + public void testExecuteOnClusterManagerHandlesNodeClosedException() throws ExecutionException, InterruptedException { + Request request = new Request().clusterManagerNodeTimeout(TimeValue.timeValueSeconds(60)); + setState(clusterService, ClusterStateCreationUtils.state(localNode, remoteNode, allNodes)); + + PlainActionFuture listener = new PlainActionFuture<>(); + new Action("internal:testAction", transportService, clusterService, threadPool).execute(request, listener); + + CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + assertThat(capturedRequests.length, equalTo(1)); + CapturingTransport.CapturedRequest capturedRequest = capturedRequests[0]; + + // Simulate a NodeClosedException (remote master shutting down) + transport.handleRemoteError(capturedRequest.requestId, new NodeClosedException(remoteNode)); + + // Should be waiting for a new master + assertFalse(listener.isDone()); + + // New master elected (localNode) + setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, allNodes)); + + assertTrue(listener.isDone()); + assertNotNull(listener.get()); + } + public void testDontAllowSwitchingToStrictCompatibilityModeForMixedCluster() { // request to change cluster compatibility mode to STRICT Settings currentCompatibilityModeSettings = Settings.builder() diff --git a/server/src/test/java/org/opensearch/cluster/ClusterManagerNodeChangePredicateTests.java b/server/src/test/java/org/opensearch/cluster/ClusterManagerNodeChangePredicateTests.java new file mode 100644 index 0000000000000..c492fe54f4941 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/ClusterManagerNodeChangePredicateTests.java @@ -0,0 +1,148 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.cluster; + +import org.opensearch.Version; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.function.Predicate; + +/** + * Tests for {@link ClusterManagerNodeChangePredicate}. + * Covers both the {@code build(ClusterState)} and {@code build(long, String)} overloads. + */ +public class ClusterManagerNodeChangePredicateTests extends OpenSearchTestCase { + + private DiscoveryNode clusterManagerNode; + private DiscoveryNode otherClusterManagerNode; + + @Override + public void setUp() throws Exception { + super.setUp(); + clusterManagerNode = new DiscoveryNode("old_master", buildNewFakeTransportAddress(), Version.CURRENT); + otherClusterManagerNode = new DiscoveryNode("new_master", buildNewFakeTransportAddress(), Version.CURRENT); + } + + private ClusterState buildState(DiscoveryNode clusterManagerNode, long version) { + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); + if (clusterManagerNode != null) { + nodesBuilder.add(clusterManagerNode); + nodesBuilder.clusterManagerNodeId(clusterManagerNode.getId()); + } + nodesBuilder.add(otherClusterManagerNode); + return ClusterState.builder(new ClusterName("test")).nodes(nodesBuilder).version(version).build(); + } + + public void testRejectsNullNewClusterManager() { + ClusterState currentState = buildState(clusterManagerNode, 1); + Predicate predicate = ClusterManagerNodeChangePredicate.build(currentState); + + assertFalse(predicate.test(buildState(null, 5))); + } + + public void testAcceptsDifferentClusterManager() { + ClusterState currentState = buildState(clusterManagerNode, 1); + Predicate predicate = ClusterManagerNodeChangePredicate.build(currentState); + + assertTrue(predicate.test(buildState(otherClusterManagerNode, 1))); + } + + public void testAcceptsHigherVersionSameClusterManager() { + ClusterState currentState = buildState(clusterManagerNode, 1); + Predicate predicate = ClusterManagerNodeChangePredicate.build(currentState); + + assertTrue(predicate.test(buildState(clusterManagerNode, 2))); + } + + public void testRejectsSameVersionSameClusterManager() { + ClusterState currentState = buildState(clusterManagerNode, 1); + Predicate predicate = ClusterManagerNodeChangePredicate.build(currentState); + + assertFalse(predicate.test(buildState(clusterManagerNode, 1))); + } + + public void testRejectsLowerVersionSameClusterManager() { + ClusterState currentState = buildState(clusterManagerNode, 5); + Predicate predicate = ClusterManagerNodeChangePredicate.build(currentState); + + assertFalse(predicate.test(buildState(clusterManagerNode, 3))); + } + + public void testNullCurrentClusterManagerRejectsNullNewClusterManager() { + ClusterState currentState = buildState(null, 1); + Predicate predicate = ClusterManagerNodeChangePredicate.build(currentState); + + assertFalse(predicate.test(buildState(null, 5))); + } + + public void testNullCurrentClusterManagerAcceptsNewClusterManager() { + ClusterState currentState = buildState(null, 1); + Predicate predicate = ClusterManagerNodeChangePredicate.build(currentState); + + assertTrue(predicate.test(buildState(clusterManagerNode, 1))); + } + + public void testBuildPrimitivesWithNullCurrentClusterManagerAcceptsNewClusterManager() { + Predicate predicate = ClusterManagerNodeChangePredicate.build(1, null); + + assertTrue(predicate.test(buildState(clusterManagerNode, 1))); + } + + public void testBuildPrimitivesWithNullCurrentClusterManagerRejectsNullNewClusterManager() { + Predicate predicate = ClusterManagerNodeChangePredicate.build(1, null); + + assertFalse(predicate.test(buildState(null, 5))); + } + + public void testBothOverloadsAgreeOnClusterManagerChange() { + ClusterState currentState = buildState(clusterManagerNode, 5); + Predicate fromState = ClusterManagerNodeChangePredicate.build(currentState); + Predicate fromPrimitives = ClusterManagerNodeChangePredicate.build( + currentState.version(), + currentState.nodes().getClusterManagerNode().getEphemeralId() + ); + ClusterState newState = buildState(otherClusterManagerNode, 5); + + assertEquals(fromState.test(newState), fromPrimitives.test(newState)); + } + + public void testBothOverloadsAgreeOnVersionBump() { + ClusterState currentState = buildState(clusterManagerNode, 5); + Predicate fromState = ClusterManagerNodeChangePredicate.build(currentState); + Predicate fromPrimitives = ClusterManagerNodeChangePredicate.build( + currentState.version(), + currentState.nodes().getClusterManagerNode().getEphemeralId() + ); + ClusterState newState = buildState(clusterManagerNode, 10); + + assertEquals(fromState.test(newState), fromPrimitives.test(newState)); + } + + public void testBothOverloadsAgreeOnNoChange() { + ClusterState currentState = buildState(clusterManagerNode, 5); + Predicate fromState = ClusterManagerNodeChangePredicate.build(currentState); + Predicate fromPrimitives = ClusterManagerNodeChangePredicate.build( + currentState.version(), + currentState.nodes().getClusterManagerNode().getEphemeralId() + ); + ClusterState newState = buildState(clusterManagerNode, 5); + + assertEquals(fromState.test(newState), fromPrimitives.test(newState)); + } + + public void testBothOverloadsAgreeOnNullCurrentClusterManager() { + ClusterState currentState = buildState(null, 5); + Predicate fromState = ClusterManagerNodeChangePredicate.build(currentState); + Predicate fromPrimitives = ClusterManagerNodeChangePredicate.build(currentState.version(), null); + + assertEquals(fromState.test(buildState(clusterManagerNode, 5)), fromPrimitives.test(buildState(clusterManagerNode, 5))); + assertEquals(fromState.test(buildState(null, 10)), fromPrimitives.test(buildState(null, 10))); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/ClusterStateObserverTests.java b/server/src/test/java/org/opensearch/cluster/ClusterStateObserverTests.java index b8051c1befeb1..621790ad73299 100644 --- a/server/src/test/java/org/opensearch/cluster/ClusterStateObserverTests.java +++ b/server/src/test/java/org/opensearch/cluster/ClusterStateObserverTests.java @@ -32,15 +32,20 @@ package org.opensearch.cluster; +import org.opensearch.Version; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterApplierService; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; import java.util.Arrays; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.containsString; import static org.mockito.Mockito.any; @@ -89,4 +94,264 @@ public String toString() { assertTrue(listenerAdded.get()); } + /** + * Tests that the ClusterStateObserver constructed with pre-extracted (String, long) values + * correctly detects a newer cluster state via waitForNextChange, matching the behavior of + * the ClusterState-based constructor. + */ + public void testPrimitiveConstructorDetectsNewerState() { + final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class); + final ThreadPool threadPool = mock(ThreadPool.class); + when(clusterApplierService.threadPool()).thenReturn(threadPool); + when(threadPool.relativeTimeInMillis()).thenReturn(0L); + + final DiscoveryNode masterNode = new DiscoveryNode("master", buildNewFakeTransportAddress(), Version.CURRENT); + final ClusterState newerState = ClusterState.builder(new ClusterName("test")) + .nodes(DiscoveryNodes.builder().add(masterNode).clusterManagerNodeId(masterNode.getId())) + .version(5) + .build(); + when(clusterApplierService.state()).thenReturn(newerState); + + final AtomicBoolean listenerAdded = new AtomicBoolean(); + doAnswer(invocation -> { + listenerAdded.set(true); + return null; + }).when(clusterApplierService).addTimeoutListener(any(), any()); + + // Construct with persistent node ID and version 1 — newerState has version 5, same master + final ClusterStateObserver observer = new ClusterStateObserver( + masterNode.getId(), + 1L, + clusterApplierService, + TimeValue.timeValueSeconds(30), + logger, + new ThreadContext(Settings.EMPTY) + ); + + final AtomicReference receivedState = new AtomicReference<>(); + observer.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + receivedState.set(state); + } + + @Override + public void onClusterServiceClose() {} + + @Override + public void onTimeout(TimeValue timeout) {} + }); + + // The sampled state (version 5) is newer than stored (version 1) with same master, + // so the predicate should accept it immediately without adding a listener + assertFalse(listenerAdded.get()); + assertNotNull(receivedState.get()); + assertEquals(5L, receivedState.get().version()); + } + + /** + * Tests that the ClusterStateObserver constructed with (String, long) correctly waits + * when the current state has the same version and master as the stored state. + */ + public void testPrimitiveConstructorWaitsWhenStateUnchanged() { + final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class); + final ThreadPool threadPool = mock(ThreadPool.class); + when(clusterApplierService.threadPool()).thenReturn(threadPool); + when(threadPool.relativeTimeInMillis()).thenReturn(0L); + + final DiscoveryNode masterNode = new DiscoveryNode("master", buildNewFakeTransportAddress(), Version.CURRENT); + final ClusterState sameState = ClusterState.builder(new ClusterName("test")) + .nodes(DiscoveryNodes.builder().add(masterNode).clusterManagerNodeId(masterNode.getId())) + .version(5) + .build(); + when(clusterApplierService.state()).thenReturn(sameState); + + final AtomicBoolean listenerAdded = new AtomicBoolean(); + doAnswer(invocation -> { + listenerAdded.set(true); + return null; + }).when(clusterApplierService).addTimeoutListener(any(), any()); + + // Construct with same persistent node ID and same version — should NOT detect a change + final ClusterStateObserver observer = new ClusterStateObserver( + masterNode.getId(), + 5L, + clusterApplierService, + TimeValue.timeValueSeconds(30), + logger, + new ThreadContext(Settings.EMPTY) + ); + + final AtomicReference receivedState = new AtomicReference<>(); + observer.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + receivedState.set(state); + } + + @Override + public void onClusterServiceClose() {} + + @Override + public void onTimeout(TimeValue timeout) {} + }); + + // State hasn't changed, so observer should add a listener and wait + assertTrue(listenerAdded.get()); + assertNull(receivedState.get()); + } + + /** + * Tests that the ClusterStateObserver constructed with (String, long) detects a different + * cluster manager even when the version is the same. + */ + public void testPrimitiveConstructorDetectsDifferentClusterManager() { + final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class); + final ThreadPool threadPool = mock(ThreadPool.class); + when(clusterApplierService.threadPool()).thenReturn(threadPool); + when(threadPool.relativeTimeInMillis()).thenReturn(0L); + + final DiscoveryNode oldMaster = new DiscoveryNode("old_master", buildNewFakeTransportAddress(), Version.CURRENT); + final DiscoveryNode newMaster = new DiscoveryNode("new_master", buildNewFakeTransportAddress(), Version.CURRENT); + final ClusterState newMasterState = ClusterState.builder(new ClusterName("test")) + .nodes(DiscoveryNodes.builder().add(oldMaster).add(newMaster).clusterManagerNodeId(newMaster.getId())) + .version(5) + .build(); + when(clusterApplierService.state()).thenReturn(newMasterState); + + final AtomicBoolean listenerAdded = new AtomicBoolean(); + doAnswer(invocation -> { + listenerAdded.set(true); + return null; + }).when(clusterApplierService).addTimeoutListener(any(), any()); + + // Construct with old master's persistent ID — new state has different master + final ClusterStateObserver observer = new ClusterStateObserver( + oldMaster.getId(), + 5L, + clusterApplierService, + TimeValue.timeValueSeconds(30), + logger, + new ThreadContext(Settings.EMPTY) + ); + + final AtomicReference receivedState = new AtomicReference<>(); + observer.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + receivedState.set(state); + } + + @Override + public void onClusterServiceClose() {} + + @Override + public void onTimeout(TimeValue timeout) {} + }); + + // Different master detected — should accept immediately + assertFalse(listenerAdded.get()); + assertNotNull(receivedState.get()); + } + + /** + * Tests that the ClusterService-based primitive constructor delegates correctly + * to the ClusterApplierService-based constructor. + */ + public void testPrimitiveConstructorViaClusterService() { + final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class); + final ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService); + final ThreadPool threadPool = mock(ThreadPool.class); + when(clusterApplierService.threadPool()).thenReturn(threadPool); + when(threadPool.relativeTimeInMillis()).thenReturn(0L); + + final DiscoveryNode masterNode = new DiscoveryNode("master", buildNewFakeTransportAddress(), Version.CURRENT); + final ClusterState newerState = ClusterState.builder(new ClusterName("test")) + .nodes(DiscoveryNodes.builder().add(masterNode).clusterManagerNodeId(masterNode.getId())) + .version(10) + .build(); + when(clusterApplierService.state()).thenReturn(newerState); + + // Use the ClusterApplierService-based constructor + final ClusterStateObserver observer = new ClusterStateObserver( + masterNode.getId(), + 1L, + clusterApplierService, + TimeValue.timeValueSeconds(30), + logger, + new ThreadContext(Settings.EMPTY) + ); + + final AtomicReference receivedState = new AtomicReference<>(); + observer.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + receivedState.set(state); + } + + @Override + public void onClusterServiceClose() {} + + @Override + public void onTimeout(TimeValue timeout) {} + }); + + // Newer version detected — should accept immediately + assertNotNull(receivedState.get()); + assertEquals(10L, receivedState.get().version()); + } + + /** + * Tests that the primitive constructor with null clusterManagerNodeId (no master) + * detects when a master appears. + */ + public void testPrimitiveConstructorNullMasterDetectsNewMaster() { + final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class); + final ThreadPool threadPool = mock(ThreadPool.class); + when(clusterApplierService.threadPool()).thenReturn(threadPool); + when(threadPool.relativeTimeInMillis()).thenReturn(0L); + + final DiscoveryNode newMaster = new DiscoveryNode("new_master", buildNewFakeTransportAddress(), Version.CURRENT); + final ClusterState stateWithMaster = ClusterState.builder(new ClusterName("test")) + .nodes(DiscoveryNodes.builder().add(newMaster).clusterManagerNodeId(newMaster.getId())) + .version(5) + .build(); + when(clusterApplierService.state()).thenReturn(stateWithMaster); + + final AtomicBoolean listenerAdded = new AtomicBoolean(); + doAnswer(invocation -> { + listenerAdded.set(true); + return null; + }).when(clusterApplierService).addTimeoutListener(any(), any()); + + // Construct with null master ID — simulates "no master" initial state + final ClusterStateObserver observer = new ClusterStateObserver( + null, + 5L, + clusterApplierService, + TimeValue.timeValueSeconds(30), + logger, + new ThreadContext(Settings.EMPTY) + ); + + final AtomicReference receivedState = new AtomicReference<>(); + observer.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + receivedState.set(state); + } + + @Override + public void onClusterServiceClose() {} + + @Override + public void onTimeout(TimeValue timeout) {} + }); + + // Different master (null -> newMaster) — should accept immediately + assertFalse("should not need to add listener", listenerAdded.get()); + assertNotNull(receivedState.get()); + } + }