Skip to content

Commit 4af1328

Browse files
committed
Allow tracking resource version for reflector store
1 parent 56e7284 commit 4af1328

File tree

2 files changed

+68
-0
lines changed

2 files changed

+68
-0
lines changed

staging/src/k8s.io/client-go/tools/cache/reflector.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,15 @@ type Reflector struct {
101101
watchErrorHandler WatchErrorHandler
102102
}
103103

104+
// ResourceVersionUpdater is an interface that allows store implementation to
105+
// track the current resource version of the reflector. This is especially
106+
// important if storage bookmarks are enabled.
107+
type ResourceVersionUpdater interface {
108+
// UpdateResourceVersion is called each time current resource version of the reflector
109+
// is updated.
110+
UpdateResourceVersion(resourceVersion string)
111+
}
112+
104113
// The WatchErrorHandler is called whenever ListAndWatch drops the
105114
// connection with an error. After calling this handler, the informer
106115
// will backoff and retry.
@@ -507,6 +516,9 @@ loop:
507516
}
508517
*resourceVersion = newResourceVersion
509518
r.setLastSyncResourceVersion(newResourceVersion)
519+
if rvu, ok := r.store.(ResourceVersionUpdater); ok {
520+
rvu.UpdateResourceVersion(newResourceVersion)
521+
}
510522
eventCount++
511523
}
512524
}

staging/src/k8s.io/client-go/tools/cache/reflector_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -910,3 +910,59 @@ func TestReflectorSetExpectedType(t *testing.T) {
910910
})
911911
}
912912
}
913+
914+
type storeWithRV struct {
915+
Store
916+
917+
// resourceVersions tracks values passed by UpdateResourceVersion
918+
resourceVersions []string
919+
}
920+
921+
func (s *storeWithRV) UpdateResourceVersion(resourceVersion string) {
922+
s.resourceVersions = append(s.resourceVersions, resourceVersion)
923+
}
924+
925+
func newStoreWithRV() *storeWithRV {
926+
return &storeWithRV{
927+
Store: NewStore(MetaNamespaceKeyFunc),
928+
}
929+
}
930+
931+
func TestReflectorResourceVersionUpdate(t *testing.T) {
932+
s := newStoreWithRV()
933+
934+
stopCh := make(chan struct{})
935+
fw := watch.NewFake()
936+
937+
lw := &testLW{
938+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
939+
return fw, nil
940+
},
941+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
942+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil
943+
},
944+
}
945+
r := NewReflector(lw, &v1.Pod{}, s, 0)
946+
947+
makePod := func(rv string) *v1.Pod {
948+
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: rv}}
949+
}
950+
951+
go func() {
952+
fw.Action(watch.Added, makePod("10"))
953+
fw.Action(watch.Modified, makePod("20"))
954+
fw.Action(watch.Bookmark, makePod("30"))
955+
fw.Action(watch.Deleted, makePod("40"))
956+
close(stopCh)
957+
}()
958+
959+
// Initial list should use RV=0
960+
if err := r.ListAndWatch(stopCh); err != nil {
961+
t.Fatal(err)
962+
}
963+
964+
expectedRVs := []string{"10", "20", "30", "40"}
965+
if !reflect.DeepEqual(s.resourceVersions, expectedRVs) {
966+
t.Errorf("Expected series of resource version updates of %#v but got: %#v", expectedRVs, s.resourceVersions)
967+
}
968+
}

0 commit comments

Comments
 (0)