diff --git a/README.md b/README.md index ae534692..24a7f44d 100644 --- a/README.md +++ b/README.md @@ -185,6 +185,8 @@ spec: size: 1 # nameServers is the [ip:port] list of name service nameServers: "" + # rocketMQName is the rocketmq name, must equal to nameservice.spec.rocketMQName and and topictransfer.spec.rocketMQName + rocketMQName: "rocketmq_name" # replicaPerGroup is the number of each broker cluster replicaPerGroup: 1 # brokerImage is the customized docker image repo of the RocketMQ broker @@ -245,6 +247,8 @@ spec: nameServiceImage: apacherocketmq/rocketmq-nameserver:4.5.0-alpine-operator-0.3.0 # imagePullPolicy is the image pull policy imagePullPolicy: Always + # rocketMQName is the rocketmq name, must equal to broker.spec.rocketMQName and and topictransfer.spec.rocketMQName + rocketMQName: "rocketmq_name" # hostNetwork can be true or false hostNetwork: true # Set DNS policy for the pod. @@ -440,6 +444,8 @@ spec: sourceCluster: broker-0 # targetCluster defines the target cluster targetCluster: broker-1 + # rocketMQName is the rocketmq name, must equal to nameservice.spec.rocketMQName and and broker.spec.rocketMQName + rocketMQName: "rocketmq_name" ``` Then apply the ```TopicTransfer``` resource: diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 26e25730..a8a26122 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -27,7 +27,6 @@ import ( "github.com/apache/rocketmq-operator/pkg/apis" "github.com/apache/rocketmq-operator/pkg/controller" - "github.com/operator-framework/operator-sdk/pkg/k8sutil" "github.com/operator-framework/operator-sdk/pkg/leader" "github.com/operator-framework/operator-sdk/pkg/log/zap" "github.com/operator-framework/operator-sdk/pkg/metrics" @@ -76,12 +75,6 @@ func main() { printVersion() - namespace, err := k8sutil.GetWatchNamespace() - if err != nil { - log.Error(err, "Failed to get watch namespace") - os.Exit(1) - } - // Get a config to talk to the apiserver cfg, err := config.GetConfig() if err != nil { @@ -100,7 +93,6 @@ func main() { // Create a new Cmd to provide shared dependencies and start components mgr, err := manager.New(cfg, manager.Options{ - Namespace: namespace, MetricsBindAddress: fmt.Sprintf("%s:%d", metricsHost, metricsPort), }) if err != nil { diff --git a/deploy/cluster_role.yaml b/deploy/cluster_role.yaml new file mode 100644 index 00000000..fa714c3c --- /dev/null +++ b/deploy/cluster_role.yaml @@ -0,0 +1,72 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + creationTimestamp: null + name: rocketmq-operator +rules: + - apiGroups: + - "" + resources: + - pods + - services + - endpoints + - persistentvolumeclaims + - events + - configmaps + - secrets + - pods/exec + verbs: + - '*' + - apiGroups: + - "" + resources: + - namespaces + verbs: + - get + - apiGroups: + - apps + resources: + - deployments + - daemonsets + - replicasets + - statefulsets + verbs: + - '*' + - apiGroups: + - monitoring.coreos.com + resources: + - servicemonitors + verbs: + - get + - create + - apiGroups: + - apps + resourceNames: + - rocketmq-operator + resources: + - deployments/finalizers + verbs: + - update + - apiGroups: + - rocketmq.apache.org + resources: + - '*' + - brokers + - pods/exec + - topictransfers + verbs: + - '*' diff --git a/deploy/clusterrole_binding.yaml b/deploy/clusterrole_binding.yaml new file mode 100644 index 00000000..9e70848a --- /dev/null +++ b/deploy/clusterrole_binding.yaml @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: rocketmq-operator +subjects: + - kind: ServiceAccount + name: rocketmq-operator + namespace: default +roleRef: + kind: ClusterRole + name: rocketmq-operator + apiGroup: rbac.authorization.k8s.io diff --git a/deploy/crds/rocketmq_v1alpha1_broker_crd.yaml b/deploy/crds/rocketmq_v1alpha1_broker_crd.yaml index 834c43c9..e15629a4 100644 --- a/deploy/crds/rocketmq_v1alpha1_broker_crd.yaml +++ b/deploy/crds/rocketmq_v1alpha1_broker_crd.yaml @@ -79,6 +79,9 @@ spec: items: type: object type: array + rocketMQName: + description: The name of rocketmq, and the broker and nameserver in the same cluster must be filled with the same name + type: string required: - size - replicaPerGroup @@ -93,6 +96,7 @@ spec: - volumes - volumeClaimTemplates - scalePodName + - rocketMQName type: object status: properties: diff --git a/deploy/crds/rocketmq_v1alpha1_nameservice_crd.yaml b/deploy/crds/rocketmq_v1alpha1_nameservice_crd.yaml index 437ce049..baaa1d6f 100644 --- a/deploy/crds/rocketmq_v1alpha1_nameservice_crd.yaml +++ b/deploy/crds/rocketmq_v1alpha1_nameservice_crd.yaml @@ -63,6 +63,9 @@ spec: items: type: object type: array + rocketMQName: + description: The name of rocketmq, and the broker and nameserver in the same cluster must be filled with the same name + type: string required: - size - nameServiceImage @@ -73,6 +76,7 @@ spec: - storageMode - hostPath - volumeClaimTemplates + - rocketMQName type: object status: properties: diff --git a/deploy/crds/rocketmq_v1alpha1_topictransfer_crd.yaml b/deploy/crds/rocketmq_v1alpha1_topictransfer_crd.yaml index c36a3b00..0abe8005 100644 --- a/deploy/crds/rocketmq_v1alpha1_topictransfer_crd.yaml +++ b/deploy/crds/rocketmq_v1alpha1_topictransfer_crd.yaml @@ -38,6 +38,11 @@ spec: topic: description: Topic name type: string + rocketMQName: + description: The name of rocketmq, and the broker and nameserver in the same cluster must be filled with the same name + type: string + required: + - rocketMQName type: object status: type: object diff --git a/example/rocketmq_v1alpha1_broker_cr.yaml b/example/rocketmq_v1alpha1_broker_cr.yaml index e3d9bd3e..cf932b2a 100644 --- a/example/rocketmq_v1alpha1_broker_cr.yaml +++ b/example/rocketmq_v1alpha1_broker_cr.yaml @@ -38,6 +38,8 @@ spec: size: 1 # nameServers is the [ip:port] list of name service nameServers: "" + # rocketMQName is the rocketmq name, must equal to nameserver.spec.rocketMQName and topictransfer.spec.rocketMQName + rocketMQName: "rocketmq_name" # replicaPerGroup is the number of each broker cluster replicaPerGroup: 1 # brokerImage is the customized docker image repo of the RocketMQ broker diff --git a/example/rocketmq_v1alpha1_nameservice_cr.yaml b/example/rocketmq_v1alpha1_nameservice_cr.yaml index 291d4471..1b17bbf6 100644 --- a/example/rocketmq_v1alpha1_nameservice_cr.yaml +++ b/example/rocketmq_v1alpha1_nameservice_cr.yaml @@ -24,6 +24,8 @@ spec: nameServiceImage: apacherocketmq/rocketmq-nameserver:4.5.0-alpine-operator-0.3.0 # imagePullPolicy is the image pull policy imagePullPolicy: Always + # rocketMQName is the rocketmq name, must equal to broker.spec.rocketMQName and topictransfer.spec.rocketMQName + rocketMQName: "rocketmq_name" # hostNetwork can be true or false hostNetwork: true # Set DNS policy for the pod. diff --git a/example/rocketmq_v1alpha1_rocketmq_cluster.yaml b/example/rocketmq_v1alpha1_rocketmq_cluster.yaml index afb01e53..88e71522 100644 --- a/example/rocketmq_v1alpha1_rocketmq_cluster.yaml +++ b/example/rocketmq_v1alpha1_rocketmq_cluster.yaml @@ -39,6 +39,8 @@ spec: size: 1 # nameServers is the [ip:port] list of name service nameServers: "" + # rocketMQName is the rocketmq name, must equal to nameserver.spec.rocketMQName + rocketMQName: "rocketmq_name" # replicaPerGroup is the number of each broker cluster replicaPerGroup: 1 # brokerImage is the customized docker image repo of the RocketMQ broker @@ -99,6 +101,8 @@ spec: nameServiceImage: apacherocketmq/rocketmq-nameserver:4.5.0-alpine-operator-0.3.0 # imagePullPolicy is the image pull policy imagePullPolicy: Always + # rocketMQName is the rocketmq name, must equal to broker.spec.rocketMQName and and topictransfer.spec.rocketMQName + rocketMQName: "rocketmq_name" # hostNetwork can be true or false hostNetwork: true # Set DNS policy for the pod. diff --git a/example/rocketmq_v1alpha1_topictransfer_cr.yaml b/example/rocketmq_v1alpha1_topictransfer_cr.yaml index 80e8772b..1586dbbd 100644 --- a/example/rocketmq_v1alpha1_topictransfer_cr.yaml +++ b/example/rocketmq_v1alpha1_topictransfer_cr.yaml @@ -24,3 +24,5 @@ spec: sourceCluster: broker-0 # targetCluster defines the target cluster targetCluster: broker-1 + # rocketMQName is the rocketmq name, must equal to nameserver.spec.rocketMQName and broker.spec.rocketMQName + rocketMQName: "rocketmq_name" diff --git a/pkg/apis/rocketmq/v1alpha1/broker_types.go b/pkg/apis/rocketmq/v1alpha1/broker_types.go index 1e694ee5..ea72b224 100644 --- a/pkg/apis/rocketmq/v1alpha1/broker_types.go +++ b/pkg/apis/rocketmq/v1alpha1/broker_types.go @@ -56,6 +56,8 @@ type BrokerSpec struct { VolumeClaimTemplates []corev1.PersistentVolumeClaim `json:"volumeClaimTemplates"` // The name of pod where the metadata from ScalePodName string `json:"scalePodName"` + // RocketMQ Name, the broker and nameserver in the same cluster must be filled with the same name + RocketMQName string `json:"rocketMQName"` } // BrokerStatus defines the observed state of Broker diff --git a/pkg/apis/rocketmq/v1alpha1/nameservice_types.go b/pkg/apis/rocketmq/v1alpha1/nameservice_types.go index b8900e9b..33573340 100644 --- a/pkg/apis/rocketmq/v1alpha1/nameservice_types.go +++ b/pkg/apis/rocketmq/v1alpha1/nameservice_types.go @@ -49,6 +49,8 @@ type NameServiceSpec struct { HostPath string `json:"hostPath"` // VolumeClaimTemplates defines the StorageClass VolumeClaimTemplates []corev1.PersistentVolumeClaim `json:"volumeClaimTemplates"` + // RocketMQ Name, the broker and nameserver in the same cluster must be filled with the same name + RocketMQName string `json:"rocketMQName"` } // NameServiceStatus defines the observed state of NameService diff --git a/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go b/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go index 82a7e37e..0304d5fd 100644 --- a/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go +++ b/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go @@ -37,6 +37,8 @@ type TopicTransferSpec struct { SourceCluster string `json:"sourceCluster,omitempty"` // The cluster where the topic will be transferred to TargetCluster string `json:"targetCluster,omitempty"` + // // RocketMQ Name, the broker and nameserver in the same cluster must be filled with the same name + RocketMQName string `json:"rocketMQName"` } // TopicTransferStatus defines the observed state of TopicTransfer diff --git a/pkg/controller/broker/broker_controller.go b/pkg/controller/broker/broker_controller.go index 5b2de774..1bd2c542 100644 --- a/pkg/controller/broker/broker_controller.go +++ b/pkg/controller/broker/broker_controller.go @@ -131,31 +131,43 @@ func (r *ReconcileBroker) Reconcile(request reconcile.Request) (reconcile.Result return reconcile.Result{}, err } + actualKey := broker.Namespace + "-" + broker.Spec.RocketMQName + actual, _ := share.GetInstance().LoadOrStore(actualKey, share.ShareItem{}) + defer func() { + reqLogger.Info("Broker actualKey:" + actualKey + " actual.NameServerStr:" + actual.NameServersStr + + " actual.BrokerClusterName:" + actual.BrokerClusterName + " IsNameServersStrInitialized:" + strconv.FormatBool(actual.IsNameServersStrInitialized)) + share.GetInstance().Store(actualKey, actual) + }() + + var groupNum int if broker.Status.Size == 0 { - share.GroupNum = broker.Spec.Size + groupNum = broker.Spec.Size } else { - share.GroupNum = broker.Status.Size + groupNum = broker.Status.Size } - if broker.Spec.NameServers == "" { // wait for name server ready when create broker cluster if nameServers is omitted for { - if share.IsNameServersStrInitialized { + actual, _ = share.GetInstance().Load(actualKey) + if actual.IsNameServersStrInitialized { break } else { - log.Info("Broker Waiting for name server ready...") + log.Info("Broker Waiting for name server ready..., actualKey:" + + actualKey + " actual.NameServersStr:" + actual.NameServersStr + " IsNameServersStrInitialized:" + + strconv.FormatBool(actual.IsNameServersStrInitialized)) time.Sleep(time.Duration(cons.WaitForNameServerReadyInSecond) * time.Second) } } } else { - share.NameServersStr = broker.Spec.NameServers + actual.NameServersStr = broker.Spec.NameServers } + actual.GroupNum = groupNum - share.BrokerClusterName = broker.Name + actual.BrokerClusterName = broker.Name replicaPerGroup := broker.Spec.ReplicaPerGroup - reqLogger.Info("brokerGroupNum=" + strconv.Itoa(share.GroupNum) + ", replicaPerGroup=" + strconv.Itoa(replicaPerGroup)) - for brokerGroupIndex := 0; brokerGroupIndex < share.GroupNum; brokerGroupIndex++ { - reqLogger.Info("Check Broker cluster " + strconv.Itoa(brokerGroupIndex+1) + "/" + strconv.Itoa(share.GroupNum)) + reqLogger.Info("brokerGroupNum=" + strconv.Itoa(actual.GroupNum) + ", replicaPerGroup=" + strconv.Itoa(replicaPerGroup)) + for brokerGroupIndex := 0; brokerGroupIndex < actual.GroupNum; brokerGroupIndex++ { + reqLogger.Info("Check Broker cluster " + strconv.Itoa(brokerGroupIndex+1) + "/" + strconv.Itoa(actual.GroupNum)) dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0) // Check if the statefulSet already exists, if not create a new one found := &appsv1.StatefulSet{} @@ -189,8 +201,8 @@ func (r *ReconcileBroker) Reconcile(request reconcile.Request) (reconcile.Result // Check for name server scaling if broker.Spec.AllowRestart { // The following code will restart all brokers to update NAMESRV_ADDR env - if share.IsNameServersStrUpdated { - for brokerGroupIndex := 0; brokerGroupIndex < broker.Spec.Size; brokerGroupIndex++ { + if actual.IsNameServersStrUpdated { + for brokerGroupIndex := 0; brokerGroupIndex < broker.Spec.Size; brokerGroupIndex++ { brokerName := getBrokerName(broker, brokerGroupIndex) // Update master broker reqLogger.Info("Update Master Broker NAMESRV_ADDR of " + brokerName) @@ -200,7 +212,7 @@ func (r *ReconcileBroker) Reconcile(request reconcile.Request) (reconcile.Result if err != nil { reqLogger.Error(err, "Failed to get broker master StatefulSet of "+brokerName) } else { - found.Spec.Template.Spec.Containers[0].Env[0].Value = share.NameServersStr + found.Spec.Template.Spec.Containers[0].Env[0].Value = actual.NameServersStr err = r.client.Update(context.TODO(), found) if err != nil { reqLogger.Error(err, "Failed to update NAMESRV_ADDR of master broker "+brokerName, "StatefulSet.Namespace", found.Namespace, "StatefulSet.Name", found.Name) @@ -220,7 +232,7 @@ func (r *ReconcileBroker) Reconcile(request reconcile.Request) (reconcile.Result } else { for index := range replicaFound.Spec.Template.Spec.Containers[0].Env { if cons.EnvNameServiceAddress == replicaFound.Spec.Template.Spec.Containers[0].Env[index].Name { - replicaFound.Spec.Template.Spec.Containers[0].Env[index].Value = share.NameServersStr + replicaFound.Spec.Template.Spec.Containers[0].Env[index].Value = actual.NameServersStr break } } @@ -235,7 +247,7 @@ func (r *ReconcileBroker) Reconcile(request reconcile.Request) (reconcile.Result } } } - share.IsNameServersStrUpdated = false + actual.IsNameServersStrUpdated = false } // List the pods for this broker's statefulSet @@ -253,10 +265,17 @@ func (r *ReconcileBroker) Reconcile(request reconcile.Request) (reconcile.Result podNames := getPodNames(podList.Items) log.Info("broker.Status.Nodes length = " + strconv.Itoa(len(broker.Status.Nodes))) log.Info("podNames length = " + strconv.Itoa(len(podNames))) + + if len(podNames) == 0 { + reqLogger.Info("Broker podName size is 0, wait for a moment...") + return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil + } + // Ensure every pod is in running phase for _, pod := range podList.Items { if !reflect.DeepEqual(pod.Status.Phase, corev1.PodRunning) { - log.Info("pod " + pod.Name + " phase is " + string(pod.Status.Phase) + ", wait for a moment...") + reqLogger.Info("pod " + pod.Name + " phase is " + string(pod.Status.Phase) + ", wait for a moment...") + return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil } } @@ -276,6 +295,17 @@ func (r *ReconcileBroker) Reconcile(request reconcile.Request) (reconcile.Result cmd = []string{"/bin/bash", "-c", MakeConfigDirCommand + " && " + ChmodDirCommand + " && " + topicsCommand + " && " + subscriptionGroupCommand} } + // Update status.Nodes if needed + if !reflect.DeepEqual(podNames, broker.Status.Nodes) { + broker.Status.Nodes = podNames + if len(broker.Status.Nodes) != 0 { + err = r.client.Status().Update(context.TODO(), broker) + if err != nil { + reqLogger.Error(err, "Failed to update Broker Nodes status.") + } + } + } + // Update status.Size if needed if broker.Spec.Size != broker.Status.Size { log.Info("broker.Status.Size = " + strconv.Itoa(broker.Status.Size)) @@ -287,15 +317,6 @@ func (r *ReconcileBroker) Reconcile(request reconcile.Request) (reconcile.Result } } - // Update status.Nodes if needed - if !reflect.DeepEqual(podNames, broker.Status.Nodes) { - broker.Status.Nodes = podNames - err = r.client.Status().Update(context.TODO(), broker) - if err != nil { - reqLogger.Error(err, "Failed to update Broker Nodes status.") - } - } - //podList := &corev1.PodList{} //labelSelector := labels.SelectorFromSet(labelsForBroker(broker.Name)) //listOps := &client.ListOptions{ @@ -454,9 +475,13 @@ func (r *ReconcileBroker) getBrokerStatefulSet(broker *rocketmqv1alpha1.Broker, } func getENV(broker *rocketmqv1alpha1.Broker, replicaIndex int, brokerGroupIndex int) []corev1.EnvVar { + + actualKey := broker.Namespace + "-" + broker.Spec.RocketMQName + actual, _ := share.GetInstance().LoadOrStore(actualKey, share.ShareItem{}) + envs := []corev1.EnvVar{{ Name: cons.EnvNameServiceAddress, - Value: share.NameServersStr, + Value: actual.NameServersStr, }, { Name: cons.EnvBrokerId, Value: strconv.Itoa(replicaIndex), diff --git a/pkg/controller/nameservice/nameservice_controller.go b/pkg/controller/nameservice/nameservice_controller.go index 410515f8..d3a4f061 100644 --- a/pkg/controller/nameservice/nameservice_controller.go +++ b/pkg/controller/nameservice/nameservice_controller.go @@ -174,7 +174,23 @@ func (r *ReconcileNameService) updateNameServiceStatus(instance *rocketmqv1alpha } hostIps := getNameServers(podList.Items) + if len(hostIps) != int(instance.Spec.Size) || len(hostIps) == 0 { + reqLogger.Info("nameService hostIps size must equal to spec size, hostIp size: " + + strconv.FormatInt(int64(len(hostIps)), 10) + + " spec size:" + + strconv.FormatInt(int64(instance.Spec.Size), 10)) + return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil + } + + actualKey := instance.Namespace + "-" + instance.Spec.RocketMQName + actual, _ := share.GetInstance().LoadOrStore(actualKey, share.ShareItem{}) + defer func() { + reqLogger.Info("NameServer store key:" + actualKey + " actual.NameServersStr:" + actual.NameServersStr + " IsNameServersStrInitialized:" + strconv.FormatBool(actual.IsNameServersStrInitialized)) + share.GetInstance().Store(actualKey, actual) + }() + // Update status.NameServers if needed + // hostIps is empty, instance.Status.NameServers is empty, also not in if !reflect.DeepEqual(hostIps, instance.Status.NameServers) { oldNameServerListStr := "" for _, value := range instance.Status.NameServers { @@ -185,14 +201,14 @@ func (r *ReconcileNameService) updateNameServiceStatus(instance *rocketmqv1alpha for _, value := range hostIps { nameServerListStr = nameServerListStr + value + ":9876;" } - share.NameServersStr = nameServerListStr[:len(nameServerListStr)-1] - reqLogger.Info("share.NameServersStr:" + share.NameServersStr) + actual.NameServersStr = nameServerListStr[:len(nameServerListStr)-1] + reqLogger.Info("share.NameServersStr:" + actual.NameServersStr) if len(oldNameServerListStr) <= cons.MinIpListLength { - oldNameServerListStr = share.NameServersStr - } else if len(share.NameServersStr) > cons.MinIpListLength { + oldNameServerListStr = actual.NameServersStr + } else if len(actual.NameServersStr) > cons.MinIpListLength { oldNameServerListStr = oldNameServerListStr[:len(oldNameServerListStr)-1] - share.IsNameServersStrUpdated = true + actual.IsNameServersStrUpdated = true } reqLogger.Info("oldNameServerListStr:" + oldNameServerListStr) @@ -206,17 +222,18 @@ func (r *ReconcileNameService) updateNameServiceStatus(instance *rocketmqv1alpha } // use admin tool to update broker config - if share.IsNameServersStrUpdated && (len(oldNameServerListStr) > cons.MinIpListLength) && (len(share.NameServersStr) > cons.MinIpListLength) { + if actual.IsNameServersStrUpdated && (len(oldNameServerListStr) > cons.MinIpListLength) && (len(actual.NameServersStr) > cons.MinIpListLength) { mqAdmin := cons.AdminToolDir subCmd := cons.UpdateBrokerConfig key := cons.ParamNameServiceAddress - reqLogger.Info("share.GroupNum=broker.Spec.Size=" + strconv.Itoa(share.GroupNum)) + reqLogger.Info("share.GroupNum=broker.Spec.Size=" + strconv.Itoa(actual.GroupNum)) + - clusterName := share.BrokerClusterName + clusterName := actual.BrokerClusterName reqLogger.Info("Updating config " + key + " of cluster" + clusterName) - command := mqAdmin + " " + subCmd + " -c " + clusterName + " -k " + key + " -n " + oldNameServerListStr + " -v " + share.NameServersStr - cmd := exec.Command("sh", mqAdmin, subCmd, "-c", clusterName, "-k", key, "-n", oldNameServerListStr, "-v", share.NameServersStr) + command := mqAdmin + " " + subCmd + " -c " + clusterName + " -k " + key + " -n " + oldNameServerListStr + " -v " + actual.NameServersStr + cmd := exec.Command("sh", mqAdmin, subCmd, "-c", clusterName, "-k", key, "-n", oldNameServerListStr, "-v", actual.NameServersStr) output, err := cmd.Output() if err != nil { reqLogger.Error(err, "Update Broker config "+key+" failed of cluster "+clusterName+", command: "+command) @@ -224,7 +241,6 @@ func (r *ReconcileNameService) updateNameServiceStatus(instance *rocketmqv1alpha } reqLogger.Info("Successfully updated Broker config " + key + " of cluster " + clusterName + ", command: " + command + ", with output: " + string(output)) } - } // Print NameServers IP for i, value := range instance.Status.NameServers { @@ -233,7 +249,7 @@ func (r *ReconcileNameService) updateNameServiceStatus(instance *rocketmqv1alpha runningNameServerNum := getRunningNameServersNum(podList.Items) if runningNameServerNum == instance.Spec.Size { - share.IsNameServersStrInitialized = true + actual.IsNameServersStrInitialized = true } if requeue { diff --git a/pkg/controller/topictransfer/topictransfer_controller.go b/pkg/controller/topictransfer/topictransfer_controller.go index f6c842cd..968d75a0 100644 --- a/pkg/controller/topictransfer/topictransfer_controller.go +++ b/pkg/controller/topictransfer/topictransfer_controller.go @@ -127,7 +127,9 @@ func (r *ReconcileTopicTransfer) Reconcile(request reconcile.Request) (reconcile targetCluster := topicTransfer.Spec.TargetCluster sourceCluster := topicTransfer.Spec.SourceCluster - nameServer := strings.Split(share.NameServersStr, ";")[0] + actualKey := topicTransfer.Namespace + "-" + topicTransfer.Spec.RocketMQName + actual, _ := share.GetInstance().LoadOrStore(actualKey, share.ShareItem{}) + nameServer := strings.Split(actual.NameServersStr, ";")[0] if len(nameServer) < cons.MinIpListLength { reqLogger.Info("There is no available name server now thus the topic transfer process is terminated.") // terminate the transfer process diff --git a/pkg/share/share.go b/pkg/share/share.go index bd1304bf..f25d7547 100644 --- a/pkg/share/share.go +++ b/pkg/share/share.go @@ -18,19 +18,45 @@ // Package share defines some variables shared by different packages package share +import "sync" + var ( - // GroupNum is the number of broker group - GroupNum = 0 + once sync.Once + instance *Singleton +) +type ShareItem struct { + // GroupNum is the number of broker group + GroupNum int // NameServersStr is the name server list - NameServersStr = "" - + NameServersStr string // IsNameServersStrUpdated is whether the name server list is updated - IsNameServersStrUpdated = false - + IsNameServersStrUpdated bool // IsNameServersStrInitialized is whether the name server list is initialized - IsNameServersStrInitialized = false - + IsNameServersStrInitialized bool // BrokerClusterName is the broker cluster name - BrokerClusterName = "" -) + BrokerClusterName string +} + +type Singleton struct { + itemSyncMap ItemSyncMap +} + +func GetInstance() *Singleton { + once.Do(func() { + instance = &Singleton{} + }) + return instance +} + +func (singleton *Singleton) Load(key string) (value ShareItem, ok bool) { + return singleton.itemSyncMap.Load(key) +} + +func (singleton *Singleton) Store(key string, value ShareItem) { + singleton.itemSyncMap.Store(key, value) +} + +func (singleton *Singleton) LoadOrStore(key string, value ShareItem) (actual ShareItem, loaded bool){ + return singleton.itemSyncMap.LoadOrStore(key, value) +} diff --git a/pkg/share/shareitem_sync_map.go b/pkg/share/shareitem_sync_map.go new file mode 100644 index 00000000..f939bfc7 --- /dev/null +++ b/pkg/share/shareitem_sync_map.go @@ -0,0 +1,36 @@ +package share + +import "sync" + +type ItemSyncMap struct { + m sync.Map +} + +func (sMap *ItemSyncMap) Delete(key string) { + sMap.m.Delete(key) +} + +func (sMap *ItemSyncMap) Load(key string) (value ShareItem, ok bool) { + v, ok := sMap.m.Load(key) + if v != nil { + value = v.(ShareItem) + } + return +} + +func (sMap *ItemSyncMap) LoadOrStore(key string, value ShareItem) (actual ShareItem, loaded bool) { + a, loaded := sMap.m.LoadOrStore(key, value) + actual = a.(ShareItem) + return +} + +func (sMap *ItemSyncMap) Range(f func(key string, value ShareItem) bool) { + f1 := func(key, value interface{}) bool { + return f(key.(string), value.(ShareItem)) + } + sMap.m.Range(f1) +} + +func (sMap *ItemSyncMap) Store(key string, value ShareItem) { + sMap.m.Store(key, value) +}