@@ -73,7 +73,11 @@ public class GrpcLogAppender extends LogAppenderBase {
7373 private enum BatchLogKey implements BatchLogger .Key {
7474 RESET_CLIENT ,
7575 INCONSISTENCY_REPLY ,
76- APPEND_LOG_RESPONSE_HANDLER_ON_ERROR
76+ APPEND_LOG_RESPONSE_HANDLER_ON_ERROR ,
77+ INSTALL_SNAPSHOT_NOTIFY ,
78+ INSTALL_SNAPSHOT_REPLY ,
79+ INSTALL_SNAPSHOT_IN_PROGRESS ,
80+ SNAPSHOT_UNAVAILABLE
7781 }
7882
7983 public static final int INSTALL_SNAPSHOT_NOTIFICATION_INDEX = 0 ;
@@ -234,7 +238,7 @@ private void resetClient(AppendEntriesRequest request, Event event) {
234238 }
235239 getFollower ().computeNextIndex (getNextIndexForError (nextIndex ));
236240 } catch (IOException ie ) {
237- LOG .warn (this + " : Failed to getClient for " + getFollowerId (), ie );
241+ LOG .warn ("{} : Failed to resetClient for {}" , this , getFollowerId (), ie );
238242 }
239243 }
240244
@@ -497,8 +501,8 @@ public void onNext(AppendEntriesReplyProto reply) {
497501 try {
498502 onNextImpl (request , reply );
499503 } catch (Exception t ) {
500- LOG .error ("Failed onNext request=" + request
501- + ", reply=" + ServerStringUtils .toAppendEntriesReplyString (reply ), t );
504+ LOG .error ("Failed onNext(reply), request={}, reply={}" ,
505+ request , ServerStringUtils .toAppendEntriesReplyString (reply ), t );
502506 }
503507 }
504508
@@ -573,8 +577,8 @@ private void updateNextIndex(long replyNextIndex) {
573577 }
574578
575579 private class InstallSnapshotResponseHandler implements StreamObserver <InstallSnapshotReplyProto > {
576- private final String name = getFollower (). getName () + "-" + JavaUtils . getClassSimpleName ( getClass ()) ;
577- private final Queue <Integer > pending ;
580+ private final String name ;
581+ private final Queue <Integer > pending = new LinkedList <>() ;
578582 private final CompletableFuture <Void > done = new CompletableFuture <>();
579583 private final boolean isNotificationOnly ;
580584
@@ -583,8 +587,8 @@ private class InstallSnapshotResponseHandler implements StreamObserver<InstallSn
583587 }
584588
585589 InstallSnapshotResponseHandler (boolean notifyOnly ) {
586- pending = new LinkedList <>();
587590 this .isNotificationOnly = notifyOnly ;
591+ this .name = getFollower ().getName () + "-InstallSnapshot" + (isNotificationOnly ? "Notification" : "" );
588592 }
589593
590594 void addPending (InstallSnapshotRequestProto request ) {
@@ -626,8 +630,8 @@ void onFollowerCatchup(long followerSnapshotIndex) {
626630 final long leaderStartIndex = getRaftLog ().getStartIndex ();
627631 final long followerNextIndex = followerSnapshotIndex + 1 ;
628632 if (followerNextIndex >= leaderStartIndex ) {
629- LOG .info ("{}: Follower can catch up leader after install the snapshot, as leader's start index is {}" ,
630- this , followerNextIndex );
633+ LOG .info ("{}: follower nextIndex = {} >= leader startIndex = {}" ,
634+ this , followerNextIndex , leaderStartIndex );
631635 notifyInstallSnapshotFinished (InstallSnapshotResult .SUCCESS , followerSnapshotIndex );
632636 }
633637 }
@@ -659,10 +663,10 @@ boolean hasAllResponse() {
659663
660664 @ Override
661665 public void onNext (InstallSnapshotReplyProto reply ) {
662- if ( LOG . isInfoEnabled ()) {
663- LOG .info ("{}: received {} reply {}" , this , replyState . isFirstReplyReceived ()? "a" : "the first" ,
664- ServerStringUtils . toInstallSnapshotReplyString ( reply ));
665- }
666+ BatchLogger . print ( BatchLogKey . INSTALL_SNAPSHOT_REPLY , name ,
667+ suffix -> LOG .info ("{}: received {} reply {} {} " , this ,
668+ replyState . isFirstReplyReceived () ? "a" : "the first" ,
669+ ServerStringUtils . toInstallSnapshotReplyString ( reply ), suffix ));
666670
667671 // update the last rpc time
668672 getFollower ().updateLastRpcResponseTime ();
@@ -671,12 +675,13 @@ public void onNext(InstallSnapshotReplyProto reply) {
671675 final long followerSnapshotIndex ;
672676 switch (reply .getResult ()) {
673677 case SUCCESS :
674- LOG .info ("{}: Completed InstallSnapshot. Reply: {} " , this , reply );
678+ LOG .info ("{}: Completed" , this );
675679 getFollower ().setAttemptedToInstallSnapshot ();
676680 removePending (reply );
677681 break ;
678682 case IN_PROGRESS :
679- LOG .info ("{}: InstallSnapshot in progress." , this );
683+ BatchLogger .print (BatchLogKey .INSTALL_SNAPSHOT_IN_PROGRESS , name ,
684+ suffix -> LOG .info ("{}: in progress, {}" , this , suffix ));
680685 removePending (reply );
681686 break ;
682687 case ALREADY_INSTALLED :
@@ -692,7 +697,7 @@ public void onNext(InstallSnapshotReplyProto reply) {
692697 onFollowerTerm (reply .getTerm ());
693698 break ;
694699 case CONF_MISMATCH :
695- LOG .error ("{}: Configuration Mismatch ({}): Leader {} has it set to {} but follower {} has it set to {}" ,
700+ LOG .error ("{}: CONF_MISMATCH ({}): Leader {} has it set to {} but follower {} has it set to {}" ,
696701 this , RaftServerConfigKeys .Log .Appender .INSTALL_SNAPSHOT_ENABLED_KEY ,
697702 getServer ().getId (), installSnapshotEnabled , getFollowerId (), !installSnapshotEnabled );
698703 break ;
@@ -707,17 +712,19 @@ public void onNext(InstallSnapshotReplyProto reply) {
707712 removePending (reply );
708713 break ;
709714 case SNAPSHOT_UNAVAILABLE :
710- LOG .info ("{}: Follower could not install snapshot as it is not available." , this );
715+ BatchLogger .print (BatchLogKey .SNAPSHOT_UNAVAILABLE , name ,
716+ suffix -> LOG .info ("{}: Follower failed since the snapshot is unavailable {}" , this , suffix ));
711717 getFollower ().setAttemptedToInstallSnapshot ();
712718 notifyInstallSnapshotFinished (InstallSnapshotResult .SNAPSHOT_UNAVAILABLE , RaftLog .INVALID_LOG_INDEX );
713719 removePending (reply );
714720 break ;
715721 case UNRECOGNIZED :
716- LOG .error ("Unrecognized the reply result {}: Leader is {}, follower is {}" ,
717- reply .getResult (), getServer (). getId (), getFollowerId ( ));
722+ LOG .error ("{}: Reply result {}, {}" ,
723+ name , reply .getResult (), ServerStringUtils . toInstallSnapshotReplyString ( reply ));
718724 break ;
719725 case SNAPSHOT_EXPIRED :
720- LOG .warn ("{}: Follower could not install snapshot as it is expired." , this );
726+ LOG .warn ("{}: Follower failed since the request expired, {}" ,
727+ name , ServerStringUtils .toInstallSnapshotReplyString (reply ));
721728 default :
722729 break ;
723730 }
@@ -796,8 +803,9 @@ private void installSnapshot(SnapshotInfo snapshot) {
796803 * @param firstAvailable the first available log's index on the Leader
797804 */
798805 private void notifyInstallSnapshot (TermIndex firstAvailable ) {
799- LOG .info ("{}: notifyInstallSnapshot with firstAvailable={}, followerNextIndex={}" ,
800- this , firstAvailable , getFollower ().getNextIndex ());
806+ BatchLogger .print (BatchLogKey .INSTALL_SNAPSHOT_NOTIFY , getFollower ().getName (),
807+ suffix -> LOG .info ("{}: notifyInstallSnapshot with firstAvailable={}, followerNextIndex={} {}" ,
808+ this , firstAvailable , getFollower ().getNextIndex (), suffix ));
801809
802810 final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler (true );
803811 StreamObserver <InstallSnapshotRequestProto > snapshotRequestObserver = null ;
0 commit comments