diff --git a/pkg/virtual/replication/builder/unwrap.go b/pkg/virtual/replication/builder/unwrap.go index 45f51156047..88d474b1a26 100644 --- a/pkg/virtual/replication/builder/unwrap.go +++ b/pkg/virtual/replication/builder/unwrap.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" "sync" + "sync/atomic" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -151,7 +152,13 @@ func withUnwrapping(apiResourceSchema *apisv1alpha1.APIResourceSchema, version s return nil, err } - return newUnwrappingWatch(ctx, innerGVR, options, namespaced, genericapirequest.NamespaceValue(ctx), + innerGVK := schema.GroupVersionKind{ + Group: wrappedGVR.Group, + Version: wrappedGVR.Version, + Kind: apiResourceSchema.Spec.Names.Kind, + } + + return newUnwrappingWatch(ctx, innerGVR, innerGVK, options, namespaced, genericapirequest.NamespaceValue(ctx), globalKcpInformers.Cache().V1alpha1().CachedObjects().Cluster(parsedKey.CachedResourceCluster).Informer(), syntheticClustersProvider( *genericapirequest.ClusterFrom(ctx), cr.Status.IdentityHash, @@ -258,6 +265,7 @@ type unwrappingWatch struct { func newUnwrappingWatch( ctx context.Context, innerObjGVR schema.GroupVersionResource, + innerObjGVK schema.GroupVersionKind, innerListOpts *metainternalversion.ListOptions, namespaced bool, namespace string, @@ -283,6 +291,11 @@ func newUnwrappingWatch( } }() + // Track the latest resource version seen during initial list replay. + // Used to set the resourceVersion on the initial-events-end bookmark. + var latestInitialRV atomic.Value + latestInitialRV.Store("0") + label := labels.Everything() if innerListOpts != nil && innerListOpts.LabelSelector != nil { label = innerListOpts.LabelSelector @@ -345,6 +358,11 @@ func newUnwrappingWatch( } if isInInitialList { + // Track the latest resource version for the initial-events-end bookmark. + if rv := innerObj.GetResourceVersion(); rv > latestInitialRV.Load().(string) { + latestInitialRV.Store(rv) + } + if innerObj.GetResourceVersion() <= innerListOpts.ResourceVersion { // This resource is older than the one we want to start from on isInInitial list replay. return @@ -422,6 +440,28 @@ func newUnwrappingWatch( } w.handler = handler + // Send the initial-events-end bookmark after the informer has replayed all + // existing items. Clients using WatchList (sendInitialEvents=true) expect a + // BOOKMARK with the k8s.io/initial-events-end annotation to signal that the + // initial state has been fully delivered. + if innerListOpts != nil && innerListOpts.SendInitialEvents != nil && *innerListOpts.SendInitialEvents && innerListOpts.AllowWatchBookmarks { + go func() { + if !clientgocache.WaitForCacheSync(ctx.Done(), handler.HasSynced) { + return + } + bookmark := &unstructured.Unstructured{} + bookmark.SetGroupVersionKind(innerObjGVK) + bookmark.SetResourceVersion(latestInitialRV.Load().(string)) + bookmark.SetAnnotations(map[string]string{ + metav1.InitialEventsAnnotationKey: "true", + }) + w.safeWrite(watch.Event{ + Type: watch.Bookmark, + Object: bookmark, + }) + }() + } + return w, nil }