Skip to content

Commit a79c711

Browse files
authored
Merge pull request kubernetes#91417 from wojtek-t/fix_watch_race
Fix the bug of watches being accepted instead of returning "too old resource version" in watchcache
2 parents f91c1ef + ef1e5b6 commit a79c711

File tree

2 files changed

+40
-11
lines changed

2 files changed

+40
-11
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -534,19 +534,16 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
534534
size := w.endIndex - w.startIndex
535535
var oldest uint64
536536
switch {
537-
case size >= w.capacity:
538-
// Once the watch event buffer is full, the oldest watch event we can deliver
539-
// is the first one in the buffer.
540-
oldest = w.cache[w.startIndex%w.capacity].ResourceVersion
541-
case w.listResourceVersion > 0:
542-
// If the watch event buffer isn't full, the oldest watch event we can deliver
543-
// is one greater than the resource version of the last full list.
537+
case w.listResourceVersion > 0 && w.startIndex == 0:
538+
// If no event was removed from the buffer since last relist, the oldest watch
539+
// event we can deliver is one greater than the resource version of the list.
544540
oldest = w.listResourceVersion + 1
545541
case size > 0:
546-
// If we've never completed a list, use the resourceVersion of the oldest event
547-
// in the buffer.
548-
// This should only happen in unit tests that populate the buffer without
549-
// performing list/replace operations.
542+
// If the previous condition is not satisfied: either some event was already
543+
// removed from the buffer or we've never completed a list (the latter can
544+
// only happen in unit tests that populate the buffer without performing
545+
// list/replace operations), the oldest watch event we can deliver is the first
546+
// one in the buffer.
550547
oldest = w.cache[w.startIndex%w.capacity].ResourceVersion
551548
default:
552549
return nil, fmt.Errorf("watch cache isn't correctly initialized")

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -782,6 +782,38 @@ func checkCacheElements(cache *watchCache) bool {
782782
return true
783783
}
784784

785+
func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) {
786+
store := newTestWatchCache(2, &cache.Indexers{})
787+
788+
now := store.clock.Now()
789+
addEvent := func(key string, rv uint64, t time.Time) {
790+
event := &watchCacheEvent{
791+
Key: key,
792+
ResourceVersion: rv,
793+
RecordTime: t,
794+
}
795+
store.updateCache(event)
796+
}
797+
798+
// Initial LIST comes from the moment of RV=10.
799+
store.Replace(nil, "10")
800+
801+
addEvent("key1", 20, now)
802+
803+
// Force "key1" to rotate our of cache.
804+
later := now.Add(2 * eventFreshDuration)
805+
addEvent("key2", 30, later)
806+
addEvent("key3", 40, later)
807+
808+
// Force cache resize.
809+
addEvent("key4", 50, later.Add(time.Second))
810+
811+
_, err := store.GetAllEventsSince(15)
812+
if err == nil || !strings.Contains(err.Error(), "too old resource version") {
813+
t.Errorf("unexpected error: %v", err)
814+
}
815+
}
816+
785817
func BenchmarkWatchCache_updateCache(b *testing.B) {
786818
store := newTestWatchCache(defaultUpperBoundCapacity, &cache.Indexers{})
787819
store.cache = store.cache[:0]

0 commit comments

Comments
 (0)