Skip to content

Commit aaac8f7

Browse files
committed
- adressing reviews and feedback
1 parent 607c66d commit aaac8f7

File tree

3 files changed

+19
-7
lines changed

3 files changed

+19
-7
lines changed

src/main/java/redis/clients/jedis/mcf/HealthCheck.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,9 @@ private void safeUpdate(long owner, HealthStatus status) {
9292
}
9393
return current;
9494
});
95-
if (oldStatus.getKey() != owner || oldStatus.getValue() != status) {
95+
if (oldStatus.getValue() != status) {
9696
// notify listeners
97-
notifyListeners(oldStatus.getValue(), newStatus.getValue());
97+
notifyListeners(oldStatus.getValue(), status);
9898
}
9999
}
100100

src/main/java/redis/clients/jedis/mcf/StatusTracker.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package redis.clients.jedis.mcf;
22

33
import java.util.concurrent.CountDownLatch;
4+
import java.util.concurrent.TimeUnit;
45
import java.util.concurrent.atomic.AtomicReference;
56

67
import redis.clients.jedis.exceptions.JedisConnectionException;
8+
import redis.clients.jedis.exceptions.JedisValidationException;
79

810
/**
911
* StatusTracker is responsible for tracking and waiting for health status changes for specific endpoints. It provides
@@ -57,7 +59,11 @@ public void onStatusChange(HealthStatusChangeEvent event) {
5759
}
5860

5961
// Wait for the health status change event
60-
latch.await();
62+
// just for safety to not block indefinitely
63+
boolean completed = latch.await(60, TimeUnit.SECONDS);
64+
if (!completed) {
65+
throw new JedisValidationException("Timeout while waiting for health check result");
66+
}
6167
return resultStatus.get();
6268

6369
} catch (InterruptedException e) {

src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.Map;
1616
import java.util.concurrent.ConcurrentHashMap;
1717
import java.util.concurrent.ConcurrentLinkedQueue;
18+
import java.util.concurrent.atomic.AtomicInteger;
1819
import java.util.stream.Collectors;
1920
import java.util.concurrent.locks.Lock;
2021
import java.util.concurrent.locks.ReentrantLock;
@@ -97,8 +98,9 @@ public class MultiClusterPooledConnectionProvider implements ConnectionProvider
9798
private final ConcurrentLinkedQueue<HealthStatusChangeEvent> pendingHealthStatusChanges = new ConcurrentLinkedQueue<>();
9899

99100
// Failback mechanism fields
101+
private static final AtomicInteger failbackThreadCounter = new AtomicInteger(1);
100102
private final ScheduledExecutorService failbackScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
101-
Thread t = new Thread(r, "failback-scheduler");
103+
Thread t = new Thread(r, "jedis-failback-" + failbackThreadCounter.getAndIncrement());
102104
t.setDaemon(true);
103105
return t;
104106
});
@@ -333,7 +335,11 @@ private void processPendingHealthStatusChanges() {
333335
synchronized (pendingHealthStatusChanges) {
334336
// Process all queued events
335337
while ((event = pendingHealthStatusChanges.poll()) != null) {
336-
processStatusChangeEvent(event);
338+
Endpoint endpoint = event.getEndpoint();
339+
boolean latestInTheQueue = !pendingHealthStatusChanges.stream().anyMatch(e -> e.getEndpoint().equals(endpoint));
340+
if (latestInTheQueue) {
341+
processStatusChangeEvent(event);
342+
}
337343
}
338344
}
339345
}
@@ -345,7 +351,7 @@ private void processPendingHealthStatusChanges() {
345351
private void processStatusChangeEvent(HealthStatusChangeEvent eventArgs) {
346352
Endpoint endpoint = eventArgs.getEndpoint();
347353
HealthStatus newStatus = eventArgs.getNewStatus();
348-
log.info("Health status changed for {} from {} to {}", endpoint, eventArgs.getOldStatus(), newStatus);
354+
log.debug("Health status changed for {} from {} to {}", endpoint, eventArgs.getOldStatus(), newStatus);
349355

350356
Cluster clusterWithHealthChange = multiClusterMap.get(endpoint);
351357

@@ -394,7 +400,7 @@ private Cluster waitForInitialHealthyCluster() {
394400
status = statusTracker.waitForHealthStatus(endpoint);
395401
} else {
396402
// No health check configured - assume healthy
397-
log.info("No health check configured for cluster {}, deafulting to HEALTHY", endpoint);
403+
log.info("No health check configured for cluster {}, defaulting to HEALTHY", endpoint);
398404
status = HealthStatus.HEALTHY;
399405
}
400406

0 commit comments

Comments
 (0)