Skip to content

Commit e6844c2

Browse files
authored
Merge pull request kubernetes#76002 from BenTheElder/revert-75835
Revert "add timeout suuport for watch"
2 parents de2bfc7 + 30a56ba commit e6844c2

File tree

3 files changed

+12
-59
lines changed

3 files changed

+12
-59
lines changed

staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,8 +249,6 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
249249
}
250250
klog.V(3).Infof("Starting watch for %s, rv=%s labels=%s fields=%s timeout=%s", req.URL.Path, opts.ResourceVersion, opts.LabelSelector, opts.FieldSelector, timeout)
251251

252-
ctx, cancel := context.WithTimeout(ctx, timeout)
253-
defer cancel()
254252
watcher, err := rw.Watch(ctx, &opts)
255253
if err != nil {
256254
scope.err(err, w, req)

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 8 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package cacher
1919
import (
2020
"context"
2121
"fmt"
22-
"math"
2322
"net/http"
2423
"reflect"
2524
"sync"
@@ -365,16 +364,11 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
365364
chanSize = 1000
366365
}
367366

368-
// Determine watch timeout
369-
timeout := time.Duration(math.MaxInt64)
370-
if deadline, ok := ctx.Deadline(); ok {
371-
timeout = deadline.Sub(time.Now())
372-
}
373367
// Create a watcher here to reduce memory allocations under lock,
374368
// given that memory allocation may trigger GC and block the thread.
375369
// Also note that emptyFunc is a placeholder, until we will be able
376370
// to compute watcher.forget function (which has to happen under lock).
377-
watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, timeout)
371+
watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner)
378372

379373
// We explicitly use thread unsafe version and do locking ourself to ensure that
380374
// no new events will be processed in the meantime. The watchCache will be unlocked
@@ -407,7 +401,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
407401
c.watcherIdx++
408402
}()
409403

410-
go watcher.process(ctx, initEvents, watchRV)
404+
go watcher.process(initEvents, watchRV)
411405
return watcher, nil
412406
}
413407

@@ -894,34 +888,9 @@ type cacheWatcher struct {
894888
stopped bool
895889
forget func()
896890
versioner storage.Versioner
897-
timer *time.Timer
898-
}
899-
900-
var timerPool sync.Pool
901-
902-
func newTimer(d time.Duration) *time.Timer {
903-
t, ok := timerPool.Get().(*time.Timer)
904-
if ok {
905-
t.Reset(d)
906-
} else {
907-
t = time.NewTimer(d)
908-
}
909-
return t
910891
}
911892

912-
func freeTimer(timer *time.Timer) {
913-
if !timer.Stop() {
914-
// Consume triggered (but not yet received) timer event
915-
// so that future reuse does not get a spurious timeout.
916-
select {
917-
case <-timer.C:
918-
default:
919-
}
920-
}
921-
timerPool.Put(timer)
922-
}
923-
924-
func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner, timeout time.Duration) *cacheWatcher {
893+
func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner) *cacheWatcher {
925894
return &cacheWatcher{
926895
input: make(chan *watchCacheEvent, chanSize),
927896
result: make(chan watch.Event, chanSize),
@@ -930,7 +899,6 @@ func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), ve
930899
stopped: false,
931900
forget: forget,
932901
versioner: versioner,
933-
timer: newTimer(timeout),
934902
}
935903
}
936904

@@ -951,7 +919,6 @@ func (c *cacheWatcher) stop() {
951919
c.stopped = true
952920
close(c.done)
953921
close(c.input)
954-
freeTimer(c.timer)
955922
}
956923
}
957924

@@ -1040,7 +1007,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
10401007
}
10411008
}
10421009

1043-
func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) {
1010+
func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion uint64) {
10441011
defer utilruntime.HandleCrash()
10451012

10461013
// Check how long we are processing initEvents.
@@ -1076,20 +1043,10 @@ func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEven
10761043

10771044
defer close(c.result)
10781045
defer c.Stop()
1079-
for {
1080-
select {
1081-
case event, ok := <-c.input:
1082-
if !ok {
1083-
return
1084-
}
1085-
// only send events newer than resourceVersion
1086-
if event.ResourceVersion > resourceVersion {
1087-
c.sendWatchCacheEvent(event)
1088-
}
1089-
case <-ctx.Done():
1090-
return
1091-
case <-c.timer.C:
1092-
return
1046+
for event := range c.input {
1047+
// only send events newer than resourceVersion
1048+
if event.ResourceVersion > resourceVersion {
1049+
c.sendWatchCacheEvent(event)
10931050
}
10941051
}
10951052
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package cacher
1919
import (
2020
"context"
2121
"fmt"
22-
"math"
2322
"reflect"
2423
"strconv"
2524
"sync"
@@ -64,8 +63,8 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
6463
}
6564
// set the size of the buffer of w.result to 0, so that the writes to
6665
// w.result is blocked.
67-
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Duration(math.MaxInt64))
68-
go w.process(context.Background(), initEvents, 0)
66+
w = newCacheWatcher(0, filter, forget, testVersioner{})
67+
go w.process(initEvents, 0)
6968
w.Stop()
7069
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
7170
lock.RLock()
@@ -183,9 +182,8 @@ TestCase:
183182
for j := range testCase.events {
184183
testCase.events[j].ResourceVersion = uint64(j) + 1
185184
}
186-
187-
w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Duration(math.MaxInt64))
188-
go w.process(context.Background(), testCase.events, 0)
185+
w := newCacheWatcher(0, filter, forget, testVersioner{})
186+
go w.process(testCase.events, 0)
189187
ch := w.ResultChan()
190188
for j, event := range testCase.expected {
191189
e := <-ch

0 commit comments

Comments
 (0)