Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions simulator/scheduler/storereflector/storereflector.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,19 @@ func (s *reflector) ResisterResultSavingToInformer(client clientset.Interface, s
informerFactory := scheduler.NewInformerFactory(client, 0)
// Reflector adds scheduling results when pod is updating.
// This is because Extenders doesn't have any phase to hook scheduling finished. (both successfully and non-successfully)
_, err := informerFactory.Core().V1().Pods().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
_, err := informerFactory.Core().V1().Pods().Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
if pod, ok := obj.(*corev1.Pod); ok {
if pod.DeletionTimestamp != nil {
return false
}
}
return true
},
Handler: cache.ResourceEventHandlerFuncs{
UpdateFunc: s.storeAllResultToPodFunc(client),
},
)
})
if err != nil {
return xerrors.Errorf("failed to AddEventHandler of Informer: %w", err)
}
Expand All @@ -75,7 +83,7 @@ func (s *reflector) ResisterResultSavingToInformer(client clientset.Interface, s
// storeAllResultToPodFunc returns the function that reflects all results on the pod annotation when the scheduling is finished.
// It will be used as the even handler of resource updating.
//
//nolint:gocognit,cyclop
//nolint:gocognit,cyclop,funlen
func (s *reflector) storeAllResultToPodFunc(client clientset.Interface) func(interface{}, interface{}) {
return func(_, newObj interface{}) {
ctx := context.Background()
Expand All @@ -91,6 +99,9 @@ func (s *reflector) storeAllResultToPodFunc(client clientset.Interface) func(int
// the shared informer.
newPod, err := client.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return true, nil
}
return false, xerrors.Errorf("get pod: %w", err)
}
if newPod.UID != pod.UID {
Expand Down Expand Up @@ -124,6 +135,9 @@ func (s *reflector) storeAllResultToPodFunc(client clientset.Interface) func(int

_, err = client.CoreV1().Pods(pod.Namespace).Update(ctx, pod, metav1.UpdateOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return true, nil
}
// Even though we fetched the latest Pod object, we still might get a conflict
// because of a concurrent update. Retrying these conflict errors will usually help
// as long as we re-fetch the latest Pod object each time.
Expand Down
87 changes: 87 additions & 0 deletions simulator/scheduler/storereflector/storereflector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -200,3 +201,89 @@ func Test_updateResultHistory(t *testing.T) {
})
}
}

type fakeStore struct{}

func (f fakeStore) GetStoredResult(_ *corev1.Pod) map[string]string {
return map[string]string{"foo": "bar"}
}
func (f fakeStore) DeleteData(_ corev1.Pod) { /* no-op */ }

func TestResisterResultSavingToInformer_FilterFunc(t *testing.T) {
t.Parallel()

ctx := context.Background()
now := metav1.NewTime(time.Now())

podAlive := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-alive",
Namespace: "default",
},
}
podDeleting := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-deleting",
Namespace: "default",
DeletionTimestamp: &now,
},
}

client := fake.NewSimpleClientset(podAlive, podDeleting)

r, ok := New().(*reflector)
if !ok {
t.Fatalf("reflector failed")
}
r.AddResultStore(fakeStore{}, "fake")

stopCh := make(chan struct{})
defer close(stopCh)

if err := r.ResisterResultSavingToInformer(client, stopCh); err != nil {
t.Fatalf("register failed: %v", err)
}

// Update both Pods to trigger the Update event
patchPods := func(podName string) {
_ = retry(100*time.Millisecond, 50, func() error {
pod, err := client.CoreV1().Pods("default").Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return err
}
// Modify a label to ensure the update is triggered
if pod.Labels == nil {
pod.Labels = map[string]string{}
}
pod.Labels["touched"] = "true"
_, err = client.CoreV1().Pods("default").Update(ctx, pod, metav1.UpdateOptions{})
return err
})
}
patchPods("pod-alive")
patchPods("pod-deleting")

time.Sleep(500 * time.Millisecond)

// Assert that pod-alive has the annotation written
pod1, _ := client.CoreV1().Pods("default").Get(ctx, "pod-alive", metav1.GetOptions{})
if v := pod1.Annotations["foo"]; v != "bar" {
t.Fatalf("pod-alive should have annotation foo=bar, got %#v", pod1.Annotations)
}

// Assert that pod-deleting does NOT have the annotation
pod2, _ := client.CoreV1().Pods("default").Get(ctx, "pod-deleting", metav1.GetOptions{})
if pod2.Annotations != nil && pod2.Annotations["foo"] == "bar" {
t.Fatalf("pod-deleting should NOT have annotation foo=bar, but it has: %#v", pod2.Annotations)
}
}

func retry(interval time.Duration, maxTry int, f func() error) (err error) {
for i := 0; i < maxTry; i++ {
if err = f(); err == nil {
return nil
}
time.Sleep(interval)
}
return
}