@@ -21,6 +21,7 @@ import (
2121 "fmt"
2222 "net/http"
2323 "sync"
24+ "sync/atomic"
2425
2526 apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
2627 apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -151,7 +152,13 @@ func withUnwrapping(apiResourceSchema *apisv1alpha1.APIResourceSchema, version s
151152 return nil , err
152153 }
153154
154- return newUnwrappingWatch (ctx , innerGVR , options , namespaced , genericapirequest .NamespaceValue (ctx ),
155+ innerGVK := schema.GroupVersionKind {
156+ Group : wrappedGVR .Group ,
157+ Version : wrappedGVR .Version ,
158+ Kind : apiResourceSchema .Spec .Names .Kind ,
159+ }
160+
161+ return newUnwrappingWatch (ctx , innerGVR , innerGVK , options , namespaced , genericapirequest .NamespaceValue (ctx ),
155162 globalKcpInformers .Cache ().V1alpha1 ().CachedObjects ().Cluster (parsedKey .CachedResourceCluster ).Informer (), syntheticClustersProvider (
156163 * genericapirequest .ClusterFrom (ctx ),
157164 cr .Status .IdentityHash ,
@@ -258,6 +265,7 @@ type unwrappingWatch struct {
258265func newUnwrappingWatch (
259266 ctx context.Context ,
260267 innerObjGVR schema.GroupVersionResource ,
268+ innerObjGVK schema.GroupVersionKind ,
261269 innerListOpts * metainternalversion.ListOptions ,
262270 namespaced bool ,
263271 namespace string ,
@@ -283,6 +291,11 @@ func newUnwrappingWatch(
283291 }
284292 }()
285293
294+ // Track the latest resource version seen during initial list replay.
295+ // Used to set the resourceVersion on the initial-events-end bookmark.
296+ var latestInitialRV atomic.Value
297+ latestInitialRV .Store ("0" )
298+
286299 label := labels .Everything ()
287300 if innerListOpts != nil && innerListOpts .LabelSelector != nil {
288301 label = innerListOpts .LabelSelector
@@ -345,6 +358,11 @@ func newUnwrappingWatch(
345358 }
346359
347360 if isInInitialList {
361+ // Track the latest resource version for the initial-events-end bookmark.
362+ if rv := innerObj .GetResourceVersion (); rv > latestInitialRV .Load ().(string ) {
363+ latestInitialRV .Store (rv )
364+ }
365+
348366 if innerObj .GetResourceVersion () <= innerListOpts .ResourceVersion {
349367 // This resource is older than the one we want to start from on isInInitial list replay.
350368 return
@@ -422,6 +440,28 @@ func newUnwrappingWatch(
422440 }
423441 w .handler = handler
424442
443+ // Send the initial-events-end bookmark after the informer has replayed all
444+ // existing items. Clients using WatchList (sendInitialEvents=true) expect a
445+ // BOOKMARK with the k8s.io/initial-events-end annotation to signal that the
446+ // initial state has been fully delivered.
447+ if innerListOpts != nil && innerListOpts .SendInitialEvents != nil && * innerListOpts .SendInitialEvents && innerListOpts .AllowWatchBookmarks {
448+ go func () {
449+ if ! clientgocache .WaitForCacheSync (ctx .Done (), handler .HasSynced ) {
450+ return
451+ }
452+ bookmark := & unstructured.Unstructured {}
453+ bookmark .SetGroupVersionKind (innerObjGVK )
454+ bookmark .SetResourceVersion (latestInitialRV .Load ().(string ))
455+ bookmark .SetAnnotations (map [string ]string {
456+ metav1 .InitialEventsAnnotationKey : "true" ,
457+ })
458+ w .safeWrite (watch.Event {
459+ Type : watch .Bookmark ,
460+ Object : bookmark ,
461+ })
462+ }()
463+ }
464+
425465 return w , nil
426466}
427467
0 commit comments