Commit df9e666

Merge pull request kubernetes#75717 from wojtek-t/reduce_critical_sections
Reduce critical sections in cacher::Watch function
2 parents 4b3eb60 + 010cb44

2 files changed: 35 additions & 27 deletions


staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 31 additions & 25 deletions
@@ -53,6 +53,7 @@ var (
 		},
 		[]string{"resource"},
 	)
+	emptyFunc = func() {}
 )
 
 func init() {
@@ -339,21 +340,6 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
 
 	c.ready.wait()
 
-	// We explicitly use thread unsafe version and do locking ourself to ensure that
-	// no new events will be processed in the meantime. The watchCache will be unlocked
-	// on return from this function.
-	// Note that we cannot do it under Cacher lock, to avoid a deadlock, since the
-	// underlying watchCache is calling processEvent under its lock.
-	c.watchCache.RLock()
-	defer c.watchCache.RUnlock()
-	initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
-	if err != nil {
-		// To match the uncached watch implementation, once we have passed authn/authz/admission,
-		// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
-		// rather than a directly returned error.
-		return newErrWatcher(err), nil
-	}
-
 	triggerValue, triggerSupported := "", false
 	// TODO: Currently we assume that in a given Cacher object, any <predicate> that is
 	// passed here is aware of exactly the same trigger (at most one).
@@ -377,20 +363,42 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
 		chanSize = 1000
 	}
 
+	// Create a watcher here to reduce memory allocations under lock,
+	// given that memory allocation may trigger GC and block the thread.
+	watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner)
+
+	// We explicitly use thread unsafe version and do locking ourself to ensure that
+	// no new events will be processed in the meantime. The watchCache will be unlocked
+	// on return from this function.
+	// Note that we cannot do it under Cacher lock, to avoid a deadlock, since the
+	// underlying watchCache is calling processEvent under its lock.
+	c.watchCache.RLock()
+	defer c.watchCache.RUnlock()
+	initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
+	if err != nil {
+		// To match the uncached watch implementation, once we have passed authn/authz/admission,
+		// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
+		// rather than a directly returned error.
+		return newErrWatcher(err), nil
+	}
+
 	// With some events already sent, update resourceVersion so that
 	// events that were buffered and not yet processed won't be delivered
 	// to this watcher second time causing going back in time.
 	if len(initEvents) > 0 {
 		watchRV = initEvents[len(initEvents)-1].ResourceVersion
 	}
 
-	c.Lock()
-	defer c.Unlock()
-	forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
-	watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterWithAttrsFunction(key, pred), forget, c.versioner)
+	func() {
+		c.Lock()
+		defer c.Unlock()
+		// Update watcher.forget function once we can compute it.
+		watcher.forget = forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
+		c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
+		c.watcherIdx++
+	}()
 
-	c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
-	c.watcherIdx++
+	go watcher.process(initEvents, watchRV)
 	return watcher, nil
 }
 
@@ -879,8 +887,8 @@ type cacheWatcher struct {
 	versioner storage.Versioner
 }
 
-func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner) *cacheWatcher {
-	watcher := &cacheWatcher{
+func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner) *cacheWatcher {
+	return &cacheWatcher{
 		input:     make(chan *watchCacheEvent, chanSize),
 		result:    make(chan watch.Event, chanSize),
 		done:      make(chan struct{}),
@@ -889,8 +897,6 @@ func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCa
 		forget:    forget,
 		versioner: versioner,
 	}
-	go watcher.process(initEvents, resourceVersion)
-	return watcher
 }
 
 // Implements watch.Interface.

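The restructuring above is the heart of the commit: the cacheWatcher is now allocated before any lock is taken (the added comment notes that allocation may trigger GC and block the thread), and the Cacher lock is confined to the registration step by wrapping it in an immediately invoked function, so the deferred Unlock runs when registration finishes rather than when Watch returns. Below is a minimal, runnable sketch of that lock-scoping idiom; the registry type and its names are illustrative placeholders, not the real Cacher API.

package main

import (
	"fmt"
	"sync"
)

type registry struct {
	mu      sync.Mutex
	nextIdx int
	entries map[int]string
}

// register keeps the critical section as small as possible: the value is
// built before the lock is taken, and the Lock/Unlock pair is scoped to an
// immediately invoked function so the deferred Unlock fires as soon as the
// map update finishes, not when register returns.
func (r *registry) register(name string) int {
	value := "watcher-" + name // allocated outside the lock

	var id int
	func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		id = r.nextIdx
		r.entries[id] = value
		r.nextIdx++
	}()

	// Work that should not run under the lock (e.g. starting a goroutine,
	// as Watch does with watcher.process) goes here.
	return id
}

func main() {
	r := &registry{entries: map[int]string{}}
	fmt.Println(r.register("a"), r.register("b")) // prints: 0 1
}

Watch follows the same shape: pre-allocate, take the Cacher lock only to assign watcher.forget and register the watcher, then start the processing goroutine after that lock has been released.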
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 4 additions & 2 deletions
@@ -63,7 +63,8 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
 	}
 	// set the size of the buffer of w.result to 0, so that the writes to
 	// w.result is blocked.
-	w = newCacheWatcher(0, 0, initEvents, filter, forget, testVersioner{})
+	w = newCacheWatcher(0, filter, forget, testVersioner{})
+	go w.process(initEvents, 0)
 	w.Stop()
 	if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
 		lock.RLock()
@@ -181,7 +182,8 @@ TestCase:
 	for j := range testCase.events {
 		testCase.events[j].ResourceVersion = uint64(j) + 1
 	}
-	w := newCacheWatcher(0, 0, testCase.events, filter, forget, testVersioner{})
+	w := newCacheWatcher(0, filter, forget, testVersioner{})
+	go w.process(testCase.events, 0)
 	ch := w.ResultChan()
 	for j, event := range testCase.expected {
 		e := <-ch

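Both test fixes follow from the signature change: newCacheWatcher no longer takes initEvents or a resource version and no longer spawns the processing goroutine itself, so callers (and tests) start it explicitly with go w.process(...). A small sketch of that construction/start split, using a placeholder worker type rather than the real cacheWatcher:

package main

import "fmt"

// worker mirrors the refactored shape: the constructor only wires up
// channels; the caller decides when to start the processing goroutine.
type worker struct {
	input  chan int
	result chan int
}

func newWorker(chanSize int) *worker {
	return &worker{
		input:  make(chan int, chanSize),
		result: make(chan int, chanSize),
	}
}

// process replays the initial events first, then drains input, mirroring
// how cacheWatcher.process now takes initEvents as an argument.
func (w *worker) process(initEvents []int) {
	for _, e := range initEvents {
		w.result <- e
	}
	for e := range w.input {
		w.result <- e
	}
	close(w.result)
}

func main() {
	w := newWorker(2)
	go w.process([]int{1, 2}) // started explicitly, as the updated tests do
	close(w.input)
	for e := range w.result {
		fmt.Println(e) // prints: 1 2
	}
}

In the commit itself, the placeholder emptyFunc fills the forget slot at construction time; the real forgetWatcher closure is swapped in under the Cacher lock before the goroutine is started.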