Skip to content

Commit 4d200a3

Browse files
feat: Round robin where to transfer cluster shards when scaling in a Redis Cluster (#1412)
* round robin where to move shards when scaling in
  Signed-off-by: dimitar <[email protected]>
* rename slot to slots
  Signed-off-by: dimitar <[email protected]>
* rename target to transfer variables for consistency
  Signed-off-by: dimitar <[email protected]>
* swap function parameters for better cohesion
  Signed-off-by: dimitar <[email protected]>
* fail fast and log
  Signed-off-by: yangw <[email protected]>
* try log
  Signed-off-by: yangw <[email protected]>
---------
Signed-off-by: dimitar <[email protected]>
Signed-off-by: yangw <[email protected]>
Co-authored-by: yangw <[email protected]>
1 parent 261dc1d commit 4d200a3

File tree

2 files changed

+26
-19
lines changed

2 files changed

+26
-19
lines changed

internal/controller/rediscluster/rediscluster_controller.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
106106
k8sutils.RemoveRedisFollowerNodesFromCluster(ctx, r.K8sClient, instance, shardIdx)
107107
monitoring.RedisClusterRemoveFollowerAttempt.WithLabelValues(instance.Namespace, instance.Name).Inc()
108108
// Step 2 Reshard the Cluster
109-
k8sutils.ReshardRedisCluster(ctx, r.K8sClient, instance, shardIdx, true)
109+
// We round-robin over the remaining nodes to pick the node that the shard's slots are moved to.
110+
// This helps reduce the chance of overloading/OOMing the remaining nodes
111+
// and makes the subsequent rebalancing step more efficient.
112+
// TODO: consider doing the resharding in parallel
113+
shardMoveNodeIdx := shardIdx % leaderReplicas
114+
k8sutils.ReshardRedisCluster(ctx, r.K8sClient, instance, shardIdx, shardMoveNodeIdx, true)
110115
monitoring.RedisClusterReshardTotal.WithLabelValues(instance.Namespace, instance.Name).Inc()
111116
}
112117
logger.Info("Redis cluster is downscaled... Rebalancing the cluster")

internal/k8sutils/cluster-scaling.go

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,19 @@ import (
1212
"sigs.k8s.io/controller-runtime/pkg/log"
1313
)
1414

15-
// ReshardRedisCluster transfer the slots from the last node to the first node.
15+
// ReshardRedisCluster transfers the slots from the last node to the provided transfer node.
1616
//
17-
// NOTE: when all slot been transferred, the node become slave of the first master node.
18-
func ReshardRedisCluster(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster, shardIdx int32, remove bool) {
19-
redisClient := configureRedisClient(ctx, client, cr, cr.ObjectMeta.Name+"-leader-0")
17+
// NOTE: when all slots have been transferred, the node becomes a slave of the transfer node.
18+
func ReshardRedisCluster(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster, shardIdx int32, transferNodeIdx int32, remove bool) {
19+
transferNodeName := fmt.Sprintf("%s-leader-%d", cr.ObjectMeta.Name, transferNodeIdx)
20+
redisClient := configureRedisClient(ctx, client, cr, transferNodeName)
2021
defer redisClient.Close()
2122

2223
var cmd []string
2324

2425
// Transfer Pod details
2526
transferPOD := RedisDetails{
26-
PodName: cr.ObjectMeta.Name + "-leader-0",
27+
PodName: transferNodeName,
2728
Namespace: cr.Namespace,
2829
}
2930
// Remove POD details
@@ -42,13 +43,14 @@ func ReshardRedisCluster(ctx context.Context, client kubernetes.Interface, cr *r
4243
if cr.Spec.KubernetesConfig.ExistingPasswordSecret != nil {
4344
pass, err := getRedisPassword(ctx, client, cr.Namespace, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Name, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Key)
4445
if err != nil {
45-
log.FromContext(ctx).Error(err, "Error in getting redis password")
46+
log.FromContext(ctx).Error(err, "error in getting redis password")
47+
return
4648
}
4749
cmd = append(cmd, "-a")
4850
cmd = append(cmd, pass)
4951
}
5052

51-
cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, cr.ObjectMeta.Name+"-leader-0")...)
53+
cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, transferNodeName)...)
5254

5355
//--cluster-from <node-id> --cluster-to <node-id> --cluster-slots <number of slots> --cluster-yes
5456

@@ -63,19 +65,20 @@ func ReshardRedisCluster(ctx context.Context, client kubernetes.Interface, cr *r
6365
cmd = append(cmd, transferNodeID)
6466

6567
// Cluster Slots
66-
slot := getRedisClusterSlots(ctx, redisClient, removeNodeID)
68+
slots := getRedisClusterSlots(ctx, redisClient, removeNodeID)
69+
if slots == "0" || slots == "" {
70+
log.FromContext(ctx).Info("skipping the execution cmd because no slots found", "Cmd", cmd)
71+
return
72+
}
6773
cmd = append(cmd, "--cluster-slots")
68-
cmd = append(cmd, slot)
74+
cmd = append(cmd, slots)
6975

7076
cmd = append(cmd, "--cluster-yes")
7177

72-
log.FromContext(ctx).V(1).Info("Redis cluster reshard command is", "Command", cmd)
73-
74-
if slot == "0" {
75-
log.FromContext(ctx).V(1).Info("Skipped the execution of", "Cmd", cmd)
76-
return
77-
}
78-
executeCommand(ctx, client, cr, cmd, cr.ObjectMeta.Name+"-leader-0")
78+
log.FromContext(ctx).V(1).Info("redis cluster reshard command is", "Command", cmd)
79+
log.FromContext(ctx).Info(fmt.Sprintf("transferring %s slots from shard %d to shard %d", slots, shardIdx, transferNodeIdx))
80+
executeCommand(ctx, client, cr, cmd, transferNodeName)
81+
log.FromContext(ctx).Info(fmt.Sprintf("transferring %s slots from shard %d to shard %d completed", slots, shardIdx, transferNodeIdx))
7982

8083
if remove {
8184
RemoveRedisNodeFromCluster(ctx, client, cr, removePOD)
@@ -87,7 +90,7 @@ func getRedisClusterSlots(ctx context.Context, redisClient *redis.Client, nodeID
8790

8891
redisSlots, err := redisClient.ClusterSlots(ctx).Result()
8992
if err != nil {
90-
log.FromContext(ctx).Error(err, "Failed to Get Cluster Slots")
93+
log.FromContext(ctx).Error(err, "failed to get cluster slots")
9194
return ""
9295
}
9396
for _, slot := range redisSlots {
@@ -100,7 +103,6 @@ func getRedisClusterSlots(ctx context.Context, redisClient *redis.Client, nodeID
100103
}
101104
}
102105

103-
log.FromContext(ctx).V(1).Info("Total cluster slots to be transferred from", "node", nodeID, "is", totalSlots)
104106
return strconv.Itoa(totalSlots)
105107
}
106108

0 commit comments

Comments
 (0)