2 changes: 2 additions & 0 deletions pkg/apis/v1/labels.go
@@ -49,6 +49,8 @@ const (
// Karpenter specific annotations
const (
DoNotDisruptAnnotationKey = apis.Group + "/do-not-disrupt"
DisruptionScheduleAnnotationKey = apis.Group + "/disruption-schedule"
DisruptionScheduleDurationAnnotationKey = apis.Group + "/disruption-schedule-duration"
ProviderCompatibilityAnnotationKey = apis.CompatibilityGroup + "/provider"
NodePoolHashAnnotationKey = apis.Group + "/nodepool-hash"
NodePoolHashVersionAnnotationKey = apis.Group + "/nodepool-hash-version"
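The two new keys let a pod declare a window in which voluntary disruption is allowed. The value format is not defined in this diff; the sketch below assumes a cron-style schedule plus a Go duration string, by analogy with NodePool disruption budgets.

package example

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// disruptableOnWeekdays sketches a pod that only allows voluntary disruption
// for eight hours starting at 09:00 on weekdays. The cron-plus-duration value
// format is an assumption for illustration; the diff only introduces the keys.
func disruptableOnWeekdays() *corev1.Pod {
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "payments",
			Annotations: map[string]string{
				v1.DisruptionScheduleAnnotationKey:         "0 9 * * mon-fri",
				v1.DisruptionScheduleDurationAnnotationKey: "8h",
			},
		},
	}
}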
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/consolidation.go
@@ -137,7 +137,7 @@ func (c *consolidation) sortCandidates(candidates []*Candidate) []*Candidate {
func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...*Candidate) (Command, error) {
var err error
// Run scheduling simulation to compute consolidation option
results, err := SimulateScheduling(ctx, c.kubeClient, c.cluster, c.provisioner, candidates...)
results, err := SimulateScheduling(ctx, c.kubeClient, c.cluster, c.provisioner, c.clock, candidates...)
if err != nil {
// if a candidate node is now deleting, just retry
if errors.Is(err, errCandidateDeleting) {
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/controller.go
@@ -103,7 +103,7 @@ func NewMethods(clk clock.Clock, cluster *state.Cluster, kubeClient client.Clien
// Terminate and create replacement for drifted NodeClaims in Static NodePool
NewStaticDrift(cluster, provisioner, cp),
// Terminate any NodeClaims that have drifted from provisioning specifications, allowing the pods to reschedule.
NewDrift(kubeClient, cluster, provisioner, recorder),
NewDrift(clk, kubeClient, cluster, provisioner, recorder),
// Attempt to identify multiple NodeClaims that we can consolidate simultaneously to reduce pod churn
NewMultiNodeConsolidation(c),
// And finally fall back our single NodeClaim consolidation to further reduce cluster cost.
7 changes: 5 additions & 2 deletions pkg/controllers/disruption/drift.go
@@ -23,6 +23,7 @@ import (
"sort"

"github.com/samber/lo"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/karpenter/pkg/utils/pretty"
@@ -36,14 +37,16 @@ import (

// Drift is a subreconciler that deletes drifted candidates.
type Drift struct {
clock clock.Clock
kubeClient client.Client
cluster *state.Cluster
provisioner *provisioning.Provisioner
recorder events.Recorder
}

func NewDrift(kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner, recorder events.Recorder) *Drift {
func NewDrift(clk clock.Clock, kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner, recorder events.Recorder) *Drift {
return &Drift{
clock: clk,
kubeClient: kubeClient,
cluster: cluster,
provisioner: provisioner,
@@ -78,7 +81,7 @@ func (d *Drift) ComputeCommands(ctx context.Context, disruptionBudgetMapping map
continue
}
// Check if we need to create any NodeClaims.
results, err := SimulateScheduling(ctx, d.kubeClient, d.cluster, d.provisioner, candidate)
results, err := SimulateScheduling(ctx, d.kubeClient, d.cluster, d.provisioner, d.clock, candidate)
if err != nil {
// if a candidate is now deleting, just retry
if errors.Is(err, errCandidateDeleting) {
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/drift_test.go
@@ -1010,7 +1010,7 @@ var _ = Describe("Drift", func() {
for _, nc := range nodeClaims {
nc.StatusConditions().SetTrue(v1.ConditionTypeDrifted)
}
drift := disruption.NewDrift(env.Client, cluster, prov, recorder)
drift := disruption.NewDrift(fakeClock, env.Client, cluster, prov, recorder)

ExpectApplied(ctx, env.Client, staticNp, nodeClaims[0], nodeClaims[1], nodes[0], nodes[1])

6 changes: 3 additions & 3 deletions pkg/controllers/disruption/helpers.go
@@ -49,7 +49,7 @@ import (
var errCandidateDeleting = fmt.Errorf("candidate is deleting")

//nolint:gocyclo
func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner,
func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner, clk clock.Clock,
candidates ...*Candidate,
) (scheduling.Results, error) {
candidateNames := sets.NewString(lo.Map(candidates, func(t *Candidate, i int) string { return t.Name() })...)
@@ -83,13 +83,13 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
}
for _, n := range candidates {
currentlyReschedulablePods := lo.Filter(n.reschedulablePods, func(p *corev1.Pod, _ int) bool {
return pdbs.IsCurrentlyReschedulable(p)
return pdbs.IsCurrentlyReschedulable(p, clk)
})
pods = append(pods, currentlyReschedulablePods...)
}

// We get the pods that are on nodes that are deleting
deletingNodePods, err := deletingNodes.CurrentlyReschedulablePods(ctx, kubeClient)
deletingNodePods, err := deletingNodes.CurrentlyReschedulablePods(ctx, kubeClient, clk)
if err != nil {
return scheduling.Results{}, fmt.Errorf("failed to get pods from deleting nodes, %w", err)
}
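SimulateScheduling now threads the clock down into the reschedulability checks. The pdb and pod helpers that consume it are outside this diff; the following is a hypothetical sketch of the kind of time-dependent check the injected clock enables, assuming the annotations carry a cron schedule and a duration, loosely modeled on how NodePool budget schedules are evaluated.

package example

import (
	"time"

	"github.com/robfig/cron/v3"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/utils/clock"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// inDisruptionWindow is a hypothetical helper, not code from this PR. It shows
// why the clock must be injected: whether a pod is reschedulable "right now"
// depends on clk.Now(), which tests replace with a fake clock.
func inDisruptionWindow(pod *corev1.Pod, clk clock.Clock) bool {
	spec, ok := pod.Annotations[v1.DisruptionScheduleAnnotationKey]
	if !ok {
		return true // no schedule: disruption is not gated by time
	}
	schedule, err := cron.ParseStandard(spec)
	if err != nil {
		return false // unparseable schedule: treat as blocked (assumption)
	}
	duration, err := time.ParseDuration(pod.Annotations[v1.DisruptionScheduleDurationAnnotationKey])
	if err != nil {
		return false
	}
	// The window is [activation, activation+duration]. Now is inside it iff the
	// first activation after (now - duration) has already happened.
	checkpoint := schedule.Next(clk.Now().Add(-duration))
	return !checkpoint.After(clk.Now())
}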
18 changes: 9 additions & 9 deletions pkg/controllers/disruption/queue_test.go
@@ -98,7 +98,7 @@ var _ = Describe("Queue", func() {

stateNode := ExpectStateNodeExists(cluster, node1)
cmd := &disruption.Command{
Method: disruption.NewDrift(env.Client, cluster, prov, recorder),
Method: disruption.NewDrift(fakeClock, env.Client, cluster, prov, recorder),
CreationTimestamp: fakeClock.Now(),
ID: uuid.New(),
Results: scheduling.Results{},
@@ -132,7 +132,7 @@ var _ = Describe("Queue", func() {
}

cmd := &disruption.Command{
Method: disruption.NewDrift(env.Client, cluster, prov, recorder),
Method: disruption.NewDrift(fakeClock, env.Client, cluster, prov, recorder),
CreationTimestamp: fakeClock.Now(),
ID: uuid.New(),
Results: scheduling.Results{},
@@ -157,7 +157,7 @@ var _ = Describe("Queue", func() {
}

cmd := &disruption.Command{
Method: disruption.NewDrift(env.Client, cluster, prov, recorder),
Method: disruption.NewDrift(fakeClock, env.Client, cluster, prov, recorder),
CreationTimestamp: fakeClock.Now(),
ID: uuid.New(),
Results: scheduling.Results{},
@@ -188,7 +188,7 @@ var _ = Describe("Queue", func() {
}

cmd := &disruption.Command{
Method: disruption.NewDrift(env.Client, cluster, prov, recorder),
Method: disruption.NewDrift(fakeClock, env.Client, cluster, prov, recorder),
CreationTimestamp: fakeClock.Now(),
ID: uuid.New(),
Results: scheduling.Results{},
@@ -218,7 +218,7 @@ var _ = Describe("Queue", func() {
}

cmd := &disruption.Command{
Method: disruption.NewDrift(env.Client, cluster, prov, recorder),
Method: disruption.NewDrift(fakeClock, env.Client, cluster, prov, recorder),
CreationTimestamp: fakeClock.Now(),
ID: uuid.New(),
Results: scheduling.Results{},
@@ -271,7 +271,7 @@ var _ = Describe("Queue", func() {
}

cmd := &disruption.Command{
Method: disruption.NewDrift(env.Client, cluster, prov, recorder),
Method: disruption.NewDrift(fakeClock, env.Client, cluster, prov, recorder),
CreationTimestamp: fakeClock.Now(),
ID: uuid.New(),
Results: scheduling.Results{},
@@ -315,7 +315,7 @@ var _ = Describe("Queue", func() {
stateNode := ExpectStateNodeExistsForNodeClaim(cluster, nodeClaim1)

cmd := &disruption.Command{
Method: disruption.NewDrift(env.Client, cluster, prov, recorder),
Method: disruption.NewDrift(fakeClock, env.Client, cluster, prov, recorder),
CreationTimestamp: fakeClock.Now(),
ID: uuid.New(),
Results: scheduling.Results{},
@@ -352,7 +352,7 @@ var _ = Describe("Queue", func() {
}}

cmd := &disruption.Command{
Method: disruption.NewDrift(env.Client, cluster, prov, recorder),
Method: disruption.NewDrift(fakeClock, env.Client, cluster, prov, recorder),
CreationTimestamp: fakeClock.Now(),
ID: uuid.New(),
Results: scheduling.Results{},
@@ -361,7 +361,7 @@ var _ = Describe("Queue", func() {
}
Expect(queue.StartCommand(ctx, cmd)).To(BeNil())
cmd2 := &disruption.Command{
Method: disruption.NewDrift(env.Client, cluster, prov, recorder),
Method: disruption.NewDrift(fakeClock, env.Client, cluster, prov, recorder),
CreationTimestamp: fakeClock.Now(),
ID: uuid.New(),
Results: scheduling.Results{},
4 changes: 2 additions & 2 deletions pkg/controllers/disruption/suite_test.go
@@ -238,7 +238,7 @@ var _ = Describe("Simulate Scheduling", func() {
candidate, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, stateNode, pdbs, nodePoolMap, nodePoolToInstanceTypesMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(Succeed())

results, err := disruption.SimulateScheduling(ctx, env.Client, cluster, prov, candidate)
results, err := disruption.SimulateScheduling(ctx, env.Client, cluster, prov, fakeClock, candidate)
Expect(err).To(Succeed())
Expect(results.PodErrors[pod]).To(BeNil())
})
@@ -1879,7 +1879,7 @@ var _ = Describe("Candidate Filtering", func() {
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim})

Expect(cluster.DeepCopyNodes()).To(HaveLen(1))
cmd := &disruption.Command{Method: disruption.NewDrift(env.Client, cluster, prov, recorder), Results: pscheduling.Results{}, Candidates: []*disruption.Candidate{{StateNode: cluster.DeepCopyNodes()[0], NodePool: nodePool}}, Replacements: nil}
cmd := &disruption.Command{Method: disruption.NewDrift(fakeClock, env.Client, cluster, prov, recorder), Results: pscheduling.Results{}, Candidates: []*disruption.Candidate{{StateNode: cluster.DeepCopyNodes()[0], NodePool: nodePool}}, Replacements: nil}
Expect(queue.StartCommand(ctx, cmd))

_, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.DeepCopyNodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass)
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/types.go
@@ -111,7 +111,7 @@ func NewCandidate(ctx context.Context, kubeClient client.Client, recorder events
}
// We only care if instanceType in non-empty consolidation to do price-comparison.
instanceType := instanceTypeMap[node.Labels()[corev1.LabelInstanceTypeStable]]
if pods, err = node.ValidatePodsDisruptable(ctx, kubeClient, pdbs); err != nil {
if pods, err = node.ValidatePodsDisruptable(ctx, kubeClient, pdbs, clk); err != nil {
// If the NodeClaim has a TerminationGracePeriod set and the disruption class is eventual, the node should be
// considered a candidate even if there's a pod that will block eviction. Other error types should still cause
// failure creating the candidate.
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/validation.go
@@ -259,7 +259,7 @@ func (v *validation) validateCommand(ctx context.Context, cmd Command, candidate
if len(candidates) == 0 {
return NewValidationError(fmt.Errorf("no candidates"))
}
results, err := SimulateScheduling(ctx, v.kubeClient, v.cluster, v.provisioner, candidates...)
results, err := SimulateScheduling(ctx, v.kubeClient, v.cluster, v.provisioner, v.clock, candidates...)
if err != nil {
return fmt.Errorf("simulating scheduling, %w", err)
}
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/validation_test.go
@@ -51,7 +51,7 @@ func NewMethodsWithNopValidator() []disruption.Method {
return []disruption.Method{
emptiness,
disruption.NewStaticDrift(cluster, prov, cloudProvider),
disruption.NewDrift(env.Client, cluster, prov, recorder),
disruption.NewDrift(fakeClock, env.Client, cluster, prov, recorder),
multiNodeConsolidation,
singleNodeConsolidation,
}
2 changes: 1 addition & 1 deletion pkg/controllers/node/termination/terminator/terminator.go
@@ -109,7 +109,7 @@ func (t *Terminator) Drain(ctx context.Context, node *corev1.Node, nodeGracePeri
for _, group := range podGroups {
if len(group) > 0 {
// Only add pods to the eviction queue that haven't been evicted yet
t.evictionQueue.Add(lo.Filter(group, func(p *corev1.Pod, _ int) bool { return podutil.IsEvictable(p) })...)
t.evictionQueue.Add(lo.Filter(group, func(p *corev1.Pod, _ int) bool { return podutil.IsEvictable(p, t.clock) })...)
return NewNodeDrainError(fmt.Errorf("%d pods are waiting to be evicted", lo.SumBy(podGroups, func(pods []*corev1.Pod) int { return len(pods) })))
}
}
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/provisioner.go
@@ -329,7 +329,7 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
// We do this after getting the pending pods so that we undershoot if pods are
// actively migrating from a node that is being deleted
// NOTE: The assumption is that these nodes are cordoned and no additional pods will schedule to them
deletingNodePods, err := nodes.Deleting().CurrentlyReschedulablePods(ctx, p.kubeClient)
deletingNodePods, err := nodes.Deleting().CurrentlyReschedulablePods(ctx, p.kubeClient, p.clock)
if err != nil {
return scheduler.Results{}, err
}
28 changes: 17 additions & 11 deletions pkg/controllers/state/statenode.go
@@ -32,6 +32,7 @@ import (
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
@@ -99,10 +100,10 @@ func (n StateNodes) Pods(ctx context.Context, kubeClient client.Client) ([]*core
return pods, nil
}

func (n StateNodes) CurrentlyReschedulablePods(ctx context.Context, kubeClient client.Client) ([]*corev1.Pod, error) {
func (n StateNodes) CurrentlyReschedulablePods(ctx context.Context, kubeClient client.Client, clk clock.Clock) ([]*corev1.Pod, error) {
var pods []*corev1.Pod
for _, node := range n {
p, err := node.CurrentlyReschedulablePods(ctx, kubeClient)
p, err := node.CurrentlyReschedulablePods(ctx, kubeClient, clk)
if err != nil {
return nil, err
}
@@ -227,23 +228,28 @@ func (in *StateNode) ValidateNodeDisruptable() error {
}

// ValidatePodDisruptable returns an error if the StateNode contains a pod that cannot be disrupted
// This checks associated PDBs and do-not-disrupt annotations for each pod on the node.
// This checks associated PDBs, do-not-disrupt annotations, and disruption schedules for each pod on the node.
// ValidatePodDisruptable takes in a recorder to emit events on the nodeclaims when the state node is not a candidate
//
//nolint:gocyclo
func (in *StateNode) ValidatePodsDisruptable(ctx context.Context, kubeClient client.Client, pdbs pdb.Limits) ([]*corev1.Pod, error) {
func (in *StateNode) ValidatePodsDisruptable(ctx context.Context, kubeClient client.Client, pdbs pdb.Limits, clk clock.Clock) ([]*corev1.Pod, error) {
pods, err := in.Pods(ctx, kubeClient)
if err != nil {
return nil, fmt.Errorf("getting pods from node, %w", err)
}
for _, po := range pods {
// We only consider pods that are actively running for "karpenter.sh/do-not-disrupt"
// This means that we will allow Mirror Pods and DaemonSets to block disruption using this annotation
if !podutils.IsDisruptable(po) {
return pods, NewPodBlockEvictionError(serrors.Wrap(fmt.Errorf(`pod has "karpenter.sh/do-not-disrupt" annotation`), "Pod", klog.KObj(po)))
// We only consider pods that are actively running for disruption checks
// This means that we will allow Mirror Pods and DaemonSets to block disruption using these annotations
if !podutils.IsDisruptable(po, clk) {
// Check the specific reason for blocking to provide a better error message
if podutils.HasDoNotDisrupt(po) {
return pods, NewPodBlockEvictionError(serrors.Wrap(fmt.Errorf(`pod has "karpenter.sh/do-not-disrupt" annotation`), "Pod", klog.KObj(po)))
}
// Must be blocked by disruption schedule
return pods, NewPodBlockEvictionError(serrors.Wrap(fmt.Errorf("pod is outside its disruption schedule window"), "Pod", klog.KObj(po)))
}
}
if pdbKeys, ok := pdbs.CanEvictPods(pods); !ok {
if pdbKeys, ok := pdbs.CanEvictPods(pods, clk); !ok {
if len(pdbKeys) > 1 {
return pods, NewPodBlockEvictionError(serrors.Wrap(fmt.Errorf("eviction does not support multiple PDBs"), "PodDisruptionBudget(s)", pdbKeys))
}
@@ -254,11 +260,11 @@ func (in *StateNode) ValidatePodsDisruptable(ctx context.Context, kubeClient cli
}

// CurrentlyReschedulablePods gets the pods assigned to the Node that are currently reschedulable based on the kubernetes api-server bindings
func (in *StateNode) CurrentlyReschedulablePods(ctx context.Context, kubeClient client.Client) ([]*corev1.Pod, error) {
func (in *StateNode) CurrentlyReschedulablePods(ctx context.Context, kubeClient client.Client, clk clock.Clock) ([]*corev1.Pod, error) {
if in.Node == nil {
return nil, nil
}
return nodeutils.GetCurrentlyReschedulablePods(ctx, kubeClient, in.Node)
return nodeutils.GetCurrentlyReschedulablePods(ctx, kubeClient, clk, in.Node)
}

func (in *StateNode) HostName() string {
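The branching above relies on two pod utilities that this diff calls but does not define: HasDoNotDisrupt and the clock-aware IsDisruptable. A hypothetical sketch of how they might fit together, reusing the inDisruptionWindow helper sketched after helpers.go; the real helpers also handle terminal, terminating, and mirror pods, which is omitted here.

package podutils

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/utils/clock"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// HasDoNotDisrupt reports whether the pod carries the do-not-disrupt
// annotation; the real helper's body is not shown in this diff.
func HasDoNotDisrupt(pod *corev1.Pod) bool {
	return pod.Annotations[v1.DoNotDisruptAnnotationKey] == "true"
}

// IsDisruptable (sketch) layers the new schedule-window check on top of the
// existing do-not-disrupt check, matching the two error branches above.
// inDisruptionWindow is the hypothetical helper from the earlier sketch.
func IsDisruptable(pod *corev1.Pod, clk clock.Clock) bool {
	if HasDoNotDisrupt(pod) {
		return false
	}
	return inDisruptionWindow(pod, clk)
}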
5 changes: 3 additions & 2 deletions pkg/utils/node/node.go
@@ -28,6 +28,7 @@ import (
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
@@ -147,7 +148,7 @@ func NodeClaimForNode(ctx context.Context, c client.Client, node *corev1.Node) (
}

// GetCurrentlyReschedulablePods grabs all pods from the passed nodes that satisfy the IsReschedulable criteria
func GetCurrentlyReschedulablePods(ctx context.Context, kubeClient client.Client, nodes ...*corev1.Node) ([]*corev1.Pod, error) {
func GetCurrentlyReschedulablePods(ctx context.Context, kubeClient client.Client, clk clock.Clock, nodes ...*corev1.Node) ([]*corev1.Pod, error) {
pods, err := GetPods(ctx, kubeClient, nodes...)
if err != nil {
return nil, fmt.Errorf("listing pods, %w", err)
@@ -159,7 +160,7 @@ func GetCurrentlyReschedulablePods(ctx context.Context, kubeClient client.Client
}

return lo.Filter(pods, func(p *corev1.Pod, _ int) bool {
return pdbs.IsCurrentlyReschedulable(p)
return pdbs.IsCurrentlyReschedulable(p, clk)
}), nil
}

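Threading clock.Clock through every signature above pays off in tests: a fake clock can step time across the schedule boundary and assert that the same pod flips from blocked to reschedulable. A sketch using the hypothetical helpers from the earlier snippets; the helpers and the window values are assumptions, only the fake-clock API is real.

package example

import (
	"testing"
	"time"

	clocktesting "k8s.io/utils/clock/testing"
)

// TestDisruptionWindow steps a fake clock across the assumed 09:00 weekday
// window and checks the hypothetical inDisruptionWindow helper against the
// pod from the labels.go sketch. January 1, 2024 is a Monday.
func TestDisruptionWindow(t *testing.T) {
	fakeClock := clocktesting.NewFakeClock(time.Date(2024, 1, 1, 3, 0, 0, 0, time.UTC))
	pod := disruptableOnWeekdays()

	if inDisruptionWindow(pod, fakeClock) {
		t.Fatal("expected pod to be blocked outside its window at 03:00")
	}

	fakeClock.SetTime(time.Date(2024, 1, 1, 10, 0, 0, 0, time.UTC))
	if !inDisruptionWindow(pod, fakeClock) {
		t.Fatal("expected pod to be disruptable at 10:00, inside its window")
	}
}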