Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion pkg/virtual/replication/builder/unwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net/http"
"sync"
"sync/atomic"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -151,7 +152,13 @@ func withUnwrapping(apiResourceSchema *apisv1alpha1.APIResourceSchema, version s
return nil, err
}

return newUnwrappingWatch(ctx, innerGVR, options, namespaced, genericapirequest.NamespaceValue(ctx),
innerGVK := schema.GroupVersionKind{
Group: wrappedGVR.Group,
Version: wrappedGVR.Version,
Kind: apiResourceSchema.Spec.Names.Kind,
}

return newUnwrappingWatch(ctx, innerGVR, innerGVK, options, namespaced, genericapirequest.NamespaceValue(ctx),
globalKcpInformers.Cache().V1alpha1().CachedObjects().Cluster(parsedKey.CachedResourceCluster).Informer(), syntheticClustersProvider(
*genericapirequest.ClusterFrom(ctx),
cr.Status.IdentityHash,
Expand Down Expand Up @@ -258,6 +265,7 @@ type unwrappingWatch struct {
func newUnwrappingWatch(
ctx context.Context,
innerObjGVR schema.GroupVersionResource,
innerObjGVK schema.GroupVersionKind,
innerListOpts *metainternalversion.ListOptions,
namespaced bool,
namespace string,
Expand All @@ -283,6 +291,11 @@ func newUnwrappingWatch(
}
}()

// Track the latest resource version seen during initial list replay.
// Used to set the resourceVersion on the initial-events-end bookmark.
var latestInitialRV atomic.Value
latestInitialRV.Store("0")

label := labels.Everything()
if innerListOpts != nil && innerListOpts.LabelSelector != nil {
label = innerListOpts.LabelSelector
Expand Down Expand Up @@ -345,6 +358,11 @@ func newUnwrappingWatch(
}

if isInInitialList {
// Track the latest resource version for the initial-events-end bookmark.
if rv := innerObj.GetResourceVersion(); rv > latestInitialRV.Load().(string) {
latestInitialRV.Store(rv)
}

if innerObj.GetResourceVersion() <= innerListOpts.ResourceVersion {
// This resource is older than the one we want to start from on isInInitial list replay.
return
Expand Down Expand Up @@ -422,6 +440,28 @@ func newUnwrappingWatch(
}
w.handler = handler

// Send the initial-events-end bookmark after the informer has replayed all
// existing items. Clients using WatchList (sendInitialEvents=true) expect a
// BOOKMARK with the k8s.io/initial-events-end annotation to signal that the
// initial state has been fully delivered.
if innerListOpts != nil && innerListOpts.SendInitialEvents != nil && *innerListOpts.SendInitialEvents && innerListOpts.AllowWatchBookmarks {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if innerListOpts != nil && innerListOpts.SendInitialEvents != nil && *innerListOpts.SendInitialEvents && innerListOpts.AllowWatchBookmarks {
if ptr.Deref(innerListOpts.SendInitialEvents, false) && ptr.Deref(&innerListOpts.AllowWatchBookmarks, fasle) {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to make it read nicer. Otherwise lgtm, thanks for the PR!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add to follow-up por. will tag to get this ball rolling

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjudeikis-bot Create a follow-up PR implementing this change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjudeikis-bot do the needed ^ create a PR for this please

go func() {
if !clientgocache.WaitForCacheSync(ctx.Done(), handler.HasSynced) {
return
}
bookmark := &unstructured.Unstructured{}
bookmark.SetGroupVersionKind(innerObjGVK)
bookmark.SetResourceVersion(latestInitialRV.Load().(string))
bookmark.SetAnnotations(map[string]string{
metav1.InitialEventsAnnotationKey: "true",
})
w.safeWrite(watch.Event{
Type: watch.Bookmark,
Object: bookmark,
})
}()
}

return w, nil
}

Expand Down