Skip to content

Commit 6a10cb2

Browse files
committed
Fix: EchoStrategy does not recover on connection error
1 parent 34a8107 commit 6a10cb2

File tree

6 files changed

+225 -53 lines changed

pom.xml

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -340,6 +340,8 @@
340340
<include>src/main/java/redis/clients/jedis/annots/*.java</include>
341341
<include>src/main/java/redis/clients/jedis/mcf/*.java</include>
342342
<include>src/test/java/redis/clients/jedis/failover/*.java</include>
343+
<include>src/test/java/redis/clients/jedis/mcf/EchoStrategyIntegrationTest.java</include>
344+
<include>src/test/java/redis/clients/jedis/mcf/HealthCheckTest.java</include>
343345
<include>src/test/java/redis/clients/jedis/misc/AutomaticFailoverTest.java</include>
344346
<include>src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java</include>
345347
<include>src/main/java/redis/clients/jedis/MultiClusterClientConfig.java</include>
src/main/java/redis/clients/jedis/mcf/EchoStrategy.java

Lines changed: 15 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -1,17 +1,21 @@
11
package redis.clients.jedis.mcf;
22

3+
import redis.clients.jedis.Connection;
34
import redis.clients.jedis.ConnectionFactory;
5+
import redis.clients.jedis.DefaultJedisSocketFactory;
46
import redis.clients.jedis.HostAndPort;
57
import redis.clients.jedis.JedisClientConfig;
68
import redis.clients.jedis.UnifiedJedis;
79
import redis.clients.jedis.exceptions.JedisConnectionException;
810
import redis.clients.jedis.MultiClusterClientConfig.StrategySupplier;
11+
import redis.clients.jedis.exceptions.JedisException;
912

1013
public class EchoStrategy implements HealthCheckStrategy {
1114

12-
private int interval;
13-
private int timeout;
14-
private UnifiedJedis jedis;
15+
private final int interval;
16+
private final int timeout;
17+
private final JedisClientConfig clientConfig;
18+
private final DefaultJedisSocketFactory socketFactory;
1519

1620
public EchoStrategy(HostAndPort hostAndPort, JedisClientConfig jedisClientConfig) {
1721
this(hostAndPort, jedisClientConfig, 1000, 1000);
@@ -20,12 +24,8 @@ public EchoStrategy(HostAndPort hostAndPort, JedisClientConfig jedisClientConfig
2024
public EchoStrategy(HostAndPort hostAndPort, JedisClientConfig jedisClientConfig, int interval, int timeout) {
2125
this.interval = interval;
2226
this.timeout = timeout;
23-
ConnectionFactory connFactory = new ConnectionFactory(hostAndPort, jedisClientConfig);
24-
try {
25-
this.jedis = new UnifiedJedis(connFactory.makeObject().getObject());
26-
} catch (Exception e) {
27-
throw new JedisConnectionException("HealthCheck connection Failed!", e);
28-
}
27+
this.clientConfig = jedisClientConfig;
28+
this.socketFactory = new DefaultJedisSocketFactory(hostAndPort, clientConfig);
2929
}
3030

3131
@Override
@@ -40,11 +40,13 @@ public int getTimeout() {
4040

4141
@Override
4242
public HealthStatus doHealthCheck(Endpoint endpoint) {
43-
return "HealthCheck".equals(jedis.echo("HealthCheck")) ? HealthStatus.HEALTHY : HealthStatus.UNHEALTHY;
43+
try (UnifiedJedis connection = new UnifiedJedis(socketFactory, clientConfig)) {
44+
return "HealthCheck".equals(connection.echo("HealthCheck")) ? HealthStatus.HEALTHY : HealthStatus.UNHEALTHY;
45+
} catch (Exception e) {
46+
return HealthStatus.UNHEALTHY;
47+
}
4448
}
4549

46-
public static final StrategySupplier DEFAULT = (hostAndPort, jedisClientConfig) -> {
47-
return new EchoStrategy(hostAndPort, jedisClientConfig);
48-
};
50+
public static final StrategySupplier DEFAULT = EchoStrategy::new;
4951

5052
}

src/main/java/redis/clients/jedis/mcf/HealthCheck.java

Lines changed: 16 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,9 @@
11

22
package redis.clients.jedis.mcf;
33

4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
47
import java.util.AbstractMap.SimpleEntry;
58
import java.util.concurrent.ExecutionException;
69
import java.util.concurrent.ExecutorService;
@@ -13,14 +16,15 @@
1316
import java.util.function.Consumer;
1417

1518
public class HealthCheck {
19+
private static final Logger logger = LoggerFactory.getLogger(HealthCheck.class);
1620

17-
private Endpoint endpoint;
18-
private HealthCheckStrategy strategy;
19-
private AtomicReference<SimpleEntry<Long, HealthStatus>> statusRef = new AtomicReference<SimpleEntry<Long, HealthStatus>>();
21+
private final Endpoint endpoint;
22+
private final HealthCheckStrategy strategy;
23+
private final AtomicReference<SimpleEntry<Long, HealthStatus>> statusRef = new AtomicReference<SimpleEntry<Long, HealthStatus>>();
2024
private Consumer<HealthStatusChangeEvent> statusChangeCallback;
2125

22-
private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
23-
private ExecutorService executor = Executors.newCachedThreadPool();
26+
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
27+
private final ExecutorService executor = Executors.newCachedThreadPool();
2428

2529
HealthCheck(Endpoint endpoint, HealthCheckStrategy strategy,
2630
Consumer<HealthStatusChangeEvent> statusChangeCallback) {
@@ -65,8 +69,13 @@ public void stop() {
6569
private void healthCheck() {
6670
long me = System.currentTimeMillis();
6771
Future<?> future = executor.submit(() -> {
68-
HealthStatus newStatus = strategy.doHealthCheck(endpoint);
69-
safeUpdate(me, newStatus);
72+
try {
73+
HealthStatus newStatus = strategy.doHealthCheck(endpoint);
74+
safeUpdate(me, newStatus);
75+
} catch (Exception e) {
76+
logger.error("Health check failed", e);
77+
safeUpdate(me, HealthStatus.UNHEALTHY);
78+
}
7079
});
7180

7281
try {
src/test/java/redis/clients/jedis/mcf/EchoStrategyIntegrationTest.java

Lines changed: 133 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,133 @@
1+
package redis.clients.jedis.mcf;
2+
3+
import eu.rekawek.toxiproxy.Proxy;
4+
import eu.rekawek.toxiproxy.ToxiproxyClient;
5+
import eu.rekawek.toxiproxy.model.ToxicDirection;
6+
import org.junit.jupiter.api.AfterAll;
7+
import org.junit.jupiter.api.BeforeAll;
8+
import org.junit.jupiter.api.BeforeEach;
9+
import org.junit.jupiter.api.Test;
10+
import org.junit.jupiter.api.Tag;
11+
import redis.clients.jedis.DefaultJedisClientConfig;
12+
import redis.clients.jedis.EndpointConfig;
13+
import redis.clients.jedis.HostAndPort;
14+
import redis.clients.jedis.HostAndPorts;
15+
import redis.clients.jedis.JedisClientConfig;
16+
import redis.clients.jedis.UnifiedJedis;
17+
18+
import java.io.IOException;
19+
20+
import static org.junit.jupiter.api.Assertions.*;
21+
22+
@Tag("failover")
23+
public class EchoStrategyIntegrationTest {
24+
25+
private static final EndpointConfig endpoint = HostAndPorts.getRedisEndpoint("redis-failover-1");
26+
private static final HostAndPort proxyHostAndPort = endpoint.getHostAndPort();
27+
private static final ToxiproxyClient tp = new ToxiproxyClient("localhost", 8474);
28+
private static Proxy redisProxy;
29+
30+
@BeforeAll
31+
public static void setupProxy() throws IOException {
32+
if (tp.getProxyOrNull("redis-health-test") != null) {
33+
tp.getProxy("redis-health-test").delete();
34+
}
35+
redisProxy = tp.createProxy("redis-health-test", "0.0.0.0:29379", "redis-failover-1:9379");
36+
}
37+
38+
@AfterAll
39+
public static void cleanupProxy() throws IOException {
40+
if (redisProxy != null) {
41+
redisProxy.delete();
42+
}
43+
}
44+
45+
@BeforeEach
46+
public void resetProxy() throws IOException {
47+
redisProxy.enable();
48+
redisProxy.toxics().getAll().forEach(toxic -> {
49+
try {
50+
toxic.remove();
51+
} catch (IOException e) {
52+
throw new RuntimeException(e);
53+
}
54+
});
55+
}
56+
57+
@Test
58+
public void testEchoStrategyRecoversAfterDisconnect() throws Exception {
59+
JedisClientConfig config = DefaultJedisClientConfig.builder().socketTimeoutMillis(1000)
60+
.connectionTimeoutMillis(1000).build();
61+
62+
UnifiedJedis jedis = new UnifiedJedis(proxyHostAndPort, config);
63+
jedis.ping();
64+
EchoStrategy strategy = new EchoStrategy(proxyHostAndPort, config, 1000, 500);
65+
66+
// Initial health check should work
67+
HealthStatus initialStatus = strategy.doHealthCheck(proxyHostAndPort);
68+
assertEquals(HealthStatus.HEALTHY, initialStatus);
69+
70+
// Disable the proxy to simulate network failure
71+
redisProxy.disable();
72+
73+
// Health check should now fail - this will expose the bug
74+
HealthStatus statusAfterDisable = strategy.doHealthCheck(proxyHostAndPort);
75+
assertEquals(HealthStatus.UNHEALTHY, statusAfterDisable);
76+
77+
// Re-enable proxy
78+
redisProxy.enable();
79+
// Health check should recover
80+
HealthStatus statusAfterEnable = strategy.doHealthCheck(proxyHostAndPort);
81+
assertEquals(HealthStatus.HEALTHY, statusAfterEnable);
82+
}
83+
84+
@Test
85+
public void testEchoStrategyWithConnectionTimeout() throws Exception {
86+
JedisClientConfig config = DefaultJedisClientConfig.builder().socketTimeoutMillis(100)
87+
.connectionTimeoutMillis(100).build();
88+
89+
EchoStrategy strategy = new EchoStrategy(proxyHostAndPort, config, 1000, 300);
90+
91+
// Initial health check should work
92+
assertEquals(HealthStatus.HEALTHY, strategy.doHealthCheck(proxyHostAndPort));
93+
94+
// Add latency toxic to simulate slow network
95+
redisProxy.toxics().latency("slow-connection", ToxicDirection.DOWNSTREAM, 1000);
96+
97+
// Health check should timeout and return unhealthy
98+
HealthStatus slowStatus = strategy.doHealthCheck(proxyHostAndPort);
99+
assertEquals(HealthStatus.UNHEALTHY, slowStatus, "Health check should fail with high latency");
100+
101+
// Remove toxic
102+
redisProxy.toxics().get("slow-connection").remove();
103+
104+
// Health check should recover
105+
HealthStatus recoveredStatus = strategy.doHealthCheck(proxyHostAndPort);
106+
assertEquals(HealthStatus.HEALTHY, recoveredStatus, "Health check should recover from high latency");
107+
}
108+
109+
@Test
110+
public void testConnectionDropDuringHealthCheck() throws Exception {
111+
JedisClientConfig config = DefaultJedisClientConfig.builder().socketTimeoutMillis(2000).build();
112+
113+
EchoStrategy strategy = new EchoStrategy(proxyHostAndPort, config, 1000, 1500);
114+
115+
// Initial health check
116+
assertEquals(HealthStatus.HEALTHY, strategy.doHealthCheck(proxyHostAndPort));
117+
118+
// Simulate connection drop by limiting data transfer
119+
redisProxy.toxics().limitData("connection-drop", ToxicDirection.UPSTREAM, 10);
120+
121+
// This should fail due to connection issues
122+
HealthStatus droppedStatus = strategy.doHealthCheck(proxyHostAndPort);
123+
assertEquals(HealthStatus.UNHEALTHY, droppedStatus);
124+
125+
// Remove toxic
126+
redisProxy.toxics().get("connection-drop").remove();
127+
128+
// Health check should recover
129+
HealthStatus afterRecovery = strategy.doHealthCheck(proxyHostAndPort);
130+
assertEquals(HealthStatus.HEALTHY, afterRecovery);
131+
132+
}
133+
}

src/test/java/redis/clients/jedis/mcf/HealthCheckIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -93,7 +93,7 @@ private MultiClusterPooledConnectionProvider getMCCF(MultiClusterClientConfig.St
9393
Function<ClusterConfig.Builder, ClusterConfig.Builder> modifier = builder -> strategySupplier == null
9494
? builder.healthCheckEnabled(false)
9595
: builder.healthCheckStrategySupplier(strategySupplier);
96-
96+
9797
List<ClusterConfig> clusterConfigs = Arrays
9898
.stream(new EndpointConfig[] { endpoint1 }).map(e -> modifier
9999
.apply(MultiClusterClientConfig.ClusterConfig.builder(e.getHostAndPort(), clientConfig)).build())

0 commit comments

Comments
 (0)