diff --git a/api/disaggregated/v1/types.go b/api/disaggregated/v1/types.go index cbd31a1f..ef7d1aac 100644 --- a/api/disaggregated/v1/types.go +++ b/api/disaggregated/v1/types.go @@ -385,9 +385,12 @@ type ComputeGroupStatus struct { // the service that can access the compute group pods. ServiceName string `json:"serviceName,omitempty"` - // + //the unique id of compute group in kubernetes, this field is part of compute group statefulset. UniqueId string `json:"uniqueId,omitempty"` + //the compute group id in doris meta, this response to the backend's tag "compute_group_id"; + ComputeGroupId string `json:"computeGroupId,omitempty"` + //AvailableStatus represents the compute group available or not. AvailableStatus AvailableStatus `json:"availableStatus,omitempty"` diff --git a/api/disaggregated/v1/unique_id.go b/api/disaggregated/v1/unique_id.go index 8b440dd8..06d49d9f 100644 --- a/api/disaggregated/v1/unique_id.go +++ b/api/disaggregated/v1/unique_id.go @@ -18,9 +18,9 @@ package v1 import ( - "crypto/sha256" - "math/big" - "strings" + "crypto/sha256" + "math/big" + "strings" ) /* @@ -80,6 +80,7 @@ func (ddc *DorisDisaggregatedCluster) GetMSServiceName() string { return ddc.Name + "-" + "ms" } +//the first deployed used computegroup name, when user rename the compute group name by sql command `ALTER SYSTEM RENAME COMPUTE GROUP `, this function will not right. func (ddc *DorisDisaggregatedCluster) GetCGName(cg *ComputeGroup) string { // use uniqueId as compute group name, the uniqueId restrict not empty, and the computegroup's name should use "_" not "-" return strings.ReplaceAll(cg.UniqueId, "-", "_") diff --git a/config/crd/bases/crds.yaml b/config/crd/bases/crds.yaml index eed33d3a..1887a9fb 100644 --- a/config/crd/bases/crds.yaml +++ b/config/crd/bases/crds.yaml @@ -15953,6 +15953,10 @@ spec: description: AvailableStatus represents the compute group available or not. type: string + computeGroupId: + description: the compute group id in doris meta, this response + to the backend's tag "compute_group_id"; + type: string phase: description: Phase represent the stage of reconciling. type: string @@ -15973,6 +15977,8 @@ spec: format: int32 type: integer uniqueId: + description: the unique id of compute group in kubernetes, this + field is part of compute group statefulset. type: string type: object type: array diff --git a/config/crd/bases/disaggregated.cluster.doris.com_dorisdisaggregatedclusters.yaml b/config/crd/bases/disaggregated.cluster.doris.com_dorisdisaggregatedclusters.yaml index a50acfca..593db396 100644 --- a/config/crd/bases/disaggregated.cluster.doris.com_dorisdisaggregatedclusters.yaml +++ b/config/crd/bases/disaggregated.cluster.doris.com_dorisdisaggregatedclusters.yaml @@ -6826,6 +6826,10 @@ spec: description: AvailableStatus represents the compute group available or not. type: string + computeGroupId: + description: the compute group id in doris meta, this response + to the backend's tag "compute_group_id"; + type: string phase: description: Phase represent the stage of reconciling. type: string @@ -6846,6 +6850,8 @@ spec: format: int32 type: integer uniqueId: + description: the unique id of compute group in kubernetes, this + field is part of compute group statefulset. type: string type: object type: array diff --git a/helm-charts/doris-operator/crds/disaggregated.cluster.doris.com_dorisdisaggregatedclusters.yaml b/helm-charts/doris-operator/crds/disaggregated.cluster.doris.com_dorisdisaggregatedclusters.yaml index a50acfca..593db396 100644 --- a/helm-charts/doris-operator/crds/disaggregated.cluster.doris.com_dorisdisaggregatedclusters.yaml +++ b/helm-charts/doris-operator/crds/disaggregated.cluster.doris.com_dorisdisaggregatedclusters.yaml @@ -6826,6 +6826,10 @@ spec: description: AvailableStatus represents the compute group available or not. type: string + computeGroupId: + description: the compute group id in doris meta, this response + to the backend's tag "compute_group_id"; + type: string phase: description: Phase represent the stage of reconciling. type: string @@ -6846,6 +6850,8 @@ spec: format: int32 type: integer uniqueId: + description: the unique id of compute group in kubernetes, this + field is part of compute group statefulset. type: string type: object type: array diff --git a/pkg/common/utils/mysql/mysql.go b/pkg/common/utils/mysql/mysql.go index 4f02bb06..41ba5a0f 100644 --- a/pkg/common/utils/mysql/mysql.go +++ b/pkg/common/utils/mysql/mysql.go @@ -18,12 +18,18 @@ package mysql import ( - "database/sql" - "encoding/json" - "fmt" - _ "github.com/go-sql-driver/mysql" - "github.com/jmoiron/sqlx" - "k8s.io/klog/v2" + "database/sql" + "encoding/json" + "fmt" + "os" + + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" + "k8s.io/klog/v2" +) + +const ( + COMPUTE_GROUP_ID = "compute_group_id" ) type DBConfig struct { @@ -34,11 +40,20 @@ type DBConfig struct { Database string } +func NewDBConfig() DBConfig { + return DBConfig{ + Database: "mysql", + } +} + type DB struct { *sqlx.DB } func NewDorisSqlDB(cfg DBConfig) (*DB, error) { + if os.Getenv("DEBUG") == "true" { + cfg.Host = "10.152.183.86" + } dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Database) db, err := sqlx.Open("mysql", dsn) if err != nil { @@ -168,10 +183,10 @@ func (db *DB) GetObservers() ([]*Frontend, error) { return res, nil } -func (db *DB) GetBackendsByCGName(cgName string) ([]*Backend, error) { +func (db *DB) GetBackendsByComputeGroupId(cgid string) ([]*Backend, error) { backends, err := db.ShowBackends() if err != nil { - klog.Errorf("GetBackendsByCGName show backends failed, err: %s\n", err.Error()) + klog.Errorf("GetBackendsByComputeGroupId show backends failed, err: %s\n", err.Error()) return nil, err } var res []*Backend @@ -179,16 +194,16 @@ func (db *DB) GetBackendsByCGName(cgName string) ([]*Backend, error) { var m map[string]interface{} err := json.Unmarshal([]byte(be.Tag), &m) if err != nil { - klog.Errorf("GetBackendsByCGName backends tag stirng to map failed, tag: %s, err: %s\n", be.Tag, err.Error()) + klog.Errorf("GetBackendsByComputeGroupId backends tag stirng to map failed, tag: %s, err: %s\n", be.Tag, err.Error()) return nil, err } - if _, ok := m["compute_group_name"]; !ok { - klog.Errorf("GetBackendsByCGName backends tag get compute_group_name failed, tag: %s, err: %s\n", be.Tag, err.Error()) + if _, ok := m[COMPUTE_GROUP_ID]; !ok { + klog.Errorf("GetBackendsByComputeGroupId backends tag get compute_group_name failed, tag: %s, err: %s\n", be.Tag, err.Error()) return nil, err } - cgNameFromTag := fmt.Sprintf("%s", m["compute_group_name"]) - if cgNameFromTag == cgName { + computegroupId := fmt.Sprintf("%s", m[COMPUTE_GROUP_ID]) + if computegroupId == cgid { res = append(res, be) } } diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go index 67be6d3b..0faf94fd 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go @@ -18,26 +18,29 @@ package computegroups import ( - "context" - "errors" - "fmt" - dv1 "github.com/apache/doris-operator/api/disaggregated/v1" - "github.com/apache/doris-operator/pkg/common/utils" - "github.com/apache/doris-operator/pkg/common/utils/k8s" - "github.com/apache/doris-operator/pkg/common/utils/resource" - "github.com/apache/doris-operator/pkg/common/utils/set" - sc "github.com/apache/doris-operator/pkg/controller/sub_controller" - appv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/types" - "k8s.io/klog/v2" - "regexp" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "strconv" - "strings" - "sync" + "context" + "encoding/json" + "errors" + "fmt" + "regexp" + "strconv" + "strings" + "sync" + + dv1 "github.com/apache/doris-operator/api/disaggregated/v1" + "github.com/apache/doris-operator/pkg/common/utils" + "github.com/apache/doris-operator/pkg/common/utils/k8s" + "github.com/apache/doris-operator/pkg/common/utils/mysql" + "github.com/apache/doris-operator/pkg/common/utils/resource" + "github.com/apache/doris-operator/pkg/common/utils/set" + sc "github.com/apache/doris-operator/pkg/controller/sub_controller" + appv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" ) var _ sc.DisaggregatedSubController = &DisaggregatedComputeGroupsController{} @@ -166,7 +169,7 @@ func (dcgs *DisaggregatedComputeGroupsController) computeGroupSync(ctx context.C func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx context.Context, st *appv1.StatefulSet, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) (*sc.Event, error) { var est appv1.StatefulSet if err := dcgs.K8sclient.Get(ctx, types.NamespacedName{Namespace: st.Namespace, Name: st.Name}, &est); apierrors.IsNotFound(err) { - //TODO: add downlaodAPI volume Mounts + // add downlaodAPI volume Mounts dcgs.DisaggregatedSubDefaultController.AddDownwardAPI(st) if err = k8s.CreateClientObject(ctx, dcgs.K8sclient, st); err != nil { klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset create statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error()) @@ -236,11 +239,16 @@ func (dcgs *DisaggregatedComputeGroupsController) initialCGStatus(ddc *dv1.Doris for i := range cgss { if cgss[i].UniqueId == uniqueId { - if cgss[i].Phase != dv1.Ready { + /*if cgss[i].Phase != dv1.Ready { defaultStatus.Phase = cgss[i].Phase } defaultStatus.SuspendReplicas = cgss[i].SuspendReplicas - cgss[i] = defaultStatus + cgss[i] = defaultStatus*/ + if cgss[i].Phase == dv1.Ready { + cgss[i].Phase = defaultStatus.Phase + } + cgss[i].Replicas = *cg.Replicas + return } } @@ -288,13 +296,20 @@ func (dcgs *DisaggregatedComputeGroupsController) ClearResources(ctx context.Con ddc := obj.(*dv1.DorisDisaggregatedCluster) var eCGs []dv1.ComputeGroupStatus + var delComputeGroupIds []string for i, cgs := range ddc.Status.ComputeGroupStatuses { + exist := false for _, cg := range ddc.Spec.ComputeGroups { if cgs.UniqueId == cg.UniqueId { eCGs = append(eCGs, ddc.Status.ComputeGroupStatuses[i]) + exist = true break } } + + if !exist { + delComputeGroupIds = append(delComputeGroupIds, cgs.ComputeGroupId) + } } //list the svcs and stss owner reference to dorisDisaggregatedCluster. @@ -314,7 +329,7 @@ func (dcgs *DisaggregatedComputeGroupsController) ClearResources(ctx context.Con delSvcNames := dcgs.findUnusedSvcs(svcs, ddc) delStsNames, delUniqueIds := dcgs.findUnusedStssAndUniqueIds(stss, ddc) - if err = dcgs.clearCGInDorisMeta(ctx, delUniqueIds, ddc); err != nil { + if err = dcgs.clearCGInDorisMeta(ctx, delComputeGroupIds, ddc); err != nil { return false, err } if err = dcgs.clearSvcs(ctx, delSvcNames, ddc); err != nil { @@ -367,8 +382,8 @@ func (dcgs *DisaggregatedComputeGroupsController) clearSvcs(ctx context.Context, return nil } -func (dcgs *DisaggregatedComputeGroupsController) clearCGInDorisMeta(ctx context.Context, cgNames []string, ddc *dv1.DorisDisaggregatedCluster) error { - if len(cgNames) == 0 { +func (dcgs *DisaggregatedComputeGroupsController) clearCGInDorisMeta(ctx context.Context, cgids []string, ddc *dv1.DorisDisaggregatedCluster) error { + if len(cgids) == 0 { return nil } @@ -380,11 +395,9 @@ func (dcgs *DisaggregatedComputeGroupsController) clearCGInDorisMeta(ctx context } defer sqlClient.Close() - for _, name := range cgNames { + for _, cgid := range cgids { //clear cg, the keepAmount = 0 - //confirm used the right cgName, as the cgName get from the uniqueid that '-' replaced by '_'. - cgName := strings.ReplaceAll(name, "-", "_") - err = dcgs.scaledOutBENodesByDrop(sqlClient, cgName, 0) + err = dcgs.scaledOutBENodesByDrop(sqlClient, cgid, 0) if err != nil { klog.Errorf("DisaggregatedComputeGroupsController clearCGInDorisMeta dropCGBySQLClient failed: %s", err.Error()) dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error()) @@ -421,7 +434,7 @@ func (dcgs *DisaggregatedComputeGroupsController) findUnusedSvcs(svcs []corev1.S return unusedSvcNames } -func (dcgs *DisaggregatedComputeGroupsController) findUnusedStssAndUniqueIds(stss []appv1.StatefulSet, ddc *dv1.DorisDisaggregatedCluster) ([]string /*sts*/, []string /*cgNames*/) { +func (dcgs *DisaggregatedComputeGroupsController) findUnusedStssAndUniqueIds(stss []appv1.StatefulSet, ddc *dv1.DorisDisaggregatedCluster) ([]string /*sts*/, []string /*uniqueIds*/) { var unusedStsNames []string var unusedUniqueIds []string for i, _ := range stss { @@ -550,6 +563,14 @@ func (dcgs *DisaggregatedComputeGroupsController) UpdateComponentStatus(obj clie } } + + for _, cgs := range ddc.Status.ComputeGroupStatuses { + if cgs.ComputeGroupId == "" { + dcgs.recordComputeGroupIds(ddc) + break + } + } + var fullAvailableCount int32 var availableCount int32 for _, cgs := range ddc.Status.ComputeGroupStatuses { @@ -569,6 +590,59 @@ func (dcgs *DisaggregatedComputeGroupsController) UpdateComponentStatus(obj clie return errors.New(errMs) } +func(dcgs *DisaggregatedComputeGroupsController) recordComputeGroupIds(ddc *dv1.DorisDisaggregatedCluster) error { + // get user and password + adminUserName, password := dcgs.GetManagementAdminUserAndPWD(context.Background(), ddc) + + // get host and port + // When the operator and dcr are deployed in different namespace, it will be inaccessible, so need to add the dcr svc namespace + host := ddc.GetFEVIPAddresss() + confMap := dcgs.GetConfigValuesFromConfigMaps(ddc.Namespace, resource.FE_RESOLVEKEY, ddc.Spec.FeSpec.ConfigMaps) + queryPort := resource.GetPort(confMap, resource.QUERY_PORT) + cfg := mysql.NewDBConfig() + cfg.User = adminUserName + cfg.Password = password + cfg.Host = host + cfg.Port = strconv.FormatInt(int64(queryPort), 10) + + db,err := mysql.NewDorisSqlDB(cfg) + if err != nil { + klog.Errorf("DisaggregatedComputeGroupsController recordComputeGroupIds new doris client failed,err=%s", err.Error()) + return err + } + backends, err := db.ShowBackends() + if err != nil { + klog.Errorf("DisaggregatedComputeGroupsController recordComputeGroupIds show backends failed, err=%s", err.Error()) + return err + } + + m := map[string]string{} //statefulsetname:computegroupid + for _, backend := range backends { + tags :=map[string]string{} + err = json.Unmarshal([]byte(backend.Tag), &tags) + if err != nil { + klog.Errorf("DisaggregatedComputeGroupsController recordComputeGroupIds backend tag stirng to map failed, tag: %s, err: %s", backend.Tag, err.Error()) + return err + } + if _, ok := tags[mysql.COMPUTE_GROUP_ID]; !ok { + klog.Errorf("DisaggregatedComputeGroupsController recordComputeGroupIds backend tag get compute_group_name failed, tag: %s, err: %s", backend.Tag, err.Error()) + return err + } + + podName := strings.Split(backend.Host, ".")[0] + re,_ := regexp.Compile("(.*)-[0-9]+$") + matchs := re.FindStringSubmatch(podName) + stsName := matchs[len(matchs)-1] + m[stsName] = tags[mysql.COMPUTE_GROUP_ID] + } + + for i,cgs := range ddc.Status.ComputeGroupStatuses { + ddc.Status.ComputeGroupStatuses[i].ComputeGroupId = m[cgs.StatefulsetName] + } + return nil +} + + func (dcgs *DisaggregatedComputeGroupsController) updateCGStatus(ddc *dv1.DorisDisaggregatedCluster, cgs *dv1.ComputeGroupStatus) error { stfName := cgs.StatefulsetName sts, err := k8s.GetStatefulSet(context.Background(), dcgs.K8sclient, ddc.Namespace, stfName) @@ -583,7 +657,7 @@ func (dcgs *DisaggregatedComputeGroupsController) updateCGStatus(ddc *dv1.DorisD generation := dcgs.DisaggregatedSubDefaultController.ReturnStatefulsetUpdatedGeneration(sts, updateStatefulsetKey) //if this reconcile not update statefulset will not check the generation equals or not. if ddc.Generation != generation { - return errors.New("waiting statefulset upd ated") + return errors.New("waiting statefulset updated") } } diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go index 6f56ff44..8fba7cc1 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go @@ -62,16 +62,16 @@ func (dcgs *DisaggregatedComputeGroupsController) scaleOut(ctx context.Context, defer sqlClient.Close() cgKeepAmount := *cg.Replicas - cgName := cluster.GetCGName(cg) + cgid := cgStatus.ComputeGroupId if cluster.Spec.EnableDecommission { - if err := dcgs.scaledOutBENodesByDecommission(cluster, cgStatus, sqlClient, cgName, cgKeepAmount); err != nil { + if err := dcgs.scaledOutBENodesByDecommission(cluster, cgStatus, sqlClient, cgid, cgKeepAmount); err != nil { return err } } else { // not decommission , drop node - if err := dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount); err != nil { + if err := dcgs.scaledOutBENodesByDrop(sqlClient, cgid, cgKeepAmount); err != nil { cgStatus.Phase = dv1.ScaleDownFailed - klog.Errorf("ScaleOut scaledOutBENodesByDrop ddcName:%s, namespace:%s, computeGroupName:%s, drop nodes failed:%s ", cluster.Name, cluster.Namespace, cgName, err.Error()) + klog.Errorf("ScaleOut scaledOutBENodesByDrop ddcName:%s, namespace:%s, computeGroupName:%s, drop nodes failed:%s ", cluster.Name, cluster.Namespace, cgid, err.Error()) return err } cgStatus.Phase = dv1.Scaling @@ -80,27 +80,27 @@ func (dcgs *DisaggregatedComputeGroupsController) scaleOut(ctx context.Context, return nil } -func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesByDecommission(cluster *dv1.DorisDisaggregatedCluster, cgStatus *dv1.ComputeGroupStatus, sqlClient *mysql.DB, cgName string, cgKeepAmount int32) error { - decommissionPhase, err := dcgs.decommissionProgressCheck(sqlClient, cgName, cgKeepAmount) +func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesByDecommission(cluster *dv1.DorisDisaggregatedCluster, cgStatus *dv1.ComputeGroupStatus, sqlClient *mysql.DB, cgid string, cgKeepAmount int32) error { + decommissionPhase, err := dcgs.decommissionProgressCheck(sqlClient, cgid, cgKeepAmount) if err != nil { return err } switch decommissionPhase { case resource.DecommissionAcceptable: - err = dcgs.decommissionBENodes(sqlClient, cgName, cgKeepAmount) + err = dcgs.decommissionBENodes(sqlClient, cgid, cgKeepAmount) if err != nil { cgStatus.Phase = dv1.ScaleDownFailed - klog.Errorf("scaledOutBENodesByDecommission ddcName:%s, namespace:%s, computeGroupName:%s , Decommission failed, err:%s ", cluster.Name, cluster.Namespace, cgName, err.Error()) + klog.Errorf("scaledOutBENodesByDecommission ddcName:%s, namespace:%s, computeGroupId:%s , Decommission failed, err:%s ", cluster.Name, cluster.Namespace, cgid, err.Error()) return err } cgStatus.Phase = dv1.Decommissioning return nil case resource.Decommissioning, resource.DecommissionPhaseUnknown: cgStatus.Phase = dv1.Decommissioning - klog.Infof("scaledOutBENodesByDecommission ddcName:%s, namespace:%s, computeGroupName:%s, Decommission in progress", cluster.Name, cluster.Namespace, cgName) + klog.Infof("scaledOutBENodesByDecommission ddcName:%s, namespace:%s, computeGroupId:%s, Decommission in progress", cluster.Name, cluster.Namespace, cgid) return nil case resource.Decommissioned: - dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount) + dcgs.scaledOutBENodesByDrop(sqlClient, cgid, cgKeepAmount) } cgStatus.Phase = dv1.Scaling return nil @@ -117,12 +117,12 @@ func getOperationType(st, est *appv1.StatefulSet, phase dv1.Phase) string { func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesByDrop( masterDBClient *mysql.DB, - cgName string, + cgid string, cgKeepAmount int32) error { - dropNodes, err := getScaledOutBENode(masterDBClient, cgName, cgKeepAmount) + dropNodes, err := getScaledOutBENode(masterDBClient, cgid, cgKeepAmount) if err != nil { - klog.Errorf("scaledOutBENodesByDrop getScaledOutBENode cgName %s failed, err:%s ", cgName, err.Error()) + klog.Errorf("scaledOutBENodesByDrop getScaledOutBENode cgid %s failed, err:%s ", cgid, err.Error()) return err } @@ -131,7 +131,7 @@ func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesByDrop( } err = masterDBClient.DropBE(dropNodes) if err != nil { - klog.Errorf("scaledOutBENodesByDrop cgName %s DropBENodes failed, err:%s ", cgName, err.Error()) + klog.Errorf("scaledOutBENodesByDrop cgid %s DropBENodes failed, err:%s ", cgid, err.Error()) return err } return nil @@ -188,10 +188,10 @@ func (dcgs *DisaggregatedComputeGroupsController) getMasterSqlClient(ctx context } // isDecommissionProgressFinished check decommission status -func (dcgs *DisaggregatedComputeGroupsController) decommissionProgressCheck(masterDBClient *mysql.DB, cgName string, cgKeepAmount int32) (resource.DecommissionPhase, error) { - allBackends, err := masterDBClient.GetBackendsByCGName(cgName) +func (dcgs *DisaggregatedComputeGroupsController) decommissionProgressCheck(masterDBClient *mysql.DB, cgid string, cgKeepAmount int32) (resource.DecommissionPhase, error) { + allBackends, err := masterDBClient.GetBackendsByComputeGroupId(cgid) if err != nil { - klog.Errorf("decommissionProgressCheck failed, cgName %s ShowBackends err:%s", cgName, err.Error()) + klog.Errorf("decommissionProgressCheck failed, cgid %s ShowBackends err:%s", cgid, err.Error()) return resource.DecommissionPhaseUnknown, err } dts := resource.ConstructDecommissionTaskStatus(allBackends, cgKeepAmount) @@ -200,12 +200,12 @@ func (dcgs *DisaggregatedComputeGroupsController) decommissionProgressCheck(mast func getScaledOutBENode( masterDBClient *mysql.DB, - cgName string, + cgid string, cgKeepAmount int32) ([]*mysql.Backend, error) { - allBackends, err := masterDBClient.GetBackendsByCGName(cgName) + allBackends, err := masterDBClient.GetBackendsByComputeGroupId(cgid) if err != nil { - klog.Errorf("scaledOutBEPreprocessing failed, cgName %s ShowBackends err:%s", cgName, err.Error()) + klog.Errorf("scaledOutBEPreprocessing failed, cgid %s ShowBackends err:%s", cgid, err.Error()) return nil, err } @@ -216,7 +216,7 @@ func getScaledOutBENode( splitCGIDArr := strings.Split(split[0], "-") podNum, err := strconv.Atoi(splitCGIDArr[len(splitCGIDArr)-1]) if err != nil { - klog.Errorf("scaledOutBEPreprocessing cgName %s splitCGIDArr can not split host : %s,err:%s", cgName, node.Host, err.Error()) + klog.Errorf("scaledOutBEPreprocessing cgid %s splitCGIDArr can not split host : %s,err:%s", cgid, node.Host, err.Error()) return nil, err } if podNum >= int(cgKeepAmount) {