From 3ac28e18484f38d7307655ffccde12216912fb2b Mon Sep 17 00:00:00 2001 From: Joshua Adams Date: Thu, 18 Sep 2025 15:29:43 +0100 Subject: [PATCH 1/2] Change FailedToCommitClusterStateException to NotMasterException Modifies the `Batch.onRejection` method to accept a `NotMasterException` rather than a `FailedToCommitClusterStateException`. The `NotMasterException` is thrown because the master node has closed. However, at this point we haven't even tried to commit the cluster state so we know for a fact that it's failed, so a `FailedToCommitClusterStateException`, which implies ambiguity, is wrong here. --- .../cluster/NotMasterException.java | 4 ++++ .../cluster/service/MasterService.java | 17 ++++++++--------- .../cluster/service/MasterServiceTests.java | 4 ++-- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/NotMasterException.java b/server/src/main/java/org/elasticsearch/cluster/NotMasterException.java index 43c43bda9aa8c..af031d44cb679 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NotMasterException.java +++ b/server/src/main/java/org/elasticsearch/cluster/NotMasterException.java @@ -26,6 +26,10 @@ public NotMasterException(StreamInput in) throws IOException { super(in); } + public NotMasterException(String msg, Throwable cause, Object... args) { + super(msg, cause, args); + } + @Override public Throwable fillInStackTrace() { return this; diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index 596cce87fd151..2620fcbad4475 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -1283,7 +1283,7 @@ public String toString() { if (lifecycle.started()) { nextBatch.run(batchCompletionListener); } else { - nextBatch.onRejection(new FailedToCommitClusterStateException("node closed", getRejectionException())); + nextBatch.onRejection(new NotMasterException("node closed", getRejectionException())); batchCompletionListener.onResponse(null); } }); @@ -1309,7 +1309,7 @@ private void onCompletion() { @Override public void onRejection(Exception e) { assert e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown() : e; - drainQueueOnRejection(new FailedToCommitClusterStateException("node closed", e)); + drainQueueOnRejection(new NotMasterException("node closed", e)); } @Override @@ -1336,7 +1336,7 @@ private Batch takeNextBatch() { private void forkQueueProcessor() { // single-threaded: started when totalQueueSize transitions from 0 to 1 and keeps calling itself until the queue is drained. if (lifecycle.started() == false) { - drainQueueOnRejection(new FailedToCommitClusterStateException("node closed", getRejectionException())); + drainQueueOnRejection(new NotMasterException("node closed", getRejectionException())); return; } @@ -1353,7 +1353,7 @@ private EsRejectedExecutionException getRejectionException() { return new EsRejectedExecutionException("master service is in state [" + lifecycleState() + "]", true); } - private void drainQueueOnRejection(FailedToCommitClusterStateException e) { + private void drainQueueOnRejection(NotMasterException e) { assert totalQueueSize.get() > 0; do { assert currentlyExecutingBatch == null; @@ -1407,12 +1407,11 @@ private interface Batch { /** * Called when the batch is rejected due to the master service shutting down. * - * @param e is a {@link FailedToCommitClusterStateException} to cause things like {@link TransportMasterNodeAction} to retry after + * @param e is a {@link NotMasterException} to cause things like {@link TransportMasterNodeAction} to retry after * submitting a task to a master which shut down. {@code e.getCause()} is the rejection exception, which should be a * {@link EsRejectedExecutionException} with {@link EsRejectedExecutionException#isExecutorShutdown()} true. */ - // Should really be a NodeClosedException instead, but this exception type doesn't trigger retries today. - void onRejection(FailedToCommitClusterStateException e); + void onRejection(NotMasterException e); /** * @return number of tasks in this batch if the batch is pending, or {@code 0} if the batch is not pending. @@ -1634,7 +1633,7 @@ T acquireForExecution() { return task; } - void onRejection(FailedToCommitClusterStateException e) { + void onRejection(NotMasterException e) { final var task = acquireForExecution(); if (task != null) { try (var ignored = storedContextSupplier.get()) { @@ -1654,7 +1653,7 @@ boolean isPending() { private class Processor implements Batch { @Override - public void onRejection(FailedToCommitClusterStateException e) { + public void onRejection(NotMasterException e) { final var items = queueSize.getAndSet(0); for (int i = 0; i < items; i++) { final var entry = queue.poll(); diff --git a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java index 4c24abbd5fd9a..da8c013ef44c3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java @@ -2182,7 +2182,7 @@ class TestTask implements ClusterStateTaskListener { @Override public void onFailure(Exception e) { assertEquals(expectedHeader, threadPool.getThreadContext().getHeader(testHeader)); - if ((e instanceof FailedToCommitClusterStateException + if ((e instanceof NotMasterException && e.getCause() instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) == false) { throw new AssertionError("unexpected exception", e); @@ -2361,7 +2361,7 @@ class TestTask implements ClusterStateTaskListener { @Override public void onFailure(Exception e) { assertEquals(expectedHeader, threadPool.getThreadContext().getHeader(testHeader)); - if ((e instanceof FailedToCommitClusterStateException + if ((e instanceof NotMasterException && e.getCause() instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) == false) { throw new AssertionError("unexpected exception", e); From b93d07d560ceeadfb0147dac4666c55264c49ad8 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 2 Oct 2025 07:34:10 +0000 Subject: [PATCH 2/2] [CI] Update transport version definitions --- server/src/main/resources/transport/upper_bounds/8.18.csv | 2 +- server/src/main/resources/transport/upper_bounds/8.19.csv | 2 +- server/src/main/resources/transport/upper_bounds/9.0.csv | 2 +- server/src/main/resources/transport/upper_bounds/9.1.csv | 2 +- server/src/main/resources/transport/upper_bounds/9.2.csv | 2 +- server/src/main/resources/transport/upper_bounds/9.3.csv | 1 + 6 files changed, 6 insertions(+), 5 deletions(-) create mode 100644 server/src/main/resources/transport/upper_bounds/9.3.csv diff --git a/server/src/main/resources/transport/upper_bounds/8.18.csv b/server/src/main/resources/transport/upper_bounds/8.18.csv index 4eb5140004ea6..266bfbbd3bf78 100644 --- a/server/src/main/resources/transport/upper_bounds/8.18.csv +++ b/server/src/main/resources/transport/upper_bounds/8.18.csv @@ -1 +1 @@ -initial_elasticsearch_8_18_6,8840008 +transform_check_for_dangling_tasks,8840011 diff --git a/server/src/main/resources/transport/upper_bounds/8.19.csv b/server/src/main/resources/transport/upper_bounds/8.19.csv index 476468b203875..3600b3f8c633a 100644 --- a/server/src/main/resources/transport/upper_bounds/8.19.csv +++ b/server/src/main/resources/transport/upper_bounds/8.19.csv @@ -1 +1 @@ -initial_elasticsearch_8_19_3,8841067 +transform_check_for_dangling_tasks,8841070 diff --git a/server/src/main/resources/transport/upper_bounds/9.0.csv b/server/src/main/resources/transport/upper_bounds/9.0.csv index f8f50cc6d7839..c11e6837bb813 100644 --- a/server/src/main/resources/transport/upper_bounds/9.0.csv +++ b/server/src/main/resources/transport/upper_bounds/9.0.csv @@ -1 +1 @@ -initial_elasticsearch_9_0_6,9000015 +transform_check_for_dangling_tasks,9000018 diff --git a/server/src/main/resources/transport/upper_bounds/9.1.csv b/server/src/main/resources/transport/upper_bounds/9.1.csv index 5a65f2e578156..80b97d85f7511 100644 --- a/server/src/main/resources/transport/upper_bounds/9.1.csv +++ b/server/src/main/resources/transport/upper_bounds/9.1.csv @@ -1 +1 @@ -initial_elasticsearch_9_1_4,9112007 +transform_check_for_dangling_tasks,9112009 diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv b/server/src/main/resources/transport/upper_bounds/9.2.csv index e24f914a1d1ca..2147eab66c207 100644 --- a/server/src/main/resources/transport/upper_bounds/9.2.csv +++ b/server/src/main/resources/transport/upper_bounds/9.2.csv @@ -1 +1 @@ -ml_inference_endpoint_cache,9157000 +initial_9.2.0,9185000 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv new file mode 100644 index 0000000000000..2147eab66c207 --- /dev/null +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -0,0 +1 @@ +initial_9.2.0,9185000