Skip to content

Commit 52a622a

Browse files
authored
volumezone: scheduler queueing hints: pv (kubernetes#125001)
* volumezone: scheduler queueing hints * add_comment
1 parent a39f425 commit 52a622a

File tree

2 files changed

+148
-16
lines changed

2 files changed

+148
-16
lines changed

pkg/scheduler/framework/plugins/volumezone/volume_zone.go

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23+
"reflect"
2324

2425
v1 "k8s.io/api/core/v1"
2526
storage "k8s.io/api/storage/v1"
@@ -158,21 +159,7 @@ func (pl *VolumeZone) getPVbyPod(logger klog.Logger, pod *v1.Pod) ([]pvTopology,
158159
if s := getErrorAsStatus(err); !s.IsSuccess() {
159160
return nil, s
160161
}
161-
162-
for _, key := range topologyLabels {
163-
if value, ok := pv.ObjectMeta.Labels[key]; ok {
164-
volumeVSet, err := volumehelpers.LabelZonesToSet(value)
165-
if err != nil {
166-
logger.Info("Failed to parse label, ignoring the label", "label", fmt.Sprintf("%s:%s", key, value), "err", err)
167-
continue
168-
}
169-
podPVTopologies = append(podPVTopologies, pvTopology{
170-
pvName: pv.Name,
171-
key: key,
172-
values: sets.Set[string](volumeVSet),
173-
})
174-
}
175-
}
162+
podPVTopologies = append(podPVTopologies, pl.getPVTopologies(logger, pv)...)
176163
}
177164
return podPVTopologies, nil
178165
}
@@ -292,7 +279,7 @@ func (pl *VolumeZone) EventsToRegister() []framework.ClusterEventWithHint {
292279
// Also, if pvc's VolumeName is filled, that also could make a pod schedulable.
293280
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeClaimChange},
294281
// A new pv or updating a pv's volume zone labels may make a pod schedulable.
295-
{Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}},
282+
{Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeChange},
296283
}
297284
}
298285

@@ -359,6 +346,52 @@ func (pl *VolumeZone) isSchedulableAfterStorageClassAdded(logger klog.Logger, po
359346
return framework.Queue, nil
360347
}
361348

349+
// isSchedulableAfterPersistentVolumeChange is invoked whenever a PersistentVolume added or updated.
350+
// It checks whether the change of PV has made a previously unschedulable pod schedulable.
351+
// Changing the PV topology labels could cause the pod to become schedulable.
352+
func (pl *VolumeZone) isSchedulableAfterPersistentVolumeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
353+
originalPV, modifiedPV, err := util.As[*v1.PersistentVolume](oldObj, newObj)
354+
if err != nil {
355+
return framework.Queue, fmt.Errorf("unexpected objects in isSchedulableAfterPersistentVolumeChange: %w", err)
356+
}
357+
if originalPV == nil {
358+
logger.V(5).Info("PV is newly created, which might make the pod schedulable")
359+
return framework.Queue, nil
360+
}
361+
originalPVTopologies := pl.getPVTopologies(logger, originalPV)
362+
modifiedPVTopologies := pl.getPVTopologies(logger, modifiedPV)
363+
if !reflect.DeepEqual(originalPVTopologies, modifiedPVTopologies) {
364+
logger.V(5).Info("PV's topology was updated, which might make the pod schedulable.", "pod", klog.KObj(pod), "PV", klog.KObj(modifiedPV))
365+
return framework.Queue, nil
366+
}
367+
368+
logger.V(5).Info("PV was updated, but the topology is unchanged, which it doesn't make the pod schedulable", "pod", klog.KObj(pod), "PV", klog.KObj(modifiedPV))
369+
return framework.QueueSkip, nil
370+
}
371+
372+
// getPVTopologies retrieves pvTopology from a given PV and returns the array
373+
// This function doesn't check spec.nodeAffinity
374+
// because it's read-only after creation and thus cannot be updated
375+
// and nodeAffinity is being handled in node affinity plugin
376+
func (pl *VolumeZone) getPVTopologies(logger klog.Logger, pv *v1.PersistentVolume) []pvTopology {
377+
podPVTopologies := make([]pvTopology, 0)
378+
for _, key := range topologyLabels {
379+
if value, ok := pv.ObjectMeta.Labels[key]; ok {
380+
labelZonesSet, err := volumehelpers.LabelZonesToSet(value)
381+
if err != nil {
382+
logger.V(5).Info("failed to parse PV's topology label, ignoring the label", "label", fmt.Sprintf("%s:%s", key, value), "err", err)
383+
continue
384+
}
385+
podPVTopologies = append(podPVTopologies, pvTopology{
386+
pvName: pv.Name,
387+
key: key,
388+
values: sets.Set[string](labelZonesSet),
389+
})
390+
}
391+
}
392+
return podPVTopologies
393+
}
394+
362395
// New initializes a new plugin and returns it.
363396
func New(_ context.Context, _ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
364397
informerFactory := handle.SharedInformerFactory()

pkg/scheduler/framework/plugins/volumezone/volume_zone_test.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,105 @@ func TestIsSchedulableAfterStorageClassAdded(t *testing.T) {
675675
}
676676
}
677677

