Skip to content

Commit cfcbd77

Browse files
committed
Update WorkloadDeploymentReconciler to leverage statful instance control for directly managing instances.
1 parent c0e0d26 commit cfcbd77

File tree

7 files changed

+186
-43
lines changed

7 files changed

+186
-43
lines changed

cmd/main.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,11 @@ func main() {
178178
os.Exit(1)
179179
}
180180

181+
if err = controller.AddIndexers(ctx, mgr); err != nil {
182+
setupLog.Error(err, "unable to add indexers")
183+
os.Exit(1)
184+
}
185+
181186
// +kubebuilder:scaffold:builder
182187

183188
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {

internal/controller/indexers.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package controller
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
8+
"k8s.io/apimachinery/pkg/types"
9+
"sigs.k8s.io/controller-runtime/pkg/client"
10+
mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
11+
12+
computev1alpha "go.datum.net/workload-operator/api/v1alpha"
13+
)
14+
15+
const (
16+
deploymentWorkloadUIDIndex = "deploymentWorkloadUIDIndex"
17+
workloadNetworksIndex = "workloadNetworksIndex"
18+
)
19+
20+
func AddIndexers(ctx context.Context, mgr mcmanager.Manager) error {
21+
return errors.Join(
22+
addWorkloadDeploymentIndexers(ctx, mgr),
23+
addWorkloadIndexers(ctx, mgr),
24+
)
25+
}
26+
27+
func addWorkloadDeploymentIndexers(ctx context.Context, mgr mcmanager.Manager) error {
28+
if err := mgr.GetFieldIndexer().IndexField(ctx, &computev1alpha.WorkloadDeployment{}, deploymentWorkloadUIDIndex, deploymentWorkloadUIDIndexFunc); err != nil {
29+
return fmt.Errorf("failed to add workload deployment indexer %q: %w", deploymentWorkloadUIDIndex, err)
30+
}
31+
32+
return nil
33+
}
34+
35+
func deploymentWorkloadUIDIndexFunc(o client.Object) []string {
36+
return []string{
37+
string(o.(*computev1alpha.WorkloadDeployment).Spec.WorkloadRef.UID),
38+
}
39+
}
40+
41+
func addWorkloadIndexers(ctx context.Context, mgr mcmanager.Manager) error {
42+
if err := mgr.GetFieldIndexer().IndexField(ctx, &computev1alpha.Workload{}, workloadNetworksIndex, workloadNetworksIndexFunc); err != nil {
43+
return fmt.Errorf("failed to add workload indexer %q: %w", workloadNetworksIndex, err)
44+
}
45+
46+
return nil
47+
}
48+
49+
func workloadNetworksIndexFunc(o client.Object) []string {
50+
workload := o.(*computev1alpha.Workload)
51+
52+
networks := make([]string, 0, len(workload.Spec.Template.Spec.NetworkInterfaces))
53+
for _, network := range workload.Spec.Template.Spec.NetworkInterfaces {
54+
namespacedName := types.NamespacedName{
55+
Namespace: network.Network.Namespace,
56+
Name: network.Network.Name,
57+
}
58+
59+
if namespacedName.Namespace == "" {
60+
namespacedName.Namespace = workload.GetNamespace()
61+
}
62+
63+
networks = append(networks, namespacedName.String())
64+
}
65+
66+
return networks
67+
}

internal/controller/instancecontrol/instancecontrol.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@ type Strategy interface {
2323
) ([]Action, error)
2424
}
2525

26-
type ActionType int
26+
type ActionType string
2727

2828
const (
29-
ActionTypeCreate ActionType = iota
30-
ActionTypeUpdate
31-
ActionTypeDelete
32-
ActionTypeWait
29+
ActionTypeCreate ActionType = "Create"
30+
ActionTypeUpdate ActionType = "Update"
31+
ActionTypeDelete ActionType = "Delete"
32+
ActionTypeWait ActionType = "Wait"
3333
)
3434

