Skip to content

Commit a50456d

Browse files
committed
fix: add FilterFunc to skip deleted Pods in ResisterResultSavingToInformer
1 parent 5356260 commit a50456d

File tree

2 files changed

+104
-4
lines changed

2 files changed

+104
-4
lines changed

simulator/scheduler/storereflector/storereflector.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package storereflector
55
import (
66
"context"
77
"encoding/json"
8-
98
"golang.org/x/xerrors"
109
corev1 "k8s.io/api/core/v1"
1110
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -57,11 +56,19 @@ func (s *reflector) ResisterResultSavingToInformer(client clientset.Interface, s
5756
informerFactory := scheduler.NewInformerFactory(client, 0)
5857
// Reflector adds scheduling results when pod is updating.
5958
// This is because Extenders doesn't have any phase to hook scheduling finished. (both successfully and non-successfully)
60-
_, err := informerFactory.Core().V1().Pods().Informer().AddEventHandler(
61-
cache.ResourceEventHandlerFuncs{
59+
_, err := informerFactory.Core().V1().Pods().Informer().AddEventHandler(cache.FilteringResourceEventHandler{
60+
FilterFunc: func(obj interface{}) bool {
61+
if pod, ok := obj.(*corev1.Pod); ok {
62+
if pod.DeletionTimestamp != nil {
63+
return false
64+
}
65+
}
66+
return true
67+
},
68+
Handler: cache.ResourceEventHandlerFuncs{
6269
UpdateFunc: s.storeAllResultToPodFunc(client),
6370
},
64-
)
71+
})
6572
if err != nil {
6673
return xerrors.Errorf("failed to AddEventHandler of Informer: %w", err)
6774
}
@@ -91,6 +98,9 @@ func (s *reflector) storeAllResultToPodFunc(client clientset.Interface) func(int
9198
// the shared informer.
9299
newPod, err := client.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
93100
if err != nil {
101+
if apierrors.IsNotFound(err) {
102+
return true, nil
103+
}
94104
return false, xerrors.Errorf("get pod: %w", err)
95105
}
96106
if newPod.UID != pod.UID {
@@ -124,6 +134,9 @@ func (s *reflector) storeAllResultToPodFunc(client clientset.Interface) func(int
124134

125135
_, err = client.CoreV1().Pods(pod.Namespace).Update(ctx, pod, metav1.UpdateOptions{})
126136
if err != nil {
137+
if apierrors.IsNotFound(err) {
138+
return true, nil
139+
}
127140
// Even though we fetched the latest Pod object, we still might get a conflict
128141
// because of a concurrent update. Retrying these conflict errors will usually help
129142
// as long as we re-fetch the latest Pod object each time.

simulator/scheduler/storereflector/storereflector_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"strings"
77
"testing"
8+
"time"
89

910
"github.com/google/go-cmp/cmp"
1011
"github.com/stretchr/testify/assert"
@@ -200,3 +201,89 @@ func Test_updateResultHistory(t *testing.T) {
200201
})
201202
}
202203
}
204+
205+
type fakeStore struct{}
206+
207+
func (f fakeStore) GetStoredResult(_ *corev1.Pod) map[string]string {
208+
return map[string]string{"foo": "bar"}
209+
}
210+
func (f fakeStore) DeleteData(_ corev1.Pod) { /* no-op */ }
211+
212+
func TestResisterResultSavingToInformer_FilterFunc(t *testing.T) {
213+
t.Parallel()
214+
215+
ctx := context.Background()
216+
now := metav1.NewTime(time.Now())
217+
218+
podAlive := &corev1.Pod{
219+
ObjectMeta: metav1.ObjectMeta{
220+
Name: "pod-alive",
221+
Namespace: "default",
222+
},
223+
}
224+
podDeleting := &corev1.Pod{
225+
ObjectMeta: metav1.ObjectMeta{
226+
Name: "pod-deleting",
227+
Namespace: "default",
228+
DeletionTimestamp: &now,
229+
},
230+
}
231+
232+
client := fake.NewSimpleClientset(podAlive, podDeleting)
233+
234+
r, ok := New().(*reflector)
235+
if !ok {
236+
t.Fatalf("reflector failed")
237+
}
238+
r.AddResultStore(fakeStore{}, "fake")
239+
240+
stopCh := make(chan struct{})
241+
defer close(stopCh)
242+
243+
if err := r.ResisterResultSavingToInformer(client, stopCh); err != nil {
244+
t.Fatalf("register failed: %v", err)
245+
}
246+
247+
// Update both Pods to trigger the Update event
248+
patchPods := func(podName string) {
249+
_ = retry(100*time.Millisecond, 50, func() error {
250+
pod, err := client.CoreV1().Pods("default").Get(ctx, podName, metav1.GetOptions{})
251+
if err != nil {
252+
return err
253+
}
254+
// Modify a label to ensure the update is triggered
255+
if pod.Labels == nil {
256+
pod.Labels = map[string]string{}
257+
}
258+
pod.Labels["touched"] = "true"
259+
_, err = client.CoreV1().Pods("default").Update(ctx, pod, metav1.UpdateOptions{})
260+
return err
261+
})
262+
}
263+
patchPods("pod-alive")
264+
patchPods("pod-deleting")
265+
266+
time.Sleep(500 * time.Millisecond)
267+
268+
// Assert that pod-alive has the annotation written
269+
pod1, _ := client.CoreV1().Pods("default").Get(ctx, "pod-alive", metav1.GetOptions{})
270+
if v := pod1.Annotations["foo"]; v != "bar" {
271+
t.Fatalf("pod-alive should have annotation foo=bar, got %#v", pod1.Annotations)
272+
}
273+
274+
// Assert that pod-deleting does NOT have the annotation
275+
pod2, _ := client.CoreV1().Pods("default").Get(ctx, "pod-deleting", metav1.GetOptions{})
276+
if pod2.Annotations != nil && pod2.Annotations["foo"] == "bar" {
277+
t.Fatalf("pod-deleting should NOT have annotation foo=bar, but it has: %#v", pod2.Annotations)
278+
}
279+
}
280+
281+
func retry(interval time.Duration, maxTry int, f func() error) (err error) {
282+
for i := 0; i < maxTry; i++ {
283+
if err = f(); err == nil {
284+
return nil
285+
}
286+
time.Sleep(interval)
287+
}
288+
return
289+
}

0 commit comments

Comments
 (0)