Skip to content
This repository was archived by the owner on Aug 28, 2025. It is now read-only.

Commit 3d2aaf4

Browse files
authored
Feat: Listener: Add support for standard k8s clusters (#52)
* listener: add support for standard k8s clusters * review: add context that times out for fetching apiexports
1 parent 729bbc8 commit 3d2aaf4

File tree

6 files changed

+292
-75
lines changed

6 files changed

+292
-75
lines changed

cmd/listen.go

Lines changed: 31 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,29 @@
11
package cmd
22

33
import (
4-
"context"
54
"crypto/tls"
6-
"errors"
7-
"net/url"
85
"os"
96

107
kcpapis "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1"
118
kcpcore "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
129
kcptenancy "github.com/kcp-dev/kcp/sdk/apis/tenancy/v1alpha1"
13-
"github.com/openmfp/crd-gql-gateway/listener/clusterpath"
14-
"github.com/openmfp/crd-gql-gateway/listener/flags"
15-
"github.com/rs/zerolog/log"
10+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
11+
1612
"github.com/spf13/cobra"
17-
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
13+
1814
"k8s.io/apimachinery/pkg/runtime"
1915
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
20-
kcpctrl "sigs.k8s.io/controller-runtime/pkg/kcp"
2116

2217
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
23-
"k8s.io/client-go/rest"
2418
ctrl "sigs.k8s.io/controller-runtime"
25-
"sigs.k8s.io/controller-runtime/pkg/client"
2619
"sigs.k8s.io/controller-runtime/pkg/healthz"
2720
"sigs.k8s.io/controller-runtime/pkg/log/zap"
2821
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
2922
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
3023
"sigs.k8s.io/controller-runtime/pkg/webhook"
3124

32-
"github.com/openmfp/crd-gql-gateway/listener/apischema"
33-
"github.com/openmfp/crd-gql-gateway/listener/controller"
34-
"github.com/openmfp/crd-gql-gateway/listener/discoveryclient"
35-
"github.com/openmfp/crd-gql-gateway/listener/workspacefile"
25+
"github.com/openmfp/crd-gql-gateway/listener/flags"
26+
"github.com/openmfp/crd-gql-gateway/listener/kcp"
3627
// +kubebuilder:scaffold:imports
3728
)
3829

@@ -50,27 +41,28 @@ var (
5041

5142
var listenCmd = &cobra.Command{
5243
Use: "listen",
53-
Example: "KUBECONFIG=.kcp/admin.kubeconfig go run . listen",
44+
Example: "KUBECONFIG=<path to kubeconfig file> go run . listen",
5445
PreRun: func(cmd *cobra.Command, args []string) {
5546
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
5647

5748
utilruntime.Must(kcpapis.AddToScheme(scheme))
5849
utilruntime.Must(kcpcore.AddToScheme(scheme))
5950
utilruntime.Must(kcptenancy.AddToScheme(scheme))
60-
utilruntime.Must(apiextensions.AddToScheme(scheme))
51+
utilruntime.Must(apiextensionsv1.AddToScheme(scheme))
6152
// +kubebuilder:scaffold:scheme
6253

63-
var err error
64-
opFlags, err = flags.NewFromEnv()
65-
if err != nil {
66-
log.Fatal().Err(err).Msg("Error getting app restCfg, exiting")
67-
}
6854
opts := zap.Options{
6955
Development: true,
7056
}
71-
7257
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
7358

59+
var err error
60+
opFlags, err = flags.NewFromEnv()
61+
if err != nil {
62+
setupLog.Error(err, "failed to get operator flags from env, exiting...")
63+
os.Exit(1)
64+
}
65+
7466
disableHTTP2 := func(c *tls.Config) {
7567
setupLog.Info("disabling http/2")
7668
c.NextProtos = []string{"http/1.1"}
@@ -97,75 +89,40 @@ var listenCmd = &cobra.Command{
9789
},
9890
Run: func(cmd *cobra.Command, args []string) {
9991
cfg := ctrl.GetConfigOrDie()
100-
cfgURL, err := url.Parse(cfg.Host)
101-
if err != nil {
102-
setupLog.Error(err, "failed to parse config Host")
103-
os.Exit(1)
104-
}
105-
clt, err := client.New(cfg, client.Options{
106-
Scheme: scheme,
107-
})
108-
if err != nil {
109-
setupLog.Error(err, "failed to create client from config")
110-
os.Exit(1)
111-
}
112-
tenancyAPIExport := &kcpapis.APIExport{}
113-
err = clt.Get(context.TODO(), client.ObjectKey{Name: kcptenancy.SchemeGroupVersion.Group}, tenancyAPIExport)
114-
if err != nil {
115-
setupLog.Error(err, "failed to get tenancy APIExport")
116-
os.Exit(1)
117-
}
118-
virtualWorkspaces := tenancyAPIExport.Status.VirtualWorkspaces // nolint: staticcheck
119-
if len(virtualWorkspaces) == 0 {
120-
err := errors.New("empty virtual workspace list")
121-
setupLog.Error(err, "failed to get at least one virtual workspace")
122-
os.Exit(1)
123-
}
124-
vwCFGURL, err := url.Parse(virtualWorkspaces[0].URL)
125-
if err != nil {
126-
setupLog.Error(err, "failed to parse virtual workspace config URL")
127-
os.Exit(1)
128-
}
129-
cfgURL.Path = vwCFGURL.Path
130-
virtualWorkspaceCfg := rest.CopyConfig(cfg)
131-
virtualWorkspaceCfg.Host = cfgURL.String()
13292

133-
mgr, err := kcpctrl.NewClusterAwareManager(virtualWorkspaceCfg, ctrl.Options{
93+
mgrOpts := ctrl.Options{
13494
Scheme: scheme,
13595
Metrics: metricsServerOptions,
13696
WebhookServer: webhookServer,
13797
HealthProbeBindAddress: opFlags.ProbeAddr,
13898
LeaderElection: opFlags.EnableLeaderElection,
13999
LeaderElectionID: "72231e1f.openmfp.io",
140-
})
100+
}
101+
102+
newMgrFunc := kcp.ManagerFactory(opFlags)
103+
104+
mgr, err := newMgrFunc(cfg, mgrOpts)
141105
if err != nil {
142106
setupLog.Error(err, "unable to start manager")
143107
os.Exit(1)
144108
}
145109

146-
ioHandler, err := workspacefile.NewIOHandler(opFlags.OpenAPIdefinitionsPath)
147-
if err != nil {
148-
setupLog.Error(err, "failed to create IO Handler")
149-
os.Exit(1)
110+
reconcilerOpts := kcp.ReconcilerOpts{
111+
Scheme: scheme,
112+
Config: cfg,
113+
OpenAPIDefinitionsPath: opFlags.OpenAPIdefinitionsPath,
150114
}
151115

152-
df, err := discoveryclient.NewFactory(virtualWorkspaceCfg)
116+
newReconcilerFunc := kcp.ReconcilerFactory(opFlags)
117+
118+
reconciler, err := newReconcilerFunc(reconcilerOpts)
153119
if err != nil {
154-
setupLog.Error(err, "failed to create Discovery client factory")
120+
setupLog.Error(err, "unable to instantiate reconciler")
155121
os.Exit(1)
156122
}
157123

158-
reconciler := controller.NewAPIBindingReconciler(
159-
ioHandler, df, apischema.NewResolver(), &clusterpath.Resolver{
160-
Scheme: mgr.GetScheme(),
161-
Config: cfg,
162-
ResolverFunc: clusterpath.Resolve,
163-
},
164-
)
165-
166-
err = reconciler.SetupWithManager(mgr)
167-
if err != nil {
168-
setupLog.Error(err, "unable to create controller", "controller", "Workspace")
124+
if err := reconciler.SetupWithManager(mgr); err != nil {
125+
setupLog.Error(err, "unable to create controller")
169126
os.Exit(1)
170127
}
171128
// +kubebuilder:scaffold:builder
@@ -181,8 +138,7 @@ var listenCmd = &cobra.Command{
181138

182139
setupLog.Info("starting manager")
183140
signalHandler := ctrl.SetupSignalHandler()
184-
err = mgr.Start(signalHandler)
185-
if err != nil {
141+
if err := mgr.Start(signalHandler); err != nil {
186142
setupLog.Error(err, "problem running manager")
187143
os.Exit(1)
188144
}

listener/controller/crd_controller.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package controller
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"errors"
7+
8+
"io/fs"
9+
10+
"github.com/openmfp/crd-gql-gateway/listener/apischema"
11+
"github.com/openmfp/crd-gql-gateway/listener/workspacefile"
12+
"k8s.io/client-go/discovery"
13+
14+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
15+
ctrl "sigs.k8s.io/controller-runtime"
16+
"sigs.k8s.io/controller-runtime/pkg/client"
17+
"sigs.k8s.io/controller-runtime/pkg/log"
18+
)
19+
20+
// CRDReconciler reconciles a CustomResourceDefinition object
21+
type CRDReconciler struct {
22+
ClusterName string
23+
client.Client
24+
*discovery.DiscoveryClient
25+
io workspacefile.IOHandler
26+
sc apischema.Resolver
27+
}
28+
29+
func NewCRDReconciler(name string,
30+
clt client.Client,
31+
dc *discovery.DiscoveryClient,
32+
io workspacefile.IOHandler,
33+
sc apischema.Resolver,
34+
) *CRDReconciler {
35+
return &CRDReconciler{
36+
ClusterName: name,
37+
Client: clt,
38+
DiscoveryClient: dc,
39+
io: io,
40+
sc: sc,
41+
}
42+
}
43+
44+
// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinition,verbs=get;list;watch
45+
// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinition/status,verbs=get
46+
func (r *CRDReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
47+
48+
logger := log.FromContext(ctx).WithValues("cluster", r.ClusterName).WithName(req.Name)
49+
logger.Info("starting reconciliation...")
50+
51+
crd := &apiextensionsv1.CustomResourceDefinition{}
52+
if err := r.Client.Get(ctx, req.NamespacedName, crd); client.IgnoreNotFound(err) != nil {
53+
logger.Error(err, "failed to get reconciled object")
54+
return ctrl.Result{}, err
55+
}
56+
57+
savedJSON, err := r.io.Read(r.ClusterName)
58+
if errors.Is(err, fs.ErrNotExist) {
59+
actualJSON, err1 := r.sc.Resolve(r.DiscoveryClient)
60+
if err1 != nil {
61+
logger.Error(err1, "failed to resolve server JSON schema")
62+
return ctrl.Result{}, err1
63+
}
64+
if err := r.io.Write(actualJSON, r.ClusterName); err != nil {
65+
logger.Error(err, "failed to write JSON to filesystem")
66+
return ctrl.Result{}, err
67+
}
68+
return ctrl.Result{}, nil
69+
}
70+
71+
if err != nil {
72+
logger.Error(err, "failed to read JSON from filesystem")
73+
return ctrl.Result{}, err
74+
}
75+
76+
actualJSON, err := r.sc.Resolve(r.DiscoveryClient)
77+
if err != nil {
78+
logger.Error(err, "failed to resolve server JSON schema")
79+
return ctrl.Result{}, err
80+
}
81+
if !bytes.Equal(actualJSON, savedJSON) {
82+
if err := r.io.Write(actualJSON, r.ClusterName); err != nil {
83+
logger.Error(err, "failed to write JSON to filesystem")
84+
return ctrl.Result{}, err
85+
}
86+
}
87+
88+
return ctrl.Result{}, nil
89+
}
90+
91+
// SetupWithManager sets up the controller with the Manager.
92+
func (r *CRDReconciler) SetupWithManager(mgr ctrl.Manager) error {
93+
return ctrl.NewControllerManagedBy(mgr).
94+
For(&apiextensionsv1.CustomResourceDefinition{}).
95+
Named("CRD").
96+
Complete(r)
97+
}

listener/flags/operator_flags.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ type Flags struct {
1111
SecureMetrics bool `envconfig:"default=true,optional"`
1212
EnableHTTP2 bool `envconfig:"default=false,optional"`
1313
OpenAPIdefinitionsPath string `envconfig:"OPEN_API_DEFINITIONS_PATH,default=./bin/definitions"`
14+
EnableKcp bool `envconfig:"default=true,optional"`
1415
}
1516

1617
func NewFromEnv() (*Flags, error) {

listener/kcp/manager_factory.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package kcp
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/openmfp/crd-gql-gateway/listener/flags"
7+
"k8s.io/client-go/rest"
8+
ctrl "sigs.k8s.io/controller-runtime"
9+
kcpctrl "sigs.k8s.io/controller-runtime/pkg/kcp"
10+
"sigs.k8s.io/controller-runtime/pkg/manager"
11+
)
12+
13+
type NewManagerFunc func(cfg *rest.Config, opts ctrl.Options) (manager.Manager, error)
14+
15+
func ManagerFactory(opFlags *flags.Flags) NewManagerFunc {
16+
if opFlags.EnableKcp {
17+
return NewKcpManager
18+
}
19+
return ctrl.NewManager
20+
}
21+
22+
func NewKcpManager(cfg *rest.Config, opts ctrl.Options) (manager.Manager, error) {
23+
virtualWorkspaceCfg, err := virtualWorkspaceConfigFromCfg(cfg, opts.Scheme)
24+
if err != nil {
25+
return nil, fmt.Errorf("unable to get virtual workspace config: %w", err)
26+
}
27+
mgr, err := kcpctrl.NewClusterAwareManager(virtualWorkspaceCfg, opts)
28+
if err != nil {
29+
return nil, fmt.Errorf("unable to instantiate manager: %w", err)
30+
}
31+
return mgr, nil
32+
}

0 commit comments

Comments
 (0)