678+
func TestIsSchedulableAfterPersistentVolumeChange(t *testing.T) {
679+
testcases := map[string]struct {
680+
pod *v1.Pod
681+
oldObj, newObj interface{}
682+
expectedHint framework.QueueingHint
683+
expectedErr bool
684+
}{
685+
"error-wrong-new-object": {
686+
pod: createPodWithVolume("pod_1", "PVC_1"),
687+
newObj: "not-a-pv",
688+
expectedHint: framework.Queue,
689+
expectedErr: true,
690+
},
691+
"error-wrong-old-object": {
692+
pod: createPodWithVolume("pod_1", "PVC_1"),
693+
oldObj: "not-a-pv",
694+
newObj: st.MakePersistentVolume().Name("Vol_1").Obj(),
695+
expectedHint: framework.Queue,
696+
expectedErr: true,
697+
},
698+
"new-pv-was-added": {
699+
pod: createPodWithVolume("pod_1", "PVC_1"),
700+
newObj: &v1.PersistentVolume{
701+
ObjectMeta: metav1.ObjectMeta{
702+
Name: "Vol_1",
703+
Labels: map[string]string{v1.LabelFailureDomainBetaZone: "us-west1-b"},
704+
},
705+
},
706+
expectedHint: framework.Queue,
707+
},
708+
"pv-was-updated-and-changed-topology": {
709+
pod: createPodWithVolume("pod_1", "PVC_1"),
710+
oldObj: &v1.PersistentVolume{
711+
ObjectMeta: metav1.ObjectMeta{
712+
Name: "Vol_1",
713+
Labels: map[string]string{v1.LabelFailureDomainBetaZone: "us-west1-a"},
714+
},
715+
},
716+
newObj: &v1.PersistentVolume{
717+
ObjectMeta: metav1.ObjectMeta{
718+
Name: "Vol_1",
719+
Labels: map[string]string{v1.LabelFailureDomainBetaZone: "us-west1-b"},
720+
},
721+
},
722+
expectedHint: framework.Queue,
723+
},
724+
"pv-was-updated-and-added-topology-label": {
725+
pod: createPodWithVolume("pod_1", "PVC_1"),
726+
oldObj: &v1.PersistentVolume{
727+
ObjectMeta: metav1.ObjectMeta{
728+
Name: "Vol_1",
729+
Labels: map[string]string{v1.LabelFailureDomainBetaZone: "us-west1-a"},
730+
},
731+
},
732+
newObj: &v1.PersistentVolume{
733+
ObjectMeta: metav1.ObjectMeta{
734+
Name: "Vol_1",
735+
Labels: map[string]string{v1.LabelFailureDomainBetaZone: "us-west1-a",
736+
v1.LabelTopologyZone: "zone"},
737+
},
738+
},
739+
expectedHint: framework.Queue,
740+
},
741+
"pv-was-updated-but-no-topology-is-changed": {
742+
pod: createPodWithVolume("pod_1", "PVC_1"),
743+
oldObj: &v1.PersistentVolume{
744+
ObjectMeta: metav1.ObjectMeta{
745+
Name: "Vol_1",
746+
Labels: map[string]string{v1.LabelFailureDomainBetaZone: "us-west1-a",
747+
v1.LabelTopologyZone: "zone"},
748+
},
749+
},
750+
newObj: &v1.PersistentVolume{
751+
ObjectMeta: metav1.ObjectMeta{
752+
Name: "Vol_1",
753+
Labels: map[string]string{v1.LabelFailureDomainBetaZone: "us-west1-a",
754+
v1.LabelTopologyZone: "zone"},
755+
},
756+
},
757+
expectedHint: framework.QueueSkip,
758+
},
759+
}
760+
761+
for name, tc := range testcases {
762+
t.Run(name, func(t *testing.T) {
763+
logger, _ := ktesting.NewTestContext(t)
764+
p := &VolumeZone{}
765+
766+
got, err := p.isSchedulableAfterPersistentVolumeChange(logger, tc.pod, tc.oldObj, tc.newObj)
767+
if err != nil && !tc.expectedErr {
768+
t.Errorf("unexpected error: %v", err)
769+
}
770+
if got != tc.expectedHint {
771+
t.Errorf("isSchedulableAfterPersistentVolumeChange() = %v, want %v", got, tc.expectedHint)
772+
}
773+
})
774+
}
775+
}
776+
678777
func BenchmarkVolumeZone(b *testing.B) {
679778
tests := []struct {
680779
Name string

0 commit comments

Comments
 (0)