Skip to content

Commit 3546d62

Browse files
authored
Merge pull request kubernetes#90560 from wojtek-t/frequent_bookmarks
Send watch bookmarks every minute
2 parents ac963ad + 9f1e462 commit 3546d62

File tree

3 files changed

+112
-22
lines changed

3 files changed

+112
-22
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ const (
5151
// storageWatchListPageSize is the cacher's request chunk size of
5252
// initial and resync watch lists to storage.
5353
storageWatchListPageSize = int64(10000)
54+
// defaultBookmarkFrequency defines how frequently watch bookmarks should be send
55+
// in addition to sending a bookmark right before watch deadline
56+
defaultBookmarkFrequency = time.Minute
5457
)
5558

5659
// Config contains the configuration for a given Cache.
@@ -154,24 +157,26 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type, done func(*cache
154157
// second in a bucket, and pop up them once at the timeout. To be more specific,
155158
// if you set fire time at X, you can get the bookmark within (X-1,X+1) period.
156159
type watcherBookmarkTimeBuckets struct {
157-
lock sync.Mutex
158-
watchersBuckets map[int64][]*cacheWatcher
159-
startBucketID int64
160-
clock clock.Clock
160+
lock sync.Mutex
161+
watchersBuckets map[int64][]*cacheWatcher
162+
startBucketID int64
163+
clock clock.Clock
164+
bookmarkFrequency time.Duration
161165
}
162166

163-
func newTimeBucketWatchers(clock clock.Clock) *watcherBookmarkTimeBuckets {
167+
func newTimeBucketWatchers(clock clock.Clock, bookmarkFrequency time.Duration) *watcherBookmarkTimeBuckets {
164168
return &watcherBookmarkTimeBuckets{
165-
watchersBuckets: make(map[int64][]*cacheWatcher),
166-
startBucketID: clock.Now().Unix(),
167-
clock: clock,
169+
watchersBuckets: make(map[int64][]*cacheWatcher),
170+
startBucketID: clock.Now().Unix(),
171+
clock: clock,
172+
bookmarkFrequency: bookmarkFrequency,
168173
}
169174
}
170175

171176
// adds a watcher to the bucket, if the deadline is before the start, it will be
172177
// added to the first one.
173178
func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool {
174-
nextTime, ok := w.nextBookmarkTime(t.clock.Now())
179+
nextTime, ok := w.nextBookmarkTime(t.clock.Now(), t.bookmarkFrequency)
175180
if !ok {
176181
return false
177182
}
@@ -339,7 +344,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
339344
stopCh: stopCh,
340345
clock: clock,
341346
timer: time.NewTimer(time.Duration(0)),
342-
bookmarkWatchers: newTimeBucketWatchers(clock),
347+
bookmarkWatchers: newTimeBucketWatchers(clock, defaultBookmarkFrequency),
343348
}
344349

345350
// Ensure that timer is stopped.
@@ -914,9 +919,8 @@ func (c *Cacher) startDispatchingBookmarkEvents() {
914919
continue
915920
}
916921
c.watchersBuffer = append(c.watchersBuffer, watcher)
917-
// Given that we send bookmark event once at deadline-2s, never push again
918-
// after the watcher pops up from the buckets. Once we decide to change the
919-
// strategy to more sophisticated, we may need it here.
922+
// Requeue the watcher for the next bookmark if needed.
923+
c.bookmarkWatchers.addWatcher(watcher)
920924
}
921925
}
922926
}
@@ -1219,13 +1223,28 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
12191223
}
12201224
}
12211225

1222-
func (c *cacheWatcher) nextBookmarkTime(now time.Time) (time.Time, bool) {
1223-
// For now we return 2s before deadline (and maybe +infinity is now already passed this time)
1224-
// but it gives us extensibility for the future(false when deadline is not set).
1226+
func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Duration) (time.Time, bool) {
1227+
// We try to send bookmarks:
1228+
// (a) roughly every minute
1229+
// (b) right before the watcher timeout - for now we simply set it 2s before
1230+
// the deadline
1231+
// The former gives us periodicity if the watch breaks due to unexpected
1232+
// conditions, the later ensures that on timeout the watcher is as close to
1233+
// now as possible - this covers 99% of cases.
1234+
heartbeatTime := now.Add(bookmarkFrequency)
12251235
if c.deadline.IsZero() {
1226-
return c.deadline, false
1236+
// Timeout is set by our client libraries (e.g. reflector) as well as defaulted by
1237+
// apiserver if properly configured. So this shoudln't happen in practice.
1238+
return heartbeatTime, true
1239+
}
1240+
if pretimeoutTime := c.deadline.Add(-2 * time.Second); pretimeoutTime.Before(heartbeatTime) {
1241+
heartbeatTime = pretimeoutTime
1242+
}
1243+
1244+
if heartbeatTime.Before(now) {
1245+
return time.Time{}, false
12271246
}
1228-
return c.deadline.Add(-2 * time.Second), true
1247+
return heartbeatTime, true
12291248
}
12301249

