Skip to content

Commit e18a9e0

Browse files
authored
feat: property-based scheduling: add AKS property provider (#731)
1 parent 37b986f commit e18a9e0

File tree

9 files changed

+1563
-3
lines changed

9 files changed

+1563
-3
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
Copyright (c) Microsoft Corporation.
3+
Licensed under the MIT license.
4+
*/
5+
6+
// Package controllers feature a number of controllers that are in use
7+
// by the AKS property provider.
8+
package controllers
9+
10+
import (
11+
"context"
12+
"time"
13+
14+
corev1 "k8s.io/api/core/v1"
15+
"k8s.io/apimachinery/pkg/api/errors"
16+
"k8s.io/klog/v2"
17+
ctrl "sigs.k8s.io/controller-runtime"
18+
"sigs.k8s.io/controller-runtime/pkg/client"
19+
20+
"go.goms.io/fleet/pkg/propertyprovider/aks/trackers"
21+
)
22+
23+
// NodeReconciler reconciles Node objects.
type NodeReconciler struct {
	// NT is the node tracker that the reconciler adds nodes to (AddOrUpdate) and
	// removes nodes from (Remove) as node objects appear and disappear.
	NT *trackers.NodeTracker
	// Client is used to retrieve node objects from the API server.
	Client client.Client
}
28+
29+
// Reconcile reconciles a node object.
30+
func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
31+
nodeRef := klog.KRef(req.Namespace, req.Name)
32+
startTime := time.Now()
33+
klog.V(2).InfoS("Reconciliation starts for node objects in the AKS property provider", "node", nodeRef)
34+
defer func() {
35+
latency := time.Since(startTime).Milliseconds()
36+
klog.V(2).InfoS("Reconciliation ends for node objects in the AKS property provider", "node", nodeRef, "latency", latency)
37+
}()
38+
39+
// Retrieve the node object.
40+
node := &corev1.Node{}
41+
if err := r.Client.Get(ctx, req.NamespacedName, node); err != nil {
42+
// Failed to get the node object; this signals that the node should untracked.
43+
if errors.IsNotFound(err) {
44+
// This branch essentially processes the node deletion event (the actual deletion).
45+
// At this point the node may have not been tracked by the tracker at all; if that's
46+
// the case, the removal (untracking) operation is a no-op.
47+
//
48+
// Note that this controller will not add any finalizer to node objects, so as to
49+
// avoid blocking normal Kuberneters operations under unexpected circumstances.
50+
klog.V(2).InfoS("Node is not found; untrack it from the property provider", "node", nodeRef)
51+
r.NT.Remove(req.Name)
52+
return ctrl.Result{}, nil
53+
}
54+
// For other errors, retry the reconciliation.
55+
klog.ErrorS(err, "Failed to get the node object", "node", nodeRef)
56+
return ctrl.Result{}, err
57+
}
58+
59+
// Note that this controller will not untrack a node when it is first marked for deletion;
60+
// instead, it performs the untracking when the node object is actually gone from the
61+
// etcd store. This is intentional, as when a node is marked for deletion, workloads might
62+
// not have been drained from it, and untracking the node too early might lead to a
63+
// case of temporary inconsistency where the amount of requested resource exceed the
64+
// allocatable capacity.
65+
66+
// Track the node. If it has been tracked, update its total and allocatable capacity
67+
// information with the tracker.
68+
//
69+
// Note that normally the capacity information remains immutable before object
70+
// creation; the tracker update only serves as a sanity check.
71+
//
72+
// Also note that the tracker will attempt to track the node even if it has been
73+
// marked for deletion, as cordoned, or as unschedulable. This behavior is consistent with
74+
// the original Fleet setup.
75+
klog.V(2).InfoS("Attempt to track the node", "node", nodeRef)
76+
r.NT.AddOrUpdate(node)
77+
78+
return ctrl.Result{}, nil
79+
}
80+
81+
func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
82+
// Reconcile any node changes (create, update, delete).
83+
return ctrl.NewControllerManagedBy(mgr).
84+
For(&corev1.Node{}).
85+
Complete(r)
86+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
Copyright (c) Microsoft Corporation.
3+
Licensed under the MIT license.
4+
*/
5+
6+
// Package controllers feature a number of controllers that are in use
7+
// by the AKS property provider.
8+
package controllers
9+
10+
import (
11+
"context"
12+
"time"
13+
14+
corev1 "k8s.io/api/core/v1"
15+
"k8s.io/apimachinery/pkg/api/errors"
16+
"k8s.io/klog/v2"
17+
ctrl "sigs.k8s.io/controller-runtime"
18+
"sigs.k8s.io/controller-runtime/pkg/client"
19+
20+
"go.goms.io/fleet/pkg/propertyprovider/aks/trackers"
21+
)
22+
23+
// TO-DO (chenyu1): this is a relatively expensive watcher, due to how frequent pods can change
24+
// in a Kubernetes cluster; unfortunately at this moment there does not seem to be a better way
25+
// to observe the changes of requested resources in a cluster. The alternative, which is to use
26+
// Lists, adds too much overhead to the API server.
27+
28+
// PodReconciler reconciles Pod objects.
type PodReconciler struct {
	// PT is the pod tracker that the reconciler adds pods to (AddOrUpdate) and
	// removes pods from (Remove) as pod objects are scheduled, terminated, or deleted.
	PT *trackers.PodTracker
	// Client is used to retrieve pod objects from the API server.
	Client client.Client
}
33+
34+
// Reconcile reconciles a pod object.
35+
func (p *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
36+
podRef := klog.KRef(req.Namespace, req.Name)
37+
startTime := time.Now()
38+
klog.V(2).InfoS("Reconciliation starts for pod objects in the AKS property provider", "pod", podRef)
39+
defer func() {
40+
latency := time.Since(startTime).Milliseconds()
41+
klog.V(2).InfoS("Reconciliation ends for pod objects in the AKS property provider", "pod", podRef, "latency", latency)
42+
}()
43+
44+
// Retrieve the pod object.
45+
pod := &corev1.Pod{}
46+
if err := p.Client.Get(ctx, req.NamespacedName, pod); err != nil {
47+
// Failed to get the pod object.
48+
if errors.IsNotFound(err) {
49+
// This branch essentially processes the pod deletion event (the actual deletion).
50+
// At this point the pod may have not been tracked by the tracker at all; if that's
51+
// the case, the removal (untracking) operation is a no-op.
52+
//
53+
// Note that this controller will not add any finalizer to pod objects, so as to
54+
// avoid blocking normal Kuberneters operations under unexpected circumstances.
55+
p.PT.Remove(req.NamespacedName.String())
56+
return ctrl.Result{}, nil
57+
}
58+
59+
// For other errors, retry the reconciliation.
60+
klog.ErrorS(err, "Failed to get the pod object", "pod", podRef)
61+
return ctrl.Result{}, err
62+
}
63+
64+
// Note that this controller will not untrack a pod when it is first marked for deletion;
65+
// instead, it performs the untracking when the pod object is actually gone from the
66+
// etcd store. This is intentional, as when a pod is marked for deletion, workloads might
67+
// not have been successfully terminated yet, and untracking the pod too early might lead to a
68+
// case of temporary inconsistency.
69+
70+
// Track the pod if:
71+
//
72+
// * it is **NOT** of the Succeeded or Failed state; and
73+
// * it has been assigned to a node.
74+
//
75+
// This behavior is consistent with how the Kubernetes CLI tool reports requested capacity
76+
// on a specific node (`kubectl describe node` command).
77+
//
78+
// Note that the tracker will attempt to track the pod even if it has been marked for deletion.
79+
if len(pod.Spec.NodeName) > 0 && pod.Status.Phase != corev1.PodSucceeded && pod.Status.Phase != corev1.PodFailed {
80+
klog.V(2).InfoS("Attempt to track the pod", "pod", podRef)
81+
p.PT.AddOrUpdate(pod)
82+
} else {
83+
// Untrack the pod.
84+
//
85+
// It may have been descheduled, or transited into a terminal state.
86+
klog.V(2).InfoS("Untrack the pod", "pod", podRef)
87+
p.PT.Remove(req.NamespacedName.String())
88+
}
89+
90+
return ctrl.Result{}, nil
91+
}
92+
93+
func (p *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
94+
// Reconcile any pod changes (create, update, delete).
95+
return ctrl.NewControllerManagedBy(mgr).
96+
For(&corev1.Pod{}).
97+
Complete(p)
98+
}

0 commit comments

Comments
 (0)