
Commit 2ba8ab2

Damian Seredyn authored and drivebyer committed
feat: Add retry and validation logic to Redis cluster operations
- Add validateClusterBalance() to verify slot distribution
- Implement retry logic for RebalanceRedisCluster functions
- Dynamically select healthy leader node for operations
- Validate cluster state after rebalance operations
- Return structured errors with context

Signed-off-by: Damian Seredyn <[email protected]>
1 parent 9ba8db7 commit 2ba8ab2
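For reference, both rebalance entry points in this change wrap the underlying operation with github.com/avast/retry-go. A minimal, self-contained sketch of the same pattern, assuming a placeholder doRebalance operation that is not part of this commit:

package main

import (
	"errors"
	"fmt"
	"time"

	retry "github.com/avast/retry-go"
)

// doRebalance stands in for the real rebalance operation.
func doRebalance() error {
	return errors.New("cluster not ready")
}

func main() {
	err := retry.Do(
		doRebalance,
		retry.Attempts(3),                   // give up after three attempts
		retry.Delay(2*time.Second),          // base delay between attempts
		retry.DelayType(retry.BackOffDelay), // exponential backoff: ~2s, then ~4s
		retry.OnRetry(func(n uint, err error) {
			fmt.Printf("attempt %d failed: %v\n", n+1, err)
		}),
	)
	if err != nil {
		fmt.Println("all retries exhausted:", err)
	}
}

With these options the three attempts are spaced roughly 2s and then 4s apart before retry.Do returns the accumulated error.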

File tree

1 file changed: +204 -12 lines


internal/k8sutils/cluster-scaling.go

Lines changed: 204 additions & 12 deletions
@@ -6,8 +6,10 @@ import (
 	"net"
 	"strconv"
 	"strings"
+	"time"
 
 	rcvb2 "github.com/OT-CONTAINER-KIT/redis-operator/api/rediscluster/v1beta2"
+	retry "github.com/avast/retry-go"
 	redis "github.com/redis/go-redis/v9"
 	"k8s.io/client-go/kubernetes"
 	"sigs.k8s.io/controller-runtime/pkg/log"
@@ -129,10 +131,57 @@ func getRedisNodeID(ctx context.Context, client kubernetes.Interface, cr *rcvb2.
 
 // Rebalance the Redis Cluster using the Empty Master Nodes
 func RebalanceRedisClusterEmptyMasters(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster) {
+	logger := log.FromContext(ctx)
+
+	// Use retry logic for rebalancing
+	err := retry.Do(
+		func() error {
+			return rebalanceClusterWithEmptyMasters(ctx, client, cr)
+		},
+		retry.Attempts(3),
+		retry.Delay(2*time.Second),
+		retry.DelayType(retry.BackOffDelay),
+		retry.OnRetry(func(n uint, err error) {
+			logger.Info("Retrying cluster rebalance with empty masters", "attempt", n+1, "error", err)
+		}),
+	)
+	if err != nil {
+		logger.Error(err, "Failed to rebalance cluster with empty masters after retries")
+		return
+	}
+
+	logger.Info("Successfully rebalanced cluster with empty masters")
+}
+
+// rebalanceClusterWithEmptyMasters performs the actual rebalance operation and validates the result
+func rebalanceClusterWithEmptyMasters(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster) error {
+	logger := log.FromContext(ctx)
+
+	// Find a healthy leader node to execute the rebalance command
+	totalRedisLeaderNodes := CheckRedisNodeCount(ctx, client, cr, "leader")
+	var selectedPod string
+	for i := int32(0); i < totalRedisLeaderNodes; i++ {
+		podName := fmt.Sprintf("%s-leader-%d", cr.Name, i)
+		// Test if pod is reachable
+		redisClient := configureRedisClient(ctx, client, cr, podName)
+		if _, err := redisClient.Ping(ctx).Result(); err == nil {
+			selectedPod = podName
+			redisClient.Close()
+			break
+		}
+		redisClient.Close()
+	}
+
+	if selectedPod == "" {
+		return fmt.Errorf("no healthy leader node found to execute rebalance")
+	}
+
+	logger.V(1).Info("Executing rebalance with empty masters", "pod", selectedPod)
+
 	// cmd = redis-cli --cluster rebalance <redis>:<port> --cluster-use-empty-masters -a <pass>
 	var cmd []string
 	pod := RedisDetails{
-		PodName:   cr.Name + "-leader-1",
+		PodName:   selectedPod,
 		Namespace: cr.Namespace,
 	}
 	cmd = []string{"redis-cli", "--cluster", "rebalance"}
@@ -141,15 +190,26 @@ func RebalanceRedisClusterEmptyMasters(ctx context.Context, client kubernetes.In
 	if cr.Spec.KubernetesConfig.ExistingPasswordSecret != nil {
 		pass, err := getRedisPassword(ctx, client, cr.Namespace, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Name, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Key)
 		if err != nil {
-			log.FromContext(ctx).Error(err, "Error in getting redis password")
+			return fmt.Errorf("error getting redis password: %w", err)
 		}
-		cmd = append(cmd, "-a")
-		cmd = append(cmd, pass)
+		cmd = append(cmd, "-a", pass)
 	}
 
-	cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, cr.Name+"-leader-0")...)
+	cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, selectedPod)...)
+
+	output, err := executeCommand1(ctx, client, cr, cmd, selectedPod)
+	if err != nil {
+		return fmt.Errorf("failed to execute rebalance command: %w", err)
+	}
 
-	executeCommand(ctx, client, cr, cmd, cr.Name+"-leader-1")
+	logger.V(1).Info("Rebalance command output", "output", output)
+
+	// Validate that the cluster is now balanced (no empty masters)
+	if err := validateClusterBalance(ctx, client, cr, false); err != nil {
+		return fmt.Errorf("cluster validation failed after rebalance: %w", err)
+	}
+
+	return nil
 }
 
 func CheckIfEmptyMasters(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster) {
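Note that the healthy-leader selection loop added above reappears verbatim in rebalanceCluster in the next hunk. A hypothetical helper that could factor out the duplication, reusing this file's existing CheckRedisNodeCount and configureRedisClient helpers (selectHealthyLeader is illustrative and not part of this commit):

// selectHealthyLeader returns the first leader pod that answers PING,
// or the empty string if no leader is reachable.
func selectHealthyLeader(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster) string {
	totalLeaders := CheckRedisNodeCount(ctx, client, cr, "leader")
	for i := int32(0); i < totalLeaders; i++ {
		podName := fmt.Sprintf("%s-leader-%d", cr.Name, i)
		redisClient := configureRedisClient(ctx, client, cr, podName)
		_, err := redisClient.Ping(ctx).Result()
		redisClient.Close() // close regardless of outcome; callers reconnect as needed
		if err == nil {
			return podName
		}
	}
	return ""
}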
@@ -175,26 +235,85 @@ func CheckIfEmptyMasters(ctx context.Context, client kubernetes.Interface, cr *r
 
 // Rebalance Redis Cluster would rebalance the Redis Cluster without using the empty masters
 func RebalanceRedisCluster(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster) {
+	logger := log.FromContext(ctx)
+
+	// Use retry logic for rebalancing
+	err := retry.Do(
+		func() error {
+			return rebalanceCluster(ctx, client, cr)
+		},
+		retry.Attempts(3),
+		retry.Delay(2*time.Second),
+		retry.DelayType(retry.BackOffDelay),
+		retry.OnRetry(func(n uint, err error) {
+			logger.Info("Retrying cluster rebalance", "attempt", n+1, "error", err)
+		}),
+	)
+	if err != nil {
+		logger.Error(err, "Failed to rebalance cluster after retries")
+		return
+	}
+
+	logger.Info("Successfully rebalanced cluster")
+}
+
+// rebalanceCluster performs the actual rebalance operation and validates the result
+func rebalanceCluster(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster) error {
+	logger := log.FromContext(ctx)
+
+	// Find a healthy leader node to execute the rebalance command
+	totalRedisLeaderNodes := CheckRedisNodeCount(ctx, client, cr, "leader")
+	var selectedPod string
+	for i := int32(0); i < totalRedisLeaderNodes; i++ {
+		podName := fmt.Sprintf("%s-leader-%d", cr.Name, i)
+		// Test if pod is reachable
+		redisClient := configureRedisClient(ctx, client, cr, podName)
+		if _, err := redisClient.Ping(ctx).Result(); err == nil {
+			selectedPod = podName
+			redisClient.Close()
+			break
+		}
+		redisClient.Close()
+	}
+
+	if selectedPod == "" {
+		return fmt.Errorf("no healthy leader node found to execute rebalance")
+	}
+
+	logger.V(1).Info("Executing rebalance", "pod", selectedPod)
+
 	// cmd = redis-cli --cluster rebalance <redis>:<port> -a <pass>
 	var cmd []string
 	pod := RedisDetails{
-		PodName:   cr.Name + "-leader-1",
+		PodName:   selectedPod,
 		Namespace: cr.Namespace,
 	}
 	cmd = []string{"redis-cli", "--cluster", "rebalance"}
 	cmd = append(cmd, getEndpoint(ctx, client, cr, pod))
 	if cr.Spec.KubernetesConfig.ExistingPasswordSecret != nil {
 		pass, err := getRedisPassword(ctx, client, cr.Namespace, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Name, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Key)
 		if err != nil {
-			log.FromContext(ctx).Error(err, "Error in getting redis password")
+			return fmt.Errorf("error getting redis password: %w", err)
 		}
-		cmd = append(cmd, "-a")
-		cmd = append(cmd, pass)
+		cmd = append(cmd, "-a", pass)
 	}
 
-	cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, cr.Name+"-leader-0")...)
+	cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, selectedPod)...)
+
+	output, err := executeCommand1(ctx, client, cr, cmd, selectedPod)
+	if err != nil {
+		return fmt.Errorf("failed to execute rebalance command: %w", err)
+	}
+
+	logger.V(1).Info("Rebalance command output", "output", output)
 
-	executeCommand(ctx, client, cr, cmd, cr.Name+"-leader-1")
+	// Validate that the cluster is balanced
+	// Allow empty masters here as this is called during scale-down
+	if err := validateClusterBalance(ctx, client, cr, true); err != nil {
+		return fmt.Errorf("cluster validation failed after rebalance: %w", err)
+	}
+
+	return nil
 }
 
 // Add redis cluster node would add a node to the existing redis cluster using redis-cli
@@ -364,3 +483,76 @@ func ClusterFailover(ctx context.Context, client kubernetes.Interface, cr *rcvb2
 	}
 	return nil
 }
+
+// validateClusterBalance checks if the cluster is properly balanced.
+// Returns an error if any master has 0 slots (an empty master) while allowEmptyMasters is false
+func validateClusterBalance(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster, allowEmptyMasters bool) error {
+	logger := log.FromContext(ctx)
+	totalRedisLeaderNodes := CheckRedisNodeCount(ctx, client, cr, "leader")
+
+	if totalRedisLeaderNodes == 0 {
+		return fmt.Errorf("no leader nodes found in cluster")
+	}
+
+	// Try to connect to any available leader node
+	var redisClient *redis.Client
+	var connectedPod string
+	for i := 0; i < int(totalRedisLeaderNodes); i++ {
+		podName := fmt.Sprintf("%s-leader-%d", cr.Name, i)
+		redisClient = configureRedisClient(ctx, client, cr, podName)
+		// Test connection
+		if _, err := redisClient.Ping(ctx).Result(); err == nil {
+			connectedPod = podName
+			break
+		}
+		redisClient.Close()
+	}
+
+	if redisClient == nil || connectedPod == "" {
+		return fmt.Errorf("unable to connect to any leader node")
+	}
+	defer redisClient.Close()
+
+	logger.V(1).Info("Validating cluster balance", "connectedPod", connectedPod, "totalLeaders", totalRedisLeaderNodes)
+
+	emptyMasterCount := 0
+	slotsPerNode := make(map[string]int)
+
+	for i := 0; i < int(totalRedisLeaderNodes); i++ {
+		pod := RedisDetails{
+			PodName:   fmt.Sprintf("%s-leader-%d", cr.Name, i),
+			Namespace: cr.Namespace,
+		}
+		podNodeID := getRedisNodeID(ctx, client, cr, pod)
+		if podNodeID == "" {
+			logger.V(1).Info("Could not get node ID for pod", "pod", pod.PodName)
+			continue
+		}
+
+		podSlots := getRedisClusterSlots(ctx, redisClient, podNodeID)
+		slotCount := 0
+		if podSlots != "" && podSlots != "0" {
+			// Parse slot count (podSlots can be like "5461" or "0-5460")
+			if strings.Contains(podSlots, "-") {
+				parts := strings.Split(podSlots, "-")
+				start, _ := strconv.Atoi(parts[0])
+				end, _ := strconv.Atoi(parts[len(parts)-1])
+				slotCount = end - start + 1 // inclusive range
+			} else {
+				slotCount, _ = strconv.Atoi(podSlots)
+			}
+		}
+
+		slotsPerNode[pod.PodName] = slotCount
+		if slotCount == 0 {
+			emptyMasterCount++
+			logger.V(1).Info("Found empty master node", "pod", pod.PodName, "nodeID", podNodeID)
+		}
+	}
+
+	// Check if we have empty masters when we shouldn't
+	if !allowEmptyMasters && emptyMasterCount > 0 {
+		return fmt.Errorf("found %d empty master nodes, cluster is not balanced", emptyMasterCount)
+	}
+
+	logger.Info("Cluster balance validation completed", "emptyMasters", emptyMasterCount, "slotsPerNode", slotsPerNode)
+	return nil
+}
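The slot accounting in validateClusterBalance assumes getRedisClusterSlots reports either a bare slot count (e.g. "5461") or a single inclusive range (e.g. "0-5460"). A standalone sketch of that parsing under the same assumption (countSlots is illustrative and not part of this commit):

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// countSlots converts a slot descriptor into a slot count:
// "5461" -> 5461, "0-5460" -> 5461 (ranges are inclusive).
func countSlots(podSlots string) int {
	if podSlots == "" || podSlots == "0" {
		return 0
	}
	if strings.Contains(podSlots, "-") {
		parts := strings.Split(podSlots, "-")
		start, _ := strconv.Atoi(parts[0])
		end, _ := strconv.Atoi(parts[len(parts)-1])
		return end - start + 1
	}
	n, _ := strconv.Atoi(podSlots)
	return n
}

func main() {
	fmt.Println(countSlots("0-5460"))      // 5461
	fmt.Println(countSlots("10923-16383")) // 5461: subtracting the range start matters here
}

For a three-master cluster, each near-equal share of the 16384-slot space is about 5461 slots, which is what the validation expects to see on every leader after a successful rebalance.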
