Skip to content

Commit c8c0b01

Browse files
atakavci and Copilot authored
[automatic failover] Implement wait on healthCheck results during client init (#4207)
* - weighted cluster selection - HealthStatus manager with initial listener and registration logic - pluggable health checker strategy introduced, these are draft NoOpStrategy, EchoStrategy, LagAwareStrategy, - fix failing tests impacted from weighted clusters * - add builder for ClusterConfig - add echo to CommandObjects and UnifiedJedis - improve StrategySupplier by accepting jedisclientconfig - adapt EchoStrategy to StrategySupplier. Now it handles the creation of connection by accepting endpoint and JedisClientConfig - make healthchecks disabled by default - drop noOpStrategy - add unit&integration tests for health check * - fix naming * clean up and mark override methods * fix link in javadoc * fix formatting * - fix double registered listeners in healthstatusmgr - clear redundant catch - replace failover options and drop failoveroptions class - remove forced_unhealthy from healthstatus - fix failback check - add disabled flag to cluster - update/fix related tests * Update src/main/java/redis/clients/jedis/mcf/EchoStrategy.java Co-authored-by: Copilot <[email protected]> * - add remove endpoints * - replace cluster disabled with failbackCandidate - replace failback enabled with failbacksupported in client - fix formatting - set defaults * - remove failback candidate - fix failing tests * - fix remove logic - fix failing tests * - periodic failback checks - introduce graceperiod - fix issue when CB is forced_open and gracePeriod is completed * - introduce StatusTracker with purpose of waiting initial healthcheck results during construction of provider - add HealthStatus.UNKNOWN as default for Cluster - handle status changes in order of events during initialization - add tests for status tracker and ordering of events - fix impacted unit&integ tests * - introduce forceActiveCluster by duration - fix formatting * - fix failing tests by waiting on clusters to get healthy * - fix failing scenario test - downgrade logback version for slf4j compatibility - increase 
timeouts for faultInjector * - addressing reviews and feedback * - fix formatting * - fix formatting * - get rid of the queue and event ordering for healthstatus change in MultiClusterPooledConnectionProvider - add test for init and post init events - fix failing tests * - replace use of reflection with helper methods - fix failing tests due to method name change * - replace 'sleep' with 'await', feedback from @a-TODO-rov --------- Co-authored-by: Copilot <[email protected]>
1 parent 34a8107 commit c8c0b01

18 files changed

+1191
-254
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@
143143
<dependency>
144144
<groupId>ch.qos.logback</groupId>
145145
<artifactId>logback-classic</artifactId>
146-
<version>1.3.15</version>
146+
<version>1.2.12</version>
147147
<scope>test</scope>
148148
</dependency>
149149
<dependency>

src/main/java/redis/clients/jedis/MultiClusterClientConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,12 @@ public static interface StrategySupplier {
162162
private long gracePeriod;
163163

164164
public MultiClusterClientConfig(ClusterConfig[] clusterConfigs) {
165+
if (clusterConfigs == null || clusterConfigs.length < 1) throw new JedisValidationException(
166+
"ClusterClientConfigs are required for MultiClusterPooledConnectionProvider");
167+
for (ClusterConfig clusterConfig : clusterConfigs) {
168+
if (clusterConfig == null)
169+
throw new IllegalArgumentException("ClusterClientConfigs must not contain null elements");
170+
}
165171
this.clusterConfigs = clusterConfigs;
166172
}
167173

src/main/java/redis/clients/jedis/mcf/HealthCheck.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class HealthCheck {
2727
this.endpoint = endpoint;
2828
this.strategy = strategy;
2929
this.statusChangeCallback = statusChangeCallback;
30-
statusRef.set(new SimpleEntry<>(0L, HealthStatus.HEALTHY));
30+
statusRef.set(new SimpleEntry<>(0L, HealthStatus.UNKNOWN));
3131
}
3232

3333
public Endpoint getEndpoint() {
@@ -92,9 +92,9 @@ private void safeUpdate(long owner, HealthStatus status) {
9292
}
9393
return current;
9494
});
95-
if (oldStatus.getKey() != owner || oldStatus.getValue() != status) {
95+
if (oldStatus.getValue() != status) {
9696
// notify listeners
97-
notifyListeners(oldStatus.getValue(), newStatus.getValue());
97+
notifyListeners(oldStatus.getValue(), status);
9898
}
9999
}
100100

src/main/java/redis/clients/jedis/mcf/HealthStatus.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package redis.clients.jedis.mcf;
22

33
public enum HealthStatus {
4-
HEALTHY(0x01), UNHEALTHY(0x02);
4+
UNKNOWN(0x00), HEALTHY(0x01), UNHEALTHY(0x02);
55

66
private final int value;
77

src/main/java/redis/clients/jedis/mcf/HealthStatusManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ public void removeAll(Endpoint[] endpoints) {
7272

7373
public HealthStatus getHealthStatus(Endpoint endpoint) {
7474
HealthCheck healthCheck = healthChecks.get(endpoint);
75-
return healthCheck != null ? healthCheck.getStatus() : HealthStatus.UNHEALTHY;
75+
return healthCheck != null ? healthCheck.getStatus() : HealthStatus.UNKNOWN;
76+
}
77+
78+
public boolean hasHealthCheck(Endpoint endpoint) {
79+
return healthChecks.get(endpoint) != null;
7680
}
7781
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package redis.clients.jedis.mcf;
2+
3+
import java.util.concurrent.CountDownLatch;
4+
import java.util.concurrent.TimeUnit;
5+
import java.util.concurrent.atomic.AtomicReference;
6+
7+
import redis.clients.jedis.exceptions.JedisConnectionException;
8+
import redis.clients.jedis.exceptions.JedisValidationException;
9+
10+
/**
11+
* StatusTracker is responsible for tracking and waiting for health status changes for specific endpoints. It provides
12+
* an event-driven approach to wait for health status transitions from UNKNOWN to either HEALTHY or UNHEALTHY.
13+
*/
14+
public class StatusTracker {
15+
16+
private final HealthStatusManager healthStatusManager;
17+
18+
public StatusTracker(HealthStatusManager healthStatusManager) {
19+
this.healthStatusManager = healthStatusManager;
20+
}
21+
22+
/**
23+
* Waits for a specific endpoint's health status to be determined (not UNKNOWN). Uses event-driven approach with
24+
* CountDownLatch to avoid polling.
25+
* @param endpoint the endpoint to wait for
26+
* @return the determined health status (HEALTHY or UNHEALTHY)
27+
* @throws JedisConnectionException if interrupted while waiting
28+
*/
29+
public HealthStatus waitForHealthStatus(Endpoint endpoint) {
30+
// First check if status is already determined
31+
HealthStatus currentStatus = healthStatusManager.getHealthStatus(endpoint);
32+
if (currentStatus != HealthStatus.UNKNOWN) {
33+
return currentStatus;
34+
}
35+
36+
// Set up event-driven waiting
37+
final CountDownLatch latch = new CountDownLatch(1);
38+
final AtomicReference<HealthStatus> resultStatus = new AtomicReference<>();
39+
40+
// Create a temporary listener for this specific endpoint
41+
HealthStatusListener tempListener = new HealthStatusListener() {
42+
@Override
43+
public void onStatusChange(HealthStatusChangeEvent event) {
44+
if (event.getEndpoint().equals(endpoint) && event.getNewStatus() != HealthStatus.UNKNOWN) {
45+
resultStatus.set(event.getNewStatus());
46+
latch.countDown();
47+
}
48+
}
49+
};
50+
51+
// Register the temporary listener
52+
healthStatusManager.registerListener(endpoint, tempListener);
53+
54+
try {
55+
// Double-check status after registering listener (race condition protection)
56+
currentStatus = healthStatusManager.getHealthStatus(endpoint);
57+
if (currentStatus != HealthStatus.UNKNOWN) {
58+
return currentStatus;
59+
}
60+
61+
// Wait for the health status change event
62+
// just for safety to not block indefinitely
63+
boolean completed = latch.await(60, TimeUnit.SECONDS);
64+
if (!completed) {
65+
throw new JedisValidationException("Timeout while waiting for health check result");
66+
}
67+
return resultStatus.get();
68+
69+
} catch (InterruptedException e) {
70+
Thread.currentThread().interrupt();
71+
throw new JedisConnectionException("Interrupted while waiting for health check result", e);
72+
} finally {
73+
// Clean up: unregister the temporary listener
74+
healthStatusManager.unregisterListener(endpoint, tempListener);
75+
}
76+
}
77+
}

0 commit comments

Comments
 (0)