Skip to content

Commit b9d2df8

Browse files
authored
Merge pull request kubernetes#95145 from wojtek-t/send_bookmarks_on_changes
Watch bookmarks may contain version of objects of other types
2 parents a8e96c8 + 0bd8104 commit b9d2df8

File tree

4 files changed

+98
-25
lines changed

4 files changed

+98
-25
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/BUILD

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,9 @@ go_test(
6565
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
6666
"//staging/src/k8s.io/apiserver/pkg/apis/example:go_default_library",
6767
"//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library",
68-
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
6968
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
7069
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
71-
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
7270
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
73-
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
7471
],
7572
)
7673

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -793,7 +793,19 @@ func (c *Cacher) dispatchEvents() {
793793
if !ok {
794794
return
795795
}
796-
c.dispatchEvent(&event)
796+
// Don't dispatch bookmarks coming from the storage layer.
797+
// They can be very frequent (even to the level of subseconds)
798+
// to allow efficient watch resumption on kube-apiserver restarts,
799+
// and propagating them down may overload the whole system.
800+
//
801+
// TODO: If at some point we decide the performance and scalability
802+
// footprint is acceptable, this is the place to hook them in.
803+
// However, we then need to check if this was called as a result
804+
// of a bookmark event or regular Add/Update/Delete operation by
805+
// checking if resourceVersion here has changed.
806+
if event.Type != watch.Bookmark {
807+
c.dispatchEvent(&event)
808+
}
797809
lastProcessedResourceVersion = event.ResourceVersion
798810
case <-bookmarkTimer.C():
799811
bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25))

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,7 @@ import (
4141
"k8s.io/apimachinery/pkg/watch"
4242
"k8s.io/apiserver/pkg/apis/example"
4343
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
44-
"k8s.io/apiserver/pkg/features"
4544
"k8s.io/apiserver/pkg/storage"
46-
utilfeature "k8s.io/apiserver/pkg/util/feature"
47-
featuregatetesting "k8s.io/component-base/featuregate/testing"
4845
)
4946

5047
var (
@@ -637,7 +634,6 @@ func TestTimeBucketWatchersBasic(t *testing.T) {
637634
}
638635

639636
func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
640-
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
641637
backingStorage := &dummyStorage{}
642638
cacher, _, err := newTestCacher(backingStorage)
643639
if err != nil {
@@ -865,7 +861,6 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
865861
}
866862

867863
func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
868-
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
869864
backingStorage := &dummyStorage{}
870865
cacher, _, err := newTestCacher(backingStorage)
871866
if err != nil {
@@ -938,6 +933,71 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
938933
}
939934
}
940935

936+
func TestBookmarksOnResourceVersionUpdates(t *testing.T) {
937+
backingStorage := &dummyStorage{}
938+
cacher, _, err := newTestCacher(backingStorage)
939+
if err != nil {
940+
t.Fatalf("Couldn't create cacher: %v", err)
941+
}
942+
defer cacher.Stop()
943+
944+
// Ensure that bookmarks are sent more frequently than every 1m.
945+
cacher.bookmarkWatchers = newTimeBucketWatchers(clock.RealClock{}, 2*time.Second)
946+
947+
// Wait until cacher is initialized.
948+
cacher.ready.wait()
949+
950+
makePod := func(i int) *examplev1.Pod {
951+
return &examplev1.Pod{
952+
ObjectMeta: metav1.ObjectMeta{
953+
Name: fmt.Sprintf("pod-%d", i),
954+
Namespace: "ns",
955+
ResourceVersion: fmt.Sprintf("%d", i),
956+
},
957+
}
958+
}
959+
if err := cacher.watchCache.Add(makePod(1000)); err != nil {
960+
t.Errorf("error: %v", err)
961+
}
962+
963+
pred := storage.Everything
964+
pred.AllowWatchBookmarks = true
965+
966+
w, err := cacher.Watch(context.TODO(), "/pods/ns", storage.ListOptions{
967+
ResourceVersion: "1000",
968+
Predicate: pred,
969+
})
970+
if err != nil {
971+
t.Fatalf("Failed to create watch: %v", err)
972+
}
973+
974+
expectedRV := 2000
975+
976+
wg := sync.WaitGroup{}
977+
wg.Add(1)
978+
go func() {
979+
defer wg.Done()
980+
for {
981+
event, ok := <-w.ResultChan()
982+
if !ok {
983+
t.Fatalf("Unexpected closed channel")
984+
}
985+
rv, err := cacher.versioner.ObjectResourceVersion(event.Object)
986+
if err != nil {
987+
t.Errorf("failed to parse resource version from %#v: %v", event.Object, err)
988+
}
989+
if event.Type == watch.Bookmark && rv == uint64(expectedRV) {
990+
return
991+
}
992+
}
993+
}()
994+
995+
// Simulate progress notify event.
996+
cacher.watchCache.UpdateResourceVersion(strconv.Itoa(expectedRV))
997+
998+
wg.Wait()
999+
}
1000+
9411001
func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
9421002
backingStorage := &dummyStorage{}
9431003
cacher, _, err := newTestCacher(backingStorage)

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -320,8 +320,9 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
320320
}
321321

322322
// Avoid calling event handler under lock.
323-
// This is safe as long as there is at most one call to processEvent in flight
324-
// at any point in time.
323+
// This is safe as long as there is at most one call to Add/Update/Delete and
324+
// UpdateResourceVersion in flight at any point in time, which is true now,
325+
// because reflector calls them synchronously from its main thread.
325326
if w.eventHandler != nil {
326327
w.eventHandler(wcEvent)
327328
}
@@ -388,20 +389,23 @@ func (w *watchCache) UpdateResourceVersion(resourceVersion string) {
388389
return
389390
}
390391

391-
w.Lock()
392-
defer w.Unlock()
393-
w.resourceVersion = rv
394-
395-
// Don't dispatch bookmarks coming from the storage layer.
396-
// They can be very frequent (even to the level of subseconds)
397-
// to allow efficient watch resumption on kube-apiserver restarts,
398-
// and propagating them down may overload the whole system.
399-
//
400-
// TODO: If at some point we decide the performance and scalability
401-
// footprint is acceptable, this is the place to hook them in.
402-
// However, we then need to check if this was called as a result
403-
// of a bookmark event or regular Add/Update/Delete operation by
404-
// checking if resourceVersion here has changed.
392+
func() {
393+
w.Lock()
394+
defer w.Unlock()
395+
w.resourceVersion = rv
396+
}()
397+
398+
// Avoid calling event handler under lock.
399+
// This is safe as long as there is at most one call to Add/Update/Delete and
400+
// UpdateResourceVersion in flight at any point in time, which is true now,
401+
// because reflector calls them synchronously from its main thread.
402+
if w.eventHandler != nil {
403+
wcEvent := &watchCacheEvent{
404+
Type: watch.Bookmark,
405+
ResourceVersion: rv,
406+
}
407+
w.eventHandler(wcEvent)
408+
}
405409
}
406410

407411
// List returns list of pointers to <storeElement> objects.

0 commit comments

Comments
 (0)