Skip to content

Commit eb90b5a

Browse files
committed
fix: support cluster creation/scale with NodePort and TLS enabled
When using the NodePort service type, `redis-cli --cluster create node-ip:nodeport` and `redis-cli --cluster add-node node-ip:nodeport` are used. Added support for `cluster-announce-tls-port` when both TLS and NodePort are enabled. Signed-off-by: drivebyer <[email protected]>
1 parent f9808ec commit eb90b5a

File tree

11 files changed

+95
-247
lines changed

11 files changed

+95
-247
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ require (
7070
github.com/prometheus/procfs v0.15.1 // indirect
7171
github.com/sagikazarmark/locafero v0.4.0 // indirect
7272
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
73+
github.com/samber/lo v1.52.0
7374
github.com/sourcegraph/conc v0.3.0 // indirect
7475
github.com/spf13/afero v1.11.0 // indirect
7576
github.com/spf13/cast v1.6.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,8 @@ github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6ke
203203
github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4=
204204
github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE=
205205
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
206+
github.com/samber/lo v1.52.0 h1:Rvi+3BFHES3A8meP33VPAxiBZX/Aws5RxrschYGjomw=
207+
github.com/samber/lo v1.52.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0=
206208
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
207209
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
208210
github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8=

internal/agent/bootstrap/redis/config.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ func GenerateConfig() error {
3838
externalConfigFile, _ = util.CoalesceEnv("EXTERNAL_CONFIG_FILE", "/etc/redis/external.conf.d/redis-additional.conf")
3939
redisMajorVersion, _ = util.CoalesceEnv("REDIS_MAJOR_VERSION", "v7")
4040
redisPort, _ = util.CoalesceEnv("REDIS_PORT", "6379")
41+
nodeport, _ = util.CoalesceEnv("NODEPORT", "false")
42+
tlsMode, _ = util.CoalesceEnv("TLS_MODE", "false")
4143
)
4244

4345
if val, ok := util.CoalesceEnv("REDIS_PASSWORD", ""); ok && val != "" {
@@ -70,7 +72,7 @@ func GenerateConfig() error {
7072
fmt.Println("Setting up redis in standalone mode")
7173
}
7274

73-
if tlsMode, ok := util.CoalesceEnv("TLS_MODE", ""); ok && tlsMode == "true" {
75+
if tlsMode == "true" {
7476
redisTLSCert, _ := util.CoalesceEnv("REDIS_TLS_CERT", "")
7577
redisTLSCertKey, _ := util.CoalesceEnv("REDIS_TLS_CERT_KEY", "")
7678
redisTLSCAKey, _ := util.CoalesceEnv("REDIS_TLS_CA_KEY", "")
@@ -83,7 +85,7 @@ func GenerateConfig() error {
8385

8486
if setupMode, ok := util.CoalesceEnv("SETUP_MODE", ""); ok && setupMode == "cluster" {
8587
cfg.Append("tls-cluster", "yes")
86-
if redisMajorVersion == "v7" {
88+
if redisMajorVersion == "v7" && nodeport == "false" {
8789
cfg.Append("cluster-preferred-endpoint-type", "hostname")
8890
}
8991
}
@@ -108,7 +110,7 @@ func GenerateConfig() error {
108110
fmt.Println("Running without persistence mode")
109111
}
110112

111-
if tlsMode, ok := util.CoalesceEnv("TLS_MODE", ""); ok && tlsMode == "true" {
113+
if tlsMode == "true" {
112114
cfg.Append("port", "0")
113115
cfg.Append("tls-port", redisPort)
114116
} else {
@@ -126,6 +128,9 @@ func GenerateConfig() error {
126128

127129
if clusterAnnouncePort != "" {
128130
cfg.Append("cluster-announce-port", clusterAnnouncePort)
131+
if tlsMode == "true" {
132+
cfg.Append("cluster-announce-tls-port", clusterAnnouncePort)
133+
}
129134
}
130135
if clusterAnnounceBusPort != "" {
131136
cfg.Append("cluster-announce-bus-port", clusterAnnounceBusPort)

internal/k8sutils/cluster-scaling.go

Lines changed: 15 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package k8sutils
33
import (
44
"context"
55
"fmt"
6+
"net"
67
"strconv"
78
"strings"
89

@@ -33,13 +34,7 @@ func ReshardRedisCluster(ctx context.Context, client kubernetes.Interface, cr *r
3334
Namespace: cr.Namespace,
3435
}
3536
cmd = []string{"redis-cli", "--cluster", "reshard"}
36-
37-
if *cr.Spec.ClusterVersion == "v7" {
38-
cmd = append(cmd, getRedisHostname(transferPOD, cr, "leader")+fmt.Sprintf(":%d", *cr.Spec.Port))
39-
} else {
40-
cmd = append(cmd, getRedisServerAddress(ctx, client, transferPOD, *cr.Spec.Port))
41-
}
42-
37+
cmd = append(cmd, getEndpoint(ctx, client, cr, transferPOD))
4338
if cr.Spec.KubernetesConfig.ExistingPasswordSecret != nil {
4439
pass, err := getRedisPassword(ctx, client, cr.Namespace, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Name, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Key)
4540
if err != nil {
@@ -75,7 +70,6 @@ func ReshardRedisCluster(ctx context.Context, client kubernetes.Interface, cr *r
7570

7671
cmd = append(cmd, "--cluster-yes")
7772

78-
log.FromContext(ctx).V(1).Info("redis cluster reshard command is", "Command", cmd)
7973
log.FromContext(ctx).Info(fmt.Sprintf("transferring %s slots from shard %d to shard %d", slots, shardIdx, transferNodeIdx))
8074
executeCommand(ctx, client, cr, cmd, transferNodeName)
8175
log.FromContext(ctx).Info(fmt.Sprintf("transferring %s slots from shard %d to shard %d completed", slots, shardIdx, transferNodeIdx))
@@ -142,15 +136,8 @@ func RebalanceRedisClusterEmptyMasters(ctx context.Context, client kubernetes.In
142136
Namespace: cr.Namespace,
143137
}
144138
cmd = []string{"redis-cli", "--cluster", "rebalance"}
145-
146-
if *cr.Spec.ClusterVersion == "v7" {
147-
cmd = append(cmd, getRedisHostname(pod, cr, "leader")+fmt.Sprintf(":%d", *cr.Spec.Port))
148-
} else {
149-
cmd = append(cmd, getRedisServerAddress(ctx, client, pod, *cr.Spec.Port))
150-
}
151-
139+
cmd = append(cmd, getEndpoint(ctx, client, cr, pod))
152140
cmd = append(cmd, "--cluster-use-empty-masters")
153-
154141
if cr.Spec.KubernetesConfig.ExistingPasswordSecret != nil {
155142
pass, err := getRedisPassword(ctx, client, cr.Namespace, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Name, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Key)
156143
if err != nil {
@@ -162,7 +149,6 @@ func RebalanceRedisClusterEmptyMasters(ctx context.Context, client kubernetes.In
162149

163150
cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, cr.Name+"-leader-0")...)
164151

165-
log.FromContext(ctx).V(1).Info("Redis cluster rebalance command is", "Command", cmd)
166152
executeCommand(ctx, client, cr, cmd, cr.Name+"-leader-1")
167153
}
168154

@@ -196,13 +182,7 @@ func RebalanceRedisCluster(ctx context.Context, client kubernetes.Interface, cr
196182
Namespace: cr.Namespace,
197183
}
198184
cmd = []string{"redis-cli", "--cluster", "rebalance"}
199-
200-
if *cr.Spec.ClusterVersion == "v7" {
201-
cmd = append(cmd, getRedisHostname(pod, cr, "leader")+fmt.Sprintf(":%d", *cr.Spec.Port))
202-
} else {
203-
cmd = append(cmd, getRedisServerAddress(ctx, client, pod, *cr.Spec.Port))
204-
}
205-
185+
cmd = append(cmd, getEndpoint(ctx, client, cr, pod))
206186
if cr.Spec.KubernetesConfig.ExistingPasswordSecret != nil {
207187
pass, err := getRedisPassword(ctx, client, cr.Namespace, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Name, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Key)
208188
if err != nil {
@@ -214,15 +194,13 @@ func RebalanceRedisCluster(ctx context.Context, client kubernetes.Interface, cr
214194

215195
cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, cr.Name+"-leader-0")...)
216196

217-
log.FromContext(ctx).V(1).Info("Redis cluster rebalance command is", "Command", cmd)
218197
executeCommand(ctx, client, cr, cmd, cr.Name+"-leader-1")
219198
}
220199

221200
// Add redis cluster node would add a node to the existing redis cluster using redis-cli
222201
func AddRedisNodeToCluster(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster) {
223-
var cmd []string
202+
cmd := []string{"redis-cli", "--cluster", "add-node"}
224203
activeRedisNode := CheckRedisNodeCount(ctx, client, cr, "leader")
225-
226204
newPod := RedisDetails{
227205
PodName: cr.Name + "-leader-" + strconv.Itoa(int(activeRedisNode)),
228206
Namespace: cr.Namespace,
@@ -231,17 +209,8 @@ func AddRedisNodeToCluster(ctx context.Context, client kubernetes.Interface, cr
231209
PodName: cr.Name + "-leader-0",
232210
Namespace: cr.Namespace,
233211
}
234-
235-
cmd = []string{"redis-cli", "--cluster", "add-node"}
236-
237-
if *cr.Spec.ClusterVersion == "v7" {
238-
cmd = append(cmd, getRedisHostname(newPod, cr, "leader")+fmt.Sprintf(":%d", *cr.Spec.Port))
239-
cmd = append(cmd, getRedisHostname(existingPod, cr, "leader")+fmt.Sprintf(":%d", *cr.Spec.Port))
240-
} else {
241-
cmd = append(cmd, getRedisServerAddress(ctx, client, newPod, *cr.Spec.Port))
242-
cmd = append(cmd, getRedisServerAddress(ctx, client, existingPod, *cr.Spec.Port))
243-
}
244-
212+
cmd = append(cmd, getEndpoint(ctx, client, cr, newPod))
213+
cmd = append(cmd, getEndpoint(ctx, client, cr, existingPod))
245214
if cr.Spec.KubernetesConfig.ExistingPasswordSecret != nil {
246215
pass, err := getRedisPassword(ctx, client, cr.Namespace, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Name, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Key)
247216
if err != nil {
@@ -253,7 +222,6 @@ func AddRedisNodeToCluster(ctx context.Context, client kubernetes.Interface, cr
253222

254223
cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, cr.Name+"-leader-0")...)
255224

256-
log.FromContext(ctx).V(1).Info("Redis cluster add-node command is", "Command", cmd)
257225
executeCommand(ctx, client, cr, cmd, cr.Name+"-leader-0")
258226
}
259227

@@ -305,47 +273,25 @@ func RemoveRedisFollowerNodesFromCluster(ctx context.Context, client kubernetes.
305273
followerNodeIDs := getAttachedFollowerNodeIDs(ctx, redisClient, lastLeaderPodNodeID)
306274

307275
cmd = append(cmd, "--cluster", "del-node")
308-
if *cr.Spec.ClusterVersion == "v7" {
309-
cmd = append(cmd, getRedisHostname(existingPod, cr, "leader")+fmt.Sprintf(":%d", *cr.Spec.Port))
310-
} else {
311-
cmd = append(cmd, getRedisServerAddress(ctx, client, existingPod, *cr.Spec.Port))
312-
}
313-
276+
cmd = append(cmd, getEndpoint(ctx, client, cr, existingPod))
314277
for _, followerNodeID := range followerNodeIDs {
315278
cmd = append(cmd, followerNodeID)
316-
log.FromContext(ctx).V(1).Info("Redis cluster follower remove command is", "Command", cmd)
317279
executeCommand(ctx, client, cr, cmd, cr.Name+"-leader-0")
318280
cmd = cmd[:len(cmd)-1]
319281
}
320282
}
321283

322284
// Remove redis cluster node would remove last node to the existing redis cluster using redis-cli
323285
func RemoveRedisNodeFromCluster(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster, removePod RedisDetails) {
324-
var cmd []string
325286
redisClient := configureRedisClient(ctx, client, cr, cr.Name+"-leader-0")
326287
defer redisClient.Close()
327-
// currentRedisCount := CheckRedisNodeCount(ctx, client, cr, "leader")
328-
329288
existingPod := RedisDetails{
330289
PodName: cr.Name + "-leader-0",
331290
Namespace: cr.Namespace,
332291
}
333-
//removePod := RedisDetails{
334-
// PodName: cr.Name + "-leader-" + strconv.Itoa(int(currentRedisCount)-1),
335-
// Namespace: cr.Namespace,
336-
//}
337-
338-
cmd = []string{"redis-cli", "--cluster", "del-node"}
339-
340-
if *cr.Spec.ClusterVersion == "v7" {
341-
cmd = append(cmd, getRedisHostname(existingPod, cr, "leader")+fmt.Sprintf(":%d", *cr.Spec.Port))
342-
} else {
343-
cmd = append(cmd, getRedisServerAddress(ctx, client, existingPod, *cr.Spec.Port))
344-
}
345-
346-
removePodNodeID := getRedisNodeID(ctx, client, cr, removePod)
347-
cmd = append(cmd, removePodNodeID)
348-
292+
cmd := []string{"redis-cli", "--cluster", "del-node"}
293+
cmd = append(cmd, getEndpoint(ctx, client, cr, existingPod))
294+
cmd = append(cmd, getRedisNodeID(ctx, client, cr, removePod))
349295
if cr.Spec.KubernetesConfig.ExistingPasswordSecret != nil {
350296
pass, err := getRedisPassword(ctx, client, cr.Namespace, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Name, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Key)
351297
if err != nil {
@@ -354,13 +300,7 @@ func RemoveRedisNodeFromCluster(ctx context.Context, client kubernetes.Interface
354300
cmd = append(cmd, "-a")
355301
cmd = append(cmd, pass)
356302
}
357-
358303
cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, cr.Name+"-leader-0")...)
359-
360-
log.FromContext(ctx).V(1).Info("Redis cluster leader remove command is", "Command", cmd)
361-
if getRedisClusterSlots(ctx, redisClient, removePodNodeID) != "0" {
362-
log.FromContext(ctx).V(1).Info("Skipping execution remove leader not empty", "cmd", cmd)
363-
}
364304
executeCommand(ctx, client, cr, cmd, cr.Name+"-leader-0")
365305
}
366306

@@ -399,17 +339,11 @@ func ClusterFailover(ctx context.Context, client kubernetes.Interface, cr *rcvb2
399339
PodName: slavePodName,
400340
Namespace: cr.Namespace,
401341
}
402-
403-
cmd = []string{"redis-cli", "-h"}
404-
405-
if *cr.Spec.ClusterVersion == "v7" {
406-
cmd = append(cmd, getRedisHostname(pod, cr, "leader"))
407-
} else {
408-
cmd = append(cmd, getRedisServerIP(ctx, client, pod))
342+
host, port, err := net.SplitHostPort(getEndpoint(ctx, client, cr, pod))
343+
if err != nil {
344+
return err
409345
}
410-
cmd = append(cmd, "-p")
411-
cmd = append(cmd, strconv.Itoa(*cr.Spec.Port))
412-
346+
cmd = []string{"redis-cli", "-h", host, "-p", port}
413347
if cr.Spec.KubernetesConfig.ExistingPasswordSecret != nil {
414348
pass, err := getRedisPassword(ctx, client, cr.Namespace, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Name, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Key)
415349
if err != nil {

internal/k8sutils/redis-cluster.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -368,10 +368,7 @@ func (service RedisClusterService) CreateRedisClusterService(ctx context.Context
368368
log.FromContext(ctx).Error(err, "Cannot create service for Redis", "Setup.Type", service.RedisServiceRole)
369369
return err
370370
}
371-
additionalServiceType := cr.Spec.KubernetesConfig.GetServiceType()
372-
if additionalServiceType == "NodePort" {
373-
// If NodePort is enabled, we need to create a service for every redis pod.
374-
// Then use --cluster-announce-ip --cluster-announce-port --cluster-announce-bus-port to make cluster.
371+
if cr.Spec.KubernetesConfig.GetServiceType() == "NodePort" {
375372
err = service.createOrUpdateClusterNodePortService(ctx, cr, cl)
376373
if err != nil {
377374
log.FromContext(ctx).Error(err, "Cannot create nodeport service for Redis", "Setup.Type", service.RedisServiceRole)
@@ -383,7 +380,7 @@ func (service RedisClusterService) CreateRedisClusterService(ctx context.Context
383380
additionalExtraPorts = append(additionalExtraPorts, busPort)
384381
}
385382
if cr.Spec.KubernetesConfig.ShouldCreateAdditionalService() {
386-
err = CreateOrUpdateService(ctx, cr.Namespace, additionalObjectMetaInfo, redisClusterAsOwner(cr), disableMetrics, false, additionalServiceType, *cr.Spec.Port, cl, additionalExtraPorts...)
383+
err = CreateOrUpdateService(ctx, cr.Namespace, additionalObjectMetaInfo, redisClusterAsOwner(cr), disableMetrics, false, cr.Spec.KubernetesConfig.GetServiceType(), *cr.Spec.Port, cl, additionalExtraPorts...)
387384
if err != nil {
388385
log.FromContext(ctx).Error(err, "Cannot create additional service for Redis", "Setup.Type", service.RedisServiceRole)
389386
return err

0 commit comments

Comments
 (0)