Skip to content

Commit f87e4a1

Browse files
committed
storage/cacher/cache_watcher: add RV to watchCacheInterval
1 parent 767d28d commit f87e4a1

File tree

3 files changed

+18
-11
lines changed

3 files changed

+18
-11
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -746,7 +746,7 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, key string,
746746
indexerFunc := func(i int) *watchCacheEvent {
747747
return w.cache[i%w.capacity]
748748
}
749-
ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, w.RWMutex.RLocker())
749+
ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, resourceVersion, w.RWMutex.RLocker())
750750
return ci, nil
751751
}
752752

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ type watchCacheInterval struct {
9191
// lock on each invocation of Next().
9292
buffer *watchCacheIntervalBuffer
9393

94+
// resourceVersion is the resourceVersion from which
95+
// the interval was constructed.
96+
resourceVersion uint64
97+
9498
// lock effectively protects access to the underlying source
9599
// of events through - indexer and indexValidator.
96100
//
@@ -103,14 +107,15 @@ type attrFunc func(runtime.Object) (labels.Set, fields.Set, error)
103107
type indexerFunc func(int) *watchCacheEvent
104108
type indexValidator func(int) bool
105109

106-
func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValidator indexValidator, locker sync.Locker) *watchCacheInterval {
110+
func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValidator indexValidator, resourceVersion uint64, locker sync.Locker) *watchCacheInterval {
107111
return &watchCacheInterval{
108-
startIndex: startIndex,
109-
endIndex: endIndex,
110-
indexer: indexer,
111-
indexValidator: indexValidator,
112-
buffer: &watchCacheIntervalBuffer{buffer: make([]*watchCacheEvent, bufferSize)},
113-
lock: locker,
112+
startIndex: startIndex,
113+
endIndex: endIndex,
114+
indexer: indexer,
115+
indexValidator: indexValidator,
116+
buffer: &watchCacheIntervalBuffer{buffer: make([]*watchCacheEvent, bufferSize)},
117+
resourceVersion: resourceVersion,
118+
lock: locker,
114119
}
115120
}
116121

@@ -172,8 +177,9 @@ func newCacheIntervalFromStore(resourceVersion uint64, store storeIndexer, getAt
172177
ci := &watchCacheInterval{
173178
startIndex: 0,
174179
// Simulate that we already have all the events we're looking for.
175-
endIndex: 0,
176-
buffer: buffer,
180+
endIndex: 0,
181+
buffer: buffer,
182+
resourceVersion: resourceVersion,
177183
}
178184

179185
return ci, nil

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func intervalFromEvents(events []*watchCacheEvent) *watchCacheInterval {
4141
}
4242
indexValidator := func(_ int) bool { return true }
4343

44-
return newCacheInterval(startIndex, endIndex, indexer, indexValidator, locker)
44+
return newCacheInterval(startIndex, endIndex, indexer, indexValidator, 0, locker)
4545
}
4646

4747
func bufferFromEvents(events []*watchCacheEvent) *watchCacheIntervalBuffer {
@@ -300,6 +300,7 @@ func TestCacheIntervalNextFromWatchCache(t *testing.T) {
300300
wc.endIndex,
301301
indexerFunc,
302302
wc.isIndexValidLocked,
303+
wc.resourceVersion,
303304
&wc.RWMutex,
304305
)
305306

0 commit comments

Comments
 (0)