3 changes: 2 additions & 1 deletion api/rediscluster/v1beta2/rediscluster_types.go
@@ -121,7 +121,8 @@ type RedisClusterStatus struct {
// +kubebuilder:default=0
ReadyLeaderReplicas int32 `json:"readyLeaderReplicas,omitempty"`
// +kubebuilder:default=0
ReadyFollowerReplicas int32 `json:"readyFollowerReplicas,omitempty"`
ReadyFollowerReplicas int32 `json:"readyFollowerReplicas,omitempty"`
TLSLastVersion string `json:"tlsLastVersion,omitempty"`
}

type RedisClusterState string
@@ -7852,6 +7852,8 @@ spec:
type: string
state:
type: string
tlsLastVersion:
type: string
type: object
required:
- spec
13 changes: 7 additions & 6 deletions internal/cmd/manager/cmd.go
@@ -239,12 +239,13 @@ func setupControllers(mgr ctrl.Manager, k8sClient kubernetes.Interface, maxConcu
return err
}
if err := (&redisclustercontroller.Reconciler{
Client: mgr.GetClient(),
K8sClient: k8sClient,
Healer: healer,
Checker: redis.NewChecker(k8sClient),
Recorder: mgr.GetEventRecorderFor("rediscluster-controller"),
StatefulSet: k8sutils.NewStatefulSetService(k8sClient),
Client: mgr.GetClient(),
K8sClient: k8sClient,
Healer: healer,
Checker: redis.NewChecker(k8sClient),
Recorder: mgr.GetEventRecorderFor("rediscluster-controller"),
StatefulSet: k8sutils.NewStatefulSetService(k8sClient),
ResourceWatcher: intctrlutil.NewResourceWatcher(),
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "RedisCluster")
return err
79 changes: 75 additions & 4 deletions internal/controller/rediscluster/rediscluster_controller.go
@@ -19,7 +19,10 @@ package rediscluster
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/types"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/handler"
"strconv"
"time"

rcvb2 "github.com/OT-CONTAINER-KIT/redis-operator/api/rediscluster/v1beta2"
@@ -49,10 +52,11 @@ const (
type Reconciler struct {
client.Client
k8sutils.StatefulSet
Healer redis.Healer
Checker redis.Checker
K8sClient kubernetes.Interface
Recorder record.EventRecorder
Healer redis.Healer
Checker redis.Checker
K8sClient kubernetes.Interface
Recorder record.EventRecorder
ResourceWatcher *intctrlutil.ResourceWatcher
}

func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
@@ -84,6 +88,21 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
return intctrlutil.RequeueE(ctx, err, "failed to add finalizer")
}

// Hot-reload TLS: register a watch on the TLS secret so certificate rotation triggers a reconcile.
if instance.Spec.TLS != nil {
r.ResourceWatcher.Watch(
ctx,
types.NamespacedName{
Namespace: instance.Namespace,
Name: instance.Spec.TLS.Secret.SecretName,
},
types.NamespacedName{
Namespace: instance.Namespace,
Name: instance.Name,
},
)
}

// Check if the cluster is downscaled
if leaderCount := r.GetStatefulSetReplicas(ctx, instance.Namespace, instance.Name+"-leader"); leaderReplicas < leaderCount {
if !r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-leader") || !r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-follower") {
@@ -195,6 +214,14 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
return intctrlutil.Reconciled()
}

if instance.Spec.TLS != nil {
err := r.reloadTLS(ctx, instance, int(leaderReplicas), int(followerReplicas))
if err != nil {
log.FromContext(ctx).Error(err, "hotReloadTLS failed, will retry later")
return intctrlutil.RequeueAfter(ctx, 30*time.Second, "Retry hotReloadTLS")
}
}

// Mark the cluster status as bootstrapping if all the leader and follower nodes are ready
if instance.Status.ReadyLeaderReplicas != leaderReplicas || instance.Status.ReadyFollowerReplicas != followerReplicas {
err = r.updateStatus(ctx, instance, rcvb2.RedisClusterStatus{
@@ -334,6 +361,48 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
return intctrlutil.RequeueAfter(ctx, time.Second*10, "")
}

// reloadTLS pushes the current TLS certificate files to every leader and follower pod
// and records the secret's resourceVersion in the status so the reload only runs again
// when the secret actually changes.
func (r *Reconciler) reloadTLS(ctx context.Context, rc *rcvb2.RedisCluster, leaderReplicas, followerReplicas int) error {
secretName := rc.Spec.TLS.Secret.SecretName
var tlsSecret corev1.Secret

if err := r.Get(ctx, client.ObjectKey{Name: secretName, Namespace: rc.Namespace}, &tlsSecret); err != nil {
return fmt.Errorf("failed to get TLS secret %s/%s: %w", rc.Namespace, rc.Name, err)
}

if rc.Status.TLSLastVersion == tlsSecret.ResourceVersion {
return nil
}

log.FromContext(ctx).Info("hotReloadTLS: reloading TLS configuration")
for i := 0; i < followerReplicas; i++ {
err := k8sutils.HotReloadTLS(ctx, r.K8sClient, rc, rc.Name+"-follower-"+strconv.Itoa(i))
if err != nil {
return fmt.Errorf("RedisCluster controller -> failed reloading tls in follower: %w", err)
}
}
for j := 0; j < leaderReplicas; j++ {
err := k8sutils.HotReloadTLS(ctx, r.K8sClient, rc, rc.Name+"-leader-"+strconv.Itoa(j))
if err != nil {
return fmt.Errorf("RedisCluster controller -> failed reloading tls in leader: %w", err)
}
}

// Record the secret's resourceVersion so the reload is not repeated until the secret changes again.
err := r.updateStatus(ctx, rc, rcvb2.RedisClusterStatus{
State: rc.Status.State,
Reason: rc.Status.Reason,
ReadyFollowerReplicas: rc.Status.ReadyFollowerReplicas,
ReadyLeaderReplicas: rc.Status.ReadyLeaderReplicas,
TLSLastVersion: tlsSecret.ResourceVersion,
})
if err != nil {
log.FromContext(ctx).Error(err, "update status error")
}

log.FromContext(ctx).Info("hotReloadTLS: reload TLS configuration has been completed")
return nil
}

func (r *Reconciler) updateStatus(ctx context.Context, rc *rcvb2.RedisCluster, status rcvb2.RedisClusterStatus) error {
if reflect.DeepEqual(rc.Status, status) {
return nil
@@ -365,5 +434,7 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager, opts controller.Options)
For(&rcvb2.RedisCluster{}).
Owns(&appsv1.StatefulSet{}).
WithOptions(opts).
Watches(&rcvb2.RedisCluster{}, &handler.EnqueueRequestForObject{}).
Watches(&corev1.Secret{}, r.ResourceWatcher).
Complete(r)
}
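
For context, intctrlutil.ResourceWatcher itself is not shown in this PR. The sketch below is one way such a watcher could be built: it records which RedisCluster asked to watch which Secret (the Watch call above) and, acting as the event handler passed to Watches, enqueues a reconcile request for every registered requester when the watched object changes. It assumes controller-runtime v0.15–v0.18 handler.EventHandler signatures (untyped workqueue) and is an illustration, not the operator's actual implementation.

package intctrlutil

import (
	"context"
	"sync"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// ResourceWatcher maps a watched object (here: the TLS Secret) to the set of
// objects (here: RedisClusters) whose reconciliation should be triggered when it changes.
type ResourceWatcher struct {
	mu       sync.RWMutex
	watchers map[types.NamespacedName]map[types.NamespacedName]struct{}
}

func NewResourceWatcher() *ResourceWatcher {
	return &ResourceWatcher{watchers: map[types.NamespacedName]map[types.NamespacedName]struct{}{}}
}

// Watch records that requester wants a reconcile whenever watched changes.
func (w *ResourceWatcher) Watch(_ context.Context, watched, requester types.NamespacedName) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.watchers[watched] == nil {
		w.watchers[watched] = map[types.NamespacedName]struct{}{}
	}
	w.watchers[watched][requester] = struct{}{}
}

// enqueue adds a reconcile.Request for every requester registered for obj.
func (w *ResourceWatcher) enqueue(obj types.NamespacedName, q workqueue.RateLimitingInterface) {
	w.mu.RLock()
	defer w.mu.RUnlock()
	for requester := range w.watchers[obj] {
		q.Add(reconcile.Request{NamespacedName: requester})
	}
}

// The four methods below satisfy handler.EventHandler, so the watcher can be
// passed directly to Watches(&corev1.Secret{}, r.ResourceWatcher).
func (w *ResourceWatcher) Create(_ context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
	w.enqueue(types.NamespacedName{Namespace: e.Object.GetNamespace(), Name: e.Object.GetName()}, q)
}

func (w *ResourceWatcher) Update(_ context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
	w.enqueue(types.NamespacedName{Namespace: e.ObjectNew.GetNamespace(), Name: e.ObjectNew.GetName()}, q)
}

func (w *ResourceWatcher) Delete(_ context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
	w.enqueue(types.NamespacedName{Namespace: e.Object.GetNamespace(), Name: e.Object.GetName()}, q)
}

func (w *ResourceWatcher) Generic(_ context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
	w.enqueue(types.NamespacedName{Namespace: e.Object.GetNamespace(), Name: e.Object.GetName()}, q)
}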
16 changes: 16 additions & 0 deletions internal/k8sutils/redis.go
@@ -289,6 +289,22 @@ func createRedisReplicationCommand(ctx context.Context, client kubernetes.Interf
return cmd
}

// HotReloadTLS tells the Redis server in the given pod to re-read its certificate, key,
// and CA files by issuing CONFIG SET for the corresponding tls-* parameters.
func HotReloadTLS(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster, podName string) error {
redisClient := configureRedisClient(ctx, client, cr, podName)
commands := []struct{ k, v string }{
{"tls-cert-file", "/tls/tls.crt"},
{"tls-key-file", "/tls/tls.key"},
{"tls-ca-cert-file", "/tls/ca.crt"},
}
for _, cmd := range commands {
if err := redisClient.ConfigSet(ctx, cmd.k, cmd.v).Err(); err != nil {
log.FromContext(ctx).Error(err, "hotReloadTLS: Failed to set tls config", "cmd", cmd, "on pod", podName)
return err
}
}
return nil
}

// ExecuteRedisReplicationCommand will execute the replication command
func ExecuteRedisReplicationCommand(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster) {
var podIP string
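
Aside, not part of this diff: Redis 6.0+ reloads its certificate material when the tls-cert-file / tls-key-file / tls-ca-cert-file parameters are set at runtime via CONFIG SET, which is the mechanism HotReloadTLS relies on. Below is a small standalone Go sketch for checking, from a test or debug pod, that an endpoint is actually serving the rotated certificate; the address and CA path are placeholders, not values taken from this repository.

package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"
)

func main() {
	// Placeholder CA path; in-cluster this would match the secret mount used by the operator.
	caPEM, err := os.ReadFile("/tls/ca.crt")
	if err != nil {
		panic(err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		panic("no CA certificates parsed")
	}

	// Placeholder address of one Redis pod's TLS port.
	conn, err := tls.Dial("tcp", "redis-cluster-leader-0.redis-cluster-leader-headless:6379", &tls.Config{RootCAs: pool})
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// The leaf certificate presented by the server; its serial/expiry should change after a rotation + reload.
	leaf := conn.ConnectionState().PeerCertificates[0]
	fmt.Printf("serial=%s notAfter=%s\n", leaf.SerialNumber, leaf.NotAfter)
}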