12311250
func getEventObject(object runtime.Object) runtime.Object {

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ func TestTimeBucketWatchersBasic(t *testing.T) {
558558
}
559559

560560
clock := clock.NewFakeClock(time.Now())
561-
watchers := newTimeBucketWatchers(clock)
561+
watchers := newTimeBucketWatchers(clock, defaultBookmarkFrequency)
562562
now := clock.Now()
563563
watchers.addWatcher(newWatcher(now.Add(10 * time.Second)))
564564
watchers.addWatcher(newWatcher(now.Add(20 * time.Second)))
@@ -746,6 +746,77 @@ func TestCacherSendBookmarkEvents(t *testing.T) {
746746
}
747747
}
748748

749+
func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
750+
backingStorage := &dummyStorage{}
751+
cacher, _, err := newTestCacher(backingStorage, 1000)
752+
if err != nil {
753+
t.Fatalf("Couldn't create cacher: %v", err)
754+
}
755+
defer cacher.Stop()
756+
// Update bookmarkFrequency to speed up test.
757+
// Note that the frequency lower than 1s doesn't change much due to
758+
// resolution how frequency we recompute.
759+
cacher.bookmarkWatchers.bookmarkFrequency = time.Second
760+
761+
// Wait until cacher is initialized.
762+
cacher.ready.wait()
763+
pred := storage.Everything
764+
pred.AllowWatchBookmarks = true
765+
766+
makePod := func(index int) *examplev1.Pod {
767+
return &examplev1.Pod{
768+
ObjectMeta: metav1.ObjectMeta{
769+
Name: fmt.Sprintf("pod-%d", index),
770+
Namespace: "ns",
771+
ResourceVersion: fmt.Sprintf("%v", 100+index),
772+
},
773+
}
774+
}
775+
776+
// Create pod to initialize watch cache.
777+
if err := cacher.watchCache.Add(makePod(0)); err != nil {
778+
t.Fatalf("failed to add a pod: %v", err)
779+
}
780+
781+
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
782+
w, err := cacher.Watch(ctx, "pods/ns", "100", pred)
783+
if err != nil {
784+
t.Fatalf("Failed to create watch: %v", err)
785+
}
786+
787+
// Create one more pod, to ensure that current RV is higher and thus
788+
// bookmarks will be delievere (events are delivered for RV higher
789+
// than the max from init events).
790+
if err := cacher.watchCache.Add(makePod(1)); err != nil {
791+
t.Fatalf("failed to add a pod: %v", err)
792+
}
793+
794+
timeoutCh := time.After(5 * time.Second)
795+
lastObservedRV := uint64(0)
796+
// Ensure that a watcher gets two bookmarks.
797+
for observedBookmarks := 0; observedBookmarks < 2; {
798+
select {
799+
case event, ok := <-w.ResultChan():
800+
if !ok {
801+
t.Fatal("Unexpected closed")
802+
}
803+
rv, err := cacher.versioner.ObjectResourceVersion(event.Object)
804+
if err != nil {
805+
t.Errorf("failed to parse resource version from %#v: %v", event.Object, err)
806+
}
807+
if event.Type == watch.Bookmark {
808+
observedBookmarks++
809+
if rv < lastObservedRV {
810+
t.Errorf("Unexpected bookmark event resource version %v (last %v)", rv, lastObservedRV)
811+
}
812+
}
813+
lastObservedRV = rv
814+
case <-timeoutCh:
815+
t.Fatal("Unexpected timeout to receive bookmark events")
816+
}
817+
}
818+
}
819+
749820
func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
750821
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
751822
backingStorage := &dummyStorage{}

staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -855,12 +855,12 @@ func TestWatchDispatchBookmarkEvents(t *testing.T) {
855855
allowWatchBookmark bool
856856
}{
857857
{ // test old client won't get Bookmark event
858-
timeout: 2 * time.Second,
858+
timeout: 3 * time.Second,
859859
expected: false,
860860
allowWatchBookmark: false,
861861
},
862862
{
863-
timeout: 2 * time.Second,
863+
timeout: 3 * time.Second,
864864
expected: true,
865865
allowWatchBookmark: true,
866866
},
@@ -909,7 +909,7 @@ func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) {
909909

910910
pred := storage.Everything
911911
pred.AllowWatchBookmarks = true
912-
ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
912+
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
913913
watcher, err := cacher.WatchList(ctx, "pods/ns", "0", pred)
914914
if err != nil {
915915
t.Fatalf("Unexpected error: %v", err)

0 commit comments

Comments
 (0)