Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ require (
github.com/getsentry/sentry-go v0.40.0 // indirect
github.com/go-jose/go-jose/v4 v4.1.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
github.com/go-logr/zerologr v1.2.3 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.21.0 // indirect
Expand Down Expand Up @@ -104,6 +105,8 @@ require (
go.opentelemetry.io/otel/sdk v1.39.0 // indirect
go.opentelemetry.io/otel/trace v1.39.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.1 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.46.0 // indirect
Expand Down
27 changes: 27 additions & 0 deletions internal/test/integration/cmd/kcp/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import (
"os"

setup "github.com/platform-mesh/security-operator/internal/test/integration"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func main() {
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
log := ctrl.Log.WithName("kcpsetup")

ctx := ctrl.SetupSignalHandler()

if err := setup.KcpSetup(ctx, ""); err != nil {
log.Error(err, "failed to configure kcp")
os.Exit(1)
}

log.Info("kcp setup completed; starting manager")
if err := setup.RunPredicateManager(ctx, "", ctrl.Log.WithName("predicate")); err != nil {
log.Error(err, "manager exited with error")
os.Exit(1)
}
}
219 changes: 219 additions & 0 deletions internal/test/integration/kcp_setup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
package test

import (
"context"
_ "embed"
"fmt"
"net/url"
"os"
"strings"
"time"

kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/yaml"

kcpapiv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1"
apisv1alpha2 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha2"
"github.com/kcp-dev/kcp/sdk/apis/core"
kcpcorev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
tenancyv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/tenancy/v1alpha1"
"github.com/kcp-dev/logicalcluster/v3"
clusterclient "github.com/kcp-dev/multicluster-provider/client"
)

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(clientgoscheme.Scheme))
utilruntime.Must(kcpapiv1alpha1.AddToScheme(clientgoscheme.Scheme))
utilruntime.Must(apisv1alpha2.AddToScheme(clientgoscheme.Scheme))
utilruntime.Must(kcpcorev1alpha1.AddToScheme(clientgoscheme.Scheme))
utilruntime.Must(tenancyv1alpha1.AddToScheme(clientgoscheme.Scheme))
}

var (
//go:embed yaml/apiresourceschema-accountinfos.core.platform-mesh.io.yaml
accountInfoSchemaYAML []byte

//go:embed yaml/apiresourceschema-accounts.core.platform-mesh.io.yaml
accountSchemaYAML []byte

//go:embed yaml/apiresourceschema-authorizationmodels.core.platform-mesh.io.yaml
authorizationModelSchemaYAML []byte

//go:embed yaml/apiresourceschema-stores.core.platform-mesh.io.yaml
storeSchemaYAML []byte

//go:embed yaml/apiexport-core.platform-mesh.io.yaml
apiExportPlatformMeshYAML []byte

//go:embed yaml/apibinding-core-platform-mesh.io.yaml
apiBindingCorePlatformMeshYAML []byte

//go:embed yaml/workspace-type-org.yaml
workspaceTypeOrgYAML []byte

//go:embed yaml/workspace-type-orgs.yaml
workspaceTypeOrgsYAML []byte

//go:embed yaml/workspace-type-account.yaml
workspaceTypeAccountYAML []byte
)

func KcpSetup(ctx context.Context, kubeconfig string) error {
cfg, err := loadKCPConfig(kubeconfig)
if err != nil {
return err
}

cli, err := clusterclient.New(cfg, client.Options{Scheme: clientgoscheme.Scheme})
if err != nil {
return fmt.Errorf("failed to build cluster client: %w", err)
}

rootPath := logicalcluster.NewPath("root")
pmsPath, err := ensureWorkspace(ctx, rootPath, "platform-mesh-system", nil, cli)
if err != nil {
return err
}

// Create APIResourceSchemas, APIExport, APIBinding in platform-mesh-system
pmsClient := cli.Cluster(pmsPath)
for _, schemaYAML := range [][]byte{accountInfoSchemaYAML, accountSchemaYAML, authorizationModelSchemaYAML, storeSchemaYAML} {
var schema kcpapiv1alpha1.APIResourceSchema
if err := yaml.Unmarshal(schemaYAML, &schema); err != nil {
return fmt.Errorf("failed to unmarshal APIResourceSchema: %w", err)
}
if err := pmsClient.Create(ctx, &schema); err != nil && !kerrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to create APIResourceSchema/%s: %w", schema.Name, err)
}
fmt.Printf("applied APIResourceSchema/%s\n", schema.Name)
}

var exp kcpapiv1alpha1.APIExport
if err := yaml.Unmarshal(apiExportPlatformMeshYAML, &exp); err != nil {
return fmt.Errorf("failed to unmarshal APIExport: %w", err)
}
if err := pmsClient.Create(ctx, &exp); err != nil && !kerrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to create APIExport/%s: %w", exp.Name, err)
}
fmt.Printf("applied APIExport/%s\n", exp.Name)

