Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 204 additions & 12 deletions internal/k8sutils/cluster-scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"net"
"strconv"
"strings"
"time"

rcvb2 "github.com/OT-CONTAINER-KIT/redis-operator/api/rediscluster/v1beta2"
retry "github.com/avast/retry-go"
redis "github.com/redis/go-redis/v9"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -129,10 +131,57 @@ func getRedisNodeID(ctx context.Context, client kubernetes.Interface, cr *rcvb2.

// RebalanceRedisClusterEmptyMasters rebalances the Redis cluster so that empty
// master nodes are used as well.
//
// The rebalance is attempted up to 3 times with a back-off delay based on
// 2 seconds. Failures are not returned to the caller; the final error is only
// written to the logger taken from ctx.
func RebalanceRedisClusterEmptyMasters(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster) {
	logger := log.FromContext(ctx)

	// One rebalance attempt, including validation of the result.
	attempt := func() error {
		return rebalanceClusterWithEmptyMasters(ctx, client, cr)
	}
	// Invoked by the retry package between attempts; n is zero-based here.
	onRetry := func(n uint, err error) {
		logger.Info("Retrying cluster rebalance with empty masters", "attempt", n+1, "error", err)
	}

	if err := retry.Do(
		attempt,
		retry.Attempts(3),
		retry.Delay(2*time.Second),
		retry.DelayType(retry.BackOffDelay),
		retry.OnRetry(onRetry),
	); err != nil {
		logger.Error(err, "Failed to rebalance cluster with empty masters after retries")
		return
	}

	logger.Info("Successfully rebalanced cluster with empty masters")
}

// rebalanceClusterWithEmptyMasters performs the actual rebalance operation and validates the result
func rebalanceClusterWithEmptyMasters(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster) error {
logger := log.FromContext(ctx)

// Find a healthy leader node to execute the rebalance command
totalRedisLeaderNodes := CheckRedisNodeCount(ctx, client, cr, "leader")
var selectedPod string
for i := int32(0); i < totalRedisLeaderNodes; i++ {
podName := fmt.Sprintf("%s-leader-%d", cr.Name, i)
// Test if pod is reachable
redisClient := configureRedisClient(ctx, client, cr, podName)
if _, err := redisClient.Ping(ctx).Result(); err == nil {
selectedPod = podName
redisClient.Close()
break
}
redisClient.Close()
}

if selectedPod == "" {
return fmt.Errorf("no healthy leader node found to execute rebalance")
}

logger.V(1).Info("Executing rebalance with empty masters", "pod", selectedPod)

// cmd = redis-cli --cluster rebalance <redis>:<port> --cluster-use-empty-masters -a <pass>
var cmd []string
pod := RedisDetails{
PodName: cr.Name + "-leader-1",
PodName: selectedPod,
Namespace: cr.Namespace,
}
cmd = []string{"redis-cli", "--cluster", "rebalance"}
Expand All @@ -141,15 +190,26 @@ func RebalanceRedisClusterEmptyMasters(ctx context.Context, client kubernetes.In
if cr.Spec.KubernetesConfig.ExistingPasswordSecret != nil {
pass, err := getRedisPassword(ctx, client, cr.Namespace, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Name, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Key)
if err != nil {
log.FromContext(ctx).Error(err, "Error in getting redis password")
return fmt.Errorf("error getting redis password: %w", err)
}
cmd = append(cmd, "-a")
cmd = append(cmd, pass)
cmd = append(cmd, "-a", pass)
}

cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, cr.Name+"-leader-0")...)
cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, selectedPod)...)

output, err := executeCommand1(ctx, client, cr, cmd, selectedPod)
if err != nil {
return fmt.Errorf("failed to execute rebalance command: %w", err)
}

executeCommand(ctx, client, cr, cmd, cr.Name+"-leader-1")
logger.V(1).Info("Rebalance command output", "output", output)

// Validate that the cluster is now balanced (no empty masters)
if err := validateClusterBalance(ctx, client, cr, false); err != nil {
return fmt.Errorf("cluster validation failed after rebalance: %w", err)
}

return nil
}

