Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions server/src/main/java/org/elasticsearch/ElasticsearchException.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ public class ElasticsearchException extends RuntimeException implements ToXConte

private static final TransportVersion UNKNOWN_VERSION_ADDED = TransportVersion.zero();

private static final TransportVersion FAILED_TO_PUBLISH_CLUSTER_STATE_EXCEPTION_TRANSPORT_VERSION = TransportVersion.fromName(
"failed_to_publish_cluster_state_exception"
);

/**
* Passed in the {@link Params} of {@link #generateThrowableXContent(XContentBuilder, Params, Throwable)}
* to control if the {@code caused_by} element should render. Unlike most parameters to {@code toXContent} methods this parameter is
Expand Down Expand Up @@ -2022,6 +2026,12 @@ private enum ElasticsearchExceptionHandle {
184,
TransportVersions.REMOTE_EXCEPTION,
TransportVersions.REMOTE_EXCEPTION_8_19
),
FAILED_TO_PUBLISH_CLUSTER_STATE_EXCEPTION(
org.elasticsearch.cluster.coordination.FailedToPublishClusterStateException.class,
org.elasticsearch.cluster.coordination.FailedToPublishClusterStateException::new,
185,
FAILED_TO_PUBLISH_CLUSTER_STATE_EXCEPTION_TRANSPORT_VERSION
);

final Class<? extends ElasticsearchException> exceptionClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public NotMasterException(StreamInput in) throws IOException {
super(in);
}

public NotMasterException(String msg, Object... args) {
Copy link
Contributor Author

@joshua-adams-1 joshua-adams-1 Sep 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed as part of #135548 and will disappear once I rebase

super(msg, args);
}

@Override
public Throwable fillInStackTrace() {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.cluster.ClusterStatePublicationEvent;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.LocalMasterServiceTask;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper.ClusterFormationState;
import org.elasticsearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion;
Expand Down Expand Up @@ -1552,7 +1553,7 @@ public void publish(
clusterStatePublicationEvent.getNewState().term()
)
);
throw new FailedToCommitClusterStateException(
throw new NotMasterException(
Copy link
Contributor Author

@joshua-adams-1 joshua-adams-1 Sep 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed as part of #135548 and will disappear when I rebase

"node is no longer master for term "
+ clusterStatePublicationEvent.getNewState().term()
+ " while handling publication"
Expand All @@ -1567,7 +1568,7 @@ public void publish(
clusterStatePublicationEvent.getSummary()
)
);
throw new FailedToCommitClusterStateException("publication " + currentPublication.get() + " already in progress");
throw new FailedToPublishClusterStateException("publication " + currentPublication.get() + " already in progress");
}

assert assertPreviousStateConsistency(clusterStatePublicationEvent);
Expand All @@ -1586,7 +1587,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
} catch (Exception e) {
logger.debug(() -> "[" + clusterStatePublicationEvent.getSummary() + "] publishing failed during context creation", e);
becomeCandidate("publication context creation");
throw new FailedToCommitClusterStateException("publishing failed during context creation", e);
throw new FailedToPublishClusterStateException("publishing failed during context creation", e);
}

try (Releasable ignored = publicationContext::decRef) {
Expand All @@ -1607,7 +1608,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
e
);
becomeCandidate("publication creation");
throw new FailedToCommitClusterStateException("publishing failed while starting", e);
throw new FailedToPublishClusterStateException("publishing failed while starting", e);
}

try {
Expand Down Expand Up @@ -1638,12 +1639,12 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
}
}
}
} catch (FailedToCommitClusterStateException failedToCommitClusterStateException) {
publishListener.onFailure(failedToCommitClusterStateException);
} catch (FailedToPublishClusterStateException | FailedToCommitClusterStateException | NotMasterException e) {
publishListener.onFailure(e);
} catch (Exception e) {
assert false : e; // all exceptions should already be caught and wrapped in a FailedToCommitClusterStateException
assert false : e; // all exceptions should already be caught and wrapped in a FailedToPublishClusterStateException |
logger.error(() -> "[" + clusterStatePublicationEvent.getSummary() + "] publishing unexpectedly failed", e);
publishListener.onFailure(new FailedToCommitClusterStateException("publishing unexpectedly failed", e));
publishListener.onFailure(new FailedToPublishClusterStateException("publishing unexpectedly failed", e));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,23 @@
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.common.io.stream.StreamInput;

import java.io.IOException;

/**
* Thrown when a cluster state publication fails to commit the new cluster state. If publication fails then a new master is elected but the
* update might or might not take effect, depending on whether or not the newly-elected master accepted the published state that failed to
* be committed.
*
* Exception indicating a cluster state update was published but not committed to all nodes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conceptual nit: "committed" is a global property rather than something that happens on one or more nodes.

Suggested change
* Exception indicating a cluster state update was published but not committed to all nodes.
* Exception indicating a cluster state update was published and may or may not have been committed.

This exception indicates the publishing master doesn't think the update was committed, but it cannot tell for sure. It depends on which other master nodes accepted it and the winner of the next election.

* <p>
* If this exception is thrown, then the cluster state update was published, but is not guaranteed
* to be committed on any nodes, including the next master node. This exception should only be thrown when there is
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* to be committed on any nodes, including the next master node. This exception should only be thrown when there is
* to be committed, including the next master node. This exception should only be thrown when there is

* <i>ambiguity</i> whether a cluster state update has been committed.
* <p>
* For exceptions thrown prior to publication,
* when the cluster update has <i>definitely</i> failed, use a {@link FailedToPublishClusterStateException}.
* <p>
* This is a retryable exception inside {@link TransportMasterNodeAction}
* <p>
* See {@link ClusterStatePublisher} for more details.
*/
public class FailedToCommitClusterStateException extends ElasticsearchException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.common.io.stream.StreamInput;

