Skip to content

Commit 32e8b9b

Browse files
authored
Merge pull request #3875 from maxpain/fix/cached-resource-initial-events-bookmark
fix: send initial-events-end bookmark for CachedResource virtual storage
2 parents 4e73ead + 738d519 commit 32e8b9b

File tree

1 file changed

+41
-1
lines changed

1 file changed

+41
-1
lines changed

pkg/virtual/replication/builder/unwrap.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"net/http"
2323
"sync"
24+
"sync/atomic"
2425

2526
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
2627
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -151,7 +152,13 @@ func withUnwrapping(apiResourceSchema *apisv1alpha1.APIResourceSchema, version s
151152
return nil, err
152153
}
153154

154-
return newUnwrappingWatch(ctx, innerGVR, options, namespaced, genericapirequest.NamespaceValue(ctx),
155+
innerGVK := schema.GroupVersionKind{
156+
Group: wrappedGVR.Group,
157+
Version: wrappedGVR.Version,
158+
Kind: apiResourceSchema.Spec.Names.Kind,
159+
}
160+
161+
return newUnwrappingWatch(ctx, innerGVR, innerGVK, options, namespaced, genericapirequest.NamespaceValue(ctx),
155162
globalKcpInformers.Cache().V1alpha1().CachedObjects().Cluster(parsedKey.CachedResourceCluster).Informer(), syntheticClustersProvider(
156163
*genericapirequest.ClusterFrom(ctx),
157164
cr.Status.IdentityHash,
@@ -258,6 +265,7 @@ type unwrappingWatch struct {
258265
func newUnwrappingWatch(
259266
ctx context.Context,
260267
innerObjGVR schema.GroupVersionResource,
268+
innerObjGVK schema.GroupVersionKind,
261269
innerListOpts *metainternalversion.ListOptions,
262270
namespaced bool,
263271
namespace string,
@@ -283,6 +291,11 @@ func newUnwrappingWatch(
283291
}
284292
}()
285293

294+
// Track the latest resource version seen during initial list replay.
295+
// Used to set the resourceVersion on the initial-events-end bookmark.
296+
var latestInitialRV atomic.Value
297+
latestInitialRV.Store("0")
298+
286299
label := labels.Everything()
287300
if innerListOpts != nil && innerListOpts.LabelSelector != nil {
288301
label = innerListOpts.LabelSelector
@@ -345,6 +358,11 @@ func newUnwrappingWatch(
345358
}
346359

347360
if isInInitialList {
361+
// Track the latest resource version for the initial-events-end bookmark.
362+
if rv := innerObj.GetResourceVersion(); rv > latestInitialRV.Load().(string) {
363+
latestInitialRV.Store(rv)
364+
}
365+
348366
if innerObj.GetResourceVersion() <= innerListOpts.ResourceVersion {
349367
// This resource is older than the one we want to start from on isInInitial list replay.
350368
return
@@ -422,6 +440,28 @@ func newUnwrappingWatch(
422440
}
423441
w.handler = handler
424442

443+
// Send the initial-events-end bookmark after the informer has replayed all
444+
// existing items. Clients using WatchList (sendInitialEvents=true) expect a
445+
// BOOKMARK with the k8s.io/initial-events-end annotation to signal that the
446+
// initial state has been fully delivered.
447+
if innerListOpts != nil && innerListOpts.SendInitialEvents != nil && *innerListOpts.SendInitialEvents && innerListOpts.AllowWatchBookmarks {
448+
go func() {
449+
if !clientgocache.WaitForCacheSync(ctx.Done(), handler.HasSynced) {
450+
return
451+
}
452+
bookmark := &unstructured.Unstructured{}
453+
bookmark.SetGroupVersionKind(innerObjGVK)
454+
bookmark.SetResourceVersion(latestInitialRV.Load().(string))
455+
bookmark.SetAnnotations(map[string]string{
456+
metav1.InitialEventsAnnotationKey: "true",
457+
})
458+
w.safeWrite(watch.Event{
459+
Type: watch.Bookmark,
460+
Object: bookmark,
461+
})
462+
}()
463+
}
464+
425465
return w, nil
426466
}
427467

0 commit comments

Comments
 (0)