Skip to content

Commit bd12b01

Browse files
authored
Merge pull request kubernetes#76903 from hormes/refactor_watch_cache
add object type to cacheWatcher
2 parents baa8b39 + 6c6d472 commit bd12b01

File tree

2 files changed

+15
-12
lines changed

2 files changed

+15
-12
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
443443
// given that memory allocation may trigger GC and block the thread.
444444
// Also note that emptyFunc is a placeholder, until we will be able
445445
// to compute watcher.forget function (which has to happen under lock).
446-
watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks)
446+
watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType)
447447

448448
// We explicitly use thread unsafe version and do locking ourself to ensure that
449449
// no new events will be processed in the meantime. The watchCache will be unlocked
@@ -1033,9 +1033,11 @@ type cacheWatcher struct {
10331033
// save it here to send bookmark events before that.
10341034
deadline time.Time
10351035
allowWatchBookmarks bool
1036+
// Object type of the cache watcher interests
1037+
objectType reflect.Type
10361038
}
10371039

1038-
func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner, deadline time.Time, allowWatchBookmarks bool) *cacheWatcher {
1040+
func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner, deadline time.Time, allowWatchBookmarks bool, objectType reflect.Type) *cacheWatcher {
10391041
return &cacheWatcher{
10401042
input: make(chan *watchCacheEvent, chanSize),
10411043
result: make(chan watch.Event, chanSize),
@@ -1046,6 +1048,7 @@ func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), ve
10461048
versioner: versioner,
10471049
deadline: deadline,
10481050
allowWatchBookmarks: allowWatchBookmarks,
1051+
objectType: objectType,
10491052
}
10501053
}
10511054

@@ -1208,16 +1211,12 @@ func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEven
12081211
for _, event := range initEvents {
12091212
c.sendWatchCacheEvent(event)
12101213
}
1214+
objType := c.objectType.String()
12111215
if len(initEvents) > 0 {
1212-
objType := reflect.TypeOf(initEvents[0].Object).String()
12131216
initCounter.WithLabelValues(objType).Add(float64(len(initEvents)))
12141217
}
12151218
processingTime := time.Since(startTime)
12161219
if processingTime > initProcessThreshold {
1217-
objType := "<null>"
1218-
if len(initEvents) > 0 {
1219-
objType = reflect.TypeOf(initEvents[0].Object).String()
1220-
}
12211220
klog.V(2).Infof("processing %d initEvents of %s took %v", len(initEvents), objType, processingTime)
12221221
}
12231222

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ import (
4545
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
4646
)
4747

48+
var (
49+
objectType = reflect.TypeOf(&v1.Pod{})
50+
)
51+
4852
// verifies the cacheWatcher.process goroutine is properly cleaned up even if
4953
// the writes to cacheWatcher.result channel is blocked.
5054
func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
@@ -67,7 +71,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
6771
}
6872
// set the size of the buffer of w.result to 0, so that the writes to
6973
// w.result is blocked.
70-
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false)
74+
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType)
7175
go w.process(context.Background(), initEvents, 0)
7276
w.Stop()
7377
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
@@ -187,7 +191,7 @@ TestCase:
187191
testCase.events[j].ResourceVersion = uint64(j) + 1
188192
}
189193

190-
w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false)
194+
w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType)
191195
go w.process(context.Background(), testCase.events, 0)
192196

193197
ch := w.ResultChan()
@@ -466,7 +470,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
466470
// timeout to zero and run the Stop goroutine concurrently.
467471
// May sure that the watch will not be blocked on Stop.
468472
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
469-
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false)
473+
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType)
470474
go w.Stop()
471475
select {
472476
case <-done:
@@ -478,7 +482,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
478482
deadline := time.Now().Add(time.Hour)
479483
// After that, verifies the cacheWatcher.process goroutine works correctly.
480484
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
481-
w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false)
485+
w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType)
482486
w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
483487
ctx, _ := context.WithDeadline(context.Background(), deadline)
484488
go w.process(ctx, nil, 0)
@@ -498,7 +502,7 @@ func TestTimeBucketWatchersBasic(t *testing.T) {
498502
forget := func() {}
499503

500504
newWatcher := func(deadline time.Time) *cacheWatcher {
501-
return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true)
505+
return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, objectType)
502506
}
503507

504508
clock := clock.NewFakeClock(time.Now())

0 commit comments

Comments
 (0)