Skip to content

Commit 81ebfb3

Browse files
authored
Merge pull request kubernetes#127012 from Chaunceyctx/new-send-bookmark
send bookmark right now after sending all items in watchCache store
2 parents 9770283 + 7239202 commit 81ebfb3

File tree

6 files changed

+173
-10
lines changed

6 files changed

+173
-10
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -510,6 +510,10 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch
510510
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, c.groupResource, c.identifier, processingTime)
511511
}
512512

513+
// send bookmark after sending all events in cacheInterval for watchlist request
514+
if cacheInterval.initialEventsEndBookmark != nil {
515+
c.sendWatchCacheEvent(cacheInterval.initialEventsEndBookmark)
516+
}
513517
c.process(ctx, resourceVersion)
514518
}
515519

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
653653
return newErrWatcher(err), nil
654654
}
655655

656+
c.setInitialEventsEndBookmarkIfRequested(cacheInterval, opts, c.watchCache.resourceVersion)
657+
656658
addedWatcher := false
657659
func() {
658660
c.Lock()
@@ -1448,6 +1450,26 @@ func (c *Cacher) Wait(ctx context.Context) error {
14481450
return c.ready.wait(ctx)
14491451
}
14501452

1453+
// setInitialEventsEndBookmarkIfRequested sets initialEventsEndBookmark field in watchCacheInterval for watchlist request
1454+
func (c *Cacher) setInitialEventsEndBookmarkIfRequested(cacheInterval *watchCacheInterval, opts storage.ListOptions, currentResourceVersion uint64) {
1455+
if opts.SendInitialEvents != nil && *opts.SendInitialEvents && opts.Predicate.AllowWatchBookmarks {
1456+
// We don't need to set the InitialEventsAnnotation for this bookmark event,
1457+
// because this will be automatically set during event conversion in cacheWatcher.convertToWatchEvent method
1458+
initialEventsEndBookmark := &watchCacheEvent{
1459+
Type: watch.Bookmark,
1460+
Object: c.newFunc(),
1461+
ResourceVersion: currentResourceVersion,
1462+
}
1463+
1464+
if err := c.versioner.UpdateObject(initialEventsEndBookmark.Object, initialEventsEndBookmark.ResourceVersion); err != nil {
1465+
klog.Errorf("failure to set resourceVersion to %d on initialEventsEndBookmark event %+v for watchlist request and wait for bookmark trigger to send", initialEventsEndBookmark.ResourceVersion, initialEventsEndBookmark.Object)
1466+
initialEventsEndBookmark = nil
1467+
}
1468+
1469+
cacheInterval.initialEventsEndBookmark = initialEventsEndBookmark
1470+
}
1471+
}
1472+
14511473
// errWatcher implements watch.Interface to return a single error
14521474
type errWatcher struct {
14531475
result chan watch.Event

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import (
4949
"k8s.io/apiserver/pkg/storage/cacher/metrics"
5050
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
5151
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
52+
storagetesting "k8s.io/apiserver/pkg/storage/testing"
5253
utilfeature "k8s.io/apiserver/pkg/util/feature"
5354
featuregatetesting "k8s.io/component-base/featuregate/testing"
5455
k8smetrics "k8s.io/component-base/metrics"
@@ -1171,6 +1172,106 @@ func TestCacherSendBookmarkEvents(t *testing.T) {
11711172
}
11721173
}
11731174

1175+
func TestInitialEventsEndBookmark(t *testing.T) {
1176+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)
1177+
forceRequestWatchProgressSupport(t)
1178+
1179+
backingStorage := &dummyStorage{}
1180+
cacher, _, err := newTestCacher(backingStorage)
1181+
if err != nil {
1182+
t.Fatalf("Couldn't create cacher: %v", err)
1183+
}
1184+
defer cacher.Stop()
1185+
1186+
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
1187+
if err := cacher.ready.wait(context.Background()); err != nil {
1188+
t.Fatalf("unexpected error waiting for the cache to be ready")
1189+
}
1190+
}
1191+
1192+
makePod := func(index uint64) *example.Pod {
1193+
return &example.Pod{
1194+
ObjectMeta: metav1.ObjectMeta{
1195+
Name: fmt.Sprintf("pod-%d", index),
1196+
Namespace: "ns",
1197+
ResourceVersion: fmt.Sprintf("%v", 100+index),
1198+
},
1199+
}
1200+
}
1201+
1202+
numberOfPods := 3
1203+
var expectedPodEvents []watch.Event
1204+
for i := 1; i <= numberOfPods; i++ {
1205+
pod := makePod(uint64(i))
1206+
if err := cacher.watchCache.Add(pod); err != nil {
1207+
t.Fatalf("failed to add a pod: %v", err)
1208+
}
1209+
expectedPodEvents = append(expectedPodEvents, watch.Event{Type: watch.Added, Object: pod})
1210+
}
1211+
var currentResourceVersion uint64 = 100 + 3
1212+
1213+
trueVal, falseVal := true, false
1214+
1215+
scenarios := []struct {
1216+
name string
1217+
allowWatchBookmarks bool
1218+
sendInitialEvents *bool
1219+
}{
1220+
{
1221+
name: "allowWatchBookmarks=false, sendInitialEvents=false",
1222+
allowWatchBookmarks: false,
1223+
sendInitialEvents: &falseVal,
1224+
},
1225+
{
1226+
name: "allowWatchBookmarks=false, sendInitialEvents=true",
1227+
allowWatchBookmarks: false,
1228+
sendInitialEvents: &trueVal,
1229+
},
1230+
{
1231+
name: "allowWatchBookmarks=true, sendInitialEvents=true",
1232+
allowWatchBookmarks: true,
1233+
sendInitialEvents: &trueVal,
1234+
},
1235+
{
1236+
name: "allowWatchBookmarks=true, sendInitialEvents=false",
1237+
allowWatchBookmarks: true,
1238+
sendInitialEvents: &falseVal,
1239+
},
1240+
{
1241+
name: "allowWatchBookmarks=false, sendInitialEvents=nil",
1242+
allowWatchBookmarks: true,
1243+
},
1244+
}
1245+
1246+
for _, scenario := range scenarios {
1247+
t.Run(scenario.name, func(t *testing.T) {
1248+
expectedWatchEvents := expectedPodEvents
1249+
if scenario.allowWatchBookmarks && scenario.sendInitialEvents != nil && *scenario.sendInitialEvents {
1250+
expectedWatchEvents = append(expectedWatchEvents, watch.Event{
1251+
Type: watch.Bookmark,
1252+
Object: &example.Pod{
1253+
ObjectMeta: metav1.ObjectMeta{
1254+
ResourceVersion: strconv.FormatUint(currentResourceVersion, 10),
1255+
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"},
1256+
},
1257+
},
1258+
})
1259+
}
1260+
1261+
pred := storage.Everything
1262+
pred.AllowWatchBookmarks = scenario.allowWatchBookmarks
1263+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
1264+
defer cancel()
1265+
w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "100", SendInitialEvents: scenario.sendInitialEvents, Predicate: pred})
1266+
if err != nil {
1267+
t.Fatalf("Failed to create watch: %v", err)
1268+
}
1269+
storagetesting.TestCheckResultsInStrictOrder(t, w, expectedWatchEvents)
1270+
storagetesting.TestCheckNoMoreResultsWithIgnoreFunc(t, w, nil)
1271+
})
1272+
}
1273+
}
1274+
11741275
func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
11751276
backingStorage := &dummyStorage{}
11761277
cacher, _, err := newTestCacher(backingStorage)

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ type watchCacheInterval struct {
101101
// Given that indexer and indexValidator only read state, if
102102
// possible, Locker obtained through RLocker() is provided.
103103
lock sync.Locker
104+
105+
// initialEventsEndBookmark will be sent after sending all events in cacheInterval
106+
initialEventsEndBookmark *watchCacheEvent
104107
}
105108

