Skip to content

Commit 7a112ac

Browse files
committed
Replication VW now resolves resources across shards
* Retrieves CachedResource from its global informer. * Wrapped resource's schema is now replicated, and retrieved from its global informer. On-behalf-of: @SAP [email protected] Signed-off-by: Robert Vasek <[email protected]>
1 parent e9a8945 commit 7a112ac

File tree

4 files changed

+57
-107
lines changed

4 files changed

+57
-107
lines changed

cmd/virtual-workspaces/command/cmd.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,6 @@ func Run(ctx context.Context, o *options.Options) error {
149149
go http.ListenAndServe(o.ProfilerAddress, nil)
150150
}
151151

152-
// Start the CachedObjects informer against the cache server.
153-
_ = cacheKcpInformers.Cache().V1alpha1().CachedObjects().Informer()
154-
155152
// create apiserver
156153
scheme := runtime.NewScheme()
157154
metav1.AddToGroupVersion(scheme, schema.GroupVersion{Group: "", Version: "v1"})

pkg/cache/server/bootstrap/bootstrap.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ func Bootstrap(ctx context.Context, apiExtensionsClusterClient kcpapiextensionsc
5252
{"core.kcp.io", "logicalclusters"},
5353
{"core.kcp.io", "shards"},
5454
{"cache.kcp.io", "cachedobjects"},
55+
{"cache.kcp.io", "cachedresources"},
5556
{"tenancy.kcp.io", "workspacetypes"},
5657
{"rbac.authorization.k8s.io", "roles"},
5758
{"rbac.authorization.k8s.io", "clusterroles"},

pkg/virtual/replication/builder/build.go

Lines changed: 56 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"fmt"
2323
"strings"
2424

25-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2625
"k8s.io/apimachinery/pkg/runtime/schema"
2726
"k8s.io/apiserver/pkg/authorization/authorizer"
2827
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
@@ -37,9 +36,9 @@ import (
3736

3837
"github.com/kcp-dev/kcp/pkg/authorization"
3938
"github.com/kcp-dev/kcp/pkg/indexers"
40-
"github.com/kcp-dev/kcp/pkg/reconciler/apis/apibinding"
39+
"github.com/kcp-dev/kcp/pkg/informer"
40+
cachedresources "github.com/kcp-dev/kcp/pkg/reconciler/cache/cachedresources"
4141
cachedresourcesreplication "github.com/kcp-dev/kcp/pkg/reconciler/cache/cachedresources/replication"
42-
"github.com/kcp-dev/kcp/pkg/virtual/apiexport/schemas/builtin"
4342
"github.com/kcp-dev/kcp/pkg/virtual/framework"
4443
virtualworkspacesdynamic "github.com/kcp-dev/kcp/pkg/virtual/framework/dynamic"
4544
"github.com/kcp-dev/kcp/pkg/virtual/framework/dynamic/apidefinition"
@@ -52,15 +51,17 @@ import (
5251
replicationauthorizer "github.com/kcp-dev/kcp/pkg/virtual/replication/authorizer"
5352
apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1"
5453
apisv1alpha2 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha2"
54+
cachev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/cache/v1alpha1"
5555
corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
56-
kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster"
56+
"github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/util/conditions"
5757
kcpinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions"
58+
apisv1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/apis/v1alpha1"
59+
cachev1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/cache/v1alpha1"
5860
)
5961

6062
func BuildVirtualWorkspace(
6163
cfg *rest.Config,
6264
rootPathPrefix string,
63-
kcpClusterClient kcpclientset.ClusterInterface,
6465
dynamicClusterClient kcpdynamic.ClusterInterface,
6566
kubeClusterClient kcpkubernetesclientset.ClusterInterface,
6667
localKcpInformers kcpinformers.SharedInformerFactory,
@@ -108,15 +109,36 @@ func BuildVirtualWorkspace(
108109
}
109110
}),
110111
BootstrapAPISetManagement: func(mainConfig genericapiserver.CompletedConfig) (apidefinition.APIDefinitionSetGetter, error) {
112+
// Define informers that need to be waited for in the post-start hook. Calling Informer() starts the informer,
113+
// and we need to do that before the SharedInformerFactory.Start() is called in cmd/virtual-workspaces/cmd.go.
114+
115+
globalInformers := map[string]cache.SharedIndexInformer{
116+
"cachedobjects": globalKcpInformers.Cache().V1alpha1().CachedObjects().Informer(),
117+
"cachedresources": globalKcpInformers.Cache().V1alpha1().CachedResources().Informer(),
118+
"apiexports": globalKcpInformers.Apis().V1alpha2().APIExports().Informer(),
119+
"apiresourceschemas": globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer(),
120+
}
121+
122+
localInformers := map[string]cache.SharedIndexInformer{
123+
"cachedresources": localKcpInformers.Cache().V1alpha1().CachedResources().Informer(),
124+
"apiexports": localKcpInformers.Apis().V1alpha2().APIExports().Informer(),
125+
"apiresourceschemas": localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer(),
126+
}
127+
111128
if err := mainConfig.AddPostStartHook(replication.VirtualWorkspaceName, func(hookContext genericapiserver.PostStartHookContext) error {
112129
defer close(readyCh)
113130

131+
// CachedResources indexers.
132+
114133
indexers.AddIfNotPresentOrDie(
115134
globalKcpInformers.Cache().V1alpha1().CachedObjects().Informer().GetIndexer(),
116135
cache.Indexers{
117136
cachedresourcesreplication.ByGVRAndLogicalClusterAndNamespace: cachedresourcesreplication.IndexByGVRAndLogicalClusterAndNamespace,
118137
},
119138
)
139+
140+
// APIExport indexers.
141+
120142
indexers.AddIfNotPresentOrDie(
121143
globalKcpInformers.Apis().V1alpha2().APIExports().Informer().GetIndexer(),
122144
cache.Indexers{
@@ -130,13 +152,18 @@ func BuildVirtualWorkspace(
130152
},
131153
)
132154

133-
for name, informer := range map[string]cache.SharedIndexInformer{
134-
"cachedresources": globalKcpInformers.Cache().V1alpha1().CachedObjects().Informer(),
135-
"apiexports": globalKcpInformers.Apis().V1alpha2().APIExports().Informer(),
136-
"apiresourceschemas": globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer(),
137-
} {
155+
// Wait for caches to be synced.
156+
157+
for name, informer := range globalInformers {
158+
if !cache.WaitForNamedCacheSync(name, hookContext.Done(), informer.HasSynced) {
159+
klog.Background().Error(nil, "global informer not synced")
160+
return nil
161+
}
162+
}
163+
164+
for name, informer := range localInformers {
138165
if !cache.WaitForNamedCacheSync(name, hookContext.Done(), informer.HasSynced) {
139-
klog.Background().Error(nil, "informer not synced")
166+
klog.Background().Error(nil, "local informer not synced")
140167
return nil
141168
}
142169
}
@@ -147,8 +174,8 @@ func BuildVirtualWorkspace(
147174
}
148175

149176
return &singleResourceAPIDefinitionSetProvider{
150-
localKcpInformers: localKcpInformers,
151-
kcpClusterClient: kcpClusterClient,
177+
localKcpInformers: localKcpInformers,
178+
globalKcpInformers: globalKcpInformers,
152179

153180
getLogicalCluster: func(cluster logicalcluster.Name, name string) (*corev1alpha1.LogicalCluster, error) {
154181
return localKcpInformers.Core().V1alpha1().LogicalClusters().Cluster(cluster).Lister().Get(name)
@@ -168,9 +195,9 @@ func BuildVirtualWorkspace(
168195
)
169196
},
170197

171-
getAPIResourceSchemaByName: func(cluster logicalcluster.Name, name string) (*apisv1alpha1.APIResourceSchema, error) {
172-
return globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Cluster(cluster).Lister().Get(name)
173-
},
198+
getAPIResourceSchema: informer.NewScopedGetterWithFallback[*apisv1alpha1.APIResourceSchema, apisv1alpha1listers.APIResourceSchemaLister](localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Lister(), globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Lister()),
199+
200+
getCachedResource: informer.NewScopedGetterWithFallback[*cachev1alpha1.CachedResource, cachev1alpha1listers.CachedResourceLister](localKcpInformers.Cache().V1alpha1().CachedResources().Lister(), globalKcpInformers.Cache().V1alpha1().CachedResources().Lister()),
174201

175202
config: mainConfig,
176203
dynamicClusterClient: dynamicClusterClient,
@@ -270,86 +297,14 @@ type singleResourceAPIDefinitionSetProvider struct {
270297
dynamicClusterClient kcpdynamic.ClusterInterface
271298
storageProvider func(ctx context.Context, dynamicClusterClientFunc forwardingregistry.DynamicClusterClientFunc, sch *apisv1alpha1.APIResourceSchema, version string) (apiserver.RestProviderFunc, error)
272299

273-
kcpClusterClient kcpclientset.ClusterInterface
274-
localKcpInformers kcpinformers.SharedInformerFactory
275-
276-
getLogicalCluster func(cluster logicalcluster.Name, name string) (*corev1alpha1.LogicalCluster, error)
277-
getAPIBinding func(cluster logicalcluster.Name, name string) (*apisv1alpha2.APIBinding, error)
278-
getAPIExportByPath func(path logicalcluster.Path, name string) (*apisv1alpha2.APIExport, error)
279-
getAPIResourceSchemaByName func(cluster logicalcluster.Name, name string) (*apisv1alpha1.APIResourceSchema, error)
280-
}
281-
282-
func getResourceBindingsAnnJSON(lc *corev1alpha1.LogicalCluster) string {
283-
const jsonEmptyObj = "{}"
284-
285-
if lc == nil {
286-
return jsonEmptyObj
287-
}
288-
289-
ann := lc.Annotations[apibinding.ResourceBindingsAnnotationKey]
290-
if ann == "" {
291-
ann = jsonEmptyObj
292-
}
293-
294-
return ann
295-
}
296-
297-
func (a *singleResourceAPIDefinitionSetProvider) getAPIResourceSchema(
298-
ctx context.Context,
299-
clusterName logicalcluster.Name,
300-
gvr schema.GroupVersionResource,
301-
) (*apisv1alpha1.APIResourceSchema, error) {
302-
if gvr.Group == "" {
303-
// Assume built-in types.
304-
return builtin.GetBuiltInAPISchema(apisv1alpha1.GroupResource{Group: "", Resource: gvr.Resource})
305-
}
306-
307-
lc, err := a.getLogicalCluster(clusterName, "cluster")
308-
if err != nil {
309-
return nil, err
310-
}
311-
resBindingsAnnStr := getResourceBindingsAnnJSON(lc)
312-
resBindingsAnn, err := apibinding.UnmarshalResourceBindingsAnnotation(resBindingsAnnStr)
313-
if err != nil {
314-
return nil, fmt.Errorf("failed to parse annotation on LogicalCluster %s|%s: %v", clusterName, "cluster", err)
315-
}
316-
317-
bindingName := ""
318-
for gr, v := range resBindingsAnn {
319-
if v.CRD {
320-
continue
321-
}
322-
if gr == gvr.GroupResource().String() {
323-
bindingName = v.Name
324-
}
325-
}
326-
327-
if bindingName == "" {
328-
return nil, fmt.Errorf("no binding for %s found in workspace %s", gvr.GroupResource().String(), clusterName)
329-
}
330-
331-
apiBinding, err := a.getAPIBinding(clusterName, bindingName)
332-
if err != nil {
333-
return nil, fmt.Errorf("failed to get APIBinding %s|%s", bindingName, clusterName)
334-
}
335-
336-
apiExport, err := a.getAPIExportByPath(logicalcluster.NewPath(apiBinding.Spec.Reference.Export.Path), apiBinding.Spec.Reference.Export.Name)
337-
if err != nil {
338-
return nil, fmt.Errorf("failed to get APIExport %s|%s referenced by APIBinding %s|%s: %v",
339-
apiBinding.Spec.Reference.Export.Path, apiBinding.Spec.Reference.Export.Name,
340-
clusterName, bindingName, err,
341-
)
342-
}
343-
apiExportClusterName := logicalcluster.From(apiExport)
344-
345-
schName := ""
346-
for _, exportResource := range apiExport.Spec.Resources {
347-
if exportResource.Group == gvr.Group && exportResource.Name == gvr.Resource {
348-
schName = exportResource.Schema
349-
}
350-
}
300+
localKcpInformers kcpinformers.SharedInformerFactory
301+
globalKcpInformers kcpinformers.SharedInformerFactory
351302

352-
return a.getAPIResourceSchemaByName(apiExportClusterName, schName)
303+
getLogicalCluster func(cluster logicalcluster.Name, name string) (*corev1alpha1.LogicalCluster, error)
304+
getAPIBinding func(cluster logicalcluster.Name, name string) (*apisv1alpha2.APIBinding, error)
305+
getAPIExportByPath func(path logicalcluster.Path, name string) (*apisv1alpha2.APIExport, error)
306+
getCachedResource func(cluster logicalcluster.Name, name string) (*cachev1alpha1.CachedResource, error)
307+
getAPIResourceSchema func(cluster logicalcluster.Name, name string) (*apisv1alpha1.APIResourceSchema, error)
353308
}
354309

355310
func (a *singleResourceAPIDefinitionSetProvider) GetAPIDefinitionSet(ctx context.Context, key dynamiccontext.APIDomainKey) (apis apidefinition.APIDefinitionSet, apisExist bool, err error) {
@@ -362,14 +317,17 @@ func (a *singleResourceAPIDefinitionSetProvider) GetAPIDefinitionSet(ctx context
362317
return a.dynamicClusterClient, nil
363318
}
364319

365-
cachedResource, err := a.kcpClusterClient.CacheV1alpha1().CachedResources().Cluster(parsedKey.CachedResourceCluster.Path()).
366-
Get(ctx, parsedKey.CachedResourceName, metav1.GetOptions{})
320+
cachedResource, err := a.getCachedResource(parsedKey.CachedResourceCluster, parsedKey.CachedResourceName)
367321
if err != nil {
368322
return nil, false, err
369323
}
370324

325+
if !conditions.IsTrue(cachedResource, cachev1alpha1.CachedResourceValid) {
326+
return nil, false, fmt.Errorf("CachedResource %s|%s not ready", parsedKey.CachedResourceCluster, parsedKey.CachedResourceName)
327+
}
328+
371329
wrappedGVR := schema.GroupVersionResource(cachedResource.Spec.GroupVersionResource)
372-
wrappedSch, err := a.getAPIResourceSchema(ctx, parsedKey.CachedResourceCluster, wrappedGVR)
330+
wrappedSch, err := a.getAPIResourceSchema(logicalcluster.From(cachedResource), cachedresources.CachedAPIResourceSchemaName(cachedResource.UID, wrappedGVR.GroupResource()))
373331
if err != nil {
374332
return nil, false, fmt.Errorf("failed to get schema for wrapped object in CachedResource %s|%s: %v", parsedKey.CachedResourceCluster, parsedKey.CachedResourceName, err)
375333
}

pkg/virtual/replication/options/options.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"github.com/kcp-dev/kcp/pkg/virtual/framework/rootapiserver"
3030
"github.com/kcp-dev/kcp/pkg/virtual/replication"
3131
"github.com/kcp-dev/kcp/pkg/virtual/replication/builder"
32-
kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster"
3332
kcpinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions"
3433
)
3534

@@ -69,15 +68,10 @@ func (o *Replication) NewReplication(
6968
if err != nil {
7069
return nil, err
7170
}
72-
kcpClusterClient, err := kcpclientset.NewForConfig(config)
73-
if err != nil {
74-
return nil, err
75-
}
7671

7772
return builder.BuildVirtualWorkspace(
7873
config,
7974
path.Join(rootPathPrefix, replication.VirtualWorkspaceName),
80-
kcpClusterClient,
8175
dynamicClusterClient,
8276
kubeClusterClient,
8377
wildcardKcpInformers,

0 commit comments

Comments
 (0)