Skip to content

Commit a6caa0a

Browse files
committed
Treat replaced events that didn't change resourceVersion as resync events
1 parent 651b2cf commit a6caa0a

File tree

2 files changed

+21
-6
lines changed

2 files changed

+21
-6
lines changed

staging/src/k8s.io/client-go/tools/cache/shared_informer.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"sync"
2222
"time"
2323

24+
"k8s.io/apimachinery/pkg/api/meta"
2425
"k8s.io/apimachinery/pkg/runtime"
2526
"k8s.io/apimachinery/pkg/util/clock"
2627
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -486,7 +487,21 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
486487
if err := s.indexer.Update(d.Object); err != nil {
487488
return err
488489
}
489-
isSync := d.Type == Sync
490+
491+
isSync := false
492+
switch {
493+
case d.Type == Sync:
494+
// Sync events are only propagated to listeners that requested resync
495+
isSync = true
496+
case d.Type == Replaced:
497+
if accessor, err := meta.Accessor(d.Object); err == nil {
498+
if oldAccessor, err := meta.Accessor(old); err == nil {
499+
// Replaced events that didn't change resourceVersion are treated as resync events
500+
// and only propagated to listeners that requested resync
501+
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
502+
}
503+
}
504+
}
490505
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
491506
} else {
492507
if err := s.indexer.Add(d.Object); err != nil {

staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -271,8 +271,8 @@ func TestSharedInformerWatchDisruption(t *testing.T) {
271271
// source simulates an apiserver object endpoint.
272272
source := fcache.NewFakeControllerSource()
273273

274-
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1"}})
275-
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2"}})
274+
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}})
275+
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}})
276276

277277
// create the shared informer and resync every 1s
278278
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
@@ -301,8 +301,8 @@ func TestSharedInformerWatchDisruption(t *testing.T) {
301301
}
302302

303303
// Add pod3, bump pod2 but don't broadcast it, so that the change will be seen only on relist
304-
source.AddDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", UID: "pod3"}})
305-
source.ModifyDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2"}})
304+
source.AddDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", UID: "pod3", ResourceVersion: "3"}})
305+
source.ModifyDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "4"}})
306306

307307
// Ensure that nobody saw any changes
308308
for _, listener := range listeners {
@@ -315,7 +315,7 @@ func TestSharedInformerWatchDisruption(t *testing.T) {
315315
listener.receivedItemNames = []string{}
316316
}
317317

318-
listenerNoResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3")
318+
listenerNoResync.expectedItemNames = sets.NewString("pod2", "pod3")
319319
listenerResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3")
320320

321321
// This calls shouldSync, which deletes noResync from the list of syncingListeners

0 commit comments

Comments
 (0)