diff --git a/internal/k8sutils/cluster-scaling.go b/internal/k8sutils/cluster-scaling.go
index c7fc8ca75..91189e072 100644
--- a/internal/k8sutils/cluster-scaling.go
+++ b/internal/k8sutils/cluster-scaling.go
@@ -6,8 +6,10 @@ import (
 	"net"
 	"strconv"
 	"strings"
+	"time"
 
 	rcvb2 "github.com/OT-CONTAINER-KIT/redis-operator/api/rediscluster/v1beta2"
+	retry "github.com/avast/retry-go"
 	redis "github.com/redis/go-redis/v9"
 	"k8s.io/client-go/kubernetes"
 	"sigs.k8s.io/controller-runtime/pkg/log"
@@ -129,10 +131,57 @@ func getRedisNodeID(ctx context.Context, client kubernetes.Interface, cr *rcvb2.
 
 // Rebalance the Redis CLuster using the Empty Master Nodes
 func RebalanceRedisClusterEmptyMasters(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster) {
+	logger := log.FromContext(ctx)
+
+	// Use retry logic for rebalancing
+	err := retry.Do(
+		func() error {
+			return rebalanceClusterWithEmptyMasters(ctx, client, cr)
+		},
+		retry.Attempts(3),
+		retry.Delay(2*time.Second),
+		retry.DelayType(retry.BackOffDelay),
+		retry.OnRetry(func(n uint, err error) {
+			logger.Info("Retrying cluster rebalance with empty masters", "attempt", n+1, "error", err)
+		}),
+	)
+	if err != nil {
+		logger.Error(err, "Failed to rebalance cluster with empty masters after retries")
+		return
+	}
+
+	logger.Info("Successfully rebalanced cluster with empty masters")
+}
+
+// rebalanceClusterWithEmptyMasters performs the actual rebalance operation and validates the result
+func rebalanceClusterWithEmptyMasters(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster) error {
+	logger := log.FromContext(ctx)
+
+	// Find a healthy leader node to execute the rebalance command
+	totalRedisLeaderNodes := CheckRedisNodeCount(ctx, client, cr, "leader")
+	var selectedPod string
+	for i := int32(0); i < totalRedisLeaderNodes; i++ {
+		podName := fmt.Sprintf("%s-leader-%d", cr.Name, i)
+		// Test if pod is reachable
+		redisClient := configureRedisClient(ctx, client, cr, podName)
+		if _, err := redisClient.Ping(ctx).Result(); err == nil {
+			selectedPod = podName
+			redisClient.Close()
+			break
+		}
+		redisClient.Close()
+	}
+
+	if selectedPod == "" {
+		return fmt.Errorf("no healthy leader node found to execute rebalance")
+	}
+
+	logger.V(1).Info("Executing rebalance with empty masters", "pod", selectedPod)
+
 	// cmd = redis-cli --cluster rebalance <redis>:<port> --cluster-use-empty-masters -a <pass>
 	var cmd []string
 	pod := RedisDetails{
-		PodName:   cr.Name + "-leader-1",
+		PodName:   selectedPod,
 		Namespace: cr.Namespace,
 	}
 	cmd = []string{"redis-cli", "--cluster", "rebalance"}
@@ -141,15 +190,26 @@ func RebalanceRedisClusterEmptyMasters(ctx context.Context, client kubernetes.In
 	if cr.Spec.KubernetesConfig.ExistingPasswordSecret != nil {
 		pass, err := getRedisPassword(ctx, client, cr.Namespace, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Name, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Key)
 		if err != nil {
-			log.FromContext(ctx).Error(err, "Error in getting redis password")
+			return fmt.Errorf("error getting redis password: %w", err)
 		}
-		cmd = append(cmd, "-a")
-		cmd = append(cmd, pass)
+		cmd = append(cmd, "-a", pass)
 	}
-	cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, cr.Name+"-leader-0")...)
+	cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, selectedPod)...)
+
+	output, err := executeCommand1(ctx, client, cr, cmd, selectedPod)
+	if err != nil {
+		return fmt.Errorf("failed to execute rebalance command: %w", err)
+	}
-	executeCommand(ctx, client, cr, cmd, cr.Name+"-leader-1")
+	logger.V(1).Info("Rebalance command output", "output", output)
+
+	// Validate that the cluster is now balanced (no empty masters)
+	if err := validateClusterBalance(ctx, client, cr, false); err != nil {
+		return fmt.Errorf("cluster validation failed after rebalance: %w", err)
+	}
+
+	return nil
 }
 
 func CheckIfEmptyMasters(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster) {
@@ -175,10 +235,57 @@ func CheckIfEmptyMasters(ctx context.Context, client kubernetes.Interface, cr *r
 
 // Rebalance Redis Cluster Would Rebalance the Redis Cluster without using the empty masters
 func RebalanceRedisCluster(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster) {
+	logger := log.FromContext(ctx)
+
+	// Use retry logic for rebalancing
+	err := retry.Do(
+		func() error {
+			return rebalanceCluster(ctx, client, cr)
+		},
+		retry.Attempts(3),
+		retry.Delay(2*time.Second),
+		retry.DelayType(retry.BackOffDelay),
+		retry.OnRetry(func(n uint, err error) {
+			logger.Info("Retrying cluster rebalance", "attempt", n+1, "error", err)
+		}),
+	)
+	if err != nil {
+		logger.Error(err, "Failed to rebalance cluster after retries")
+		return
+	}
+
+	logger.Info("Successfully rebalanced cluster")
+}
+
+// rebalanceCluster performs the actual rebalance operation and validates the result
+func rebalanceCluster(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster) error {
+	logger := log.FromContext(ctx)
+
+	// Find a healthy leader node to execute the rebalance command
+	totalRedisLeaderNodes := CheckRedisNodeCount(ctx, client, cr, "leader")
+	var selectedPod string
+	for i := int32(0); i < totalRedisLeaderNodes; i++ {
+		podName := fmt.Sprintf("%s-leader-%d", cr.Name, i)
+		// Test if pod is reachable
+		redisClient := configureRedisClient(ctx, client, cr, podName)
+		if _, err := redisClient.Ping(ctx).Result(); err == nil {
+			selectedPod = podName
+			redisClient.Close()
+			break
+		}
+		redisClient.Close()
+	}
+
+	if selectedPod == "" {
+		return fmt.Errorf("no healthy leader node found to execute rebalance")
+	}
+
+	logger.V(1).Info("Executing rebalance", "pod", selectedPod)
+
 	// cmd = redis-cli --cluster rebalance <redis>:<port> -a <pass>
 	var cmd []string
 	pod := RedisDetails{
-		PodName:   cr.Name + "-leader-1",
+		PodName:   selectedPod,
 		Namespace: cr.Namespace,
 	}
 	cmd = []string{"redis-cli", "--cluster", "rebalance"}
@@ -186,15 +293,27 @@ func RebalanceRedisCluster(ctx context.Context, client kubernetes.Interface, cr
 	if cr.Spec.KubernetesConfig.ExistingPasswordSecret != nil {
 		pass, err := getRedisPassword(ctx, client, cr.Namespace, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Name, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Key)
 		if err != nil {
-			log.FromContext(ctx).Error(err, "Error in getting redis password")
+			return fmt.Errorf("error getting redis password: %w", err)
 		}
-		cmd = append(cmd, "-a")
-		cmd = append(cmd, pass)
+		cmd = append(cmd, "-a", pass)
 	}
-	cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, cr.Name+"-leader-0")...)
+	cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, selectedPod)...)
+
+	output, err := executeCommand1(ctx, client, cr, cmd, selectedPod)
+	if err != nil {
+		return fmt.Errorf("failed to execute rebalance command: %w", err)
+	}
+
+	logger.V(1).Info("Rebalance command output", "output", output)
-	executeCommand(ctx, client, cr, cmd, cr.Name+"-leader-1")
+	// Validate that the cluster is balanced
+	// Allow empty masters here as this is called during scale-down
+	if err := validateClusterBalance(ctx, client, cr, true); err != nil {
+		return fmt.Errorf("cluster validation failed after rebalance: %w", err)
+	}
+
+	return nil
 }
 
 // Add redis cluster node would add a node to the existing redis cluster using redis-cli
@@ -364,3 +483,78 @@ func ClusterFailover(ctx context.Context, client kubernetes.Interface, cr *rcvb2
 	}
 	return nil
 }
+
+// validateClusterBalance checks if the cluster is properly balanced.
+// Returns an error if any master owns zero slots (empty master) while allowEmptyMasters is false.
+func validateClusterBalance(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster, allowEmptyMasters bool) error {
+	logger := log.FromContext(ctx)
+	totalRedisLeaderNodes := CheckRedisNodeCount(ctx, client, cr, "leader")
+
+	if totalRedisLeaderNodes == 0 {
+		return fmt.Errorf("no leader nodes found in cluster")
+	}
+
+	// Try to connect to any available leader node
+	var redisClient *redis.Client
+	var connectedPod string
+	for i := 0; i < int(totalRedisLeaderNodes); i++ {
+		podName := fmt.Sprintf("%s-leader-%d", cr.Name, i)
+		redisClient = configureRedisClient(ctx, client, cr, podName)
+		// Test connection
+		if _, err := redisClient.Ping(ctx).Result(); err == nil {
+			connectedPod = podName
+			break
+		}
+		redisClient.Close()
+	}
+
+	if redisClient == nil || connectedPod == "" {
+		return fmt.Errorf("unable to connect to any leader node")
+	}
+	defer redisClient.Close()
+
+	logger.V(1).Info("Validating cluster balance", "connectedPod", connectedPod, "totalLeaders", totalRedisLeaderNodes)
+
+	emptyMasterCount := 0
+	slotsPerNode := make(map[string]int)
+
+	for i := 0; i < int(totalRedisLeaderNodes); i++ {
+		pod := RedisDetails{
+			PodName:   fmt.Sprintf("%s-leader-%d", cr.Name, i),
+			Namespace: cr.Namespace,
+		}
+		podNodeID := getRedisNodeID(ctx, client, cr, pod)
+		if podNodeID == "" {
+			logger.V(1).Info("Could not get node ID for pod", "pod", pod.PodName)
+			continue
+		}
+
+		podSlots := getRedisClusterSlots(ctx, redisClient, podNodeID)
+		slotCount := 0
+		if podSlots != "" && podSlots != "0" {
+			// Parse slot count (podSlots can be a plain count like "5461" or an inclusive range like "0-5460")
+			if strings.Contains(podSlots, "-") {
+				parts := strings.Split(podSlots, "-")
+				start, _ := strconv.Atoi(parts[0])
+				end, _ := strconv.Atoi(parts[len(parts)-1])
+				slotCount = end - start + 1 // inclusive range; works for non-zero-based ranges too
+			} else {
+				slotCount, _ = strconv.Atoi(podSlots)
+			}
+		}
+
+		slotsPerNode[pod.PodName] = slotCount
+		if slotCount == 0 {
+			emptyMasterCount++
+			logger.V(1).Info("Found empty master node", "pod", pod.PodName, "nodeID", podNodeID)
+		}
+	}
+
+	// Check if we have empty masters when we shouldn't
+	if !allowEmptyMasters && emptyMasterCount > 0 {
+		return fmt.Errorf("found %d empty master nodes, cluster is not balanced", emptyMasterCount)
+	}
+
+	logger.Info("Cluster balance validation completed", "emptyMasters", emptyMasterCount, "slotsPerNode", slotsPerNode)
+	return nil
+}
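
Review note: the slot-count parsing in `validateClusterBalance` is the one piece of pure logic in this diff, and it is easy to get wrong for ranges that don't start at slot 0 (taking `end + 1` instead of `end - start + 1`). If that parsing were extracted into a small helper, it could be unit-tested without a live cluster. A minimal sketch of what that could look like; the `countSlots` helper and the test file are hypothetical and not part of this diff:

```go
// cluster_scaling_slots_test.go — hypothetical helper plus a table-driven test.
package k8sutils

import (
	"strconv"
	"strings"
	"testing"
)

// countSlots mirrors the parsing added in validateClusterBalance: it accepts
// either a plain slot count ("5461") or an inclusive range ("5461-10922").
func countSlots(podSlots string) int {
	if podSlots == "" || podSlots == "0" {
		return 0
	}
	if strings.Contains(podSlots, "-") {
		parts := strings.Split(podSlots, "-")
		start, _ := strconv.Atoi(parts[0])
		end, _ := strconv.Atoi(parts[len(parts)-1])
		return end - start + 1 // inclusive on both ends
	}
	n, _ := strconv.Atoi(podSlots)
	return n
}

func TestCountSlots(t *testing.T) {
	cases := []struct {
		in   string
		want int
	}{
		{"", 0},
		{"0", 0},
		{"5461", 5461},
		{"0-5460", 5461},     // zero-based range
		{"5461-10922", 5462}, // non-zero-based range: end-start+1, not end+1
	}
	for _, c := range cases {
		if got := countSlots(c.in); got != c.want {
			t.Errorf("countSlots(%q) = %d, want %d", c.in, got, c.want)
		}
	}
}
```

The last case is the regression worth guarding against: a naive `end + 1` would report 10923 slots for `"5461-10922"` and could mask a genuinely unbalanced cluster.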