Skip to content

Commit 5a3b191

Browse files
committed
add phase for restore
1 parent 31885ed commit 5a3b191

File tree

9 files changed

+101
-21
lines changed

9 files changed

+101
-21
lines changed

pkg/apis/redis/v1alpha1/constants.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,17 @@ const (
5959
NodesPlacementInfoOptimal NodesPlacementInfo = "Optimal"
6060
)
6161

62+
type RestorePhase string
63+
64+
const (
65+
// RestorePhaseRunning used for Restore that are currently running.
66+
RestorePhaseRunning RestorePhase = "Running"
67+
// RestorePhaseRestart used for Restore that are restart master nodes.
68+
RestorePhaseRestart RestorePhase = "Restart"
69+
// RestorePhaseSucceeded used for Restore that are Succeeded.
70+
RestorePhaseSucceeded RestorePhase = "Succeeded"
71+
)
72+
6273
const (
6374
DatabaseNamePrefix = "redis"
6475

pkg/apis/redis/v1alpha1/default.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,19 @@ func (in *DistributedRedisCluster) IsRestoreFromBackup() bool {
7070
}
7171

7272
func (in *DistributedRedisCluster) IsRestored() bool {
73-
return in.Status.Restore.RestoreSucceeded > 0
73+
return in.Status.Restore.Phase == RestorePhaseSucceeded
74+
}
75+
76+
func (in *DistributedRedisCluster) ShouldInitRestorePhase() bool {
77+
return in.Status.Restore.Phase == ""
78+
}
79+
80+
func (in *DistributedRedisCluster) IsRestoreRunning() bool {
81+
return in.Status.Restore.Phase == RestorePhaseRunning
82+
}
83+
84+
func (in *DistributedRedisCluster) IsRestoreRestarting() bool {
85+
return in.Status.Restore.Phase == RestorePhaseRestart
7486
}
7587

7688
func defaultResource() *v1.ResourceRequirements {

pkg/apis/redis/v1alpha1/distributedrediscluster_types.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,10 +116,8 @@ type DistributedRedisClusterStatus struct {
116116
}
117117

118118
type Restore struct {
119-
// The number of restore which reached phase Succeeded.
120-
RestoreSucceeded int32 `json:"restoreSucceeded,omitempty"`
121-
Backup *RedisClusterBackup `json:"backup, omitempty"`
122-
//BackupSourceSpec `json:",inline"`
119+
Phase RestorePhase `json:"phase,omitempty"`
120+
Backup *RedisClusterBackup `json:"backup, omitempty"`
123121
}
124122

125123
// RedisClusterNode represent a RedisCluster Node

pkg/controller/distributedrediscluster/distributedrediscluster_controller.go

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -249,10 +249,35 @@ func (r *ReconcileDistributedRedisCluster) Reconcile(request reconcile.Request)
249249
return reconcile.Result{}, err
250250
}
251251

252-
// update cr and wait for the next Reconcile loop
253-
if instance.IsRestoreFromBackup() && !instance.IsRestored() {
252+
// mark .Status.Restore.Phase = RestorePhaseRestart, will
253+
// remove init container and restore volume that referenced in stateulset for
254+
// dump RDB file from backup, then the redis master node will be restart.
255+
if instance.IsRestoreFromBackup() && instance.IsRestoreRunning() {
254256
reqLogger.Info("update restore redis cluster cr")
255-
instance.Status.Restore.RestoreSucceeded = 1
257+
instance.Status.Restore.Phase = redisv1alpha1.RestorePhaseRestart
258+
if err := r.crController.UpdateCRStatus(instance); err != nil {
259+
return reconcile.Result{}, err
260+
}
261+
if err := r.ensurer.UpdateRedisStatefulsets(instance, getLabels(instance)); err != nil {
262+
return reconcile.Result{}, err
263+
}
264+
waiter := &waitStatefulSetUpdating{
265+
name: "waitMasterNodeRestarting",
266+
timeout: 60 * time.Second,
267+
tick: 5 * time.Second,
268+
statefulSetController: r.statefulSetController,
269+
cluster: instance,
270+
}
271+
if err := waiting(waiter, ctx.reqLogger); err != nil {
272+
return reconcile.Result{}, err
273+
}
274+
return reconcile.Result{Requeue: true}, nil
275+
}
276+
277+
// restore succeeded, then update cr and wait for the next Reconcile loop
278+
if instance.IsRestoreFromBackup() && instance.IsRestoreRestarting() {
279+
reqLogger.Info("update restore redis cluster cr")
280+
instance.Status.Restore.Phase = redisv1alpha1.RestorePhaseSucceeded
256281
if err := r.crController.UpdateCRStatus(instance); err != nil {
257282
return reconcile.Result{}, err
258283
}

pkg/controller/distributedrediscluster/status.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func compareStatus(old, new *redisv1alpha1.DistributedRedisClusterStatus, reqLog
144144
return true
145145
}
146146

147-
if utils.CompareInt32("restoreSucceeded", old.Restore.RestoreSucceeded, new.Restore.RestoreSucceeded, reqLogger) {
147+
if utils.CompareStringValue("restoreSucceeded", string(old.Restore.Phase), string(new.Restore.Phase), reqLogger) {
148148
return true
149149
}
150150

pkg/controller/distributedrediscluster/sync_handler.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type syncContext struct {
3030

3131
func (r *ReconcileDistributedRedisCluster) ensureCluster(ctx *syncContext) error {
3232
cluster := ctx.cluster
33-
if err := r.validate(cluster, ctx.reqLogger); err != nil {
33+
if err := r.validateAndSetDefault(cluster, ctx.reqLogger); err != nil {
3434
if k8sutil.IsRequestRetryable(err) {
3535
return Kubernetes.Wrap(err, "Validate")
3636
}
@@ -89,16 +89,22 @@ func (r *ReconcileDistributedRedisCluster) waitPodReady(ctx *syncContext) error
8989
return nil
9090
}
9191

92-
func (r *ReconcileDistributedRedisCluster) validate(cluster *redisv1alpha1.DistributedRedisCluster, reqLogger logr.Logger) error {
92+
func (r *ReconcileDistributedRedisCluster) validateAndSetDefault(cluster *redisv1alpha1.DistributedRedisCluster, reqLogger logr.Logger) error {
9393
var update bool
9494
var err error
9595

96-
if cluster.IsRestoreFromBackup() && !cluster.IsRestored() {
97-
update, err = r.validateRestore(cluster, reqLogger)
96+
if cluster.IsRestoreFromBackup() && cluster.ShouldInitRestorePhase() {
97+
update, err = r.initRestore(cluster, reqLogger)
9898
if err != nil {
9999
return err
100100
}
101101
}
102+
103+
if cluster.IsRestoreFromBackup() && (cluster.IsRestoreRunning() || cluster.IsRestoreRestarting()) {
104+
// Set ClusterReplicas = 0, only start master node in first reconcile loop when do restore
105+
cluster.Spec.ClusterReplicas = 0
106+
}
107+
102108
updateDefault := cluster.DefaultSpec(reqLogger)
103109
if update || updateDefault {
104110
return r.crController.UpdateCR(cluster)
@@ -116,7 +122,7 @@ func dbLoadedFromDiskWhenRestore(cluster *redisv1alpha1.DistributedRedisCluster,
116122
}
117123
}
118124

119-
func (r *ReconcileDistributedRedisCluster) validateRestore(cluster *redisv1alpha1.DistributedRedisCluster, reqLogger logr.Logger) (bool, error) {
125+
func (r *ReconcileDistributedRedisCluster) initRestore(cluster *redisv1alpha1.DistributedRedisCluster, reqLogger logr.Logger) (bool, error) {
120126
update := false
121127
if cluster.Status.Restore.Backup == nil {
122128
initSpec := cluster.Spec.Init
@@ -130,6 +136,10 @@ func (r *ReconcileDistributedRedisCluster) validateRestore(cluster *redisv1alpha
130136
return update, fmt.Errorf("backup is still running")
131137
}
132138
cluster.Status.Restore.Backup = backup
139+
cluster.Status.Restore.Phase = redisv1alpha1.RestorePhaseRunning
140+
if err := r.crController.UpdateCRStatus(cluster); err != nil {
141+
return update, err
142+
}
133143
}
134144
backup := cluster.Status.Restore.Backup
135145
if cluster.Spec.Image == "" {
@@ -140,8 +150,7 @@ func (r *ReconcileDistributedRedisCluster) validateRestore(cluster *redisv1alpha
140150
cluster.Spec.MasterSize = backup.Status.MasterSize
141151
update = true
142152
}
143-
// Set ClusterReplicas = 0, only start master node in first reconcile loop when do restore
144-
cluster.Spec.ClusterReplicas = 0
153+
145154
return update, nil
146155
}
147156

pkg/controller/manager/ensurer.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package manager
22

33
import (
4-
"strconv"
54
"strings"
65

76
"github.com/go-logr/logr"
@@ -25,6 +24,7 @@ type IEnsureResource interface {
2524
EnsureRedisSvc(cluster *redisv1alpha1.DistributedRedisCluster, labels map[string]string) error
2625
EnsureRedisConfigMap(cluster *redisv1alpha1.DistributedRedisCluster, labels map[string]string) error
2726
EnsureRedisRCloneSecret(cluster *redisv1alpha1.DistributedRedisCluster, labels map[string]string) error
27+
UpdateRedisStatefulsets(cluster *redisv1alpha1.DistributedRedisCluster, labels map[string]string) error
2828
}
2929

3030
type realEnsureResource struct {
@@ -223,7 +223,7 @@ func (r *realEnsureResource) EnsureRedisConfigMap(cluster *redisv1alpha1.Distrib
223223
}
224224
return err
225225
}
226-
if restoreCm.Data[configmaps.RestoreSucceeded] != strconv.Itoa(int(cluster.Status.Restore.RestoreSucceeded)) {
226+
if cluster.Status.Restore.Phase == redisv1alpha1.RestorePhaseRestart && restoreCm.Data[configmaps.RestoreSucceeded] == "0" {
227227
cm := configmaps.NewConfigMapForRestore(cluster, labels)
228228
return r.configMapClient.UpdateConfigMap(cm)
229229
}
@@ -268,3 +268,27 @@ func isRedisConfChanged(confInCm string, currentConf map[string]string, log logr
268268
}
269269
return false
270270
}
271+
272+
func (r *realEnsureResource) UpdateRedisStatefulsets(cluster *redisv1alpha1.DistributedRedisCluster, labels map[string]string) error {
273+
for i := 0; i < int(cluster.Spec.MasterSize); i++ {
274+
name := statefulsets.ClusterStatefulSetName(cluster.Name, i)
275+
svcName := statefulsets.ClusterHeadlessSvcName(cluster.Spec.ServiceName, i)
276+
// assign label
277+
labels[redisv1alpha1.StatefulSetLabel] = name
278+
if err := r.updateRedisStatefulset(cluster, name, svcName, labels); err != nil {
279+
return err
280+
}
281+
}
282+
return nil
283+
}
284+
285+
func (r *realEnsureResource) updateRedisStatefulset(cluster *redisv1alpha1.DistributedRedisCluster, ssName, svcName string,
286+
labels map[string]string) error {
287+
r.logger.WithValues("StatefulSet.Namespace", cluster.Namespace, "StatefulSet.Name", ssName).
288+
Info("updating statefulSet immediately")
289+
newSS, err := statefulsets.NewStatefulSetForCR(cluster, ssName, svcName, labels)
290+
if err != nil {
291+
return err
292+
}
293+
return r.statefulSetClient.UpdateStatefulSet(newSS)
294+
}

pkg/resources/configmaps/configmap.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"fmt"
66
"sort"
7+
"strconv"
78

89
corev1 "k8s.io/api/core/v1"
910
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -106,7 +107,7 @@ func NewConfigMapForRestore(cluster *redisv1alpha1.DistributedRedisCluster, labe
106107
OwnerReferences: redisv1alpha1.DefaultOwnerReferences(cluster),
107108
},
108109
Data: map[string]string{
109-
RestoreSucceeded: fmt.Sprintf("%d", cluster.Status.Restore.RestoreSucceeded),
110+
RestoreSucceeded: strconv.Itoa(0),
110111
},
111112
}
112113
}

pkg/resources/statefulsets/statefulset.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func NewStatefulSetForCR(cluster *redisv1alpha1.DistributedRedisCluster, ssName,
8585
if spec.Monitor != nil {
8686
ss.Spec.Template.Spec.Containers = append(ss.Spec.Template.Spec.Containers, redisExporterContainer(cluster, password))
8787
}
88-
if cluster.IsRestoreFromBackup() && !cluster.IsRestored() && cluster.Status.Restore.Backup != nil {
88+
if cluster.IsRestoreFromBackup() && cluster.IsRestoreRunning() && cluster.Status.Restore.Backup != nil {
8989
initContainer, err := redisInitContainer(cluster, password)
9090
if err != nil {
9191
return nil, err
@@ -434,7 +434,7 @@ func redisVolumes(cluster *redisv1alpha1.DistributedRedisCluster) []corev1.Volum
434434

435435
if !cluster.IsRestoreFromBackup() ||
436436
cluster.Status.Restore.Backup == nil ||
437-
cluster.IsRestored() {
437+
!cluster.IsRestoreRunning() {
438438
return volumes
439439
}
440440

0 commit comments

Comments
 (0)