func CheckIfEmptyMasters(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster) {
Expand All @@ -175,26 +235,85 @@ func CheckIfEmptyMasters(ctx context.Context, client kubernetes.Interface, cr *r

// RebalanceRedisCluster rebalances the Redis cluster without using the empty
// masters.
//
// The rebalance is attempted up to 3 times with a back-off delay based on
// 2 seconds. Failures are not returned to the caller; the final error is only
// written to the logger taken from ctx.
func RebalanceRedisCluster(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster) {
	logger := log.FromContext(ctx)

	// One rebalance attempt, including validation of the result.
	attempt := func() error {
		return rebalanceCluster(ctx, client, cr)
	}
	// Invoked by the retry package between attempts; n is zero-based here.
	onRetry := func(n uint, err error) {
		logger.Info("Retrying cluster rebalance", "attempt", n+1, "error", err)
	}

	if err := retry.Do(
		attempt,
		retry.Attempts(3),
		retry.Delay(2*time.Second),
		retry.DelayType(retry.BackOffDelay),
		retry.OnRetry(onRetry),
	); err != nil {
		logger.Error(err, "Failed to rebalance cluster after retries")
		return
	}

	logger.Info("Successfully rebalanced cluster")
}

// rebalanceCluster performs a single rebalance attempt and validates the result.
//
// It picks the first leader pod that answers PING, runs
// `redis-cli --cluster rebalance` inside that pod (adding the password and TLS
// arguments when configured on cr) and then checks the resulting slot layout
// with validateClusterBalance. Empty masters are tolerated during validation
// because this path is used during scale-down.
//
// It returns an error when no leader is reachable, when the password cannot be
// read, when the command fails, or when validation fails.
func rebalanceCluster(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster) error {
	logger := log.FromContext(ctx)

	// Find a healthy leader node to execute the rebalance command.
	totalRedisLeaderNodes := CheckRedisNodeCount(ctx, client, cr, "leader")
	var selectedPod string
	for i := int32(0); i < totalRedisLeaderNodes; i++ {
		podName := fmt.Sprintf("%s-leader-%d", cr.Name, i)
		redisClient := configureRedisClient(ctx, client, cr, podName)
		_, err := redisClient.Ping(ctx).Result()
		// The client is only used as a reachability probe; release it right away.
		redisClient.Close()
		if err == nil {
			selectedPod = podName
			break
		}
	}
	if selectedPod == "" {
		return fmt.Errorf("no healthy leader node found to execute rebalance")
	}

	logger.V(1).Info("Executing rebalance", "pod", selectedPod)

	// cmd = redis-cli --cluster rebalance <redis>:<port> -a <pass>
	pod := RedisDetails{
		PodName:   selectedPod,
		Namespace: cr.Namespace,
	}
	cmd := []string{"redis-cli", "--cluster", "rebalance"}
	cmd = append(cmd, getEndpoint(ctx, client, cr, pod))

	if cr.Spec.KubernetesConfig.ExistingPasswordSecret != nil {
		pass, err := getRedisPassword(ctx, client, cr.Namespace, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Name, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Key)
		if err != nil {
			// Returned (not logged) so the error is reported once, by the caller.
			return fmt.Errorf("error getting redis password: %w", err)
		}
		cmd = append(cmd, "-a", pass)
	}

	// TLS arguments must refer to the pod the command actually runs in.
	cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, selectedPod)...)

	output, err := executeCommand1(ctx, client, cr, cmd, selectedPod)
	if err != nil {
		return fmt.Errorf("failed to execute rebalance command: %w", err)
	}
	logger.V(1).Info("Rebalance command output", "output", output)

	// Validate that the cluster is balanced.
	// Allow empty masters here as this is called during scale-down.
	if err := validateClusterBalance(ctx, client, cr, true); err != nil {
		return fmt.Errorf("cluster validation failed after rebalance: %w", err)
	}

	return nil
}

// Add redis cluster node would add a node to the existing redis cluster using redis-cli
Expand Down Expand Up @@ -364,3 +483,76 @@ func ClusterFailover(ctx context.Context, client kubernetes.Interface, cr *rcvb2
}
return nil
}

