Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,8 @@
<include>src/main/java/redis/clients/jedis/annots/*.java</include>
<include>src/main/java/redis/clients/jedis/mcf/*.java</include>
<include>src/test/java/redis/clients/jedis/failover/*.java</include>
<include>src/test/java/redis/clients/jedis/mcf/EchoStrategyIntegrationTest.java</include>
<include>src/test/java/redis/clients/jedis/mcf/HealthCheckTest.java</include>
<include>src/test/java/redis/clients/jedis/misc/AutomaticFailoverTest.java</include>
<include>src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java</include>
<include>src/main/java/redis/clients/jedis/MultiClusterClientConfig.java</include>
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/redis/clients/jedis/mcf/EchoStrategy.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package redis.clients.jedis.mcf;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisClientConfig;
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.MultiClusterClientConfig.StrategySupplier;

public class EchoStrategy implements HealthCheckStrategy {
private static final Logger log = LoggerFactory.getLogger(EchoStrategy.class);

private int interval;
private int timeout;
Expand Down Expand Up @@ -41,7 +44,12 @@ public int minConsecutiveSuccessCount() {

@Override
public HealthStatus doHealthCheck(Endpoint endpoint) {
return "HealthCheck".equals(jedis.echo("HealthCheck")) ? HealthStatus.HEALTHY : HealthStatus.UNHEALTHY;
try {
return "HealthCheck".equals(jedis.echo("HealthCheck")) ? HealthStatus.HEALTHY : HealthStatus.UNHEALTHY;
} catch (Exception e) {
log.error("Error while performing health check", e);
return HealthStatus.UNHEALTHY;
}
}

@Override
Expand Down
133 changes: 133 additions & 0 deletions src/test/java/redis/clients/jedis/mcf/EchoStrategyIntegrationTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package redis.clients.jedis.mcf;

import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Tag;
import redis.clients.jedis.DefaultJedisClientConfig;
import redis.clients.jedis.EndpointConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.HostAndPorts;
import redis.clients.jedis.JedisClientConfig;

import java.io.IOException;

import static org.junit.jupiter.api.Assertions.*;

@Tag("failover")
public class EchoStrategyIntegrationTest {

private static final EndpointConfig endpoint = HostAndPorts.getRedisEndpoint("redis-failover-1");
private static final HostAndPort proxyHostAndPort = endpoint.getHostAndPort();
private static final ToxiproxyClient tp = new ToxiproxyClient("localhost", 8474);
private static Proxy redisProxy;

@BeforeAll
public static void setupProxy() throws IOException {
if (tp.getProxyOrNull("redis-health-test") != null) {
tp.getProxy("redis-health-test").delete();
}
redisProxy = tp.createProxy("redis-health-test", "0.0.0.0:29379", "redis-failover-1:9379");
}

@AfterAll
public static void cleanupProxy() throws IOException {
if (redisProxy != null) {
redisProxy.delete();
}
}

@BeforeEach
public void resetProxy() throws IOException {
redisProxy.enable();
redisProxy.toxics().getAll().forEach(toxic -> {
try {
toxic.remove();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

@Test
public void testEchoStrategyRecoversAfterDisconnect() throws Exception {
JedisClientConfig config = DefaultJedisClientConfig.builder().socketTimeoutMillis(1000)
.connectionTimeoutMillis(1000).build();

try (EchoStrategy strategy = new EchoStrategy(proxyHostAndPort, config, 1000, 500, 1)) {

// Initial health check should work
HealthStatus initialStatus = strategy.doHealthCheck(proxyHostAndPort);
assertEquals(HealthStatus.HEALTHY, initialStatus);

// Disable the proxy to simulate network failure
redisProxy.disable();

// Health check should now fail - this will expose the bug
HealthStatus statusAfterDisable = strategy.doHealthCheck(proxyHostAndPort);
assertEquals(HealthStatus.UNHEALTHY, statusAfterDisable);

// Re-enable proxy
redisProxy.enable();
// Health check should recover
HealthStatus statusAfterEnable = strategy.doHealthCheck(proxyHostAndPort);
assertEquals(HealthStatus.HEALTHY, statusAfterEnable);
}

}

@Test
public void testEchoStrategyWithConnectionTimeout() throws Exception {
JedisClientConfig config = DefaultJedisClientConfig.builder().socketTimeoutMillis(100)
.connectionTimeoutMillis(100).build();

try (EchoStrategy strategy = new EchoStrategy(proxyHostAndPort, config, 1000, 300, 1)) {

// Initial health check should work
assertEquals(HealthStatus.HEALTHY, strategy.doHealthCheck(proxyHostAndPort));

// Add latency toxic to simulate slow network
redisProxy.toxics().latency("slow-connection", ToxicDirection.DOWNSTREAM, 1000);

// Health check should timeout and return unhealthy
HealthStatus slowStatus = strategy.doHealthCheck(proxyHostAndPort);
assertEquals(HealthStatus.UNHEALTHY, slowStatus, "Health check should fail with high latency");

// Remove toxic
redisProxy.toxics().get("slow-connection").remove();

// Health check should recover
HealthStatus recoveredStatus = strategy.doHealthCheck(proxyHostAndPort);
assertEquals(HealthStatus.HEALTHY, recoveredStatus, "Health check should recover from high latency");
}
}

@Test
public void testConnectionDropDuringHealthCheck() throws Exception {
JedisClientConfig config = DefaultJedisClientConfig.builder().socketTimeoutMillis(2000).build();

try (EchoStrategy strategy = new EchoStrategy(proxyHostAndPort, config, 1000, 1500, 1)) {

// Initial health check
assertEquals(HealthStatus.HEALTHY, strategy.doHealthCheck(proxyHostAndPort));

// Simulate connection drop by limiting data transfer
redisProxy.toxics().limitData("connection-drop", ToxicDirection.UPSTREAM, 10);

// This should fail due to connection issues
HealthStatus droppedStatus = strategy.doHealthCheck(proxyHostAndPort);
assertEquals(HealthStatus.UNHEALTHY, droppedStatus);

// Remove toxic
redisProxy.toxics().get("connection-drop").remove();

// Health check should recover
HealthStatus afterRecovery = strategy.doHealthCheck(proxyHostAndPort);
assertEquals(HealthStatus.HEALTHY, afterRecovery);
}
}
}
44 changes: 42 additions & 2 deletions src/test/java/redis/clients/jedis/mcf/HealthCheckTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class HealthCheckTest {
@Mock
private HealthCheckStrategy mockStrategy;

private HealthCheckStrategy alwaysHealthyStrategy = new HealthCheckStrategy() {
private final HealthCheckStrategy alwaysHealthyStrategy = new HealthCheckStrategy() {
@Override
public int getInterval() {
return 100;
Expand Down Expand Up @@ -156,7 +156,7 @@ void testHealthCheckStop() {
HealthCheck healthCheck = new HealthCheck(testEndpoint, mockStrategy, mockCallback);
healthCheck.start();

assertDoesNotThrow(() -> healthCheck.stop());
assertDoesNotThrow(healthCheck::stop);
}

// ========== HealthStatusManager Tests ==========
Expand Down Expand Up @@ -370,6 +370,46 @@ void testClusterConfigHealthCheckEnabledExplicitly() {
}

// ========== Integration Tests ==========
@Test
@Timeout(5)
void testHealthCheckRecoversAfterException() throws InterruptedException {
// Create a mock strategy that alternates between healthy and throwing an exception
HealthCheckStrategy alternatingStrategy = new HealthCheckStrategy() {
volatile boolean isHealthy = true;

@Override
public int getInterval() {
return 1;
}

@Override
public int getTimeout() {
return 5;
}

@Override
public HealthStatus doHealthCheck(Endpoint endpoint) {
if (isHealthy) {
isHealthy = false;
throw new RuntimeException("Simulated exception");
} else {
isHealthy = true;
return HealthStatus.HEALTHY;
}
}
};

CountDownLatch statusChangeLatch = new CountDownLatch(2); // Wait for 2 status changes
HealthStatusListener listener = event -> statusChangeLatch.countDown();

HealthStatusManager manager = new HealthStatusManager();
manager.registerListener(listener);
manager.add(testEndpoint, alternatingStrategy);

assertTrue(statusChangeLatch.await(1, TimeUnit.SECONDS));

manager.remove(testEndpoint);
}

@Test
@Timeout(5)
Expand Down
Loading