@@ -1283,7 +1283,7 @@ public String toString() {
1283
1283
if (lifecycle .started ()) {
1284
1284
nextBatch .run (batchCompletionListener );
1285
1285
} else {
1286
- nextBatch .onRejection (new FailedToCommitClusterStateException ("node closed" , getRejectionException ()));
1286
+ nextBatch .onRejection (new NotMasterException ("node closed" , getRejectionException ()));
1287
1287
batchCompletionListener .onResponse (null );
1288
1288
}
1289
1289
});
@@ -1309,7 +1309,7 @@ private void onCompletion() {
1309
1309
@ Override
1310
1310
public void onRejection (Exception e ) {
1311
1311
assert e instanceof EsRejectedExecutionException esre && esre .isExecutorShutdown () : e ;
1312
- drainQueueOnRejection (new FailedToCommitClusterStateException ("node closed" , e ));
1312
+ drainQueueOnRejection (new NotMasterException ("node closed" , e ));
1313
1313
}
1314
1314
1315
1315
@ Override
@@ -1336,7 +1336,7 @@ private Batch takeNextBatch() {
1336
1336
private void forkQueueProcessor () {
1337
1337
// single-threaded: started when totalQueueSize transitions from 0 to 1 and keeps calling itself until the queue is drained.
1338
1338
if (lifecycle .started () == false ) {
1339
- drainQueueOnRejection (new FailedToCommitClusterStateException ("node closed" , getRejectionException ()));
1339
+ drainQueueOnRejection (new NotMasterException ("node closed" , getRejectionException ()));
1340
1340
return ;
1341
1341
}
1342
1342
@@ -1353,7 +1353,7 @@ private EsRejectedExecutionException getRejectionException() {
1353
1353
return new EsRejectedExecutionException ("master service is in state [" + lifecycleState () + "]" , true );
1354
1354
}
1355
1355
1356
- private void drainQueueOnRejection (FailedToCommitClusterStateException e ) {
1356
+ private void drainQueueOnRejection (NotMasterException e ) {
1357
1357
assert totalQueueSize .get () > 0 ;
1358
1358
do {
1359
1359
assert currentlyExecutingBatch == null ;
@@ -1407,12 +1407,11 @@ private interface Batch {
1407
1407
/**
1408
1408
* Called when the batch is rejected due to the master service shutting down.
1409
1409
*
1410
- * @param e is a {@link FailedToCommitClusterStateException } to cause things like {@link TransportMasterNodeAction} to retry after
1410
+ * @param e is a {@link NotMasterException } to cause things like {@link TransportMasterNodeAction} to retry after
1411
1411
* submitting a task to a master which shut down. {@code e.getCause()} is the rejection exception, which should be a
1412
1412
* {@link EsRejectedExecutionException} with {@link EsRejectedExecutionException#isExecutorShutdown()} true.
1413
1413
*/
1414
- // Should really be a NodeClosedException instead, but this exception type doesn't trigger retries today.
1415
- void onRejection (FailedToCommitClusterStateException e );
1414
+ void onRejection (NotMasterException e );
1416
1415
1417
1416
/**
1418
1417
* @return number of tasks in this batch if the batch is pending, or {@code 0} if the batch is not pending.
@@ -1634,7 +1633,7 @@ T acquireForExecution() {
1634
1633
return task ;
1635
1634
}
1636
1635
1637
- void onRejection (FailedToCommitClusterStateException e ) {
1636
+ void onRejection (NotMasterException e ) {
1638
1637
final var task = acquireForExecution ();
1639
1638
if (task != null ) {
1640
1639
try (var ignored = storedContextSupplier .get ()) {
@@ -1654,7 +1653,7 @@ boolean isPending() {
1654
1653
1655
1654
private class Processor implements Batch {
1656
1655
@ Override
1657
- public void onRejection (FailedToCommitClusterStateException e ) {
1656
+ public void onRejection (NotMasterException e ) {
1658
1657
final var items = queueSize .getAndSet (0 );
1659
1658
for (int i = 0 ; i < items ; i ++) {
1660
1659
final var entry = queue .poll ();
0 commit comments