@@ -6,8 +6,10 @@ import (
66 "net"
77 "strconv"
88 "strings"
9+ "time"
910
1011 rcvb2 "github.com/OT-CONTAINER-KIT/redis-operator/api/rediscluster/v1beta2"
12+ retry "github.com/avast/retry-go"
1113 redis "github.com/redis/go-redis/v9"
1214 "k8s.io/client-go/kubernetes"
1315 "sigs.k8s.io/controller-runtime/pkg/log"
@@ -129,10 +131,57 @@ func getRedisNodeID(ctx context.Context, client kubernetes.Interface, cr *rcvb2.
129131
130132// Rebalance the Redis CLuster using the Empty Master Nodes
131133func RebalanceRedisClusterEmptyMasters (ctx context.Context , client kubernetes.Interface , cr * rcvb2.RedisCluster ) {
134+ logger := log .FromContext (ctx )
135+
136+ // Use retry logic for rebalancing
137+ err := retry .Do (
138+ func () error {
139+ return rebalanceClusterWithEmptyMasters (ctx , client , cr )
140+ },
141+ retry .Attempts (3 ),
142+ retry .Delay (2 * time .Second ),
143+ retry .DelayType (retry .BackOffDelay ),
144+ retry .OnRetry (func (n uint , err error ) {
145+ logger .Info ("Retrying cluster rebalance with empty masters" , "attempt" , n + 1 , "error" , err )
146+ }),
147+ )
148+ if err != nil {
149+ logger .Error (err , "Failed to rebalance cluster with empty masters after retries" )
150+ return
151+ }
152+
153+ logger .Info ("Successfully rebalanced cluster with empty masters" )
154+ }
155+
156+ // rebalanceClusterWithEmptyMasters performs the actual rebalance operation and validates the result
157+ func rebalanceClusterWithEmptyMasters (ctx context.Context , client kubernetes.Interface , cr * rcvb2.RedisCluster ) error {
158+ logger := log .FromContext (ctx )
159+
160+ // Find a healthy leader node to execute the rebalance command
161+ totalRedisLeaderNodes := CheckRedisNodeCount (ctx , client , cr , "leader" )
162+ var selectedPod string
163+ for i := int32 (0 ); i < totalRedisLeaderNodes ; i ++ {
164+ podName := fmt .Sprintf ("%s-leader-%d" , cr .Name , i )
165+ // Test if pod is reachable
166+ redisClient := configureRedisClient (ctx , client , cr , podName )
167+ if _ , err := redisClient .Ping (ctx ).Result (); err == nil {
168+ selectedPod = podName
169+ redisClient .Close ()
170+ break
171+ }
172+ redisClient .Close ()
173+ }
174+
175+ if selectedPod == "" {
176+ return fmt .Errorf ("no healthy leader node found to execute rebalance" )
177+ }
178+
179+ logger .V (1 ).Info ("Executing rebalance with empty masters" , "pod" , selectedPod )
180+
132181 // cmd = redis-cli --cluster rebalance <redis>:<port> --cluster-use-empty-masters -a <pass>
133182 var cmd []string
134183 pod := RedisDetails {
135- PodName : cr . Name + "-leader-1" ,
184+ PodName : selectedPod ,
136185 Namespace : cr .Namespace ,
137186 }
138187 cmd = []string {"redis-cli" , "--cluster" , "rebalance" }
@@ -141,15 +190,26 @@ func RebalanceRedisClusterEmptyMasters(ctx context.Context, client kubernetes.In
141190 if cr .Spec .KubernetesConfig .ExistingPasswordSecret != nil {
142191 pass , err := getRedisPassword (ctx , client , cr .Namespace , * cr .Spec .KubernetesConfig .ExistingPasswordSecret .Name , * cr .Spec .KubernetesConfig .ExistingPasswordSecret .Key )
143192 if err != nil {
144- log . FromContext ( ctx ). Error ( err , "Error in getting redis password" )
193+ return fmt . Errorf ( "error getting redis password: %w" , err )
145194 }
146- cmd = append (cmd , "-a" )
147- cmd = append (cmd , pass )
195+ cmd = append (cmd , "-a" , pass )
148196 }
149197
150- cmd = append (cmd , getRedisTLSArgs (cr .Spec .TLS , cr .Name + "-leader-0" )... )
198+ cmd = append (cmd , getRedisTLSArgs (cr .Spec .TLS , selectedPod )... )
199+
200+ output , err := executeCommand1 (ctx , client , cr , cmd , selectedPod )
201+ if err != nil {
202+ return fmt .Errorf ("failed to execute rebalance command: %w" , err )
203+ }
151204
152- executeCommand (ctx , client , cr , cmd , cr .Name + "-leader-1" )
205+ logger .V (1 ).Info ("Rebalance command output" , "output" , output )
206+
207+ // Validate that the cluster is now balanced (no empty masters)
208+ if err := validateClusterBalance (ctx , client , cr , false ); err != nil {
209+ return fmt .Errorf ("cluster validation failed after rebalance: %w" , err )
210+ }
211+
212+ return nil
153213}
154214
155215func CheckIfEmptyMasters (ctx context.Context , client kubernetes.Interface , cr * rcvb2.RedisCluster ) {
@@ -175,26 +235,85 @@ func CheckIfEmptyMasters(ctx context.Context, client kubernetes.Interface, cr *r
175235
176236// Rebalance Redis Cluster Would Rebalance the Redis Cluster without using the empty masters
177237func RebalanceRedisCluster (ctx context.Context , client kubernetes.Interface , cr * rcvb2.RedisCluster ) {
238+ logger := log .FromContext (ctx )
239+
240+ // Use retry logic for rebalancing
241+ err := retry .Do (
242+ func () error {
243+ return rebalanceCluster (ctx , client , cr )
244+ },
245+ retry .Attempts (3 ),
246+ retry .Delay (2 * time .Second ),
247+ retry .DelayType (retry .BackOffDelay ),
248+ retry .OnRetry (func (n uint , err error ) {
249+ logger .Info ("Retrying cluster rebalance" , "attempt" , n + 1 , "error" , err )
250+ }),
251+ )
252+ if err != nil {
253+ logger .Error (err , "Failed to rebalance cluster after retries" )
254+ return
255+ }
256+
257+ logger .Info ("Successfully rebalanced cluster" )
258+ }
259+
260+ // rebalanceCluster performs the actual rebalance operation and validates the result
261+ func rebalanceCluster (ctx context.Context , client kubernetes.Interface , cr * rcvb2.RedisCluster ) error {
262+ logger := log .FromContext (ctx )
263+
264+ // Find a healthy leader node to execute the rebalance command
265+ totalRedisLeaderNodes := CheckRedisNodeCount (ctx , client , cr , "leader" )
266+ var selectedPod string
267+ for i := int32 (0 ); i < totalRedisLeaderNodes ; i ++ {
268+ podName := fmt .Sprintf ("%s-leader-%d" , cr .Name , i )
269+ // Test if pod is reachable
270+ redisClient := configureRedisClient (ctx , client , cr , podName )
271+ if _ , err := redisClient .Ping (ctx ).Result (); err == nil {
272+ selectedPod = podName
273+ redisClient .Close ()
274+ break
275+ }
276+ redisClient .Close ()
277+ }
278+
279+ if selectedPod == "" {
280+ return fmt .Errorf ("no healthy leader node found to execute rebalance" )
281+ }
282+
283+ logger .V (1 ).Info ("Executing rebalance" , "pod" , selectedPod )
284+
178285 // cmd = redis-cli --cluster rebalance <redis>:<port> -a <pass>
179286 var cmd []string
180287 pod := RedisDetails {
181- PodName : cr . Name + "-leader-1" ,
288+ PodName : selectedPod ,
182289 Namespace : cr .Namespace ,
183290 }
184291 cmd = []string {"redis-cli" , "--cluster" , "rebalance" }
185292 cmd = append (cmd , getEndpoint (ctx , client , cr , pod ))
186293 if cr .Spec .KubernetesConfig .ExistingPasswordSecret != nil {
187294 pass , err := getRedisPassword (ctx , client , cr .Namespace , * cr .Spec .KubernetesConfig .ExistingPasswordSecret .Name , * cr .Spec .KubernetesConfig .ExistingPasswordSecret .Key )
188295 if err != nil {
189- log . FromContext ( ctx ). Error ( err , "Error in getting redis password" )
296+ return fmt . Errorf ( "error getting redis password: %w" , err )
190297 }
191- cmd = append (cmd , "-a" )
192- cmd = append (cmd , pass )
298+ cmd = append (cmd , "-a" , pass )
193299 }
194300
195- cmd = append (cmd , getRedisTLSArgs (cr .Spec .TLS , cr .Name + "-leader-0" )... )
301+ cmd = append (cmd , getRedisTLSArgs (cr .Spec .TLS , selectedPod )... )
302+
303+ output , err := executeCommand1 (ctx , client , cr , cmd , selectedPod )
304+ if err != nil {
305+ return fmt .Errorf ("failed to execute rebalance command: %w" , err )
306+ }
307+
308+ logger .V (1 ).Info ("Rebalance command output" , "output" , output )
196309
197- executeCommand (ctx , client , cr , cmd , cr .Name + "-leader-1" )
310+ // Validate that the cluster is balanced
311+ // Allow empty masters here as this is called during scale-down
312+ if err := validateClusterBalance (ctx , client , cr , true ); err != nil {
313+ return fmt .Errorf ("cluster validation failed after rebalance: %w" , err )
314+ }
315+
316+ return nil
198317}
199318
200319// Add redis cluster node would add a node to the existing redis cluster using redis-cli
@@ -364,3 +483,76 @@ func ClusterFailover(ctx context.Context, client kubernetes.Interface, cr *rcvb2
364483 }
365484 return nil
366485}
486+
487+ // validateClusterBalance checks if the cluster is properly balanced
488+ // Returns error if any master has 0 slots (empty master) or if slots are not evenly distributed
489+ func validateClusterBalance (ctx context.Context , client kubernetes.Interface , cr * rcvb2.RedisCluster , allowEmptyMasters bool ) error {
490+ logger := log .FromContext (ctx )
491+ totalRedisLeaderNodes := CheckRedisNodeCount (ctx , client , cr , "leader" )
492+
493+ if totalRedisLeaderNodes == 0 {
494+ return fmt .Errorf ("no leader nodes found in cluster" )
495+ }
496+
497+ // Try to connect to any available leader node
498+ var redisClient * redis.Client
499+ var connectedPod string
500+ for i := 0 ; i < int (totalRedisLeaderNodes ); i ++ {
501+ podName := fmt .Sprintf ("%s-leader-%d" , cr .Name , i )
502+ redisClient = configureRedisClient (ctx , client , cr , podName )
503+ // Test connection
504+ if _ , err := redisClient .Ping (ctx ).Result (); err == nil {
505+ connectedPod = podName
506+ break
507+ }
508+ redisClient .Close ()
509+ }
510+
511+ if redisClient == nil || connectedPod == "" {
512+ return fmt .Errorf ("unable to connect to any leader node" )
513+ }
514+ defer redisClient .Close ()
515+
516+ logger .V (1 ).Info ("Validating cluster balance" , "connectedPod" , connectedPod , "totalLeaders" , totalRedisLeaderNodes )
517+
518+ emptyMasterCount := 0
519+ slotsPerNode := make (map [string ]int )
520+
521+ for i := 0 ; i < int (totalRedisLeaderNodes ); i ++ {
522+ pod := RedisDetails {
523+ PodName : fmt .Sprintf ("%s-leader-%d" , cr .Name , i ),
524+ Namespace : cr .Namespace ,
525+ }
526+ podNodeID := getRedisNodeID (ctx , client , cr , pod )
527+ if podNodeID == "" {
528+ logger .V (1 ).Info ("Could not get node ID for pod" , "pod" , pod .PodName )
529+ continue
530+ }
531+
532+ podSlots := getRedisClusterSlots (ctx , redisClient , podNodeID )
533+ slotCount := 0
534+ if podSlots != "" && podSlots != "0" {
535+ // Parse slot count (podSlots can be like "5461" or "0-5460")
536+ if strings .Contains (podSlots , "-" ) {
537+ slotCount , _ = strconv .Atoi (strings .Split (podSlots , "-" )[1 ])
538+ slotCount ++ // inclusive range
539+ } else {
540+ slotCount , _ = strconv .Atoi (podSlots )
541+ }
542+ }
543+
544+ slotsPerNode [pod .PodName ] = slotCount
545+ if slotCount == 0 {
546+ emptyMasterCount ++
547+ logger .V (1 ).Info ("Found empty master node" , "pod" , pod .PodName , "nodeID" , podNodeID )
548+ }
549+ }
550+
551+ // Check if we have empty masters when we shouldn't
552+ if ! allowEmptyMasters && emptyMasterCount > 0 {
553+ return fmt .Errorf ("found %d empty master nodes, cluster is not balanced" , emptyMasterCount )
554+ }
555+
556+ logger .Info ("Cluster balance validation completed" , "emptyMasters" , emptyMasterCount , "slotsPerNode" , slotsPerNode )
557+ return nil
558+ }
0 commit comments