Skip to content

Commit 9a1572b

Browse files
authored
Merge pull request kubernetes#76702 from wojtek-t/reduce_watchcache_contention
Reduce contention in watchcache by not calling event handler under lock
2 parents e0a1913 + e6e4382 commit 9a1572b

File tree

3 files changed

+59
-48
lines changed

3 files changed

+59
-48
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -291,30 +291,18 @@ type Cacher struct {
291291
// given configuration.
292292
func NewCacherFromConfig(config Config) *Cacher {
293293
stopCh := make(chan struct{})
294-
295-
watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc, config.Versioner)
296-
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
297-
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
298-
299294
obj := config.NewFunc()
300295
// Give this error when it is constructed rather than when you get the
301296
// first watch item, because it's much easier to track down that way.
302297
if err := runtime.CheckCodec(config.Codec, obj); err != nil {
303298
panic("storage codec doesn't seem to match given type: " + err.Error())
304299
}
305300

306-
reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
307-
// Configure reflector's pager to for an appropriate pagination chunk size for fetching data from
308-
// storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error.
309-
reflector.WatchListPageSize = storageWatchListPageSize
310-
311301
clock := clock.RealClock{}
312302
cacher := &Cacher{
313303
ready: newReady(),
314304
storage: config.Storage,
315305
objectType: reflect.TypeOf(obj),
316-
watchCache: watchCache,
317-
reflector: reflector,
318306
versioner: config.Versioner,
319307
newFunc: config.NewFunc,
320308
triggerFunc: config.TriggerPublisherFunc,
@@ -337,7 +325,27 @@ func NewCacherFromConfig(config Config) *Cacher {
337325
bookmarkWatchers: newTimeBucketWatchers(clock),
338326
watchBookmarkEnabled: utilfeature.DefaultFeatureGate.Enabled(features.WatchBookmark),
339327
}
340-
watchCache.SetOnEvent(cacher.processEvent)
328+
329+
// Ensure that timer is stopped.
330+
if !cacher.timer.Stop() {
331+
// Consume triggered (but not yet received) timer event
332+
// so that future reuse does not get a spurious timeout.
333+
<-cacher.timer.C
334+
}
335+
336+
watchCache := newWatchCache(
337+
config.CacheCapacity, config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner)
338+
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
339+
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
340+
341+
reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
342+
// Configure reflector's pager to for an appropriate pagination chunk size for fetching data from
343+
// storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error.
344+
reflector.WatchListPageSize = storageWatchListPageSize
345+
346+
cacher.watchCache = watchCache
347+
cacher.reflector = reflector
348+
341349
go cacher.dispatchEvents()
342350

343351
cacher.stopWg.Add(1)
@@ -352,13 +360,6 @@ func NewCacherFromConfig(config Config) *Cacher {
352360
)
353361
}()
354362

355-
// Ensure that timer is stopped.
356-
if !cacher.timer.Stop() {
357-
// Consume triggered (but not yet received) timer event
358-
// so that future reuse does not get a spurious timeout.
359-
<-cacher.timer.C
360-
}
361-
362363
return cacher
363364
}
364365

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ type watchCache struct {
125125

126126
// This handler is run at the end of every Add/Update/Delete method
127127
// and additionally gets the previous value of the object.
128-
onEvent func(*watchCacheEvent)
128+
eventHandler func(*watchCacheEvent)
129129

130130
// for testing timeouts.
131131
clock clock.Clock
@@ -137,6 +137,7 @@ type watchCache struct {
137137
func newWatchCache(
138138
capacity int,
139139
keyFunc func(runtime.Object) (string, error),
140+
eventHandler func(*watchCacheEvent),
140141
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
141142
versioner storage.Versioner) *watchCache {
142143
wc := &watchCache{
@@ -149,6 +150,7 @@ func newWatchCache(
149150
store: cache.NewStore(storeElementKey),
150151
resourceVersion: 0,
151152
listResourceVersion: 0,
153+
eventHandler: eventHandler,
152154
clock: clock.RealClock{},
153155
versioner: versioner,
154156
}
@@ -204,6 +206,8 @@ func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Ob
204206
return object, resourceVersion, nil
205207
}
206208

209+
// processEvent is safe as long as there is at most one call to it in flight
210+
// at any point in time.
207211
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
208212
key, err := w.keyFunc(event.Object)
209213
if err != nil {
@@ -224,30 +228,41 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
224228
ResourceVersion: resourceVersion,
225229
}
226230

227-
// TODO: We should consider moving this lock below after the watchCacheEvent
228-
// is created. In such situation, the only problematic scenario is Replace(
229-
// happening after getting object from store and before acquiring a lock.
230-
// Maybe introduce another lock for this purpose.
231-
w.Lock()
232-
defer w.Unlock()
233-
previous, exists, err := w.store.Get(elem)
234-
if err != nil {
231+
if err := func() error {
232+
// TODO: We should consider moving this lock below after the watchCacheEvent
233+
// is created. In such situation, the only problematic scenario is Replace(
234+
// happening after getting object from store and before acquiring a lock.
235+
// Maybe introduce another lock for this purpose.
236+
w.Lock()
237+
defer w.Unlock()
238+
239+
previous, exists, err := w.store.Get(elem)
240+
if err != nil {
241+
return err
242+
}
243+
if exists {
244+
previousElem := previous.(*storeElement)
245+
watchCacheEvent.PrevObject = previousElem.Object
246+
watchCacheEvent.PrevObjLabels = previousElem.Labels
247+
watchCacheEvent.PrevObjFields = previousElem.Fields
248+
}
249+
250+
w.updateCache(watchCacheEvent)
251+
w.resourceVersion = resourceVersion
252+
defer w.cond.Broadcast()
253+
254+
return updateFunc(elem)
255+
}(); err != nil {
235256
return err
236257
}
237-
if exists {
238-
previousElem := previous.(*storeElement)
239-
watchCacheEvent.PrevObject = previousElem.Object
240-
watchCacheEvent.PrevObjLabels = previousElem.Labels
241-
watchCacheEvent.PrevObjFields = previousElem.Fields
242-
}
243-
w.updateCache(watchCacheEvent)
244-
w.resourceVersion = resourceVersion
245258

246-
if w.onEvent != nil {
247-
w.onEvent(watchCacheEvent)
259+
// Avoid calling event handler under lock.
260+
// This is safe as long as there is at most one call to processEvent in flight
261+
// at any point in time.
262+
if w.eventHandler != nil {
263+
w.eventHandler(watchCacheEvent)
248264
}
249-
w.cond.Broadcast()
250-
return updateFunc(elem)
265+
return nil
251266
}
252267

253268
// Assumes that lock is already held for write.
@@ -397,12 +412,6 @@ func (w *watchCache) SetOnReplace(onReplace func()) {
397412
w.onReplace = onReplace
398413
}
399414

400-
func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) {
401-
w.Lock()
402-
defer w.Unlock()
403-
w.onEvent = onEvent
404-
}
405-
406415
func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
407416
size := w.endIndex - w.startIndex
408417
var oldest uint64

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ func newTestWatchCache(capacity int) *watchCache {
7676
return labels.Set(pod.Labels), fields.Set{"spec.nodeName": pod.Spec.NodeName}, nil
7777
}
7878
versioner := etcd.APIObjectVersioner{}
79-
wc := newWatchCache(capacity, keyFunc, getAttrsFunc, versioner)
79+
mockHandler := func(*watchCacheEvent) {}
80+
wc := newWatchCache(capacity, keyFunc, mockHandler, getAttrsFunc, versioner)
8081
wc.clock = clock.NewFakeClock(time.Now())
8182
return wc
8283
}

0 commit comments

Comments
 (0)