Skip to content
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
8a9f876
- weighted cluster seleciton
atakavci Jun 27, 2025
d514ecf
- add builder for ClusterConfig
atakavci Jul 9, 2025
df66b1e
- fix naming
atakavci Jul 9, 2025
13757f5
clean up and mark override methods
atakavci Jul 10, 2025
ef5d83a
fix link in javadoc
atakavci Jul 10, 2025
a15fc64
fix formatting
atakavci Jul 10, 2025
cf38240
- fix double registered listeners in healtstatusmgr
atakavci Jul 14, 2025
c2fb34c
Update src/main/java/redis/clients/jedis/mcf/EchoStrategy.java
atakavci Jul 16, 2025
ade866d
- add remove endpoints
atakavci Jul 11, 2025
ca3378d
- replace cluster disabled with failbackCandidate
atakavci Jul 15, 2025
ddcec73
- remove failback candidate
atakavci Jul 16, 2025
c1b6d5f
- fix remove logic
atakavci Jul 16, 2025
ff16330
- periodic failback checks
atakavci Jul 17, 2025
c39fda1
- introduce StatusTracker with purpose of waiting initial healthcheck…
atakavci Jul 19, 2025
975ab78
- introduce forceActiveCluster by duration
atakavci Jul 19, 2025
405101e
- fix failing tests by waiting on clusters to get healthy
atakavci Jul 23, 2025
607c66d
- fix failing scenario test
atakavci Jul 23, 2025
aaac8f7
- adressing reviews and feedback
atakavci Jul 23, 2025
2ffffef
- fix formatting
atakavci Jul 23, 2025
e6e1121
- fix formatting
atakavci Jul 23, 2025
b8d4e87
- get rid of the queue and event ordering for healthstatus change in …
atakavci Jul 24, 2025
1ae7219
- replace use of reflection with helper methods
atakavci Jul 24, 2025
a751a84
Merge branch 'feature/automatic-failover' into ali/aa-failover-health…
atakavci Aug 11, 2025
004a022
- replace 'sleep' with 'await', feedback from @a-TODO-rov
atakavci Aug 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.3.15</version>
<version>1.2.12</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ public static interface StrategySupplier {
private long gracePeriod;

public MultiClusterClientConfig(ClusterConfig[] clusterConfigs) {
if (clusterConfigs == null || clusterConfigs.length < 1) throw new JedisValidationException(
"ClusterClientConfigs are required for MultiClusterPooledConnectionProvider");
for (ClusterConfig clusterConfig : clusterConfigs) {
if (clusterConfig == null)
throw new IllegalArgumentException("ClusterClientConfigs must not contain null elements");
}
this.clusterConfigs = clusterConfigs;
}

Expand Down
6 changes: 3 additions & 3 deletions src/main/java/redis/clients/jedis/mcf/HealthCheck.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class HealthCheck {
this.endpoint = endpoint;
this.strategy = strategy;
this.statusChangeCallback = statusChangeCallback;
statusRef.set(new SimpleEntry<>(0L, HealthStatus.HEALTHY));
statusRef.set(new SimpleEntry<>(0L, HealthStatus.UNKNOWN));
}

public Endpoint getEndpoint() {
Expand Down Expand Up @@ -92,9 +92,9 @@ private void safeUpdate(long owner, HealthStatus status) {
}
return current;
});
if (oldStatus.getKey() != owner || oldStatus.getValue() != status) {
if (oldStatus.getValue() != status) {
// notify listeners
notifyListeners(oldStatus.getValue(), newStatus.getValue());
notifyListeners(oldStatus.getValue(), status);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/mcf/HealthStatus.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package redis.clients.jedis.mcf;

public enum HealthStatus {
HEALTHY(0x01), UNHEALTHY(0x02);
UNKNOWN(0x00), HEALTHY(0x01), UNHEALTHY(0x02);

private final int value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public void removeAll(Endpoint[] endpoints) {

public HealthStatus getHealthStatus(Endpoint endpoint) {
HealthCheck healthCheck = healthChecks.get(endpoint);
return healthCheck != null ? healthCheck.getStatus() : HealthStatus.UNHEALTHY;
return healthCheck != null ? healthCheck.getStatus() : HealthStatus.UNKNOWN;
}

public boolean hasHealthCheck(Endpoint endpoint) {
return healthChecks.get(endpoint) != null;
}
}
77 changes: 77 additions & 0 deletions src/main/java/redis/clients/jedis/mcf/StatusTracker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package redis.clients.jedis.mcf;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisValidationException;

/**
* StatusTracker is responsible for tracking and waiting for health status changes for specific endpoints. It provides
* an event-driven approach to wait for health status transitions from UNKNOWN to either HEALTHY or UNHEALTHY.
*/
public class StatusTracker {

private final HealthStatusManager healthStatusManager;

public StatusTracker(HealthStatusManager healthStatusManager) {
this.healthStatusManager = healthStatusManager;
}

/**
* Waits for a specific endpoint's health status to be determined (not UNKNOWN). Uses event-driven approach with
* CountDownLatch to avoid polling.
* @param endpoint the endpoint to wait for
* @return the determined health status (HEALTHY or UNHEALTHY)
* @throws JedisConnectionException if interrupted while waiting
*/
public HealthStatus waitForHealthStatus(Endpoint endpoint) {
// First check if status is already determined
HealthStatus currentStatus = healthStatusManager.getHealthStatus(endpoint);
if (currentStatus != HealthStatus.UNKNOWN) {
return currentStatus;
}

// Set up event-driven waiting
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<HealthStatus> resultStatus = new AtomicReference<>();

// Create a temporary listener for this specific endpoint
HealthStatusListener tempListener = new HealthStatusListener() {
@Override
public void onStatusChange(HealthStatusChangeEvent event) {
if (event.getEndpoint().equals(endpoint) && event.getNewStatus() != HealthStatus.UNKNOWN) {
resultStatus.set(event.getNewStatus());
latch.countDown();
}
}
};

// Register the temporary listener
healthStatusManager.registerListener(endpoint, tempListener);

try {
// Double-check status after registering listener (race condition protection)
currentStatus = healthStatusManager.getHealthStatus(endpoint);
if (currentStatus != HealthStatus.UNKNOWN) {
return currentStatus;
}

// Wait for the health status change event
// just for safety to not block indefinitely
boolean completed = latch.await(60, TimeUnit.SECONDS);
if (!completed) {
throw new JedisValidationException("Timeout while waiting for health check result");
}
return resultStatus.get();

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new JedisConnectionException("Interrupted while waiting for health check result", e);
} finally {
// Clean up: unregister the temporary listener
healthStatusManager.unregisterListener(endpoint, tempListener);
}
}
}
Loading
Loading