Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -286,10 +286,20 @@ protected void doStart(ClusterState clusterState) {
}

/**
 * Schedules a retry of this action based on the given cluster state, waiting on the
 * cluster-manager change predicate built by the pre-extracted-values overload.
 *
 * @param state   cluster state the failed attempt was based on
 * @param failure the failure that triggered the retry
 */
private void retryOnMasterChange(ClusterState state, Throwable failure) {
    // Hand over only the version and the cluster-manager node; the overload builds the
    // predicate from these values, so a single delegation is all that is needed here.
    retryOnMasterChange(state.version(), state.nodes().getClusterManagerNode(), failure);
}

private void retry(ClusterState state, final Throwable failure, final Predicate<ClusterState> statePredicate) {
/**
 * Retries once the predicate from {@link ClusterManagerNodeChangePredicate#build(long, String)}
 * accepts a newer cluster state.
 *
 * @param stateVersion       version of the cluster state the failed attempt was based on
 * @param clusterManagerNode cluster-manager node of that state, or {@code null} if there was none
 * @param failure            the failure that triggered the retry
 */
private void retryOnMasterChange(long stateVersion, DiscoveryNode clusterManagerNode, Throwable failure) {
    // The predicate is keyed on the ephemeral ID; a missing cluster manager maps to null.
    final String knownEphemeralId;
    if (clusterManagerNode == null) {
        knownEphemeralId = null;
    } else {
        knownEphemeralId = clusterManagerNode.getEphemeralId();
    }
    final Predicate<ClusterState> clusterManagerChanged = ClusterManagerNodeChangePredicate.build(stateVersion, knownEphemeralId);
    retry(stateVersion, clusterManagerNode, failure, clusterManagerChanged);
}

private void retry(
final long stateVersion,
final DiscoveryNode clusterManagerNode,
final Throwable failure,
final Predicate<ClusterState> statePredicate
) {
if (observer == null) {
final long remainingTimeoutMS = request.clusterManagerNodeTimeout().millis() - (threadPool.relativeTimeInMillis()
- startTime);
Expand All @@ -298,9 +308,11 @@ private void retry(ClusterState state, final Throwable failure, final Predicate<
listener.onFailure(new ClusterManagerNotDiscoveredException(failure));
return;
}
final String persistentNodeId = clusterManagerNode != null ? clusterManagerNode.getId() : null;
this.observer = new ClusterStateObserver(
state,
clusterService,
persistentNodeId,
stateVersion,
clusterService.getClusterApplierService(),
TimeValue.timeValueMillis(remainingTimeoutMS),
logger,
threadPool.getThreadContext()
Expand Down Expand Up @@ -329,6 +341,10 @@ public void onTimeout(TimeValue timeout) {
}

private ActionListener<Response> getDelegateForLocalExecute(ClusterState clusterState) {
// Extract version and cluster manager node before creating closure to avoid retaining full ClusterState
final long stateVersion = clusterState.version();
final DiscoveryNode clusterManagerNode = clusterState.nodes().getClusterManagerNode();

return ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
if (t instanceof FailedToCommitClusterStateException || t instanceof NotClusterManagerException) {
logger.debug(
Expand All @@ -340,7 +356,7 @@ private ActionListener<Response> getDelegateForLocalExecute(ClusterState cluster
t
);

retryOnMasterChange(clusterState, t);
retryOnMasterChange(stateVersion, clusterManagerNode, t);
} else {
delegatedListener.onFailure(t);
}
Expand Down Expand Up @@ -455,7 +471,9 @@ private boolean checkForBlock(Request request, ClusterState localClusterState) {
listener.onFailure(blockException);
} else {
logger.debug("can't execute due to a cluster block, retrying", blockException);
retry(localClusterState, blockException, newState -> {
final long blockStateVersion = localClusterState.version();
final DiscoveryNode blockClusterManagerNode = localClusterState.nodes().getClusterManagerNode();
retry(blockStateVersion, blockClusterManagerNode, blockException, newState -> {
try {
ClusterBlockException newException = checkBlock(request, newState);
return (newException == null || !newException.retryable());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,27 @@ private ClusterManagerNodeChangePredicate() {
}

/**
 * Builds a predicate that accepts a cluster state only if it was generated after the current node
 * (re-)joined the cluster manager. The version and the cluster manager's ephemeral ID are read
 * from the given state and passed to {@link #build(long, String)}.
 *
 * @param currentState the cluster state to compare later states against
 * @return the predicate produced by {@link #build(long, String)} for the extracted values
 */
public static Predicate<ClusterState> build(ClusterState currentState) {
    final long observedVersion = currentState.version();
    final DiscoveryNode knownClusterManager = currentState.nodes().getClusterManagerNode();

    // No elected cluster manager is represented by a null ephemeral ID.
    final String knownEphemeralId;
    if (knownClusterManager != null) {
        knownEphemeralId = knownClusterManager.getEphemeralId();
    } else {
        knownEphemeralId = null;
    }
    return build(observedVersion, knownEphemeralId);
}

/**
* Builds a predicate that accepts a cluster state only if the cluster manager has changed
* or the state version has increased beyond the provided version. Accepts pre-extracted
* values to avoid retaining the full {@link ClusterState} in lambda closures.
*
* @param currentVersion the cluster state version to compare against
* @param currentMasterId the ephemeral ID of the current cluster manager node, or null if none
* @return predicate that returns true when cluster manager changes or version increases
*/
public static Predicate<ClusterState> build(long currentVersion, String currentMasterId) {
return newState -> {
final DiscoveryNode newClusterManager = newState.nodes().getClusterManagerNode();
final boolean accept;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,25 @@ public ClusterStateObserver(
this.contextHolder = contextHolder;
}

public ClusterStateObserver(
String clusterManagerNodeId,
long version,
ClusterApplierService clusterApplierService,
@Nullable TimeValue timeout,
Logger logger,
ThreadContext contextHolder
) {
this.clusterApplierService = clusterApplierService;
this.threadPool = clusterApplierService.threadPool();
this.lastObservedState = new AtomicReference<>(new StoredState(clusterManagerNodeId, version));
this.timeOutValue = timeout;
if (timeOutValue != null) {
this.startTimeMS = threadPool.relativeTimeInMillis();
}
this.logger = logger;
this.contextHolder = contextHolder;
}

/** sets the last observed state to the currently applied cluster state and returns it */
public ClusterState setAndGetObservedState() {
if (observingContext.get() != null) {
Expand Down Expand Up @@ -311,8 +330,12 @@ private static class StoredState {
private final long version;

/**
 * Captures the cluster-manager node ID and the version of the given cluster state by
 * delegating to {@link #StoredState(String, long)}.
 *
 * @param clusterState the cluster state to take the node ID and version from
 */
StoredState(ClusterState clusterState) {
    // An explicit this(...) call must be the first statement of a constructor, and the final
    // version field may only be assigned once, so the delegation is the entire body.
    this(clusterState.nodes().getClusterManagerNodeId(), clusterState.version());
}

/**
 * Stores the given cluster-manager node ID and cluster state version as-is.
 *
 * @param clusterManagerNodeId the cluster-manager node ID to remember
 * @param version              the cluster state version to remember
 */
StoredState(String clusterManagerNodeId, long version) {
    this.version = version;
    this.clusterManagerNodeId = clusterManagerNodeId;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,83 @@ protected void clusterManagerOperation(Request request, ClusterState state, Acti
assertThat(listener.get(), equalTo(response));
}

/**
 * The local cluster manager steps down to a state without any cluster manager while the
 * operation fails with a {@link NotClusterManagerException}; once a remote node is elected,
 * the request must be forwarded to it and its response delivered to the caller.
 */
public void testStepDownToNoMasterThenNewMasterElected() throws ExecutionException, InterruptedException {
    final String actionName = "internal:testAction";
    final Request request = new Request().clusterManagerNodeTimeout(TimeValue.timeValueHours(1));
    final PlainActionFuture<Response> future = new PlainActionFuture<>();
    final Response expectedResponse = new Response();

    // Start with the local node as the elected cluster manager.
    setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, allNodes));

    final Action action = new Action(actionName, transportService, clusterService, threadPool) {
        @Override
        protected void clusterManagerOperation(Request request, ClusterState state, ActionListener<Response> listener)
            throws Exception {
            // Step down without a successor: the published state has no cluster manager.
            setState(clusterService, ClusterStateCreationUtils.state(localNode, null, allNodes));
            listener.onFailure(new NotClusterManagerException("Fake error"));
        }
    };
    action.execute(request, future);

    // Nothing is sent over the wire while there is no cluster manager to talk to.
    assertThat(transport.capturedRequests().length, equalTo(0));
    assertFalse(future.isDone());

    // The remote node wins the election.
    setState(clusterService, ClusterStateCreationUtils.state(localNode, remoteNode, allNodes));

    // Exactly one request goes out, addressed to the new cluster manager.
    final CapturingTransport.CapturedRequest[] sent = transport.capturedRequests();
    assertThat(sent.length, equalTo(1));
    final CapturingTransport.CapturedRequest forwarded = sent[0];
    assertTrue(forwarded.node.isClusterManagerNode());
    assertThat(forwarded.request, equalTo(request));
    assertThat(forwarded.action, equalTo(actionName));

    // Answering the forwarded request completes the caller's future with that response.
    transport.handleResponse(forwarded.requestId, expectedResponse);
    assertTrue(future.isDone());
    assertThat(future.get(), equalTo(expectedResponse));
}

/**
 * The local cluster manager steps down to a state without any cluster manager while the
 * operation fails with a {@link FailedToCommitClusterStateException}; when the same node is
 * re-elected with a higher state version, the operation must run locally a second time and succeed.
 */
public void testStepDownToNoMasterThenSameNodeReelected() throws ExecutionException, InterruptedException {
    final Request request = new Request().clusterManagerNodeTimeout(TimeValue.timeValueHours(1));
    final PlainActionFuture<Response> future = new PlainActionFuture<>();
    final Response expectedResponse = new Response();
    final int[] invocations = { 0 };

    // Use an explicit base version (10) so that the re-election state (11) is recognised
    // by the predicate as newer than the state the first attempt was based on.
    setState(clusterService, ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes)).version(10));

    final Action action = new Action("internal:testAction", transportService, clusterService, threadPool) {
        @Override
        protected void clusterManagerOperation(Request request, ClusterState state, ActionListener<Response> listener)
            throws Exception {
            final int attempt = ++invocations[0];
            if (attempt != 1) {
                // Any later attempt: this node is cluster manager again, so succeed.
                listener.onResponse(expectedResponse);
                return;
            }
            // First attempt: step down with no successor and report the failed commit.
            setState(clusterService, ClusterStateCreationUtils.state(localNode, null, allNodes));
            listener.onFailure(new FailedToCommitClusterStateException("Fake error"));
        }
    };
    action.execute(request, future);

    // Still waiting for a cluster manager: not completed and nothing sent over the transport.
    assertFalse(future.isDone());
    assertThat(transport.capturedRequests().length, equalTo(0));

    // The same node is elected again, now with a higher state version.
    setState(clusterService, ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes)).version(11));

    // The operation ran locally a second time and produced the response.
    assertTrue(future.isDone());
    assertThat(future.get(), equalTo(expectedResponse));
    assertThat(invocations[0], equalTo(2));
}

// Validate TransportMasterNodeAction.testDelegateToClusterManager() works correctly on node with the deprecated MASTER_ROLE.
public void testDelegateToClusterManagerOnNodeWithDeprecatedMasterRole() throws ExecutionException, InterruptedException {
DiscoveryNode localNode = new DiscoveryNode(
Expand Down Expand Up @@ -807,6 +884,87 @@ private ClusterState buildClusterState(ClusterState state, long term, long versi
return ClusterState.builder(state).version(version).metadata(newMetadata).build();
}

/**
 * Checks the retry path taken when the first outgoing request (expected to be the
 * GetTermVersion request issued for the cluster-state-latest check) fails with a
 * ConnectTransportException: the action must keep waiting and complete once the local
 * node is elected cluster manager.
 */
public void testClusterStateLatestCheckerHandlesTransportException() throws ExecutionException, InterruptedException {
    // Both nodes carry the remote-store repository attributes.
    final Map<String, String> remoteStoreAttributes = new HashMap<>();
    remoteStoreAttributes.put(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "repo1");
    remoteStoreAttributes.put(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "repo2");

    localNode = new DiscoveryNode(
        "local_node",
        buildNewFakeTransportAddress(),
        remoteStoreAttributes,
        Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE),
        Version.CURRENT
    );
    remoteNode = new DiscoveryNode(
        "remote_node",
        buildNewFakeTransportAddress(),
        remoteStoreAttributes,
        Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE),
        Version.CURRENT
    );
    allNodes = new DiscoveryNode[] { localNode, remoteNode };

    // The remote node starts out as cluster manager.
    setState(clusterService, ClusterStateCreationUtils.state(localNode, remoteNode, allNodes));

    final RemoteClusterStateService remoteStateService = Mockito.mock(RemoteClusterStateService.class);

    final Request request = new Request().clusterManagerNodeTimeout(TimeValue.timeValueSeconds(60));
    final PlainActionFuture<Response> future = new PlainActionFuture<>();
    final Action action = new Action("internal:testAction", transportService, clusterService, threadPool, remoteStateService);
    action.execute(request, future);

    // Exactly one request is captured: the GetTermVersion request to the remote cluster manager.
    final CapturingTransport.CapturedRequest[] sent = transport.getCapturedRequestsAndClear();
    assertThat(sent.length, equalTo(1));
    final CapturingTransport.CapturedRequest termVersionRequest = sent[0];

    // Fail that request with a connection error.
    final ConnectTransportException connectFailure = new ConnectTransportException(remoteNode, "Fake connection error");
    transport.handleRemoteError(termVersionRequest.requestId, connectFailure);

    // The action is now parked, waiting for a new cluster manager.
    assertFalse(future.isDone());

    // Electing the local node wakes the observer and the operation re-executes locally.
    setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, allNodes));

    assertTrue(future.isDone());
    assertNotNull(future.get());
}

/**
 * Checks the retry path taken when the request forwarded to the remote cluster manager is
 * answered with a NodeClosedException reported through {@code handleRemoteError}: the action
 * must keep waiting and complete once the local node is elected cluster manager.
 */
public void testExecuteOnClusterManagerHandlesNodeClosedException() throws ExecutionException, InterruptedException {
    final Request request = new Request().clusterManagerNodeTimeout(TimeValue.timeValueSeconds(60));

    // The remote node is cluster manager, so the request is forwarded to it.
    setState(clusterService, ClusterStateCreationUtils.state(localNode, remoteNode, allNodes));

    final PlainActionFuture<Response> future = new PlainActionFuture<>();
    final Action action = new Action("internal:testAction", transportService, clusterService, threadPool);
    action.execute(request, future);

    final CapturingTransport.CapturedRequest[] sent = transport.getCapturedRequestsAndClear();
    assertThat(sent.length, equalTo(1));
    final CapturingTransport.CapturedRequest forwarded = sent[0];

    // The remote cluster manager is shutting down and reports a NodeClosedException.
    final NodeClosedException nodeClosed = new NodeClosedException(remoteNode);
    transport.handleRemoteError(forwarded.requestId, nodeClosed);

    // No cluster manager to retry against yet.
    assertFalse(future.isDone());

    // The local node is elected and the request completes.
    setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, allNodes));

    assertTrue(future.isDone());
    assertNotNull(future.get());
}

public void testDontAllowSwitchingToStrictCompatibilityModeForMixedCluster() {
// request to change cluster compatibility mode to STRICT
Settings currentCompatibilityModeSettings = Settings.builder()
Expand Down
Loading
Loading