3838import  io .grpc .MethodDescriptor ;
3939import  io .grpc .Status ;
4040import  io .grpc .StatusRuntimeException ;
41+ import  java .time .Duration ;
4142import  java .util .List ;
4243import  java .util .Optional ;
44+ import  java .util .Random ;
4345import  java .util .concurrent .ExecutionException ;
4446import  java .util .concurrent .ScheduledExecutorService ;
4547import  java .util .concurrent .ScheduledFuture ;
5153
5254/** 
5355 * Decorator for a Bigtable data plane connection to add channel warming via PingAndWarm. Channel 
54-  * warming will happen on creation and then every 3 minutes. 
56+  * warming will happen on creation and then every 3 minutes (with jitter) . 
5557 */ 
5658public  class  DataChannel  extends  ManagedChannel  {
5759  private  static  final  Logger  LOGGER  = LoggerFactory .getLogger (DataChannel .class );
5860
61+   private  static  final  Duration  WARM_PERIOD  = Duration .ofMinutes (3 );
62+   private  static  final  Duration  MAX_JITTER  = Duration .ofSeconds (10 );
63+ 
64+   private  final  Random  random  = new  Random ();
5965  private  final  ManagedChannel  inner ;
6066  private  final  Metrics  metrics ;
6167  private  final  ResourceCollector  resourceCollector ;
6268  private  final  CallCredentials  callCredentials ;
63-   private  final  ScheduledFuture <?> antiIdleTask ;
69+   private  final  ScheduledExecutorService  warmingExecutor ;
70+   private  volatile  ScheduledFuture <?> antiIdleTask ;
6471
6572  private  final  AtomicBoolean  closed  = new  AtomicBoolean ();
73+   private  final  Object  scheduleLock  = new  Object ();
6674
6775  public  DataChannel (
6876      ResourceCollector  resourceCollector ,
@@ -83,6 +91,8 @@ public DataChannel(
8391            .keepAliveTime (30 , TimeUnit .SECONDS )
8492            .keepAliveTimeout (10 , TimeUnit .SECONDS )
8593            .build ();
94+ 
95+     this .warmingExecutor  = warmingExecutor ;
8696    this .metrics  = metrics ;
8797
8898    try  {
@@ -96,16 +106,30 @@ public DataChannel(
96106      throw  e ;
97107    }
98108
99-     antiIdleTask  = warmingExecutor .scheduleAtFixedRate (this ::warmQuietly , 3 , 3 , TimeUnit .MINUTES );
109+     antiIdleTask  =
110+         warmingExecutor .schedule (this ::warmTask , nextWarmup ().toMillis (), TimeUnit .MILLISECONDS );
100111    metrics .updateChannelCount (1 );
101112  }
102113
103-   private  void  warmQuietly () {
114+   private  Duration  nextWarmup () {
115+     return  WARM_PERIOD .minus (
116+         Duration .ofMillis ((long ) (MAX_JITTER .toMillis () * random .nextDouble ())));
117+   }
118+ 
119+   private  void  warmTask () {
104120    try  {
105121      warm ();
106122    } catch  (RuntimeException  e ) {
107123      LOGGER .warn ("anti idle ping failed, forcing reconnect" , e );
108124      inner .enterIdle ();
125+     } finally  {
126+       synchronized  (scheduleLock ) {
127+         if  (!closed .get ()) {
128+           antiIdleTask  =
129+               warmingExecutor .schedule (
130+                   this ::warmTask , nextWarmup ().toMillis (), TimeUnit .MILLISECONDS );
131+         }
132+       }
109133    }
110134  }
111135
@@ -204,10 +228,16 @@ public void onClose(Status status, Metadata trailers) {
204228
205229  @ Override 
206230  public  ManagedChannel  shutdown () {
207-     if  (closed .compareAndSet (false , true )) {
231+     final  boolean  closing ;
232+ 
233+     synchronized  (scheduleLock ) {
234+       closing  = closed .compareAndSet (false , true );
235+       antiIdleTask .cancel (true );
236+     }
237+     if  (closing ) {
208238      metrics .updateChannelCount (-1 );
209239    }
210-      antiIdleTask . cancel ( true ); 
240+ 
211241    return  inner .shutdown ();
212242  }
213243
@@ -223,10 +253,17 @@ public boolean isTerminated() {
223253
224254  @ Override 
225255  public  ManagedChannel  shutdownNow () {
226-     if  (closed .compareAndSet (false , true )) {
256+     final  boolean  closing ;
257+ 
258+     synchronized  (scheduleLock ) {
259+       closing  = closed .compareAndSet (false , true );
260+       antiIdleTask .cancel (true );
261+     }
262+ 
263+     if  (closing ) {
227264      metrics .updateChannelCount (-1 );
228265    }
229-      antiIdleTask . cancel ( true ); 
266+ 
230267    return  inner .shutdownNow ();
231268  }
232269
0 commit comments