diff --git a/examples/sharded-namespace/Dockerfile b/examples/sharded-namespace/Dockerfile new file mode 100644 index 0000000..63b80f9 --- /dev/null +++ b/examples/sharded-namespace/Dockerfile @@ -0,0 +1,21 @@ +# examples/sharded-namespace/Dockerfile +# Multi-stage to keep image slim +FROM golang:1.24 as build + +WORKDIR /src +# Copy go.mod/sum from repo root so deps resolve; adjust paths if needed. +COPY go.mod go.sum ./ +RUN go mod download + +# Copy the whole repo to compile the example against local packages +COPY . . + +# Build the example +WORKDIR /src/examples/sharded-namespace +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /out/sharded-example main.go + +FROM gcr.io/distroless/static:nonroot +WORKDIR / +COPY --from=build /out/sharded-example /sharded-example +USER 65532:65532 +ENTRYPOINT ["/sharded-example"] diff --git a/examples/sharded-namespace/README.md b/examples/sharded-namespace/README.md new file mode 100644 index 0000000..82537da --- /dev/null +++ b/examples/sharded-namespace/README.md @@ -0,0 +1,137 @@ +# Sharded Namespace Example + +This demo runs two replicas of a multicluster manager that **splits ownership** of downstream "clusters" discovered by the *namespace provider* (each Kubernetes Namespace == one cluster). Synchronization is decided by HRW hashing across peers, then serialized with per-cluster fencing Leases so exactly one peer reconciles a given cluster at a time. + +**Key Components:** +- **Peer membership (presence)**: `coordination.k8s.io/Lease` with prefix `mcr-peer-*` +- **Per-cluster fencing (synchronization)**: `coordination.k8s.io/Lease` with prefix `mcr-shard-` + +Controllers attach per-cluster watches when synchronization starts, and cleanly detach & re-attach when it transfers. + +## Build the image + +From repo root: + +```bash +docker build -t mcr-namespace:dev -f examples/sharded-namespace/Dockerfile . +``` + +If using KinD: +```bash +kind create cluster --name mcr-demo +kind load docker-image mcr-namespace:dev --name mcr-demo +``` + +## Deploy + +```bash +kubectl apply -k examples/sharded-namespace/manifests +``` + +## Observe + +Tail logs from pods: + +```bash +kubectl -n mcr-demo get pods +kubectl -n mcr-demo logs statefulset/sharded-namespace -f +``` + +You should see lines like: +```bash +"synchronization start {"cluster": "zoo", "peer": "sharded-namespace-0"}" +"synchronization start {"cluster": "jungle", "peer": "sharded-namespace-1"}" + +Reconciling ConfigMap {"controller": "multicluster-configmaps", "controllerGroup": "", "controllerKind": "ConfigMap", "reconcileID": "4f1116b3-b5 │ +│ 4e-4e6a-b84f-670ca5cfc9ce", "cluster": "zoo", "ns": "default", "name": "elephant"} + +Reconciling ConfigMap {"controller": "multicluster-configmaps", "controllerGroup": "", "controllerKind": "ConfigMap", "reconcileID": "688b8467-f5 │ +│ d3-491b-989e-87bc8aad780e", "cluster": "jungle", "ns": "default", "name": "monkey"} +``` + +Check Leases: +```bash +# Peer membership (one per pod) +kubectl -n kube-system get lease | grep '^mcr-peer-' + +# Per-cluster fencing (one per namespace/"cluster") +kubectl -n kube-system get lease | grep '^mcr-shard-' +``` + +Who owns a given cluster? 
+```bash
+C=zoo
+kubectl -n kube-system get lease mcr-shard-$C \
+  -o custom-columns=HOLDER:.spec.holderIdentity,RENEW:.spec.renewTime
+```
+
+## Test Synchronization
+
+Scale down to 1 replica and watch synchronization consolidate:
+```bash
+# Scale down
+kubectl -n mcr-demo scale statefulset/sharded-namespace --replicas=1
+
+# Watch leases lose their holders as pods terminate
+kubectl -n kube-system get lease -o custom-columns=NAME:.metadata.name,HOLDER:.spec.holderIdentity | grep "^mcr-shard-"
+
+# Wait for all clusters to be owned by the single remaining pod (~30s)
+kubectl -n kube-system wait --for=jsonpath='{.spec.holderIdentity}'=sharded-namespace-0 \
+  lease/mcr-shard-zoo lease/mcr-shard-jungle lease/mcr-shard-island --timeout=60s
+```
+
+Create or patch a ConfigMap and confirm the single owner reconciles it:
+```bash
+# Pick a cluster and create a test ConfigMap
+C=island
+kubectl -n "$C" create cm test-$(date +%s) --from-literal=ts=$(date +%s) --dry-run=client -oyaml | kubectl apply -f -
+
+# Verify only pod-0 reconciles it (since it owns everything now)
+kubectl -n mcr-demo logs pod/sharded-namespace-0 --since=100s | grep "Reconciling ConfigMap.*$C"
+```
+
+Scale up to 3 replicas and watch synchronization rebalance:
+```bash
+# Scale up
+kubectl -n mcr-demo scale statefulset/sharded-namespace --replicas=3
+
+# Watch leases regain holders as pods start
+kubectl -n kube-system get lease -o custom-columns=NAME:.metadata.name,HOLDER:.spec.holderIdentity | grep "^mcr-shard-"
+
+# Create a ConfigMap in the default namespace, which belongs to sharded-namespace-2
+C=default
+kubectl -n "$C" create cm test-$(date +%s) --from-literal=ts=$(date +%s) --dry-run=client -oyaml | kubectl apply -f -
+
+# Verify only pod-2 reconciles it (since it owns the default namespace now)
+kubectl -n mcr-demo logs pod/sharded-namespace-2 --since=100s | grep "Reconciling ConfigMap.*$C"
+```
+
+## Tuning
+
+In your example app (e.g., examples/sharded-namespace/main.go), configure fencing and timings:
+
+```go
+mgr, err := mcmanager.New(cfg, provider, manager.Options{},
+    // Per-cluster fencing Leases live here as mcr-shard-<cluster>
+    mcmanager.WithShardLease("kube-system", "mcr-shard"),
+    mcmanager.WithPerClusterLease(true), // enabled by default
+
+    // Optional: tune fencing timings (duration, renew, throttle):
+    // mcmanager.WithLeaseTimings(30*time.Second, 10*time.Second, 750*time.Millisecond),
+
+    // Optional: peer weight for HRW:
+    // mcmanager.WithPeerWeight(1),
+)
+if err != nil {
+    // handle error
+}
+```
+
+The peer registry uses mcr-peer-* automatically and derives the peer ID from the pod hostname (StatefulSet ordinal).
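+
+To inspect the membership snapshot the sharder sees, you can list the peer Leases by label. The label key below is taken from pkg/manager/peers/registry.go in this PR:
+
+```bash
+# Each peer publishes a Lease named mcr-peer-<pod-hostname>.
+kubectl -n kube-system get lease -l mcr.sigs.k8s.io/peer=true \
+  -o custom-columns=NAME:.metadata.name,HOLDER:.spec.holderIdentity,RENEW:.spec.renewTime
+```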
+ +## Cleanup + +```bash +kubectl delete -k examples/sharded-namespace/manifests +kind delete cluster --name mcr-demo + +``` \ No newline at end of file diff --git a/examples/sharded-namespace/main.go b/examples/sharded-namespace/main.go new file mode 100644 index 0000000..f5c966c --- /dev/null +++ b/examples/sharded-namespace/main.go @@ -0,0 +1,120 @@ +// examples/sharded-namespace/main.go +package main + +import ( + "context" + "errors" + "fmt" + "os" + + "golang.org/x/sync/errgroup" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/klog/v2" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cluster" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager" + + mcbuilder "sigs.k8s.io/multicluster-runtime/pkg/builder" + mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager" + mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile" + "sigs.k8s.io/multicluster-runtime/providers/namespace" +) + +func main() { + klog.Background() // ensure klog initialized + ctrl.SetLogger(zap.New(zap.UseDevMode(true))) + log := ctrl.Log.WithName("sharded-example") + + ctx := ctrl.SetupSignalHandler() + + if err := run(ctx); err != nil { + log.Error(err, "exiting") + os.Exit(1) + } +} + +func run(ctx context.Context) error { + // use in-cluster config; fall back to default loading rules for local runs + cfg, err := ctrl.GetConfig() + if err != nil { + return fmt.Errorf("get kubeconfig: %w", err) + } + + // Provider: treats namespaces in the host cluster as “downstream clusters”. + host, err := cluster.New(cfg) + if err != nil { + return fmt.Errorf("create host cluster: %w", err) + } + provider := namespace.New(host) + + // Multicluster manager (no peer ID passed; pod hostname becomes peer ID). + // Configure sharding: + // - fencing prefix: "mcr-shard" (per-cluster Lease names become mcr-shard-) + // - peer membership still uses "mcr-peer" internally (set in WithMultiCluster) + // Peer ID defaults to os.Hostname(). + mgr, err := mcmanager.New(cfg, provider, manager.Options{}, + mcmanager.WithShardLease("kube-system", "mcr-shard"), + // optional but explicit (your manager already defaults this to true) + mcmanager.WithPerClusterLease(true), + ) + if err != nil { + return fmt.Errorf("create mc manager: %w", err) + } + + // A simple controller that logs ConfigMaps per owned “cluster” (namespace). + if err := mcbuilder.ControllerManagedBy(mgr). + Named("multicluster-configmaps"). + For(&corev1.ConfigMap{}). + Complete(mcreconcile.Func(func(ctx context.Context, req mcreconcile.Request) (ctrl.Result, error) { + // attach cluster once; don't repeat it in Info() + l := ctrl.LoggerFrom(ctx).WithValues("cluster", req.ClusterName) + + // get the right cluster client + cl, err := mgr.GetCluster(ctx, req.ClusterName) + if err != nil { + return ctrl.Result{}, err + } + + // fetch the object, then log from the object (truth) + cm := &corev1.ConfigMap{} + if err := cl.GetClient().Get(ctx, req.Request.NamespacedName, cm); err != nil { + if apierrors.IsNotFound(err) { + // object vanished — nothing to do + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + // now cm.Namespace is accurate (e.g., "zoo", "kube-system", etc.) 
+ l.Info("Reconciling ConfigMap", + "ns", cm.GetNamespace(), + "name", cm.GetName(), + ) + + // show which peer handled it (pod hostname) + if host, _ := os.Hostname(); host != "" { + l.Info("Handled by peer", "peer", host) + } + return ctrl.Result{}, nil + })); err != nil { + return fmt.Errorf("build controller: %w", err) + } + + // Start everything + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { return ignoreCanceled(provider.Run(ctx, mgr)) }) + g.Go(func() error { return ignoreCanceled(host.Start(ctx)) }) + g.Go(func() error { return ignoreCanceled(mgr.Start(ctx)) }) + return g.Wait() +} + +func ignoreCanceled(err error) error { + if errors.Is(err, context.Canceled) { + return nil + } + return err +} diff --git a/examples/sharded-namespace/manifests/kustomization.yaml b/examples/sharded-namespace/manifests/kustomization.yaml new file mode 100644 index 0000000..5b6504b --- /dev/null +++ b/examples/sharded-namespace/manifests/kustomization.yaml @@ -0,0 +1,6 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization +resources: + - rbac.yaml + - statefulset.yaml + - sample-data.yaml diff --git a/examples/sharded-namespace/manifests/rbac.yaml b/examples/sharded-namespace/manifests/rbac.yaml new file mode 100644 index 0000000..7157294 --- /dev/null +++ b/examples/sharded-namespace/manifests/rbac.yaml @@ -0,0 +1,56 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: mcr-demo +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: mcr-demo + namespace: mcr-demo +--- +# Allow reading/writing peer Leases in kube-system +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: mcr-lease +rules: +- apiGroups: ["coordination.k8s.io"] + resources: ["leases"] + verbs: ["get","list","watch","create","update","patch","delete"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: mcr-lease +subjects: +- kind: ServiceAccount + name: mcr-demo + namespace: mcr-demo +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: mcr-lease +--- +# Allow the example to read namespaces/configmaps (namespace provider + demo controller) +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: mcr-example +rules: +- apiGroups: [""] + resources: ["namespaces","configmaps"] + verbs: ["get","list","watch"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: mcr-example +subjects: +- kind: ServiceAccount + name: mcr-demo + namespace: mcr-demo +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: mcr-example diff --git a/examples/sharded-namespace/manifests/sample-data.yaml b/examples/sharded-namespace/manifests/sample-data.yaml new file mode 100644 index 0000000..6c542bb --- /dev/null +++ b/examples/sharded-namespace/manifests/sample-data.yaml @@ -0,0 +1,42 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: zoo +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: elephant + namespace: zoo +data: { a: "1" } +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: lion + namespace: zoo +data: { a: "1" } +--- +apiVersion: v1 +kind: Namespace +metadata: + name: jungle +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: monkey + namespace: jungle +data: { a: "1" } +--- +apiVersion: v1 +kind: Namespace +metadata: + name: island +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: bird + namespace: island +data: { a: "1" } diff --git a/examples/sharded-namespace/manifests/statefulset.yaml 
b/examples/sharded-namespace/manifests/statefulset.yaml new file mode 100644 index 0000000..09c6a3d --- /dev/null +++ b/examples/sharded-namespace/manifests/statefulset.yaml @@ -0,0 +1,22 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: sharded-namespace + namespace: mcr-demo +spec: + serviceName: sharded-namespace + replicas: 2 + selector: + matchLabels: + app: sharded-namespace + template: + metadata: + labels: + app: sharded-namespace + spec: + serviceAccountName: mcr-demo + containers: + - name: app + image: mcr-namespace:dev # change to your image name + imagePullPolicy: IfNotPresent + # No env needed; hostname == pod name → unique peer ID automatically diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index a8ce31a..b24f768 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -91,7 +91,7 @@ func NewTypedUnmanaged[request mcreconcile.ClusterAware[request]](name string, m } return &mcController[request]{ TypedController: c, - clusters: make(map[string]engagedCluster), + clusters: make(map[string]*engagedCluster), }, nil } @@ -101,28 +101,39 @@ type mcController[request mcreconcile.ClusterAware[request]] struct { controller.TypedController[request] lock sync.Mutex - clusters map[string]engagedCluster + clusters map[string]*engagedCluster sources []mcsource.TypedSource[client.Object, request] } type engagedCluster struct { name string cluster cluster.Cluster + ctx context.Context + cancel context.CancelFunc } func (c *mcController[request]) Engage(ctx context.Context, name string, cl cluster.Cluster) error { c.lock.Lock() defer c.lock.Unlock() - if old, ok := c.clusters[name]; ok && old.cluster == cl { - return nil + // Check if we already have this cluster engaged with the SAME context + if old, ok := c.clusters[name]; ok { + if old.cluster == cl && old.ctx.Err() == nil { + // Same impl, engagement still live → nothing to do + return nil + } + // Re-engage: either old ctx is done, or impl changed. Stop the old one if still live. + if old.ctx.Err() == nil { + old.cancel() + } + delete(c.clusters, name) } - ctx, cancel := context.WithCancel(ctx) //nolint:govet // cancel is called in the error case only. + engCtx, cancel := context.WithCancel(ctx) // pass through in case the controller itself is cluster aware if ctrl, ok := c.TypedController.(multicluster.Aware); ok { - if err := ctrl.Engage(ctx, name, cl); err != nil { + if err := ctrl.Engage(engCtx, name, cl); err != nil { cancel() return err } @@ -135,49 +146,49 @@ func (c *mcController[request]) Engage(ctx context.Context, name string, cl clus cancel() return fmt.Errorf("failed to engage for cluster %q: %w", name, err) } - if err := c.TypedController.Watch(startWithinContext[request](ctx, src)); err != nil { + if err := c.TypedController.Watch(startWithinContext[request](engCtx, src)); err != nil { cancel() return fmt.Errorf("failed to watch for cluster %q: %w", name, err) } } - ec := engagedCluster{ + ec := &engagedCluster{ name: name, cluster: cl, + ctx: engCtx, + cancel: cancel, } c.clusters[name] = ec - go func() { + go func(ctx context.Context, key string, token *engagedCluster) { + <-ctx.Done() c.lock.Lock() defer c.lock.Unlock() - if c.clusters[name] == ec { - delete(c.clusters, name) + if cur, ok := c.clusters[key]; ok && cur == token { + delete(c.clusters, key) } - }() + // note: cancel() is driven by parent; no need to call here + }(engCtx, name, ec) - return nil //nolint:govet // cancel is called in the error case only. 
+ return nil } func (c *mcController[request]) MultiClusterWatch(src mcsource.TypedSource[client.Object, request]) error { c.lock.Lock() defer c.lock.Unlock() - ctx, cancel := context.WithCancel(context.Background()) //nolint:govet // cancel is called in the error case only. - for name, eng := range c.clusters { src, err := src.ForCluster(name, eng.cluster) if err != nil { - cancel() return fmt.Errorf("failed to engage for cluster %q: %w", name, err) } - if err := c.TypedController.Watch(startWithinContext[request](ctx, src)); err != nil { - cancel() + if err := c.TypedController.Watch(startWithinContext[request](eng.ctx, src)); err != nil { return fmt.Errorf("failed to watch for cluster %q: %w", name, err) } } c.sources = append(c.sources, src) - return nil //nolint:govet // cancel is called in the error case only. + return nil } func startWithinContext[request mcreconcile.ClusterAware[request]](ctx context.Context, src source.TypedSource[request]) source.TypedSource[request] { diff --git a/pkg/manager/leaseguard.go b/pkg/manager/leaseguard.go new file mode 100644 index 0000000..563e3cd --- /dev/null +++ b/pkg/manager/leaseguard.go @@ -0,0 +1,207 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package manager + +import ( + "context" + "time" + + coordinationv1 "k8s.io/api/coordination/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// leaseGuard manages a single Lease as a *fence* for one shard/cluster. +// +// Design/semantics: +// +// - TryAcquire(ctx) attempts to create/adopt the Lease for holder `id`, +// and, on success, starts a background renew loop. It is idempotent: +// calling it while already held is a cheap true. +// +// - Renewing: a ticker renews the Lease every `renew`. If renewal fails +// (API error, conflict, or we observe another holder while still valid), +// `onLost` is invoked once (best-effort), the fence is released, and the +// loop exits. +// +// - Release(ctx) stops renewing and, if we still own the Lease, clears +// HolderIdentity (best-effort). Safe to call multiple times. +// +// - Thread-safety: leaseGuard is intended to be used by a single goroutine +// (caller) per fence. External synchronization is required if multiple +// goroutines might call its methods concurrently. +// +// - Timings: choose ldur (duration) > renew. A common pattern is +// renew ≈ ldur/3. Too small ldur increases churn; too large slows +// failover. +// +// RBAC: the caller’s client must be allowed to get/list/watch/create/update/patch +// Leases in namespace. +type leaseGuard struct { + client client.Client + namespace string + name string + id string + ldur time.Duration // lease duration + renew time.Duration // renew period + onLost func() // callback when we lose the lease + + held bool + cancel context.CancelFunc +} + +// newLeaseGuard builds a guard; it does not contact the API server. 
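+//
+// Illustrative usage (a sketch; the namespace, name, id, and timings below are
+// example values, not defaults):
+//
+//	g := newLeaseGuard(cli, "kube-system", "mcr-shard-zoo", "peer-0",
+//		20*time.Second, 10*time.Second, func() { /* stop shard work */ })
+//	if g.TryAcquire(ctx) {
+//		// We hold the fence until Release is called or a renewal fails.
+//		defer g.Release(context.Background())
+//	}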
+func newLeaseGuard(client client.Client, namespace, name, id string, ldur, renew time.Duration, onLost func()) *leaseGuard { + return &leaseGuard{client: client, namespace: namespace, name: name, id: id, ldur: ldur, renew: renew, onLost: onLost} +} + +// TryAcquire creates/adopts the Lease for g.id and starts renewing it. +// Returns true if we own it after this call (or already owned). +// Fails (returns false) when another non-expired holder exists or API calls error. +func (g *leaseGuard) TryAcquire(ctx context.Context) bool { + if g.held { + return true + } + + key := types.NamespacedName{Namespace: g.namespace, Name: g.name} + now := metav1.NowMicro() + + ldurSec := int32(g.ldur / time.Second) + + var ls coordinationv1.Lease + err := g.client.Get(ctx, key, &ls) + switch { + case apierrors.IsNotFound(err): + ls = coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{Namespace: g.namespace, Name: g.name}, + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: &g.id, + LeaseDurationSeconds: &ldurSec, + AcquireTime: &now, + RenewTime: &now, + }, + } + if err := g.client.Create(ctx, &ls); err != nil { + return false + } + case err != nil: + return false + default: + // adopt if free/expired/ours + ho := "" + if ls.Spec.HolderIdentity != nil { + ho = *ls.Spec.HolderIdentity + } + if ho != "" && ho != g.id { + if !expired(&ls, now) { + return false + } + } + ls.Spec.HolderIdentity = &g.id + ls.Spec.LeaseDurationSeconds = &ldurSec + // keep first AcquireTime if already ours, otherwise set it + if ho != g.id || ls.Spec.AcquireTime == nil { + ls.Spec.AcquireTime = &now + } + ls.Spec.RenewTime = &now + if err := g.client.Update(ctx, &ls); err != nil { + return false + } + } + + // we own it; start renewer + rctx, cancel := context.WithCancel(context.Background()) + g.cancel = cancel + g.held = true + go g.renewLoop(rctx, key) + return true +} + +// Internal: renew loop and single-step renew. If renewal observes a different, +// valid holder or API errors persist, the fence is released first and onLost() is invoked once. +func (g *leaseGuard) renewLoop(ctx context.Context, key types.NamespacedName) { + t := time.NewTicker(g.renew) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + if ok := g.renewOnce(ctx, key); !ok { + // best-effort: release, then notify once + g.Release(context.Background()) + if g.onLost != nil { + g.onLost() + } + return + } + } + } +} + +func (g *leaseGuard) renewOnce(ctx context.Context, key types.NamespacedName) bool { + now := metav1.NowMicro() + var ls coordinationv1.Lease + if err := g.client.Get(ctx, key, &ls); err != nil { + return false + } + // another holder? + if ls.Spec.HolderIdentity != nil && *ls.Spec.HolderIdentity != g.id && !expired(&ls, now) { + return false + } + // update + ldurSec := int32(g.ldur / time.Second) + ls.Spec.HolderIdentity = &g.id + ls.Spec.LeaseDurationSeconds = &ldurSec + ls.Spec.RenewTime = &now + if err := g.client.Update(ctx, &ls); err != nil { + return false + } + return true +} + +// Release stops renewing; best-effort clear if we still own it. 
+func (g *leaseGuard) Release(ctx context.Context) { + if !g.held { + return + } + if g.cancel != nil { + g.cancel() + } + g.held = false + + key := types.NamespacedName{Namespace: g.namespace, Name: g.name} + var ls coordinationv1.Lease + if err := g.client.Get(ctx, key, &ls); err == nil { + if ls.Spec.HolderIdentity != nil && *ls.Spec.HolderIdentity == g.id { + empty := "" + ls.Spec.HolderIdentity = &empty + // keep RenewTime/AcquireTime; just clear holder + _ = g.client.Update(ctx, &ls) // ignore errors + } + } +} + +func expired(ls *coordinationv1.Lease, now metav1.MicroTime) bool { + if ls.Spec.RenewTime == nil || ls.Spec.LeaseDurationSeconds == nil { + return true + } + return now.Time.After(ls.Spec.RenewTime.Time.Add(time.Duration(*ls.Spec.LeaseDurationSeconds) * time.Second)) +} diff --git a/pkg/manager/leaseguard_test.go b/pkg/manager/leaseguard_test.go new file mode 100644 index 0000000..44365fb --- /dev/null +++ b/pkg/manager/leaseguard_test.go @@ -0,0 +1,225 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package manager + +import ( + "context" + "testing" + "time" + + coordinationv1 "k8s.io/api/coordination/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + + crclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +// test constants +const ( + testNS = "kube-system" + testName = "mcr-shard-default" + testID = "peer-0" + otherID = "peer-1" +) + +func newScheme(t *testing.T) *runtime.Scheme { + t.Helper() + s := runtime.NewScheme() + if err := coordinationv1.AddToScheme(s); err != nil { + t.Fatalf("add scheme: %v", err) + } + return s +} + +func getLease(t *testing.T, c crclient.Client) *coordinationv1.Lease { + t.Helper() + var ls coordinationv1.Lease + if err := c.Get(context.Background(), types.NamespacedName{Namespace: testNS, Name: testName}, &ls); err != nil { + t.Fatalf("get lease: %v", err) + } + return &ls +} + +func makeLease(holder string, renewAgo time.Duration, durSec int32) *coordinationv1.Lease { + now := time.Now() + renew := metav1.NewMicroTime(now.Add(-renewAgo)) + return &coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{Namespace: testNS, Name: testName}, + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: &holder, + LeaseDurationSeconds: &durSec, + RenewTime: &renew, + AcquireTime: &renew, + }, + } +} + +func TestTryAcquire_CreatesAndRenewsThenReleaseClearsHolder(t *testing.T) { + s := newScheme(t) + c := fake.NewClientBuilder().WithScheme(s).Build() + + onLostCh := make(chan struct{}, 1) + g := newLeaseGuard(c, testNS, testName, testID, 3*time.Second, 200*time.Millisecond, func() { onLostCh <- struct{}{} }) + + if ok := g.TryAcquire(context.Background()); !ok { + t.Fatalf("expected TryAcquire to succeed on create") + } + if !g.held { + t.Fatalf("guard should be held after TryAcquire") + } + + ls := getLease(t, c) + if ls.Spec.HolderIdentity == nil || *ls.Spec.HolderIdentity != testID { + 
t.Fatalf("holder mismatch, got %v", ls.Spec.HolderIdentity) + } + if ls.Spec.LeaseDurationSeconds == nil || *ls.Spec.LeaseDurationSeconds != int32(3) { + t.Fatalf("duration mismatch, got %v", ls.Spec.LeaseDurationSeconds) + } + + // Idempotent + if ok := g.TryAcquire(context.Background()); !ok { + t.Fatalf("expected idempotent TryAcquire to return true") + } + + // Release clears holder (best-effort) + g.Release(context.Background()) + ls = getLease(t, c) + if ls.Spec.HolderIdentity != nil && *ls.Spec.HolderIdentity != "" { + t.Fatalf("expected holder cleared on release, got %q", *ls.Spec.HolderIdentity) + } +} + +func TestTryAcquire_FailsWhenOtherHoldsAndNotExpired(t *testing.T) { + s := newScheme(t) + // Other holder renewed recently, not expired + dur := int32(30) + ls := makeLease(otherID, 1*time.Second, dur) + c := fake.NewClientBuilder().WithScheme(s).WithObjects(ls).Build() + + g := newLeaseGuard(c, testNS, testName, testID, 10*time.Second, 2*time.Second, nil) + + if ok := g.TryAcquire(context.Background()); ok { + t.Fatalf("expected TryAcquire to fail while another valid holder exists") + } + if g.held { + t.Fatalf("guard should not be held") + } +} + +func TestTryAcquire_AdoptsWhenExpired(t *testing.T) { + s := newScheme(t) + // Other holder expired: renew time far in the past relative to duration + dur := int32(5) + ls := makeLease(otherID, 30*time.Second, dur) + c := fake.NewClientBuilder().WithScheme(s).WithObjects(ls).Build() + + g := newLeaseGuard(c, testNS, testName, testID, 10*time.Second, 2*time.Second, nil) + + if ok := g.TryAcquire(context.Background()); !ok { + t.Fatalf("expected TryAcquire to adopt expired lease") + } + got := getLease(t, c) + if got.Spec.HolderIdentity == nil || *got.Spec.HolderIdentity != testID { + t.Fatalf("expected holder=%q, got %v", testID, got.Spec.HolderIdentity) + } +} + +func TestRenewLoop_LossTriggersOnLostAndReleases(t *testing.T) { + s := newScheme(t) + c := fake.NewClientBuilder().WithScheme(s).Build() + + lost := make(chan struct{}, 1) + g := newLeaseGuard(c, testNS, testName, testID, 3*time.Second, 50*time.Millisecond, func() { lost <- struct{}{} }) + + // Acquire + if ok := g.TryAcquire(context.Background()); !ok { + t.Fatalf("expected TryAcquire to succeed") + } + + // Flip lease to another holder (valid) so renewOnce observes loss. 
+ ls := getLease(t, c) + now := metav1.NowMicro() + dur := int32(3) + other := otherID + ls.Spec.HolderIdentity = &other + ls.Spec.LeaseDurationSeconds = &dur + ls.Spec.RenewTime = &now + if err := c.Update(context.Background(), ls); err != nil { + t.Fatalf("update lease: %v", err) + } + + // Expect onLost to fire and guard to release within a few renew ticks + select { + case <-lost: + // ok + case <-time.After(2 * time.Second): + t.Fatalf("expected onLost to be called after renewal detects loss") + } + if g.held { + t.Fatalf("guard should not be held after loss") + } +} + +func TestRelease_NoHoldIsNoop(t *testing.T) { + s := newScheme(t) + c := fake.NewClientBuilder().WithScheme(s).Build() + + g := newLeaseGuard(c, testNS, testName, testID, 3*time.Second, 1*time.Second, nil) + // Not acquired yet; should be a no-op + g.Release(context.Background()) + // Nothing to assert other than "does not panic" +} + +func TestRenewLoop_ReleasesBeforeOnLost(t *testing.T) { + s := newScheme(t) + c := fake.NewClientBuilder().WithScheme(s).Build() + + heldAtCallback := make(chan bool, 1) + var g *leaseGuard + g = newLeaseGuard(c, testNS, testName, testID, 3*time.Second, 50*time.Millisecond, func() { + heldAtCallback <- g.held + }) + + // Acquire + if ok := g.TryAcquire(context.Background()); !ok { + t.Fatalf("expected TryAcquire to succeed") + } + + // Flip lease to another holder so renewOnce observes loss. + ls := getLease(t, c) + now := metav1.NowMicro() + dur := int32(3) + other := otherID + ls.Spec.HolderIdentity = &other + ls.Spec.LeaseDurationSeconds = &dur + ls.Spec.RenewTime = &now + if err := c.Update(context.Background(), ls); err != nil { + t.Fatalf("update lease: %v", err) + } + + // Expect callback to observe held == false (Release runs before onLost) + select { + case v := <-heldAtCallback: + if v { + t.Fatalf("expected g.held=false at onLost callback time") + } + case <-time.After(2 * time.Second): + t.Fatalf("expected onLost to be called after renewal detects loss") + } +} diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index b79663e..c16c82e 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net/http" + "time" "github.com/go-logr/logr" @@ -33,6 +34,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook" mccontext "sigs.k8s.io/multicluster-runtime/pkg/context" + "sigs.k8s.io/multicluster-runtime/pkg/manager/peers" + "sigs.k8s.io/multicluster-runtime/pkg/manager/sharder" "sigs.k8s.io/multicluster-runtime/pkg/multicluster" ) @@ -127,27 +130,48 @@ var _ Manager = &mcManager{} type mcManager struct { manager.Manager provider multicluster.Provider - - mcRunnables []multicluster.Aware + engine *synchronizationEngine } // New returns a new Manager for creating Controllers. The provider is used to // discover and manage clusters. With a provider set to nil, the manager will // behave like a regular controller-runtime manager. -func New(config *rest.Config, provider multicluster.Provider, opts Options) (Manager, error) { +func New(config *rest.Config, provider multicluster.Provider, opts manager.Options, mcOpts ...Option) (Manager, error) { mgr, err := manager.New(config, opts) if err != nil { return nil, err } - return WithMultiCluster(mgr, provider) + return WithMultiCluster(mgr, provider, mcOpts...) } // WithMultiCluster wraps a host manager to run multi-cluster controllers. 
-func WithMultiCluster(mgr manager.Manager, provider multicluster.Provider) (Manager, error) { - return &mcManager{ - Manager: mgr, - provider: provider, - }, nil +func WithMultiCluster(mgr manager.Manager, provider multicluster.Provider, mcOpts ...Option) (Manager, error) { + cfg := SynchronizationConfig{ + FenceNS: "kube-system", FencePrefix: "mcr-shard", PerClusterLease: true, + LeaseDuration: 20 * time.Second, LeaseRenew: 10 * time.Second, FenceThrottle: 750 * time.Millisecond, + PeerPrefix: "mcr-peer", PeerWeight: 1, Probe: 5 * time.Second, Rehash: 15 * time.Second, + } + + pr := peers.NewLeaseRegistry(mgr.GetClient(), cfg.FenceNS, cfg.PeerPrefix, "", cfg.PeerWeight, mgr.GetLogger()) + self := pr.Self() + + eng := newSynchronizationEngine( + mgr.GetClient(), mgr.GetLogger(), + sharder.NewHRW(), pr, self, cfg, + ) + + m := &mcManager{Manager: mgr, provider: provider, engine: eng} + + // Apply options before wiring the Runnable so overrides take effect early. + for _, o := range mcOpts { + o(m) + } + + // Start synchronization loop as a manager Runnable. + if err := mgr.Add(eng.Runnable()); err != nil { + return nil, err + } + return m, nil } // GetCluster returns a cluster for the given identifying cluster name. Get @@ -185,28 +209,15 @@ func (m *mcManager) GetProvider() multicluster.Provider { // Add will set requested dependencies on the component, and cause the component to be // started when Start is called. -func (m *mcManager) Add(r Runnable) (err error) { - m.mcRunnables = append(m.mcRunnables, r) - defer func() { - if err != nil { - m.mcRunnables = m.mcRunnables[:len(m.mcRunnables)-1] - } - }() - +func (m *mcManager) Add(r Runnable) error { + m.engine.AddRunnable(r) return m.Manager.Add(r) } // Engage gets called when the component should start operations for the given // Cluster. ctx is cancelled when the cluster is disengaged. func (m *mcManager) Engage(ctx context.Context, name string, cl cluster.Cluster) error { - ctx, cancel := context.WithCancel(ctx) //nolint:govet // cancel is called in the error case only. - for _, r := range m.mcRunnables { - if err := r.Engage(ctx, name, cl); err != nil { - cancel() - return fmt.Errorf("failed to engage cluster %q: %w", name, err) - } - } - return nil //nolint:govet // cancel is called in the error case only. + return m.engine.Engage(ctx, name, cl) } func (m *mcManager) GetManager(ctx context.Context, clusterName string) (manager.Manager, error) { diff --git a/pkg/manager/manager_envtest_suite_test.go b/pkg/manager/manager_envtest_suite_test.go new file mode 100644 index 0000000..e97c1a4 --- /dev/null +++ b/pkg/manager/manager_envtest_suite_test.go @@ -0,0 +1,41 @@ +package manager_test + +import ( + "testing" + + "k8s.io/client-go/rest" + + "sigs.k8s.io/controller-runtime/pkg/envtest" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" +) + +func TestManager(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Manager Envtest Suite") +} + +var testEnv *envtest.Environment +var cfg *rest.Config + +var _ = BeforeSuite(func() { + logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + testEnv = &envtest.Environment{} + var err error + cfg, err = testEnv.Start() + Expect(err).NotTo(HaveOccurred()) + // Prevent the metrics listener being created + metricsserver.DefaultBindAddress = "0" +}) + +var _ = AfterSuite(func() { + if testEnv != nil { + Expect(testEnv.Stop()).To(Succeed()) + } + // Put the DefaultBindAddress back + metricsserver.DefaultBindAddress = ":8080" +}) diff --git a/pkg/manager/peers/doc.go b/pkg/manager/peers/doc.go new file mode 100644 index 0000000..b52f83b --- /dev/null +++ b/pkg/manager/peers/doc.go @@ -0,0 +1,30 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +Package peers implements a lightweight membership registry for sharding. + +Each peer advertises presence via a coordination.k8s.io/Lease named +- (e.g. mcr-peer-sharded-namespace-0). The registry: + + - upserts our own Lease on a cadence (heartbeat), + - lists other peer Leases with matching labels/prefix, + - exposes a stable snapshot for HRW/placement. + +This registry only tracks membership; single-writer ownership is enforced +separately via per-cluster fencing Leases. +*/ +package peers diff --git a/pkg/manager/peers/registry.go b/pkg/manager/peers/registry.go new file mode 100644 index 0000000..f5e929d --- /dev/null +++ b/pkg/manager/peers/registry.go @@ -0,0 +1,244 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package peers + +import ( + "context" + "fmt" + "os" + "strconv" + "sync" + "time" + + "github.com/go-logr/logr" + + coordv1 "k8s.io/api/coordination/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + + crclient "sigs.k8s.io/controller-runtime/pkg/client" + + "sigs.k8s.io/multicluster-runtime/pkg/manager/sharder" + "sigs.k8s.io/multicluster-runtime/pkg/util" +) + +const ( + labelPartOf = "app.kubernetes.io/part-of" + labelPeer = "mcr.sigs.k8s.io/peer" + labelPrefix = "mcr.sigs.k8s.io/prefix" + annotWeight = "mcr.sigs.k8s.io/weight" + partOfValue = "multicluster-runtime" + defaultWeight = uint32(1) +) + +// Registry provides peer membership for sharding decisions. +type Registry interface { + // Self returns this process's identity and weight. 
+ Self() sharder.PeerInfo + // Snapshot returns the current set of live peers. Treat as read-only. + Snapshot() []sharder.PeerInfo + // Run blocks, periodically renewing our Lease and refreshing peer membership + // until ctx is cancelled or an error occurs. + Run(ctx context.Context) error +} + +type leaseRegistry struct { + ns, namePrefix string + self sharder.PeerInfo + cli crclient.Client + + mu sync.RWMutex + peers map[string]sharder.PeerInfo + + ttl time.Duration + renew time.Duration + + log logr.Logger +} + +// NewLeaseRegistry constructs a Lease-based Registry. +// +// Params: +// - ns: namespace where peer Leases live (e.g. "kube-system") +// - namePrefix: Lease name prefix (e.g. "mcr-peer") +// - selfID: this process ID (defaults to os.Hostname() if empty) +// - weight: relative capacity (0 treated as 1) +// - log: logger; use logr.Discard() to silence +func NewLeaseRegistry(cli crclient.Client, ns, namePrefix string, selfID string, weight uint32, log logr.Logger) Registry { + if selfID == "" { + if hn, _ := os.Hostname(); hn != "" { + selfID = hn + } else { + selfID = "unknown" + } + } + // Sanitize to DNS-1123 subdomain for Lease names: lowercase, [a-z0-9-.], start/end alphanumeric. + selfID = util.SanitizeDNS1123(selfID) + if weight == 0 { + weight = defaultWeight + } + return &leaseRegistry{ + ns: ns, + namePrefix: namePrefix, + self: sharder.PeerInfo{ID: selfID, Weight: weight}, + cli: cli, + peers: map[string]sharder.PeerInfo{}, + ttl: 30 * time.Second, + renew: 10 * time.Second, + log: log.WithName("lease-registry").WithValues("ns", ns, "prefix", namePrefix, "selfID", selfID, "weight", weight), + } +} + +func (r *leaseRegistry) Self() sharder.PeerInfo { return r.self } + +func (r *leaseRegistry) Snapshot() []sharder.PeerInfo { + r.mu.RLock() + defer r.mu.RUnlock() + out := make([]sharder.PeerInfo, 0, len(r.peers)) + for _, p := range r.peers { + out = append(out, p) + } + return out +} + +func (r *leaseRegistry) Run(ctx context.Context) error { + r.log.Info("peer registry starting", "ns", r.ns, "prefix", r.namePrefix, "self", r.self.ID) + + // Tick frequently enough to renew well within ttl. + t := time.NewTicker(r.renew) + defer t.Stop() + + for { + // Do one pass immediately so we publish our presence promptly. + if err := r.renewSelfLease(ctx); err != nil && ctx.Err() == nil { + r.log.V(1).Info("renewSelfLease failed; will retry", "err", err) + } + if err := r.refreshPeers(ctx); err != nil && ctx.Err() == nil { + r.log.V(1).Info("refreshPeers failed; will retry", "err", err) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-t.C: + // loop + } + } +} + +// renewSelfLease upserts our own Lease with fresh RenewTime and duration. 
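+// With the default prefix "mcr-peer", ttl 30s, and weight 1, the Lease it creates
+// looks roughly like this (illustrative sketch, not exact output):
+//
+//	metadata:
+//	  name: mcr-peer-<selfID>
+//	  labels:
+//	    app.kubernetes.io/part-of: multicluster-runtime
+//	    mcr.sigs.k8s.io/peer: "true"
+//	    mcr.sigs.k8s.io/prefix: mcr-peer
+//	  annotations:
+//	    mcr.sigs.k8s.io/weight: "1"
+//	spec:
+//	  holderIdentity: <selfID>
+//	  leaseDurationSeconds: 30
+//	  renewTime: <now>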
+func (r *leaseRegistry) renewSelfLease(ctx context.Context) error { + now := metav1.MicroTime{Time: time.Now()} + ttlSec := int32(r.ttl / time.Second) + name := fmt.Sprintf("%s-%s", r.namePrefix, r.self.ID) + + lease := &coordv1.Lease{} + err := r.cli.Get(ctx, crclient.ObjectKey{Namespace: r.ns, Name: name}, lease) + switch { + case apierrors.IsNotFound(err): + lease = &coordv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: r.ns, + Name: name, + Labels: map[string]string{ + labelPartOf: partOfValue, + labelPeer: "true", + labelPrefix: r.namePrefix, + }, + Annotations: map[string]string{ + annotWeight: strconv.FormatUint(uint64(r.self.Weight), 10), + }, + }, + Spec: coordv1.LeaseSpec{ + HolderIdentity: ptr.To(r.self.ID), + RenewTime: &now, + LeaseDurationSeconds: ptr.To(ttlSec), + }, + } + return r.cli.Create(ctx, lease) + + case err != nil: + return err + + default: + // Update the existing Lease + lease.Spec.HolderIdentity = ptr.To(r.self.ID) + lease.Spec.RenewTime = &now + lease.Spec.LeaseDurationSeconds = ptr.To(ttlSec) + if lease.Annotations == nil { + lease.Annotations = map[string]string{} + } + lease.Annotations[annotWeight] = strconv.FormatUint(uint64(r.self.Weight), 10) + return r.cli.Update(ctx, lease) + } +} + +// refreshPeers lists peer Leases and updates the in-memory snapshot. +func (r *leaseRegistry) refreshPeers(ctx context.Context) error { + list := &coordv1.LeaseList{} + // Only list our labeled peer leases with our prefix for efficiency. + if err := r.cli.List(ctx, list, + crclient.InNamespace(r.ns), + crclient.MatchingLabels{ + labelPeer: "true", + labelPrefix: r.namePrefix, + }, + ); err != nil { + return err + } + + now := time.Now() + next := make(map[string]sharder.PeerInfo, len(list.Items)) + + for i := range list.Items { + l := &list.Items[i] + // Basic sanity for holder identity + if l.Spec.HolderIdentity == nil || *l.Spec.HolderIdentity == "" { + continue + } + id := *l.Spec.HolderIdentity + + // Respect expiry: RenewTime + LeaseDurationSeconds + if l.Spec.RenewTime == nil || l.Spec.LeaseDurationSeconds == nil { + // If missing, treat as expired/stale. + continue + } + exp := l.Spec.RenewTime.Time.Add(time.Duration(*l.Spec.LeaseDurationSeconds) * time.Second) + if now.After(exp) { + continue // stale peer + } + + // Weight from annotation (optional) + weight := defaultWeight + if wStr := l.Annotations[annotWeight]; wStr != "" { + if w64, err := strconv.ParseUint(wStr, 10, 32); err == nil && w64 > 0 { + weight = uint32(w64) + } + } + + next[id] = sharder.PeerInfo{ID: id, Weight: weight} + } + + // Store snapshot (including ourselves; if not listed yet, ensure we're present). + next[r.self.ID] = sharder.PeerInfo{ID: r.self.ID, Weight: r.self.Weight} + + r.mu.Lock() + r.peers = next + r.mu.Unlock() + return nil +} diff --git a/pkg/manager/peers/registry_test.go b/pkg/manager/peers/registry_test.go new file mode 100644 index 0000000..f45c3fe --- /dev/null +++ b/pkg/manager/peers/registry_test.go @@ -0,0 +1,225 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package peers + +import ( + "context" + "testing" + "time" + + "github.com/go-logr/logr" + + coordv1 "k8s.io/api/coordination/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + crclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + _ "github.com/onsi/ginkgo/v2" +) + +const ( + testNS = "kube-system" + prefix = "mcr-peer" + selfID = "peer-0" + otherID = "peer-1" +) + +func newScheme(t *testing.T) *runtime.Scheme { + t.Helper() + s := runtime.NewScheme() + if err := coordv1.AddToScheme(s); err != nil { + t.Fatalf("add scheme: %v", err) + } + return s +} + +func getLease(t *testing.T, c crclient.Client, name string) *coordv1.Lease { + t.Helper() + var ls coordv1.Lease + if err := c.Get(context.Background(), crclient.ObjectKey{Namespace: testNS, Name: name}, &ls); err != nil { + t.Fatalf("get lease %s: %v", name, err) + } + return &ls +} + +func TestRenewSelfLease_CreateThenUpdate(t *testing.T) { + s := newScheme(t) + c := fake.NewClientBuilder().WithScheme(s).Build() + + r := NewLeaseRegistry(c, testNS, prefix, selfID, 2, logr.Discard()).(*leaseRegistry) + + // First call creates the Lease + if err := r.renewSelfLease(context.Background()); err != nil { + t.Fatalf("renewSelfLease create: %v", err) + } + name := prefix + "-" + selfID + ls := getLease(t, c, name) + if ls.Spec.HolderIdentity == nil || *ls.Spec.HolderIdentity != selfID { + t.Fatalf("holder = %v, want %q", ls.Spec.HolderIdentity, selfID) + } + if got := ls.Labels[labelPeer]; got != "true" { + t.Fatalf("label %s = %q, want true", labelPeer, got) + } + if got := ls.Labels[labelPrefix]; got != prefix { + t.Fatalf("label %s = %q, want %s", labelPrefix, got, prefix) + } + if got := ls.Annotations[annotWeight]; got != "2" { + t.Fatalf("weight annot = %q, want 2", got) + } + + // Update path: change weight and ensure it is written + r.self.Weight = 3 + time.Sleep(1 * time.Millisecond) // ensure RenewTime advances + if err := r.renewSelfLease(context.Background()); err != nil { + t.Fatalf("renewSelfLease update: %v", err) + } + ls2 := getLease(t, c, name) + if got := ls2.Annotations[annotWeight]; got != "3" { + t.Fatalf("updated weight annot = %q, want 3", got) + } + if ls.Spec.RenewTime == nil || ls2.Spec.RenewTime == nil || + !ls2.Spec.RenewTime.Time.After(ls.Spec.RenewTime.Time) { + t.Fatalf("RenewTime did not advance: old=%v new=%v", ls.Spec.RenewTime, ls2.Spec.RenewTime) + } +} + +func TestRefreshPeers_FiltersAndParses(t *testing.T) { + s := newScheme(t) + + now := time.Now() + validRenew := metav1.NewMicroTime(now) + expiredRenew := metav1.NewMicroTime(now.Add(-time.Hour)) + dur := int32(60) + + // valid peer (otherID), correct labels/prefix, not expired, weight 5 + valid := &coordv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNS, Name: prefix + "-" + otherID, + Labels: map[string]string{ + labelPartOf: partOfValue, + labelPeer: "true", + labelPrefix: prefix, + }, + Annotations: map[string]string{ + annotWeight: "5", + }, + }, + Spec: coordv1.LeaseSpec{ + HolderIdentity: &[]string{otherID}[0], + RenewTime: &validRenew, + LeaseDurationSeconds: &dur, + }, + } + + // expired peer (should be ignored) + expired := &coordv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNS, Name: prefix + "-expired", + Labels: map[string]string{ + labelPeer: "true", + labelPrefix: prefix, + }, + }, + Spec: coordv1.LeaseSpec{ + 
HolderIdentity: &[]string{"nobody"}[0], + RenewTime: &expiredRenew, + LeaseDurationSeconds: &dur, + }, + } + + // wrong prefix (should be ignored) + wrong := &coordv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNS, Name: "otherprefix-" + "x", + Labels: map[string]string{ + labelPeer: "true", + labelPrefix: "otherprefix", + }, + }, + Spec: coordv1.LeaseSpec{ + HolderIdentity: &[]string{"x"}[0], + RenewTime: &validRenew, + LeaseDurationSeconds: &dur, + }, + } + + c := fake.NewClientBuilder().WithScheme(s).WithObjects(valid, expired, wrong).Build() + + r := NewLeaseRegistry(c, testNS, prefix, selfID, 1, logr.Discard()).(*leaseRegistry) + if err := r.refreshPeers(context.Background()); err != nil { + t.Fatalf("refreshPeers: %v", err) + } + + snap := r.Snapshot() + + // Expect self + valid other + want := map[string]uint32{ + selfID: 1, + otherID: 5, + } + got := map[string]uint32{} + for _, p := range snap { + got[p.ID] = p.Weight + } + for id, w := range want { + if got[id] != w { + t.Fatalf("snapshot missing/mismatch for %s: got %d want %d; full=%v", id, got[id], w, got) + } + } + // Should not include expired/wrong + if _, ok := got["expired"]; ok { + t.Fatalf("snapshot unexpectedly contains expired") + } + if _, ok := got["x"]; ok { + t.Fatalf("snapshot unexpectedly contains wrong prefix") + } +} + +func TestRun_PublishesSelfAndStopsOnCancel(t *testing.T) { + s := newScheme(t) + c := fake.NewClientBuilder().WithScheme(s).Build() + r := NewLeaseRegistry(c, testNS, prefix, selfID, 1, logr.Discard()).(*leaseRegistry) + + // Speed up the loop for the test + r.ttl = 2 * time.Second + r.renew = 50 * time.Millisecond + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + + go func() { + _ = r.Run(ctx) + close(done) + }() + + // wait for at least one tick + time.Sleep(120 * time.Millisecond) + + // self lease should exist + name := prefix + "-" + selfID + _ = getLease(t, c, name) // will fatal if missing + + // cancel and ensure Run returns + cancel() + select { + case <-done: + case <-time.After(1 * time.Second): + t.Fatalf("Run did not exit after cancel") + } +} diff --git a/pkg/manager/sharder/hrw.go b/pkg/manager/sharder/hrw.go new file mode 100644 index 0000000..db583d6 --- /dev/null +++ b/pkg/manager/sharder/hrw.go @@ -0,0 +1,86 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sharder + +import ( + "hash/fnv" +) + +// HRW implements Highest-Random-Weight (aka Rendezvous) hashing. +// +// Given a stable cluster identifier and a snapshot of peers, HRW selects +// the peer with the highest score for that cluster. All peers compute the +// same scores independently, so the winner is deterministic as long as +// the inputs (clusterID, peer IDs, weights) are identical. +// +// Weighting: this implementation biases selection by multiplying the +// score with the peer's Weight (0 is treated as 1). 
This is a simple and +// fast heuristic; if you need proportional weighting with stronger +// distribution guarantees, consider the canonical weighted rendezvous +// formula (e.g., score = -ln(u)/w). +type HRW struct{} + +// NewHRW returns a new HRW sharder. +func NewHRW() *HRW { return &HRW{} } + +// ShouldOwn returns true if self is the HRW winner for clusterID among peers. +// +// Inputs: +// - clusterID: a stable string that identifies the shard/cluster (e.g., namespace name). +// - peers: the full membership snapshot used for the decision; all participants +// should use the same list (order does not matter). +// - self: the caller's peer info. +// +// Behavior & caveats: +// - If peers is empty, this returns true (caller may choose to gate with fencing). +// - Weight 0 is treated as 1 (no special meaning). +// - Ties are broken by "last wins" in the iteration order (practically +// unreachable with 64-bit hashing, but documented for completeness). +// - The hash (FNV-1a, 64-bit) is fast and stable but not cryptographically secure. +// - This method does not enforce ownership; callers should still use a +// per-shard fence (e.g., a Lease) to serialize actual work. +// +// Determinism requirements: +// - All peers must see the same peer IDs and weights when computing. +// - clusterID must be stable across processes (same input → same winner). +func (h *HRW) ShouldOwn(clusterID string, peers []PeerInfo, self PeerInfo) bool { + if len(peers) == 0 { + return true + } + var best PeerInfo + var bestScore uint64 + for _, p := range peers { + score := hash64(clusterID + "|" + p.ID) + if p.Weight == 0 { + p.Weight = 1 + } + score *= uint64(p.Weight) + if score >= bestScore { + bestScore = score + best = p + } + } + return best.ID == self.ID +} + +// hash64 returns a stable 64-bit FNV-1a hash of s. +// Fast, non-cryptographic; suitable for rendezvous hashing. +func hash64(s string) uint64 { + h := fnv.New64a() + _, _ = h.Write([]byte(s)) + return h.Sum64() +} diff --git a/pkg/manager/sharder/sharder.go b/pkg/manager/sharder/sharder.go new file mode 100644 index 0000000..5c00b39 --- /dev/null +++ b/pkg/manager/sharder/sharder.go @@ -0,0 +1,45 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sharder + +// PeerInfo describes a participating peer in sharding decisions. +// +// ID must be a stable, unique identifier for the peer (e.g., pod hostname). +// +// Weight is a relative capacity hint. The current HRW implementation in this +// package treats 0 as 1 and multiplies the peer’s score by Weight. With many +// shards, a peer’s expected share of ownership is roughly proportional to +// Weight relative to the sum of all peers’ weights. If you need stricter, +// provably proportional weighting, use a canonical weighted rendezvous score +// (e.g., s = -ln(u)/w) instead of simple multiplication. +type PeerInfo struct { + ID string + Weight uint32 // optional (default 1) +} + +// Sharder chooses whether the local peer should "own" (run) a given cluster. 
+// +// Implementations must be deterministic across peers given the same inputs. +// ShouldOwn does not enforce ownership; callers should still gate actual work +// behind a per-cluster fence (e.g., a Lease) to guarantee single-writer. +type Sharder interface { + // ShouldOwn returns true if self is the winner for clusterID among peers. + // - clusterID: stable identifier for the shard + // - peers: full membership snapshot (order-independent) + // - self: the caller’s PeerInfo + ShouldOwn(clusterID string, peers []PeerInfo, self PeerInfo) bool +} diff --git a/pkg/manager/sharding.go b/pkg/manager/sharding.go new file mode 100644 index 0000000..cf0c732 --- /dev/null +++ b/pkg/manager/sharding.go @@ -0,0 +1,99 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package manager + +import ( + "time" + + "sigs.k8s.io/multicluster-runtime/pkg/manager/peers" + "sigs.k8s.io/multicluster-runtime/pkg/manager/sharder" +) + +// Option mutates mcManager configuration. +type Option func(*mcManager) + +// WithSharder replaces the default HRW sharder. +func WithSharder(s sharder.Sharder) Option { + return func(m *mcManager) { + if m.engine != nil { + m.engine.sharder = s + } + } +} + +// WithPeerWeight allows heterogeneous peers (capacity hint). +// Effective share tends toward w_i/Σw under many shards. +func WithPeerWeight(w uint32) Option { + return func(m *mcManager) { + if m.engine != nil { + m.engine.cfg.PeerWeight = w + } + } +} + +// WithShardLease configures the fencing Lease ns/prefix (mcr-shard-* by default). +func WithShardLease(ns, name string) Option { + return func(m *mcManager) { + if m.engine != nil { + m.engine.cfg.FenceNS = ns + m.engine.cfg.FencePrefix = name + } + } +} + +// WithPerClusterLease enables/disables per-cluster fencing (true -> mcr-shard-). +func WithPerClusterLease(on bool) Option { + return func(m *mcManager) { + if m.engine != nil { + m.engine.cfg.PerClusterLease = on + } + } +} + +// WithSynchronizationIntervals tunes the synchronization probe/rehash cadences. +func WithSynchronizationIntervals(probe, rehash time.Duration) Option { + return func(m *mcManager) { + if m.engine != nil { + m.engine.cfg.Probe = probe + m.engine.cfg.Rehash = rehash + } + } +} + +// WithLeaseTimings configures fencing lease timings. +// Choose renew < duration (e.g., renew ≈ duration/3). +func WithLeaseTimings(duration, renew, throttle time.Duration) Option { + return func(m *mcManager) { + if m.engine != nil { + m.engine.cfg.LeaseDuration = duration + m.engine.cfg.LeaseRenew = renew + m.engine.cfg.FenceThrottle = throttle + } + } +} + +// WithPeerRegistry injects a custom peer Registry. When set, it overrides the +// default Lease-based registry. Peer weight should be provided by the custom +// registry; WithPeerWeight does not apply. 
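+//
+// Sketch of wiring a custom registry from a caller (myRegistry is a stand-in for
+// your own peers.Registry implementation):
+//
+//	mgr, err := mcmanager.New(cfg, provider, manager.Options{},
+//		mcmanager.WithPeerRegistry(myRegistry),
+//	)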
+func WithPeerRegistry(reg peers.Registry) Option { + return func(m *mcManager) { + if m.engine != nil && reg != nil { + m.engine.peers = reg + m.engine.self = reg.Self() + } + } +} diff --git a/pkg/manager/sharding_test.go b/pkg/manager/sharding_test.go new file mode 100644 index 0000000..bdeed9d --- /dev/null +++ b/pkg/manager/sharding_test.go @@ -0,0 +1,60 @@ +package manager + +import ( + "context" + "testing" + "time" + + "github.com/go-logr/logr" + + coordinationv1 "k8s.io/api/coordination/v1" + "k8s.io/apimachinery/pkg/runtime" + + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "sigs.k8s.io/multicluster-runtime/pkg/manager/sharder" +) + +type fakeRegistry struct{ p sharder.PeerInfo } + +func (f *fakeRegistry) Self() sharder.PeerInfo { return f.p } +func (f *fakeRegistry) Snapshot() []sharder.PeerInfo { return []sharder.PeerInfo{f.p} } +func (f *fakeRegistry) Run(ctx context.Context) error { <-ctx.Done(); return ctx.Err() } + +func TestOptions_ApplyPeerRegistry(t *testing.T) { + s := runtime.NewScheme() + _ = coordinationv1.AddToScheme(s) + cli := fake.NewClientBuilder().WithScheme(s).Build() + + cfg := SynchronizationConfig{} + reg := &fakeRegistry{p: sharder.PeerInfo{ID: "x", Weight: 7}} + eng := newSynchronizationEngine(cli, logr.Discard(), sharder.NewHRW(), reg, reg.Self(), cfg) + m := &mcManager{Manager: nil, provider: nil, engine: eng} + + WithPeerRegistry(reg)(m) + if m.engine.peers != reg { + t.Fatalf("expected custom registry applied") + } + if m.engine.self != reg.Self() { + t.Fatalf("expected self to be updated from registry") + } +} + +func TestOptions_ApplyLeaseAndTimings(t *testing.T) { + m := &mcManager{engine: &synchronizationEngine{cfg: SynchronizationConfig{}}} + WithShardLease("ns", "name")(m) + WithPerClusterLease(true)(m) + WithLeaseTimings(30*time.Second, 10*time.Second, 750*time.Millisecond)(m) + WithSynchronizationIntervals(5*time.Second, 15*time.Second)(m) + + cfg := m.engine.cfg + if cfg.FenceNS != "ns" || cfg.FencePrefix != "name" || !cfg.PerClusterLease { + t.Fatalf("lease cfg not applied: %+v", cfg) + } + if cfg.LeaseDuration != 30*time.Second || cfg.LeaseRenew != 10*time.Second || cfg.FenceThrottle != 750*time.Millisecond { + t.Fatalf("timings not applied: %+v", cfg) + } + if cfg.Probe != 5*time.Second || cfg.Rehash != 15*time.Second { + t.Fatalf("cadence not applied: %+v", cfg) + } +} diff --git a/pkg/manager/synchronization.go b/pkg/manager/synchronization.go new file mode 100644 index 0000000..54e6f86 --- /dev/null +++ b/pkg/manager/synchronization.go @@ -0,0 +1,348 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package manager + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/go-logr/logr" + + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" + "sigs.k8s.io/controller-runtime/pkg/manager" + + "sigs.k8s.io/multicluster-runtime/pkg/manager/peers" + "sigs.k8s.io/multicluster-runtime/pkg/manager/sharder" + "sigs.k8s.io/multicluster-runtime/pkg/multicluster" + "sigs.k8s.io/multicluster-runtime/pkg/util" +) + +// SynchronizationConfig holds the knobs for shard synchronization and fencing. +// +// Fencing: +// - FenceNS/FencePrefix: namespace and name prefix for per-shard Lease objects +// (e.g. mcr-shard- when PerClusterLease is true). +// - PerClusterLease: when true, use one Lease per cluster (recommended). When false, +// use a single shared fence name for all clusters. +// - LeaseDuration / LeaseRenew: total TTL and renew cadence. Choose renew < duration +// (commonly renew ≈ duration/3). +// - FenceThrottle: backoff before retrying failed fence acquisition, to reduce API churn. +// +// Peers (membership): +// - PeerPrefix: prefix used by the peer registry for membership Leases. +// - PeerWeight: relative capacity hint used by the sharder (0 treated as 1). +// +// Cadence: +// - Probe: periodic synchronization tick interval (decision loop). +// - Rehash: optional slower cadence for planned redistribution (unused in basic HRW). +type SynchronizationConfig struct { + // FenceNS is the namespace where fence Leases live (usually "kube-system"). + FenceNS string + // FencePrefix is the base Lease name for fences; with PerClusterLease it becomes + // FencePrefix+"-"+ (e.g., "mcr-shard-zoo"). + FencePrefix string + // PerClusterLease controls whether we create one fence per cluster (true) or a + // single shared fence name (false). Per-cluster is recommended. + PerClusterLease bool + // LeaseDuration is the total TTL written to the Lease (spec.LeaseDurationSeconds). + LeaseDuration time.Duration + // LeaseRenew is the renewal cadence; must be < LeaseDuration (rule of thumb ≈ 1/3). + LeaseRenew time.Duration + // FenceThrottle backs off repeated failed TryAcquire attempts to reduce API churn. + FenceThrottle time.Duration + + // PeerPrefix is the Lease name prefix used by the peer registry for membership. + PeerPrefix string + // PeerWeight is this peer’s capacity hint for HRW (0 treated as 1). + PeerWeight uint32 + + // Probe is the synchronization decision loop interval. + Probe time.Duration + // Rehash is an optional slower cadence for planned redistribution (currently unused). + Rehash time.Duration +} + +// synchronizationEngine makes synchronization decisions and starts/stops per-cluster work. +// +// It combines: +// - a peerRegistry (live peer snapshot), +// - a sharder (e.g., HRW) to decide "who should own", +// - a per-cluster leaseGuard to fence "who actually runs". +// +// The engine keeps per-cluster engagement state, ties watches/workers to an +// engagement context, and uses the fence to guarantee single-writer semantics. +type synchronizationEngine struct { + // kube is the host cluster client used for Leases and provider operations. + kube client.Client + // log is the engine’s logger. + log logr.Logger + // sharder decides “shouldOwn” given clusterID, peers, and self (e.g., HRW). + sharder sharder.Sharder + // peers provides a live membership snapshot for sharding decisions. + peers peers.Registry + // self is this process’s identity/weight as known by the peer registry. 
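+	// It is captured at construction and replaced when WithPeerRegistry injects
+	// a custom registry.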
+ self sharder.PeerInfo + // cfg holds all synchronization/fencing configuration. + cfg SynchronizationConfig + + // mu guards engaged and runnables. + mu sync.Mutex + // engaged tracks per-cluster engagement state keyed by cluster name. + engaged map[string]*engagement + // runnables are the multicluster components to (de)start per cluster. + runnables []multicluster.Aware +} + +// engagement tracks per-cluster lifecycle within the engine. +// +// ctx/cancel: engagement context; cancellation stops sources/workers. +// started: whether runnables have been engaged for this cluster. +// fence: the per-cluster Lease guard (nil until first start attempt). +// nextTry: throttle timestamp for fence acquisition retries. +type engagement struct { + // name is the cluster identifier (e.g., namespace). + name string + // cl is the controller-runtime cluster handle for this engagement. + cl cluster.Cluster + // ctx is the engagement context; cancelling it stops sources/work. + ctx context.Context + // cancel cancels ctx. + cancel context.CancelFunc + // started is true after runnables have been engaged for this cluster. + started bool + + // fence is the per-cluster Lease guard (nil until first start attempt). + fence *leaseGuard + // nextTry defers the next TryAcquire attempt until this time (throttling). + nextTry time.Time +} + +// newSynchronizationEngine wires an engine with its dependencies and initial config. +func newSynchronizationEngine(kube client.Client, log logr.Logger, shard sharder.Sharder, peers peers.Registry, self sharder.PeerInfo, cfg SynchronizationConfig) *synchronizationEngine { + return &synchronizationEngine{ + kube: kube, log: log, + sharder: shard, peers: peers, self: self, cfg: cfg, + engaged: make(map[string]*engagement), + } +} + +func (e *synchronizationEngine) fenceName(cluster string) string { + // Per-cluster fence: mcr-shard-; otherwise a single global fence + if e.cfg.PerClusterLease { + return fmt.Sprintf("%s-%s", e.cfg.FencePrefix, util.SanitizeDNS1123(cluster)) + } + return e.cfg.FencePrefix +} + +// Runnable returns a Runnable that manages synchronization of clusters. +func (e *synchronizationEngine) Runnable() manager.Runnable { + return manager.RunnableFunc(func(ctx context.Context) error { + e.log.Info("synchronization runnable starting", "peer", e.self.ID) + errCh := make(chan error, 1) + go func() { errCh <- e.peers.Run(ctx) }() + e.recompute(ctx) + t := time.NewTicker(e.cfg.Probe) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-errCh: + return err + case <-t.C: + e.log.V(1).Info("synchronization tick", "peers", len(e.peers.Snapshot())) + e.recompute(ctx) + } + } + }) +} + +// Engage registers a cluster for synchronization management. +func (e *synchronizationEngine) Engage(parent context.Context, name string, cl cluster.Cluster) error { + // If provider already canceled, don't engage a dead cluster. + if err := parent.Err(); err != nil { + return err + } + + var doRecompute bool + + e.mu.Lock() + defer e.mu.Unlock() + + if cur, ok := e.engaged[name]; ok { + // Re-engage same name: replace token; stop old engagement; preserve fence. 
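+		// Preserving the fence avoids releasing and re-acquiring the per-cluster
+		// Lease when the provider re-engages an already known cluster.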
+ var fence *leaseGuard + if cur.fence != nil { + fence = cur.fence // keep the existing fence/renewer + } + if cur.cancel != nil { + cur.cancel() // stop old sources/workers + } + + newEng := &engagement{name: name, cl: cl, fence: fence} + e.engaged[name] = newEng + + // cleanup tied to the *new* token; old goroutine will no-op (ee != token) + go func(pctx context.Context, key string, token *engagement) { + <-pctx.Done() + e.mu.Lock() + if ee, ok := e.engaged[key]; ok && ee == token { + if ee.started && ee.cancel != nil { + ee.cancel() + } + if ee.fence != nil { + go ee.fence.Release(context.Background()) + } + delete(e.engaged, key) + } + e.mu.Unlock() + }(parent, name, newEng) + + doRecompute = true + } else { + eng := &engagement{name: name, cl: cl} + e.engaged[name] = eng + go func(pctx context.Context, key string, token *engagement) { + <-pctx.Done() + e.mu.Lock() + if ee, ok := e.engaged[key]; ok && ee == token { + if ee.started && ee.cancel != nil { + ee.cancel() + } + if ee.fence != nil { + go ee.fence.Release(context.Background()) + } + delete(e.engaged, key) + } + e.mu.Unlock() + }(parent, name, eng) + doRecompute = true + } + + // Kick a decision outside the lock for faster attach. + if doRecompute { + go e.recompute(parent) + } + + return nil +} + +// recompute checks the current synchronization state and starts/stops clusters as needed. +func (e *synchronizationEngine) recompute(parent context.Context) { + peers := e.peers.Snapshot() + self := e.self + + type toStart struct { + name string + cl cluster.Cluster + ctx context.Context + } + var starts []toStart + var stops []context.CancelFunc + + now := time.Now() + + e.mu.Lock() + defer e.mu.Unlock() + for name, engm := range e.engaged { + should := e.sharder.ShouldOwn(name, peers, self) + + switch { + case should && !engm.started: + // ensure fence exists + if engm.fence == nil { + onLost := func(cluster string) func() { + return func() { + // best-effort stop if we lose the lease mid-flight + e.log.Info("lease lost; stopping", "cluster", cluster, "peer", self.ID) + e.mu.Lock() + if ee := e.engaged[cluster]; ee != nil && ee.started && ee.cancel != nil { + ee.cancel() + ee.started = false + } + e.mu.Unlock() + } + }(name) + engm.fence = newLeaseGuard( + e.kube, + e.cfg.FenceNS, e.fenceName(name), e.self.ID, + e.cfg.LeaseDuration, e.cfg.LeaseRenew, onLost, + ) + } + + // throttle attempts + if now.Before(engm.nextTry) { + continue + } + + // try to take the fence; if we fail, set nextTry and retry on next tick + if !engm.fence.TryAcquire(parent) { + engm.nextTry = now.Add(e.cfg.FenceThrottle) + continue + } + + // acquired fence; start the cluster + ctx, cancel := context.WithCancel(parent) + engm.ctx, engm.cancel, engm.started = ctx, cancel, true + starts = append(starts, toStart{name: engm.name, cl: engm.cl, ctx: ctx}) + e.log.Info("synchronization start", "cluster", name, "peer", self.ID) + + case !should && engm.started: + // stop + release fence + if engm.cancel != nil { + stops = append(stops, engm.cancel) + } + if engm.fence != nil { + go engm.fence.Release(parent) + } + engm.cancel = nil + engm.started = false + engm.nextTry = time.Time{} + e.log.Info("synchronization stop", "cluster", name, "peer", self.ID) + } + } + for _, c := range stops { + c() + } + for _, s := range starts { + go e.startForCluster(s.ctx, s.name, s.cl) + } +} + +// startForCluster engages all runnables for the given cluster. 
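+// If any runnable fails to engage, the engagement context is cancelled and the
+// cluster is marked not started so a later synchronization tick can retry.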
+func (e *synchronizationEngine) startForCluster(ctx context.Context, name string, cl cluster.Cluster) { + for _, r := range e.runnables { + if err := r.Engage(ctx, name, cl); err != nil { + e.log.Error(err, "failed to engage", "cluster", name) + // best-effort: cancel + mark stopped so next tick can retry + e.mu.Lock() + if engm := e.engaged[name]; engm != nil && engm.cancel != nil { + engm.cancel() + engm.started = false + } + e.mu.Unlock() + return + } + } +} + +func (e *synchronizationEngine) AddRunnable(r multicluster.Aware) { + e.runnables = append(e.runnables, r) +} diff --git a/pkg/manager/synchronization_envtest_test.go b/pkg/manager/synchronization_envtest_test.go new file mode 100644 index 0000000..17a55f5 --- /dev/null +++ b/pkg/manager/synchronization_envtest_test.go @@ -0,0 +1,93 @@ +package manager_test + +import ( + "context" + "fmt" + "time" + + coordinationv1 "k8s.io/api/coordination/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" + "sigs.k8s.io/controller-runtime/pkg/manager" + + mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager" + "sigs.k8s.io/multicluster-runtime/providers/namespace" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Synchronization engine envtest (namespace provider)", func() { + It("distributes leases and reconciles per-namespace", func(ctx SpecContext) { + // Build a direct client (no cache) for setup/reads + sch := runtime.NewScheme() + Expect(corev1.AddToScheme(sch)).To(Succeed()) + Expect(coordinationv1.AddToScheme(sch)).To(Succeed()) + directCli, err := client.New(cfg, client.Options{Scheme: sch}) + Expect(err).NotTo(HaveOccurred()) + + // Ensure kube-system exists for fencing leases + { + var ks corev1.Namespace + err := directCli.Get(ctx, client.ObjectKey{Name: "kube-system"}, &ks) + if apierrors.IsNotFound(err) { + Expect(directCli.Create(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "kube-system"}})).To(Succeed()) + } else { + Expect(err).NotTo(HaveOccurred()) + } + } + + // Create N namespaces to act as clusters + nsNames := []string{"zoo", "jungle", "island"} + for _, n := range nsNames { + Expect(directCli.Create(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: n}})).To(Succeed()) + } + + // Provider: namespaces as clusters (uses its own cache via cluster) + host, err := cluster.New(cfg) + Expect(err).NotTo(HaveOccurred()) + prov := namespace.New(host) + + // Build mc manager with short timings + m, err := mcmanager.New(cfg, prov, manager.Options{}, + mcmanager.WithShardLease("kube-system", "mcr-shard"), + mcmanager.WithPerClusterLease(true), + mcmanager.WithLeaseTimings(6*time.Second, 2*time.Second, 100*time.Millisecond), + mcmanager.WithSynchronizationIntervals(200*time.Millisecond, 1*time.Second), + ) + Expect(err).NotTo(HaveOccurred()) + + // Add a trivial runnable to exercise engagement + r := &noopRunnable{} + Expect(m.Add(r)).To(Succeed()) + + cctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Start manager and provider + go func() { _ = prov.Run(cctx, m) }() + go func() { _ = host.Start(cctx) }() + go func() { _ = m.Start(cctx) }() + + // Eventually expect three leases with holders set (read via direct client) + Eventually(func(g Gomega) { + for _, n := range nsNames { + var ls coordinationv1.Lease + key := client.ObjectKey{Namespace: 
"kube-system", Name: fmt.Sprintf("mcr-shard-%s", n)} + g.Expect(directCli.Get(ctx, key, &ls)).To(Succeed()) + g.Expect(ls.Spec.HolderIdentity).NotTo(BeNil()) + g.Expect(*ls.Spec.HolderIdentity).NotTo(BeEmpty()) + } + }).WithTimeout(20 * time.Second).WithPolling(200 * time.Millisecond).Should(Succeed()) + }) +}) + +type noopRunnable struct{} + +func (n *noopRunnable) Start(ctx context.Context) error { <-ctx.Done(); return ctx.Err() } +func (n *noopRunnable) Engage(ctx context.Context, name string, cl cluster.Cluster) error { return nil } diff --git a/pkg/manager/synchronization_test.go b/pkg/manager/synchronization_test.go new file mode 100644 index 0000000..7d1e44f --- /dev/null +++ b/pkg/manager/synchronization_test.go @@ -0,0 +1,132 @@ +package manager + +import ( + "context" + "testing" + "time" + + "github.com/go-logr/logr" + + coordinationv1 "k8s.io/api/coordination/v1" + "k8s.io/apimachinery/pkg/runtime" + + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/cluster" + + "sigs.k8s.io/multicluster-runtime/pkg/manager/sharder" +) + +type stubSharder struct{ own bool } + +func (s *stubSharder) ShouldOwn(clusterID string, _ []sharder.PeerInfo, _ sharder.PeerInfo) bool { + return s.own +} + +type stubRegistry struct{ self sharder.PeerInfo } + +func (r *stubRegistry) Self() sharder.PeerInfo { return r.self } +func (r *stubRegistry) Snapshot() []sharder.PeerInfo { return []sharder.PeerInfo{r.self} } +func (r *stubRegistry) Run(ctx context.Context) error { <-ctx.Done(); return ctx.Err() } + +type stubRunnable struct{ called chan string } + +func (s *stubRunnable) Engage(ctx context.Context, name string, cl cluster.Cluster) error { + select { + case s.called <- name: + default: + } + return nil +} + +func TestSynchronization_StartsWhenShouldOwnAndFenceAcquired(t *testing.T) { + s := runtime.NewScheme() + if err := coordinationv1.AddToScheme(s); err != nil { + t.Fatalf("scheme: %v", err) + } + cli := fake.NewClientBuilder().WithScheme(s).Build() + + cfg := SynchronizationConfig{ + FenceNS: "kube-system", FencePrefix: "mcr-shard", PerClusterLease: true, + LeaseDuration: 3 * time.Second, LeaseRenew: 50 * time.Millisecond, FenceThrottle: 50 * time.Millisecond, + PeerPrefix: "mcr-peer", PeerWeight: 1, Probe: 10 * time.Millisecond, + } + reg := &stubRegistry{self: sharder.PeerInfo{ID: "peer-0", Weight: 1}} + sh := &stubSharder{own: true} + e := newSynchronizationEngine(cli, logr.Discard(), sh, reg, reg.self, cfg) + + sink := &stubRunnable{called: make(chan string, 1)} + e.AddRunnable(sink) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if err := e.Engage(ctx, "zoo", nil); err != nil { + t.Fatalf("engage: %v", err) + } + // Force a recompute to decide and start + e.recompute(ctx) + + select { + case name := <-sink.called: + if name != "zoo" { + t.Fatalf("expected engage for zoo, got %s", name) + } + case <-time.After(2 * time.Second): + t.Fatalf("expected runnable to be engaged") + } + + // Verify Lease created and held by self + var ls coordinationv1.Lease + key := client.ObjectKey{Namespace: cfg.FenceNS, Name: e.fenceName("zoo")} + if err := cli.Get(ctx, key, &ls); err != nil { + t.Fatalf("get lease: %v", err) + } + if ls.Spec.HolderIdentity == nil || *ls.Spec.HolderIdentity != reg.self.ID { + t.Fatalf("expected holder %q, got %+v", reg.self.ID, ls.Spec.HolderIdentity) + } +} + +func TestSynchronization_StopsAndReleasesWhenShouldOwnFalse(t *testing.T) { + s := runtime.NewScheme() + 
if err := coordinationv1.AddToScheme(s); err != nil { + t.Fatalf("scheme: %v", err) + } + cli := fake.NewClientBuilder().WithScheme(s).Build() + + cfg := SynchronizationConfig{ + FenceNS: "kube-system", FencePrefix: "mcr-shard", PerClusterLease: true, + LeaseDuration: 3 * time.Second, LeaseRenew: 50 * time.Millisecond, FenceThrottle: 50 * time.Millisecond, + PeerPrefix: "mcr-peer", PeerWeight: 1, Probe: 10 * time.Millisecond, + } + reg := &stubRegistry{self: sharder.PeerInfo{ID: "peer-0", Weight: 1}} + sh := &stubSharder{own: true} + e := newSynchronizationEngine(cli, logr.Discard(), sh, reg, reg.self, cfg) + + e.AddRunnable(&stubRunnable{called: make(chan string, 1)}) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if err := e.Engage(ctx, "zoo", nil); err != nil { + t.Fatalf("engage: %v", err) + } + e.recompute(ctx) // start and acquire lease + + // Flip ownership to false and recompute; engine should stop and release fence + sh.own = false + e.recompute(ctx) + + // Poll for lease holder cleared by Release() + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + var ls coordinationv1.Lease + if err := cli.Get(ctx, client.ObjectKey{Namespace: cfg.FenceNS, Name: e.fenceName("zoo")}, &ls); err == nil { + if ls.Spec.HolderIdentity != nil && *ls.Spec.HolderIdentity == "" { + return + } + } + time.Sleep(10 * time.Millisecond) + } + t.Fatalf("expected lease holder to be cleared after release") +} diff --git a/pkg/source/kind.go b/pkg/source/kind.go index 78f70e4..36e444d 100644 --- a/pkg/source/kind.go +++ b/pkg/source/kind.go @@ -17,10 +17,21 @@ limitations under the License. package source import ( + "context" + "sync" + "time" + + toolscache "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + + crcache "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/cluster" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/source" + crsource "sigs.k8s.io/controller-runtime/pkg/source" mchandler "sigs.k8s.io/multicluster-runtime/pkg/handler" mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile" @@ -45,7 +56,8 @@ func TypedKind[object client.Object, request mcreconcile.ClusterAware[request]]( obj: obj, handler: handler, predicates: predicates, - project: func(_ cluster.Cluster, obj object) (object, error) { return obj, nil }, + project: func(_ cluster.Cluster, o object) (object, error) { return o, nil }, + resync: 0, // no periodic resync by default } } @@ -54,10 +66,20 @@ type kind[object client.Object, request mcreconcile.ClusterAware[request]] struc handler mchandler.TypedEventHandlerFunc[object, request] predicates []predicate.TypedPredicate[object] project func(cluster.Cluster, object) (object, error) + resync time.Duration } type clusterKind[object client.Object, request mcreconcile.ClusterAware[request]] struct { - source.TypedSyncingSource[request] + clusterName string + cl cluster.Cluster + obj object + h handler.TypedEventHandler[object, request] + preds []predicate.TypedPredicate[object] + resync time.Duration + + mu sync.Mutex + registration toolscache.ResourceEventHandlerRegistration + activeCtx context.Context } // WithProjection sets the projection function for the KindSource. 
@@ -66,22 +88,209 @@ func (k *kind[object, request]) WithProjection(project func(cluster.Cluster, obj return k } -func (k *kind[object, request]) ForCluster(name string, cl cluster.Cluster) (source.TypedSource[request], error) { +func (k *kind[object, request]) ForCluster(name string, cl cluster.Cluster) (crsource.TypedSource[request], error) { obj, err := k.project(cl, k.obj) if err != nil { return nil, err } return &clusterKind[object, request]{ - TypedSyncingSource: source.TypedKind(cl.GetCache(), obj, k.handler(name, cl), k.predicates...), + clusterName: name, + cl: cl, + obj: obj, + h: k.handler(name, cl), + preds: k.predicates, + resync: k.resync, }, nil } -func (k *kind[object, request]) SyncingForCluster(name string, cl cluster.Cluster) (source.TypedSyncingSource[request], error) { - obj, err := k.project(cl, k.obj) +func (k *kind[object, request]) SyncingForCluster(name string, cl cluster.Cluster) (crsource.TypedSyncingSource[request], error) { + src, err := k.ForCluster(name, cl) if err != nil { return nil, err } - return &clusterKind[object, request]{ - TypedSyncingSource: source.TypedKind(cl.GetCache(), obj, k.handler(name, cl), k.predicates...), - }, nil + return src.(crsource.TypedSyncingSource[request]), nil +} + +// WaitForSync satisfies TypedSyncingSource. +func (ck *clusterKind[object, request]) WaitForSync(ctx context.Context) error { + if !ck.cl.GetCache().WaitForCacheSync(ctx) { + return ctx.Err() + } + return nil +} + +// Start registers a removable handler on the (scoped) informer and removes it on ctx.Done(). +func (ck *clusterKind[object, request]) Start(ctx context.Context, q workqueue.TypedRateLimitingInterface[request]) error { + log := log.FromContext(ctx).WithValues("cluster", ck.clusterName, "source", "kind") + + // Check if we're already started with this context + ck.mu.Lock() + if ck.registration != nil && ck.activeCtx != nil { + // Check if the active context is still valid + select { + case <-ck.activeCtx.Done(): + // Previous context cancelled, need to clean up and re-register + log.V(1).Info("previous context cancelled, cleaning up for re-registration") + // Clean up old registration is handled below + default: + // Still active with same context - check if it's the same context + if ck.activeCtx == ctx { + ck.mu.Unlock() + log.V(1).Info("handler already registered with same context") + return nil + } + // Different context but old one still active - this shouldn't happen + log.V(1).Info("different context while old one active, will re-register") + } + } + ck.mu.Unlock() + + inf, err := ck.getInformer(ctx, ck.obj) + if err != nil { + log.Error(err, "get informer failed") + return err + } + + // If there's an old registration, remove it first + ck.mu.Lock() + if ck.registration != nil { + log.V(1).Info("removing old event handler registration") + if err := inf.RemoveEventHandler(ck.registration); err != nil { + log.Error(err, "failed to remove old event handler") + } + ck.registration = nil + ck.activeCtx = nil + } + ck.mu.Unlock() + + // predicate helpers + passCreate := func(e event.TypedCreateEvent[object]) bool { + for _, p := range ck.preds { + if !p.Create(e) { + return false + } + } + return true + } + passUpdate := func(e event.TypedUpdateEvent[object]) bool { + for _, p := range ck.preds { + if !p.Update(e) { + return false + } + } + return true + } + passDelete := func(e event.TypedDeleteEvent[object]) bool { + for _, p := range ck.preds { + if !p.Delete(e) { + return false + } + } + return true + } + + // typed event builders + makeCreate := 
func(o client.Object) event.TypedCreateEvent[object] { + return event.TypedCreateEvent[object]{Object: any(o).(object)} + } + makeUpdate := func(oo, no client.Object) event.TypedUpdateEvent[object] { + return event.TypedUpdateEvent[object]{ObjectOld: any(oo).(object), ObjectNew: any(no).(object)} + } + makeDelete := func(o client.Object) event.TypedDeleteEvent[object] { + return event.TypedDeleteEvent[object]{Object: any(o).(object)} + } + + // Adapter that forwards to controller handler, honoring ctx. + h := toolscache.ResourceEventHandlerFuncs{ + AddFunc: func(i interface{}) { + if ctx.Err() != nil { + return + } + if o, ok := i.(client.Object); ok { + e := makeCreate(o) + if passCreate(e) { + ck.h.Create(ctx, e, q) + } + } + }, + UpdateFunc: func(oo, no interface{}) { + if ctx.Err() != nil { + return + } + ooObj, ok1 := oo.(client.Object) + noObj, ok2 := no.(client.Object) + if ok1 && ok2 { + e := makeUpdate(ooObj, noObj) + if passUpdate(e) { + ck.h.Update(ctx, e, q) + } + } + }, + DeleteFunc: func(i interface{}) { + if ctx.Err() != nil { + return + } + // be robust to tombstones (provider should already unwrap) + if ts, ok := i.(toolscache.DeletedFinalStateUnknown); ok { + i = ts.Obj + } + if o, ok := i.(client.Object); ok { + e := makeDelete(o) + if passDelete(e) { + ck.h.Delete(ctx, e, q) + } + } + }, + } + + // Register via removable API. + reg, addErr := inf.AddEventHandlerWithResyncPeriod(h, ck.resync) + if addErr != nil { + log.Error(addErr, "AddEventHandlerWithResyncPeriod failed") + return addErr + } + + // Store registration and context + ck.mu.Lock() + ck.registration = reg + ck.activeCtx = ctx + ck.mu.Unlock() + + log.V(1).Info("kind source handler registered", "hasRegistration", reg != nil) + + // Defensive: ensure cache is synced. + if !ck.cl.GetCache().WaitForCacheSync(ctx) { + ck.mu.Lock() + _ = inf.RemoveEventHandler(ck.registration) + ck.registration = nil + ck.activeCtx = nil + ck.mu.Unlock() + log.V(1).Info("cache not synced; handler removed") + return ctx.Err() + } + log.V(1).Info("kind source cache synced") + + // Wait for context cancellation in a goroutine + go func() { + <-ctx.Done() + ck.mu.Lock() + defer ck.mu.Unlock() + + // Only remove if this is still our active registration + if ck.activeCtx == ctx && ck.registration != nil { + if err := inf.RemoveEventHandler(ck.registration); err != nil { + log.Error(err, "failed to remove event handler on context cancel") + } + ck.registration = nil + ck.activeCtx = nil + log.V(1).Info("kind source handler removed due to context cancellation") + } + }() + + return nil +} + +// getInformer resolves the informer from the cluster cache (provider returns a scoped informer). +func (ck *clusterKind[object, request]) getInformer(ctx context.Context, obj client.Object) (crcache.Informer, error) { + return ck.cl.GetCache().GetInformer(ctx, obj) } diff --git a/pkg/util/util.go b/pkg/util/util.go new file mode 100644 index 0000000..8cea02b --- /dev/null +++ b/pkg/util/util.go @@ -0,0 +1,37 @@ +package util + +// SanitizeDNS1123 converts s to a DNS-1123 subdomain-compatible string for resource names. +// It lowercases, replaces unsupported characters with '-', and trims leading/trailing non-alphanumerics. 
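+// If nothing alphanumeric remains, it returns "unknown". For example,
+// "Team/Blue_1" becomes "team-blue-1". The result is not length-limited, so
+// callers should keep name prefixes short.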
+func SanitizeDNS1123(s string) string { + b := make([]rune, 0, len(s)) + for _, r := range s { + switch { + case r >= 'A' && r <= 'Z': + b = append(b, r+('a'-'A')) + case (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '-' || r == '.': + b = append(b, r) + default: + b = append(b, '-') + } + } + // trim leading/trailing non-alphanumeric + start, end := 0, len(b) + for start < end { + r := b[start] + if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') { + break + } + start++ + } + for end > start { + r := b[end-1] + if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') { + break + } + end-- + } + if start >= end { + return "unknown" + } + return string(b[start:end]) +}