Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 55 additions & 31 deletions pkg/controller/broker/broker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package broker

import (
"context"
"github.com/go-logr/logr"
"reflect"
"strconv"
"strings"
Expand Down Expand Up @@ -167,38 +168,8 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque

share.BrokerClusterName = broker.Name
replicaPerGroup := broker.Spec.ReplicaPerGroup
reqLogger.Info("brokerGroupNum=" + strconv.Itoa(share.GroupNum) + ", replicaPerGroup=" + strconv.Itoa(replicaPerGroup))
for brokerGroupIndex := 0; brokerGroupIndex < share.GroupNum; brokerGroupIndex++ {
reqLogger.Info("Check Broker cluster " + strconv.Itoa(brokerGroupIndex+1) + "/" + strconv.Itoa(share.GroupNum))
dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0)
// Check if the statefulSet already exists, if not create a new one
found := &appsv1.StatefulSet{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, found)
if err != nil && errors.IsNotFound(err) {
reqLogger.Info("Creating a new Master Broker StatefulSet.", "StatefulSet.Namespace", dep.Namespace, "StatefulSet.Name", dep.Name)
err = r.client.Create(context.TODO(), dep)
if err != nil {
reqLogger.Error(err, "Failed to create new StatefulSet", "StatefulSet.Namespace", dep.Namespace, "StatefulSet.Name", dep.Name)
}
} else if err != nil {
reqLogger.Error(err, "Failed to get broker master StatefulSet.")
}

for replicaIndex := 1; replicaIndex <= replicaPerGroup; replicaIndex++ {
reqLogger.Info("Check Replica Broker of cluster-" + strconv.Itoa(brokerGroupIndex) + " " + strconv.Itoa(replicaIndex) + "/" + strconv.Itoa(replicaPerGroup))
replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex)
err = r.client.Get(context.TODO(), types.NamespacedName{Name: replicaDep.Name, Namespace: replicaDep.Namespace}, found)
if err != nil && errors.IsNotFound(err) {
reqLogger.Info("Creating a new Replica Broker StatefulSet.", "StatefulSet.Namespace", replicaDep.Namespace, "StatefulSet.Name", replicaDep.Name)
err = r.client.Create(context.TODO(), replicaDep)
if err != nil {
reqLogger.Error(err, "Failed to create new StatefulSet of broker-"+strconv.Itoa(brokerGroupIndex)+"-replica-"+strconv.Itoa(replicaIndex), "StatefulSet.Namespace", replicaDep.Namespace, "StatefulSet.Name", replicaDep.Name)
}
} else if err != nil {
reqLogger.Error(err, "Failed to get broker replica StatefulSet.")
}
}
}
r.EnsureBrokerStatefulSets(broker, replicaPerGroup, reqLogger)

// Check for name server scaling
if broker.Spec.AllowRestart {
Expand Down Expand Up @@ -347,6 +318,59 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque
return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil
}

func (r *ReconcileBroker) EnsureBrokerStatefulSets(broker *rocketmqv1alpha1.Broker, replicaPerGroup int, reqLogger logr.Logger) {
reqLogger.Info("brokerGroupNum=" + strconv.Itoa(share.GroupNum) + ", replicaPerGroup=" + strconv.Itoa(replicaPerGroup))
for brokerGroupIndex := 0; brokerGroupIndex < share.GroupNum; brokerGroupIndex++ {
reqLogger.Info("Check Broker cluster " + strconv.Itoa(brokerGroupIndex+1) + "/" + strconv.Itoa(share.GroupNum))
dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0)
// Check if the statefulSet already exists, if not create a new one
found := &appsv1.StatefulSet{}
err := r.client.Get(context.TODO(), types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, found)
if err != nil && errors.IsNotFound(err) {
reqLogger.Info("Creating a new Master Broker StatefulSet.", "StatefulSet.Namespace", dep.Namespace, "StatefulSet.Name", dep.Name)
err = r.client.Create(context.TODO(), dep)
if err != nil {
reqLogger.Error(err, "Failed to create new StatefulSet", "StatefulSet.Namespace", dep.Namespace, "StatefulSet.Name", dep.Name)
}
} else if err != nil {
reqLogger.Error(err, "Failed to get broker master StatefulSet.")
} else {
for index := range found.Spec.Template.Spec.Containers[0].Env {
if cons.EnvNameServiceAddress == found.Spec.Template.Spec.Containers[0].Env[index].Name {
if found.Spec.Template.Spec.Containers[0].Env[index].Value != share.NameServersStr {
share.IsNameServersStrUpdated = true
}
break
}
}
}

for replicaIndex := 1; replicaIndex <= replicaPerGroup; replicaIndex++ {
reqLogger.Info("Check Replica Broker of cluster-" + strconv.Itoa(brokerGroupIndex) + " " + strconv.Itoa(replicaIndex) + "/" + strconv.Itoa(replicaPerGroup))
replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex)
err = r.client.Get(context.TODO(), types.NamespacedName{Name: replicaDep.Name, Namespace: replicaDep.Namespace}, found)
if err != nil && errors.IsNotFound(err) {
reqLogger.Info("Creating a new Replica Broker StatefulSet.", "StatefulSet.Namespace", replicaDep.Namespace, "StatefulSet.Name", replicaDep.Name)
err = r.client.Create(context.TODO(), replicaDep)
if err != nil {
reqLogger.Error(err, "Failed to create new StatefulSet of broker-"+strconv.Itoa(brokerGroupIndex)+"-replica-"+strconv.Itoa(replicaIndex), "StatefulSet.Namespace", replicaDep.Namespace, "StatefulSet.Name", replicaDep.Name)
}
} else if err != nil {
reqLogger.Error(err, "Failed to get broker replica StatefulSet.")
} else {
for index := range found.Spec.Template.Spec.Containers[0].Env {
if cons.EnvNameServiceAddress == found.Spec.Template.Spec.Containers[0].Env[index].Name {
if found.Spec.Template.Spec.Containers[0].Env[index].Value != share.NameServersStr {
share.IsNameServersStrUpdated = true
}
break
}
}
}
}
}
}

func getCopyMetadataJsonCommand(dir string, sourcePodName string, namespace string, k8s *tool.K8sClient) string {
cmdOpts := buildInputCommand(dir)
topicsJsonStr, err := exec(cmdOpts, sourcePodName, k8s, namespace)
Expand Down