 package distributedrediscluster
 
 import (
+	"fmt"
 	"net"
 	"time"
 
 	redisv1alpha1 "github.com/ucloud/redis-cluster-operator/pkg/apis/redis/v1alpha1"
 	"github.com/ucloud/redis-cluster-operator/pkg/controller/clustering"
+	"github.com/ucloud/redis-cluster-operator/pkg/k8sutil"
 	"github.com/ucloud/redis-cluster-operator/pkg/redisutil"
 )
 
@@ -15,18 +17,37 @@ const (
 )
 
 func (r *ReconcileDistributedRedisCluster) waitPodReady(cluster *redisv1alpha1.DistributedRedisCluster) error {
-	cluster.Validate()
+	if err := r.validate(cluster); err != nil {
+		if k8sutil.IsRequestRetryable(err) {
+			return Kubernetes.Wrap(err, "Validate")
+		}
+		return StopRetry.Wrap(err, "stop retry")
+	}
 	// step 1. apply statefulSet for cluster
 	labels := getLabels(cluster)
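+	// When spec.init is set, fetch the referenced RedisClusterBackup so it can
+	// be passed to the statefulset and OSM secret builders below.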
+	var backup *redisv1alpha1.RedisClusterBackup
+	var err error
+	if cluster.Spec.Init != nil {
+		backup, err = r.crController.GetRedisClusterBackup(cluster.Spec.Init.BackupSource.Namespace, cluster.Spec.Init.BackupSource.Name)
+		if err != nil {
+			return err
+		}
+	}
 	if err := r.ensurer.EnsureRedisConfigMap(cluster, labels); err != nil {
 		return Kubernetes.Wrap(err, "EnsureRedisConfigMap")
 	}
-	if err := r.ensurer.EnsureRedisStatefulset(cluster, labels); err != nil {
+	if err := r.ensurer.EnsureRedisStatefulset(cluster, backup, labels); err != nil {
 		return Kubernetes.Wrap(err, "EnsureRedisStatefulset")
 	}
 	if err := r.ensurer.EnsureRedisHeadLessSvc(cluster, labels); err != nil {
 		return Kubernetes.Wrap(err, "EnsureRedisHeadLessSvc")
 	}
+	if err := r.ensurer.EnsureRedisOSMSecret(cluster, backup, labels); err != nil {
+		if k8sutil.IsRequestRetryable(err) {
+			return Kubernetes.Wrap(err, "EnsureRedisOSMSecret")
+		}
+		return StopRetry.Wrap(err, "stop retry")
+	}
 
 	// step 2. wait for all redis node ready
 	if err := r.checker.CheckRedisNodeNum(cluster); err != nil {
@@ -36,6 +57,33 @@ func (r *ReconcileDistributedRedisCluster) waitPodReady(cluster *redisv1alpha1.D
 	return nil
 }
 
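+// validate checks spec.init before any resources are created: the referenced
+// RedisClusterBackup must exist and have reached BackupPhaseSucceeded; the
+// image, master size and replica count are then defaulted from the backup
+// status (replicas stay at 0 until the restore has succeeded) before
+// cluster.Validate() runs.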
+func (r *ReconcileDistributedRedisCluster) validate(cluster *redisv1alpha1.DistributedRedisCluster) error {
+	initSpec := cluster.Spec.Init
+	if initSpec != nil {
+		if initSpec.BackupSource == nil {
+			return fmt.Errorf("backupSource is required")
+		}
+		backup, err := r.crController.GetRedisClusterBackup(initSpec.BackupSource.Namespace, initSpec.BackupSource.Name)
+		if err != nil {
+			return err
+		}
+		if backup.Status.Phase != redisv1alpha1.BackupPhaseSucceeded {
+			return fmt.Errorf("backup is still running")
+		}
+		if cluster.Spec.Image == "" {
+			cluster.Spec.Image = backup.Status.ClusterImage
+		}
+		cluster.Spec.MasterSize = backup.Status.MasterSize
+		if cluster.Status.RestoreSucceeded <= 0 {
+			cluster.Spec.ClusterReplicas = 0
+		} else {
+			cluster.Spec.ClusterReplicas = backup.Status.ClusterReplicas
+		}
+	}
+	cluster.Validate()
+	return nil
+}
+
 func (r *ReconcileDistributedRedisCluster) waitForClusterJoin(cluster *redisv1alpha1.DistributedRedisCluster, clusterInfos *redisutil.ClusterInfos, admin redisutil.IAdmin) error {
 	logger := log.WithValues("namespace", cluster.Namespace, "name", cluster.Name)
 	//logger.Info(">>> Assign a different config epoch to each node")
@@ -221,11 +269,11 @@ func (r *ReconcileDistributedRedisCluster) ensureCluster(cluster *redisv1alpha1.
 		return Cluster.Wrap(err, "newRedisCluster")
 	}
 
-	currentMasterNodes := nodes.FilterByFunc(redisutil.IsMasterWithSlot)
-	if len(currentMasterNodes) == int(cluster.Spec.MasterSize) {
-		logger.V(3).Info("cluster ok")
-		return nil
-	}
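+	// The early return when the current master count already matches the spec
+	// is left commented out, so master dispatch and slave attachment below
+	// always run.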
+	// currentMasterNodes := nodes.FilterByFunc(redisutil.IsMasterWithSlot)
+	// if len(currentMasterNodes) == int(cluster.Spec.MasterSize) {
+	// 	logger.V(3).Info("cluster ok")
+	// 	return nil
+	// }
 
 	// First, we define the new masters
 	newMasters, curMasters, allMaster, err := clustering.DispatchMasters(rCluster, nodes, cNbMaster)
@@ -280,6 +328,15 @@ func (r *ReconcileDistributedRedisCluster) ensureCluster(cluster *redisv1alpha1.
 		if err := clustering.RebalancedCluster(admin, newMasters); err != nil {
 			return Cluster.Wrap(err, "RebalancedCluster")
 		}
+	} else if len(newMasters) == len(curMasters) {
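+		// Master count is unchanged: recompute slave placement and attach the
+		// new slaves to the existing masters.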
+		newRedisSlavesByMaster, bestEffort := clustering.PlaceSlaves(rCluster, newMasters, currentSlaveNodes, newSlave, cReplicaFactor)
+		if bestEffort {
+			rCluster.NodesPlacement = redisv1alpha1.NodesPlacementInfoBestEffort
+		}
+
+		if err := clustering.AttachingSlavesToMaster(rCluster, admin, newRedisSlavesByMaster); err != nil {
+			return Cluster.Wrap(err, "AttachingSlavesToMaster")
+		}
 	}
 
 	return nil