Skip to content

Commit 41fb874

Browse files
committed
CachedResources: replicate full objects
DDSIF, the informer for retrieving local objects, yields only PartialMetadataObjects. This commit adds a live GET call to retrieve the full object for replication. On-behalf-of: @SAP [email protected] Signed-off-by: Robert Vasek <[email protected]>
1 parent 5a74717 commit 41fb874

File tree

6 files changed

+127
-119
lines changed

6 files changed

+127
-119
lines changed

pkg/reconciler/cache/cachedresources/cachedresources_controller.go

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,7 @@ import (
4646
cachev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/cache/v1alpha1"
4747
kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster"
4848
cachev1alpha1client "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/typed/cache/v1alpha1"
49-
kcpinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions"
50-
cacheinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/cache/v1alpha1"
49+
cachev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/cache/v1alpha1"
5150
cachev1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/cache/v1alpha1"
5251
)
5352

@@ -68,7 +67,6 @@ func NewController(
6867
kcpClusterClient kcpclientset.ClusterInterface,
6968
kcpCacheClient kcpclientset.ClusterInterface,
7069
dynamicClient kcpdynamic.ClusterInterface,
71-
cacheDynamicClient kcpdynamic.ClusterInterface,
7270

7371
kubeClusterClient kcpkubernetesclientset.ClusterInterface,
7472
namespaceInformer kcpcorev1informers.NamespaceClusterInformer,
@@ -77,10 +75,10 @@ func NewController(
7775
dynRESTMapper *dynamicrestmapper.DynamicRESTMapper,
7876

7977
discoveringDynamicKcpInformers *informer.DiscoveringDynamicSharedInformerFactory,
80-
cacheKcpInformers kcpinformers.SharedInformerFactory,
8178

82-
cachedResourceInformer cacheinformers.CachedResourceClusterInformer,
83-
cachedResourceEndpointSliceInformer cacheinformers.CachedResourceEndpointSliceClusterInformer,
79+
cachedObjectsInformer cachev1alpha1informers.CachedObjectClusterInformer,
80+
cachedResourceInformer cachev1alpha1informers.CachedResourceClusterInformer,
81+
cachedResourceEndpointSliceInformer cachev1alpha1informers.CachedResourceEndpointSliceClusterInformer,
8482
) (*Controller, error) {
8583
c := &Controller{
8684
shardName: shardName,
@@ -93,13 +91,12 @@ func NewController(
9391
kcpClient: kcpClusterClient,
9492
kcpCacheClient: kcpCacheClient,
9593

96-
dynamicClient: dynamicClient,
97-
cacheDynamicClient: cacheDynamicClient,
94+
dynamicClient: dynamicClient,
9895

9996
dynRESTMapper: dynRESTMapper,
10097

10198
discoveringDynamicKcpInformers: discoveringDynamicKcpInformers,
102-
cacheKcpInformers: cacheKcpInformers,
99+
cachedObjectsInformer: cachedObjectsInformer,
103100

104101
CachedResourceLister: cachedResourceInformer.Lister(),
105102
CachedResourceIndexer: cachedResourceInformer.Informer().GetIndexer(),
@@ -165,18 +162,17 @@ type Controller struct {
165162
kcpClient kcpclientset.ClusterInterface
166163
kcpCacheClient kcpclientset.ClusterInterface
167164

168-
dynamicClient kcpdynamic.ClusterInterface
169-
cacheDynamicClient kcpdynamic.ClusterInterface
165+
dynamicClient kcpdynamic.ClusterInterface
170166

171167
dynRESTMapper *dynamicrestmapper.DynamicRESTMapper
172168

173169
discoveringDynamicKcpInformers *informer.DiscoveringDynamicSharedInformerFactory
174-
cacheKcpInformers kcpinformers.SharedInformerFactory
170+
cachedObjectsInformer cachev1alpha1informers.CachedObjectClusterInformer
175171

176172
CachedResourceIndexer cache.Indexer
177173
CachedResourceLister cachev1alpha1listers.CachedResourceClusterLister
178174

179-
CachedResourceEndpointSliceInformer cacheinformers.CachedResourceEndpointSliceClusterInformer
175+
CachedResourceEndpointSliceInformer cachev1alpha1informers.CachedResourceEndpointSliceClusterInformer
180176

181177
commit func(ctx context.Context, new, old *CachedResourceResource) error
182178

pkg/reconciler/cache/cachedresources/cachedresources_reconcile.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,10 @@ func (c *Controller) reconcile(ctx context.Context, cluster logicalcluster.Name,
8383
},
8484
&replication{
8585
shardName: c.shardName,
86-
dynamicCacheClient: c.dynamicClient,
86+
dynamicClusterClient: c.dynamicClient,
8787
kcpCacheClient: c.kcpCacheClient,
8888
dynRESTMapper: c.dynRESTMapper,
89-
cacheKcpInformers: c.cacheKcpInformers,
89+
cachedObjectsInformer: c.cachedObjectsInformer,
9090
discoveringDynamicKcpInformers: c.discoveringDynamicKcpInformers,
9191
callback: c.enqueue,
9292
controllerRegistry: c.controllerRegistry,

pkg/reconciler/cache/cachedresources/cachedresources_reconcile_replication.go

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,17 @@ import (
3434
cachev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/cache/v1alpha1"
3535
"github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/util/conditions"
3636
kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster"
37-
kcpinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions"
37+
cachev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/cache/v1alpha1"
3838
)
3939

4040
// replication starts the replication machinery for a published resource.
4141
// Or deletes the replication controller if the published resource is being deleted.
4242
type replication struct {
4343
shardName string
44-
dynamicCacheClient kcpdynamic.ClusterInterface
44+
dynamicClusterClient kcpdynamic.ClusterInterface
4545
kcpCacheClient kcpclientset.ClusterInterface
4646
dynRESTMapper *dynamicrestmapper.DynamicRESTMapper
47-
cacheKcpInformers kcpinformers.SharedInformerFactory
47+
cachedObjectsInformer cachev1alpha1informers.CachedObjectClusterInformer
4848
discoveringDynamicKcpInformers *informer.DiscoveringDynamicSharedInformerFactory
4949
callback func(obj interface{})
5050
controllerRegistry *controllerRegistry
@@ -53,7 +53,6 @@ type replication struct {
5353
func (r *replication) reconcile(ctx context.Context, cachedResource *cachev1alpha1.CachedResource) (reconcileStatus, error) {
5454
logger := klog.FromContext(ctx)
5555
logger.Info("reconciling published resource", "CachedResource", cachedResource.Name)
56-
5756
gvr := schema.GroupVersionResource{
5857
Group: cachedResource.Spec.Group,
5958
Version: cachedResource.Spec.Version,
@@ -76,8 +75,6 @@ func (r *replication) reconcile(ctx context.Context, cachedResource *cachev1alph
7675
// Global informer is based on the CachedResource type and we construct index based on the schema labels.
7776
controllerCtx, cancel := context.WithCancel(ctx)
7877

79-
global := r.cacheKcpInformers.Cache().V1alpha1().CachedObjects()
80-
8178
// Local informer is based on the specific types we want to replicate.
8279
local, err := r.discoveringDynamicKcpInformers.ClusterWithContext(ctx, cluster).ForResource(gvr)
8380
if err != nil {
@@ -92,19 +89,18 @@ func (r *replication) reconcile(ctx context.Context, cachedResource *cachev1alph
9289
return reconcileStatusStopAndRequeue, err
9390
}
9491
replicated := &replicationcontroller.ReplicatedGVR{
95-
Kind: replicatedKind.Kind,
96-
Local: local.Informer(),
97-
Global: global.Informer(),
92+
Kind: replicatedKind.Kind,
93+
Local: local.Informer(),
9894
}
99-
replicationcontroller.InstallIndexers(replicated)
10095
callback := func() {
10196
r.callback(cachedResource)
10297
}
10398

10499
c, err := replicationcontroller.NewController(
105100
r.shardName,
106-
r.dynamicCacheClient,
101+
r.dynamicClusterClient,
107102
r.kcpCacheClient,
103+
r.cachedObjectsInformer,
108104
gvr,
109105
replicated,
110106
callback,
@@ -116,9 +112,8 @@ func (r *replication) reconcile(ctx context.Context, cachedResource *cachev1alph
116112
}
117113

118114
go replicated.Local.Run(ctx.Done())
119-
go replicated.Global.Run(ctx.Done())
120115

121-
if !cache.WaitForCacheSync(ctx.Done(), replicated.Local.HasSynced, replicated.Global.HasSynced) {
116+
if !cache.WaitForCacheSync(ctx.Done(), replicated.Local.HasSynced) {
122117
cancel()
123118
return reconcileStatusContinue, fmt.Errorf("failed to wait for informers to sync")
124119
}

pkg/reconciler/cache/cachedresources/replication/replication_controller.go

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,14 @@ import (
3232

3333
kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
3434
kcpdynamic "github.com/kcp-dev/client-go/dynamic"
35+
"github.com/kcp-dev/logicalcluster/v3"
3536

3637
cacheclient "github.com/kcp-dev/kcp/pkg/cache/client"
3738
"github.com/kcp-dev/kcp/pkg/cache/client/shard"
38-
"github.com/kcp-dev/kcp/pkg/indexers"
3939
"github.com/kcp-dev/kcp/pkg/logging"
4040
cachev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/cache/v1alpha1"
4141
kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster"
42+
cachev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/cache/v1alpha1"
4243
)
4344

4445
// Locally we store object with the original form of the object.
@@ -53,8 +54,9 @@ const (
5354
// NewController returns a new replication controller.
5455
func NewController(
5556
shardName string,
56-
dynamicCacheClient kcpdynamic.ClusterInterface,
57+
dynamicClusterClient kcpdynamic.ClusterInterface,
5758
kcpCacheClient kcpclientset.ClusterInterface,
59+
cachedObjectsInformer cachev1alpha1informers.CachedObjectClusterInformer,
5860
gvr schema.GroupVersionResource,
5961
replicated *ReplicatedGVR,
6062
callback func(),
@@ -68,12 +70,13 @@ func NewController(
6870
Name: ControllerName,
6971
},
7072
),
71-
dynamicCacheClient: dynamicCacheClient,
72-
kcpCacheClient: kcpCacheClient,
73-
replicated: replicated,
74-
callback: callback,
75-
cleanupFuncs: make([]func(), 0),
76-
localLabelSelector: localLabelSelector,
73+
dynamicClusterClient: dynamicClusterClient,
74+
kcpCacheClient: kcpCacheClient,
75+
cachedObjectsInformer: cachedObjectsInformer,
76+
replicated: replicated,
77+
callback: callback,
78+
cleanupFuncs: make([]func(), 0),
79+
localLabelSelector: localLabelSelector,
7780
}
7881

7982
localHandler, err := c.replicated.Local.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -88,7 +91,7 @@ func NewController(
8891
_ = c.replicated.Local.RemoveEventHandler(localHandler)
8992
})
9093

91-
globalHandler, err := c.replicated.Global.AddEventHandler(cache.ResourceEventHandlerFuncs{
94+
globalHandler, err := c.cachedObjectsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
9295
AddFunc: func(obj interface{}) { c.enqueueCacheObject(obj) },
9396
UpdateFunc: func(_, obj interface{}) { c.enqueueCacheObject(obj) },
9497
DeleteFunc: func(obj interface{}) { c.enqueueCacheObject(obj) },
@@ -97,7 +100,7 @@ func NewController(
97100
return nil, err
98101
}
99102
c.cleanupFuncs = append(c.cleanupFuncs, func() {
100-
_ = c.replicated.Global.RemoveEventHandler(globalHandler)
103+
_ = c.cachedObjectsInformer.Informer().RemoveEventHandler(globalHandler)
101104
})
102105

103106
return c, nil
@@ -198,8 +201,9 @@ type Controller struct {
198201
shardName string
199202
queue workqueue.TypedRateLimitingInterface[string]
200203

201-
dynamicCacheClient kcpdynamic.ClusterInterface
202-
kcpCacheClient kcpclientset.ClusterInterface
204+
dynamicClusterClient kcpdynamic.ClusterInterface
205+
kcpCacheClient kcpclientset.ClusterInterface
206+
cachedObjectsInformer cachev1alpha1informers.CachedObjectClusterInformer
203207

204208
replicated *ReplicatedGVR
205209

@@ -223,17 +227,7 @@ type Controller struct {
223227
}
224228

225229
type ReplicatedGVR struct {
226-
Kind string
227-
Filter func(u *unstructured.Unstructured) bool
228-
Global, Local cache.SharedIndexInformer
229-
}
230-
231-
// InstallIndexers adds the additional indexers that this controller requires to the informers.
232-
func InstallIndexers(replicated *ReplicatedGVR) {
233-
indexers.AddIfNotPresentOrDie(
234-
replicated.Global.GetIndexer(),
235-
cache.Indexers{
236-
ByGVRAndShardAndLogicalClusterAndNamespaceAndName: IndexByGVRAndShardAndLogicalClusterAndNamespace,
237-
},
238-
)
230+
Kind string
231+
Filter func(u *unstructured.Unstructured) bool
232+
Local cache.SharedIndexInformer
239233
}

0 commit comments

Comments
 (0)