Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public NotMasterException(StreamInput in) throws IOException {
super(in);
}

public NotMasterException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}

@Override
public Throwable fillInStackTrace() {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1283,7 +1283,7 @@ public String toString() {
if (lifecycle.started()) {
nextBatch.run(batchCompletionListener);
} else {
nextBatch.onRejection(new FailedToCommitClusterStateException("node closed", getRejectionException()));
nextBatch.onRejection(new NotMasterException("node closed", getRejectionException()));
batchCompletionListener.onResponse(null);
}
});
Expand All @@ -1309,7 +1309,7 @@ private void onCompletion() {
@Override
public void onRejection(Exception e) {
assert e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown() : e;
drainQueueOnRejection(new FailedToCommitClusterStateException("node closed", e));
drainQueueOnRejection(new NotMasterException("node closed", e));
}

@Override
Expand All @@ -1336,7 +1336,7 @@ private Batch takeNextBatch() {
private void forkQueueProcessor() {
// single-threaded: started when totalQueueSize transitions from 0 to 1 and keeps calling itself until the queue is drained.
if (lifecycle.started() == false) {
drainQueueOnRejection(new FailedToCommitClusterStateException("node closed", getRejectionException()));
drainQueueOnRejection(new NotMasterException("node closed", getRejectionException()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forkQueueProcessor can be called on success or failure of a Runnable. Is that a problem in terms of knowing whether or not the cluster update was successful, or is it not a problem for some reason? I haven't worked with the MasterService before, so I might be missing something.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIU, your question is: "During a cluster state update, where is this code running, since if this exception is thrown after the cluster state update is pushed to the wire, then according to the FailedToCommitClusterStateException exception definition here, this is the correct exception to be throwing".

I'm still trying to understand the code myself so excuse me if I make a mistake, but as I understand it, this code isn't even running during a cluster state update. forkQueueProcessor is called twice in the code, here in the Runnable and here in a PerPriorityQueue.

PerPriorityQueue

In this case we're draining the queue because the threadpool shut down. The workflow is:

  1. We batch tasks on the master node to be executed together. This is handled by a BatchingTaskQueue, here
  2. When submitting a task to this queue, we invoke a PerPriorityQueue, here
  3. The execute() function called invokes forkQueueProcessor here
  4. A FailedToCommitClusterStateException is passed into drainQueueOnRejection here
  5. drainQueueOnRejection here uses the exception to reject a batch of tasks.
  6. In this case, we are not attempting to publish a cluster state update, and so a FailedToCommitClusterStateException is wrong. The cluster state update publication workflow is here inside the Coordinator. The reason a FailedToCommitClusterStateException is thrown is explained by a comment inside the Batch interface, here, but I shall copy below:
/**
   * Called when the batch is rejected due to the master service shutting down.
   *
   * @param e is a {@link FailedToCommitClusterStateException} to cause things like {@link TransportMasterNodeAction} to retry after
   *          submitting a task to a master which shut down. {@code e.getCause()} is the rejection exception, which should be a
   *          {@link EsRejectedExecutionException} with {@link EsRejectedExecutionException#isExecutorShutdown()} true.
   */
  // Should really be a NodeClosedException instead, but this exception type doesn't trigger retries today.
**/

^^ As shown, the FailedToCommitClusterStateException was always acknowledged to be wrong from the
beginning.

Runnable

The runnable is invoked in only one place, inside forkQueueProcessor, here. The success or failure you mention above is not referring to cluster state update success or failure, but rather success or failure executing a batch of tasks from the queue.

Therefore, in this case, I believe the exception to be incorrect, and used as a quick way to guarantee retries. As explained in another comment, both FailedToCommitClusterStateException and NotMasterException retry inside TransportMasterNodeAction, so replacing one with the other should have no adverse effect, while semantically improving our exception handling

return;
}

Expand All @@ -1353,7 +1353,7 @@ private EsRejectedExecutionException getRejectionException() {
return new EsRejectedExecutionException("master service is in state [" + lifecycleState() + "]", true);
}

private void drainQueueOnRejection(FailedToCommitClusterStateException e) {
private void drainQueueOnRejection(NotMasterException e) {
assert totalQueueSize.get() > 0;
do {
assert currentlyExecutingBatch == null;
Expand Down Expand Up @@ -1407,12 +1407,11 @@ private interface Batch {
/**
* Called when the batch is rejected due to the master service shutting down.
*
* @param e is a {@link FailedToCommitClusterStateException} to cause things like {@link TransportMasterNodeAction} to retry after
* @param e is a {@link NotMasterException} to cause things like {@link TransportMasterNodeAction} to retry after
* submitting a task to a master which shut down. {@code e.getCause()} is the rejection exception, which should be a
* {@link EsRejectedExecutionException} with {@link EsRejectedExecutionException#isExecutorShutdown()} true.
*/
// Should really be a NodeClosedException instead, but this exception type doesn't trigger retries today.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you look into this, whether NotMasterException will trigger some sort of retry?

Copy link
Contributor Author

@joshua-adams-1 joshua-adams-1 Sep 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A NotMasterException is thrown when a non-master node is attempting to perform an action reserved for a master node. In this instance, it could occur when a master node loses an election midway through a cluster state update and is then not the master anymore. The action is therefore not to be retried on the same node (since it would hit the same exception), but is expected to be retried on the next master.

In the code, this can be seen here inside TransportMasterNodeAction:

// TransportMasterNodeAction
ActionListener<Response> delegate = listener.delegateResponse((delegatedListener, t) -> {
    if (MasterService.isPublishFailureException(t)) {
        logger.debug(
            () -> format(
                "master could not publish cluster state or "
                    + "stepped down before publishing action [%s], scheduling a retry",
                actionName
            ),
            t
        );
        retryOnNextState(currentStateVersion, t);
    } else {
        logger.debug("unexpected exception during publication", t);
        delegatedListener.onFailure(t);
    }
});

// MasterService
public static boolean isPublishFailureException(Exception e) {
    return e instanceof NotMasterException || e instanceof FailedToCommitClusterStateException;
}

where if an NotMasterException is thrown, we retry on the next cluster state update, which should be sent from the new master. Since the exception currently thrown is a FailedToCommitClusterStateException which is retried in the same way, this change should preserve functionality.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should have kept this comment - I opened #135902 to reinstate it.

void onRejection(FailedToCommitClusterStateException e);
void onRejection(NotMasterException e);

/**
* @return number of tasks in this batch if the batch is pending, or {@code 0} if the batch is not pending.
Expand Down Expand Up @@ -1634,7 +1633,7 @@ T acquireForExecution() {
return task;
}

void onRejection(FailedToCommitClusterStateException e) {
void onRejection(NotMasterException e) {
final var task = acquireForExecution();
if (task != null) {
try (var ignored = storedContextSupplier.get()) {
Expand All @@ -1654,7 +1653,7 @@ boolean isPending() {

private class Processor implements Batch {
@Override
public void onRejection(FailedToCommitClusterStateException e) {
public void onRejection(NotMasterException e) {
final var items = queueSize.getAndSet(0);
for (int i = 0; i < items; i++) {
final var entry = queue.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2182,7 +2182,7 @@ class TestTask implements ClusterStateTaskListener {
@Override
public void onFailure(Exception e) {
assertEquals(expectedHeader, threadPool.getThreadContext().getHeader(testHeader));
if ((e instanceof FailedToCommitClusterStateException
if ((e instanceof NotMasterException
&& e.getCause() instanceof EsRejectedExecutionException esre
&& esre.isExecutorShutdown()) == false) {
throw new AssertionError("unexpected exception", e);
Expand Down Expand Up @@ -2361,7 +2361,7 @@ class TestTask implements ClusterStateTaskListener {
@Override
public void onFailure(Exception e) {
assertEquals(expectedHeader, threadPool.getThreadContext().getHeader(testHeader));
if ((e instanceof FailedToCommitClusterStateException
if ((e instanceof NotMasterException
&& e.getCause() instanceof EsRejectedExecutionException esre
&& esre.isExecutorShutdown()) == false) {
throw new AssertionError("unexpected exception", e);
Expand Down