3535
type Action struct {

internal/controller/instancecontrol/stateful/stateful_control.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
type statefulControl struct {
2121
}
2222

23-
func NewStatefulControl() instancecontrol.Strategy {
23+
func New() instancecontrol.Strategy {
2424
return &statefulControl{}
2525
}
2626

@@ -80,11 +80,7 @@ func (c *statefulControl) GetActions(
8080
},
8181
}
8282

83-
if desiredInstances[i].Labels == nil {
84-
desiredInstances[i].Labels = map[string]string{}
85-
}
86-
87-
desiredInstances[i].Labels[v1alpha.InstanceIndexLabel] = strconv.Itoa(int(i))
83+
addInstanceControllerLabels(desiredInstances[i], getInstanceOrdinal(desiredInstances[i].Name), deployment)
8884

8985
if err := controllerutil.SetControllerReference(deployment, desiredInstances[i], scheme); err != nil {
9086
return nil, fmt.Errorf("failed to set controller reference: %w", err)
@@ -111,12 +107,7 @@ func (c *statefulControl) GetActions(
111107
updatedInstance.Annotations = deployment.Spec.Template.Annotations
112108
updatedInstance.Labels = deployment.Spec.Template.Labels
113109

114-
if updatedInstance.Labels == nil {
115-
updatedInstance.Labels = map[string]string{}
116-
}
117-
118-
// Shouldn't get removed, but just in case.
119-
updatedInstance.Labels[v1alpha.InstanceIndexLabel] = strconv.Itoa(getInstanceOrdinal(instance.Name))
110+
addInstanceControllerLabels(updatedInstance, getInstanceOrdinal(updatedInstance.Name), deployment)
120111

121112
updatedInstance.Spec = deployment.Spec.Template.Spec
122113
updateActions = append(updateActions, instancecontrol.NewUpdateAction(updatedInstance))
@@ -156,3 +147,13 @@ func (c *statefulControl) GetActions(
156147

157148
return actions, nil
158149
}
150+
151+
func addInstanceControllerLabels(instance *v1alpha.Instance, index int, deployment *v1alpha.WorkloadDeployment) {
152+
if instance.Labels == nil {
153+
instance.Labels = map[string]string{}
154+
}
155+
156+
instance.Labels[v1alpha.InstanceIndexLabel] = strconv.Itoa(index)
157+
instance.Labels[v1alpha.WorkloadUIDLabel] = string(deployment.Spec.WorkloadRef.UID)
158+
instance.Labels[v1alpha.WorkloadDeploymentUIDLabel] = string(deployment.GetUID())
159+
}

internal/controller/instancecontrol/stateful/stateful_control_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func init() {
2727

2828
func TestFreshDeployment(t *testing.T) {
2929
ctx := context.Background()
30-
control := NewStatefulControl()
30+
control := New()
3131

3232
deployment := getWorkloadDeployment("test-fresh-deploy", 2)
3333

@@ -49,7 +49,7 @@ func TestFreshDeployment(t *testing.T) {
4949

5050
func TestUpdateWithAllReadyInstances(t *testing.T) {
5151
ctx := context.Background()
52-
control := NewStatefulControl()
52+
control := New()
5353

5454
deployment := getWorkloadDeployment("test-deploy", 2)
5555

@@ -75,7 +75,7 @@ func TestUpdateWithAllReadyInstances(t *testing.T) {
7575

7676
func TestScaleUpWithNotReadyInstance(t *testing.T) {
7777
ctx := context.Background()
78-
control := NewStatefulControl()
78+
control := New()
7979

8080
deployment := getWorkloadDeployment("test-deploy", 3)
8181

@@ -105,7 +105,7 @@ func TestScaleUpWithNotReadyInstance(t *testing.T) {
105105

106106
func TestScaleUpWithDeletingReadyInstance(t *testing.T) {
107107
ctx := context.Background()
108-
control := NewStatefulControl()
108+
control := New()
109109

110110
deployment := getWorkloadDeployment("test-deploy", 3)
111111

@@ -132,7 +132,7 @@ func TestScaleUpWithDeletingReadyInstance(t *testing.T) {
132132

133133
func TestScaleDownWithAllReadyInstances(t *testing.T) {
134134
ctx := context.Background()
135-
control := NewStatefulControl()
135+
control := New()
136136

137137
deployment := getWorkloadDeployment("test-deploy", 1)
138138

internal/controller/workload_controller.go

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,17 @@ import (
1212
apierrors "k8s.io/apimachinery/pkg/api/errors"
1313
apimeta "k8s.io/apimachinery/pkg/api/meta"
1414
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15+
"k8s.io/apimachinery/pkg/types"
1516
kerrors "k8s.io/apimachinery/pkg/util/errors"
1617
"k8s.io/apimachinery/pkg/util/sets"
1718
ctrl "sigs.k8s.io/controller-runtime"
1819
"sigs.k8s.io/controller-runtime/pkg/client"
20+
"sigs.k8s.io/controller-runtime/pkg/cluster"
1921
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
2022
"sigs.k8s.io/controller-runtime/pkg/finalizer"
23+
"sigs.k8s.io/controller-runtime/pkg/handler"
2124
"sigs.k8s.io/controller-runtime/pkg/log"
25+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
2226
mcbuilder "sigs.k8s.io/multicluster-runtime/pkg/builder"
2327
mccontext "sigs.k8s.io/multicluster-runtime/pkg/context"
2428
mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
@@ -29,7 +33,6 @@ import (
2933
)
3034

3135
const workloadControllerFinalizer = "compute.datumapis.com/workload-controller"
32-
const deploymentWorkloadUID = "spec.workloadRef.uid"
3336

3437
// WorkloadReconciler reconciles a Workload object
3538
type WorkloadReconciler struct {
@@ -296,7 +299,7 @@ func (r *WorkloadReconciler) reconcileWorkloadStatus(
296299

297300
workload.Status = *newWorkloadStatus
298301
if err := upstreamClient.Status().Update(ctx, workload); err != nil {
299-
return fmt.Errorf("failed updating workload status")
302+
return fmt.Errorf("failed updating workload status: %w", err)
300303
}
301304

302305
return nil
@@ -317,7 +320,7 @@ func (r *WorkloadReconciler) Finalize(ctx context.Context, obj client.Object) (f
317320
}
318321

319322
listOpts := client.MatchingFields{
320-
deploymentWorkloadUID: string(obj.GetUID()),
323+
deploymentWorkloadUIDIndex: string(obj.GetUID()),
321324
}
322325
var deployments computev1alpha.WorkloadDeploymentList
323326
if err := cl.GetClient().List(ctx, &deployments, listOpts); err != nil {
@@ -368,7 +371,7 @@ func (r *WorkloadReconciler) getDeploymentsForWorkload(
368371
) (desired []computev1alpha.WorkloadDeployment, orphaned []computev1alpha.WorkloadDeployment, err error) {
369372

370373
listOpts := client.MatchingFields{
371-
deploymentWorkloadUID: string(workload.UID),
374+
deploymentWorkloadUIDIndex: string(workload.UID),
372375
}
373376
var deployments computev1alpha.WorkloadDeploymentList
374377
if err := upstreamClient.List(ctx, &deployments, listOpts); err != nil {
@@ -462,22 +465,46 @@ func (r *WorkloadReconciler) SetupWithManager(mgr mcmanager.Manager) error {
462465
return fmt.Errorf("failed to register finalizer: %w", err)
463466
}
464467

465-
// TODO(jreese) move to indexer package
466-
467-
err := mgr.GetFieldIndexer().IndexField(context.Background(), &computev1alpha.WorkloadDeployment{}, deploymentWorkloadUID, func(o client.Object) []string {
468-
return []string{
469-
string(o.(*computev1alpha.WorkloadDeployment).Spec.WorkloadRef.UID),
470-
}
471-
})
472-
if err != nil {
473-
return fmt.Errorf("failed to add workload deployment field indexer: %w", err)
474-
}
475-
476-
// TODO(jreese) add watch against networks that triggers a reconcile for
477-
// workloads that are attached and are in an error state for networks not
478-
// existing.
479468
return mcbuilder.ControllerManagedBy(mgr).
480469
For(&computev1alpha.Workload{}, mcbuilder.WithEngageWithLocalCluster(false)).
481470
Owns(&computev1alpha.WorkloadDeployment{}, mcbuilder.WithEngageWithLocalCluster(false)).
471+
Watches(&networkingv1alpha.Network{}, func(clusterName string, cl cluster.Cluster) handler.TypedEventHandler[client.Object, mcreconcile.Request] {
472+
return handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, network client.Object) []mcreconcile.Request {
473+
logger := log.FromContext(ctx)
474+
475+
cluster, err := mgr.GetCluster(ctx, clusterName)
476+
if err != nil {
477+
logger.Error(err, "failed to get cluster")
478+
return nil
479+
}
480+
clusterClient := cluster.GetClient()
481+
482+
networkName := client.ObjectKeyFromObject(network).String()
483+
listOpts := client.MatchingFields{
484+
workloadNetworksIndex: networkName,
485+
}
486+
487+
var workloads computev1alpha.WorkloadList
488+
if err := clusterClient.List(ctx, &workloads, listOpts); err != nil {
489+
logger.Error(err, "failed to list workloads")
490+
return nil
491+
}
492+
493+
var requests []mcreconcile.Request
494+
for _, workload := range workloads.Items {
495+
requests = append(requests, mcreconcile.Request{
496+
Request: reconcile.Request{
497+
NamespacedName: types.NamespacedName{
498+
Namespace: workload.Namespace,
499+
Name: workload.Name,
500+
},
501+
},
502+
ClusterName: clusterName,
503+
})
504+
}
505+
506+
return requests
507+
})
508+
}).
482509
Complete(r)
483510
}

internal/controller/workloaddeployment_controller.go

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88

99
apierrors "k8s.io/apimachinery/pkg/api/errors"
10+
apimeta "k8s.io/apimachinery/pkg/api/meta"
1011
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1112
ctrl "sigs.k8s.io/controller-runtime"
1213
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -19,6 +20,8 @@ import (
1920

2021
networkingv1alpha "go.datum.net/network-services-operator/api/v1alpha"
2122
computev1alpha "go.datum.net/workload-operator/api/v1alpha"
23+
24+
instancecontrolstateful "go.datum.net/workload-operator/internal/controller/instancecontrol/stateful"
2225
)
2326

2427
// WorkloadDeploymentReconciler reconciles a WorkloadDeployment object
@@ -62,9 +65,7 @@ func (r *WorkloadDeploymentReconciler) Reconcile(ctx context.Context, req mcreco
6265
return ctrl.Result{}, nil
6366
}
6467

65-
// TODO(jreese) shortcut work on a status condition for network bindings
66-
// being ready
67-
68+
allNetworkBindingsReady := true
6869
for i, networkInterface := range deployment.Spec.Template.Spec.NetworkInterfaces {
6970
var networkBinding networkingv1alpha.NetworkBinding
7071
networkBindingObjectKey := client.ObjectKey{
@@ -97,6 +98,46 @@ func (r *WorkloadDeploymentReconciler) Reconcile(ctx context.Context, req mcreco
9798
}
9899
}
99100

101+
if !apimeta.IsStatusConditionTrue(networkBinding.Status.Conditions, networkingv1alpha.NetworkBindingReady) {
102+
allNetworkBindingsReady = false
103+
}
104+
}
105+
106+
if !allNetworkBindingsReady {
107+
logger.Info("waiting for network bindings to be ready")
108+
return ctrl.Result{}, nil
109+
}
110+
111+
// Collect all instances for this deployment
112+
listOpts := client.MatchingLabels{
113+
computev1alpha.WorkloadDeploymentUIDLabel: string(deployment.GetUID()),
114+
}
115+
116+
var instances computev1alpha.InstanceList
117+
if err := cl.GetClient().List(ctx, &instances, listOpts); err != nil {
118+
return ctrl.Result{}, fmt.Errorf("failed listing instances: %w", err)
119+
}
120+
121+
instanceControl := instancecontrolstateful.New()
122+
123+
actions, err := instanceControl.GetActions(ctx, cl.GetScheme(), &deployment, instances.Items)
124+
if err != nil {
125+
return ctrl.Result{}, fmt.Errorf("failed getting instance control actions: %w", err)
126+
}
127+
128+
logger.Info("collected instance control actions", "count", len(actions))
129+
130+
for _, action := range actions {
131+
// We don't need to actually check this, but it'll reduce log noise.
132+
if action.IsSkipped() {
133+
continue
134+
}
135+
136+
logger.Info("instance control action", "instance", action.Object.GetName(), "action", action.ActionType())
137+
138+
if err := action.Execute(ctx, cl.GetClient()); err != nil {
139+
return ctrl.Result{}, fmt.Errorf("failed executing instance control action: %w", err)
140+
}
100141
}
101142

102143
return ctrl.Result{}, nil
@@ -108,5 +149,7 @@ func (r *WorkloadDeploymentReconciler) SetupWithManager(mgr mcmanager.Manager) e
108149
// TODO(jreese) finalizers
109150
return mcbuilder.ControllerManagedBy(mgr).
110151
For(&computev1alpha.WorkloadDeployment{}, mcbuilder.WithEngageWithLocalCluster(false)).
152+
Owns(&computev1alpha.Instance{}).
153+
Owns(&networkingv1alpha.NetworkBinding{}).
111154
Complete(r)
112155
}

0 commit comments

Comments
 (0)