Skip to content

Commit b69cc78

Browse files
authored
Merge pull request kubernetes#76239 from hormes/delivery_watch_events_nonblocking_first
delivery event non blocking firstly
2 parents 303509c + 5e493ab commit b69cc78

File tree

2 files changed

+124
-21
lines changed

2 files changed

+124
-21
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 52 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,8 @@ type Cacher struct {
276276
// watchersBuffer is a list of watchers potentially interested in currently
277277
// dispatched event.
278278
watchersBuffer []*cacheWatcher
279+
// blockedWatchers is a list of watchers whose buffer is currently full.
280+
blockedWatchers []*cacheWatcher
279281
// watchersToStop is a list of watchers that were supposed to be stopped
280282
// during current dispatching, but stopping was deferred to the end of
281283
// dispatching that event to avoid race with closing channels in watchers.
@@ -789,13 +791,45 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
789791
// Watchers stopped after startDispatching will be delayed to finishDispatching,
790792

791793
// Since add() can block, we explicitly add when cacher is unlocked.
794+
// Dispatching event in a non-blocking way first, which makes faster watchers
795+
// not be blocked by slower ones.
792796
if event.Type == watch.Bookmark {
793797
for _, watcher := range c.watchersBuffer {
794798
watcher.nonblockingAdd(event)
795799
}
796800
} else {
801+
c.blockedWatchers = c.blockedWatchers[:0]
797802
for _, watcher := range c.watchersBuffer {
798-
watcher.add(event, c.timer, c.dispatchTimeoutBudget)
803+
if !watcher.nonblockingAdd(event) {
804+
c.blockedWatchers = append(c.blockedWatchers, watcher)
805+
}
806+
}
807+
808+
if len(c.blockedWatchers) > 0 {
809+
// dispatchEvent is called very often, so arrange
810+
// to reuse timers instead of constantly allocating.
811+
startTime := time.Now()
812+
timeout := c.dispatchTimeoutBudget.takeAvailable()
813+
c.timer.Reset(timeout)
814+
815+
// Make sure every watcher will try to send event without blocking first,
816+
// even if the timer has already expired.
817+
timer := c.timer
818+
for _, watcher := range c.blockedWatchers {
819+
if !watcher.add(event, timer) {
820+
// The timer has fired; clear it by setting it to nil.
821+
timer = nil
822+
}
823+
}
824+
825+
// Stop the timer if it has not fired
826+
if timer != nil && !timer.Stop() {
827+
// Consume triggered (but not yet received) timer event
828+
// so that future reuse does not get a spurious timeout.
829+
<-timer.C
830+
}
831+
832+
c.dispatchTimeoutBudget.returnUnused(timeout - time.Since(startTime))
799833
}
800834
}
801835
}
@@ -1078,7 +1112,6 @@ func (c *cacheWatcher) stop() {
10781112
}
10791113

10801114
func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
1081-
// If we can't send it, don't block on it.
10821115
select {
10831116
case c.input <- event:
10841117
return true
@@ -1087,36 +1120,34 @@ func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
10871120
}
10881121
}
10891122

1090-
func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *timeBudget) {
1123+
// Nil timer means that add will not block (if it can't send event immediately, it will break the watcher)
1124+
func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
10911125
// Try to send the event immediately, without blocking.
10921126
if c.nonblockingAdd(event) {
1093-
return
1127+
return true
10941128
}
10951129

1096-
// OK, block sending, but only for up to <timeout>.
1097-
// cacheWatcher.add is called very often, so arrange
1098-
// to reuse timers instead of constantly allocating.
1099-
startTime := time.Now()
1100-
timeout := budget.takeAvailable()
1101-
1102-
timer.Reset(timeout)
1103-
1104-
select {
1105-
case c.input <- event:
1106-
if !timer.Stop() {
1107-
// Consume triggered (but not yet received) timer event
1108-
// so that future reuse does not get a spurious timeout.
1109-
<-timer.C
1110-
}
1111-
case <-timer.C:
1130+
closeFunc := func() {
11121131
// This means that we couldn't send event to that watcher.
11131132
// Since we don't want to block on it infinitely,
11141133
// we simply terminate it.
11151134
klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", reflect.TypeOf(event.Object).String())
11161135
c.forget()
11171136
}
11181137

1119-
budget.returnUnused(timeout - time.Since(startTime))
1138+
if timer == nil {
1139+
closeFunc()
1140+
return false
1141+
}
1142+
1143+
// OK, block sending, but only until timer fires.
1144+
select {
1145+
case c.input <- event:
1146+
return true
1147+
case <-timer.C:
1148+
closeFunc()
1149+
return false
1150+
}
11201151
}
11211152

11221153
func (c *cacheWatcher) nextBookmarkTime(now time.Time) (time.Time, bool) {

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -743,3 +743,75 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
743743
wg.Wait()
744744
}
745745
}
746+
747+
func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
748+
backingStorage := &dummyStorage{}
749+
cacher, _ := newTestCacher(backingStorage, 1000)
750+
defer cacher.Stop()
751+
752+
// Wait until cacher is initialized.
753+
cacher.ready.wait()
754+
755+
// Ensure there is some budget for slowing down processing.
756+
cacher.dispatchTimeoutBudget.returnUnused(50 * time.Millisecond)
757+
758+
makePod := func(i int) *examplev1.Pod {
759+
return &examplev1.Pod{
760+
ObjectMeta: metav1.ObjectMeta{
761+
Name: fmt.Sprintf("pod-%d", 1000+i),
762+
Namespace: "ns",
763+
ResourceVersion: fmt.Sprintf("%d", 1000+i),
764+
},
765+
}
766+
}
767+
if err := cacher.watchCache.Add(makePod(0)); err != nil {
768+
t.Errorf("error: %v", err)
769+
}
770+
771+
totalPods := 50
772+
773+
// Create watcher that will be blocked.
774+
w1, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything)
775+
if err != nil {
776+
t.Fatalf("Failed to create watch: %v", err)
777+
}
778+
defer w1.Stop()
779+
780+
// Create fast watcher and ensure it will get all objects.
781+
w2, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything)
782+
if err != nil {
783+
t.Fatalf("Failed to create watch: %v", err)
784+
}
785+
defer w2.Stop()
786+
787+
// Now push a ton of objects to the cache.
788+
for i := 1; i < totalPods; i++ {
789+
cacher.watchCache.Add(makePod(i))
790+
}
791+
792+
shouldContinue := true
793+
eventsCount := 0
794+
for shouldContinue {
795+
select {
796+
case event, ok := <-w2.ResultChan():
797+
if !ok {
798+
shouldContinue = false
799+
break
800+
}
801+
// Ensure there is some budget for the fast watcher after the slower one is blocked.
802+
cacher.dispatchTimeoutBudget.returnUnused(50 * time.Millisecond)
803+
if event.Type == watch.Added {
804+
eventsCount++
805+
if eventsCount == totalPods {
806+
shouldContinue = false
807+
}
808+
}
809+
case <-time.After(2 * time.Second):
810+
shouldContinue = false
811+
w2.Stop()
812+
}
813+
}
814+
if eventsCount != totalPods {
815+
t.Errorf("watcher is blocked by slower one (count: %d)", eventsCount)
816+
}
817+
}

0 commit comments

Comments
 (0)