var b apisv1alpha2.APIBinding
if err := yaml.Unmarshal(apiBindingCorePlatformMeshYAML, &b); err != nil {
return fmt.Errorf("failed to unmarshal APIBinding: %w", err)
}
if err := pmsClient.Create(ctx, &b); err != nil && !kerrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to create APIBinding/%s: %w", b.Name, err)
}
fmt.Printf("applied APIBinding/%s\n", b.Name)

rootClient := cli.Cluster(core.RootCluster.Path())

for _, wtYAML := range [][]byte{workspaceTypeOrgYAML, workspaceTypeOrgsYAML, workspaceTypeAccountYAML} {
var wt tenancyv1alpha1.WorkspaceType
if err := yaml.Unmarshal(wtYAML, &wt); err != nil {
return fmt.Errorf("failed to unmarshal WorkspaceType: %w", err)
}
if err := rootClient.Create(ctx, &wt); err != nil && !kerrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to create WorkspaceType/%s: %w", wt.Name, err)
}
fmt.Printf("applied WorkspaceType/%s\n", wt.Name)
}

orgsPath, err := ensureWorkspace(ctx, rootPath, "orgs", &tenancyv1alpha1.WorkspaceTypeReference{
Name: "orgs",
Path: rootPath.String(),
}, cli)
if err != nil {
return err
}

// this workspace is needed to be selected by the predicate and trigger the reconcilation
_, err = ensureWorkspace(ctx, orgsPath, "test", &tenancyv1alpha1.WorkspaceTypeReference{
Name: "org",
Path: rootPath.String(),
}, cli)
if err != nil {
return err
}

// this workspace is needed to demostrate the reconcilation of the workspaces with the same hieracy level
_, err = ensureWorkspace(ctx, orgsPath, "no-reconcile-org", &tenancyv1alpha1.WorkspaceTypeReference{
Name: "org",
Path: rootPath.String(),
}, cli)
if err != nil {
return err
}

return nil
}

func loadKCPConfig(kubeconfigPath string) (*rest.Config, error) {
if kubeconfigPath != "" {
return nil, fmt.Errorf("explicit kubeconfig path is not supported; set KUBECONFIG instead")
}

kubeconfig := os.Getenv("KUBECONFIG")
if strings.TrimSpace(kubeconfig) == "" {
return nil, fmt.Errorf("KUBECONFIG is not set")
}

rawCfg, err := clientcmd.LoadFromFile(kubeconfig)
if err != nil {
return nil, fmt.Errorf("failed to load kubeconfig from %q: %w", kubeconfig, err)
}

cfg, err := clientcmd.NewDefaultClientConfig(*rawCfg, &clientcmd.ConfigOverrides{}).ClientConfig()
if err != nil {
return nil, fmt.Errorf("failed to build rest config from %q: %w", kubeconfig, err)
}

parsed, err := url.Parse(cfg.Host)
if err != nil {
return nil, fmt.Errorf("failed to parse kubeconfig host %q: %w", cfg.Host, err)
}

if strings.HasPrefix(parsed.Path, "/clusters/") {
parsed.Path = ""
cfg = rest.CopyConfig(cfg)
cfg.Host = parsed.String()
}
return cfg, nil
}

func ensureWorkspace(ctx context.Context, parentPath logicalcluster.Path, name string, wsType *tenancyv1alpha1.WorkspaceTypeReference, cli clusterclient.ClusterClient) (logicalcluster.Path, error) {
wsPath := logicalcluster.NewPath(fmt.Sprintf("%s:%s", parentPath.String(), name))
ws := &tenancyv1alpha1.Workspace{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: tenancyv1alpha1.WorkspaceSpec{},
}
if wsType != nil {
ws.Spec.Type = wsType
}

err := cli.Cluster(parentPath).Create(ctx, ws)
if err != nil && !kerrors.IsAlreadyExists(err) {
return wsPath, fmt.Errorf("failed to create workspace %s under %s: %w", name, parentPath, err)
}
fmt.Printf("workspace %s created (or existed) at %s\n", name, wsPath)

// Wait until the workspace is ready
for i := 0; i < 240; i++ {
var current tenancyv1alpha1.Workspace
getErr := cli.Cluster(parentPath).Get(ctx, client.ObjectKey{Name: name}, &current)
if getErr == nil && string(current.Status.Phase) == "Ready" {
return wsPath, nil
}
time.Sleep(500 * time.Millisecond)
}

return wsPath, nil
}
133 changes: 133 additions & 0 deletions internal/test/integration/predicate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package test

