Skip to content

Commit 5ef8f61

Browse files
authored
Merge pull request #318 from intelligentfu8/clear-resource-refactor
[Refactor](ddc)refactor clear resource code,fix service label error,use sts replicas…
2 parents adcfcd3 + 6960aa6 commit 5ef8f61

File tree

6 files changed

+257
-52
lines changed

6 files changed

+257
-52
lines changed

pkg/common/utils/k8s/client.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,23 @@ func ApplyService(ctx context.Context, k8sclient client.Client, svc *corev1.Serv
6969
return PatchClientObject(ctx, k8sclient, svc)
7070
}
7171

72+
func ListServicesInNamespace(ctx context.Context, k8sclient client.Client, namespace string, selector map[string]string) ([]corev1.Service, error) {
73+
var svcList corev1.ServiceList
74+
if err := k8sclient.List(ctx, &svcList, client.InNamespace(namespace), client.MatchingLabels(selector)); err != nil {
75+
return nil, err
76+
}
77+
78+
return svcList.Items, nil
79+
}
80+
81+
func ListStatefulsetInNamespace(ctx context.Context, k8sclient client.Client, namespace string, selector map[string]string) ([]appv1.StatefulSet, error) {
82+
var stsList appv1.StatefulSetList
83+
if err := k8sclient.List(ctx, &stsList, client.InNamespace(namespace), client.MatchingLabels(selector)); err != nil {
84+
return nil, err
85+
}
86+
return stsList.Items, nil
87+
}
88+
7289
// ApplyStatefulSet when the object is not exist, create object. if exist and statefulset have been updated, patch the statefulset.
7390
func ApplyStatefulSet(ctx context.Context, k8sclient client.Client, st *appv1.StatefulSet, equal StatefulSetEqual) error {
7491
var est appv1.StatefulSet
@@ -93,6 +110,12 @@ func ApplyStatefulSet(ctx context.Context, k8sclient client.Client, st *appv1.St
93110
return err
94111
}
95112

113+
func GetStatefulSet(ctx context.Context, k8sclient client.Client, namespace, name string) (*appv1.StatefulSet, error) {
114+
var est appv1.StatefulSet
115+
err := k8sclient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, &est)
116+
return &est, err
117+
}
118+
96119
func CreateClientObject(ctx context.Context, k8sclient client.Client, object client.Object) error {
97120
klog.Info("Creating resource service ", "namespace ", object.GetNamespace(), " name ", object.GetName(), " kind ", object.GetObjectKind().GroupVersionKind().Kind)
98121
if err := k8sclient.Create(ctx, object); err != nil {

pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go

Lines changed: 196 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -259,79 +259,220 @@ func (dcgs *DisaggregatedComputeGroupsController) validateRegex(cgs []dv1.Comput
259259
func (dcgs *DisaggregatedComputeGroupsController) ClearResources(ctx context.Context, obj client.Object) (bool, error) {
260260
ddc := obj.(*dv1.DorisDisaggregatedCluster)
261261

262-
if !dcgs.feAvailable(ddc) {
263-
return false, nil
264-
}
265-
266-
var clearCGs []dv1.ComputeGroupStatus
267262
var eCGs []dv1.ComputeGroupStatus
268-
269263
for i, cgs := range ddc.Status.ComputeGroupStatuses {
270264
for _, cg := range ddc.Spec.ComputeGroups {
271265
if cgs.UniqueId == cg.UniqueId {
272266
eCGs = append(eCGs, ddc.Status.ComputeGroupStatuses[i])
273-
goto NoNeedAppend
267+
break
274268
}
275269
}
276-
277-
clearCGs = append(clearCGs, ddc.Status.ComputeGroupStatuses[i])
278-
// no need clear should not append.
279-
NoNeedAppend:
280270
}
281271

282-
sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, ddc)
272+
//list the svcs and stss owner reference to dorisDisaggregatedCluster.
273+
cls := dcgs.GetCG2LayerCommonSchedulerLabels(ddc.Name)
274+
svcs, err := k8s.ListServicesInNamespace(ctx, dcgs.K8sclient, ddc.Namespace, cls)
283275
if err != nil {
284-
klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient getMasterSqlClient failed: %s", err.Error())
285-
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
276+
klog.Errorf("DisaggregatedComputeGroupsController ListServicesInNamespace failed, dorisdisaggregatedcluster name=%s", ddc.Name)
277+
return false, err
278+
}
279+
stss, err := k8s.ListStatefulsetInNamespace(ctx, dcgs.K8sclient, ddc.Namespace, cls)
280+
if err != nil {
281+
klog.Errorf("DisaggregatedComputeGroupsController ListStatefulsetInNamespace failed, dorisdisaggregatedcluster name=%s", ddc.Name)
286282
return false, err
287283
}
288-
defer sqlClient.Close()
289284

290-
for i := range clearCGs {
291-
cgs := clearCGs[i]
292-
cleared := true
293-
if err := k8s.DeleteStatefulset(ctx, dcgs.K8sclient, ddc.Namespace, cgs.StatefulsetName); err != nil {
294-
cleared = false
295-
klog.Errorf("disaggregatedComputeGroupsController delete statefulset namespace %s name %s failed, err=%s", ddc.Namespace, cgs.StatefulsetName, err.Error())
296-
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGStatefulsetDeleteFailed), err.Error())
297-
}
285+
//clear unused service and statefulset.
286+
delSvcNames := dcgs.findUnusedSvcs(svcs, ddc)
287+
delStsNames, delUniqueIds := dcgs.findUnusedStssAndUniqueIds(stss, ddc)
288+
289+
if err = dcgs.clearCGInDorisMeta(ctx, delUniqueIds, ddc); err != nil {
290+
return false, err
291+
}
292+
if err = dcgs.clearSvcs(ctx, delSvcNames, ddc); err != nil {
293+
return false, err
294+
}
295+
if err = dcgs.clearStatefulsets(ctx, delStsNames, ddc); err != nil {
296+
return false, err
297+
}
298298

299-
if err := k8s.DeleteService(ctx, dcgs.K8sclient, ddc.Namespace, cgs.ServiceName); err != nil {
300-
cleared = false
301-
klog.Errorf("disaggregatedComputeGroupsController delete service namespace %s name %s failed, err=%s", ddc.Namespace, cgs.ServiceName, err.Error())
302-
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGServiceDeleteFailed), err.Error())
299+
//clear unused pvc
300+
for i := range eCGs {
301+
err = dcgs.ClearStatefulsetUnusedPVCs(ctx, ddc, eCGs[i])
302+
if err != nil {
303+
klog.Errorf("disaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs clear ComputeGroup reduced replicas PVC failed, namespace=%s, ddc name=%s, uniqueId=%s err=%s", ddc.Namespace, ddc.Name, eCGs[i].UniqueId, err.Error())
303304
}
304-
if !cleared {
305-
eCGs = append(eCGs, clearCGs[i])
306-
continue
305+
}
306+
307+
for _, uniqueId := range delUniqueIds {
308+
//new fake computeGroup status for clear all pvcs owner reference to deleted compute group.
309+
fakeCgs := dv1.ComputeGroupStatus{
310+
UniqueId: uniqueId,
307311
}
308-
// drop compute group
309-
cgName := strings.ReplaceAll(cgs.UniqueId, "_", "-")
310-
cgKeepAmount := int32(0)
311-
err = dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount)
312+
err = dcgs.ClearStatefulsetUnusedPVCs(ctx, ddc, fakeCgs)
312313
if err != nil {
313-
klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient failed: %s", err.Error())
314-
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
314+
klog.Errorf("disaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs clear deleted compute group failed, namespace=%s, ddc name=%s, uniqueId=%s err=%s", ddc.Namespace, ddc.Name, uniqueId, err.Error())
315315
}
316+
}
317+
318+
ddc.Status.ComputeGroupStatuses = eCGs
319+
return true, nil
316320

321+
//TODO: next pr remove the code
322+
//sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, ddc)
323+
//if err != nil {
324+
// klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient getMasterSqlClient failed: %s", err.Error())
325+
// dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
326+
// return false, err
327+
//}
328+
//defer sqlClient.Close()
329+
//
330+
//for i := range clearCGs {
331+
// cgs := clearCGs[i]
332+
// cleared := true
333+
// if err := k8s.DeleteStatefulset(ctx, dcgs.K8sclient, ddc.Namespace, cgs.StatefulsetName); err != nil {
334+
// cleared = false
335+
// klog.Errorf("disaggregatedComputeGroupsController delete statefulset namespace %s name %s failed, err=%s", ddc.Namespace, cgs.StatefulsetName, err.Error())
336+
// dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGStatefulsetDeleteFailed), err.Error())
337+
// }
338+
//
339+
// if err := k8s.DeleteService(ctx, dcgs.K8sclient, ddc.Namespace, cgs.ServiceName); err != nil {
340+
// cleared = false
341+
// klog.Errorf("disaggregatedComputeGroupsController delete service namespace %s name %s failed, err=%s", ddc.Namespace, cgs.ServiceName, err.Error())
342+
// dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGServiceDeleteFailed), err.Error())
343+
// }
344+
// if !cleared {
345+
// eCGs = append(eCGs, clearCGs[i])
346+
// continue
347+
// }
348+
// // drop compute group
349+
// cgName := strings.ReplaceAll(cgs.UniqueId, "_", "-")
350+
// cgKeepAmount := int32(0)
351+
// err = dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount)
352+
// if err != nil {
353+
// klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient failed: %s", err.Error())
354+
// dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
355+
// }
356+
//
357+
//}
358+
//
359+
//for i := range eCGs {
360+
// err := dcgs.ClearStatefulsetUnusedPVCs(ctx, ddc, eCGs[i])
361+
// if err != nil {
362+
// klog.Errorf("disaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs clear whole ComputeGroup PVC failed, err=%s", err.Error())
363+
// }
364+
//}
365+
//for i := range clearCGs {
366+
// err := dcgs.ClearStatefulsetUnusedPVCs(ctx, ddc, clearCGs[i])
367+
// if err != nil {
368+
// klog.Errorf("disaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs clear part ComputeGroup PVC failed, err=%s", err.Error())
369+
// }
370+
//}
371+
//
372+
//ddc.Status.ComputeGroupStatuses = eCGs
373+
//
374+
//return true, nil
375+
}
376+
377+
func (dcgs *DisaggregatedComputeGroupsController) clearStatefulsets(ctx context.Context, stsNames []string, ddc *dv1.DorisDisaggregatedCluster) error {
378+
for _, name := range stsNames {
379+
if err := k8s.DeleteStatefulset(ctx, dcgs.K8sclient, ddc.Namespace, name); err != nil {
380+
klog.Errorf("DisaggregatedComputeGroupsController clear statefulset failed, namespace=%s, name =%s, err=%s", ddc.Namespace, name, err.Error())
381+
return err
382+
}
317383
}
384+
return nil
385+
}
318386

