Skip to content

Commit dc6049e

Browse files
committed
Introduce an instance control strategy, with a stateful implementation. Controller to follow.
1 parent ef7dca4 commit dc6049e

File tree

11 files changed

+606
-5
lines changed

11 files changed

+606
-5
lines changed

api/v1alpha/instance_types.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,11 @@ type InstanceStatus struct {
339339
NetworkInterfaces []InstanceNetworkInterfaceStatus `json:"networkInterfaces,omitempty"`
340340
}
341341

342+
const (
343+
// InstanceReady is the status condition type that indicates that the
// instance is ready.
344+
InstanceReady = "Ready"
345+
)
346+
342347
type InstanceTemplateSpec struct {
343348
// Metadata of the instances created from this template
344349
//

api/v1alpha/labels.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,6 @@ const (
55

66
WorkloadUIDLabel = LabelNamespace + "/workload-uid"
77
WorkloadDeploymentUIDLabel = LabelNamespace + "/workload-deployment-uid"
8+
9+
InstanceIndexLabel = LabelNamespace + "/instance-index"
810
)

api/v1alpha/workload_types.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,23 @@ type HorizontalScaleSettings struct {
165165

166166
// TODO(jreese) wire in behavior
167167
// See https://github.com/kubernetes/kubernetes/blob/dd87bc064631354885193fc1a97d0e7b603e77b4/staging/src/k8s.io/api/autoscaling/v2/types.go#L84
168+
// Defines the policy for managing instances.
169+
170+
// Controls how instances are managed during scale up and down, as well as
171+
// during maintenance events.
172+
//
173+
// +kubebuilder:validation:Required
174+
// +kubebuilder:default=OrderedReady
175+
InstanceManagementPolicy InstanceManagementPolicyType `json:"instanceManagementPolicy,omitempty"`
168176
}
169177

178+
// InstanceManagementPolicyType names a policy that controls how instances are
// managed during scale up and down, as well as during maintenance events.
type InstanceManagementPolicyType string
179+
180+
const (
181+
// OrderedReadyInstanceManagementPolicyType is the "OrderedReady" policy. It
// is the kubebuilder default for
// HorizontalScaleSettings.InstanceManagementPolicy and the only policy
// currently defined.
OrderedReadyInstanceManagementPolicyType InstanceManagementPolicyType = "OrderedReady"
182+
// Not yet available; kept as a placeholder for a future policy:
// ParallelInstanceManagementPolicyType InstanceManagementPolicyType = "Parallel"
183+
)
184+
170185
type MetricSpec struct {
171186
// Resource metrics known to Datum.
172187
//

config/crd/bases/compute.datumapis.com_workloaddeployments.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@ spec:
6666
scaleSettings:
6767
description: Scale settings such as minimum and maximum replica counts.
6868
properties:
69+
instanceManagementPolicy:
70+
default: OrderedReady
71+
description: |-
72+
Controls how instances are managed during scale up and down, as well as
73+
during maintenance events.
74+
type: string
6975
maxReplicas:
7076
description: The maximum number of replicas.
7177
format: int32
@@ -120,6 +126,7 @@ spec:
120126
format: int32
121127
type: integer
122128
required:
129+
- instanceManagementPolicy
123130
- minReplicas
124131
type: object
125132
template:

config/crd/bases/compute.datumapis.com_workloads.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@ spec:
6868
description: Scale settings such as minimum and maximum replica
6969
counts.
7070
properties:
71+
instanceManagementPolicy:
72+
default: OrderedReady
73+
description: |-
74+
Controls how instances are managed during scale up and down, as well as
75+
during maintenance events.
76+
type: string
7177
maxReplicas:
7278
description: The maximum number of replicas.
7379
format: int32
@@ -122,6 +128,7 @@ spec:
122128
format: int32
123129
type: integer
124130
required:
131+
- instanceManagementPolicy
125132
- minReplicas
126133
type: object
127134
required:
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package instancecontrol
2+
3+
import (
4+
"context"
5+
6+
"k8s.io/apimachinery/pkg/runtime"
7+
"sigs.k8s.io/controller-runtime/pkg/client"
8+
9+
"go.datum.net/workload-operator/api/v1alpha"
10+
)
11+
12+
// Strategy decides which actions are required to reconcile the instances of a
// workload deployment against its desired state.
type Strategy interface {
13+
// GetActions returns a set of actions that should be taken in order to drive
14+
// the workload deployment to the desired state. Some actions may be informational,
15+
// such as providing context into pending actions when instance management
16+
// policies may require waiting for instance readiness.
//
// currentInstances are the instances that presently exist for deployment.
// NOTE(review): scheme is presumably used to set owner references on new
// instances — confirm against each implementation.
17+
GetActions(
18+
ctx context.Context,
19+
scheme *runtime.Scheme,
20+
deployment *v1alpha.WorkloadDeployment,
21+
currentInstances []v1alpha.Instance,
22+
) ([]Action, error)
23+
}
24+
25+
// ActionType identifies the kind of operation an Action represents.
type ActionType int
26+
27+
const (
28+
// ActionTypeCreate is the type of actions built by NewCreateAction.
ActionTypeCreate ActionType = iota
29+
// ActionTypeUpdate is the type of actions built by NewUpdateAction.
ActionTypeUpdate
30+
// ActionTypeDelete is the type of actions built by NewDeleteAction.
ActionTypeDelete
31+
// ActionTypeWait is the type of actions built by NewWaitAction; executing
// such an action does nothing.
ActionTypeWait
32+
)
33+
34+
// Action is a single step produced by a Strategy. It pairs the name of the
// object it concerns with a function that performs the step using a client.
type Action struct {
35+
// ObjectName is the name of the object the action applies to.
ObjectName string
36+
// actionType records which kind of action this is; see ActionType().
actionType ActionType
37+
// skipExecution, when true, makes Execute return nil without calling fn.
skipExecution bool
38+
// fn performs the action when Execute is called.
fn func(ctx context.Context, c client.Client) error
39+
}
40+
41+
func (a Action) Execute(ctx context.Context, c client.Client) error {
42+
if a.skipExecution {
43+
return nil
44+
}
45+
46+
return a.fn(ctx, c)
47+
}
48+
49+
func (a *Action) SkipExecution() {
50+
a.skipExecution = true
51+
}
52+
53+
func (a Action) IsSkipped() bool {
54+
return a.skipExecution
55+
}
56+
57+
func (a Action) ActionType() ActionType {
58+
return a.actionType
59+
}
60+
61+
func NewCreateAction(objectName string, f func(ctx context.Context, c client.Client) error) Action {
62+
return Action{
63+
ObjectName: objectName,
64+
actionType: ActionTypeCreate,
65+
fn: f,
66+
}
67+
}
68+
69+
func NewUpdateAction(objectName string, f func(ctx context.Context, c client.Client) error) Action {
70+
return Action{
71+
ObjectName: objectName,
72+
actionType: ActionTypeUpdate,
73+
fn: f,
74+
}
75+
}
76+
77+
func NewDeleteAction(objectName string, f func(ctx context.Context, c client.Client) error) Action {
78+
return Action{
79+
ObjectName: objectName,
80+
actionType: ActionTypeDelete,
81+
fn: f,
82+
}
83+
}
84+
85+
func NewWaitAction(objectName string) Action {
86+
return Action{
87+
ObjectName: objectName,
88+
actionType: ActionTypeWait,
89+
fn: func(ctx context.Context, c client.Client) error { return nil },
90+
}
91+
}
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
package stateful
2+
3+
import (
	"context"
	"fmt"
	"maps"
	"slices"
	"strconv"

	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

	"go.datum.net/workload-operator/api/v1alpha"
	"go.datum.net/workload-operator/internal/controller/instancecontrol"
)
18+
19+
// statefulControl is the instancecontrol.Strategy returned by
// NewStatefulControl. It plans actions for instances identified by an ordinal
// and carries no fields of its own.
//
// Behavior inspired by https://github.com/kubernetes/kubernetes/tree/master/pkg/controller/statefulset
20+
// Does not currently implement exact behavior.
21+
type statefulControl struct {
22+
}
23+
24+
func NewStatefulControl() instancecontrol.Strategy {
25+
return &statefulControl{}
26+
}
27+
28+
func (c *statefulControl) GetActions(
29+
ctx context.Context,
30+
scheme *runtime.Scheme,
31+
deployment *v1alpha.WorkloadDeployment,
32+
currentInstances []v1alpha.Instance,
33+
) ([]instancecontrol.Action, error) {
34+
// lowest -> highest
35+
var createActions []instancecontrol.Action
36+
var waitActions []instancecontrol.Action
37+
38+
// highest -> lowest
39+
var updateActions []instancecontrol.Action
40+
41+
// highest -> lowest
42+
var deleteActions []instancecontrol.Action
43+
44+
// Instances that are desired to exist. We do not currently support the
45+
// concept of a partition, so will fill the entire slice.
46+
desiredInstances := make([]*v1alpha.Instance, deployment.Spec.ScaleSettings.MinReplicas)
47+
48+
for _, instance := range currentInstances {
49+
instanceIndex := getInstanceOrdinal(instance.Name)
50+
if instanceIndex >= len(desiredInstances) {
51+
deleteActions = append(deleteActions, instancecontrol.NewDeleteAction(
52+
instance.Name,
53+
func(ctx context.Context, c client.Client) error {
54+
return c.Delete(ctx, &instance)
55+
},
56+
))
57+
} else {
58+
desiredInstances[instanceIndex] = &instance
59+
}
60+
}
61+
62+
// It's possible that the incoming currentInstances will have gaps in
63+
// instances, so fill them in.
64+
for i := range deployment.Spec.ScaleSettings.MinReplicas {
65+
if desiredInstances[i] == nil {
66+
desiredInstances[i] = &v1alpha.Instance{
67+
ObjectMeta: metav1.ObjectMeta{
68+
Labels: deployment.Spec.Template.Labels,
69+
Annotations: deployment.Spec.Template.Annotations,
70+
Name: fmt.Sprintf("%s-%d", deployment.Name, i),
71+
Namespace: deployment.Namespace,
72+
},
73+
Spec: deployment.Spec.Template.Spec,
74+
}
75+
76+
if desiredInstances[i].Labels == nil {
77+
desiredInstances[i].Labels = map[string]string{}
78+
}
79+
80+
desiredInstances[i].Labels[v1alpha.InstanceIndexLabel] = strconv.Itoa(int(i))
81+
82+
if err := controllerutil.SetControllerReference(deployment, desiredInstances[i], scheme); err != nil {
83+
return nil, fmt.Errorf("failed to set controller reference: %w", err)
84+
}
85+
}
86+
}
87+
88+
for _, instance := range desiredInstances {
89+
if instance.CreationTimestamp.IsZero() {
90+
action := instancecontrol.NewCreateAction(
91+
instance.Name,
92+
func(ctx context.Context, c client.Client) error {
93+
if err := c.Create(ctx, instance); err != nil {
94+
return fmt.Errorf("failed to create instance: %w", err)
95+
}
96+
97+
return nil
98+
},
99+
)
100+
101+
createActions = append(createActions, action)
102+
} else if !instance.DeletionTimestamp.IsZero() {
103+
// Wait for graceful deletion before continuing processing additional
104+
// instances.
105+
waitActions = append(waitActions, instancecontrol.NewWaitAction(
106+
instance.Name,
107+
))
108+
109+
} else if instance.DeletionTimestamp.IsZero() {
110+
// Wait for the instance to be ready before continuing processing
111+
if !apimeta.IsStatusConditionTrue(instance.Status.Conditions, v1alpha.InstanceReady) {
112+
waitActions = append(waitActions, instancecontrol.NewWaitAction(
113+
instance.Name,
114+
))
115+
} else if needsUpdate(instance, deployment) {
116+
updateActions = append(updateActions, instancecontrol.NewUpdateAction(
117+
instance.Name,
118+
func(ctx context.Context, c client.Client) error {
119+
instance.Annotations = deployment.Spec.Template.Annotations
120+
instance.Labels = deployment.Spec.Template.Labels
121+
instance.Spec = deployment.Spec.Template.Spec
122+
123+
if err := c.Update(ctx, instance); err != nil {
124+
return fmt.Errorf("failed to update instance: %w", err)
125+
}
126+
127+
return nil
128+
},
129+
))
130+
}
131+
}
132+
}
133+
134+
slices.SortFunc(updateActions, descendingOrdinal)
135+
slices.SortFunc(deleteActions, descendingOrdinal)
136+
137+
actions := make([]instancecontrol.Action, 0, len(createActions)+len(waitActions)+len(updateActions)+len(deleteActions))
138+
139+
switch deployment.Spec.ScaleSettings.InstanceManagementPolicy {
140+
case v1alpha.OrderedReadyInstanceManagementPolicyType:
141+
142+
// Add create and wait actions, and sort by ordinal. This allows us to wait
143+
// for instances to be processed in the correct order.
144+
//
145+
// For instance, we may have instance 0 that needs to wait to be ready, but
146+
// instance 1 wants to be created.
147+
actions = append(actions, createActions...)
148+
actions = append(actions, waitActions...)
149+
150+
slices.SortFunc(actions, ascendingOrdinal)
151+
152+
actions = append(actions, updateActions...)
153+
actions = append(actions, deleteActions...)
154+
155+
// Skip all actions except the first one.
156+
for i := range actions {
157+
if i > 0 {
158+
actions[i].SkipExecution()
159+
}
160+
}
161+
162+
}
163+
164+
return actions, nil
165+
}

0 commit comments

Comments
 (0)