Skip to content

Commit c121632

Browse files
committed
Reduce Pool contention in cacher
1 parent 999e2e0 commit c121632

File tree

1 file changed

+20
-17
lines changed
  • staging/src/k8s.io/apiserver/pkg/storage/cacher

1 file changed

+20
-17
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,9 @@ type Cacher struct {
186186
stopped bool
187187
stopCh chan struct{}
188188
stopWg sync.WaitGroup
189+
190+
// Used to avoid unnecessary allocations in underlying watchers.
191+
timer *time.Timer
189192
}
190193

191194
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
@@ -227,6 +230,7 @@ func NewCacherFromConfig(config Config) *Cacher {
227230
// and there are no guarantees on the order that they will stop.
228231
// So we will be simply closing the channel, and synchronizing on the WaitGroup.
229232
stopCh: stopCh,
233+
timer: time.NewTimer(time.Duration(0)),
230234
}
231235
watchCache.SetOnEvent(cacher.processEvent)
232236
go cacher.dispatchEvents()
@@ -242,6 +246,14 @@ func NewCacherFromConfig(config Config) *Cacher {
242246
}, time.Second, stopCh,
243247
)
244248
}()
249+
250+
// Ensure that timer is stopped.
251+
if !cacher.timer.Stop() {
252+
// Consume triggered (but not yet received) timer event
253+
// so that future reuse does not get a spurious timeout.
254+
<-cacher.timer.C
255+
}
256+
245257
return cacher
246258
}
247259

@@ -621,13 +633,13 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
621633
defer c.Unlock()
622634
// Iterate over "allWatchers" no matter what the trigger function is.
623635
for _, watcher := range c.watchers.allWatchers {
624-
watcher.add(event, c.dispatchTimeoutBudget)
636+
watcher.add(event, c.timer, c.dispatchTimeoutBudget)
625637
}
626638
if supported {
627639
// Iterate over watchers interested in the given values of the trigger.
628640
for _, triggerValue := range triggerValues {
629641
for _, watcher := range c.watchers.valueWatchers[triggerValue] {
630-
watcher.add(event, c.dispatchTimeoutBudget)
642+
watcher.add(event, c.timer, c.dispatchTimeoutBudget)
631643
}
632644
}
633645
} else {
@@ -640,7 +652,7 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
640652
// Iterate over watchers interested in exact values for all values.
641653
for _, watchers := range c.watchers.valueWatchers {
642654
for _, watcher := range watchers {
643-
watcher.add(event, c.dispatchTimeoutBudget)
655+
watcher.add(event, c.timer, c.dispatchTimeoutBudget)
644656
}
645657
}
646658
}
@@ -826,9 +838,7 @@ func (c *cacheWatcher) stop() {
826838
}
827839
}
828840

829-
var timerPool sync.Pool
830-
831-
func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) {
841+
func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *timeBudget) {
832842
// Try to send the event immediately, without blocking.
833843
select {
834844
case c.input <- event:
@@ -842,23 +852,16 @@ func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) {
842852
startTime := time.Now()
843853
timeout := budget.takeAvailable()
844854

845-
t, ok := timerPool.Get().(*time.Timer)
846-
if ok {
847-
t.Reset(timeout)
848-
} else {
849-
t = time.NewTimer(timeout)
850-
}
851-
defer timerPool.Put(t)
855+
timer.Reset(timeout)
852856

853857
select {
854858
case c.input <- event:
855-
stopped := t.Stop()
856-
if !stopped {
859+
if !timer.Stop() {
857860
// Consume triggered (but not yet received) timer event
858861
// so that future reuse does not get a spurious timeout.
859-
<-t.C
862+
<-timer.C
860863
}
861-
case <-t.C:
864+
case <-timer.C:
862865
// This means that we couldn't send event to that watcher.
863866
// Since we don't want to block on it infinitely,
864867
// we simply terminate it.

0 commit comments

Comments
 (0)