Skip to content

Commit f60287a

Browse files
committed
fix workspace type extend.with installtion bug
Signed-off-by: olalekan odukoya <[email protected]>
1 parent 896e5e9 commit f60287a

File tree

4 files changed

+112
-32
lines changed

4 files changed

+112
-32
lines changed

pkg/admission/workspacetypeexists/admission.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -182,27 +182,32 @@ func (o *workspacetypeExists) Admit(ctx context.Context, a admission.Attributes,
182182
}
183183

184184
func (o *workspacetypeExists) resolveTypeRef(workspacePath logicalcluster.Path, ref tenancyv1alpha1.WorkspaceTypeReference) (*tenancyv1alpha1.WorkspaceType, error) {
185+
return resolveWorkspaceTypeReference(workspacePath, ref, o.getType)
186+
}
187+
188+
func resolveWorkspaceTypeReference(workspacePath logicalcluster.Path, ref tenancyv1alpha1.WorkspaceTypeReference, getter func(logicalcluster.Path, string) (*tenancyv1alpha1.WorkspaceType, error)) (*tenancyv1alpha1.WorkspaceType, error) {
185189
if ref.Path != "" {
186-
wt, err := o.getType(logicalcluster.NewPath(ref.Path), string(ref.Name))
190+
wt, err := getter(logicalcluster.NewPath(ref.Path), string(ref.Name))
187191
if err != nil {
188192
return nil, apierrors.NewInternalError(err)
189193
}
190194

191195
return wt, err
192196
}
193197

198+
currentPath := workspacePath
194199
for {
195-
wt, err := o.getType(workspacePath, string(ref.Name))
200+
wt, err := getter(currentPath, string(ref.Name))
196201
if err != nil {
197202
if apierrors.IsNotFound(err) {
198-
parent, hasParent := workspacePath.Parent()
199-
if !hasParent && workspacePath != core.RootCluster.Path() {
203+
parent, hasParent := currentPath.Parent()
204+
if !hasParent && currentPath != core.RootCluster.Path() {
200205
// fall through with root cluster. We always check types in there as last chance.
201206
parent = core.RootCluster.Path()
202207
} else if !hasParent {
203208
return nil, fmt.Errorf("workspace type %q cannot be resolved", ref.String())
204209
}
205-
workspacePath = parent
210+
currentPath = parent
206211
continue
207212
}
208213
return nil, apierrors.NewInternalError(err)
@@ -439,7 +444,12 @@ func (r *transitiveTypeResolver) resolve(wt *tenancyv1alpha1.WorkspaceType, seen
439444

440445
ret := make([]*tenancyv1alpha1.WorkspaceType, 0, len(wt.Spec.Extend.With))
441446
for _, baseTypeRef := range wt.Spec.Extend.With {
442-
qualifiedName := logicalcluster.NewPath(baseTypeRef.Path).Join(tenancyv1alpha1.ObjectName(baseTypeRef.Name)).String()
447+
baseType, err := resolveWorkspaceTypeReference(canonicalPathFrom(wt), baseTypeRef, r.getter)
448+
if err != nil {
449+
return nil, fmt.Errorf("unable to find inherited workspace type %s", baseTypeRef.Name)
450+
}
451+
452+
qualifiedName := canonicalPathFrom(baseType).Join(baseType.Name).String()
443453
if pathSeen[qualifiedName] {
444454
// seen in this path already. That's a cycle.
445455
for i, t := range path {
@@ -455,10 +465,6 @@ func (r *transitiveTypeResolver) resolve(wt *tenancyv1alpha1.WorkspaceType, seen
455465
continue // already seen trunk
456466
}
457467

458-
baseType, err := r.getter(logicalcluster.NewPath(baseTypeRef.Path), tenancyv1alpha1.ObjectName(baseTypeRef.Name))
459-
if err != nil {
460-
return nil, fmt.Errorf("unable to find inherited workspace type %s", qualifiedName)
461-
}
462468
ret = append(ret, baseType)
463469

464470
parents, err := r.resolve(baseType, seen, pathSeen, path)

pkg/admission/workspacetypeexists/admission_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,23 @@ func TestTransitiveTypeResolverResolve(t *testing.T) {
622622
want: sets.New[string]("root:universal", "root:organization", "root:org:type"),
623623
wantErr: true,
624624
},
625+
{
626+
name: "extending types without explicit path",
627+
types: map[string]*tenancyv1alpha1.WorkspaceType{
628+
"root:org:base": newType("root:org:base").WorkspaceType,
629+
"root:org:universal": newType("root:org:universal").WorkspaceType,
630+
},
631+
input: func() *tenancyv1alpha1.WorkspaceType {
632+
wt := newType("root:org:extended").WorkspaceType
633+
wt.Spec.Extend.With = []tenancyv1alpha1.WorkspaceTypeReference{
634+
{Name: "base"},
635+
{Name: "universal", Path: "root:org"},
636+
}
637+
return wt
638+
}(),
639+
want: sets.New[string]("root:org:base", "root:org:universal", "root:org:extended"),
640+
wantErr: false,
641+
},
625642
}
626643
for _, tt := range tests {
627644
t.Run(tt.name, func(t *testing.T) {

pkg/reconciler/tenancy/workspace/workspace_controller.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package workspace
1919
import (
2020
"context"
2121
"fmt"
22+
"sync"
2223
"time"
2324

2425
"k8s.io/apimachinery/pkg/api/errors"
@@ -50,6 +51,77 @@ const (
5051
ControllerName = "kcp-workspace"
5152
)
5253

54+
type clientPool struct {
55+
mu sync.RWMutex
56+
kcpClients map[string]kcpclientset.ClusterInterface
57+
kubeClients map[string]kubernetes.ClusterInterface
58+
adminConfig *rest.Config
59+
}
60+
61+
func newClientPool(adminConfig *rest.Config) *clientPool {
62+
return &clientPool{
63+
kcpClients: make(map[string]kcpclientset.ClusterInterface),
64+
kubeClients: make(map[string]kubernetes.ClusterInterface),
65+
adminConfig: adminConfig,
66+
}
67+
}
68+
69+
// getKcpClient returns a kcp client (i.e. a client that implements kcpclient.ClusterInterface) for the given shard.
70+
// the returned client establishes a direct connection with the shard with credentials stored in adminConfig.
71+
// clients are cached per shard to prevent connection leaks.
72+
func (p *clientPool) getKcpClient(shardName, baseURL string) (kcpclientset.ClusterInterface, error) {
73+
p.mu.RLock()
74+
if client, exists := p.kcpClients[shardName]; exists {
75+
p.mu.RUnlock()
76+
return client, nil
77+
}
78+
p.mu.RUnlock()
79+
80+
p.mu.Lock()
81+
defer p.mu.Unlock()
82+
83+
if client, exists := p.kcpClients[shardName]; exists {
84+
return client, nil
85+
}
86+
87+
shardConfig := rest.CopyConfig(p.adminConfig)
88+
shardConfig.Host = baseURL
89+
client, err := kcpclientset.NewForConfig(shardConfig)
90+
if err != nil {
91+
return nil, fmt.Errorf("failed to create shard %q kcp client: %w", shardName, err)
92+
}
93+
p.kcpClients[shardName] = client
94+
return client, nil
95+
}
96+
97+
// getKubeClient returns a kube client (i.e. a client that implements kubernetes.ClusterInterface) for the given shard.
98+
// the returned client establishes a direct connection with the shard with credentials stored in adminConfig.
99+
// clients are cached per shard to prevent connection leaks.
100+
func (p *clientPool) getKubeClient(shardName, baseURL string) (kubernetes.ClusterInterface, error) {
101+
p.mu.RLock()
102+
if client, exists := p.kubeClients[shardName]; exists {
103+
p.mu.RUnlock()
104+
return client, nil
105+
}
106+
p.mu.RUnlock()
107+
108+
p.mu.Lock()
109+
defer p.mu.Unlock()
110+
111+
if client, exists := p.kubeClients[shardName]; exists {
112+
return client, nil
113+
}
114+
115+
shardConfig := rest.CopyConfig(p.adminConfig)
116+
shardConfig.Host = baseURL
117+
client, err := kubernetes.NewForConfig(shardConfig)
118+
if err != nil {
119+
return nil, fmt.Errorf("failed to create shard %q kube client: %w", shardName, err)
120+
}
121+
p.kubeClients[shardName] = client
122+
return client, nil
123+
}
124+
53125
func NewController(
54126
shardName string,
55127
kcpClusterClient kcpclientset.ClusterInterface,
@@ -88,6 +160,8 @@ func NewController(
88160
logicalClusterIndexer: logicalClusterInformer.Informer().GetIndexer(),
89161
logicalClusterLister: logicalClusterInformer.Lister(),
90162

163+
clientPool: newClientPool(logicalClusterAdminConfig),
164+
91165
commit: committer.NewCommitter[*tenancyv1alpha1.Workspace, tenancyv1alpha1client.WorkspaceInterface, *tenancyv1alpha1.WorkspaceSpec, *tenancyv1alpha1.WorkspaceStatus](kcpClusterClient.TenancyV1alpha1().Workspaces()),
92166
}
93167

@@ -132,6 +206,9 @@ type Controller struct {
132206
logicalClusterIndexer cache.Indexer
133207
logicalClusterLister corev1alpha1listers.LogicalClusterClusterLister
134208

209+
// clientPool manages reusable clients to prevent connection leaks
210+
clientPool *clientPool
211+
135212
// commit creates a patch and submits it, if needed.
136213
commit func(ctx context.Context, old, new *workspaceResource) error
137214
}

pkg/reconciler/tenancy/workspace/workspace_reconcile.go

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,11 @@ package workspace
1818

1919
import (
2020
"context"
21-
"fmt"
2221
"time"
2322

2423
apierrors "k8s.io/apimachinery/pkg/api/errors"
2524
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2625
utilserrors "k8s.io/apimachinery/pkg/util/errors"
27-
restclient "k8s.io/client-go/rest"
2826

2927
kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
3028
"github.com/kcp-dev/client-go/kubernetes"
@@ -61,36 +59,18 @@ func (c *Controller) reconcile(ctx context.Context, ws *tenancyv1alpha1.Workspac
6159
return shards[0].(*corev1alpha1.Shard), nil
6260
}
6361

64-
// kcpLogicalClusterAdminClientFor returns a kcp client (i.e. a client that implements kcpclient.ClusterInterface) for the given shard.
65-
// the returned client establishes a direct connection with the shard with credentials stored in r.logicalClusterAdminConfig.
66-
// TODO:(p0lyn0mial): make it more efficient, maybe we need a per shard client pool or we could use an HTTPRoundTripper
6762
kcpDirectClientFor := func(shard *corev1alpha1.Shard) (kcpclientset.ClusterInterface, error) {
6863
if shard.Name == c.shardName {
6964
return c.kcpClusterClient, nil
7065
}
71-
shardConfig := restclient.CopyConfig(c.logicalClusterAdminConfig)
72-
shardConfig.Host = shard.Spec.BaseURL
73-
shardClient, err := kcpclientset.NewForConfig(shardConfig)
74-
if err != nil {
75-
return nil, fmt.Errorf("failed to create shard %q kube client: %w", shard.Name, err)
76-
}
77-
return shardClient, nil
66+
return c.clientPool.getKcpClient(shard.Name, shard.Spec.BaseURL)
7867
}
7968

80-
// kubeLogicalClusterAdminClientFor returns a kube client (i.e. a client that implements kubernetes.ClusterInterface) for the given shard.
81-
// the returned client establishes a direct connection with the shard with credentials stored in r.logicalClusterAdminConfig.
82-
// TODO:(p0lyn0mial): make it more efficient, maybe we need a per shard client pool or we could use an HTTPRoundTripper
8369
kubeDirectClientFor := func(shard *corev1alpha1.Shard) (kubernetes.ClusterInterface, error) {
8470
if shard.Name == c.shardName {
8571
return c.kubeClusterClient, nil
8672
}
87-
shardConfig := restclient.CopyConfig(c.logicalClusterAdminConfig)
88-
shardConfig.Host = shard.Spec.BaseURL
89-
shardClient, err := kubernetes.NewForConfig(shardConfig)
90-
if err != nil {
91-
return nil, fmt.Errorf("failed to create shard %q kube client: %w", shard.Name, err)
92-
}
93-
return shardClient, nil
73+
return c.clientPool.getKubeClient(shard.Name, shard.Spec.BaseURL)
9474
}
9575

9676
getType := func(path logicalcluster.Path, name string) (*tenancyv1alpha1.WorkspaceType, error) {

0 commit comments

Comments
 (0)