diff --git a/pkg/controller/broker/broker_controller.go b/pkg/controller/broker/broker_controller.go index f6ea6640..eb9b1495 100644 --- a/pkg/controller/broker/broker_controller.go +++ b/pkg/controller/broker/broker_controller.go @@ -20,6 +20,7 @@ package broker import ( "context" + "github.com/go-logr/logr" "reflect" "strconv" "strings" @@ -167,38 +168,8 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque share.BrokerClusterName = broker.Name replicaPerGroup := broker.Spec.ReplicaPerGroup - reqLogger.Info("brokerGroupNum=" + strconv.Itoa(share.GroupNum) + ", replicaPerGroup=" + strconv.Itoa(replicaPerGroup)) - for brokerGroupIndex := 0; brokerGroupIndex < share.GroupNum; brokerGroupIndex++ { - reqLogger.Info("Check Broker cluster " + strconv.Itoa(brokerGroupIndex+1) + "/" + strconv.Itoa(share.GroupNum)) - dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0) - // Check if the statefulSet already exists, if not create a new one - found := &appsv1.StatefulSet{} - err = r.client.Get(context.TODO(), types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, found) - if err != nil && errors.IsNotFound(err) { - reqLogger.Info("Creating a new Master Broker StatefulSet.", "StatefulSet.Namespace", dep.Namespace, "StatefulSet.Name", dep.Name) - err = r.client.Create(context.TODO(), dep) - if err != nil { - reqLogger.Error(err, "Failed to create new StatefulSet", "StatefulSet.Namespace", dep.Namespace, "StatefulSet.Name", dep.Name) - } - } else if err != nil { - reqLogger.Error(err, "Failed to get broker master StatefulSet.") - } - for replicaIndex := 1; replicaIndex <= replicaPerGroup; replicaIndex++ { - reqLogger.Info("Check Replica Broker of cluster-" + strconv.Itoa(brokerGroupIndex) + " " + strconv.Itoa(replicaIndex) + "/" + strconv.Itoa(replicaPerGroup)) - replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex) - err = r.client.Get(context.TODO(), types.NamespacedName{Name: replicaDep.Name, Namespace: replicaDep.Namespace}, found) - if err != nil && errors.IsNotFound(err) { - reqLogger.Info("Creating a new Replica Broker StatefulSet.", "StatefulSet.Namespace", replicaDep.Namespace, "StatefulSet.Name", replicaDep.Name) - err = r.client.Create(context.TODO(), replicaDep) - if err != nil { - reqLogger.Error(err, "Failed to create new StatefulSet of broker-"+strconv.Itoa(brokerGroupIndex)+"-replica-"+strconv.Itoa(replicaIndex), "StatefulSet.Namespace", replicaDep.Namespace, "StatefulSet.Name", replicaDep.Name) - } - } else if err != nil { - reqLogger.Error(err, "Failed to get broker replica StatefulSet.") - } - } - } + r.EnsureBrokerStatefulSets(broker, replicaPerGroup, reqLogger) // Check for name server scaling if broker.Spec.AllowRestart { @@ -347,6 +318,59 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil } +func (r *ReconcileBroker) EnsureBrokerStatefulSets(broker *rocketmqv1alpha1.Broker, replicaPerGroup int, reqLogger logr.Logger) { + reqLogger.Info("brokerGroupNum=" + strconv.Itoa(share.GroupNum) + ", replicaPerGroup=" + strconv.Itoa(replicaPerGroup)) + for brokerGroupIndex := 0; brokerGroupIndex < share.GroupNum; brokerGroupIndex++ { + reqLogger.Info("Check Broker cluster " + strconv.Itoa(brokerGroupIndex+1) + "/" + strconv.Itoa(share.GroupNum)) + dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0) + // Check if the statefulSet already exists, if not create a new one + found := &appsv1.StatefulSet{} + err := r.client.Get(context.TODO(), types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, found) + if err != nil && errors.IsNotFound(err) { + reqLogger.Info("Creating a new Master Broker StatefulSet.", "StatefulSet.Namespace", dep.Namespace, "StatefulSet.Name", dep.Name) + err = r.client.Create(context.TODO(), dep) + if err != nil { + reqLogger.Error(err, "Failed to create new StatefulSet", "StatefulSet.Namespace", dep.Namespace, "StatefulSet.Name", dep.Name) + } + } else if err != nil { + reqLogger.Error(err, "Failed to get broker master StatefulSet.") + } else { + for index := range found.Spec.Template.Spec.Containers[0].Env { + if cons.EnvNameServiceAddress == found.Spec.Template.Spec.Containers[0].Env[index].Name { + if found.Spec.Template.Spec.Containers[0].Env[index].Value != share.NameServersStr { + share.IsNameServersStrUpdated = true + } + break + } + } + } + + for replicaIndex := 1; replicaIndex <= replicaPerGroup; replicaIndex++ { + reqLogger.Info("Check Replica Broker of cluster-" + strconv.Itoa(brokerGroupIndex) + " " + strconv.Itoa(replicaIndex) + "/" + strconv.Itoa(replicaPerGroup)) + replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex) + err = r.client.Get(context.TODO(), types.NamespacedName{Name: replicaDep.Name, Namespace: replicaDep.Namespace}, found) + if err != nil && errors.IsNotFound(err) { + reqLogger.Info("Creating a new Replica Broker StatefulSet.", "StatefulSet.Namespace", replicaDep.Namespace, "StatefulSet.Name", replicaDep.Name) + err = r.client.Create(context.TODO(), replicaDep) + if err != nil { + reqLogger.Error(err, "Failed to create new StatefulSet of broker-"+strconv.Itoa(brokerGroupIndex)+"-replica-"+strconv.Itoa(replicaIndex), "StatefulSet.Namespace", replicaDep.Namespace, "StatefulSet.Name", replicaDep.Name) + } + } else if err != nil { + reqLogger.Error(err, "Failed to get broker replica StatefulSet.") + } else { + for index := range found.Spec.Template.Spec.Containers[0].Env { + if cons.EnvNameServiceAddress == found.Spec.Template.Spec.Containers[0].Env[index].Name { + if found.Spec.Template.Spec.Containers[0].Env[index].Value != share.NameServersStr { + share.IsNameServersStrUpdated = true + } + break + } + } + } + } + } +} + func getCopyMetadataJsonCommand(dir string, sourcePodName string, namespace string, k8s *tool.K8sClient) string { cmdOpts := buildInputCommand(dir) topicsJsonStr, err := exec(cmdOpts, sourcePodName, k8s, namespace)