
Commit d8b5fc1

Add node watcher to MachinePool controller
1 parent 01f619f commit d8b5fc1

7 files changed: +321 -1 lines changed

api/v1beta1/index/index.go

Lines changed: 10 additions & 0 deletions
@@ -41,5 +41,15 @@ func AddDefaultIndexes(ctx context.Context, mgr ctrl.Manager) error {
 		}
 	}
 
+	if feature.Gates.Enabled(feature.MachinePool) {
+		if err := ByMachinePoolNode(ctx, mgr); err != nil {
+			return err
+		}
+
+		if err := ByMachinePoolProviderID(ctx, mgr); err != nil {
+			return err
+		}
+	}
+
 	return nil
 }

api/v1beta1/index/machinepool.go

Lines changed: 105 additions & 0 deletions
@@ -0,0 +1,105 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package index
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/pkg/errors"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	"sigs.k8s.io/cluster-api/controllers/noderefutil"
+	expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
+)
+
+const (
+	// MachinePoolNodeNameField is used by the MachinePool Controller to index MachinePools by Node name, and add a watch on Nodes.
+	MachinePoolNodeNameField = "status.nodeRefs.name"
+
+	// MachinePoolProviderIDField is used to index MachinePools by ProviderID. It's useful to find MachinePools
+	// in a management cluster from Nodes in a workload cluster.
+	MachinePoolProviderIDField = "spec.providerIDList"
+)
+
+// ByMachinePoolNode adds the machinepool node name index to the
+// managers cache.
+func ByMachinePoolNode(ctx context.Context, mgr ctrl.Manager) error {
+	if err := mgr.GetCache().IndexField(ctx, &expv1.MachinePool{},
+		MachinePoolNodeNameField,
+		MachinePoolByNodeName,
+	); err != nil {
+		return errors.Wrap(err, "error setting index field")
+	}
+
+	return nil
+}
+
+// MachinePoolByNodeName contains the logic to index MachinePools by Node name.
+func MachinePoolByNodeName(o client.Object) []string {
+	machinepool, ok := o.(*expv1.MachinePool)
+	if !ok {
+		panic(fmt.Sprintf("Expected a MachinePool but got a %T", o))
+	}
+
+	if len(machinepool.Status.NodeRefs) == 0 {
+		return nil
+	}
+
+	nodeNames := make([]string, 0, len(machinepool.Status.NodeRefs))
+	for _, ref := range machinepool.Status.NodeRefs {
+		nodeNames = append(nodeNames, ref.Name)
+	}
+	return nodeNames
+}
+
+// ByMachinePoolProviderID adds the machinepool providerID index to the
+// managers cache.
+func ByMachinePoolProviderID(ctx context.Context, mgr ctrl.Manager) error {
+	if err := mgr.GetCache().IndexField(ctx, &expv1.MachinePool{},
+		MachinePoolProviderIDField,
+		machinePoolByProviderID,
+	); err != nil {
+		return errors.Wrap(err, "error setting index field")
+	}
+
+	return nil
+}
+
+func machinePoolByProviderID(o client.Object) []string {
+	machinepool, ok := o.(*expv1.MachinePool)
+	if !ok {
+		panic(fmt.Sprintf("Expected a MachinePool but got a %T", o))
+	}
+
+	if len(machinepool.Spec.ProviderIDList) == 0 {
+		return nil
+	}
+
+	providerIDs := make([]string, 0, len(machinepool.Spec.ProviderIDList))
+	for _, id := range machinepool.Spec.ProviderIDList {
+		providerID, err := noderefutil.NewProviderID(id)
+		if err != nil {
+			// Failed to create providerID, skipping.
+			continue
+		}
+		providerIDs = append(providerIDs, providerID.IndexKey())
+	}
+
+	return providerIDs
+}
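
Once these index fields are registered in the manager's cache, the MachinePool that owns a given Node can be resolved with a single indexed List call. The sketch below is illustrative only and not part of this commit: the helper name and the namespace/node-name parameters are placeholders, but the client.MatchingFields pattern is the same one the controller change further down relies on.

package example

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"

	"sigs.k8s.io/cluster-api/api/v1beta1/index"
	expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
)

// machinePoolForNode looks up the MachinePool owning a Node by name, relying on the
// MachinePoolNodeNameField index. The client must be backed by a cache that has the
// index registered (e.g. via AddDefaultIndexes above).
func machinePoolForNode(ctx context.Context, c client.Client, namespace, nodeName string) (*expv1.MachinePool, error) {
	mpList := &expv1.MachinePoolList{}
	if err := c.List(ctx, mpList,
		client.InNamespace(namespace),
		client.MatchingFields{index.MachinePoolNodeNameField: nodeName},
	); err != nil {
		return nil, err
	}
	// Expect at most one owner; anything else is treated as "not found" here.
	if len(mpList.Items) != 1 {
		return nil, nil
	}
	return &mpList.Items[0], nil
}

The controller's nodeToMachinePool handler (below) applies the same lookup, additionally scoped by the cluster-name and cluster-namespace annotations carried on the Node.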

api/v1beta1/index/machinepool_test.go

Lines changed: 112 additions & 0 deletions
@@ -0,0 +1,112 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package index
+
+import (
+	"testing"
+
+	. "github.com/onsi/gomega"
+	corev1 "k8s.io/api/core/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	"sigs.k8s.io/cluster-api/controllers/noderefutil"
+	expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
+)
+
+func TestIndexMachinePoolByNodeName(t *testing.T) {
+	testCases := []struct {
+		name     string
+		object   client.Object
+		expected []string
+	}{
+		{
+			name:     "when the machinepool has no NodeRef",
+			object:   &expv1.MachinePool{},
+			expected: []string{},
+		},
+		{
+			name: "when the machinepool has valid NodeRefs",
+			object: &expv1.MachinePool{
+				Status: expv1.MachinePoolStatus{
+					NodeRefs: []corev1.ObjectReference{
+						{
+							Name: "node1",
+						},
+						{
+							Name: "node2",
+						},
+					},
+				},
+			},
+			expected: []string{"node1", "node2"},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			g := NewWithT(t)
+			got := MachinePoolByNodeName(tc.object)
+			g.Expect(got).To(ConsistOf(tc.expected))
+		})
+	}
+}
+
+func TestIndexMachinePoolByProviderID(t *testing.T) {
+	g := NewWithT(t)
+	validProviderID, err := noderefutil.NewProviderID("aws://region/zone/1")
+	g.Expect(err).ToNot(HaveOccurred())
+	otherValidProviderID, err := noderefutil.NewProviderID("aws://region/zone/2")
+	g.Expect(err).ToNot(HaveOccurred())
+
+	testCases := []struct {
+		name     string
+		object   client.Object
+		expected []string
+	}{
+		{
+			name:     "MachinePool has no providerID",
+			object:   &expv1.MachinePool{},
+			expected: nil,
+		},
+		{
+			name: "MachinePool has invalid providerID",
+			object: &expv1.MachinePool{
+				Spec: expv1.MachinePoolSpec{
+					ProviderIDList: []string{"invalid"},
+				},
+			},
+			expected: []string{},
+		},
+		{
+			name: "MachinePool has valid providerIDs",
+			object: &expv1.MachinePool{
+				Spec: expv1.MachinePoolSpec{
+					ProviderIDList: []string{validProviderID.String(), otherValidProviderID.String()},
+				},
+			},
+			expected: []string{validProviderID.IndexKey(), otherValidProviderID.IndexKey()},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			g := NewWithT(t)
+			got := machinePoolByProviderID(tc.object)
+			g.Expect(got).To(BeEquivalentTo(tc.expected))
+		})
+	}
+}

exp/controllers/alias.go

Lines changed: 3 additions & 0 deletions
@@ -23,13 +23,15 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
 
+	"sigs.k8s.io/cluster-api/controllers/remote"
 	machinepool "sigs.k8s.io/cluster-api/exp/internal/controllers"
 )
 
 // MachinePoolReconciler reconciles a MachinePool object.
 type MachinePoolReconciler struct {
 	Client    client.Client
 	APIReader client.Reader
+	Tracker   *remote.ClusterCacheTracker
 
 	// WatchFilterValue is the label value used to filter events prior to reconciliation.
 	WatchFilterValue string
@@ -39,6 +41,7 @@ func (r *MachinePoolReconciler) SetupWithManager(ctx context.Context, mgr ctrl.M
 	return (&machinepool.MachinePoolReconciler{
 		Client:           r.Client,
 		APIReader:        r.APIReader,
+		Tracker:          r.Tracker,
 		WatchFilterValue: r.WatchFilterValue,
 	}).SetupWithManager(ctx, mgr, options)
 }
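
The new Tracker field has to be populated by whoever constructs the exported reconciler, typically in the manager's setup code. A minimal sketch of that wiring, assuming the remote.NewClusterCacheTracker(mgr, options) constructor of this release; the helper function itself is illustrative and not part of this commit. Leaving Tracker nil keeps the previous behaviour, since the controller skips the remote watch in that case.

package setup

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/controller"

	"sigs.k8s.io/cluster-api/controllers/remote"
	expcontrollers "sigs.k8s.io/cluster-api/exp/controllers"
)

// setupMachinePoolController shows how the Tracker could be populated so the MachinePool
// controller is able to watch Nodes in workload clusters.
func setupMachinePoolController(ctx context.Context, mgr ctrl.Manager) error {
	// ClusterCacheTracker maintains cached clients and watches for workload clusters.
	// Constructor signature assumed for the cluster-api version this commit targets.
	tracker, err := remote.NewClusterCacheTracker(mgr, remote.ClusterCacheTrackerOptions{})
	if err != nil {
		return err
	}

	return (&expcontrollers.MachinePoolReconciler{
		Client:    mgr.GetClient(),
		APIReader: mgr.GetAPIReader(),
		Tracker:   tracker,
	}).SetupWithManager(ctx, mgr, controller.Options{})
}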

exp/internal/controllers/machinepool_controller.go

Lines changed: 82 additions & 0 deletions
@@ -18,6 +18,7 @@ package controllers
 
 import (
 	"context"
+	"fmt"
 	"sync"
 
 	"github.com/pkg/errors"
@@ -34,10 +35,13 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 	"sigs.k8s.io/controller-runtime/pkg/source"
 
 	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
+	"sigs.k8s.io/cluster-api/api/v1beta1/index"
 	"sigs.k8s.io/cluster-api/controllers/external"
+	"sigs.k8s.io/cluster-api/controllers/noderefutil"
 	"sigs.k8s.io/cluster-api/controllers/remote"
 	expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
 	"sigs.k8s.io/cluster-api/util"
@@ -62,6 +66,7 @@ const (
 type MachinePoolReconciler struct {
 	Client    client.Client
 	APIReader client.Reader
+	Tracker   *remote.ClusterCacheTracker
 
 	// WatchFilterValue is the label value used to filter events prior to reconciliation.
 	WatchFilterValue string
@@ -289,3 +294,80 @@ func (r *MachinePoolReconciler) reconcileDeleteExternal(ctx context.Context, m *
 	// Return true if there are no more external objects.
 	return len(objects) == 0, nil
 }
+
+func (r *MachinePoolReconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.Cluster) error {
+	log := ctrl.LoggerFrom(ctx)
+
+	if !conditions.IsTrue(cluster, clusterv1.ControlPlaneInitializedCondition) {
+		log.V(5).Info("Skipping node watching setup because control plane is not initialized")
+		return nil
+	}
+
+	// If there is no tracker, don't watch remote nodes
+	if r.Tracker == nil {
+		return nil
+	}
+
+	return r.Tracker.Watch(ctx, remote.WatchInput{
+		Name:         "machinepool-watchNodes",
+		Cluster:      util.ObjectKey(cluster),
+		Watcher:      r.controller,
+		Kind:         &corev1.Node{},
+		EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachinePool),
+	})
+}
+
+func (r *MachinePoolReconciler) nodeToMachinePool(o client.Object) []reconcile.Request {
+	node, ok := o.(*corev1.Node)
+	if !ok {
+		panic(fmt.Sprintf("Expected a Node but got a %T", o))
+	}
+
+	var filters []client.ListOption
+	// Match by clusterName when the node has the annotation.
+	if clusterName, ok := node.GetAnnotations()[clusterv1.ClusterNameAnnotation]; ok {
+		filters = append(filters, client.MatchingLabels{
+			clusterv1.ClusterNameLabel: clusterName,
+		})
+	}
+
+	// Match by namespace when the node has the annotation.
+	if namespace, ok := node.GetAnnotations()[clusterv1.ClusterNamespaceAnnotation]; ok {
+		filters = append(filters, client.InNamespace(namespace))
+	}
+
+	// Match by nodeName and status.nodeRef.name.
+	machinePoolList := &expv1.MachinePoolList{}
+	if err := r.Client.List(
+		context.TODO(),
+		machinePoolList,
+		append(filters, client.MatchingFields{index.MachinePoolNodeNameField: node.Name})...); err != nil {
+		return nil
+	}
+
+	// There should be exactly 1 MachinePool for the node.
+	if len(machinePoolList.Items) == 1 {
+		return []reconcile.Request{{NamespacedName: util.ObjectKey(&machinePoolList.Items[0])}}
+	}
+
+	// Otherwise let's match by providerID. This is useful when e.g the NodeRef has not been set yet.
+	// Match by providerID
+	nodeProviderID, err := noderefutil.NewProviderID(node.Spec.ProviderID)
+	if err != nil {
+		return nil
+	}
+	machinePoolList = &expv1.MachinePoolList{}
+	if err := r.Client.List(
+		context.TODO(),
+		machinePoolList,
+		append(filters, client.MatchingFields{index.MachinePoolProviderIDField: nodeProviderID.IndexKey()})...); err != nil {
+		return nil
+	}
+
+	// There should be exactly 1 MachinePool for the node.
+	if len(machinePoolList.Items) == 1 {
+		return []reconcile.Request{{NamespacedName: util.ObjectKey(&machinePoolList.Items[0])}}
+	}
+
+	return nil
+}
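
Taken together, the Tracker-based watch forwards workload-cluster Node events to nodeToMachinePool, which resolves the owning MachinePool through the new indexes. The test-style sketch below is hypothetical and not part of this commit; it assumes a controller-runtime fake client recent enough to offer WithIndex, and all object names are placeholders.

package controllers

import (
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/cluster-api/api/v1beta1/index"
	expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
)

func TestNodeToMachinePoolSketch(t *testing.T) {
	scheme := runtime.NewScheme()
	_ = clientgoscheme.AddToScheme(scheme)
	_ = expv1.AddToScheme(scheme)

	// A MachinePool that already references the Node in status.nodeRefs.
	mp := &expv1.MachinePool{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "my-pool",
			Namespace: "default",
			Labels:    map[string]string{clusterv1.ClusterNameLabel: "my-cluster"},
		},
		Status: expv1.MachinePoolStatus{
			NodeRefs: []corev1.ObjectReference{{Name: "worker-0"}},
		},
	}

	// Fake client carrying the same index the manager cache registers via AddDefaultIndexes.
	c := fake.NewClientBuilder().
		WithScheme(scheme).
		WithObjects(mp).
		WithIndex(&expv1.MachinePool{}, index.MachinePoolNodeNameField, index.MachinePoolByNodeName).
		Build()

	r := &MachinePoolReconciler{Client: c}

	// A workload-cluster Node annotated with its owning cluster's name and namespace.
	node := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: "worker-0",
			Annotations: map[string]string{
				clusterv1.ClusterNameAnnotation:      "my-cluster",
				clusterv1.ClusterNamespaceAnnotation: "default",
			},
		},
	}

	// The Node event should map to exactly one reconcile request for the owning MachinePool.
	requests := r.nodeToMachinePool(node)
	if len(requests) != 1 || requests[0].Name != "my-pool" {
		t.Fatalf("expected a single request for my-pool, got %v", requests)
	}
}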

exp/internal/controllers/machinepool_controller_noderef.go

Lines changed: 8 additions & 1 deletion
@@ -49,6 +49,12 @@ type getNodeReferencesResult struct {
 
 func (r *MachinePoolReconciler) reconcileNodeRefs(ctx context.Context, cluster *clusterv1.Cluster, mp *expv1.MachinePool) (ctrl.Result, error) {
 	log := ctrl.LoggerFrom(ctx)
+
+	// Create a watch on the nodes in the Cluster.
+	if err := r.watchClusterNodes(ctx, cluster); err != nil {
+		return ctrl.Result{}, err
+	}
+
 	// Check that the MachinePool hasn't been deleted or in the process.
 	if !mp.DeletionTimestamp.IsZero() {
 		return ctrl.Result{}, nil
@@ -80,7 +86,8 @@ func (r *MachinePoolReconciler) reconcileNodeRefs(ctx context.Context, cluster *
 	if err != nil {
 		if err == errNoAvailableNodes {
 			log.Info("Cannot assign NodeRefs to MachinePool, no matching Nodes")
-			return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
+			// No need to requeue here. Nodes emit an event that triggers reconciliation.
+			return ctrl.Result{}, nil
 		}
 		r.recorder.Event(mp, corev1.EventTypeWarning, "FailedSetNodeRef", err.Error())
 		return ctrl.Result{}, errors.Wrapf(err, "failed to get node references")