Skip to content

Commit 5a123ff

Browse files
authored
Merge pull request #128 from datum-cloud/feat/clustersharding
feat: split work based on clusters with sharding
2 parents 950f980 + e5a25fe commit 5a123ff

File tree

8 files changed

+234
-99
lines changed

8 files changed

+234
-99
lines changed

cmd/main.go

Lines changed: 122 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,13 @@ import (
2828
"sigs.k8s.io/controller-runtime/pkg/healthz"
2929
"sigs.k8s.io/controller-runtime/pkg/log/zap"
3030
"sigs.k8s.io/controller-runtime/pkg/manager"
31+
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
3132
"sigs.k8s.io/controller-runtime/pkg/webhook"
3233
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
3334
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
3435
gatewayv1alpha3 "sigs.k8s.io/gateway-api/apis/v1alpha3"
3536
mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
37+
"sigs.k8s.io/multicluster-runtime/pkg/manager/coordinator/sharded"
3638
"sigs.k8s.io/multicluster-runtime/pkg/multicluster"
3739
mcsingle "sigs.k8s.io/multicluster-runtime/providers/single"
3840

@@ -76,6 +78,12 @@ func main() {
7678
var enableLeaderElection bool
7779
var leaderElectionNamespace string
7880
var probeAddr string
81+
var enableClusterSharding bool
82+
var clusterShardingLeaseNamespace string
83+
var clusterShardingLeasePrefix string
84+
var clusterShardingPeerWeight uint
85+
var singletonControllersLeaderElection bool
86+
var singletonControllersLeaderElectionID string
7987

8088
var serverConfigFile string
8189

@@ -84,6 +92,42 @@ func main() {
8492
"Enable leader election for controller manager. "+
8593
"Enabling this will ensure there is only one active controller manager.")
8694
flag.StringVar(&leaderElectionNamespace, "leader-elect-namespace", "", "The namespace to use for leader election.")
95+
flag.BoolVar(
96+
&enableClusterSharding,
97+
"cluster-sharding-enabled",
98+
false,
99+
"Enable multicluster controller sharding via per-cluster coordination leases.",
100+
)
101+
flag.StringVar(
102+
&clusterShardingLeaseNamespace,
103+
"cluster-sharding-lease-namespace",
104+
"kube-system",
105+
"Namespace for controller cluster sharding leases.",
106+
)
107+
flag.StringVar(
108+
&clusterShardingLeasePrefix,
109+
"cluster-sharding-lease-prefix",
110+
"mcr-shard",
111+
"Lease name prefix for controller cluster sharding.",
112+
)
113+
flag.UintVar(
114+
&clusterShardingPeerWeight,
115+
"cluster-sharding-peer-weight",
116+
1,
117+
"Relative shard weight for this controller instance.",
118+
)
119+
flag.BoolVar(
120+
&singletonControllersLeaderElection,
121+
"singleton-controllers-leader-elect",
122+
true,
123+
"Enable leader election for singleton downstream controllers (Challenge and GatewayDownstreamCertificateSolver).",
124+
)
125+
flag.StringVar(
126+
&singletonControllersLeaderElectionID,
127+
"singleton-controllers-leader-election-id",
128+
"6a7d51cc.datumapis.com-singleton",
129+
"Leader election ID for singleton downstream controllers.",
130+
)
87131

88132
opts := zap.Options{
89133
Development: true,
@@ -157,12 +201,57 @@ func main() {
157201
renewDeadline := serverConfig.LeaderElection.RenewDeadline.Duration
158202
retryPeriod := serverConfig.LeaderElection.RetryPeriod.Duration
159203

204+
mcManagerOptions := []mcmanager.Option{}
205+
if enableClusterSharding {
206+
setupLog.Info(
207+
"enabling cluster sharding coordinator",
208+
"leaseNamespace",
209+
clusterShardingLeaseNamespace,
210+
"leasePrefix",
211+
clusterShardingLeasePrefix,
212+
"peerWeight",
213+
clusterShardingPeerWeight,
214+
)
215+
216+
clusterShardingOptions := []sharded.Option{
217+
sharded.WithShardLease(clusterShardingLeaseNamespace, clusterShardingLeasePrefix),
218+
sharded.WithPerClusterLease(true),
219+
}
220+
if clusterShardingPeerWeight > 0 {
221+
clusterShardingOptions = append(
222+
clusterShardingOptions,
223+
sharded.WithPeerWeight(uint32(clusterShardingPeerWeight)),
224+
)
225+
}
226+
227+
mcManagerOptions = append(
228+
mcManagerOptions,
229+
mcmanager.WithCoordinator(
230+
sharded.New(
231+
deploymentCluster.GetClient(),
232+
ctrl.Log.WithName("cluster-sharding-coordinator"),
233+
clusterShardingOptions...,
234+
),
235+
),
236+
)
237+
}
238+
239+
primaryManagerLeaderElection := enableLeaderElection
240+
if enableClusterSharding && enableLeaderElection {
241+
setupLog.Info(
242+
"disabling primary manager leader election while cluster sharding is enabled",
243+
"singletonControllersLeaderElection",
244+
singletonControllersLeaderElection,
245+
)
246+
primaryManagerLeaderElection = false
247+
}
248+
160249
mgr, err := mcmanager.New(cfg, provider, ctrl.Options{
161250
Scheme: scheme,
162251
Metrics: metricsServerOptions,
163252
WebhookServer: webhookServer,
164253
HealthProbeBindAddress: probeAddr,
165-
LeaderElection: enableLeaderElection,
254+
LeaderElection: primaryManagerLeaderElection,
166255
LeaderElectionID: "6a7d51cc.datumapis.com",
167256
LeaderElectionNamespace: leaderElectionNamespace,
168257
LeaseDuration: &leaseDuration,
@@ -179,7 +268,7 @@ func main() {
179268
// if you are doing or is intended to do any operation such as perform cleanups
180269
// after the manager stops then its usage might be unsafe.
181270
// LeaderElectionReleaseOnCancel: true,
182-
})
271+
}, mcManagerOptions...)
183272
if err != nil {
184273
setupLog.Error(err, "unable to start manager")
185274
os.Exit(1)
@@ -205,6 +294,28 @@ func main() {
205294
os.Exit(1)
206295
}
207296

297+
var singletonMgr manager.Manager
298+
singletonControllerMgr := mgr.GetLocalManager()
299+
if enableClusterSharding {
300+
singletonMgr, err = manager.New(cfg, manager.Options{
301+
Scheme: scheme,
302+
Metrics: metricsserver.Options{BindAddress: "0"},
303+
WebhookServer: webhook.NewServer(webhook.Options{Port: 0}),
304+
HealthProbeBindAddress: "0",
305+
LeaderElection: singletonControllersLeaderElection,
306+
LeaderElectionID: singletonControllersLeaderElectionID,
307+
LeaderElectionNamespace: leaderElectionNamespace,
308+
LeaseDuration: &leaseDuration,
309+
RenewDeadline: &renewDeadline,
310+
RetryPeriod: &retryPeriod,
311+
})
312+
if err != nil {
313+
setupLog.Error(err, "unable to create singleton controller manager")
314+
os.Exit(1)
315+
}
316+
singletonControllerMgr = singletonMgr
317+
}
318+
208319
if err := (&controller.NetworkReconciler{}).SetupWithManager(mgr); err != nil {
209320
setupLog.Error(err, "unable to create controller", "controller", "Network")
210321
os.Exit(1)
@@ -283,7 +394,7 @@ func main() {
283394
if err := (&controller.GatewayDownstreamCertificateSolverReconciler{
284395
Config: serverConfig,
285396
DownstreamCluster: downstreamCluster,
286-
}).SetupWithManager(mgr); err != nil {
397+
}).SetupWithManager(singletonControllerMgr); err != nil {
287398
setupLog.Error(err, "unable to create controller", "controller", "GatewayDownstreamCertificateSolver")
288399
os.Exit(1)
289400
}
@@ -311,7 +422,7 @@ func main() {
311422
if err := (&controller.ChallengeReconciler{
312423
Config: serverConfig,
313424
DownstreamCluster: downstreamCluster,
314-
}).SetupWithManager(mgr); err != nil {
425+
}).SetupWithManager(singletonControllerMgr); err != nil {
315426
setupLog.Error(err, "unable to create controller", "controller", "Challenge")
316427
os.Exit(1)
317428
}
@@ -399,6 +510,13 @@ func main() {
399510
return ignoreCanceled(mgr.Start(ctx))
400511
})
401512

513+
if singletonMgr != nil {
514+
setupLog.Info("starting singleton controller manager")
515+
g.Go(func() error {
516+
return ignoreCanceled(singletonMgr.Start(ctx))
517+
})
518+
}
519+
402520
if err := g.Wait(); err != nil {
403521
setupLog.Error(err, "unable to start")
404522
os.Exit(1)

go.mod

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,22 @@ require (
1212
github.com/go-redis/redis/v7 v7.4.1
1313
github.com/google/go-cmp v0.7.0
1414
github.com/google/uuid v1.6.0
15-
github.com/onsi/ginkgo/v2 v2.23.4
15+
github.com/onsi/ginkgo/v2 v2.27.2
1616
github.com/openrdap/rdap v0.9.1
1717
github.com/stretchr/testify v1.11.1
1818
go.miloapis.com/dns-operator v0.5.1
1919
go.miloapis.com/milo v0.7.4
20-
golang.org/x/net v0.45.0
21-
golang.org/x/sync v0.17.0
22-
google.golang.org/protobuf v1.36.6
20+
golang.org/x/net v0.47.0
21+
golang.org/x/sync v0.18.0
22+
google.golang.org/protobuf v1.36.8
2323
k8s.io/api v0.34.1
2424
k8s.io/apiextensions-apiserver v0.34.1
2525
k8s.io/apimachinery v0.34.1
2626
k8s.io/client-go v0.34.1
27-
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397
28-
sigs.k8s.io/controller-runtime v0.22.1
27+
k8s.io/utils v0.0.0-20251002143259-bc988d571ff4
28+
sigs.k8s.io/controller-runtime v0.22.4
2929
sigs.k8s.io/gateway-api v1.3.1-0.20250527223622-54df0a899c1c
30-
sigs.k8s.io/multicluster-runtime v0.21.0-alpha.8
30+
sigs.k8s.io/multicluster-runtime v0.22.4-beta.1.0.20260121175728-cda86980358b
3131
)
3232

3333
require (
@@ -207,9 +207,9 @@ require (
207207
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
208208
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
209209
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
210-
github.com/prometheus/client_golang v1.23.0 // indirect
210+
github.com/prometheus/client_golang v1.23.2 // indirect
211211
github.com/prometheus/client_model v0.6.2 // indirect
212-
github.com/prometheus/common v0.65.0 // indirect
212+
github.com/prometheus/common v0.66.1 // indirect
213213
github.com/prometheus/otlptranslator v0.0.0-20250717125610-8549f4ab4f8f // indirect
214214
github.com/prometheus/procfs v0.17.0 // indirect
215215
github.com/quic-go/qpack v0.5.1 // indirect
@@ -227,8 +227,8 @@ require (
227227
github.com/sourcegraph/conc v0.3.0 // indirect
228228
github.com/spf13/afero v1.14.0 // indirect
229229
github.com/spf13/cast v1.7.1 // indirect
230-
github.com/spf13/cobra v1.9.1 // indirect
231-
github.com/spf13/pflag v1.0.7 // indirect
230+
github.com/spf13/cobra v1.10.0 // indirect
231+
github.com/spf13/pflag v1.0.9 // indirect
232232
github.com/spf13/viper v1.20.1 // indirect
233233
github.com/stoewer/go-strcase v1.3.1 // indirect
234234
github.com/subosito/gotenv v1.6.0 // indirect
@@ -249,9 +249,9 @@ require (
249249
github.com/yuin/gopher-lua v1.1.1 // indirect
250250
github.com/yusufpapurcu/wmi v1.2.4 // indirect
251251
github.com/zonedb/zonedb v1.0.3544 // indirect
252-
go.etcd.io/etcd/api/v3 v3.6.4 // indirect
253-
go.etcd.io/etcd/client/pkg/v3 v3.6.4 // indirect
254-
go.etcd.io/etcd/client/v3 v3.6.4 // indirect
252+
go.etcd.io/etcd/api/v3 v3.6.5 // indirect
253+
go.etcd.io/etcd/client/pkg/v3 v3.6.5 // indirect
254+
go.etcd.io/etcd/client/v3 v3.6.5 // indirect
255255
go.mongodb.org/mongo-driver v1.14.0 // indirect
256256
go.opencensus.io v0.24.0 // indirect
257257
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
@@ -268,27 +268,26 @@ require (
268268
go.opentelemetry.io/otel/sdk/metric v1.37.0 // indirect
269269
go.opentelemetry.io/otel/trace v1.37.0 // indirect
270270
go.opentelemetry.io/proto/otlp v1.7.1 // indirect
271-
go.uber.org/automaxprocs v1.6.0 // indirect
272271
go.uber.org/mock v0.5.2 // indirect
273272
go.uber.org/multierr v1.11.0 // indirect
274273
go.uber.org/zap v1.27.0 // indirect
275-
go.yaml.in/yaml/v2 v2.4.2 // indirect
274+
go.yaml.in/yaml/v2 v2.4.3 // indirect
276275
go.yaml.in/yaml/v3 v3.0.4 // indirect
277-
golang.org/x/crypto v0.43.0 // indirect
276+
golang.org/x/crypto v0.45.0 // indirect
278277
golang.org/x/crypto/x509roots/fallback v0.0.0-20250406160420-959f8f3db0fb // indirect
279278
golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 // indirect
280-
golang.org/x/mod v0.28.0 // indirect
279+
golang.org/x/mod v0.29.0 // indirect
281280
golang.org/x/oauth2 v0.30.0 // indirect
282-
golang.org/x/sys v0.37.0 // indirect
283-
golang.org/x/term v0.36.0 // indirect
284-
golang.org/x/text v0.30.0 // indirect
281+
golang.org/x/sys v0.38.0 // indirect
282+
golang.org/x/term v0.37.0 // indirect
283+
golang.org/x/text v0.31.0 // indirect
285284
golang.org/x/time v0.12.0 // indirect
286-
golang.org/x/tools v0.37.0 // indirect
285+
golang.org/x/tools v0.38.0 // indirect
287286
gomodules.xyz/jsonpatch/v2 v2.5.0 // indirect
288287
google.golang.org/genproto/googleapis/api v0.0.0-20250728155136-f173205681a0 // indirect
289288
google.golang.org/genproto/googleapis/rpc v0.0.0-20250728155136-f173205681a0 // indirect
290289
google.golang.org/grpc v1.74.2 // indirect
291-
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
290+
gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect
292291
gopkg.in/inf.v0 v0.9.1 // indirect
293292
gopkg.in/yaml.v2 v2.4.0 // indirect
294293
gopkg.in/yaml.v3 v3.0.1 // indirect
@@ -297,18 +296,18 @@ require (
297296
k8s.io/cli-runtime v0.34.1 // indirect
298297
k8s.io/component-base v0.34.1 // indirect
299298
k8s.io/klog/v2 v2.130.1 // indirect
300-
k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b // indirect
299+
k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect
301300
k8s.io/kubectl v0.34.1 // indirect
302301
k8s.io/metrics v0.34.1 // indirect
303302
oras.land/oras-go/v2 v2.6.0 // indirect
304303
periph.io/x/host/v3 v3.8.5 // indirect
305304
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 // indirect
306-
sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect
305+
sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect
307306
sigs.k8s.io/kubectl-validate v0.0.5-0.20241223122011-eb064d2f92d5 // indirect
308307
sigs.k8s.io/kustomize/api v0.20.1 // indirect
309308
sigs.k8s.io/kustomize/kyaml v0.20.1 // indirect
310309
sigs.k8s.io/mcs-api v0.2.0 // indirect
311310
sigs.k8s.io/randfill v1.0.0 // indirect
312-
sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect
311+
sigs.k8s.io/structured-merge-diff/v6 v6.3.2-0.20260122202528-d9cc6641c482 // indirect
313312
sigs.k8s.io/yaml v1.6.0 // indirect
314313
)

0 commit comments

Comments
 (0)