import (
"context"
"fmt"
"strings"

"github.com/go-logr/logr"
kcpapiv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1"
"github.com/kcp-dev/logicalcluster/v3"
"github.com/kcp-dev/multicluster-provider/apiexport"
clusterclient "github.com/kcp-dev/multicluster-provider/client"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
crpredicate "sigs.k8s.io/controller-runtime/pkg/predicate"
mcbuilder "sigs.k8s.io/multicluster-runtime/pkg/builder"
mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile"

kcpcorev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
)

func RunPredicateManager(ctx context.Context, kubeconfig string, log logr.Logger) error {
baseCfg, err := loadKCPConfig(kubeconfig)
if err != nil {
return err
}

sch := clientgoscheme.Scheme
cli, err := clusterclient.New(baseCfg, client.Options{Scheme: sch})
if err != nil {
return fmt.Errorf("failed to build admin client: %w", err)
}

platformMeshSystemPath := logicalcluster.NewPath("root:platform-mesh-system")

var endpointSlice kcpapiv1alpha1.APIExportEndpointSlice
exportName := "core.platform-mesh.io"
if err := cli.Cluster(platformMeshSystemPath).Get(ctx, client.ObjectKey{Name: exportName}, &endpointSlice); err != nil {
return fmt.Errorf("failed to get APIExportEndpointSlice %q in %s: %w", exportName, platformMeshSystemPath, err)
}

url := endpointSlice.Status.APIExportEndpoints[0].URL
log.Info("using APIExport virtual workspace endpoint", "url", url)

vwCfg := rest.CopyConfig(baseCfg)
vwCfg.Host = url

provider, err := apiexport.New(vwCfg, apiexport.Options{Scheme: sch})
if err != nil {
return fmt.Errorf("failed to create apiexport provider: %w", err)
}

mgr, err := mcmanager.New(vwCfg, provider, mcmanager.Options{Scheme: sch})
if err != nil {
return fmt.Errorf("failed to create multicluster manager: %w", err)
}

if err := (&LogicalClusterPredicateReconciler{log: log.WithName("logicalcluster-predicate"), mgr: mgr}).SetupWithManager(mgr); err != nil {
return fmt.Errorf("failed to set up logicalcluster predicate controller: %w", err)
}

go func() {
if err := provider.Run(ctx, mgr); err != nil {
log.Error(err, "apiexport provider exited")
}
}()

log.Info("starting manager")
return mgr.Start(ctx)
}

var OnlyTestLogicalClusterPredicate crpredicate.Predicate = crpredicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return shouldReconcile(e.Object)
},
UpdateFunc: func(e event.UpdateEvent) bool {
return shouldReconcile(e.ObjectNew)
},
DeleteFunc: func(e event.DeleteEvent) bool {
return shouldReconcile(e.Object)
},
GenericFunc: func(e event.GenericEvent) bool {
return shouldReconcile(e.Object)
},
}

// to demonstarate the unwilling reconcilation this predicate filter out any logical cluster which has no 'test' in owner name
func shouldReconcile(obj client.Object) bool {
lc, ok := obj.(*kcpcorev1alpha1.LogicalCluster)
if !ok {
ctrl.Log.WithName("logicalcluster-predicate").Info("predicate: skipping non-LogicalCluster object", "type", fmt.Sprintf("%T", obj))
return false
}

match := strings.Contains(lc.Spec.Owner.Name, "test")
ctrl.Log.WithName("logicalcluster-predicate").Info("predicate decision", "cluster", lc.Spec.Owner.Name, "should be reconciled ?", match)
return match
}

type LogicalClusterPredicateReconciler struct {
log logr.Logger
mgr mcmanager.Manager
}

func (r *LogicalClusterPredicateReconciler) Reconcile(ctx context.Context, req mcreconcile.Request) (ctrl.Result, error) {
m, err := r.mgr.GetManager(ctx, req.ClusterName)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to get manager for %q: %w", req.ClusterName, err)
}
var lc kcpcorev1alpha1.LogicalCluster
if err := m.GetClient().Get(ctx, client.ObjectKey{Name: kcpcorev1alpha1.LogicalClusterName}, &lc); err != nil {
return ctrl.Result{}, err
}

r.log.Info("reconciled LogicalCluster",
"clusterKey", req.ClusterName,
"phase", string(lc.Status.Phase),
"ownerName", lc.Spec.Owner.Name,
"ownerCluster", lc.Spec.Owner.Cluster,
)
return ctrl.Result{}, nil
}

func (r *LogicalClusterPredicateReconciler) SetupWithManager(mgr mcmanager.Manager) error {
return mcbuilder.ControllerManagedBy(mgr).
Named("logicalcluster_predicate").
For(&kcpcorev1alpha1.LogicalCluster{}, mcbuilder.WithPredicates(OnlyTestLogicalClusterPredicate)).
Complete(r)
}
Loading
Loading