File tree Expand file tree Collapse file tree 2 files changed +21
-11
lines changed
server/src/main/java/org/elasticsearch/action/support/replication Expand file tree Collapse file tree 2 files changed +21
-11
lines changed Original file line number Diff line number Diff line change @@ -692,8 +692,10 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
692692 public void onResponse (Releasable releasable ) {
693693 assert replica .getActiveOperationsCount () != 0 : "must perform shard operation under a permit" ;
694694 try {
695- shardOperationOnReplica (replicaRequest .getRequest (), replica , ActionListener .wrap ((replicaResult ) -> {
696- replicaResult .runPostReplicaActions (new ReplicaPostReplicationActionsListener () {
695+ shardOperationOnReplica (
696+ replicaRequest .getRequest (),
697+ replica ,
698+ ActionListener .wrap ((replicaResult ) -> replicaResult .runPostReplicaActions (new ReplicaPostReplicationActionsListener () {
697699 @ Override
698700 public void onResponse (long globalCheckpoint , long localCheckpoint ) {
699701 final ReplicaResponse response = new ReplicaResponse (localCheckpoint , globalCheckpoint );
@@ -716,11 +718,11 @@ public void onFailure(Exception e) {
716718 Releasables .closeWhileHandlingException (releasable );
717719 responseWithFailure (e );
718720 }
719- });
720- }, e -> {
721- Releasables . closeWhileHandlingException ( releasable ); // release shard operation lock before responding to caller
722- AsyncReplicaAction . this . onFailure ( e );
723- }) );
721+ }), e -> {
722+ Releasables . closeWhileHandlingException ( releasable ); // release shard operation lock before responding to caller
723+ AsyncReplicaAction . this . onFailure ( e );
724+ })
725+ );
724726 // TODO: Evaluate if we still need to catch this exception
725727 } catch (Exception e ) {
726728 Releasables .closeWhileHandlingException (releasable ); // release shard operation lock before responding to caller
Original file line number Diff line number Diff line change @@ -511,8 +511,13 @@ private void maybeFinish() {
511511 }
512512
513513 private void updateCheckpoints () {
514- this .globalCheckpoint .accumulateAndGet (indexShard .getLastSyncedGlobalCheckpoint (), Math ::max );
515- this .localCheckpoint .accumulateAndGet (indexShard .getLocalCheckpoint (), Math ::max );
514+ try {
515+ this .globalCheckpoint .accumulateAndGet (indexShard .getLastSyncedGlobalCheckpoint (), Math ::max );
516+ this .localCheckpoint .accumulateAndGet (indexShard .getLocalCheckpoint (), Math ::max );
517+ } catch (Exception e ) {
518+ logger .warn ("Failed to retrieve checkpoints" , e );
519+ assert false : e ;
520+ }
516521 }
517522
518523 void run () {
@@ -559,8 +564,11 @@ public void onFailure(Exception e) {
559564 if (sync ) {
560565 assert pendingOps .get () > 0 ;
561566 indexShard .syncAfterWrite (location , e -> {
562- updateCheckpoints ();
563- syncFailure .set (e );
567+ if (e != null ) {
568+ syncFailure .set (e );
569+ } else {
570+ updateCheckpoints ();
571+ }
564572 maybeFinish ();
565573 });
566574 updateCheckpoints ();
You can’t perform that action at this time.
0 commit comments