Skip to content

Commit 2e389b4

Browse files
committed
implement support for APIExportEndpointSlices
On-behalf-of: @SAP [email protected]
1 parent 09d43ad commit 2e389b4

File tree

4 files changed

+405
-121
lines changed

4 files changed

+405
-121
lines changed

cmd/api-syncagent/kcp.go

Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
/*
2+
Copyright 2025 The KCP Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"context"
21+
"errors"
22+
"fmt"
23+
"regexp"
24+
25+
"github.com/kcp-dev/api-syncagent/internal/kcp"
26+
"github.com/kcp-dev/logicalcluster/v3"
27+
28+
kcpdevv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1"
29+
kcpdevcore "github.com/kcp-dev/kcp/sdk/apis/core"
30+
kcpdevcorev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
31+
32+
"k8s.io/apimachinery/pkg/fields"
33+
"k8s.io/apimachinery/pkg/runtime"
34+
"k8s.io/apimachinery/pkg/types"
35+
"k8s.io/client-go/rest"
36+
"sigs.k8s.io/controller-runtime/pkg/cache"
37+
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
38+
"sigs.k8s.io/controller-runtime/pkg/cluster"
39+
)
40+
41+
// The agent has two potentially different kcp clusters:
42+
//
43+
// endpointCluster - this is where the source of the virtual workspace URLs
44+
// live, i.e. where the APIExport/EndpointSlice.
45+
// managedCluster - this is where the APIExport and APIResourceSchemas
46+
// exist that are meant to be reconciled.
47+
//
48+
// The managedCluster always exists, the endpointCluster only if the workspace
49+
// for the virtual workspace source is different from the managed cluster.
50+
51+
// setupEndpointKcpCluster sets up a plain, non-kcp-aware ctrl-runtime Cluster object
52+
// that is solvely used to watch whichever object holds the virtual workspace URLs,
53+
// either the APIExport or the APIExportEndpointSlice.
54+
func setupEndpointKcpCluster(endpoint *syncEndpoint) (cluster.Cluster, error) {
55+
// no need for a dedicated endpoint cluster
56+
if endpoint.APIExport.Cluster == endpoint.EndpointSlice.Cluster {
57+
return nil, nil
58+
}
59+
60+
scheme := runtime.NewScheme()
61+
62+
if err := kcpdevv1alpha1.AddToScheme(scheme); err != nil {
63+
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevv1alpha1.SchemeGroupVersion, err)
64+
}
65+
66+
if err := kcpdevcorev1alpha1.AddToScheme(scheme); err != nil {
67+
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevcorev1alpha1.SchemeGroupVersion, err)
68+
}
69+
70+
// RBAC in kcp might be very tight and might not allow to list/watch all objects;
71+
// restrict the cache's selectors accordingly so we can still make use of caching.
72+
byObject := map[ctrlruntimeclient.Object]cache.ByObject{
73+
&kcpdevv1alpha1.APIExportEndpointSlice{}: {
74+
Field: fields.SelectorFromSet(fields.Set{"metadata.name": endpoint.EndpointSlice.Name}),
75+
},
76+
}
77+
78+
return cluster.New(endpoint.EndpointSlice.Config, func(o *cluster.Options) {
79+
o.Scheme = scheme
80+
o.Cache = cache.Options{
81+
Scheme: scheme,
82+
ByObject: byObject,
83+
}
84+
})
85+
}
86+
87+
// setupManagedKcpCluster sets up a plain, non-kcp-aware ctrl-runtime Cluster object
88+
// that is solvely used to manage the APIExport and APIResourceSchemas.
89+
func setupManagedKcpCluster(endpoint *syncEndpoint) (cluster.Cluster, error) {
90+
scheme := runtime.NewScheme()
91+
92+
if err := kcpdevv1alpha1.AddToScheme(scheme); err != nil {
93+
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevv1alpha1.SchemeGroupVersion, err)
94+
}
95+
96+
if err := kcpdevcorev1alpha1.AddToScheme(scheme); err != nil {
97+
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevcorev1alpha1.SchemeGroupVersion, err)
98+
}
99+
100+
// RBAC in kcp might be very tight and might not allow to list/watch all objects;
101+
// restrict the cache's selectors accordingly so we can still make use of caching.
102+
byObject := map[ctrlruntimeclient.Object]cache.ByObject{
103+
&kcpdevv1alpha1.APIExport{}: {
104+
Field: fields.SelectorFromSet(fields.Set{"metadata.name": endpoint.APIExport.Name}),
105+
},
106+
}
107+
108+
return cluster.New(endpoint.APIExport.Config, func(o *cluster.Options) {
109+
o.Scheme = scheme
110+
o.Cache = cache.Options{
111+
Scheme: scheme,
112+
ByObject: byObject,
113+
}
114+
})
115+
}
116+
117+
type qualifiedCluster struct {
118+
Cluster logicalcluster.Name
119+
Path logicalcluster.Path
120+
Config *rest.Config
121+
}
122+
123+
type qualifiedAPIExport struct {
124+
*kcpdevv1alpha1.APIExport
125+
qualifiedCluster
126+
}
127+
128+
type qualifiedAPIExportEndpointSlice struct {
129+
*kcpdevv1alpha1.APIExportEndpointSlice
130+
qualifiedCluster
131+
}
132+
133+
type syncEndpoint struct {
134+
APIExport qualifiedAPIExport
135+
EndpointSlice *qualifiedAPIExportEndpointSlice
136+
}
137+
138+
// resolveSyncEndpoint takes the user provided (usually via CLI flags) APIExportEndpointSliceRef and
139+
// APIExportRef and resolves, returning a consistent SyncEndpoint. The initialRestConfig must point
140+
// to the cluster where either of the two objects reside (i.e. if the APIExportRef is given, it
141+
// must point to the cluster where the APIExport lives, and vice versa for the endpoint slice;
142+
// however the endpoint slice references an APIExport in potentially another cluster, and for this
143+
// case the initialRestConfig will be rewritten accordingly).
144+
func resolveSyncEndpoint(ctx context.Context, initialRestConfig *rest.Config, endpointSliceRef string, apiExportRef string) (*syncEndpoint, error) {
145+
// construct temporary, uncached client
146+
scheme := runtime.NewScheme()
147+
if err := kcpdevcorev1alpha1.AddToScheme(scheme); err != nil {
148+
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevcorev1alpha1.SchemeGroupVersion, err)
149+
}
150+
if err := kcpdevv1alpha1.AddToScheme(scheme); err != nil {
151+
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevv1alpha1.SchemeGroupVersion, err)
152+
}
153+
154+
clientOpts := ctrlruntimeclient.Options{Scheme: scheme}
155+
client, err := ctrlruntimeclient.New(initialRestConfig, clientOpts)
156+
if err != nil {
157+
return nil, fmt.Errorf("failed to create service reader: %w", err)
158+
}
159+
160+
se := &syncEndpoint{}
161+
162+
// When an endpoint ref is given, both the APIExportEndpointSlice and the APIExport must exist.
163+
if endpointSliceRef != "" {
164+
endpointSlice, err := resolveAPIExportEndpointSlice(ctx, client, endpointSliceRef)
165+
if err != nil {
166+
return nil, fmt.Errorf("failed to resolve APIExportEndpointSlice: %w", err)
167+
}
168+
endpointSlice.Config = initialRestConfig
169+
170+
// find the APIExport referenced not by the user (can't: both ref parameters to this function
171+
// are mutually exclusive), but in the APIExportEndpointSlice.
172+
restConfig, err := retargetRestConfig(initialRestConfig, endpointSlice.Spec.APIExport.Path)
173+
if err != nil {
174+
return nil, fmt.Errorf("failed to re-target the given kubeconfig to cluster %q: %w", endpointSlice.Spec.APIExport.Path, err)
175+
}
176+
177+
client, err := ctrlruntimeclient.New(restConfig, clientOpts)
178+
if err != nil {
179+
return nil, fmt.Errorf("failed to create service reader: %w", err)
180+
}
181+
182+
apiExport, err := resolveAPIExport(ctx, client, endpointSlice.Spec.APIExport.Name)
183+
if err != nil {
184+
return nil, fmt.Errorf("failed to resolve APIExport: %w", err)
185+
}
186+
apiExport.Config = restConfig
187+
188+
se.APIExport = apiExport
189+
se.EndpointSlice = &endpointSlice
190+
} else { // if an export ref is given, the endpoint slice is optional (for compat with kcp <0.28)
191+
apiExport, err := resolveAPIExport(ctx, client, apiExportRef)
192+
if err != nil {
193+
return nil, fmt.Errorf("failed to resolve APIExport: %w", err)
194+
}
195+
apiExport.Config = initialRestConfig
196+
197+
se.APIExport = apiExport
198+
199+
// try to find an endpoint slice in the same workspace with the same name as the APIExport
200+
endpointSlice, err := resolveAPIExportEndpointSlice(ctx, client, apiExportRef)
201+
if ctrlruntimeclient.IgnoreNotFound(err) != nil {
202+
return nil, fmt.Errorf("failed to resolve APIExportEndpointSlice: %w", err)
203+
} else if err == nil {
204+
apiExport.Config = initialRestConfig
205+
se.EndpointSlice = &endpointSlice
206+
}
207+
}
208+
209+
return se, nil
210+
}
211+
212+
func resolveAPIExportEndpointSlice(ctx context.Context, client ctrlruntimeclient.Client, ref string) (qualifiedAPIExportEndpointSlice, error) {
213+
endpointSlice := &kcpdevv1alpha1.APIExportEndpointSlice{}
214+
key := types.NamespacedName{Name: ref}
215+
if err := client.Get(ctx, key, endpointSlice); err != nil {
216+
return qualifiedAPIExportEndpointSlice{}, fmt.Errorf("failed to get APIExportEndpointSlice %q: %w", ref, err)
217+
}
218+
219+
lcName, lcPath, err := resolveCurrentCluster(ctx, client)
220+
if err != nil {
221+
return qualifiedAPIExportEndpointSlice{}, fmt.Errorf("failed to resolve APIExportEndpointSlice cluster: %w", err)
222+
}
223+
224+
return qualifiedAPIExportEndpointSlice{
225+
APIExportEndpointSlice: endpointSlice,
226+
qualifiedCluster: qualifiedCluster{
227+
Cluster: lcName,
228+
Path: lcPath,
229+
},
230+
}, nil
231+
}
232+
233+
func resolveAPIExport(ctx context.Context, client ctrlruntimeclient.Client, ref string) (qualifiedAPIExport, error) {
234+
apiExport := &kcpdevv1alpha1.APIExport{}
235+
key := types.NamespacedName{Name: ref}
236+
if err := client.Get(ctx, key, apiExport); err != nil {
237+
return qualifiedAPIExport{}, fmt.Errorf("failed to get APIExport %q: %w", ref, err)
238+
}
239+
240+
lcName, lcPath, err := resolveCurrentCluster(ctx, client)
241+
if err != nil {
242+
return qualifiedAPIExport{}, fmt.Errorf("failed to resolve APIExport cluster: %w", err)
243+
}
244+
245+
return qualifiedAPIExport{
246+
APIExport: apiExport,
247+
qualifiedCluster: qualifiedCluster{
248+
Cluster: lcName,
249+
Path: lcPath,
250+
},
251+
}, nil
252+
}
253+
254+
func resolveCurrentCluster(ctx context.Context, client ctrlruntimeclient.Client) (logicalcluster.Name, logicalcluster.Path, error) {
255+
lc := &kcpdevcorev1alpha1.LogicalCluster{}
256+
if err := client.Get(ctx, types.NamespacedName{Name: kcp.IdentityClusterName}, lc); err != nil {
257+
return "", logicalcluster.None, fmt.Errorf("failed to resolve current workspace: %w", err)
258+
}
259+
260+
lcName := logicalcluster.From(lc)
261+
lcPath := logicalcluster.NewPath(lc.Annotations[kcpdevcore.LogicalClusterPathAnnotationKey])
262+
263+
return lcName, lcPath, nil
264+
}
265+
266+
var clusterFinder = regexp.MustCompile(`/clusters/([^/]+)`)
267+
268+
func retargetRestConfig(cfg *rest.Config, destination string) (*rest.Config, error) {
269+
// no change desired (use current cluster implicitly)
270+
if destination == "" {
271+
return cfg, nil
272+
}
273+
274+
matches := clusterFinder.FindAllStringSubmatch(cfg.Host, -1)
275+
if len(matches) == 0 {
276+
return nil, errors.New("URL must point to a cluster/workspace")
277+
}
278+
if len(matches) > 1 {
279+
return nil, errors.New("invalid URL: URL contains more than one cluster path")
280+
}
281+
282+
current := matches[0][1]
283+
if current == destination {
284+
return cfg, nil
285+
}
286+
287+
newCluster := fmt.Sprintf("/clusters/%s", destination)
288+
289+
newConfig := rest.CopyConfig(cfg)
290+
newConfig.Host = clusterFinder.ReplaceAllString(cfg.Host, newCluster)
291+
292+
return newConfig, nil
293+
}

0 commit comments

Comments
 (0)