Skip to content

Commit 98461be

Browse files
committed
Implement conditionalProgressRequester that allows requesting watch progress notification if watch cache is not fresh
1 parent ce7fd46 commit 98461be

File tree

11 files changed

+331
-9
lines changed

11 files changed

+331
-9
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ type Config struct {
104104

105105
Codec runtime.Codec
106106

107-
Clock clock.Clock
107+
Clock clock.WithTicker
108108
}
109109

110110
type watchersMap map[int]*cacheWatcher
@@ -329,6 +329,10 @@ type Cacher struct {
329329
expiredBookmarkWatchers []*cacheWatcher
330330
}
331331

332+
// RequestWatchProgress delegates the watch progress notification request
// to the underlying storage.
func (c *Cacher) RequestWatchProgress(ctx context.Context) error {
	return c.storage.RequestWatchProgress(ctx)
}
335+
332336
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
333337
// its internal cache and updating its cache in the background based on the
334338
// given configuration.
@@ -397,9 +401,9 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
397401
// so that future reuse does not get a spurious timeout.
398402
<-cacher.timer.C
399403
}
400-
404+
progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock)
401405
watchCache := newWatchCache(
402-
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource)
406+
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource, progressRequester)
403407
listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
404408
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
405409

@@ -419,6 +423,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
419423
cacher.reflector = reflector
420424

421425
go cacher.dispatchEvents()
426+
go progressRequester.Run(stopCh)
422427

