Skip to content

Commit c9d6fd9

Browse files
authored
Merge pull request kubernetes#127500 from p0lyn0mial/upstream-assign-rv-to-watchCacheInterval
cacher: prevents sending events with ResourceVersion < RequiredResourceVersion
2 parents 15d08bf + e7e2123 commit c9d6fd9

File tree

5 files changed

+40
-11
lines changed

5 files changed

+40
-11
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,13 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch
454454
const initProcessThreshold = 500 * time.Millisecond
455455
startTime := time.Now()
456456

457+
// cacheInterval may be created from a version being more fresh than requested
458+
// (e.g. for NotOlderThan semantic). In such a case, we need to prevent watch event
459+
// with lower resourceVersion from being delivered to ensure watch contract.
460+
if cacheInterval.resourceVersion > resourceVersion {
461+
resourceVersion = cacheInterval.resourceVersion
462+
}
463+
457464
initEventCount := 0
458465
for {
459466
event, err := cacheInterval.Next()

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -746,7 +746,7 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, key string,
746746
indexerFunc := func(i int) *watchCacheEvent {
747747
return w.cache[i%w.capacity]
748748
}
749-
ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, w.RWMutex.RLocker())
749+
ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, resourceVersion, w.RWMutex.RLocker())
750750
return ci, nil
751751
}
752752

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ type watchCacheInterval struct {
9191
// lock on each invocation of Next().
9292
buffer *watchCacheIntervalBuffer
9393

94+
// resourceVersion is the resourceVersion from which
95+
// the interval was constructed.
96+
resourceVersion uint64
97+
9498
// lock effectively protects access to the underlying source
9599
// of events through - indexer and indexValidator.
96100
//
@@ -103,14 +107,15 @@ type attrFunc func(runtime.Object) (labels.Set, fields.Set, error)
103107
type indexerFunc func(int) *watchCacheEvent
104108
type indexValidator func(int) bool
105109

106-
func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValidator indexValidator, locker sync.Locker) *watchCacheInterval {
110+
func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValidator indexValidator, resourceVersion uint64, locker sync.Locker) *watchCacheInterval {
107111
return &watchCacheInterval{
108-
startIndex: startIndex,
109-
endIndex: endIndex,
110-
indexer: indexer,
111-
indexValidator: indexValidator,
112-
buffer: &watchCacheIntervalBuffer{buffer: make([]*watchCacheEvent, bufferSize)},
113-
lock: locker,
112+
startIndex: startIndex,
113+
endIndex: endIndex,
114+
indexer: indexer,
115+
indexValidator: indexValidator,
116+
buffer: &watchCacheIntervalBuffer{buffer: make([]*watchCacheEvent, bufferSize)},
117+
resourceVersion: resourceVersion,
118+
lock: locker,
114119
}
115120
}
116121

@@ -172,8 +177,9 @@ func newCacheIntervalFromStore(resourceVersion uint64, store storeIndexer, getAt
172177
ci := &watchCacheInterval{
173178
startIndex: 0,
174179
// Simulate that we already have all the events we're looking for.
175-
endIndex: 0,
176-
buffer: buffer,
180+
endIndex: 0,
181+
buffer: buffer,
182+
resourceVersion: resourceVersion,
177183
}
178184

179185
return ci, nil

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func intervalFromEvents(events []*watchCacheEvent) *watchCacheInterval {
4141
}
4242
indexValidator := func(_ int) bool { return true }
4343

44-
return newCacheInterval(startIndex, endIndex, indexer, indexValidator, locker)
44+
return newCacheInterval(startIndex, endIndex, indexer, indexValidator, 0, locker)
4545
}
4646

4747
func bufferFromEvents(events []*watchCacheEvent) *watchCacheIntervalBuffer {
@@ -300,6 +300,7 @@ func TestCacheIntervalNextFromWatchCache(t *testing.T) {
300300
wc.endIndex,
301301
indexerFunc,
302302
wc.isIndexValidLocked,
303+
wc.resourceVersion,
303304
&wc.RWMutex,
304305
)
305306

staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1460,6 +1460,21 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
14601460
createdPods = append(createdPods, out)
14611461
}
14621462

1463+
if len(createdPods) > 0 {
1464+
// this list call ensures that the cache has seen the created pods.
1465+
// this makes the watch request below deterministic.
1466+
listObject := &example.PodList{}
1467+
opts := storage.ListOptions{
1468+
Predicate: storage.Everything,
1469+
Recursive: true,
1470+
ResourceVersion: createdPods[len(createdPods)-1].ResourceVersion,
1471+
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
1472+
}
1473+
err := store.GetList(ctx, fmt.Sprintf("/pods/%s", ns), opts, listObject)
1474+
require.NoError(t, err)
1475+
require.Len(t, listObject.Items, len(createdPods))
1476+
}
1477+
14631478
if scenario.useCurrentRV {
14641479
currentStorageRV, err := storage.GetCurrentResourceVersionFromStorage(ctx, store, func() runtime.Object { return &example.PodList{} }, "/pods", "")
14651480
require.NoError(t, err)

0 commit comments

Comments
 (0)