@@ -25,18 +25,19 @@ public abstract class GrpcStreamRetrier {
2525 protected final AtomicBoolean isReconnecting = new AtomicBoolean (false );
2626 protected final AtomicBoolean isStopped = new AtomicBoolean (false );
2727
28+ private final Logger logger ;
2829 private final ScheduledExecutorService scheduler ;
2930 private final RetryConfig retryConfig ;
3031 private volatile int retryCount ;
3132 private volatile long retryStartedAt ;
3233
33- protected GrpcStreamRetrier (RetryConfig retryConfig , ScheduledExecutorService scheduler ) {
34+ protected GrpcStreamRetrier (Logger logger , RetryConfig retryConfig , ScheduledExecutorService scheduler ) {
35+ this .logger = logger ;
3436 this .retryConfig = retryConfig ;
3537 this .scheduler = scheduler ;
3638 this .id = generateRandomId (ID_LENGTH );
3739 }
3840
39- protected abstract Logger getLogger ();
4041 protected abstract String getStreamName ();
4142 protected abstract void onStreamReconnect ();
4243 protected abstract void onShutdown (String reason );
@@ -49,20 +50,20 @@ protected static String generateRandomId(int length) {
4950 .toString ();
5051 }
5152
52- private void tryReconnect (long delay ) {
53+ private void tryScheduleReconnect (long delay ) {
5354 if (!isReconnecting .compareAndSet (false , true )) {
54- getLogger () .info ("[{}] should reconnect {} stream, but reconnect is already in progress" , id ,
55+ logger .info ("[{}] should reconnect {} stream, but reconnect is already in progress" , id ,
5556 getStreamName ());
5657 return ;
5758 }
5859
59- getLogger () .warn ("[{}] Retry #{}. Scheduling {} reconnect in {}ms..." , id , retryCount , getStreamName (), delay );
60+ logger .warn ("[{}] Retry #{}. Scheduling {} reconnect in {}ms..." , id , retryCount , getStreamName (), delay );
6061 try {
6162 scheduler .schedule (this ::reconnect , delay , TimeUnit .MILLISECONDS );
6263 } catch (RejectedExecutionException exception ) {
6364 String errorMessage = "[" + id + "] Couldn't schedule reconnect: scheduler is already shut down. " +
6465 "Shutting down " + getStreamName ();
65- getLogger () .error (errorMessage );
66+ logger .error (errorMessage );
6667 shutdownImpl (errorMessage );
6768 }
6869 }
@@ -73,9 +74,9 @@ protected void resetRetries() {
7374 }
7475
7576 void reconnect () {
76- getLogger () .info ("[{}] {} reconnect #{} started" , id , getStreamName (), retryCount );
77+ logger .info ("[{}] {} reconnect #{} started" , id , getStreamName (), retryCount );
7778 if (!isReconnecting .compareAndSet (true , false )) {
78- getLogger () .warn ("Couldn't reset reconnect flag. Shouldn't happen" );
79+ logger .warn ("Couldn't reset reconnect flag. Shouldn't happen" );
7980 }
8081 onStreamReconnect ();
8182 }
@@ -85,7 +86,7 @@ protected CompletableFuture<Void> shutdownImpl() {
8586 }
8687
8788 protected CompletableFuture <Void > shutdownImpl (String reason ) {
88- getLogger () .info ("[{}] Shutting down {}"
89+ logger .info ("[{}] Shutting down {}"
8990 + (reason == null || reason .isEmpty () ? "" : " with reason: " + reason ), id , getStreamName ());
9091 isStopped .set (true );
9192 return CompletableFuture .runAsync (() -> {
@@ -94,28 +95,28 @@ protected CompletableFuture<Void> shutdownImpl(String reason) {
9495 }
9596
9697 protected void onSessionClosed (Status status , Throwable th ) {
97- getLogger () .info ("[{}] onSessionClosed called" , id );
98+ logger .info ("[{}] onSessionClosed called" , id );
9899
99100 RetryPolicy retryPolicy = null ;
100101 if (th != null ) {
101- getLogger () .error ("[{}] Exception in {} stream session: " , id , getStreamName (), th );
102+ logger .error ("[{}] Exception in {} stream session: " , id , getStreamName (), th );
102103 retryPolicy = retryConfig .isThrowableRetryable (th );
103104 } else {
104105 if (status .isSuccess ()) {
105106 if (isStopped .get ()) {
106- getLogger () .info ("[{}] {} stream session closed successfully" , id , getStreamName ());
107+ logger .info ("[{}] {} stream session closed successfully" , id , getStreamName ());
107108 return ;
108109 } else {
109- getLogger () .warn ("[{}] {} stream session was closed on working {}" , id , getStreamName ());
110+ logger .warn ("[{}] {} stream session was closed on working {}" , id , getStreamName ());
110111 }
111112 } else {
112- getLogger () .warn ("[{}] Error in {} stream session: {}" , id , getStreamName (), status );
113+ logger .warn ("[{}] Error in {} stream session: {}" , id , getStreamName (), status );
113114 retryPolicy = retryConfig .isStatusRetryable (status .getCode ());
114115 }
115116 }
116117
117118 if (isStopped .get ()) {
118- getLogger () .info ("[{}] {} is already stopped, no need to schedule reconnect" , id , getStreamName ());
119+ logger .info ("[{}] {} is already stopped, no need to schedule reconnect" , id , getStreamName ());
119120 return ;
120121 }
121122
@@ -126,23 +127,21 @@ protected void onSessionClosed(Status status, Throwable th) {
126127 long delay = retryPolicy .nextRetryMs (retryCount + 1 , System .currentTimeMillis () - retryStartedAt );
127128 if (delay >= 0 ) {
128129 retryCount ++;
129- tryReconnect (delay );
130+ tryScheduleReconnect (delay );
130131 return ;
131132 }
132133 }
133134
134135 long elapsedMs = retryStartedAt > 0 ? System .currentTimeMillis () - retryStartedAt : 0 ;
135136 if (!isStopped .compareAndSet (false , true )) {
136- getLogger () .warn ("[{}] Stopped after {} retries and {} ms elapsed. But {} is already shut down." ,
137+ logger .warn ("[{}] Stopped after {} retries and {} ms elapsed. But {} is already shut down." ,
137138 id , retryCount , elapsedMs , getStreamName ());
138139 return ;
139140 }
140141
141142 String errorMessage = "[" + id + "] Stopped after " + retryCount + " retries and " + elapsedMs +
142143 " ms elapsed. Shutting down " + getStreamName ();
143- getLogger () .error (errorMessage );
144+ logger .error (errorMessage );
144145 shutdownImpl (errorMessage );
145146 }
146-
147-
148147}
0 commit comments