import java.io.IOException;

/**
* Exception indicating a cluster state update failed prior to publication.
* <p>
* If this exception is thrown, then the cluster state update was <i>not</i> published to any node.
* It is therefore impossible for the new master to have committed this state.
* <p>
* For exceptions thrown <i>after</i> publication, when the cluster state update may or may not have been committed,
* use a {@link FailedToCommitClusterStateException}.
* <p>
* This is a retryable exception inside {@link TransportMasterNodeAction}
*/
public class FailedToPublishClusterStateException extends ElasticsearchException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this meaningfully different from NotMasterException? I know the name isn't ideal, but introducing a new ElasticsearchException subclass carries substantial costs too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by costs? I proposed adding a new exception to make the code easier to understand, especially since NotMasterException implies the error occurs because the node is no longer the master which isn't true in these cases

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to think about BwC concerns - what happens if you're in a mixed-version cluster and this exception gets thrown to an older node which doesn't know that it's retryable?

Can you point to a case where NotMasterException doesn't imply that the node has (or will very shortly have) stopped being the master?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to think about BwC concerns - what happens if you're in a mixed-version cluster and this exception gets thrown to an older node which doesn't know that it's retryable?

Would something like this protect against mixed version clusters?

if (getVersion().onOrAfter(TransportVersion.THE_VERSION_I_ADDED_ABOVE)) {
    throw new FailedToPublishClusterStateException();
} else {
    throw new FailedToCommitClusterStateException()
}

Can you point to a case where NotMasterException doesn't imply that the node has (or will very shortly have) stopped being the master?

My proposed solution changed the three FailedToCommitClusterStateExceptions below into FailedToPublishClusterStateExceptions:

