diff --git a/simulator/scheduler/storereflector/storereflector.go b/simulator/scheduler/storereflector/storereflector.go
index ffa2dc7d..e59330d9 100644
--- a/simulator/scheduler/storereflector/storereflector.go
+++ b/simulator/scheduler/storereflector/storereflector.go
@@ -57,11 +57,19 @@ func (s *reflector) ResisterResultSavingToInformer(client clientset.Interface, s
 	informerFactory := scheduler.NewInformerFactory(client, 0)
 	// Reflector adds scheduling results when pod is updating.
 	// This is because Extenders doesn't have any phase to hook scheduling finished. (both successfully and non-successfully)
-	_, err := informerFactory.Core().V1().Pods().Informer().AddEventHandler(
-		cache.ResourceEventHandlerFuncs{
+	_, err := informerFactory.Core().V1().Pods().Informer().AddEventHandler(cache.FilteringResourceEventHandler{
+		FilterFunc: func(obj interface{}) bool {
+			if pod, ok := obj.(*corev1.Pod); ok {
+				if pod.DeletionTimestamp != nil {
+					return false
+				}
+			}
+			return true
+		},
+		Handler: cache.ResourceEventHandlerFuncs{
 			UpdateFunc: s.storeAllResultToPodFunc(client),
 		},
-	)
+	})
 	if err != nil {
 		return xerrors.Errorf("failed to AddEventHandler of Informer: %w", err)
 	}
@@ -75,7 +83,7 @@ func (s *reflector) ResisterResultSavingToInformer(client clientset.Interface, s
 // storeAllResultToPodFunc returns the function that reflects all results on the pod annotation when the scheduling is finished.
 // It will be used as the even handler of resource updating.
 //
-//nolint:gocognit,cyclop
+//nolint:gocognit,cyclop,funlen
 func (s *reflector) storeAllResultToPodFunc(client clientset.Interface) func(interface{}, interface{}) {
 	return func(_, newObj interface{}) {
 		ctx := context.Background()
@@ -91,6 +99,9 @@ func (s *reflector) storeAllResultToPodFunc(client clientset.Interface) func(int
 			// the shared informer.
 			newPod, err := client.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
 			if err != nil {
+				if apierrors.IsNotFound(err) {
+					return true, nil
+				}
 				return false, xerrors.Errorf("get pod: %w", err)
 			}
 			if newPod.UID != pod.UID {
@@ -124,6 +135,9 @@ func (s *reflector) storeAllResultToPodFunc(client clientset.Interface) func(int
 
 			_, err = client.CoreV1().Pods(pod.Namespace).Update(ctx, pod, metav1.UpdateOptions{})
 			if err != nil {
+				if apierrors.IsNotFound(err) {
+					return true, nil
+				}
 				// Even though we fetched the latest Pod object, we still might get a conflict
 				// because of a concurrent update. Retrying these conflict errors will usually help
 				// as long as we re-fetch the latest Pod object each time.
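
The change above wraps the existing `UpdateFunc` in a `cache.FilteringResourceEventHandler`, so update events for Pods that already carry a `DeletionTimestamp` never reach the result-saving handler. Below is a minimal, standalone sketch (not part of this PR) of that client-go pattern; the fake clientset, the demo `UpdateFunc`, and the `main` function are illustrative assumptions, only the `FilteringResourceEventHandler` usage mirrors the diff.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Illustrative setup: a fake clientset and a plain shared informer factory.
	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, 0)

	_, err := factory.Core().V1().Pods().Informer().AddEventHandler(cache.FilteringResourceEventHandler{
		// FilterFunc runs before every Add/Update/Delete callback; returning
		// false drops the event. Here: drop Pods that are being deleted.
		FilterFunc: func(obj interface{}) bool {
			pod, ok := obj.(*corev1.Pod)
			return !ok || pod.DeletionTimestamp == nil
		},
		Handler: cache.ResourceEventHandlerFuncs{
			UpdateFunc: func(_, newObj interface{}) {
				fmt.Printf("update for %s\n", newObj.(*corev1.Pod).Name)
			},
		},
	})
	if err != nil {
		panic(err)
	}

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)
}
```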
diff --git a/simulator/scheduler/storereflector/storereflector_test.go b/simulator/scheduler/storereflector/storereflector_test.go
index 1af0300d..0714b7c9 100644
--- a/simulator/scheduler/storereflector/storereflector_test.go
+++ b/simulator/scheduler/storereflector/storereflector_test.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/google/go-cmp/cmp"
 	"github.com/stretchr/testify/assert"
@@ -200,3 +201,89 @@ func Test_updateResultHistory(t *testing.T) {
 		})
 	}
 }
+
+type fakeStore struct{}
+
+func (f fakeStore) GetStoredResult(_ *corev1.Pod) map[string]string {
+	return map[string]string{"foo": "bar"}
+}
+func (f fakeStore) DeleteData(_ corev1.Pod) { /* no-op */ }
+
+func TestResisterResultSavingToInformer_FilterFunc(t *testing.T) {
+	t.Parallel()
+
+	ctx := context.Background()
+	now := metav1.NewTime(time.Now())
+
+	podAlive := &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "pod-alive",
+			Namespace: "default",
+		},
+	}
+	podDeleting := &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:              "pod-deleting",
+			Namespace:         "default",
+			DeletionTimestamp: &now,
+		},
+	}
+
+	client := fake.NewSimpleClientset(podAlive, podDeleting)
+
+	r, ok := New().(*reflector)
+	if !ok {
+		t.Fatalf("New() should return a *reflector")
+	}
+	r.AddResultStore(fakeStore{}, "fake")
+
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+
+	if err := r.ResisterResultSavingToInformer(client, stopCh); err != nil {
+		t.Fatalf("register failed: %v", err)
+	}
+
+	// Update both Pods to trigger the Update event.
+	patchPods := func(podName string) {
+		_ = retry(100*time.Millisecond, 50, func() error {
+			pod, err := client.CoreV1().Pods("default").Get(ctx, podName, metav1.GetOptions{})
+			if err != nil {
+				return err
+			}
+			// Modify a label to ensure the update is triggered.
+			if pod.Labels == nil {
+				pod.Labels = map[string]string{}
+			}
+			pod.Labels["touched"] = "true"
+			_, err = client.CoreV1().Pods("default").Update(ctx, pod, metav1.UpdateOptions{})
+			return err
+		})
+	}
+	patchPods("pod-alive")
+	patchPods("pod-deleting")
+
+	time.Sleep(500 * time.Millisecond)
+
+	// Assert that pod-alive has the annotation written.
+	pod1, _ := client.CoreV1().Pods("default").Get(ctx, "pod-alive", metav1.GetOptions{})
+	if v := pod1.Annotations["foo"]; v != "bar" {
+		t.Fatalf("pod-alive should have annotation foo=bar, got %#v", pod1.Annotations)
+	}
+
+	// Assert that pod-deleting does NOT have the annotation.
+	pod2, _ := client.CoreV1().Pods("default").Get(ctx, "pod-deleting", metav1.GetOptions{})
+	if pod2.Annotations != nil && pod2.Annotations["foo"] == "bar" {
+		t.Fatalf("pod-deleting should NOT have annotation foo=bar, but it has: %#v", pod2.Annotations)
+	}
+}
+
+func retry(interval time.Duration, maxTry int, f func() error) (err error) {
+	for i := 0; i < maxTry; i++ {
+		if err = f(); err == nil {
+			return nil
+		}
+		time.Sleep(interval)
+	}
+	return
+}
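
One possible follow-up, sketched below and not part of the PR: the fixed `time.Sleep(500 * time.Millisecond)` before the assertions could be replaced with polling via testify's `assert.Eventually`, which the test file already imports. The helper name, its parameters, and its placement alongside the test are illustrative assumptions; it assumes the same fake clientset and `foo=bar` annotation used by `fakeStore` above.

```go
package storereflector

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
)

// assertPodEventuallyAnnotated polls the clientset until the named Pod carries
// the annotation written by fakeStore, or the timeout expires. It could stand
// in for the fixed sleep before the first assertion in the test above.
// (Hypothetical helper; name and signature are illustrative.)
func assertPodEventuallyAnnotated(t *testing.T, ctx context.Context, client clientset.Interface, podName string) {
	t.Helper()
	assert.Eventually(t, func() bool {
		pod, err := client.CoreV1().Pods("default").Get(ctx, podName, metav1.GetOptions{})
		return err == nil && pod.Annotations["foo"] == "bar"
	}, 5*time.Second, 100*time.Millisecond, "pod %s should eventually have annotation foo=bar", podName)
}
```

Polling keeps the happy path fast on quick machines while tolerating slower informer delivery, whereas a fixed sleep is both a lower bound on test duration and a potential source of flakes.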