Skip to content

Commit 1b605e7

Browse files
committed
refactor sharding action
1 parent e3b3e94 commit 1b605e7

File tree

2 files changed

+134
-98
lines changed

2 files changed

+134
-98
lines changed

controllers/apps/cluster/transformer_cluster_component.go

Lines changed: 130 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"strings"
3030
"time"
3131

32-
"github.com/pkg/errors"
3332
"golang.org/x/exp/maps"
3433
corev1 "k8s.io/api/core/v1"
3534
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -910,38 +909,31 @@ func (h *clusterShardingHandler) delete(transCtx *clusterTransformContext, dag *
910909
return err
911910
}
912911

913-
needPreTerminate := func() bool {
914-
if shardingDef.Spec.LifecycleActions == nil || shardingDef.Spec.LifecycleActions.PreTerminate == nil {
915-
return false
916-
}
917-
918-
for _, comp := range runningComps {
919-
if comp.Annotations != nil && comp.Annotations[kbShardingPreTerminateDoneKey] != "" {
920-
return false
912+
errs := make([]error, 0)
913+
if shardingDef.Spec.LifecycleActions != nil && shardingDef.Spec.LifecycleActions.PreTerminate != nil {
914+
if shardingDef.Spec.LifecycleActions.PostProvision != nil {
915+
for _, comp := range runningComps {
916+
if comp.Annotations != nil && comp.Annotations[kbShardingPostProvisionKey] != "" {
917+
return fmt.Errorf("sharding post-provision not done yet, waiting for completion")
918+
}
921919
}
922920
}
923-
return true
924-
}
925921

926-
errs := make([]error, 0)
927-
if needPreTerminate() {
928922
var shards []*appsv1.Component
929923
shards, err = selectTargetShard(shardingDef.Spec.LifecycleActions.PreTerminate, generics.SlicePtr(runningComps))
930924
if err != nil {
931925
return err
932926
}
933927

934928
for _, shard := range shards {
935-
err = doShardingLifecycleAction(transCtx, dag, shardingDef, shard, name, kbShardingPreTerminateAction)
929+
err = shardingPreTerminate(transCtx, dag, shardingDef.Spec.LifecycleActions.PreTerminate, shard, name)
936930
if err != nil {
937-
transCtx.Logger.Error(err, "failed to do shard lifecycle actions", "shard", shard.Name)
938931
errs = append(errs, err)
939-
continue
940932
}
941933
}
942934
}
943935
if len(errs) > 0 {
944-
return nil
936+
return ictrlutil.NewRequeueError(time.Second, fmt.Sprintf("%v", errs))
945937
}
946938

947939
graphCli, _ := transCtx.Client.(model.GraphClient)
@@ -996,6 +988,11 @@ func (h *clusterShardingHandler) update(transCtx *clusterTransformContext, dag *
996988
toCreate, toDelete, toUpdate := mapDiff(runningCompsMap, protoCompsMap)
997989

998990
shardingDef := getShardingDef(transCtx, name)
991+
err := h.handleShardingPostProvision(transCtx, dag, runningCompsMap, toUpdate, shardingDef, name)
992+
if err != nil {
993+
return err
994+
}
995+
999996
// TODO: update strategy
1000997
h.deleteComps(transCtx, dag, runningCompsMap, toDelete, name, shardingDef)
1001998
h.updateComps(transCtx, dag, runningCompsMap, protoCompsMap, toUpdate, name, shardingDef)
@@ -1004,6 +1001,23 @@ func (h *clusterShardingHandler) update(transCtx *clusterTransformContext, dag *
10041001
return nil
10051002
}
10061003

1004+
func (h *clusterShardingHandler) handleShardingPostProvision(transCtx *clusterTransformContext, dag *graph.DAG,
1005+
runningComps map[string]*appsv1.Component, updateSet sets.Set[string],
1006+
shardingDef *appsv1.ShardingDefinition, shardingName string) error {
1007+
errs := make([]error, 0)
1008+
for name := range updateSet {
1009+
err := shardingPostProvision(transCtx, dag, shardingDef, runningComps[name], shardingName)
1010+
if err != nil {
1011+
errs = append(errs, err)
1012+
}
1013+
}
1014+
1015+
if len(errs) > 0 {
1016+
return ictrlutil.NewRequeueError(time.Second, fmt.Sprintf("%v", errs))
1017+
}
1018+
return nil
1019+
}
1020+
10071021
func (h *clusterShardingHandler) createComps(transCtx *clusterTransformContext, dag *graph.DAG,
10081022
protoComps map[string]*appsv1.Component, createSet sets.Set[string], shardingDef *appsv1.ShardingDefinition) {
10091023
graphCli, _ := transCtx.Client.(model.GraphClient)
@@ -1031,13 +1045,19 @@ func (h *clusterShardingHandler) deleteComps(transCtx *clusterTransformContext,
10311045
shardingName string, shardingDef *appsv1.ShardingDefinition) {
10321046
graphCli, _ := transCtx.Client.(model.GraphClient)
10331047

1048+
errs := make([]error, 0)
10341049
for name := range deleteSet {
1035-
err := doShardingLifecycleAction(transCtx, dag, shardingDef, runningComps[name], shardingName, kbShardingRemoveAction)
1050+
err := shardRemove(transCtx, dag, shardingDef, runningComps[name], shardingName)
10361051
if err != nil {
1037-
transCtx.Logger.Error(err, "failed to do shard lifecycle actions", "shard", name)
1038-
continue
1052+
transCtx.Logger.Error(err, "failed to do shard remove", "shard", name)
1053+
errs = append(errs, err)
10391054
}
1055+
}
1056+
if len(errs) > 0 {
1057+
return
1058+
}
10401059

1060+
for name := range deleteSet {
10411061
h.deleteComp(transCtx, graphCli, dag, runningComps[name], pointer.Bool(true))
10421062
}
10431063
}
@@ -1046,14 +1066,20 @@ func (h *clusterShardingHandler) updateComps(transCtx *clusterTransformContext,
10461066
runningComps map[string]*appsv1.Component, protoComps map[string]*appsv1.Component, updateSet sets.Set[string],
10471067
shardingName string, shardingDef *appsv1.ShardingDefinition) {
10481068
graphCli, _ := transCtx.Client.(model.GraphClient)
1069+
errs := make([]error, 0)
10491070
for name := range updateSet {
1050-
running, proto := runningComps[name], protoComps[name]
1051-
err := handleShardingAddNPostProvision(transCtx, dag, shardingDef, running, shardingName)
1071+
err := shardAdd(transCtx, dag, shardingDef, runningComps[name], shardingName)
10521072
if err != nil {
1053-
transCtx.Logger.Error(err, "failed to do shard lifecycle actions", "shard", name)
1054-
continue
1073+
transCtx.Logger.Error(err, "failed to do shard add", "shard", name)
1074+
errs = append(errs, err)
10551075
}
1076+
}
1077+
if len(errs) > 0 {
1078+
return
1079+
}
10561080

1081+
for name := range updateSet {
1082+
running, proto := runningComps[name], protoComps[name]
10571083
if obj := copyAndMergeComponent(running, proto); obj != nil {
10581084
graphCli.Update(dag, running, obj)
10591085
}
@@ -1465,83 +1491,63 @@ func lifecycleAction4Sharding(transCtx *clusterTransformContext, comp *appsv1.Co
14651491
return component.BuildLifecycleAgent(transCtx.Context, transCtx.Client, compDef, comp, transCtx.Cluster.Namespace, transCtx.Cluster.Name)
14661492
}
14671493

1468-
func handleShardingAddNPostProvision(transCtx *clusterTransformContext, dag *graph.DAG, shardingDef *appsv1.ShardingDefinition, comp *appsv1.Component, shardingName string) error {
1469-
// if both post-provision and shard-add annotations exist, do post-provision first
1470-
if comp.Annotations[kbShardingPostProvisionKey] != "" {
1471-
return doShardingLifecycleAction(transCtx, dag, shardingDef, comp, shardingName, kbShardingPostProvisionAction)
1472-
} else if comp.Annotations[kbShardingAddKey] != "" {
1473-
return doShardingLifecycleAction(transCtx, dag, shardingDef, comp, shardingName, kbShardingAddAction)
1494+
func shardingPostProvision(transCtx *clusterTransformContext, dag *graph.DAG,
1495+
shardingDef *appsv1.ShardingDefinition,
1496+
comp *appsv1.Component,
1497+
shardingName string) error {
1498+
if shardingDef == nil || shardingDef.Spec.LifecycleActions == nil || shardingDef.Spec.LifecycleActions.PostProvision == nil {
1499+
return nil
14741500
}
14751501

1476-
return nil
1502+
if checkShardingActionDone(comp, false, kbShardingPostProvisionKey) {
1503+
return nil
1504+
}
1505+
1506+
lfa, err := lifecycleAction4Sharding(transCtx, comp)
1507+
if err != nil {
1508+
return err
1509+
}
1510+
1511+
err = lfa.UserDefined(transCtx.Context, transCtx.Client, &lifecycle.Options{
1512+
PreConditionObjectSelector: constant.GetClusterLabels(transCtx.Cluster.Name, map[string]string{constant.KBAppShardingNameLabelKey: shardingName}),
1513+
}, kbShardingPostProvisionAction, &shardingDef.Spec.LifecycleActions.PostProvision.Action, nil)
1514+
if err != nil {
1515+
return lifecycle.IgnoreNotDefined(err)
1516+
}
1517+
markShardingActionDone(transCtx, dag, comp, false, kbShardingPostProvisionKey)
1518+
return ictrlutil.NewErrorf(ictrlutil.ErrorTypeRequeue, "requeue to waiting for %s annotation to be removed", kbShardingPostProvisionAction)
14771519
}
14781520

1479-
func doShardingLifecycleAction(transCtx *clusterTransformContext,
1480-
dag *graph.DAG,
1481-
shardingDef *appsv1.ShardingDefinition,
1521+
func shardingPreTerminate(transCtx *clusterTransformContext, dag *graph.DAG,
1522+
preTerminate *appsv1.ShardingAction,
14821523
comp *appsv1.Component,
1483-
shardingName string,
1484-
actionName string) error {
1485-
if shardingDef == nil || shardingDef.Spec.LifecycleActions == nil {
1524+
shardingName string) error {
1525+
if checkShardingActionDone(comp, true, kbShardingPreTerminateDoneKey) {
14861526
return nil
14871527
}
14881528

1489-
var checkAnnotationExist bool
1490-
var annotation string
1491-
var needsAction func() bool
1492-
var action *appsv1.Action
1493-
var args map[string]string
1494-
switch actionName {
1495-
case kbShardingPostProvisionAction:
1496-
checkAnnotationExist = false
1497-
annotation = kbShardingPostProvisionKey
1498-
action = &shardingDef.Spec.LifecycleActions.PostProvision.Action
1499-
needsAction = func() bool {
1500-
return action != nil
1501-
}
1502-
case kbShardingPreTerminateAction:
1503-
checkAnnotationExist = true
1504-
annotation = kbShardingPreTerminateDoneKey
1505-
action = &shardingDef.Spec.LifecycleActions.PreTerminate.Action
1506-
needsAction = func() bool {
1507-
return action != nil
1508-
}
1509-
case kbShardingRemoveAction:
1510-
checkAnnotationExist = true
1511-
annotation = kbShardingRemoveDoneKey
1512-
action = &shardingDef.Spec.LifecycleActions.ShardRemove.Action
1513-
needsAction = func() bool {
1514-
if action == nil {
1515-
return false
1516-
}
1517-
if comp.Annotations != nil && comp.Annotations[kbShardingAddKey] != "" {
1518-
// shardAdd is not done yet, skip shardRemove
1519-
return false
1520-
}
1521-
return true
1522-
}
1523-
args = map[string]string{
1524-
shardRemoveShardNameVar: shardingName,
1525-
}
1526-
case kbShardingAddAction:
1527-
checkAnnotationExist = false
1528-
annotation = kbShardingAddKey
1529-
action = &shardingDef.Spec.LifecycleActions.ShardAdd.Action
1530-
needsAction = func() bool {
1531-
return action != nil
1532-
}
1533-
args = map[string]string{
1534-
shardAddShardNameVar: shardingName,
1535-
}
1536-
default:
1537-
return fmt.Errorf("unknown sharding lifecycle action: %s", actionName)
1529+
lfa, err := lifecycleAction4Sharding(transCtx, comp)
1530+
if err != nil {
1531+
return err
15381532
}
15391533

1540-
if !needsAction() {
1534+
err = lfa.UserDefined(transCtx.Context, transCtx.Client, &lifecycle.Options{
1535+
PreConditionObjectSelector: constant.GetClusterLabels(transCtx.Cluster.Name, map[string]string{constant.KBAppShardingNameLabelKey: shardingName}),
1536+
}, kbShardingPreTerminateAction, &preTerminate.Action, nil)
1537+
if err != nil {
1538+
return lifecycle.IgnoreNotDefined(err)
1539+
}
1540+
markShardingActionDone(transCtx, dag, comp, true, kbShardingPreTerminateDoneKey)
1541+
return ictrlutil.NewErrorf(ictrlutil.ErrorTypeRequeue, "requeue to waiting for %s annotation to be set", kbShardingPreTerminateAction)
1542+
}
1543+
1544+
func shardAdd(transCtx *clusterTransformContext, dag *graph.DAG,
1545+
shardingDef *appsv1.ShardingDefinition, comp *appsv1.Component, shardingName string) error {
1546+
if shardingDef == nil || shardingDef.Spec.LifecycleActions == nil || shardingDef.Spec.LifecycleActions.ShardAdd == nil {
15411547
return nil
15421548
}
15431549

1544-
if checkShardingActionDone(comp, checkAnnotationExist, annotation) {
1550+
if checkShardingActionDone(comp, false, kbShardingAddKey) {
15451551
return nil
15461552
}
15471553

@@ -1552,19 +1558,47 @@ func doShardingLifecycleAction(transCtx *clusterTransformContext,
15521558

15531559
err = lfa.UserDefined(transCtx.Context, transCtx.Client, &lifecycle.Options{
15541560
PreConditionObjectSelector: constant.GetClusterLabels(transCtx.Cluster.Name, map[string]string{constant.KBAppShardingNameLabelKey: shardingName}),
1555-
}, actionName, action, args)
1561+
}, kbShardingAddAction, &shardingDef.Spec.LifecycleActions.ShardAdd.Action, map[string]string{shardAddShardNameVar: shardingName})
15561562
if err != nil {
1557-
err = lifecycle.IgnoreNotDefined(err)
1558-
if errors.Is(err, lifecycle.ErrPreconditionFailed) {
1559-
err = fmt.Errorf("%w: %w", ictrlutil.NewDelayedRequeueError(time.Second*10, fmt.Sprintf("wait for %s action precondition", actionName)), err)
1560-
} else {
1561-
err = fmt.Errorf("%w: %w", ictrlutil.NewRequeueError(time.Second*5, fmt.Sprintf("%s action failed", actionName)), err)
1563+
return lifecycle.IgnoreNotDefined(err)
1564+
}
1565+
1566+
markShardingActionDone(transCtx, dag, comp, false, kbShardingAddKey)
1567+
return ictrlutil.NewErrorf(ictrlutil.ErrorTypeRequeue, "requeue to waiting for %s annotation to be removed", kbShardingAddAction)
1568+
}
1569+
1570+
func shardRemove(transCtx *clusterTransformContext, dag *graph.DAG,
1571+
shardingDef *appsv1.ShardingDefinition,
1572+
comp *appsv1.Component,
1573+
shardingName string) error {
1574+
if shardingDef == nil || shardingDef.Spec.LifecycleActions == nil || shardingDef.Spec.LifecycleActions.ShardRemove == nil {
1575+
return nil
1576+
}
1577+
1578+
if shardingDef.Spec.LifecycleActions.ShardAdd != nil {
1579+
if comp.Annotations != nil && comp.Annotations[kbShardingAddKey] != "" {
1580+
return fmt.Errorf("sharding add action not done yet, waiting for completion")
15621581
}
1582+
}
1583+
1584+
if checkShardingActionDone(comp, true, kbShardingRemoveDoneKey) {
1585+
return nil
1586+
}
1587+
1588+
lfa, err := lifecycleAction4Sharding(transCtx, comp)
1589+
if err != nil {
15631590
return err
15641591
}
15651592

1566-
markShardingActionDone(transCtx, dag, comp, checkAnnotationExist, annotation)
1567-
return ictrlutil.NewErrorf(ictrlutil.ErrorTypeRequeue, "requeue to waiting for %s annotation to be set", actionName)
1593+
err = lfa.UserDefined(transCtx.Context, transCtx.Client, &lifecycle.Options{
1594+
PreConditionObjectSelector: constant.GetClusterLabels(transCtx.Cluster.Name, map[string]string{constant.KBAppShardingNameLabelKey: shardingName}),
1595+
}, kbShardingRemoveAction, &shardingDef.Spec.LifecycleActions.ShardRemove.Action, map[string]string{shardRemoveShardNameVar: shardingName})
1596+
if err != nil {
1597+
return lifecycle.IgnoreNotDefined(err)
1598+
}
1599+
1600+
markShardingActionDone(transCtx, dag, comp, true, kbShardingRemoveDoneKey)
1601+
return ictrlutil.NewErrorf(ictrlutil.ErrorTypeRequeue, "requeue to waiting for %s annotation to be set", kbShardingRemoveAction)
15681602
}
15691603

15701604
func checkShardingActionDone(comp *appsv1.Component, checkExist bool, annotation string) bool {

controllers/apps/cluster/transformer_cluster_component_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1986,7 +1986,8 @@ var _ = Describe("cluster component transformer test", func() {
19861986
mockKBAgent(kbShardingPostProvisionAction)
19871987

19881988
err := transformer.Transform(transCtx, dag)
1989-
Expect(err).Should(BeNil())
1989+
Expect(err).ShouldNot(BeNil())
1990+
Expect(err.Error()).Should(ContainSubstring("requeue to waiting for shardPostProvision annotation to be removed"))
19901991
Expect(actionDone).Should(BeTrue())
19911992
graphCli := transCtx.Client.(model.GraphClient)
19921993
objs := graphCli.FindAll(dag, &appsv1.Component{})
@@ -2022,7 +2023,8 @@ var _ = Describe("cluster component transformer test", func() {
20222023
mockKBAgent(kbShardingPreTerminateAction)
20232024

20242025
err := transformer.Transform(transCtx, dag)
2025-
Expect(err).Should(BeNil())
2026+
Expect(err).ShouldNot(BeNil())
2027+
Expect(err.Error()).Should(ContainSubstring("requeue to waiting for shardPreTerminate annotation to be set"))
20262028
graphCli := transCtx.Client.(model.GraphClient)
20272029
objs := graphCli.FindAll(dag, &appsv1.Component{})
20282030
Expect(len(objs)).Should(Equal(1))

0 commit comments

Comments
 (0)