Skip to content

Commit b8d4e87

Browse files
committed
- get rid of the queue and event ordering for health status change in MultiClusterPooledConnectionProvider
- add tests for init and post-init events; fix failing tests
1 parent e6e1121 commit b8d4e87

File tree

3 files changed

+475
-417
lines changed

3 files changed

+475
-417
lines changed

src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java

Lines changed: 37 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,6 @@ public class MultiClusterPooledConnectionProvider implements ConnectionProvider
9494
// Flag to control when handleHealthStatusChange should process events (only after initialization)
9595
private volatile boolean initializationComplete = false;
9696

97-
// Queue to hold health status events during initialization
98-
private final ConcurrentLinkedQueue<HealthStatusChangeEvent> pendingHealthStatusChanges = new ConcurrentLinkedQueue<>();
99-
10097
// Failback mechanism fields
10198
private static final AtomicInteger failbackThreadCounter = new AtomicInteger(1);
10299
private final ScheduledExecutorService failbackScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
@@ -174,11 +171,9 @@ public MultiClusterPooledConnectionProvider(MultiClusterClientConfig multiCluste
174171

175172
// Mark initialization as complete - handleHealthStatusChange can now process events
176173
initializationComplete = true;
177-
178-
// Process any events that were queued during initialization
179-
processPendingHealthStatusChanges();
180-
/// --- ///
181-
174+
if (!activeCluster.isHealthy()) {
175+
activeCluster = waitForInitialHealthyCluster();
176+
}
182177
this.fallbackExceptionList = multiClusterClientConfig.getFallbackExceptionList();
183178

184179
// Start periodic failback checker
@@ -249,7 +244,7 @@ public void remove(Endpoint endpoint) {
249244
}
250245

251246
// Remove from health status manager first
252-
healthStatusManager.unregisterListener(endpoint, this::handleHealthStatusChange);
247+
healthStatusManager.unregisterListener(endpoint, this::onHealthStatusChange);
253248
healthStatusManager.remove(endpoint);
254249

255250
// Remove from cluster map
@@ -305,64 +300,28 @@ private void addClusterInternal(MultiClusterClientConfig multiClusterClientConfi
305300
if (strategySupplier != null) {
306301
HealthCheckStrategy hcs = strategySupplier.get(config.getHostAndPort(), config.getJedisClientConfig());
307302
// Register listeners BEFORE adding clusters to avoid missing events
308-
healthStatusManager.registerListener(config.getHostAndPort(), this::handleHealthStatusChange);
303+
healthStatusManager.registerListener(config.getHostAndPort(), this::onHealthStatusChange);
309304
healthStatusManager.add(config.getHostAndPort(), hcs);
310305
} else {
311306
cluster.setHealthStatus(HealthStatus.HEALTHY);
312307
}
313308
}
314309

315-
private void handleHealthStatusChange(HealthStatusChangeEvent eventArgs) {
316-
// Queue events during initialization to process them later
317-
if (!initializationComplete) {
318-
pendingHealthStatusChanges.offer(eventArgs);
319-
return;
320-
}
321-
if (!pendingHealthStatusChanges.isEmpty()) {
322-
processPendingHealthStatusChanges();
323-
}
324-
// Process the event immediately if initialization is complete
325-
processStatusChangeEvent(eventArgs);
326-
}
327-
328310
/**
329-
* Processes any health status events that were queued during initialization. This ensures that no events are lost
330-
* during the initialization process.
311+
* Handles health status changes for clusters. This method is called by the health status manager when the health
312+
* status of a cluster changes.
331313
*/
332-
private void processPendingHealthStatusChanges() {
333-
HealthStatusChangeEvent event;
334-
// Synchronize to ensure the order of events when consuming the queue
335-
synchronized (pendingHealthStatusChanges) {
336-
// Process all queued events
337-
while ((event = pendingHealthStatusChanges.poll()) != null) {
338-
Endpoint endpoint = event.getEndpoint();
339-
boolean latestInTheQueue = !pendingHealthStatusChanges.stream()
340-
.anyMatch(e -> e.getEndpoint().equals(endpoint));
341-
if (latestInTheQueue) {
342-
processStatusChangeEvent(event);
343-
}
344-
}
345-
}
346-
}
347-
348-
/**
349-
* Processes a health status change event. This method contains the actual logic for handling status changes and can
350-
* be called both for queued events and real-time events.
351-
*/
352-
private void processStatusChangeEvent(HealthStatusChangeEvent eventArgs) {
314+
private void onHealthStatusChange(HealthStatusChangeEvent eventArgs) {
353315
Endpoint endpoint = eventArgs.getEndpoint();
354316
HealthStatus newStatus = eventArgs.getNewStatus();
355317
log.debug("Health status changed for {} from {} to {}", endpoint, eventArgs.getOldStatus(), newStatus);
356-
357318
Cluster clusterWithHealthChange = multiClusterMap.get(endpoint);
358319

359320
if (clusterWithHealthChange == null) return;
360321

361322
clusterWithHealthChange.setHealthStatus(newStatus);
362-
363-
if (!newStatus.isHealthy()) {
364-
// Handle failover if this was the active cluster
365-
if (clusterWithHealthChange == activeCluster) {
323+
if (initializationComplete) {
324+
if (!newStatus.isHealthy() && clusterWithHealthChange == activeCluster) {
366325
clusterWithHealthChange.setGracePeriod();
367326
if (iterateActiveCluster() != null) {
368327
this.runClusterFailoverPostProcessor(activeCluster);
@@ -424,35 +383,39 @@ private Cluster waitForInitialHealthyCluster() {
424383
* Periodic failback checker - runs at configured intervals to check for failback opportunities
425384
*/
426385
private void periodicFailbackCheck() {
427-
// Find the best candidate cluster for failback
428-
Cluster bestCandidate = null;
429-
float bestWeight = activeCluster.getWeight();
386+
try {
387+
// Find the best candidate cluster for failback
388+
Cluster bestCandidate = null;
389+
float bestWeight = activeCluster.getWeight();
430390

431-
for (Map.Entry<Endpoint, Cluster> entry : multiClusterMap.entrySet()) {
432-
Cluster cluster = entry.getValue();
391+
for (Map.Entry<Endpoint, Cluster> entry : multiClusterMap.entrySet()) {
392+
Cluster cluster = entry.getValue();
433393

434-
// Skip if this is already the active cluster
435-
if (cluster == activeCluster) {
436-
continue;
437-
}
394+
// Skip if this is already the active cluster
395+
if (cluster == activeCluster) {
396+
continue;
397+
}
438398

439-
// Skip if cluster is not healthy
440-
if (!cluster.isHealthy()) {
441-
continue;
442-
}
399+
// Skip if cluster is not healthy
400+
if (!cluster.isHealthy()) {
401+
continue;
402+
}
443403

444-
// This cluster is a valid candidate
445-
if (cluster.getWeight() > bestWeight) {
446-
bestCandidate = cluster;
447-
bestWeight = cluster.getWeight();
404+
// This cluster is a valid candidate
405+
if (cluster.getWeight() > bestWeight) {
406+
bestCandidate = cluster;
407+
bestWeight = cluster.getWeight();
408+
}
448409
}
449-
}
450410

451-
// Perform failback if we found a better candidate
452-
if (bestCandidate != null) {
453-
log.info("Performing failback from {} to {} (higher weight cluster available)",
454-
activeCluster.getCircuitBreaker().getName(), bestCandidate.getCircuitBreaker().getName());
455-
setActiveCluster(bestCandidate, true);
411+
// Perform failback if we found a better candidate
412+
if (bestCandidate != null) {
413+
log.info("Performing failback from {} to {} (higher weight cluster available)",
414+
activeCluster.getCircuitBreaker().getName(), bestCandidate.getCircuitBreaker().getName());
415+
setActiveCluster(bestCandidate, true);
416+
}
417+
} catch (Exception e) {
418+
log.error("Error during periodic failback check", e);
456419
}
457420
}
458421

0 commit comments

Comments (0)