423428
cacher.stopWg.Add(1)
424429
go func() {

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ type setupOptions struct {
328328
keyFunc func(runtime.Object) (string, error)
329329
indexerFuncs map[string]storage.IndexerFunc
330330
pagingEnabled bool
331-
clock clock.Clock
331+
clock clock.WithTicker
332332
}
333333

334334
type setupOption func(*setupOptions)

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ type dummyStorage struct {
9090
watchFn func(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error)
9191
}
9292

93+
// RequestWatchProgress is a no-op stub so that dummyStorage satisfies the
// storage interface required by the cacher in tests.
func (d *dummyStorage) RequestWatchProgress(ctx context.Context) error {
	return nil
}
96+
9397
type dummyWatch struct {
9498
ch chan watch.Event
9599
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,10 @@ type watchCache struct {
196196

197197
// For testing cache interval invalidation.
198198
indexValidator indexValidator
199+
200+
// Requests progress notification if there are requests waiting for watch
201+
// to be fresh
202+
waitingUntilFresh *conditionalProgressRequester
199203
}
200204

201205
func newWatchCache(
@@ -204,8 +208,9 @@ func newWatchCache(
204208
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
205209
versioner storage.Versioner,
206210
indexers *cache.Indexers,
207-
clock clock.Clock,
208-
groupResource schema.GroupResource) *watchCache {
211+
clock clock.WithTicker,
212+
groupResource schema.GroupResource,
213+
progressRequester *conditionalProgressRequester) *watchCache {
209214
wc := &watchCache{
210215
capacity: defaultLowerBoundCapacity,
211216
keyFunc: keyFunc,
@@ -222,6 +227,7 @@ func newWatchCache(
222227
clock: clock,
223228
versioner: versioner,
224229
groupResource: groupResource,
230+
waitingUntilFresh: progressRequester,
225231
}
226232
metrics.WatchCacheCapacity.WithLabelValues(groupResource.String()).Set(float64(wc.capacity))
227233
wc.cond = sync.NewCond(wc.RLocker())

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ func TestCacheIntervalNextFromWatchCache(t *testing.T) {
287287
for _, c := range cases {
288288
t.Run(c.name, func(t *testing.T) {
289289
wc := newTestWatchCache(capacity, &cache.Indexers{})
290+
defer wc.Stop()
290291
for i := 0; i < c.eventsAddedToWatchcache; i++ {
291292
wc.Add(makeTestPod(fmt.Sprintf("pod%d", i), uint64(i)))
292293
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ func makeTestStoreElement(pod *v1.Pod) *storeElement {
6868

6969
type testWatchCache struct {
7070
*watchCache
71+
72+
bookmarkRevision chan int64
73+
stopCh chan struct{}
7174
}
7275

7376
func (w *testWatchCache) getAllEventsSince(resourceVersion uint64) ([]*watchCacheEvent, error) {
@@ -112,19 +115,42 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
112115
}
113116
versioner := storage.APIObjectVersioner{}
114117
mockHandler := func(*watchCacheEvent) {}
115-
wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), schema.GroupResource{Resource: "pods"})
118+
wc := &testWatchCache{}
119+
wc.bookmarkRevision = make(chan int64, 1)
120+
wc.stopCh = make(chan struct{})
121+
clock := testingclock.NewFakeClock(time.Now())
122+
pr := newConditionalProgressRequester(wc.RequestWatchProgress, clock)
123+
go pr.Run(wc.stopCh)
124+
wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, clock, schema.GroupResource{Resource: "pods"}, pr)
116125
// To preserve behavior of tests that assume a given capacity,
117126
// resize it to the expected size.
118127
wc.capacity = capacity
119128
wc.cache = make([]*watchCacheEvent, capacity)
120129
wc.lowerBoundCapacity = min(capacity, defaultLowerBoundCapacity)
121130
wc.upperBoundCapacity = max(capacity, defaultUpperBoundCapacity)
122131

123-
return &testWatchCache{watchCache: wc}
132+
return wc
133+
}
134+
135+
// RequestWatchProgress is a test stub for a storage progress request: it
// asynchronously delivers the next revision pushed into w.bookmarkRevision
// as a resource version update on the watch cache. The goroutine exits
// without delivering anything if ctx is cancelled first.
func (w *testWatchCache) RequestWatchProgress(ctx context.Context) error {
	go func() {
		select {
		case rev := <-w.bookmarkRevision:
			w.UpdateResourceVersion(fmt.Sprintf("%d", rev))
		case <-ctx.Done():
			return
		}
	}()
	return nil
}
146+
// Stop shuts down the background progress requester started by
// newTestWatchCache by closing its stop channel.
func (w *testWatchCache) Stop() {
	close(w.stopCh)
}
125150

126151
func TestWatchCacheBasic(t *testing.T) {
127152
store := newTestWatchCache(2, &cache.Indexers{})
153+
defer store.Stop()
128154

129155
// Test Add/Update/Delete.
130156
pod1 := makeTestPod("pod", 1)
@@ -202,6 +228,7 @@ func TestWatchCacheBasic(t *testing.T) {
202228

203229
func TestEvents(t *testing.T) {
204230
store := newTestWatchCache(5, &cache.Indexers{})
231+
defer store.Stop()
205232

206233
// no dynamic-size cache to fit old tests.
207234
store.lowerBoundCapacity = 5
@@ -326,6 +353,7 @@ func TestEvents(t *testing.T) {
326353

327354
func TestMarker(t *testing.T) {
328355
store := newTestWatchCache(3, &cache.Indexers{})
356+
defer store.Stop()
329357

330358
// First thing that is called when propagated from storage is Replace.
331359
store.Replace([]interface{}{
@@ -380,7 +408,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
380408
return []string{pod.Spec.NodeName}, nil
381409
},
382410
})
383-
411+
defer store.Stop()
384412
// In background, update the store.
385413
go func() {
386414
store.Add(makeTestPodDetails("pod1", 2, "node1", map[string]string{"label": "value1"}))
@@ -463,6 +491,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
463491
func TestWaitUntilFreshAndGet(t *testing.T) {
464492
ctx := context.Background()
465493
store := newTestWatchCache(3, &cache.Indexers{})
494+
defer store.Stop()
466495

467496
// In background, update the store.
468497
go func() {
@@ -489,6 +518,7 @@ func TestWaitUntilFreshAndGet(t *testing.T) {
489518
func TestWaitUntilFreshAndListTimeout(t *testing.T) {
490519
ctx := context.Background()
491520
store := newTestWatchCache(3, &cache.Indexers{})
521+
defer store.Stop()
492522
fc := store.clock.(*testingclock.FakeClock)
493523

494524
// In background, step clock after the below call starts the timer.
@@ -529,6 +559,7 @@ func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
529559
func TestReflectorForWatchCache(t *testing.T) {
530560
ctx := context.Background()
531561
store := newTestWatchCache(5, &cache.Indexers{})
562+
defer store.Stop()
532563

533564
{
534565
_, version, _, err := store.WaitUntilFreshAndList(ctx, 0, nil)
@@ -792,6 +823,7 @@ func TestDynamicCache(t *testing.T) {
792823
for _, test := range tests {
793824
t.Run(test.name, func(t *testing.T) {
794825
store := newTestWatchCache(test.cacheCapacity, &cache.Indexers{})
826+
defer store.Stop()
795827
store.cache = make([]*watchCacheEvent, test.cacheCapacity)
796828
store.startIndex = test.startIndex
797829
store.lowerBoundCapacity = test.lowerBoundCapacity
@@ -840,6 +872,7 @@ func checkCacheElements(cache *testWatchCache) bool {
840872

841873
func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) {
842874
store := newTestWatchCache(2, &cache.Indexers{})
875+
defer store.Stop()
843876

844877
now := store.clock.Now()
845878
addEvent := func(key string, rv uint64, t time.Time) {
@@ -988,6 +1021,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
9881021
for _, test := range testCases {
9891022
t.Run(test.name, func(t *testing.T) {
9901023
store := newTestWatchCache(test.capacity, &cache.Indexers{})
1024+
defer store.Stop()
9911025
got := store.suggestedWatchChannelSize(test.indexExists, test.triggerUsed)
9921026
if got != test.expected {
9931027
t.Errorf("unexpected channel size got: %v, expected: %v", got, test.expected)
@@ -998,6 +1032,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
9981032

9991033
func BenchmarkWatchCache_updateCache(b *testing.B) {
10001034
store := newTestWatchCache(defaultUpperBoundCapacity, &cache.Indexers{})
1035+
defer store.Stop()
10011036
store.cache = store.cache[:0]
10021037
store.upperBoundCapacity = defaultUpperBoundCapacity
10031038
loadEventWithDuration(store, defaultUpperBoundCapacity, 0)
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package cacher
18+
19+
import (
20+
"context"
21+
"sync"
22+
"time"
23+
24+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
25+
"k8s.io/apimachinery/pkg/util/wait"
26+
27+
"k8s.io/klog/v2"
28+
"k8s.io/utils/clock"
29+
)
30+
31+
const (
	// progressRequestPeriod determines period of requesting progress
	// from etcd when there is a request waiting for watch cache to be fresh.
	progressRequestPeriod = 100 * time.Millisecond
)
36+
37+
func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock clock.WithTicker) *conditionalProgressRequester {
38+
pr := &conditionalProgressRequester{
39+
clock: clock,
40+
requestWatchProgress: requestWatchProgress,
41+
}
42+
pr.cond = sync.NewCond(pr.mux.RLocker())
43+
return pr
44+
}
45+
46+
// WatchProgressRequester is a function that requests a watch progress
// notification from the underlying storage for the given context.
type WatchProgressRequester func(ctx context.Context) error
47+
48+
// conditionalProgressRequester will request progress notification if there
// is a request waiting for watch cache to be fresh.
type conditionalProgressRequester struct {
	clock                clock.WithTicker
	requestWatchProgress WatchProgressRequester

	mux sync.RWMutex
	// cond is bound to mux.RLocker(); signaled whenever waiting or stopped
	// changes so Run can re-evaluate its wait condition.
	cond *sync.Cond
	// waiting is the number of requests currently waiting for the watch
	// cache to become fresh.
	waiting int
	// stopped is set once Run's stop channel has been closed.
	stopped bool
}
59+
60+
// Run periodically requests a watch progress notification for as long as
// there is at least one waiter registered via Add. It blocks until stopCh
// is closed.
func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
	ctx := wait.ContextForChannel(stopCh)
	go func() {
		defer utilruntime.HandleCrash()
		<-stopCh
		// Mark the requester stopped and wake the main loop in case it is
		// blocked in cond.Wait below.
		pr.mux.Lock()
		defer pr.mux.Unlock()
		pr.stopped = true
		pr.cond.Signal()
	}()
	ticker := pr.clock.NewTicker(progressRequestPeriod)
	defer ticker.Stop()
	for {
		stopped := func() bool {
			// Block until there is at least one waiter or we are stopped.
			// cond is bound to mux.RLocker(), so Wait releases and
			// reacquires the read lock.
			pr.mux.RLock()
			defer pr.mux.RUnlock()
			for pr.waiting == 0 && !pr.stopped {
				pr.cond.Wait()
			}
			return pr.stopped
		}()
		if stopped {
			return
		}

		select {
		case <-ticker.C():
			// Re-check under the lock; all waiters may have left while we
			// were waiting for the tick.
			shouldRequest := func() bool {
				pr.mux.RLock()
				defer pr.mux.RUnlock()
				return pr.waiting > 0 && !pr.stopped
			}()
			if !shouldRequest {
				continue
			}
			// Best effort: a failed progress request is only logged; the
			// next tick will retry while waiters remain.
			err := pr.requestWatchProgress(ctx)
			if err != nil {
				klog.V(4).InfoS("Error requesting bookmark", "err", err)
			}
		case <-stopCh:
			return
		}
	}
}
104+
105+
func (pr *conditionalProgressRequester) Add() {
106+
pr.mux.Lock()
107+
defer pr.mux.Unlock()
108+
pr.waiting += 1
109+
pr.cond.Signal()
110+
}
111+
112+
func (pr *conditionalProgressRequester) Remove() {
113+
pr.mux.Lock()
114+
defer pr.mux.Unlock()
115+
pr.waiting -= 1
116+
pr.cond.Signal()
117+
}

0 commit comments

Comments
 (0)