106109
type attrFunc func(runtime.Object) (labels.Set, fields.Set, error)

staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,36 @@ func testCheckResultFunc(t *testing.T, w watch.Interface, check func(actualEvent
169169
}
170170
}
171171

172+
func testCheckResultWithIgnoreFunc(t *testing.T, w watch.Interface, expectedEvents []watch.Event, ignore func(watch.Event) bool) {
173+
checkIndex := 0
174+
for {
175+
select {
176+
case event := <-w.ResultChan():
177+
obj := event.Object
178+
if co, ok := obj.(runtime.CacheableObject); ok {
179+
event.Object = co.GetObject()
180+
}
181+
if ignore != nil && ignore(event) {
182+
continue
183+
}
184+
if checkIndex < len(expectedEvents) {
185+
expectNoDiff(t, "incorrect event", expectedEvents[checkIndex], event)
186+
checkIndex++
187+
} else {
188+
t.Fatalf("cannot receive correct event, expect no event, but get a event: %+v", event)
189+
}
190+
case <-time.After(100 * time.Millisecond):
191+
// wait 100ms forcibly in order to receive watchEvents including bookmark event.
192+
// we cannot guarantee that we will receive all bookmark events within 100ms,
193+
// but too large timeout value will lead to exceed the timeout of package test.
194+
if checkIndex < len(expectedEvents) {
195+
t.Fatalf("cannot receive enough events within specific time, rest expected events: %+v", expectedEvents[checkIndex:])
196+
}
197+
return
198+
}
199+
}
200+
}
201+
172202
func testCheckStop(t *testing.T, w watch.Interface) {
173203
select {
174204
case e, ok := <-w.ResultChan():
@@ -187,16 +217,18 @@ func testCheckStop(t *testing.T, w watch.Interface) {
187217
}
188218
}
189219

190-
func testCheckResultsInStrictOrder(t *testing.T, w watch.Interface, expectedEvents []watch.Event) {
220+
func TestCheckResultsInStrictOrder(t *testing.T, w watch.Interface, expectedEvents []watch.Event) {
191221
for _, expectedEvent := range expectedEvents {
192222
testCheckResult(t, w, expectedEvent)
193223
}
194224
}
195225

196-
func testCheckNoMoreResults(t *testing.T, w watch.Interface) {
226+
func TestCheckNoMoreResultsWithIgnoreFunc(t *testing.T, w watch.Interface, ignore func(watch.Event) bool) {
197227
select {
198228
case e := <-w.ResultChan():
199-
t.Errorf("Unexpected: %#v event received, expected no events", e)
229+
if ignore == nil || !ignore(e) {
230+
t.Errorf("Unexpected: %#v event received, expected no events", e)
231+
}
200232
// We consciously make the timeout short here to speed up tests.
201233
case <-time.After(100 * time.Millisecond):
202234
return

staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1493,7 +1493,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
14931493
defer w.Stop()
14941494

14951495
// make sure we only get initial events
1496-
testCheckResultsInStrictOrder(t, w, scenario.expectedInitialEvents(createdPods))
1496+
TestCheckResultsInStrictOrder(t, w, scenario.expectedInitialEvents(createdPods))
14971497

14981498
// make sure that the actual bookmark has at least RV >= to the expected one
14991499
if scenario.expectedInitialEventsBookmarkWithMinimalRV != nil {
@@ -1527,8 +1527,9 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
15271527
require.NoError(t, err, "failed to add a pod: %v")
15281528
createdPods = append(createdPods, out)
15291529
}
1530-
testCheckResultsInStrictOrder(t, w, scenario.expectedEventsAfterEstablishingWatch(createdPods))
1531-
testCheckNoMoreResults(t, w)
1530+
ignoreEventsFn := func(event watch.Event) bool { return event.Type == watch.Bookmark }
1531+
testCheckResultWithIgnoreFunc(t, w, scenario.expectedEventsAfterEstablishingWatch(createdPods), ignoreEventsFn)
1532+
TestCheckNoMoreResultsWithIgnoreFunc(t, w, ignoreEventsFn)
15321533
})
15331534
}
15341535
}
@@ -1582,8 +1583,8 @@ func RunWatchSemanticInitialEventsExtended(ctx context.Context, t *testing.T, st
15821583

15831584
// make sure we only get initial events from the first ns
15841585
// followed by the bookmark with the global RV
1585-
testCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(initialPods, otherNsPod.ResourceVersion))
1586-
testCheckNoMoreResults(t, w)
1586+
TestCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(initialPods, otherNsPod.ResourceVersion))
1587+
TestCheckNoMoreResultsWithIgnoreFunc(t, w, nil)
15871588
}
15881589

15891590
func RunWatchListMatchSingle(ctx context.Context, t *testing.T, store storage.Interface) {
@@ -1637,8 +1638,8 @@ func RunWatchListMatchSingle(ctx context.Context, t *testing.T, store storage.In
16371638

16381639
// make sure we only get a single pod matching the field selector
16391640
// followed by the bookmark with the global RV
1640-
testCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(expectedPod, lastAddedPod.ResourceVersion))
1641-
testCheckNoMoreResults(t, w)
1641+
TestCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(expectedPod, lastAddedPod.ResourceVersion))
1642+
TestCheckNoMoreResultsWithIgnoreFunc(t, w, nil)
16421643
}
16431644

16441645
func makePod(namePrefix string) *example.Pod {

0 commit comments

Comments
 (0)