Skip to content

Commit 808a17c

Browse files
committed
Configure cluster replication
1 parent fe2f62e commit 808a17c

File tree

1 file changed

+108
-1
lines changed

1 file changed

+108
-1
lines changed

internal/controller/valkey_controller.go

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"math/big"
2929
"net"
3030
"os"
31+
"sort"
3132
"strconv"
3233
"strings"
3334
"text/template"
@@ -461,6 +462,7 @@ func (r *ValkeyReconciler) getPodNames(ctx context.Context, valkey *hyperv1.Valk
461462
for _, pod := range pods.Items {
462463
names = append(names, pod.Name+"."+valkey.Name+"-headless."+valkey.Namespace+".svc")
463464
}
465+
sort.Strings(names)
464466
return names, nil
465467
}
466468

@@ -627,9 +629,114 @@ func (r *ValkeyReconciler) initCluster(ctx context.Context, valkey *hyperv1.Valk
627629
}
628630
}
629631

632+
// OK, now we need to set up replicas.
633+
// First, let's check which nodes have slots assigned; those should be the masters. Let's assert that the shard count matches this.
634+
clusterNodes, err := r.buildClusterNodeInfo(ctx, clients)
635+
if err != nil {
636+
logger.Error(err, "failed to get cluster nodes")
637+
return err
638+
}
639+
640+
masterCount := 0
641+
for _, node := range clusterNodes {
642+
for _, flag := range node.Flags {
643+
if flag == "master" {
644+
masterCount++
645+
}
646+
}
647+
}
648+
if masterCount != int(valkey.Spec.Shards) {
649+
logger.Info("master count does not match shard count", "masterCount", masterCount, "shardCount", valkey.Spec.Shards)
650+
if masterCount < int(valkey.Spec.Shards) {
651+
return fmt.Errorf("master count is less than shard count, cannot be reconciled")
652+
}
653+
654+
// we need to configure replicas
655+
// first, we need to get the nodes with slot assignments
656+
masterNodes := []ClusterNode{}
657+
slaveNodes := []ClusterNode{}
658+
for _, node := range clusterNodes {
659+
if node.SlotRange != "" {
660+
masterNodes = append(masterNodes, node)
661+
} else {
662+
slaveNodes = append(slaveNodes, node)
663+
}
664+
}
665+
logger.Info("master nodes", "masterNodes", masterNodes)
666+
logger.Info("slave nodes", "slaveNodes", slaveNodes)
667+
668+
if len(slaveNodes) != int(valkey.Spec.Shards)*int(valkey.Spec.Replicas) {
669+
logger.Info("slave count does not match shard*replica count", "slaveCount", len(slaveNodes), "shardCount", valkey.Spec.Shards, "replicaCount", valkey.Spec.Replicas)
670+
return fmt.Errorf("slave count does not match shard*replica count")
671+
}
672+
673+
chunkBy := func(items []ClusterNode, chunkSize int) (chunks [][]ClusterNode) {
674+
for chunkSize < len(items) {
675+
items, chunks = items[chunkSize:], append(chunks, items[0:chunkSize:chunkSize])
676+
}
677+
return append(chunks, items)
678+
}
679+
slaveChunks := chunkBy(slaveNodes, int(valkey.Spec.Replicas))
680+
for i, chunk := range slaveChunks {
681+
for _, slave := range chunk {
682+
if err := clients[slave.PodName].Do(ctx, clients[slave.PodName].B().ClusterReplicate().NodeId(masterNodes[i].ID).Build()).Error(); err != nil {
683+
logger.Error(err, "failed to replicate", "master", masterNodes[i].ID, "replica", slave.ID)
684+
return err
685+
}
686+
}
687+
}
688+
} else {
689+
logger.Info("master count matches shard count")
690+
}
691+
630692
return nil
631693
}
632694

695+
// ClusterNode holds the subset of one CLUSTER NODES entry that the controller
// keeps for a node.
//
// Line format reference: https://valkey.io/commands/cluster-nodes/
type ClusterNode struct {
	// PodName is the client-map key the entry was read through; ID is the
	// node id taken from the first field of the entry.
	PodName, ID string
	// Flags are the comma-separated flags of the entry, without "myself".
	Flags []string
	// SlotRange is the ninth field of the entry (the first slot range) when
	// present, otherwise the empty string.
	SlotRange string
}
702+
703+
func (r *ValkeyReconciler) buildClusterNodeInfo(ctx context.Context, clients map[string]valkeyClient.Client) ([]ClusterNode, error) {
704+
results := []ClusterNode{}
705+
706+
logger := log.FromContext(ctx)
707+
for podName, client := range clients {
708+
clusterNodesStr, err := client.Do(ctx, client.B().ClusterNodes().Build()).ToString()
709+
if err != nil {
710+
logger.Error(err, "failed to get cluster")
711+
return nil, err
712+
}
713+
for _, line := range strings.Split(clusterNodesStr, "\n") {
714+
if strings.Contains(line, "myself") {
715+
strings.Fields(line)
716+
fields := strings.Fields(line)
717+
flagsWithoutMyself := []string{}
718+
flags := strings.Split(fields[2], ",")
719+
for _, flag := range flags {
720+
if flag != "myself" {
721+
flagsWithoutMyself = append(flagsWithoutMyself, flag)
722+
}
723+
}
724+
slotRange := ""
725+
if len(fields) > 8 {
726+
slotRange = fields[8]
727+
}
728+
results = append(results, ClusterNode{
729+
PodName: podName,
730+
ID: strings.ReplaceAll(fields[0], "txt:", ""),
731+
Flags: flagsWithoutMyself,
732+
SlotRange: slotRange,
733+
})
734+
}
735+
}
736+
}
737+
return results, nil
738+
}
739+
633740
func (r *ValkeyReconciler) setClusterAnnounceIp(ctx context.Context, valkey *hyperv1.Valkey) error {
634741
logger := log.FromContext(ctx)
635742

@@ -1598,7 +1705,7 @@ func (r *ValkeyReconciler) balanceNodes(ctx context.Context, valkey *hyperv1.Val
15981705
pods := map[string]string{}
15991706
var tries int
16001707
for {
1601-
if len(pods) != int(valkey.Spec.Shards) {
1708+
if len(pods) != int(valkey.Spec.Shards+valkey.Spec.Shards*valkey.Spec.Replicas) {
16021709
pods, err = r.getPodIPs(ctx, valkey)
16031710
if err != nil {
16041711
logger.Error(err, "failed to get pod ips")

0 commit comments

Comments
 (0)