// validateClusterBalance inspects the slots owned by every leader pod and
// reports empty masters (leaders owning 0 slots).
//
// It connects to the first leader pod that answers PING and uses that
// connection to look up the slots of each leader. Leaders whose node ID cannot
// be resolved are skipped. When allowEmptyMasters is false, any leader with
// zero slots yields an error; when it is true, empty masters are only logged.
// Evenness of the slot distribution is not checked.
//
// An error is also returned when the cluster has no leader nodes, when no
// leader is reachable, or when a slot description cannot be parsed.
func validateClusterBalance(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster, allowEmptyMasters bool) error {
	logger := log.FromContext(ctx)
	totalRedisLeaderNodes := CheckRedisNodeCount(ctx, client, cr, "leader")

	if totalRedisLeaderNodes == 0 {
		return fmt.Errorf("no leader nodes found in cluster")
	}

	// Try to connect to any available leader node. redisClient stays nil until
	// a candidate answers PING, so a closed client is never kept around.
	var redisClient *redis.Client
	var connectedPod string
	for i := 0; i < int(totalRedisLeaderNodes); i++ {
		podName := fmt.Sprintf("%s-leader-%d", cr.Name, i)
		candidate := configureRedisClient(ctx, client, cr, podName)
		if _, err := candidate.Ping(ctx).Result(); err != nil {
			candidate.Close()
			continue
		}
		redisClient, connectedPod = candidate, podName
		break
	}
	if redisClient == nil {
		return fmt.Errorf("unable to connect to any leader node")
	}
	defer redisClient.Close()

	logger.V(1).Info("Validating cluster balance", "connectedPod", connectedPod, "totalLeaders", totalRedisLeaderNodes)

	emptyMasterCount := 0
	slotsPerNode := make(map[string]int, int(totalRedisLeaderNodes))

	for i := 0; i < int(totalRedisLeaderNodes); i++ {
		pod := RedisDetails{
			PodName:   fmt.Sprintf("%s-leader-%d", cr.Name, i),
			Namespace: cr.Namespace,
		}
		podNodeID := getRedisNodeID(ctx, client, cr, pod)
		if podNodeID == "" {
			logger.V(1).Info("Could not get node ID for pod", "pod", pod.PodName)
			continue
		}

		podSlots := getRedisClusterSlots(ctx, redisClient, podNodeID)
		slotCount, err := parseLeaderSlotCount(podSlots)
		if err != nil {
			// A value we cannot interpret must not be mistaken for "0 slots".
			return fmt.Errorf("reading slots of pod %s: %w", pod.PodName, err)
		}

		slotsPerNode[pod.PodName] = slotCount
		if slotCount == 0 {
			emptyMasterCount++
			logger.V(1).Info("Found empty master node", "pod", pod.PodName, "nodeID", podNodeID)
		}
	}

	// Check if we have empty masters when we shouldn't.
	if !allowEmptyMasters && emptyMasterCount > 0 {
		return fmt.Errorf("found %d empty master nodes, cluster is not balanced", emptyMasterCount)
	}

	logger.Info("Cluster balance validation completed", "emptyMasters", emptyMasterCount, "slotsPerNode", slotsPerNode)
	return nil
}

// parseLeaderSlotCount converts a slot description into a number of slots. It
// accepts an empty string (no slots), a plain count such as "5461", or a single
// inclusive range such as "5461-10922", which is counted as end-start+1.
func parseLeaderSlotCount(spec string) (int, error) {
	if spec == "" {
		return 0, nil
	}

	first, last, isRange := strings.Cut(spec, "-")
	if !isRange {
		count, err := strconv.Atoi(spec)
		if err != nil {
			return 0, fmt.Errorf("parsing slot count %q: %w", spec, err)
		}
		return count, nil
	}

	start, err := strconv.Atoi(first)
	if err != nil {
		return 0, fmt.Errorf("parsing slot range start in %q: %w", spec, err)
	}
	end, err := strconv.Atoi(last)
	if err != nil {
		return 0, fmt.Errorf("parsing slot range end in %q: %w", spec, err)
	}
	if end < start {
		return 0, fmt.Errorf("invalid slot range %q: end before start", spec)
	}
	return end - start + 1, nil
}
Loading