Skip to content

Commit 5f80a1a

Browse files
authored
Merge pull request #30 from ucloud/v0.2.1
V0.2.1
2 parents 3eb1ccd + b43f643 commit 5f80a1a

File tree

10 files changed

+346
-17
lines changed

10 files changed

+346
-17
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,5 @@ tags
7979
vendor/*
8080
/main
8181
/Dockerfile-withvendor
82-
Dockerfile-TZSH
82+
Dockerfile-TZSH
83+
skaffold.yaml

pkg/controller/distributedrediscluster/helper.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,15 +142,16 @@ type IWaitHandle interface {
142142
// we get a result from handler.Handler() or the timeout expires
143143
func waiting(handler IWaitHandle, reqLogger logr.Logger) error {
144144
timeout := time.After(handler.Timeout())
145-
tick := time.Tick(handler.Tick())
145+
tick := time.NewTicker(time.Second)
146+
defer tick.Stop()
146147
// Keep trying until we're timed out or got a result or got an error
147148
for {
148149
select {
149150
// Got a timeout! fail with a timeout error
150151
case <-timeout:
151152
return fmt.Errorf("%s timed out", handler.Name())
152153
// Got a tick, we should check on Handler()
153-
case <-tick:
154+
case <-tick.C:
154155
err := handler.Handler()
155156
if err == nil {
156157
return nil

pkg/controller/distributedrediscluster/sync_handler.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ func (r *ReconcileDistributedRedisCluster) ensureCluster(ctx *syncContext) error
3636
}
3737
return StopRetry.Wrap(err, "stop retry")
3838
}
39+
40+
// Redis only load db from append only file when AOF ON, because of
41+
// we only backed up the RDB file when doing data backup, so we set
42+
// "appendonly no" force here when do restore.
43+
dbLoadedFromDiskWhenRestore(cluster, ctx.reqLogger)
3944
labels := getLabels(cluster)
4045
if err := r.ensurer.EnsureRedisConfigMap(cluster, labels); err != nil {
4146
return Kubernetes.Wrap(err, "EnsureRedisConfigMap")
@@ -98,9 +103,19 @@ func (r *ReconcileDistributedRedisCluster) validate(cluster *redisv1alpha1.Distr
98103
if update || updateDefault {
99104
return r.crController.UpdateCR(cluster)
100105
}
106+
101107
return nil
102108
}
103109

110+
func dbLoadedFromDiskWhenRestore(cluster *redisv1alpha1.DistributedRedisCluster, reqLogger logr.Logger) {
111+
if cluster.IsRestoreFromBackup() && !cluster.IsRestored() {
112+
if cluster.Spec.Config != nil {
113+
reqLogger.Info("force appendonly = no when do restore")
114+
cluster.Spec.Config["appendonly"] = "no"
115+
}
116+
}
117+
}
118+
104119
func (r *ReconcileDistributedRedisCluster) validateRestore(cluster *redisv1alpha1.DistributedRedisCluster, reqLogger logr.Logger) (bool, error) {
105120
update := false
106121
if cluster.Status.Restore.Backup == nil {

pkg/controller/manager/ensurer.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package manager
22

33
import (
44
"strconv"
5+
"strings"
56

67
"github.com/go-logr/logr"
78
appsv1 "k8s.io/api/apps/v1"
@@ -189,7 +190,7 @@ func (r *realEnsureResource) EnsureRedisSvc(cluster *redisv1alpha1.DistributedRe
189190

190191
func (r *realEnsureResource) EnsureRedisConfigMap(cluster *redisv1alpha1.DistributedRedisCluster, labels map[string]string) error {
191192
cmName := configmaps.RedisConfigMapName(cluster.Name)
192-
_, err := r.configMapClient.GetConfigMap(cluster.Namespace, cmName)
193+
drcCm, err := r.configMapClient.GetConfigMap(cluster.Namespace, cmName)
193194
if err != nil {
194195
if errors.IsNotFound(err) {
195196
r.logger.WithValues("ConfigMap.Namespace", cluster.Namespace, "ConfigMap.Name", cmName).
@@ -201,6 +202,13 @@ func (r *realEnsureResource) EnsureRedisConfigMap(cluster *redisv1alpha1.Distrib
201202
} else {
202203
return err
203204
}
205+
} else {
206+
if isRedisConfChanged(drcCm.Data[configmaps.RedisConfKey], cluster.Spec.Config, r.logger) {
207+
cm := configmaps.NewConfigMapForCR(cluster, labels)
208+
if err2 := r.configMapClient.UpdateConfigMap(cm); err2 != nil {
209+
return err2
210+
}
211+
}
204212
}
205213

206214
if cluster.IsRestoreFromBackup() {
@@ -237,3 +245,26 @@ func (r *realEnsureResource) EnsureRedisOSMSecret(cluster *redisv1alpha1.Distrib
237245
}
238246
return nil
239247
}
248+
249+
func isRedisConfChanged(confInCm string, currentConf map[string]string, log logr.Logger) bool {
250+
lines := strings.Split(strings.TrimSuffix(confInCm, "\n"), "\n")
251+
if len(lines) != len(currentConf) {
252+
return true
253+
}
254+
for _, line := range lines {
255+
line = strings.TrimSuffix(line, " ")
256+
confLine := strings.SplitN(line, " ", 2)
257+
if len(confLine) == 2 {
258+
if valueInCurrentConf, ok := currentConf[confLine[0]]; !ok {
259+
return true
260+
} else {
261+
if valueInCurrentConf != confLine[1] {
262+
return true
263+
}
264+
}
265+
} else {
266+
log.Info("custom config is invalid", "raw", line, "split", confLine)
267+
}
268+
}
269+
return false
270+
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
package manager
2+
3+
import (
4+
"testing"
5+
6+
"github.com/go-logr/logr"
7+
logf "sigs.k8s.io/controller-runtime/pkg/log"
8+
)
9+
10+
var log = logf.Log.WithName("test")
11+
12+
func Test_isRedisConfChanged(t *testing.T) {
13+
type args struct {
14+
confInCm string
15+
currentConf map[string]string
16+
log logr.Logger
17+
}
18+
tests := []struct {
19+
name string
20+
args args
21+
want bool
22+
}{
23+
{
24+
name: "should false",
25+
args: args{
26+
confInCm: `appendfsync everysec
27+
appendonly yes
28+
auto-aof-rewrite-min-size 67108864
29+
save 900 1 300 10`,
30+
currentConf: map[string]string{
31+
"appendfsync": "everysec",
32+
"appendonly": "yes",
33+
"auto-aof-rewrite-min-size": "67108864",
34+
"save": "900 1 300 10",
35+
},
36+
log: log,
37+
},
38+
want: false,
39+
},
40+
{
41+
name: "should false with newline",
42+
args: args{
43+
confInCm: `appendfsync everysec
44+
appendonly yes
45+
auto-aof-rewrite-min-size 67108864
46+
save 900 1 300 10
47+
`,
48+
currentConf: map[string]string{
49+
"appendfsync": "everysec",
50+
"appendonly": "yes",
51+
"auto-aof-rewrite-min-size": "67108864",
52+
"save": "900 1 300 10",
53+
},
54+
log: log,
55+
},
56+
want: false,
57+
},
58+
{
59+
name: "should true, compare value",
60+
args: args{
61+
confInCm: `appendfsync everysec
62+
appendonly yes
63+
auto-aof-rewrite-min-size 6710886
64+
save 900 1 300 10
65+
`,
66+
currentConf: map[string]string{
67+
"appendfsync": "everysec",
68+
"appendonly": "yes",
69+
"auto-aof-rewrite-min-size": "67108864",
70+
"save": "900 1 300 10",
71+
},
72+
log: log,
73+
},
74+
want: true,
75+
},
76+
{
77+
name: "should true, add current",
78+
args: args{
79+
confInCm: `appendfsync everysec
80+
appendonly yes
81+
save 900 1 300 10
82+
`,
83+
currentConf: map[string]string{
84+
"appendfsync": "everysec",
85+
"appendonly": "yes",
86+
"auto-aof-rewrite-min-size": "67108864",
87+
"save": "900 1 300 10",
88+
},
89+
log: log,
90+
},
91+
want: true,
92+
},
93+
{
94+
name: "should true, del current",
95+
args: args{
96+
confInCm: `appendfsync everysec
97+
appendonly yes
98+
auto-aof-rewrite-min-size 67108864
99+
save 900 1 300 10
100+
`,
101+
currentConf: map[string]string{
102+
"appendfsync": "everysec",
103+
"appendonly": "yes",
104+
"save": "900 1 300 10",
105+
},
106+
log: log,
107+
},
108+
want: true,
109+
},
110+
{
111+
name: "should true, compare key",
112+
args: args{
113+
confInCm: `appendfsync everysec
114+
appendonly yes
115+
save 900 1 300 10
116+
`,
117+
currentConf: map[string]string{
118+
"appendonly": "yes",
119+
"auto-aof-rewrite-min-size": "67108864",
120+
"save": "900 1 300 10",
121+
},
122+
log: log,
123+
},
124+
want: true,
125+
},
126+
{
127+
name: "should true, compare save",
128+
args: args{
129+
confInCm: `appendfsync everysec
130+
appendonly yes
131+
auto-aof-rewrite-min-size 67108864
132+
save 900 1 300 10
133+
`,
134+
currentConf: map[string]string{
135+
"appendfsync": "everysec",
136+
"appendonly": "yes",
137+
"auto-aof-rewrite-min-size": "67108864",
138+
"save": "900 1",
139+
},
140+
log: log,
141+
},
142+
want: true,
143+
},
144+
}
145+
for _, tt := range tests {
146+
t.Run(tt.name, func(t *testing.T) {
147+
if got := isRedisConfChanged(tt.args.confInCm, tt.args.currentConf, tt.args.log); got != tt.want {
148+
t.Errorf("isRedisConfChanged() = %v, want %v", got, tt.want)
149+
}
150+
})
151+
}
152+
}

pkg/redisutil/admin.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -388,12 +388,7 @@ func (a *Admin) MigrateKeys(addr string, dest *Node, slots []Slot, batch int, ti
388388
break
389389
}
390390

391-
var args []string
392-
if replace {
393-
args = append([]string{dest.IP, dest.Port, "", "0", timeoutStr, "REPLACE", "KEYS"}, keys...)
394-
} else {
395-
args = append([]string{dest.IP, dest.Port, "", "0", timeoutStr, "KEYS"}, keys...)
396-
}
391+
args := a.migrateCmdArgs(dest, timeoutStr, replace, keys)
397392

398393
resp = c.Cmd("MIGRATE", args)
399394
if err := a.Connections().ValidateResp(resp, addr, "Unable to run command MIGRATE"); err != nil {
@@ -432,12 +427,7 @@ func (a *Admin) MigrateKeysInSlot(addr string, dest *Node, slot Slot, batch int,
432427
break
433428
}
434429

435-
var args []string
436-
if replace {
437-
args = append([]string{dest.IP, dest.Port, "", "0", timeoutStr, "REPLACE", "KEYS"}, keys...)
438-
} else {
439-
args = append([]string{dest.IP, dest.Port, "", "0", timeoutStr, "KEYS"}, keys...)
440-
}
430+
args := a.migrateCmdArgs(dest, timeoutStr, replace, keys)
441431

442432
resp = c.Cmd("MIGRATE", args)
443433
if err := a.Connections().ValidateResp(resp, addr, "Unable to run command MIGRATE"); err != nil {
@@ -448,6 +438,20 @@ func (a *Admin) MigrateKeysInSlot(addr string, dest *Node, slot Slot, batch int,
448438
return keyCount, nil
449439
}
450440

441+
func (a *Admin) migrateCmdArgs(dest *Node, timeoutStr string, replace bool, keys []string) []string {
442+
args := []string{dest.IP, dest.Port, "", "0", timeoutStr}
443+
if password, ok := a.Connections().GetAUTH(); ok {
444+
args = append(args, "AUTH", password)
445+
}
446+
if replace {
447+
args = append(args, "REPLACE", "KEYS")
448+
} else {
449+
args = append(args, "KEYS")
450+
}
451+
args = append(args, keys...)
452+
return args
453+
}
454+
451455
// ForgetNode used to force other redis cluster node to forget a specific node
452456
func (a *Admin) ForgetNode(id string) error {
453457
infos, _ := a.GetClusterInfos()

pkg/redisutil/connections.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ type IAdminConnections interface {
5858
ValidatePipeResp(c IClient, addr, errMessage string) bool
5959
// Reset close all connections and clear the connection map
6060
Reset()
61+
// GetAUTH return password and true if connection password is set, else return false.
62+
GetAUTH() (string, bool)
6163
}
6264

6365
// AdminConnections connection map for redis cluster
@@ -98,6 +100,14 @@ func NewAdminConnections(addrs []string, options *AdminOptions, log logr.Logger)
98100
return cnx
99101
}
100102

103+
// GetAUTH return password and true if connection password is set, else return false.
104+
func (cnx *AdminConnections) GetAUTH() (string, bool) {
105+
if len(cnx.password) > 0 {
106+
return cnx.password, true
107+
}
108+
return "", false
109+
}
110+
101111
// Reconnect force a reconnection on the given address
102112
// is the adress is not part of the map, act like Add
103113
func (cnx *AdminConnections) Reconnect(addr string) error {

0 commit comments

Comments
 (0)