diff --git a/pom.xml b/pom.xml
index 38f3289acb..05e07a8d25 100644
--- a/pom.xml
+++ b/pom.xml
@@ -143,7 +143,7 @@
       <groupId>ch.qos.logback</groupId>
       <artifactId>logback-classic</artifactId>
-      <version>1.3.15</version>
+      <version>1.2.12</version>
       <scope>test</scope>
diff --git a/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java b/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java
index 8c94c591fd..a0708f8325 100644
--- a/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java
+++ b/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java
@@ -162,6 +162,12 @@ public static interface StrategySupplier {
   private long gracePeriod;

   public MultiClusterClientConfig(ClusterConfig[] clusterConfigs) {
+    if (clusterConfigs == null || clusterConfigs.length < 1) throw new JedisValidationException(
+        "ClusterClientConfigs are required for MultiClusterPooledConnectionProvider");
+    for (ClusterConfig clusterConfig : clusterConfigs) {
+      if (clusterConfig == null)
+        throw new IllegalArgumentException("ClusterClientConfigs must not contain null elements");
+    }
     this.clusterConfigs = clusterConfigs;
   }
diff --git a/src/main/java/redis/clients/jedis/mcf/HealthCheck.java b/src/main/java/redis/clients/jedis/mcf/HealthCheck.java
index e23e0e5810..2f4cbd915f 100644
--- a/src/main/java/redis/clients/jedis/mcf/HealthCheck.java
+++ b/src/main/java/redis/clients/jedis/mcf/HealthCheck.java
@@ -27,7 +27,7 @@ public class HealthCheck {
     this.endpoint = endpoint;
     this.strategy = strategy;
     this.statusChangeCallback = statusChangeCallback;
-    statusRef.set(new SimpleEntry<>(0L, HealthStatus.HEALTHY));
+    statusRef.set(new SimpleEntry<>(0L, HealthStatus.UNKNOWN));
   }

   public Endpoint getEndpoint() {
@@ -92,9 +92,9 @@ private void safeUpdate(long owner, HealthStatus status) {
       }
       return current;
     });
-    if (oldStatus.getKey() != owner || oldStatus.getValue() != status) {
+    if (oldStatus.getValue() != status) {
       // notify listeners
-      notifyListeners(oldStatus.getValue(), newStatus.getValue());
+      notifyListeners(oldStatus.getValue(), status);
     }
   }
diff --git a/src/main/java/redis/clients/jedis/mcf/HealthStatus.java b/src/main/java/redis/clients/jedis/mcf/HealthStatus.java
index 9e5ba4bbcd..2620f4131e 100644
--- a/src/main/java/redis/clients/jedis/mcf/HealthStatus.java
+++ b/src/main/java/redis/clients/jedis/mcf/HealthStatus.java
@@ -1,7 +1,7 @@
 package redis.clients.jedis.mcf;

 public enum HealthStatus {
-  HEALTHY(0x01), UNHEALTHY(0x02);
+  UNKNOWN(0x00), HEALTHY(0x01), UNHEALTHY(0x02);

   private final int value;
diff --git a/src/main/java/redis/clients/jedis/mcf/HealthStatusManager.java b/src/main/java/redis/clients/jedis/mcf/HealthStatusManager.java
index 39d71a1e80..c3787e12a1 100644
--- a/src/main/java/redis/clients/jedis/mcf/HealthStatusManager.java
+++ b/src/main/java/redis/clients/jedis/mcf/HealthStatusManager.java
@@ -72,6 +72,10 @@ public void removeAll(Endpoint[] endpoints) {

   public HealthStatus getHealthStatus(Endpoint endpoint) {
     HealthCheck healthCheck = healthChecks.get(endpoint);
-    return healthCheck != null ? healthCheck.getStatus() : HealthStatus.UNHEALTHY;
+    return healthCheck != null ? healthCheck.getStatus() : HealthStatus.UNKNOWN;
+  }
+
+  public boolean hasHealthCheck(Endpoint endpoint) {
+    return healthChecks.get(endpoint) != null;
   }
 }
diff --git a/src/main/java/redis/clients/jedis/mcf/StatusTracker.java b/src/main/java/redis/clients/jedis/mcf/StatusTracker.java
new file mode 100644
index 0000000000..ed3d4ee8de
--- /dev/null
+++ b/src/main/java/redis/clients/jedis/mcf/StatusTracker.java
@@ -0,0 +1,77 @@
+package redis.clients.jedis.mcf;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import redis.clients.jedis.exceptions.JedisConnectionException;
+import redis.clients.jedis.exceptions.JedisValidationException;
+
+/**
+ * StatusTracker is responsible for tracking and waiting for health status changes for specific endpoints. It provides
+ * an event-driven approach to wait for health status transitions from UNKNOWN to either HEALTHY or UNHEALTHY.
+ */
+public class StatusTracker {
+
+  private final HealthStatusManager healthStatusManager;
+
+  public StatusTracker(HealthStatusManager healthStatusManager) {
+    this.healthStatusManager = healthStatusManager;
+  }
+
+  /**
+   * Waits for a specific endpoint's health status to be determined (not UNKNOWN). Uses event-driven approach with
+   * CountDownLatch to avoid polling.
+   * @param endpoint the endpoint to wait for
+   * @return the determined health status (HEALTHY or UNHEALTHY)
+   * @throws JedisConnectionException if interrupted while waiting
+   */
+  public HealthStatus waitForHealthStatus(Endpoint endpoint) {
+    // First check if status is already determined
+    HealthStatus currentStatus = healthStatusManager.getHealthStatus(endpoint);
+    if (currentStatus != HealthStatus.UNKNOWN) {
+      return currentStatus;
+    }
+
+    // Set up event-driven waiting
+    final CountDownLatch latch = new CountDownLatch(1);
+    final AtomicReference<HealthStatus> resultStatus = new AtomicReference<>();
+
+    // Create a temporary listener for this specific endpoint
+    HealthStatusListener tempListener = new HealthStatusListener() {
+      @Override
+      public void onStatusChange(HealthStatusChangeEvent event) {
+        if (event.getEndpoint().equals(endpoint) && event.getNewStatus() != HealthStatus.UNKNOWN) {
+          resultStatus.set(event.getNewStatus());
+          latch.countDown();
+        }
+      }
+    };
+
+    // Register the temporary listener
+    healthStatusManager.registerListener(endpoint, tempListener);
+
+    try {
+      // Double-check status after registering listener (race condition protection)
+      currentStatus = healthStatusManager.getHealthStatus(endpoint);
+      if (currentStatus != HealthStatus.UNKNOWN) {
+        return currentStatus;
+      }
+
+      // Wait for the health status change event
+      // just for safety to not block indefinitely
+      boolean completed = latch.await(60, TimeUnit.SECONDS);
+      if (!completed) {
+        throw new JedisValidationException("Timeout while waiting for health check result");
+      }
+      return resultStatus.get();
+
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new JedisConnectionException("Interrupted while waiting for health check result", e);
+    } finally {
+      // Clean up: unregister the temporary listener
+      healthStatusManager.unregisterListener(endpoint, tempListener);
+    }
+  }
+}
diff --git a/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java
index 31ffebaf07..2ee9ccb6fb 100644
---
a/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java @@ -14,6 +14,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.ScheduledExecutorService; @@ -37,6 +40,7 @@ import redis.clients.jedis.mcf.HealthStatus; import redis.clients.jedis.mcf.HealthStatusChangeEvent; import redis.clients.jedis.mcf.HealthStatusManager; +import redis.clients.jedis.mcf.StatusTracker; import redis.clients.jedis.MultiClusterClientConfig.StrategySupplier; import redis.clients.jedis.util.Pool; @@ -85,13 +89,19 @@ public class MultiClusterPooledConnectionProvider implements ConnectionProvider private List> fallbackExceptionList; private HealthStatusManager healthStatusManager = new HealthStatusManager(); + private StatusTracker statusTracker; + + // Flag to control when handleHealthStatusChange should process events (only after initialization) + private volatile boolean initializationComplete = false; // Failback mechanism fields + private static final AtomicInteger failbackThreadCounter = new AtomicInteger(1); private final ScheduledExecutorService failbackScheduler = Executors.newSingleThreadScheduledExecutor(r -> { - Thread t = new Thread(r, "failback-scheduler"); + Thread t = new Thread(r, "jedis-failback-" + failbackThreadCounter.getAndIncrement()); t.setDaemon(true); return t; }); + // Store retry and circuit breaker configs for dynamic cluster addition/removal private RetryConfig retryConfig; private CircuitBreakerConfig circuitBreakerConfig; @@ -147,19 +157,23 @@ public MultiClusterPooledConnectionProvider(MultiClusterClientConfig multiCluste ////////////// Configure Cluster Map //////////////////// ClusterConfig[] clusterConfigs = multiClusterClientConfig.getClusterConfigs(); + + // Now add clusters - health checks will start but events will be queued for (ClusterConfig config : clusterConfigs) { addClusterInternal(multiClusterClientConfig, config); } - // selecting activeCluster with configuration values. 
- // all health status would be HEALTHY at this point - activeCluster = findWeightedHealthyClusterToIterate().getValue(); + // Initialize StatusTracker for waiting on health check results + statusTracker = new StatusTracker(healthStatusManager); - for (Endpoint endpoint : multiClusterMap.keySet()) { - healthStatusManager.registerListener(endpoint, this::handleStatusChange); - } - /// --- /// + // Wait for initial health check results and select active cluster based on weights + activeCluster = waitForInitialHealthyCluster(); + // Mark initialization as complete - handleHealthStatusChange can now process events + initializationComplete = true; + if (!activeCluster.isHealthy()) { + activeCluster = waitForInitialHealthyCluster(); + } this.fallbackExceptionList = multiClusterClientConfig.getFallbackExceptionList(); // Start periodic failback checker @@ -188,7 +202,6 @@ public void add(ClusterConfig clusterConfig) { activeClusterIndexLock.lock(); try { addClusterInternal(multiClusterClientConfig, clusterConfig); - healthStatusManager.registerListener(endpoint, this::handleStatusChange); } finally { activeClusterIndexLock.unlock(); } @@ -231,7 +244,7 @@ public void remove(Endpoint endpoint) { } // Remove from health status manager first - healthStatusManager.unregisterListener(endpoint, this::handleStatusChange); + healthStatusManager.unregisterListener(endpoint, this::onHealthStatusChange); healthStatusManager.remove(endpoint); // Remove from cluster map @@ -252,6 +265,10 @@ public void remove(Endpoint endpoint) { * appropriate locks. */ private void addClusterInternal(MultiClusterClientConfig multiClusterClientConfig, ClusterConfig config) { + if (multiClusterMap.containsKey(config.getHostAndPort())) { + throw new JedisValidationException( + "Endpoint " + config.getHostAndPort() + " already exists in the provider"); + } GenericObjectPoolConfig poolConfig = config.getConnectionPoolConfig(); String clusterId = "cluster:" + config.getHostAndPort(); @@ -282,25 +299,30 @@ private void addClusterInternal(MultiClusterClientConfig multiClusterClientConfi StrategySupplier strategySupplier = config.getHealthCheckStrategySupplier(); if (strategySupplier != null) { HealthCheckStrategy hcs = strategySupplier.get(config.getHostAndPort(), config.getJedisClientConfig()); + // Register listeners BEFORE adding clusters to avoid missing events + healthStatusManager.registerListener(config.getHostAndPort(), this::onHealthStatusChange); healthStatusManager.add(config.getHostAndPort(), hcs); + } else { + cluster.setHealthStatus(HealthStatus.HEALTHY); } } - private void handleStatusChange(HealthStatusChangeEvent eventArgs) { - + /** + * Handles health status changes for clusters. This method is called by the health status manager when the health + * status of a cluster changes. 
+ */ + @VisibleForTesting + void onHealthStatusChange(HealthStatusChangeEvent eventArgs) { Endpoint endpoint = eventArgs.getEndpoint(); HealthStatus newStatus = eventArgs.getNewStatus(); - log.info("Health status changed for {} from {} to {}", endpoint, eventArgs.getOldStatus(), newStatus); - + log.debug("Health status changed for {} from {} to {}", endpoint, eventArgs.getOldStatus(), newStatus); Cluster clusterWithHealthChange = multiClusterMap.get(endpoint); if (clusterWithHealthChange == null) return; clusterWithHealthChange.setHealthStatus(newStatus); - - if (!newStatus.isHealthy()) { - // Handle failover if this was the active cluster - if (clusterWithHealthChange == activeCluster) { + if (initializationComplete) { + if (!newStatus.isHealthy() && clusterWithHealthChange == activeCluster) { clusterWithHealthChange.setGracePeriod(); if (iterateActiveCluster() != null) { this.runClusterFailoverPostProcessor(activeCluster); @@ -310,38 +332,92 @@ private void handleStatusChange(HealthStatusChangeEvent eventArgs) { } /** - * Periodic failback checker - runs at configured intervals to check for failback opportunities + * Waits for initial health check results and selects the first healthy cluster based on weight priority. Blocks + * until at least one cluster becomes healthy or all clusters are determined to be unhealthy. + * @return the first healthy cluster found, ordered by weight (highest first) + * @throws JedisConnectionException if all clusters are unhealthy */ - private void periodicFailbackCheck() { - // Find the best candidate cluster for failback - Cluster bestCandidate = null; - float bestWeight = activeCluster.getWeight(); + private Cluster waitForInitialHealthyCluster() { + // Sort clusters by weight in descending order + List> sortedClusters = multiClusterMap.entrySet().stream() + .sorted(Map.Entry. 
comparingByValue(Comparator.comparing(Cluster::getWeight).reversed())) + .collect(Collectors.toList()); - for (Map.Entry entry : multiClusterMap.entrySet()) { + log.info("Selecting initial cluster from {} configured clusters", sortedClusters.size()); + + // Select cluster in weight order + for (Map.Entry entry : sortedClusters) { + Endpoint endpoint = entry.getKey(); Cluster cluster = entry.getValue(); - // Skip if this is already the active cluster - if (cluster == activeCluster) { - continue; - } + log.info("Evaluating cluster {} (weight: {})", endpoint, cluster.getWeight()); + + HealthStatus status; - // Skip if cluster is not healthy - if (!cluster.isHealthy()) { - continue; + // Check if health checks are enabled for this endpoint + if (healthStatusManager.hasHealthCheck(endpoint)) { + log.info("Health checks enabled for {}, waiting for result", endpoint); + // Wait for this cluster's health status to be determined + status = statusTracker.waitForHealthStatus(endpoint); + } else { + // No health check configured - assume healthy + log.info("No health check configured for cluster {}, defaulting to HEALTHY", endpoint); + status = HealthStatus.HEALTHY; } - // This cluster is a valid candidate - if (cluster.getWeight() > bestWeight) { - bestCandidate = cluster; - bestWeight = cluster.getWeight(); + cluster.setHealthStatus(status); + + if (status.isHealthy()) { + log.info("Found healthy cluster: {} (weight: {})", endpoint, cluster.getWeight()); + return cluster; + } else { + log.info("Cluster {} is unhealthy, trying next cluster", endpoint); } } - // Perform failback if we found a better candidate - if (bestCandidate != null) { - log.info("Performing failback from {} to {} (higher weight cluster available)", - activeCluster.getCircuitBreaker().getName(), bestCandidate.getCircuitBreaker().getName()); - setActiveCluster(bestCandidate, true); + // All clusters are unhealthy + throw new JedisConnectionException( + "All configured clusters are unhealthy. 
Cannot initialize MultiClusterPooledConnectionProvider."); + } + + /** + * Periodic failback checker - runs at configured intervals to check for failback opportunities + */ + @VisibleForTesting + void periodicFailbackCheck() { + try { + // Find the best candidate cluster for failback + Cluster bestCandidate = null; + float bestWeight = activeCluster.getWeight(); + + for (Map.Entry entry : multiClusterMap.entrySet()) { + Cluster cluster = entry.getValue(); + + // Skip if this is already the active cluster + if (cluster == activeCluster) { + continue; + } + + // Skip if cluster is not healthy + if (!cluster.isHealthy()) { + continue; + } + + // This cluster is a valid candidate + if (cluster.getWeight() > bestWeight) { + bestCandidate = cluster; + bestWeight = cluster.getWeight(); + } + } + + // Perform failback if we found a better candidate + if (bestCandidate != null) { + log.info("Performing failback from {} to {} (higher weight cluster available)", + activeCluster.getCircuitBreaker().getName(), bestCandidate.getCircuitBreaker().getName()); + setActiveCluster(bestCandidate, true); + } + } catch (Exception e) { + log.error("Error during periodic failback check", e); } } @@ -415,6 +491,21 @@ public void setActiveCluster(Endpoint endpoint) { setActiveCluster(cluster, true); } + public void forceActiveCluster(Endpoint endpoint, long forcedActiveDuration) { + Cluster cluster = multiClusterMap.get(endpoint); + cluster.clearGracePeriod(); + if (!cluster.isHealthy()) { + throw new JedisValidationException("Provided endpoint: " + endpoint + + " is not healthy. Please consider a healthy endpoint from the configuration"); + } + multiClusterMap.entrySet().stream().forEach(entry -> { + if (entry.getKey() != endpoint) { + entry.getValue().setGracePeriod(forcedActiveDuration); + } + }); + setActiveCluster(endpoint); + } + private boolean setActiveCluster(Cluster cluster, boolean validateConnection) { // Cluster cluster = clusterEntry.getValue(); // Field-level synchronization is used to avoid the edge case in which @@ -524,13 +615,14 @@ public static class Cluster { private final Retry retry; private final CircuitBreaker circuitBreaker; private final float weight; - // it starts its life with the assumption of being healthy - private HealthStatus healthStatus = HealthStatus.HEALTHY; + // it starts its life with unknown health status, waiting for initial health check + private HealthStatus healthStatus = HealthStatus.UNKNOWN; private MultiClusterClientConfig multiClusterClientConfig; private boolean disabled = false; // Grace period tracking private volatile long gracePeriodEndsAt = 0; + private final Logger log = LoggerFactory.getLogger(getClass()); public Cluster(ConnectionPool connectionPool, Retry retry, CircuitBreaker circuitBreaker, float weight, MultiClusterClientConfig multiClusterClientConfig) { @@ -574,6 +666,7 @@ public float getWeight() { public boolean isCBForcedOpen() { if (circuitBreaker.getState() == State.FORCED_OPEN && !isInGracePeriod()) { + log.info("Transitioning circuit breaker from FORCED_OPEN to CLOSED state due to end of grace period!"); circuitBreaker.transitionToClosedState(); } return circuitBreaker.getState() == CircuitBreaker.State.FORCED_OPEN; @@ -606,7 +699,17 @@ public boolean isInGracePeriod() { * Sets the grace period for this cluster */ public void setGracePeriod() { - gracePeriodEndsAt = System.currentTimeMillis() + multiClusterClientConfig.getGracePeriod(); + setGracePeriod(multiClusterClientConfig.getGracePeriod()); + } + + public void setGracePeriod(long 
gracePeriod) { + long endTime = System.currentTimeMillis() + gracePeriod; + if (endTime < gracePeriodEndsAt) return; + gracePeriodEndsAt = endTime; + } + + public void clearGracePeriod() { + gracePeriodEndsAt = 0; } /** diff --git a/src/test/java/redis/clients/jedis/mcf/FailbackMechanismIntegrationTest.java b/src/test/java/redis/clients/jedis/mcf/FailbackMechanismIntegrationTest.java index efe1f0c4ba..5a02f6dfeb 100644 --- a/src/test/java/redis/clients/jedis/mcf/FailbackMechanismIntegrationTest.java +++ b/src/test/java/redis/clients/jedis/mcf/FailbackMechanismIntegrationTest.java @@ -1,8 +1,12 @@ package redis.clients.jedis.mcf; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; +import java.time.Duration; + +import org.awaitility.Durations; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -16,7 +20,7 @@ import redis.clients.jedis.JedisClientConfig; import redis.clients.jedis.MultiClusterClientConfig; import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider; -import java.lang.reflect.Method; +import redis.clients.jedis.providers.MultiClusterPooledConnectionProviderHelper; @ExtendWith(MockitoExtension.class) class FailbackMechanismIntegrationTest { @@ -25,6 +29,7 @@ class FailbackMechanismIntegrationTest { private HostAndPort endpoint2; private HostAndPort endpoint3; private JedisClientConfig clientConfig; + private static Duration FIFTY_MILLISECONDS = Duration.ofMillis(50); @BeforeEach void setUp() { @@ -43,23 +48,6 @@ private MockedConstruction mockPool() { }); } - /** - * Helper method to trigger health status changes using reflection - */ - private void triggerHealthStatusChange(MultiClusterPooledConnectionProvider provider, HostAndPort endpoint, - HealthStatus oldStatus, HealthStatus newStatus) { - try { - Method handleStatusChangeMethod = MultiClusterPooledConnectionProvider.class - .getDeclaredMethod("handleStatusChange", HealthStatusChangeEvent.class); - handleStatusChangeMethod.setAccessible(true); - - HealthStatusChangeEvent event = new HealthStatusChangeEvent(endpoint, oldStatus, newStatus); - handleStatusChangeMethod.invoke(provider, event); - } catch (Exception e) { - throw new RuntimeException("Failed to trigger health status change", e); - } - } - @Test void testFailbackDisabledDoesNotPerformFailback() throws InterruptedException { try (MockedConstruction mockedPool = mockPool()) { @@ -81,19 +69,18 @@ void testFailbackDisabledDoesNotPerformFailback() throws InterruptedException { assertEquals(provider.getCluster(endpoint2), provider.getCluster()); // Make cluster2 unhealthy to force failover to cluster1 - triggerHealthStatusChange(provider, endpoint2, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint2, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); // Should now be on cluster1 (only healthy option) assertEquals(provider.getCluster(endpoint1), provider.getCluster()); // Make cluster2 healthy again (higher weight - would normally trigger failback) - triggerHealthStatusChange(provider, endpoint2, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint2, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); // Wait longer than failback interval - Thread.sleep(200); - // Should still be on cluster1 since failback is disabled - 
assertEquals(provider.getCluster(endpoint1), provider.getCluster()); + await().atMost(Durations.FIVE_HUNDRED_MILLISECONDS).pollInterval(FIFTY_MILLISECONDS) + .until(() -> provider.getCluster(endpoint1) == provider.getCluster()); } } } @@ -121,19 +108,18 @@ void testFailbackToHigherWeightCluster() throws InterruptedException { assertEquals(provider.getCluster(endpoint1), provider.getCluster()); // Make cluster1 unhealthy to force failover to cluster2 - triggerHealthStatusChange(provider, endpoint1, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint1, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); // Should now be on cluster2 (lower weight, but only healthy option) assertEquals(provider.getCluster(endpoint2), provider.getCluster()); // Make cluster1 healthy again - triggerHealthStatusChange(provider, endpoint1, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint1, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); // Wait for failback check interval + some buffer - Thread.sleep(250); - // Should have failed back to cluster1 (higher weight) - assertEquals(provider.getCluster(endpoint1), provider.getCluster()); + await().atMost(Durations.FIVE_HUNDRED_MILLISECONDS).pollInterval(FIFTY_MILLISECONDS) + .until(() -> provider.getCluster(endpoint1) == provider.getCluster()); } } } @@ -163,20 +149,19 @@ void testNoFailbackToLowerWeightCluster() throws InterruptedException { assertEquals(provider.getCluster(endpoint3), provider.getCluster()); // Make cluster3 unhealthy to force failover to cluster2 (medium weight) - triggerHealthStatusChange(provider, endpoint3, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint3, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); // Should now be on cluster2 (highest weight among healthy clusters) assertEquals(provider.getCluster(endpoint2), provider.getCluster()); // Make cluster1 (lowest weight) healthy - this should NOT trigger failback // since we don't failback to lower weight clusters - triggerHealthStatusChange(provider, endpoint1, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint1, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); // Wait for failback check interval - Thread.sleep(250); - // Should still be on cluster2 (no failback to lower weight cluster1) - assertEquals(provider.getCluster(endpoint2), provider.getCluster()); + await().atMost(Durations.FIVE_HUNDRED_MILLISECONDS).pollInterval(FIFTY_MILLISECONDS) + .until(() -> provider.getCluster(endpoint2) == provider.getCluster()); } } } @@ -199,19 +184,18 @@ void testFailbackToHigherWeightClusterImmediately() throws InterruptedException assertEquals(provider.getCluster(endpoint1), provider.getCluster()); // Make cluster1 unhealthy to force failover to cluster2 - triggerHealthStatusChange(provider, endpoint1, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint1, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); // Should now be on cluster2 (only healthy option) assertEquals(provider.getCluster(endpoint2), provider.getCluster()); // Make cluster1 healthy again - triggerHealthStatusChange(provider, endpoint1, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); + 
MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint1, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); // Wait for failback check - Thread.sleep(150); - // Should have failed back to cluster1 immediately (higher weight, no stability period required) - assertEquals(provider.getCluster(endpoint1), provider.getCluster()); + await().atMost(Durations.TWO_HUNDRED_MILLISECONDS).pollInterval(FIFTY_MILLISECONDS) + .until(() -> provider.getCluster(endpoint1) == provider.getCluster()); } } } @@ -234,25 +218,24 @@ void testUnhealthyClusterCancelsFailback() throws InterruptedException { assertEquals(provider.getCluster(endpoint1), provider.getCluster()); // Make cluster1 unhealthy to force failover to cluster2 - triggerHealthStatusChange(provider, endpoint1, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint1, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); // Should now be on cluster2 (only healthy option) assertEquals(provider.getCluster(endpoint2), provider.getCluster()); // Make cluster1 healthy again (should trigger failback attempt) - triggerHealthStatusChange(provider, endpoint1, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint1, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); // Wait a bit Thread.sleep(100); // Make cluster1 unhealthy again before failback completes - triggerHealthStatusChange(provider, endpoint1, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint1, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); // Wait past the original failback interval - Thread.sleep(150); - // Should still be on cluster2 (failback was cancelled due to cluster1 becoming unhealthy) - assertEquals(provider.getCluster(endpoint2), provider.getCluster()); + await().atMost(Durations.TWO_HUNDRED_MILLISECONDS).pollInterval(FIFTY_MILLISECONDS) + .until(() -> provider.getCluster(endpoint2) == provider.getCluster()); } } } @@ -279,19 +262,18 @@ void testMultipleClusterFailbackPriority() throws InterruptedException { assertEquals(provider.getCluster(endpoint3), provider.getCluster()); // Make cluster3 unhealthy to force failover to cluster2 (next highest weight) - triggerHealthStatusChange(provider, endpoint3, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint3, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); // Should now be on cluster2 (highest weight among healthy clusters) assertEquals(provider.getCluster(endpoint2), provider.getCluster()); // Make cluster3 healthy again - triggerHealthStatusChange(provider, endpoint3, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint3, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); // Wait for failback - Thread.sleep(250); - // Should fail back to cluster3 (highest weight) - assertEquals(provider.getCluster(endpoint3), provider.getCluster()); + await().atMost(Durations.FIVE_HUNDRED_MILLISECONDS).pollInterval(FIFTY_MILLISECONDS) + .until(() -> provider.getCluster(endpoint3) == provider.getCluster()); } } } @@ -315,7 +297,7 @@ void testGracePeriodDisablesClusterOnUnhealthy() throws InterruptedException { assertEquals(provider.getCluster(endpoint2), provider.getCluster()); // Now make cluster2 unhealthy - it should be disabled for grace period - 
triggerHealthStatusChange(provider, endpoint2, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint2, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); // Should failover to cluster1 assertEquals(provider.getCluster(endpoint1), provider.getCluster()); @@ -346,7 +328,7 @@ void testGracePeriodReEnablesClusterAfterPeriod() throws InterruptedException { assertEquals(provider.getCluster(endpoint2), provider.getCluster()); // Make cluster2 unhealthy to start grace period and force failover - triggerHealthStatusChange(provider, endpoint2, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint2, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); // Should failover to cluster1 assertEquals(provider.getCluster(endpoint1), provider.getCluster()); @@ -355,22 +337,20 @@ void testGracePeriodReEnablesClusterAfterPeriod() throws InterruptedException { assertTrue(provider.getCluster(endpoint2).isInGracePeriod()); // Make cluster2 healthy again while it's still in grace period - triggerHealthStatusChange(provider, endpoint2, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint2, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); // Should still be on cluster1 because cluster2 is in grace period assertEquals(provider.getCluster(endpoint1), provider.getCluster()); // Wait for grace period to expire - Thread.sleep(150); - // Cluster2 should no longer be in grace period - assertFalse(provider.getCluster(endpoint2).isInGracePeriod()); + await().atMost(Durations.FIVE_HUNDRED_MILLISECONDS).pollInterval(FIFTY_MILLISECONDS) + .until(() -> !provider.getCluster(endpoint2).isInGracePeriod()); // Wait for failback check to run - Thread.sleep(100); - // Should now failback to cluster2 (higher weight) since grace period has expired - assertEquals(provider.getCluster(endpoint2), provider.getCluster()); + await().atMost(Durations.FIVE_HUNDRED_MILLISECONDS).pollInterval(FIFTY_MILLISECONDS) + .until(() -> provider.getCluster(endpoint2) == provider.getCluster()); } } } diff --git a/src/test/java/redis/clients/jedis/mcf/HealthCheckIntegrationTest.java b/src/test/java/redis/clients/jedis/mcf/HealthCheckIntegrationTest.java index eeb1e9caa7..ed2cf8139e 100644 --- a/src/test/java/redis/clients/jedis/mcf/HealthCheckIntegrationTest.java +++ b/src/test/java/redis/clients/jedis/mcf/HealthCheckIntegrationTest.java @@ -72,7 +72,8 @@ public int getTimeout() { public HealthStatus doHealthCheck(Endpoint endpoint) { // Create connection per health check to avoid resource leak try (UnifiedJedis pinger = new UnifiedJedis(hostAndPort, jedisClientConfig)) { - return "OK".equals(pinger.ping()) ? HealthStatus.HEALTHY : HealthStatus.UNHEALTHY; + String result = pinger.ping(); + return "PONG".equals(result) ? 
HealthStatus.HEALTHY : HealthStatus.UNHEALTHY; } catch (Exception e) { return HealthStatus.UNHEALTHY; } diff --git a/src/test/java/redis/clients/jedis/mcf/HealthCheckTest.java b/src/test/java/redis/clients/jedis/mcf/HealthCheckTest.java index 023c1b62f2..bfbfd452fe 100644 --- a/src/test/java/redis/clients/jedis/mcf/HealthCheckTest.java +++ b/src/test/java/redis/clients/jedis/mcf/HealthCheckTest.java @@ -59,28 +59,9 @@ void setUp() { testConfig = DefaultJedisClientConfig.builder().build(); } - // ========== HealthStatus Tests ========== - - @Test - void testHealthStatusIsHealthy() { - assertTrue(HealthStatus.HEALTHY.isHealthy()); - assertFalse(HealthStatus.UNHEALTHY.isHealthy()); - } - - // ========== HostAndPort Tests ========== - @Test - void testHostAndPortEquality() { - HostAndPort endpoint1 = new HostAndPort("localhost", 6379); - HostAndPort endpoint2 = new HostAndPort("localhost", 6379); - HostAndPort endpoint3 = new HostAndPort("localhost", 6380); - - assertEquals(endpoint1, endpoint2); - assertNotEquals(endpoint1, endpoint3); - assertEquals(endpoint1.hashCode(), endpoint2.hashCode()); - } - // ========== HealthCheckCollection Tests ========== + @Test void testHealthCheckCollectionAdd() { HealthCheckCollection collection = new HealthCheckCollection(); HealthCheck healthCheck = new HealthCheck(testEndpoint, mockStrategy, mockCallback); @@ -155,7 +136,7 @@ void testHealthCheckStatusUpdate() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Consumer callback = event -> { - assertEquals(HealthStatus.HEALTHY, event.getOldStatus()); + assertEquals(HealthStatus.UNKNOWN, event.getOldStatus()); assertEquals(HealthStatus.UNHEALTHY, event.getNewStatus()); latch.countDown(); }; @@ -238,17 +219,35 @@ void testHealthStatusManagerEndpointSpecificListener() { } @Test - void testHealthStatusManagerLifecycle() { + void testHealthStatusManagerLifecycle() throws InterruptedException { HealthStatusManager manager = new HealthStatusManager(); // Before adding health check - assertEquals(HealthStatus.UNHEALTHY, manager.getHealthStatus(testEndpoint)); + assertEquals(HealthStatus.UNKNOWN, manager.getHealthStatus(testEndpoint)); + + // Set up event listener to wait for initial health check completion + CountDownLatch healthCheckCompleteLatch = new CountDownLatch(1); + HealthStatusListener listener = event -> healthCheckCompleteLatch.countDown(); + + // Register listener before adding health check to capture the initial event + manager.registerListener(testEndpoint, listener); + // Add health check - this will start async health checking manager.add(testEndpoint, alwaysHealthyStrategy); + + // Initially should still be UNKNOWN until first check completes + assertEquals(HealthStatus.UNKNOWN, manager.getHealthStatus(testEndpoint)); + + // Wait for initial health check to complete + assertTrue(healthCheckCompleteLatch.await(2, TimeUnit.SECONDS), + "Initial health check should complete within timeout"); + + // Now should be HEALTHY after initial check assertEquals(HealthStatus.HEALTHY, manager.getHealthStatus(testEndpoint)); + // Clean up and verify removal manager.remove(testEndpoint); - assertEquals(HealthStatus.UNHEALTHY, manager.getHealthStatus(testEndpoint)); + assertEquals(HealthStatus.UNKNOWN, manager.getHealthStatus(testEndpoint)); } // ========== EchoStrategy Tests ========== @@ -275,14 +274,11 @@ void testEchoStrategyDefaultSupplier() { void testNewFieldLocations() { // Test new field locations in ClusterConfig and MultiClusterClientConfig MultiClusterClientConfig.ClusterConfig 
clusterConfig = MultiClusterClientConfig.ClusterConfig - .builder(testEndpoint, testConfig) - .weight(2.5f) - .build(); + .builder(testEndpoint, testConfig).weight(2.5f).build(); - MultiClusterClientConfig multiConfig = new MultiClusterClientConfig.Builder(new MultiClusterClientConfig.ClusterConfig[]{clusterConfig}) - .retryOnFailover(true) - .failbackSupported(false) - .build(); + MultiClusterClientConfig multiConfig = new MultiClusterClientConfig.Builder( + new MultiClusterClientConfig.ClusterConfig[] { clusterConfig }).retryOnFailover(true) + .failbackSupported(false).build(); assertEquals(2.5f, clusterConfig.getWeight()); assertTrue(multiConfig.isRetryOnFailover()); @@ -293,15 +289,15 @@ void testNewFieldLocations() { void testDefaultValues() { // Test default values in ClusterConfig MultiClusterClientConfig.ClusterConfig clusterConfig = MultiClusterClientConfig.ClusterConfig - .builder(testEndpoint, testConfig) - .build(); + .builder(testEndpoint, testConfig).build(); assertEquals(1.0f, clusterConfig.getWeight()); // Default weight - assertEquals(EchoStrategy.DEFAULT, clusterConfig.getHealthCheckStrategySupplier()); // Default is null (no health check) + assertEquals(EchoStrategy.DEFAULT, clusterConfig.getHealthCheckStrategySupplier()); // Default is null (no + // health check) // Test default values in MultiClusterClientConfig - MultiClusterClientConfig multiConfig = new MultiClusterClientConfig.Builder(new MultiClusterClientConfig.ClusterConfig[]{clusterConfig}) - .build(); + MultiClusterClientConfig multiConfig = new MultiClusterClientConfig.Builder( + new MultiClusterClientConfig.ClusterConfig[] { clusterConfig }).build(); assertFalse(multiConfig.isRetryOnFailover()); // Default is false assertTrue(multiConfig.isFailbackSupported()); // Default is true @@ -314,9 +310,7 @@ void testClusterConfigWithHealthCheckStrategy() { MultiClusterClientConfig.StrategySupplier supplier = (hostAndPort, jedisClientConfig) -> customStrategy; MultiClusterClientConfig.ClusterConfig clusterConfig = MultiClusterClientConfig.ClusterConfig - .builder(testEndpoint, testConfig) - .healthCheckStrategySupplier(supplier) - .build(); + .builder(testEndpoint, testConfig).healthCheckStrategySupplier(supplier).build(); assertNotNull(clusterConfig.getHealthCheckStrategySupplier()); HealthCheckStrategy result = clusterConfig.getHealthCheckStrategySupplier().get(testEndpoint, testConfig); @@ -330,9 +324,7 @@ void testClusterConfigWithStrategySupplier() { }; MultiClusterClientConfig.ClusterConfig clusterConfig = MultiClusterClientConfig.ClusterConfig - .builder(testEndpoint, testConfig) - .healthCheckStrategySupplier(customSupplier) - .build(); + .builder(testEndpoint, testConfig).healthCheckStrategySupplier(customSupplier).build(); assertEquals(customSupplier, clusterConfig.getHealthCheckStrategySupplier()); } @@ -344,9 +336,7 @@ void testClusterConfigWithEchoStrategy() { }; MultiClusterClientConfig.ClusterConfig clusterConfig = MultiClusterClientConfig.ClusterConfig - .builder(testEndpoint, testConfig) - .healthCheckStrategySupplier(echoSupplier) - .build(); + .builder(testEndpoint, testConfig).healthCheckStrategySupplier(echoSupplier).build(); MultiClusterClientConfig.StrategySupplier supplier = clusterConfig.getHealthCheckStrategySupplier(); assertNotNull(supplier); @@ -356,8 +346,7 @@ void testClusterConfigWithEchoStrategy() { @Test void testClusterConfigWithDefaultHealthCheck() { MultiClusterClientConfig.ClusterConfig clusterConfig = MultiClusterClientConfig.ClusterConfig - .builder(testEndpoint, testConfig) - 
.build(); // Should use default EchoStrategy + .builder(testEndpoint, testConfig).build(); // Should use default EchoStrategy assertNotNull(clusterConfig.getHealthCheckStrategySupplier()); assertEquals(EchoStrategy.DEFAULT, clusterConfig.getHealthCheckStrategySupplier()); @@ -366,9 +355,7 @@ void testClusterConfigWithDefaultHealthCheck() { @Test void testClusterConfigWithDisabledHealthCheck() { MultiClusterClientConfig.ClusterConfig clusterConfig = MultiClusterClientConfig.ClusterConfig - .builder(testEndpoint, testConfig) - .healthCheckEnabled(false) - .build(); + .builder(testEndpoint, testConfig).healthCheckEnabled(false).build(); assertNull(clusterConfig.getHealthCheckStrategySupplier()); } @@ -376,9 +363,7 @@ void testClusterConfigWithDisabledHealthCheck() { @Test void testClusterConfigHealthCheckEnabledExplicitly() { MultiClusterClientConfig.ClusterConfig clusterConfig = MultiClusterClientConfig.ClusterConfig - .builder(testEndpoint, testConfig) - .healthCheckEnabled(true) - .build(); + .builder(testEndpoint, testConfig).healthCheckEnabled(true).build(); assertNotNull(clusterConfig.getHealthCheckStrategySupplier()); assertEquals(EchoStrategy.DEFAULT, clusterConfig.getHealthCheckStrategySupplier()); diff --git a/src/test/java/redis/clients/jedis/mcf/MultiClusterInitializationTest.java b/src/test/java/redis/clients/jedis/mcf/MultiClusterInitializationTest.java new file mode 100644 index 0000000000..3c42aefea6 --- /dev/null +++ b/src/test/java/redis/clients/jedis/mcf/MultiClusterInitializationTest.java @@ -0,0 +1,175 @@ +package redis.clients.jedis.mcf; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.MockedConstruction; +import org.mockito.junit.jupiter.MockitoExtension; + +import redis.clients.jedis.Connection; +import redis.clients.jedis.ConnectionPool; +import redis.clients.jedis.DefaultJedisClientConfig; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisClientConfig; +import redis.clients.jedis.MultiClusterClientConfig; +import redis.clients.jedis.exceptions.JedisValidationException; +import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider; + +/** + * Tests for MultiClusterPooledConnectionProvider initialization edge cases + */ +@ExtendWith(MockitoExtension.class) +public class MultiClusterInitializationTest { + + private HostAndPort endpoint1; + private HostAndPort endpoint2; + private HostAndPort endpoint3; + private JedisClientConfig clientConfig; + + @BeforeEach + void setUp() { + endpoint1 = new HostAndPort("localhost", 6379); + endpoint2 = new HostAndPort("localhost", 6380); + endpoint3 = new HostAndPort("localhost", 6381); + clientConfig = DefaultJedisClientConfig.builder().build(); + } + + private MockedConstruction mockPool() { + Connection mockConnection = mock(Connection.class); + lenient().when(mockConnection.ping()).thenReturn(true); + return mockConstruction(ConnectionPool.class, (mock, context) -> { + when(mock.getResource()).thenReturn(mockConnection); + doNothing().when(mock).close(); + }); + } + + @Test + void testInitializationWithMixedHealthCheckConfiguration() { + try (MockedConstruction mockedPool = mockPool()) { + // Create clusters with mixed health check configuration + MultiClusterClientConfig.ClusterConfig cluster1 = MultiClusterClientConfig.ClusterConfig + .builder(endpoint1, clientConfig) + .weight(1.0f) + 
.healthCheckEnabled(false) // No health check + .build(); + + MultiClusterClientConfig.ClusterConfig cluster2 = MultiClusterClientConfig.ClusterConfig + .builder(endpoint2, clientConfig) + .weight(2.0f) + .healthCheckStrategySupplier(EchoStrategy.DEFAULT) // With health check + .build(); + + MultiClusterClientConfig config = new MultiClusterClientConfig.Builder( + new MultiClusterClientConfig.ClusterConfig[] { cluster1, cluster2 }) + .build(); + + try (MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider(config)) { + // Should initialize successfully + assertNotNull(provider.getCluster()); + + // Should select cluster1 (no health check, assumed healthy) or cluster2 based on weight + // Since cluster2 has higher weight and health checks, it should be selected if healthy + assertTrue(provider.getCluster() == provider.getCluster(endpoint1) || + provider.getCluster() == provider.getCluster(endpoint2)); + } + } + } + + @Test + void testInitializationWithAllHealthChecksDisabled() { + try (MockedConstruction mockedPool = mockPool()) { + // Create clusters with no health checks + MultiClusterClientConfig.ClusterConfig cluster1 = MultiClusterClientConfig.ClusterConfig + .builder(endpoint1, clientConfig) + .weight(1.0f) + .healthCheckEnabled(false) + .build(); + + MultiClusterClientConfig.ClusterConfig cluster2 = MultiClusterClientConfig.ClusterConfig + .builder(endpoint2, clientConfig) + .weight(3.0f) // Higher weight + .healthCheckEnabled(false) + .build(); + + MultiClusterClientConfig config = new MultiClusterClientConfig.Builder( + new MultiClusterClientConfig.ClusterConfig[] { cluster1, cluster2 }) + .build(); + + try (MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider(config)) { + // Should select cluster2 (highest weight, no health checks) + assertEquals(provider.getCluster(endpoint2), provider.getCluster()); + } + } + } + + @Test + void testInitializationWithSingleCluster() { + try (MockedConstruction mockedPool = mockPool()) { + MultiClusterClientConfig.ClusterConfig cluster = MultiClusterClientConfig.ClusterConfig + .builder(endpoint1, clientConfig) + .weight(1.0f) + .healthCheckEnabled(false) + .build(); + + MultiClusterClientConfig config = new MultiClusterClientConfig.Builder( + new MultiClusterClientConfig.ClusterConfig[] { cluster }) + .build(); + + try (MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider(config)) { + // Should select the only available cluster + assertEquals(provider.getCluster(endpoint1), provider.getCluster()); + } + } + } + + @Test + void testErrorHandlingWithNullConfiguration() { + assertThrows(JedisValidationException.class, () -> { + new MultiClusterPooledConnectionProvider(null); + }); + } + + @Test + void testErrorHandlingWithEmptyClusterArray() { + assertThrows(JedisValidationException.class, () -> { + new MultiClusterClientConfig.Builder(new MultiClusterClientConfig.ClusterConfig[0]).build(); + }); + } + + @Test + void testErrorHandlingWithNullClusterConfig() { + assertThrows(IllegalArgumentException.class, () -> { + new MultiClusterClientConfig.Builder( + new MultiClusterClientConfig.ClusterConfig[] { null }).build(); + }); + } + + @Test + void testInitializationWithZeroWeights() { + try (MockedConstruction mockedPool = mockPool()) { + MultiClusterClientConfig.ClusterConfig cluster1 = MultiClusterClientConfig.ClusterConfig + .builder(endpoint1, clientConfig) + .weight(0.0f) // Zero weight + .healthCheckEnabled(false) + .build(); + + 
MultiClusterClientConfig.ClusterConfig cluster2 = MultiClusterClientConfig.ClusterConfig + .builder(endpoint2, clientConfig) + .weight(0.0f) // Zero weight + .healthCheckEnabled(false) + .build(); + + MultiClusterClientConfig config = new MultiClusterClientConfig.Builder( + new MultiClusterClientConfig.ClusterConfig[] { cluster1, cluster2 }) + .build(); + + try (MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider(config)) { + // Should still initialize and select one of the clusters + assertNotNull(provider.getCluster()); + } + } + } +} diff --git a/src/test/java/redis/clients/jedis/mcf/PeriodicFailbackTest.java b/src/test/java/redis/clients/jedis/mcf/PeriodicFailbackTest.java index 0d61aa9fa8..43d336fe8a 100644 --- a/src/test/java/redis/clients/jedis/mcf/PeriodicFailbackTest.java +++ b/src/test/java/redis/clients/jedis/mcf/PeriodicFailbackTest.java @@ -3,12 +3,9 @@ import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; -import java.lang.reflect.Method; - import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; import org.mockito.MockedConstruction; import org.mockito.junit.jupiter.MockitoExtension; @@ -19,7 +16,7 @@ import redis.clients.jedis.JedisClientConfig; import redis.clients.jedis.MultiClusterClientConfig; import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider; -import redis.clients.jedis.util.Pool; +import redis.clients.jedis.providers.MultiClusterPooledConnectionProviderHelper; @ExtendWith(MockitoExtension.class) class PeriodicFailbackTest { @@ -44,38 +41,6 @@ private MockedConstruction mockPool() { }); } - /** - * Helper method to trigger health status changes using reflection - */ - private void triggerHealthStatusChange(MultiClusterPooledConnectionProvider provider, HostAndPort endpoint, - HealthStatus oldStatus, HealthStatus newStatus) { - try { - Method handleStatusChangeMethod = MultiClusterPooledConnectionProvider.class - .getDeclaredMethod("handleStatusChange", HealthStatusChangeEvent.class); - handleStatusChangeMethod.setAccessible(true); - - HealthStatusChangeEvent event = new HealthStatusChangeEvent(endpoint, oldStatus, newStatus); - handleStatusChangeMethod.invoke(provider, event); - } catch (Exception e) { - throw new RuntimeException("Failed to trigger health status change", e); - } - } - - /** - * Helper method to trigger periodic failback check using reflection - */ - private void triggerPeriodicFailbackCheck(MultiClusterPooledConnectionProvider provider) { - try { - Method periodicFailbackCheckMethod = MultiClusterPooledConnectionProvider.class - .getDeclaredMethod("periodicFailbackCheck"); - periodicFailbackCheckMethod.setAccessible(true); - - periodicFailbackCheckMethod.invoke(provider); - } catch (Exception e) { - throw new RuntimeException("Failed to trigger periodic failback check", e); - } - } - @Test void testPeriodicFailbackCheckWithDisabledCluster() throws InterruptedException { try (MockedConstruction mockedPool = mockPool()) { @@ -101,7 +66,7 @@ void testPeriodicFailbackCheckWithDisabledCluster() throws InterruptedException provider.iterateActiveCluster(); // Manually trigger periodic check - triggerPeriodicFailbackCheck(provider); + MultiClusterPooledConnectionProviderHelper.periodicFailbackCheck(provider); // Should still be on cluster1 (cluster2 is in grace period) assertEquals(provider.getCluster(endpoint1), provider.getCluster()); @@ -127,7 +92,7 @@ void 
testPeriodicFailbackCheckWithHealthyCluster() throws InterruptedException { assertEquals(provider.getCluster(endpoint2), provider.getCluster()); // Make cluster2 unhealthy to force failover to cluster1 - triggerHealthStatusChange(provider, endpoint2, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint2, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); // Should now be on cluster1 (cluster2 is in grace period) assertEquals(provider.getCluster(endpoint1), provider.getCluster()); @@ -136,17 +101,17 @@ void testPeriodicFailbackCheckWithHealthyCluster() throws InterruptedException { assertTrue(provider.getCluster(endpoint2).isInGracePeriod()); // Make cluster2 healthy again (but it's still in grace period) - triggerHealthStatusChange(provider, endpoint2, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint2, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); // Trigger periodic check immediately - should still be on cluster1 - triggerPeriodicFailbackCheck(provider); + MultiClusterPooledConnectionProviderHelper.periodicFailbackCheck(provider); assertEquals(provider.getCluster(endpoint1), provider.getCluster()); // Wait for grace period to expire Thread.sleep(150); // Trigger periodic check after grace period expires - triggerPeriodicFailbackCheck(provider); + MultiClusterPooledConnectionProviderHelper.periodicFailbackCheck(provider); // Should have failed back to cluster2 (higher weight, grace period expired) assertEquals(provider.getCluster(endpoint2), provider.getCluster()); @@ -172,19 +137,19 @@ void testPeriodicFailbackCheckWithFailbackDisabled() throws InterruptedException assertEquals(provider.getCluster(endpoint2), provider.getCluster()); // Make cluster2 unhealthy to force failover to cluster1 - triggerHealthStatusChange(provider, endpoint2, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint2, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); // Should now be on cluster1 assertEquals(provider.getCluster(endpoint1), provider.getCluster()); // Make cluster2 healthy again - triggerHealthStatusChange(provider, endpoint2, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint2, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); // Wait for stability period Thread.sleep(100); // Trigger periodic check - triggerPeriodicFailbackCheck(provider); + MultiClusterPooledConnectionProviderHelper.periodicFailbackCheck(provider); // Should still be on cluster1 (failback disabled) assertEquals(provider.getCluster(endpoint1), provider.getCluster()); @@ -216,26 +181,26 @@ void testPeriodicFailbackCheckSelectsHighestWeightCluster() throws InterruptedEx assertEquals(provider.getCluster(endpoint3), provider.getCluster()); // Make cluster3 unhealthy to force failover to cluster2 (next highest weight) - triggerHealthStatusChange(provider, endpoint3, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint3, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); // Should now be on cluster2 (weight 2.0f, higher than cluster1's 1.0f) assertEquals(provider.getCluster(endpoint2), provider.getCluster()); // Make cluster2 unhealthy to force failover to cluster1 - triggerHealthStatusChange(provider, endpoint2, HealthStatus.HEALTHY, 
HealthStatus.UNHEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint2, HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); // Should now be on cluster1 (only healthy cluster left) assertEquals(provider.getCluster(endpoint1), provider.getCluster()); // Make cluster2 and cluster3 healthy again - triggerHealthStatusChange(provider, endpoint2, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); - triggerHealthStatusChange(provider, endpoint3, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint2, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint3, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); // Wait for grace period to expire Thread.sleep(150); // Trigger periodic check - triggerPeriodicFailbackCheck(provider); + MultiClusterPooledConnectionProviderHelper.periodicFailbackCheck(provider); // Should have failed back to cluster3 (highest weight, grace period expired) assertEquals(provider.getCluster(endpoint3), provider.getCluster()); diff --git a/src/test/java/redis/clients/jedis/mcf/StatusTrackerTest.java b/src/test/java/redis/clients/jedis/mcf/StatusTrackerTest.java new file mode 100644 index 0000000000..3a97267c96 --- /dev/null +++ b/src/test/java/redis/clients/jedis/mcf/StatusTrackerTest.java @@ -0,0 +1,243 @@ +package redis.clients.jedis.mcf; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import redis.clients.jedis.HostAndPort; + +public class StatusTrackerTest { + + @Mock + private HealthStatusManager mockHealthStatusManager; + + private StatusTracker statusTracker; + private HostAndPort testEndpoint; + + @BeforeEach + void setUp() { + MockitoAnnotations.openMocks(this); + statusTracker = new StatusTracker(mockHealthStatusManager); + testEndpoint = new HostAndPort("localhost", 6379); + } + + @Test + void testWaitForHealthStatus_AlreadyDetermined() { + // Given: Health status is already HEALTHY + when(mockHealthStatusManager.getHealthStatus(testEndpoint)).thenReturn(HealthStatus.HEALTHY); + + // When: Waiting for health status + HealthStatus result = statusTracker.waitForHealthStatus(testEndpoint); + + // Then: Should return immediately without waiting + assertEquals(HealthStatus.HEALTHY, result); + verify(mockHealthStatusManager, never()).registerListener(eq(testEndpoint), any(HealthStatusListener.class)); + } + + @Test + void testWaitForHealthStatus_EventDriven() throws InterruptedException { + // Given: Health status is initially UNKNOWN + when(mockHealthStatusManager.getHealthStatus(testEndpoint)) + .thenReturn(HealthStatus.UNKNOWN) // First call + .thenReturn(HealthStatus.UNKNOWN); // Second call after registering listener + + // Capture the registered listener + final HealthStatusListener[] capturedListener = new HealthStatusListener[1]; + doAnswer(invocation -> { + capturedListener[0] = invocation.getArgument(1); + return null; + }).when(mockHealthStatusManager).registerListener(eq(testEndpoint), any(HealthStatusListener.class)); + + // When: Start waiting in a separate thread + CountDownLatch testLatch = new CountDownLatch(1); + final HealthStatus[] result = new HealthStatus[1]; + + Thread waitingThread = new Thread(() 
-> { + result[0] = statusTracker.waitForHealthStatus(testEndpoint); + testLatch.countDown(); + }); + waitingThread.start(); + + // Give some time for the listener to be registered + Thread.sleep(50); + + // Simulate health status change event + assertNotNull(capturedListener[0], "Listener should have been registered"); + HealthStatusChangeEvent event = new HealthStatusChangeEvent(testEndpoint, HealthStatus.UNKNOWN, HealthStatus.HEALTHY); + capturedListener[0].onStatusChange(event); + + // Then: Should complete and return the new status + assertTrue(testLatch.await(1, TimeUnit.SECONDS), "Should complete within timeout"); + assertEquals(HealthStatus.HEALTHY, result[0]); + + // Verify cleanup + verify(mockHealthStatusManager).unregisterListener(eq(testEndpoint), eq(capturedListener[0])); + } + + @Test + void testWaitForHealthStatus_IgnoresUnknownStatus() throws InterruptedException { + // Given: Health status is initially UNKNOWN + when(mockHealthStatusManager.getHealthStatus(testEndpoint)).thenReturn(HealthStatus.UNKNOWN); + + // Capture the registered listener + final HealthStatusListener[] capturedListener = new HealthStatusListener[1]; + doAnswer(invocation -> { + capturedListener[0] = invocation.getArgument(1); + return null; + }).when(mockHealthStatusManager).registerListener(eq(testEndpoint), any(HealthStatusListener.class)); + + // When: Start waiting in a separate thread + CountDownLatch testLatch = new CountDownLatch(1); + final HealthStatus[] result = new HealthStatus[1]; + + Thread waitingThread = new Thread(() -> { + result[0] = statusTracker.waitForHealthStatus(testEndpoint); + testLatch.countDown(); + }); + waitingThread.start(); + + // Give some time for the listener to be registered + Thread.sleep(50); + + // Simulate UNKNOWN status change (should be ignored) + assertNotNull(capturedListener[0], "Listener should have been registered"); + HealthStatusChangeEvent unknownEvent = new HealthStatusChangeEvent(testEndpoint, HealthStatus.UNKNOWN, HealthStatus.UNKNOWN); + capturedListener[0].onStatusChange(unknownEvent); + + // Should not complete yet + assertFalse(testLatch.await(100, TimeUnit.MILLISECONDS), "Should not complete with UNKNOWN status"); + + // Now send a real status change + HealthStatusChangeEvent realEvent = new HealthStatusChangeEvent(testEndpoint, HealthStatus.UNKNOWN, HealthStatus.UNHEALTHY); + capturedListener[0].onStatusChange(realEvent); + + // Then: Should complete now + assertTrue(testLatch.await(1, TimeUnit.SECONDS), "Should complete with real status"); + assertEquals(HealthStatus.UNHEALTHY, result[0]); + } + + @Test + void testWaitForHealthStatus_IgnoresOtherEndpoints() throws InterruptedException { + // Given: Health status is initially UNKNOWN + when(mockHealthStatusManager.getHealthStatus(testEndpoint)).thenReturn(HealthStatus.UNKNOWN); + HostAndPort otherEndpoint = new HostAndPort("other", 6379); + + // Capture the registered listener + final HealthStatusListener[] capturedListener = new HealthStatusListener[1]; + doAnswer(invocation -> { + capturedListener[0] = invocation.getArgument(1); + return null; + }).when(mockHealthStatusManager).registerListener(eq(testEndpoint), any(HealthStatusListener.class)); + + // When: Start waiting in a separate thread + CountDownLatch testLatch = new CountDownLatch(1); + final HealthStatus[] result = new HealthStatus[1]; + + Thread waitingThread = new Thread(() -> { + result[0] = statusTracker.waitForHealthStatus(testEndpoint); + testLatch.countDown(); + }); + waitingThread.start(); + + // Give some time for the 
listener to be registered + Thread.sleep(50); + + // Simulate status change for different endpoint (should be ignored) + assertNotNull(capturedListener[0], "Listener should have been registered"); + HealthStatusChangeEvent otherEvent = new HealthStatusChangeEvent(otherEndpoint, HealthStatus.UNKNOWN, HealthStatus.HEALTHY); + capturedListener[0].onStatusChange(otherEvent); + + // Should not complete yet + assertFalse(testLatch.await(100, TimeUnit.MILLISECONDS), "Should not complete with other endpoint"); + + // Now send event for correct endpoint + HealthStatusChangeEvent correctEvent = new HealthStatusChangeEvent(testEndpoint, HealthStatus.UNKNOWN, HealthStatus.HEALTHY); + capturedListener[0].onStatusChange(correctEvent); + + // Then: Should complete now + assertTrue(testLatch.await(1, TimeUnit.SECONDS), "Should complete with correct endpoint"); + assertEquals(HealthStatus.HEALTHY, result[0]); + } + + @Test + void testWaitForHealthStatus_InterruptHandling() { + // Given: Health status is initially UNKNOWN and will stay that way + when(mockHealthStatusManager.getHealthStatus(testEndpoint)).thenReturn(HealthStatus.UNKNOWN); + + // When: Interrupt the waiting thread + Thread testThread = new Thread(() -> { + try { + statusTracker.waitForHealthStatus(testEndpoint); + fail("Should have thrown JedisConnectionException due to interrupt"); + } catch (Exception e) { + assertTrue(e.getMessage().contains("Interrupted while waiting")); + assertTrue(Thread.currentThread().isInterrupted()); + } + }); + + testThread.start(); + + // Give thread time to start waiting + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + // Interrupt the waiting thread + testThread.interrupt(); + + try { + testThread.join(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + assertFalse(testThread.isAlive(), "Test thread should have completed"); + } + + @Test + void testWaitForHealthStatus_RaceConditionProtection() { + // Given: Health status changes between first check and listener registration + when(mockHealthStatusManager.getHealthStatus(testEndpoint)) + .thenReturn(HealthStatus.UNKNOWN) // First call + .thenReturn(HealthStatus.HEALTHY); // Second call after registering listener + + // When: Waiting for health status + HealthStatus result = statusTracker.waitForHealthStatus(testEndpoint); + + // Then: Should return the status from the second check without waiting + assertEquals(HealthStatus.HEALTHY, result); + + // Verify listener was registered and unregistered + verify(mockHealthStatusManager).registerListener(eq(testEndpoint), any(HealthStatusListener.class)); + verify(mockHealthStatusManager).unregisterListener(eq(testEndpoint), any(HealthStatusListener.class)); + } + + @Test + void testWaitForHealthStatus_ListenerCleanupOnException() { + // Given: Health status is initially UNKNOWN + when(mockHealthStatusManager.getHealthStatus(testEndpoint)).thenReturn(HealthStatus.UNKNOWN); + + // Mock registerListener to throw an exception + doThrow(new RuntimeException("Registration failed")) + .when(mockHealthStatusManager).registerListener(eq(testEndpoint), any(HealthStatusListener.class)); + + // When: Waiting for health status + assertThrows(RuntimeException.class, () -> { + statusTracker.waitForHealthStatus(testEndpoint); + }); + + // Then: Should still attempt to unregister (cleanup in finally block) + verify(mockHealthStatusManager).registerListener(eq(testEndpoint), any(HealthStatusListener.class)); + // Note: unregisterListener 
might not be called if registerListener fails, + // but the finally block should handle this gracefully + } +} diff --git a/src/test/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProviderHelper.java b/src/test/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProviderHelper.java new file mode 100644 index 0000000000..138d524ec0 --- /dev/null +++ b/src/test/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProviderHelper.java @@ -0,0 +1,17 @@ +package redis.clients.jedis.providers; + +import redis.clients.jedis.mcf.Endpoint; +import redis.clients.jedis.mcf.HealthStatus; +import redis.clients.jedis.mcf.HealthStatusChangeEvent; + +public class MultiClusterPooledConnectionProviderHelper { + + public static void onHealthStatusChange(MultiClusterPooledConnectionProvider provider, Endpoint endpoint, + HealthStatus oldStatus, HealthStatus newStatus) { + provider.onHealthStatusChange(new HealthStatusChangeEvent(endpoint, oldStatus, newStatus)); + } + + public static void periodicFailbackCheck(MultiClusterPooledConnectionProvider provider) { + provider.periodicFailbackCheck(); + } +} diff --git a/src/test/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProviderTest.java b/src/test/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProviderTest.java index 8263a595bf..ea783c0081 100644 --- a/src/test/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProviderTest.java +++ b/src/test/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProviderTest.java @@ -1,6 +1,10 @@ package redis.clients.jedis.providers; import io.github.resilience4j.circuitbreaker.CircuitBreaker; + +import org.awaitility.Awaitility; +import org.awaitility.Durations; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import redis.clients.jedis.*; @@ -8,7 +12,9 @@ import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.exceptions.JedisValidationException; import redis.clients.jedis.mcf.Endpoint; +import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider.Cluster; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.jupiter.api.Assertions.*; @@ -38,6 +44,12 @@ public void setUp() { new MultiClusterClientConfig.Builder(clusterConfigs).build()); } + @AfterEach + public void destroy() { + provider.close(); + provider = null; + } + @Test public void testCircuitBreakerForcedTransitions() { @@ -55,13 +67,19 @@ public void testCircuitBreakerForcedTransitions() { } @Test - public void testIterateActiveCluster() { + public void testIterateActiveCluster() throws InterruptedException { + waitForClustersToGetHealthy(provider.getCluster(endpointStandalone0.getHostAndPort()), + provider.getCluster(endpointStandalone1.getHostAndPort())); + Endpoint e2 = provider.iterateActiveCluster(); assertEquals(endpointStandalone1.getHostAndPort(), e2); } @Test public void testIterateActiveClusterOutOfRange() { + waitForClustersToGetHealthy(provider.getCluster(endpointStandalone0.getHostAndPort()), + provider.getCluster(endpointStandalone1.getHostAndPort())); + provider.setActiveCluster(endpointStandalone0.getHostAndPort()); provider.getCluster().setDisabled(true); @@ -69,13 +87,15 @@ public void testIterateActiveClusterOutOfRange() { provider.getCluster().setDisabled(true); assertEquals(endpointStandalone1.getHostAndPort(), e2); - - assertThrows(JedisConnectionException.class, () -> 
provider.iterateActiveCluster()); // Should throw an - // exception + // Should throw an exception + assertThrows(JedisConnectionException.class, () -> provider.iterateActiveCluster()); } @Test public void testCanIterateOnceMore() { + waitForClustersToGetHealthy(provider.getCluster(endpointStandalone0.getHostAndPort()), + provider.getCluster(endpointStandalone1.getHostAndPort())); + provider.setActiveCluster(endpointStandalone0.getHostAndPort()); provider.getCluster().setDisabled(true); provider.iterateActiveCluster(); @@ -83,6 +103,11 @@ public void testCanIterateOnceMore() { assertFalse(provider.canIterateOnceMore()); } + private void waitForClustersToGetHealthy(Cluster... clusters) { + Awaitility.await().pollInterval(Durations.ONE_HUNDRED_MILLISECONDS).atMost(Durations.TWO_SECONDS) + .until(() -> Arrays.stream(clusters).allMatch(Cluster::isHealthy)); + } + @Test public void testRunClusterFailoverPostProcessor() { ClusterConfig[] clusterConfigs = new ClusterConfig[2]; diff --git a/src/test/java/redis/clients/jedis/providers/MultiClusterProviderHealthStatusChangeEventTest.java b/src/test/java/redis/clients/jedis/providers/MultiClusterProviderHealthStatusChangeEventTest.java new file mode 100644 index 0000000000..cb9993c8c7 --- /dev/null +++ b/src/test/java/redis/clients/jedis/providers/MultiClusterProviderHealthStatusChangeEventTest.java @@ -0,0 +1,346 @@ +package redis.clients.jedis.providers; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.MockedConstruction; +import org.mockito.junit.jupiter.MockitoExtension; + +import redis.clients.jedis.Connection; +import redis.clients.jedis.ConnectionPool; +import redis.clients.jedis.DefaultJedisClientConfig; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisClientConfig; +import redis.clients.jedis.MultiClusterClientConfig; +import redis.clients.jedis.mcf.Endpoint; +import redis.clients.jedis.mcf.HealthCheckStrategy; +import redis.clients.jedis.mcf.HealthStatus; +import redis.clients.jedis.mcf.HealthStatusListener; +import redis.clients.jedis.mcf.HealthStatusManager; + +/** + * Tests for MultiClusterPooledConnectionProvider event handling behavior during initialization and throughout its + * lifecycle with HealthStatusChangeEvents. 
+ */ +@ExtendWith(MockitoExtension.class) +public class MultiClusterProviderHealthStatusChangeEventTest { + + private HostAndPort endpoint1; + private HostAndPort endpoint2; + private HostAndPort endpoint3; + private JedisClientConfig clientConfig; + + @BeforeEach + void setUp() { + endpoint1 = new HostAndPort("localhost", 6879); + endpoint2 = new HostAndPort("localhost", 6880); + endpoint3 = new HostAndPort("localhost", 6881); + clientConfig = DefaultJedisClientConfig.builder().build(); + } + + private MockedConstruction mockConnectionPool() { + Connection mockConnection = mock(Connection.class); + lenient().when(mockConnection.ping()).thenReturn(true); + return mockConstruction(ConnectionPool.class, (mock, context) -> { + when(mock.getResource()).thenReturn(mockConnection); + doNothing().when(mock).close(); + }); + } + + @Test + void testEventsProcessedAfterInitialization() throws Exception { + try (MockedConstruction mockedPool = mockConnectionPool()) { + // Create clusters without health checks + MultiClusterClientConfig.ClusterConfig cluster1 = MultiClusterClientConfig.ClusterConfig + .builder(endpoint1, clientConfig).weight(1.0f).healthCheckEnabled(false).build(); + MultiClusterClientConfig.ClusterConfig cluster2 = MultiClusterClientConfig.ClusterConfig + .builder(endpoint2, clientConfig).weight(0.5f).healthCheckEnabled(false).build(); + + MultiClusterClientConfig config = new MultiClusterClientConfig.Builder( + new MultiClusterClientConfig.ClusterConfig[] { cluster1, cluster2 }).build(); + + try (MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider(config)) { + // This should process immediately since initialization is complete + assertDoesNotThrow(() -> { + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint1, + HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); + }, "Post-initialization events should be processed immediately"); + + // Verify the cluster status was updated + assertEquals(HealthStatus.UNHEALTHY, provider.getCluster(endpoint1).getHealthStatus(), + "Cluster health status should be updated after post-init event"); + } + } + } + + @Test + void testMultipleEventsProcessedSequentially() throws Exception { + try (MockedConstruction mockedPool = mockConnectionPool()) { + MultiClusterClientConfig.ClusterConfig cluster1 = MultiClusterClientConfig.ClusterConfig + .builder(endpoint1, clientConfig).weight(1.0f).healthCheckEnabled(false).build(); + MultiClusterClientConfig.ClusterConfig cluster2 = MultiClusterClientConfig.ClusterConfig + .builder(endpoint2, clientConfig).weight(0.5f).healthCheckEnabled(false).build(); + + MultiClusterClientConfig config = new MultiClusterClientConfig.Builder( + new MultiClusterClientConfig.ClusterConfig[] { cluster1, cluster2 }).build(); + + try (MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider(config)) { + // Verify initial state + assertEquals(HealthStatus.HEALTHY, provider.getCluster(endpoint1).getHealthStatus(), + "Should start as HEALTHY"); + + // Simulate multiple rapid events for the same endpoint + // Process events sequentially (post-init behavior) + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint1, + HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); + assertEquals(HealthStatus.UNHEALTHY, provider.getCluster(endpoint1).getHealthStatus(), + "Should be UNHEALTHY after first event"); + + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint1, + HealthStatus.UNHEALTHY, 
HealthStatus.HEALTHY); + assertEquals(HealthStatus.HEALTHY, provider.getCluster(endpoint1).getHealthStatus(), + "Should be HEALTHY after second event"); + + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint1, + HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); + assertEquals(HealthStatus.UNHEALTHY, provider.getCluster(endpoint1).getHealthStatus(), + "Should be UNHEALTHY after third event"); + } + } + } + + @Test + void testEventsForMultipleEndpointsPreserveOrder() throws Exception { + try (MockedConstruction mockedPool = mockConnectionPool()) { + MultiClusterClientConfig.ClusterConfig cluster1 = MultiClusterClientConfig.ClusterConfig + .builder(endpoint1, clientConfig).weight(1.0f).healthCheckEnabled(false).build(); + + MultiClusterClientConfig.ClusterConfig cluster2 = MultiClusterClientConfig.ClusterConfig + .builder(endpoint2, clientConfig).weight(2.0f).healthCheckEnabled(false).build(); + + MultiClusterClientConfig config = new MultiClusterClientConfig.Builder( + new MultiClusterClientConfig.ClusterConfig[] { cluster1, cluster2 }).build(); + + try (MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider(config)) { + // Despite the test name, this only verifies that multiple endpoints initialize correctly; + // cross-endpoint event ordering is exercised in the post-init ordering tests below + + // Verify both clusters are initialized properly + assertNotNull(provider.getCluster(endpoint1), "Cluster 1 should be available"); + assertNotNull(provider.getCluster(endpoint2), "Cluster 2 should be available"); + + // Both should be healthy (no health checks = assumed healthy) + assertTrue(provider.getCluster(endpoint1).isHealthy(), "Cluster 1 should be healthy"); + assertTrue(provider.getCluster(endpoint2).isHealthy(), "Cluster 2 should be healthy"); + } + } + } + + @Test + void testEventProcessingWithMixedHealthCheckConfiguration() throws Exception { + try (MockedConstruction mockedPool = mockConnectionPool()) { + // Both clusters are configured with health checks disabled, so each is assumed healthy from the start + MultiClusterClientConfig.ClusterConfig cluster1 = MultiClusterClientConfig.ClusterConfig + .builder(endpoint1, clientConfig).weight(1.0f).healthCheckEnabled(false) // No health checks + .build(); + + MultiClusterClientConfig.ClusterConfig cluster2 = MultiClusterClientConfig.ClusterConfig + .builder(endpoint2, clientConfig).weight(2.0f).healthCheckEnabled(false) // No health checks for + // simplicity + .build(); + + MultiClusterClientConfig config = new MultiClusterClientConfig.Builder( + new MultiClusterClientConfig.ClusterConfig[] { cluster1, cluster2 }).build(); + + try (MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider(config)) { + // Both clusters should be available and healthy + assertNotNull(provider.getCluster(endpoint1), "Cluster 1 should be available"); + assertNotNull(provider.getCluster(endpoint2), "Cluster 2 should be available"); + + assertTrue(provider.getCluster(endpoint1).isHealthy(), "Cluster 1 should be healthy"); + assertTrue(provider.getCluster(endpoint2).isHealthy(), "Cluster 2 should be healthy"); + + // Provider should select the higher weight cluster (endpoint2) + assertEquals(provider.getCluster(endpoint2), provider.getCluster(), + "Should select higher weight cluster as active"); + } + } + } + + @Test + void testNoEventsLostDuringInitialization() throws Exception { + try (MockedConstruction mockedPool = mockConnectionPool()) { + MultiClusterClientConfig.ClusterConfig cluster1 = MultiClusterClientConfig.ClusterConfig + .builder(endpoint1, 
clientConfig).weight(1.0f).healthCheckEnabled(false).build(); + + MultiClusterClientConfig config = new MultiClusterClientConfig.Builder( + new MultiClusterClientConfig.ClusterConfig[] { cluster1 }).build(); + + // This test verifies that the provider initializes correctly and doesn't lose events + // In practice, with health checks disabled, no events should be generated during init + try (MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider(config)) { + // Verify successful initialization + assertNotNull(provider.getCluster(), "Provider should have initialized successfully"); + assertEquals(provider.getCluster(endpoint1), provider.getCluster(), + "Should have selected the configured cluster"); + assertTrue(provider.getCluster().isHealthy(), + "Cluster should be healthy (assumed healthy with no health checks)"); + } + } + } + + // ========== POST-INITIALIZATION EVENT ORDERING TESTS ========== + + @Test + void testPostInitEventOrderingWithMultipleEndpoints() throws Exception { + try (MockedConstruction mockedPool = mockConnectionPool()) { + MultiClusterClientConfig.ClusterConfig cluster1 = MultiClusterClientConfig.ClusterConfig + .builder(endpoint1, clientConfig).weight(1.0f).healthCheckEnabled(false).build(); + + MultiClusterClientConfig.ClusterConfig cluster2 = MultiClusterClientConfig.ClusterConfig + .builder(endpoint2, clientConfig).weight(0.5f).healthCheckEnabled(false).build(); + + MultiClusterClientConfig.ClusterConfig cluster3 = MultiClusterClientConfig.ClusterConfig + .builder(endpoint3, clientConfig).weight(0.2f).healthCheckEnabled(false).build(); + + MultiClusterClientConfig config = new MultiClusterClientConfig.Builder( + new MultiClusterClientConfig.ClusterConfig[] { cluster1, cluster2, cluster3 }).build(); + + try (MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider(config)) { + // Process events immediately (post-init behavior) + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint1, + HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint2, + HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint1, + HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); + + // Verify events were processed and cluster states updated + assertEquals(HealthStatus.HEALTHY, provider.getCluster(endpoint1).getHealthStatus(), + "Endpoint1 should have final HEALTHY status"); + assertEquals(HealthStatus.UNHEALTHY, provider.getCluster(endpoint2).getHealthStatus(), + "Endpoint2 should have UNHEALTHY status"); + } + } + } + + @Test + void testPostInitRapidEventsOptimization() throws Exception { + try (MockedConstruction mockedPool = mockConnectionPool()) { + MultiClusterClientConfig.ClusterConfig cluster1 = MultiClusterClientConfig.ClusterConfig + .builder(endpoint1, clientConfig).weight(1.0f).healthCheckEnabled(false).build(); + + MultiClusterClientConfig.ClusterConfig cluster2 = MultiClusterClientConfig.ClusterConfig + .builder(endpoint2, clientConfig).weight(0.5f).healthCheckEnabled(false).build(); + + MultiClusterClientConfig config = new MultiClusterClientConfig.Builder( + new MultiClusterClientConfig.ClusterConfig[] { cluster1, cluster2 }).build(); + + try (MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider(config)) { + // Verify initial state + assertEquals(HealthStatus.HEALTHY, 
provider.getCluster(endpoint1).getHealthStatus(), + "Should start as HEALTHY"); + + // Send rapid sequence of events (should all be processed since init is complete) + // Process events immediately (post-init behavior) + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint1, + HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint1, + HealthStatus.UNHEALTHY, HealthStatus.HEALTHY); + MultiClusterPooledConnectionProviderHelper.onHealthStatusChange(provider, endpoint1, + HealthStatus.HEALTHY, HealthStatus.UNHEALTHY); + + // Final state should reflect the last event + assertEquals(HealthStatus.UNHEALTHY, provider.getCluster(endpoint1).getHealthStatus(), + "Should have final UNHEALTHY status from last event"); + } + } + } + + @Test + void testHealthStatusManagerEventOrdering() throws InterruptedException { + HealthStatusManager manager = new HealthStatusManager(); + + // Counter to track events received + AtomicInteger eventCount = new AtomicInteger(0); + CountDownLatch eventLatch = new CountDownLatch(1); + + // Create a listener that counts events + HealthStatusListener listener = event -> { + eventCount.incrementAndGet(); + eventLatch.countDown(); + }; + + // Register listener BEFORE adding endpoint (correct order to prevent missing events) + manager.registerListener(endpoint1, listener); + + // Create a strategy that immediately returns HEALTHY + HealthCheckStrategy immediateStrategy = new HealthCheckStrategy() { + @Override + public int getInterval() { + return 100; + } + + @Override + public int getTimeout() { + return 50; + } + + @Override + public HealthStatus doHealthCheck(Endpoint endpoint) { + return HealthStatus.HEALTHY; + } + }; + + // Add endpoint - this should trigger health check and event + manager.add(endpoint1, immediateStrategy); + + // Wait for event to be processed + assertTrue(eventLatch.await(2, TimeUnit.SECONDS), "Should receive health status event"); + + // Should have received at least one event (UNKNOWN -> HEALTHY) + assertTrue(eventCount.get() >= 1, "Should have received at least one health status event"); + + manager.remove(endpoint1); + } + + @Test + void testHealthStatusManagerHasHealthCheck() { + HealthStatusManager manager = new HealthStatusManager(); + + // Initially no health check + assertFalse(manager.hasHealthCheck(endpoint1), "Should not have health check initially"); + + // Create a simple strategy + HealthCheckStrategy strategy = new HealthCheckStrategy() { + @Override + public int getInterval() { + return 100; + } + + @Override + public int getTimeout() { + return 50; + } + + @Override + public HealthStatus doHealthCheck(Endpoint endpoint) { + return HealthStatus.HEALTHY; + } + }; + + // Add health check + manager.add(endpoint1, strategy); + assertTrue(manager.hasHealthCheck(endpoint1), "Should have health check after adding"); + + // Remove health check + manager.remove(endpoint1); + assertFalse(manager.hasHealthCheck(endpoint1), "Should not have health check after removing"); + } +} diff --git a/src/test/java/redis/clients/jedis/scenario/ActiveActiveFailoverTest.java b/src/test/java/redis/clients/jedis/scenario/ActiveActiveFailoverTest.java index f72923d0fc..6383f1e869 100644 --- a/src/test/java/redis/clients/jedis/scenario/ActiveActiveFailoverTest.java +++ b/src/test/java/redis/clients/jedis/scenario/ActiveActiveFailoverTest.java @@ -8,11 +8,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.*; +import 
redis.clients.jedis.MultiClusterClientConfig.ClusterConfig; import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider; import redis.clients.jedis.exceptions.JedisConnectionException; import java.io.IOException; -import java.time.Duration; import java.time.Instant; import java.util.HashMap; import java.util.Map; @@ -25,10 +25,7 @@ import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assumptions.assumeTrue; -@Tags({ - @Tag("failover"), - @Tag("scenario") -}) +@Tags({ @Tag("failover"), @Tag("scenario") }) public class ActiveActiveFailoverTest { private static final Logger log = LoggerFactory.getLogger(ActiveActiveFailoverTest.class); @@ -52,13 +49,13 @@ public void testFailover() { MultiClusterClientConfig.ClusterConfig[] clusterConfig = new MultiClusterClientConfig.ClusterConfig[2]; JedisClientConfig config = endpoint.getClientConfigBuilder() - .socketTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS) - .connectionTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS).build(); + .socketTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS) + .connectionTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS).build(); - clusterConfig[0] = new MultiClusterClientConfig.ClusterConfig(endpoint.getHostAndPort(0), - config, RecommendedSettings.poolConfig); - clusterConfig[1] = new MultiClusterClientConfig.ClusterConfig(endpoint.getHostAndPort(1), - config, RecommendedSettings.poolConfig); + clusterConfig[0] = ClusterConfig.builder(endpoint.getHostAndPort(0), config) + .connectionPoolConfig(RecommendedSettings.poolConfig).weight(1.0f).build(); + clusterConfig[1] = ClusterConfig.builder(endpoint.getHostAndPort(1), config) + .connectionPoolConfig(RecommendedSettings.poolConfig).weight(0.5f).build(); MultiClusterClientConfig.Builder builder = new MultiClusterClientConfig.Builder(clusterConfig); @@ -67,6 +64,10 @@ public void testFailover() { builder.circuitBreakerSlidingWindowMinCalls(1); builder.circuitBreakerFailureRateThreshold(10.0f); // percentage of failures to trigger circuit breaker + builder.failbackSupported(true); + builder.failbackCheckInterval(1000); + builder.gracePeriod(10000); + builder.retryWaitDuration(10); builder.retryMaxAttempts(1); builder.retryWaitDurationExponentialBackoffMultiplier(1); @@ -79,6 +80,10 @@ class FailoverReporter implements Consumer { Instant failoverAt = null; + boolean failbackHappened = false; + + Instant failbackAt = null; + public String getCurrentClusterName() { return currentClusterName; } @@ -86,17 +91,19 @@ public String getCurrentClusterName() { @Override public void accept(String clusterName) { this.currentClusterName = clusterName; - log.info( - "\n\n====FailoverEvent=== \nJedis failover to cluster: {}\n====FailoverEvent===\n\n", - clusterName); - - failoverHappened = true; - failoverAt = Instant.now(); + log.info("\n\n====FailoverEvent=== \nJedis failover to cluster: {}\n====FailoverEvent===\n\n", clusterName); + + if (failoverHappened) { + failbackHappened = true; + failbackAt = Instant.now(); + } else { + failoverHappened = true; + failoverAt = Instant.now(); + } } } - MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider( - builder.build()); + MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider(builder.build()); FailoverReporter reporter = new FailoverReporter(); provider.setClusterFailoverPostProcessor(reporter); provider.setActiveCluster(endpoint.getHostAndPort(0)); @@ -117,15 +124,17 @@ public void accept(String clusterName) 
{ int retryingDelay = 5; while (true) { try { - Map executionInfo = new HashMap() {{ - put("threadId", String.valueOf(threadId)); - put("cluster", reporter.getCurrentClusterName()); - }}; + Map executionInfo = new HashMap() { + { + put("threadId", String.valueOf(threadId)); + put("cluster", reporter.getCurrentClusterName()); + } + }; client.xadd("execution_log", StreamEntryID.NEW_ENTRY, executionInfo); if (attempt > 0) { log.info("Thread {} recovered after {} ms. Threads still not recovered: {}", threadId, - attempt * retryingDelay, retryingThreadsCounter.decrementAndGet()); + attempt * retryingDelay, retryingThreadsCounter.decrementAndGet()); } break; @@ -134,15 +143,13 @@ public void accept(String clusterName) { if (reporter.failoverHappened) { long failedCommands = failedCommandsAfterFailover.incrementAndGet(); lastFailedCommandAt.set(Instant.now()); - log.warn( - "Thread {} failed to execute command after failover. Failed commands after failover: {}", - threadId, failedCommands); + log.warn("Thread {} failed to execute command after failover. Failed commands after failover: {}", threadId, + failedCommands); } if (attempt == 0) { long failedThreads = retryingThreadsCounter.incrementAndGet(); - log.warn("Thread {} failed to execute command. Failed threads: {}", threadId, - failedThreads); + log.warn("Thread {} failed to execute command. Failed threads: {}", threadId, failedThreads); } try { Thread.sleep(retryingDelay); @@ -153,20 +160,21 @@ public void accept(String clusterName) { } } return true; - }, 18); + }, 4); fakeApp.setKeepExecutingForSeconds(30); Thread t = new Thread(fakeApp); t.start(); HashMap params = new HashMap<>(); params.put("bdb_id", endpoint.getBdbId()); - params.put("rlutil_command", "pause_bdb"); + params.put("actions", + "[{\"type\":\"execute_rlutil_command\",\"params\":{\"rlutil_command\":\"pause_bdb\"}},{\"type\":\"wait\",\"params\":{\"wait_time\":\"15\"}},{\"type\":\"execute_rlutil_command\",\"params\":{\"rlutil_command\":\"resume_bdb\"}}]"); FaultInjectionClient.TriggerActionResponse actionResponse = null; try { - log.info("Triggering bdb_pause"); - actionResponse = faultClient.triggerAction("execute_rlutil_command", params); + log.info("Triggering bdb_pause + wait 15 seconds + bdb_resume"); + actionResponse = faultClient.triggerAction("sequence_of_actions", params); } catch (IOException e) { fail("Fault Injection Server error:" + e.getMessage()); } @@ -182,15 +190,17 @@ public void accept(String clusterName) { ConnectionPool pool = provider.getCluster(endpoint.getHostAndPort(0)).getConnectionPool(); - log.info("First connection pool state: active: {}, idle: {}", pool.getNumActive(), - pool.getNumIdle()); - log.info("Full failover time: {} s", - Duration.between(reporter.failoverAt, lastFailedCommandAt.get()).getSeconds()); + log.info("First connection pool state: active: {}, idle: {}", pool.getNumActive(), pool.getNumIdle()); + log.info("Failover happened at: {}", reporter.failoverAt); + log.info("Failback happened at: {}", reporter.failbackAt); + log.info("Last failed command at: {}", lastFailedCommandAt.get()); assertEquals(0, pool.getNumActive()); assertTrue(fakeApp.capturedExceptions().isEmpty()); + assertTrue(reporter.failoverHappened); + assertTrue(reporter.failbackHappened); client.close(); } -} +} \ No newline at end of file diff --git a/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java b/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java index c4e1c5717b..17170251cd 100644 --- 
a/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java +++ b/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java @@ -89,8 +89,8 @@ public boolean isCompleted(Duration checkInterval, Duration delayAfter, Duration private static CloseableHttpClient getHttpClient() { RequestConfig requestConfig = RequestConfig.custom() - .setConnectionRequestTimeout(5000, TimeUnit.MILLISECONDS) - .setResponseTimeout(5000, TimeUnit.MILLISECONDS).build(); + .setConnectionRequestTimeout(10000, TimeUnit.MILLISECONDS) + .setResponseTimeout(10000, TimeUnit.MILLISECONDS).build(); return HttpClientBuilder.create() .setDefaultRequestConfig(requestConfig).build();
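For readers wiring this up outside the tests: below is a minimal sketch of how the weighted-cluster and failback options exercised in the scenario test could be configured. It only uses builder calls that appear in this change set (ClusterConfig.builder(...).weight(...), failbackSupported, failbackCheckInterval, gracePeriod, setClusterFailoverPostProcessor); the class name, hostnames, ports, and timing values are illustrative assumptions, and the interval/grace-period values are assumed to be milliseconds.

import redis.clients.jedis.DefaultJedisClientConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisClientConfig;
import redis.clients.jedis.MultiClusterClientConfig;
import redis.clients.jedis.MultiClusterClientConfig.ClusterConfig;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider;

public class WeightedFailbackConfigSketch {

  public static void main(String[] args) {
    JedisClientConfig clientConfig = DefaultJedisClientConfig.builder().build();

    // Two weighted clusters; the provider prefers the higher weight when both are healthy.
    // Hostnames and ports are placeholders.
    ClusterConfig primary = ClusterConfig.builder(new HostAndPort("redis-primary.example", 6379), clientConfig)
        .weight(1.0f).build();
    ClusterConfig standby = ClusterConfig.builder(new HostAndPort("redis-standby.example", 6379), clientConfig)
        .weight(0.5f).build();

    MultiClusterClientConfig.Builder builder = new MultiClusterClientConfig.Builder(
        new ClusterConfig[] { primary, standby });
    builder.failbackSupported(true);      // allow returning to the higher-weight cluster once it recovers
    builder.failbackCheckInterval(1000);  // how often the periodic failback check runs (assumed ms)
    builder.gracePeriod(10000);           // how long a recovered cluster must stay healthy first (assumed ms)

    try (MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider(builder.build())) {
      // Optional: get notified when the active cluster changes (failover or failback).
      provider.setClusterFailoverPostProcessor(
          clusterName -> System.out.println("Active cluster is now: " + clusterName));
      // ... hand the provider to the Jedis client, as in the scenario test, and issue commands as usual ...
    }
  }
}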