@Override
    public void publish(
        ClusterStatePublicationEvent clusterStatePublicationEvent,
        ActionListener<Void> publishListener,
        AckListener ackListener
    ) {
        try {
            synchronized (mutex) {
                if (mode != Mode.LEADER || getCurrentTerm() != clusterStatePublicationEvent.getNewState().term()) {
                    logger.debug(
                        () -> format(
                            "[%s] failed publication as node is no longer master for term %s",
                            clusterStatePublicationEvent.getSummary(),
                            clusterStatePublicationEvent.getNewState().term()
                        )
                    );

                    // === Changed in #135548 === //
                    throw new NotMasterException(
                        "node is no longer master for term "
                            + clusterStatePublicationEvent.getNewState().term()
                            + " while handling publication"
                    );
                }

                if (currentPublication.isPresent()) {
                    assert false : "[" + currentPublication.get() + "] in progress, cannot start new publication";
                    logger.error(
                        () -> format(
                            "[%s] failed publication as already publication in progress",
                            clusterStatePublicationEvent.getSummary()
                        )
                    );

                    // === Exception 1 === //
                    throw new FailedToCommitClusterStateException("publication " + currentPublication.get() + " already in progress");
                }

                assert assertPreviousStateConsistency(clusterStatePublicationEvent);

                final ClusterState clusterState;
                final long publicationContextConstructionStartMillis;
                final PublicationTransportHandler.PublicationContext publicationContext;
                final PublishRequest publishRequest;

                try {
                    clusterState = clusterStatePublicationEvent.getNewState();
                    assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId()))
                        : getLocalNode() + " should be in published " + clusterState;
                    publicationContextConstructionStartMillis = transportService.getThreadPool().rawRelativeTimeInMillis();
                    publicationContext = publicationHandler.newPublicationContext(clusterStatePublicationEvent);
                } catch (Exception e) {
                    logger.debug(() -> "[" + clusterStatePublicationEvent.getSummary() + "] publishing failed during context creation", e);
                    becomeCandidate("publication context creation");

                    // === Exception 2 === //
                    throw new FailedToCommitClusterStateException("publishing failed during context creation", e);
                }

                try (Releasable ignored = publicationContext::decRef) {
                    try {
                        clusterStatePublicationEvent.setPublicationContextConstructionElapsedMillis(
                            transportService.getThreadPool().rawRelativeTimeInMillis() - publicationContextConstructionStartMillis
                        );
                        publishRequest = coordinationState.get().handleClientValue(clusterState);
                    } catch (Exception e) {
                        logger.warn(
                            "failed to start publication of state version ["
                                + clusterState.version()
                                + "] in term ["
                                + clusterState.term()
                                + "] for ["
                                + clusterStatePublicationEvent.getSummary()
                                + "]",
                            e
                        );
                        becomeCandidate("publication creation");

                        // === Exception 3 === //
                        throw new FailedToCommitClusterStateException("publishing failed while starting", e);
                    }

                    ....
  1. If a publication is already in progress, AFAIU this implies the current node is not the master, because only master nodes can initiate cluster state updates. But what if this is on the new master running at the same time as the old? Can this occur? Because in this case, a NotMasterException would not make sense.
  2. I'm not sure this implies the node will not be the master anymore, since I followed the code through and we can throw an ElasticsearchException here if serialization fails, and that's independent of a node being master.
  3. AFAICT this can be safely converted to a NotMasterException. Digging into the .handleClientValue(clusterState) function I see code throwing exceptions such as:
throw new CoordinationStateRejectedException("election not won");
throw new CoordinationStateRejectedException("cannot start publishing next value before accepting previous one");
throw new CoordinationStateRejectedException(
        "incoming term " + clusterState.term() + " does not match current term " + getCurrentTerm()
);
...

which all imply the current node is not the master anymore since there are term mismatches, and so a NotMasterException is correct


public FailedToPublishClusterStateException(String msg) {
super(msg);
}

public FailedToPublishClusterStateException(StreamInput in) throws IOException {
super(in);
}

public FailedToPublishClusterStateException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}

@Override
public Throwable fillInStackTrace() {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.coordination.ClusterStatePublisher;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.coordination.FailedToPublishClusterStateException;
import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -415,13 +416,37 @@ public void onResponse(Void unused) {

@Override
public void onFailure(Exception exception) {
if (exception instanceof FailedToCommitClusterStateException failedToCommitClusterStateException) {
if (exception instanceof FailedToPublishClusterStateException
|| exception instanceof FailedToCommitClusterStateException
|| exception instanceof NotMasterException) {
final long notificationStartTime = threadPool.rawRelativeTimeInMillis();
final long version = newClusterState.version();
logger.warn(() -> format("failing [%s]: failed to commit cluster state version [%s]", summary, version), exception);

if (exception instanceof FailedToCommitClusterStateException) {
logger.warn(
() -> format("failing [%s]: failed to commit cluster state version [%s]", summary, version),
exception
);
} else if (exception instanceof FailedToPublishClusterStateException) {
logger.warn(
() -> format("failing [%s]: failed to publish cluster state version [%s]", summary, version),
exception
);
} else {
logger.debug(
() -> format(
"node is no longer the master prior to publication of cluster state version [%s]: [%s]",
version,
summary
),
exception
);
}

for (final var executionResult : executionResults) {
executionResult.onPublishFailure(failedToCommitClusterStateException);
executionResult.onPublishFailure(exception);
}

final long notificationMillis = threadPool.rawRelativeTimeInMillis() - notificationStartTime;
clusterStateUpdateStatsTracker.onPublicationFailure(
threadPool.rawRelativeTimeInMillis(),
Expand Down Expand Up @@ -985,11 +1010,17 @@ void onClusterStateUnchanged(ClusterState clusterState) {
}
}

void onPublishFailure(FailedToCommitClusterStateException e) {
void onPublishFailure(Exception e) {
if (publishedStateConsumer == null && onPublicationSuccess == null) {
assert failure != null;
var taskFailure = failure;
failure = new FailedToCommitClusterStateException(e.getMessage(), e);

if (e instanceof FailedToCommitClusterStateException) {
failure = new FailedToCommitClusterStateException(e.getMessage(), e);
} else {
failure = new NotMasterException(e.getMessage(), e);
}
Comment on lines 1011 to 1015
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also be handling FailedToPublishClusterStateException here too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! Good catch - I realised I'm missing code here, and in a few other places too


failure.addSuppressed(taskFailure);
notifyFailure();
return;
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.2.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
roles_security_stats,9176000
failed_to_publish_cluster_state_exception,9183000
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException;
import org.elasticsearch.cluster.coordination.FailedToPublishClusterStateException;
import org.elasticsearch.cluster.coordination.NoMasterBlockService;
import org.elasticsearch.cluster.coordination.NodeHealthCheckFailureException;
import org.elasticsearch.cluster.desirednodes.VersionConflictException;
Expand Down Expand Up @@ -846,6 +847,7 @@ public void testIds() {
ids.put(182, IngestPipelineException.class);
ids.put(183, IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus.class);
ids.put(184, RemoteException.class);
ids.put(185, FailedToPublishClusterStateException.class);

Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {
Expand Down