319-
for i := range eCGs {
320-
err := dcgs.ClearStatefulsetUnusedPVCs(ctx, ddc, eCGs[i])
321-
if err != nil {
322-
klog.Errorf("disaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs clear whole ComputeGroup PVC failed, err=%s", err.Error())
387+
func (dcgs *DisaggregatedComputeGroupsController) clearSvcs(ctx context.Context, svcNames []string, ddc *dv1.DorisDisaggregatedCluster) error {
388+
for _, name := range svcNames {
389+
if err := k8s.DeleteService(ctx, dcgs.K8sclient, ddc.Namespace, name); err != nil {
390+
klog.Errorf("DisaggregatedComputeGroupsController clear service failed, namespace=%s, name =%s, err=%s", ddc.Namespace, name, err.Error())
391+
return err
323392
}
324393
}
325-
for i := range clearCGs {
326-
err := dcgs.ClearStatefulsetUnusedPVCs(ctx, ddc, clearCGs[i])
394+
return nil
395+
}
396+
397+
func (dcgs *DisaggregatedComputeGroupsController) clearCGInDorisMeta(ctx context.Context, cgNames []string, ddc *dv1.DorisDisaggregatedCluster) error {
398+
if len(cgNames) == 0 {
399+
return nil
400+
}
401+
402+
sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, ddc)
403+
if err != nil {
404+
klog.Errorf("DisaggregatedComputeGroupsController clearCGInDorisMeta dropCGBySQLClient getMasterSqlClient failed: %s", err.Error())
405+
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
406+
return err
407+
}
408+
defer sqlClient.Close()
409+
410+
for _, name := range cgNames {
411+
//clear cg, the keepAmount = 0
412+
//confirm used the right cgName, as the cgName get from the uniqueid that '-' replaced by '_'.
413+
cgName := strings.ReplaceAll(name, "-", "_")
414+
err = dcgs.scaledOutBENodesByDrop(sqlClient, cgName, 0)
327415
if err != nil {
328-
klog.Errorf("disaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs clear part ComputeGroup PVC failed, err=%s", err.Error())
416+
klog.Errorf("DisaggregatedComputeGroupsController clearCGInDorisMeta dropCGBySQLClient failed: %s", err.Error())
417+
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
418+
return err
329419
}
330420
}
331421

332-
ddc.Status.ComputeGroupStatuses = eCGs
422+
return nil
423+
}
333424

334-
return true, nil
425+
func (dcgs *DisaggregatedComputeGroupsController) findUnusedSvcs(svcs []corev1.Service, ddc *dv1.DorisDisaggregatedCluster) []string {
426+
var unusedSvcNames []string
427+
for i, _ := range svcs {
428+
own := ownerReference2ddc(&svcs[i], ddc)
429+
if !own {
430+
//not owner reference to ddc, should skip the service.
431+
continue
432+
}
433+
434+
svcUniqueId := getUniqueIdFromClientObject(&svcs[i])
435+
exist := false
436+
for j := 0; j < len(ddc.Spec.ComputeGroups); j++ {
437+
if ddc.Spec.ComputeGroups[j].UniqueId == svcUniqueId {
438+
exist = true
439+
break
440+
}
441+
}
442+
443+
if !exist {
444+
unusedSvcNames = append(unusedSvcNames, svcs[i].Name)
445+
}
446+
}
447+
448+
return unusedSvcNames
449+
}
450+
451+
func (dcgs *DisaggregatedComputeGroupsController) findUnusedStssAndUniqueIds(stss []appv1.StatefulSet, ddc *dv1.DorisDisaggregatedCluster) ([]string /*sts*/, []string /*cgNames*/) {
452+
var unusedStsNames []string
453+
var unusedUniqueIds []string
454+
for i, _ := range stss {
455+
own := ownerReference2ddc(&stss[i], ddc)
456+
if !own {
457+
//not owner reference tto ddc should skip the statefulset.
458+
continue
459+
}
460+
461+
stsUniqueId := getUniqueIdFromClientObject(&stss[i])
462+
exist := false
463+
for j := 0; j < len(ddc.Spec.ComputeGroups); j++ {
464+
if ddc.Spec.ComputeGroups[j].UniqueId == stsUniqueId {
465+
exist = true
466+
break
467+
}
468+
}
469+
if !exist {
470+
unusedStsNames = append(unusedStsNames, stss[i].Name)
471+
unusedUniqueIds = append(unusedUniqueIds, stsUniqueId)
472+
}
473+
}
474+
475+
return unusedStsNames, unusedUniqueIds
335476
}
336477

337478
// ClearStatefulsetUnusedPVCs
@@ -365,8 +506,17 @@ func (dcgs *DisaggregatedComputeGroupsController) ClearStatefulsetUnusedPVCs(ctx
365506
}
366507

367508
if cg != nil {
368-
replicas := int(*cg.Replicas)
509+
//we should use statefulset replicas for avoiding the phase=scaleDown, when phase `scaleDown` cg' replicas is less than statefuslet.
510+
replicas := 0
369511
stsName := ddc.GetCGStatefulsetName(cg)
512+
sts, err := k8s.GetStatefulSet(ctx, dcgs.K8sclient, ddc.Namespace, stsName)
513+
if err != nil {
514+
klog.Errorf("DisaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs get statefulset namespace=%s, name=%s, failed, err=%s", ddc.Namespace, stsName, err.Error())
515+
//waiting next reconciling.
516+
return nil
517+
}
518+
replicas = int(*sts.Spec.Replicas)
519+
370520
cvs := dcgs.GetConfigValuesFromConfigMaps(ddc.Namespace, resource.BE_RESOLVEKEY, cg.CommonSpec.ConfigMaps)
371521
paths, _ := dcgs.getCacheMaxSizeAndPaths(cvs)
372522

pkg/controller/sub_controller/disaggregated_cluster/computegroups/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func (dcgs *DisaggregatedComputeGroupsController) newService(ddc *dv1.DorisDisag
3232

3333
ob := &svc.ObjectMeta
3434
ob.Name = ddc.GetCGServiceName(cg)
35-
ob.Labels = dcgs.newCG2LayerSchedulerLabels(ddc.Namespace, uniqueId)
35+
ob.Labels = dcgs.newCG2LayerSchedulerLabels(ddc.Name, uniqueId)
3636

3737
spec := &svc.Spec
3838
spec.Selector = dcgs.newCGPodsSelector(ddc.Name, uniqueId)

pkg/controller/sub_controller/disaggregated_cluster/computegroups/statefulset.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,17 @@ const (
4646

4747
// generate statefulset or service labels
4848
func (dcgs *DisaggregatedComputeGroupsController) newCG2LayerSchedulerLabels(ddcName /*DisaggregatedClusterName*/, uniqueId string) map[string]string {
49+
labels := dcgs.GetCG2LayerCommonSchedulerLabels(ddcName)
50+
labels[dv1.DorisDisaggregatedComputeGroupUniqueId] = uniqueId
51+
return labels
52+
}
53+
54+
func (dcgs *DisaggregatedComputeGroupsController) GetCG2LayerCommonSchedulerLabels(ddcName string) map[string]string {
4955
return map[string]string{
50-
dv1.DorisDisaggregatedClusterName: ddcName,
51-
dv1.DorisDisaggregatedComputeGroupUniqueId: uniqueId,
52-
dv1.DorisDisaggregatedOwnerReference: ddcName,
56+
dv1.DorisDisaggregatedClusterName: ddcName,
57+
dv1.DorisDisaggregatedOwnerReference: ddcName,
5358
}
5459
}
55-
5660
func (dcgs *DisaggregatedComputeGroupsController) newCGPodsSelector(ddcName /*DisaggregatedClusterName*/, uniqueId string) map[string]string {
5761
return map[string]string{
5862
dv1.DorisDisaggregatedClusterName: ddcName,

pkg/controller/sub_controller/disaggregated_cluster/computegroups/util.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,36 @@
1717

1818
package computegroups
1919

20+
import (
21+
dv1 "github.com/apache/doris-operator/api/disaggregated/v1"
22+
"sigs.k8s.io/controller-runtime/pkg/client"
23+
)
24+
2025
// regex
2126
var (
2227
compute_group_name_regex = "[a-zA-Z](_?[0-9a-zA-Z])*"
2328
compute_group_id_regex = "[a-zA-Z](_?[0-9a-zA-Z])*"
2429
)
30+
31+
func ownerReference2ddc(obj client.Object, cluster *dv1.DorisDisaggregatedCluster) bool {
32+
if obj == nil {
33+
return false
34+
}
35+
36+
ors := obj.GetOwnerReferences()
37+
for _, or := range ors {
38+
if or.Name == cluster.Name && or.UID == cluster.UID {
39+
return true
40+
}
41+
}
42+
43+
return false
44+
}
45+
46+
func getUniqueIdFromClientObject(obj client.Object) string {
47+
if obj == nil {
48+
return ""
49+
}
50+
labels := obj.GetLabels()
51+
return labels[dv1.DorisDisaggregatedComputeGroupUniqueId]
52+
}

pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func (dfc *DisaggregatedFEController) newService(ddc *dv1.DorisDisaggregatedClus
3131
svc := dfc.NewDefaultService(ddc)
3232
om := &svc.ObjectMeta
3333
om.Name = ddc.GetFEServiceName()
34-
om.Labels = dfc.newFESchedulerLabels(ddc.Namespace)
34+
om.Labels = dfc.newFESchedulerLabels(ddc.Name)
3535

3636
spec := &svc.Spec
3737
spec.Selector = dfc.newFEPodsSelector(ddc.Name)

0 commit comments

Comments
 (0)