diff --git a/server/src/main/java/org/elasticsearch/cluster/NotMasterException.java b/server/src/main/java/org/elasticsearch/cluster/NotMasterException.java index 0b586d2d43451..8557e0c148135 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NotMasterException.java +++ b/server/src/main/java/org/elasticsearch/cluster/NotMasterException.java @@ -34,6 +34,10 @@ public NotMasterException(StreamInput in) throws IOException { super(in); } + public NotMasterException(String msg, Throwable cause, Object... args) { + super(msg, cause, args); + } + @Override public Throwable fillInStackTrace() { return this; diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index 596cce87fd151..2620fcbad4475 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -1283,7 +1283,7 @@ public String toString() { if (lifecycle.started()) { nextBatch.run(batchCompletionListener); } else { - nextBatch.onRejection(new FailedToCommitClusterStateException("node closed", getRejectionException())); + nextBatch.onRejection(new NotMasterException("node closed", getRejectionException())); batchCompletionListener.onResponse(null); } }); @@ -1309,7 +1309,7 @@ private void onCompletion() { @Override public void onRejection(Exception e) { assert e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown() : e; - drainQueueOnRejection(new FailedToCommitClusterStateException("node closed", e)); + drainQueueOnRejection(new NotMasterException("node closed", e)); } @Override @@ -1336,7 +1336,7 @@ private Batch takeNextBatch() { private void forkQueueProcessor() { // single-threaded: started when totalQueueSize transitions from 0 to 1 and keeps calling itself until the queue is drained. if (lifecycle.started() == false) { - drainQueueOnRejection(new FailedToCommitClusterStateException("node closed", getRejectionException())); + drainQueueOnRejection(new NotMasterException("node closed", getRejectionException())); return; } @@ -1353,7 +1353,7 @@ private EsRejectedExecutionException getRejectionException() { return new EsRejectedExecutionException("master service is in state [" + lifecycleState() + "]", true); } - private void drainQueueOnRejection(FailedToCommitClusterStateException e) { + private void drainQueueOnRejection(NotMasterException e) { assert totalQueueSize.get() > 0; do { assert currentlyExecutingBatch == null; @@ -1407,12 +1407,11 @@ private interface Batch { /** * Called when the batch is rejected due to the master service shutting down. * - * @param e is a {@link FailedToCommitClusterStateException} to cause things like {@link TransportMasterNodeAction} to retry after + * @param e is a {@link NotMasterException} to cause things like {@link TransportMasterNodeAction} to retry after * submitting a task to a master which shut down. {@code e.getCause()} is the rejection exception, which should be a * {@link EsRejectedExecutionException} with {@link EsRejectedExecutionException#isExecutorShutdown()} true. */ - // Should really be a NodeClosedException instead, but this exception type doesn't trigger retries today. - void onRejection(FailedToCommitClusterStateException e); + void onRejection(NotMasterException e); /** * @return number of tasks in this batch if the batch is pending, or {@code 0} if the batch is not pending. @@ -1634,7 +1633,7 @@ T acquireForExecution() { return task; } - void onRejection(FailedToCommitClusterStateException e) { + void onRejection(NotMasterException e) { final var task = acquireForExecution(); if (task != null) { try (var ignored = storedContextSupplier.get()) { @@ -1654,7 +1653,7 @@ boolean isPending() { private class Processor implements Batch { @Override - public void onRejection(FailedToCommitClusterStateException e) { + public void onRejection(NotMasterException e) { final var items = queueSize.getAndSet(0); for (int i = 0; i < items; i++) { final var entry = queue.poll(); diff --git a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java index 4c24abbd5fd9a..da8c013ef44c3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java @@ -2182,7 +2182,7 @@ class TestTask implements ClusterStateTaskListener { @Override public void onFailure(Exception e) { assertEquals(expectedHeader, threadPool.getThreadContext().getHeader(testHeader)); - if ((e instanceof FailedToCommitClusterStateException + if ((e instanceof NotMasterException && e.getCause() instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) == false) { throw new AssertionError("unexpected exception", e); @@ -2361,7 +2361,7 @@ class TestTask implements ClusterStateTaskListener { @Override public void onFailure(Exception e) { assertEquals(expectedHeader, threadPool.getThreadContext().getHeader(testHeader)); - if ((e instanceof FailedToCommitClusterStateException + if ((e instanceof NotMasterException && e.getCause() instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) == false) { throw new AssertionError("unexpected exception", e);