16
16
import java .util .concurrent .ConcurrentHashMap ;
17
17
import java .util .concurrent .locks .Lock ;
18
18
import java .util .concurrent .locks .ReentrantLock ;
19
+ import java .util .concurrent .ScheduledExecutorService ;
20
+ import java .util .concurrent .Executors ;
21
+ import java .util .concurrent .TimeUnit ;
22
+
19
23
import java .util .function .Consumer ;
20
24
import java .util .function .Predicate ;
21
25
@@ -82,6 +86,12 @@ public class MultiClusterPooledConnectionProvider implements ConnectionProvider
82
86
83
87
private HealthStatusManager healthStatusManager = new HealthStatusManager ();
84
88
89
+ // Failback mechanism fields
90
+ private final ScheduledExecutorService failbackScheduler = Executors .newSingleThreadScheduledExecutor (r -> {
91
+ Thread t = new Thread (r , "failback-scheduler" );
92
+ t .setDaemon (true );
93
+ return t ;
94
+ });
85
95
// Store retry and circuit breaker configs for dynamic cluster addition/removal
86
96
private RetryConfig retryConfig ;
87
97
private CircuitBreakerConfig circuitBreakerConfig ;
@@ -151,6 +161,13 @@ public MultiClusterPooledConnectionProvider(MultiClusterClientConfig multiCluste
151
161
/// --- ///
152
162
153
163
this .fallbackExceptionList = multiClusterClientConfig .getFallbackExceptionList ();
164
+
165
+ // Start periodic failback checker
166
+ if (multiClusterClientConfig .isFailbackSupported ()) {
167
+ long failbackInterval = multiClusterClientConfig .getFailbackCheckInterval ();
168
+ failbackScheduler .scheduleAtFixedRate (this ::periodicFailbackCheck , failbackInterval , failbackInterval ,
169
+ TimeUnit .MILLISECONDS );
170
+ }
154
171
}
155
172
156
173
/**
@@ -194,6 +211,7 @@ public void remove(Endpoint endpoint) {
194
211
if (multiClusterMap .size () < 2 ) {
195
212
throw new JedisValidationException ("Cannot remove the last remaining endpoint" );
196
213
}
214
+ log .debug ("Removing endpoint {}" , endpoint );
197
215
198
216
activeClusterIndexLock .lock ();
199
217
try {
@@ -251,7 +269,6 @@ private void addClusterInternal(MultiClusterClientConfig multiClusterClientConfi
251
269
circuitBreakerEventPublisher .onError (event -> log .error (String .valueOf (event )));
252
270
circuitBreakerEventPublisher .onFailureRateExceeded (event -> log .error (String .valueOf (event )));
253
271
circuitBreakerEventPublisher .onSlowCallRateExceeded (event -> log .error (String .valueOf (event )));
254
- circuitBreakerEventPublisher .onStateTransition (event -> log .warn (String .valueOf (event )));
255
272
256
273
ConnectionPool pool ;
257
274
if (poolConfig != null ) {
@@ -281,20 +298,51 @@ private void handleStatusChange(HealthStatusChangeEvent eventArgs) {
281
298
282
299
clusterWithHealthChange .setHealthStatus (newStatus );
283
300
284
- if (newStatus .isHealthy ()) {
285
- if (clusterWithHealthChange .isFailbackSupported () && activeCluster != clusterWithHealthChange ) {
286
- // lets check if weighted switching is possible
287
- Map .Entry <Endpoint , Cluster > failbackCluster = findWeightedHealthyClusterToIterate ();
288
- if (failbackCluster == clusterWithHealthChange
289
- && clusterWithHealthChange .getWeight () > activeCluster .getWeight ()) {
290
- setActiveCluster (clusterWithHealthChange , false );
301
+ if (!newStatus .isHealthy ()) {
302
+ // Handle failover if this was the active cluster
303
+ if (clusterWithHealthChange == activeCluster ) {
304
+ clusterWithHealthChange .setGracePeriod ();
305
+ if (iterateActiveCluster () != null ) {
306
+ this .runClusterFailoverPostProcessor (activeCluster );
291
307
}
292
308
}
293
- } else if (clusterWithHealthChange == activeCluster ) {
294
- if (iterateActiveCluster () != null ) {
295
- this .runClusterFailoverPostProcessor (activeCluster );
309
+ }
310
+ }
311
+
312
+ /**
313
+ * Periodic failback checker - runs at configured intervals to check for failback opportunities
314
+ */
315
+ private void periodicFailbackCheck () {
316
+ // Find the best candidate cluster for failback
317
+ Cluster bestCandidate = null ;
318
+ float bestWeight = activeCluster .getWeight ();
319
+
320
+ for (Map .Entry <Endpoint , Cluster > entry : multiClusterMap .entrySet ()) {
321
+ Cluster cluster = entry .getValue ();
322
+
323
+ // Skip if this is already the active cluster
324
+ if (cluster == activeCluster ) {
325
+ continue ;
326
+ }
327
+
328
+ // Skip if cluster is not healthy
329
+ if (!cluster .isHealthy ()) {
330
+ continue ;
331
+ }
332
+
333
+ // This cluster is a valid candidate
334
+ if (cluster .getWeight () > bestWeight ) {
335
+ bestCandidate = cluster ;
336
+ bestWeight = cluster .getWeight ();
296
337
}
297
338
}
339
+
340
+ // Perform failback if we found a better candidate
341
+ if (bestCandidate != null ) {
342
+ log .info ("Performing failback from {} to {} (higher weight cluster available)" ,
343
+ activeCluster .getCircuitBreaker ().getName (), bestCandidate .getCircuitBreaker ().getName ());
344
+ setActiveCluster (bestCandidate , true );
345
+ }
298
346
}
299
347
300
348
public Endpoint iterateActiveCluster () {
@@ -397,7 +445,21 @@ private boolean setActiveCluster(Cluster cluster, boolean validateConnection) {
397
445
398
446
@ Override
399
447
public void close () {
400
- activeCluster .getConnectionPool ().close ();
448
+ // Shutdown the failback scheduler
449
+ failbackScheduler .shutdown ();
450
+ try {
451
+ if (!failbackScheduler .awaitTermination (1 , TimeUnit .SECONDS )) {
452
+ failbackScheduler .shutdownNow ();
453
+ }
454
+ } catch (InterruptedException e ) {
455
+ failbackScheduler .shutdownNow ();
456
+ Thread .currentThread ().interrupt ();
457
+ }
458
+
459
+ // Close all cluster connection pools
460
+ for (Cluster cluster : multiClusterMap .values ()) {
461
+ cluster .getConnectionPool ().close ();
462
+ }
401
463
}
402
464
403
465
@ Override
@@ -425,26 +487,21 @@ public Cluster getCluster() {
425
487
}
426
488
427
489
@ VisibleForTesting
428
- public Cluster getCluster (Endpoint multiClusterIndex ) {
429
- return multiClusterMap .get (multiClusterIndex );
490
+ public Cluster getCluster (Endpoint endpoint ) {
491
+ return multiClusterMap .get (endpoint );
430
492
}
431
493
432
494
public CircuitBreaker getClusterCircuitBreaker () {
433
495
return activeCluster .getCircuitBreaker ();
434
496
}
435
497
436
- public CircuitBreaker getClusterCircuitBreaker (int multiClusterIndex ) {
437
- return activeCluster .getCircuitBreaker ();
438
- }
439
-
440
498
/**
441
499
* Indicates the final cluster/database endpoint (connection pool), according to the pre-configured list provided at
442
500
* startup via the MultiClusterClientConfig, is unavailable and therefore no further failover is possible. Users can
443
501
* manually failback to an available cluster
444
502
*/
445
503
public boolean canIterateOnceMore () {
446
504
Map .Entry <Endpoint , Cluster > e = findWeightedHealthyClusterToIterate ();
447
-
448
505
return e != null ;
449
506
}
450
507
@@ -472,6 +529,9 @@ public static class Cluster {
472
529
private MultiClusterClientConfig multiClusterClientConfig ;
473
530
private boolean disabled = false ;
474
531
532
+ // Grace period tracking
533
+ private volatile long gracePeriodEndsAt = 0 ;
534
+
475
535
public Cluster (ConnectionPool connectionPool , Retry retry , CircuitBreaker circuitBreaker , float weight ,
476
536
MultiClusterClientConfig multiClusterClientConfig ) {
477
537
this .connectionPool = connectionPool ;
@@ -513,11 +573,14 @@ public float getWeight() {
513
573
}
514
574
515
575
public boolean isCBForcedOpen () {
576
+ if (circuitBreaker .getState () == State .FORCED_OPEN && !isInGracePeriod ()) {
577
+ circuitBreaker .transitionToClosedState ();
578
+ }
516
579
return circuitBreaker .getState () == CircuitBreaker .State .FORCED_OPEN ;
517
580
}
518
581
519
582
public boolean isHealthy () {
520
- return healthStatus .isHealthy () && !isCBForcedOpen () && !disabled ;
583
+ return healthStatus .isHealthy () && !isCBForcedOpen () && !disabled && ! isInGracePeriod () ;
521
584
}
522
585
523
586
public boolean retryOnFailover () {
@@ -532,6 +595,20 @@ public void setDisabled(boolean disabled) {
532
595
this .disabled = disabled ;
533
596
}
534
597
598
+ /**
599
+ * Checks if the cluster is currently in grace period
600
+ */
601
+ public boolean isInGracePeriod () {
602
+ return System .currentTimeMillis () < gracePeriodEndsAt ;
603
+ }
604
+
605
+ /**
606
+ * Sets the grace period for this cluster
607
+ */
608
+ public void setGracePeriod () {
609
+ gracePeriodEndsAt = System .currentTimeMillis () + multiClusterClientConfig .getGracePeriod ();
610
+ }
611
+
535
612
/**
536
613
* Whether failback is supported by client
537
614
*/
0 commit comments