diff --git a/Taskfile.yml b/Taskfile.yml index 0133a321c..fa8e384ee 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -110,6 +110,7 @@ tasks: - task: generate:third-party-licenses-list - task: generate:changelog - task: generate:buildkite-pipelines + - buf generate - nix fmt . # Ensure flake.nix has been formatted. generate:buildkite-pipelines: diff --git a/acceptance/go.sum b/acceptance/go.sum index 165ab10dc..81222ca15 100644 --- a/acceptance/go.sum +++ b/acceptance/go.sum @@ -670,6 +670,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.etcd.io/raft/v3 v3.6.0 h1:5NtvbDVYpnfZWcIHgGRk9DyzkBIXOi8j+DDp1IcnUWQ= +go.etcd.io/raft/v3 v3.6.0/go.mod h1:nLvLevg6+xrVtHUmVaTcTz603gQPHfh7kUAwV6YpfGo= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/collector/pdata v1.32.0 h1:hBzlJV1rujr1UdD2CBy2gmaIKtC15ysg/z+x8F3McQA= diff --git a/buf.gen.yaml b/buf.gen.yaml new file mode 100644 index 000000000..bbaada26a --- /dev/null +++ b/buf.gen.yaml @@ -0,0 +1,19 @@ +version: v2 +managed: + enabled: true + override: + - file_option: go_package_prefix + value: github.com/redpanda-data/redpanda-operator/pkg/multicluster/leaderelection/proto/gen +inputs: + - directory: pkg/multicluster/leaderelection/proto + paths: + - pkg/multicluster/leaderelection/proto/transport +plugins: + - remote: buf.build/protocolbuffers/go + out: pkg/multicluster/leaderelection/proto/gen + opt: + - paths=source_relative + - remote: buf.build/grpc/go + out: pkg/multicluster/leaderelection/proto/gen + opt: + - paths=source_relative \ No newline at end of file diff --git 
a/buf.yaml b/buf.yaml new file mode 100644 index 000000000..89b99819e --- /dev/null +++ b/buf.yaml @@ -0,0 +1,10 @@ +# For details on buf.yaml configuration, visit https://buf.build/docs/configuration/v2/buf-yaml +version: v2 +lint: + use: + - STANDARD +breaking: + use: + - FILE +modules: + - path: pkg/multicluster/leaderelection/proto \ No newline at end of file diff --git a/flake.nix b/flake.nix index 053f82c4c..c19664e46 100644 --- a/flake.nix +++ b/flake.nix @@ -59,6 +59,7 @@ pkgs.backport pkgs.bk pkgs.buildkite-agent + pkgs.buf pkgs.changie # Changelog manager pkgs.code-generator pkgs.controller-gen @@ -77,6 +78,7 @@ pkgs.golangci-lint pkgs.gotestsum pkgs.goverter + pkgs.grpc-tools pkgs.helm-3-10-3 pkgs.helm-docs pkgs.jq @@ -88,6 +90,7 @@ pkgs.kuttl pkgs.openssl pkgs.otel-desktop-viewer + pkgs.protoc-gen-go pkgs.setup-envtest # Kubernetes provided test utilities pkgs.vcluster pkgs.yq-go diff --git a/gen/go.sum b/gen/go.sum index 87e181895..3bcf1a817 100644 --- a/gen/go.sum +++ b/gen/go.sum @@ -645,6 +645,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.etcd.io/raft/v3 v3.6.0 h1:5NtvbDVYpnfZWcIHgGRk9DyzkBIXOi8j+DDp1IcnUWQ= +go.etcd.io/raft/v3 v3.6.0/go.mod h1:nLvLevg6+xrVtHUmVaTcTz603gQPHfh7kUAwV6YpfGo= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/contrib/bridges/prometheus v0.61.0 h1:RyrtJzu5MAmIcbRrwg75b+w3RlZCP0vJByDVzcpAe3M= diff --git a/licenses/third_party.md b/licenses/third_party.md index 39764eaa0..a4b675a74 100644 --- a/licenses/third_party.md +++ b/licenses/third_party.md @@ -98,6 +98,7 @@ run `task 
generate:third-party-licenses-list` | github.com/gobwas/glob | [MIT](https://github.com/gobwas/glob/blob/v0.2.3/LICENSE) | | github.com/gogo/protobuf | [BSD-3-Clause](https://github.com/gogo/protobuf/blob/v1.3.2/LICENSE) | | github.com/golang-jwt/jwt/v5 | [MIT](https://github.com/golang-jwt/jwt/blob/v5.3.0/LICENSE) | +| github.com/golang/protobuf/proto | [BSD-3-Clause](https://github.com/golang/protobuf/blob/v1.5.4/LICENSE) | | github.com/gonvenience/bunt | [MIT](https://github.com/gonvenience/bunt/blob/v1.3.5/LICENSE) | | github.com/gonvenience/neat | [MIT](https://github.com/gonvenience/neat/blob/v1.3.13/LICENSE) | | github.com/gonvenience/term | [MIT](https://github.com/gonvenience/term/blob/v1.0.2/LICENSE) | @@ -230,6 +231,7 @@ run `task generate:third-party-licenses-list` | github.com/wk8/go-ordered-map/v2 | [Apache-2.0](https://github.com/wk8/go-ordered-map/blob/v2.1.8/LICENSE) | | github.com/x448/float16 | [MIT](https://github.com/x448/float16/blob/v0.8.4/LICENSE) | | github.com/xlab/treeprint | [MIT](https://github.com/xlab/treeprint/blob/v1.2.0/LICENSE) | +| go.etcd.io/raft/v3 | [Apache-2.0](https://github.com/etcd-io/raft/blob/v3.6.0/LICENSE) | | go.opentelemetry.io/auto/sdk | [Apache-2.0](https://github.com/open-telemetry/opentelemetry-go-instrumentation/blob/sdk/v1.2.1/sdk/LICENSE) | | go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc | [Apache-2.0](https://github.com/open-telemetry/opentelemetry-go-contrib/blob/instrumentation/google.golang.org/grpc/otelgrpc/v0.61.0/instrumentation/google.golang.org/grpc/otelgrpc/LICENSE) | | go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp | [Apache-2.0](https://github.com/open-telemetry/opentelemetry-go-contrib/blob/instrumentation/net/http/otelhttp/v0.63.0/instrumentation/net/http/otelhttp/LICENSE) | @@ -307,7 +309,7 @@ run `task generate:third-party-licenses-list` | sigs.k8s.io/json | [BSD-3-Clause](https://github.com/kubernetes-sigs/json/blob/cfa47c3a1cc8/LICENSE) | | 
sigs.k8s.io/kustomize/api | [Apache-2.0](https://github.com/kubernetes-sigs/kustomize/blob/api/v0.19.0/api/LICENSE) | | sigs.k8s.io/kustomize/kyaml | [Apache-2.0](https://github.com/kubernetes-sigs/kustomize/blob/kyaml/v0.19.0/kyaml/LICENSE) | -| sigs.k8s.io/multicluster-runtime/pkg | [Apache-2.0](https://github.com/kubernetes-sigs/multicluster-runtime/blob/v0.22.4-beta.1/LICENSE) | +| sigs.k8s.io/multicluster-runtime | [Apache-2.0](https://github.com/kubernetes-sigs/multicluster-runtime/blob/v0.22.4-beta.1/LICENSE) | | sigs.k8s.io/randfill | [Apache-2.0](https://github.com/kubernetes-sigs/randfill/blob/v1.0.0/LICENSE) | | sigs.k8s.io/structured-merge-diff/v6 | [Apache-2.0](https://github.com/kubernetes-sigs/structured-merge-diff/blob/v6.3.0/LICENSE) | | sigs.k8s.io/yaml | [MIT](https://github.com/kubernetes-sigs/yaml/blob/v1.6.0/LICENSE) | diff --git a/operator/cmd/run/run.go b/operator/cmd/run/run.go index 73bc17691..2a071c638 100644 --- a/operator/cmd/run/run.go +++ b/operator/cmd/run/run.go @@ -47,6 +47,7 @@ import ( internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client" "github.com/redpanda-data/redpanda-operator/operator/pkg/resources" "github.com/redpanda-data/redpanda-operator/pkg/kube" + "github.com/redpanda-data/redpanda-operator/pkg/multicluster" "github.com/redpanda-data/redpanda-operator/pkg/otelutil/log" "github.com/redpanda-data/redpanda-operator/pkg/pflagutil" pkgsecrets "github.com/redpanda-data/redpanda-operator/pkg/secrets" @@ -363,11 +364,12 @@ func Run( opts.managerOptions.Cache.DefaultNamespaces = map[string]cache.Config{opts.namespace: {}} } - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), opts.managerOptions) + mcmanager, err := multicluster.NewSingleClusterManager(ctrl.GetConfigOrDie(), opts.managerOptions) if err != nil { setupLog.Error(err, "Unable to start manager") return err } + mgr := mcmanager.GetLocalManager() // Configure controllers that are always enabled (Redpanda, Topic, User, Schema). 
diff --git a/operator/go.mod b/operator/go.mod index fef51dac1..b696baf7c 100644 --- a/operator/go.mod +++ b/operator/go.mod @@ -161,6 +161,7 @@ require ( github.com/goccy/go-json v0.10.5 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v5 v5.3.0 // indirect + github.com/golang/protobuf v1.5.4 // indirect github.com/gonvenience/bunt v1.3.5 // indirect github.com/gonvenience/neat v1.3.13 // indirect github.com/gonvenience/term v1.0.2 // indirect @@ -286,6 +287,7 @@ require ( github.com/x448/float16 v0.8.4 // indirect github.com/xlab/treeprint v1.2.0 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect + go.etcd.io/raft/v3 v3.6.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/collector/pdata v1.32.0 // indirect go.opentelemetry.io/contrib/bridges/prometheus v0.61.0 // indirect diff --git a/operator/go.sum b/operator/go.sum index 04b6b2a27..282ba10f8 100644 --- a/operator/go.sum +++ b/operator/go.sum @@ -144,6 +144,8 @@ github.com/cisco-open/k8s-objectmatcher v1.9.0 h1:/sfuO0BD09fpynZjXsqeZrh28Juc4V github.com/cisco-open/k8s-objectmatcher v1.9.0/go.mod h1:CH4E6qAK+q+JwKFJn0DaTNqxrbmWCaDQzGthKLK4nZ0= github.com/cloudhut/common v0.11.0 h1:N9yDk2fHhKjhzhKlZeMzrF5v1Q2kUm1EpnExAWAP+pc= github.com/cloudhut/common v0.11.0/go.mod h1:VVehSv0ZPulx35rzCXdwjkjQ3pRKZQAXk2/v3EE+6WU= +github.com/cockroachdb/datadriven v1.0.2 h1:H9MtNqVoVhvd9nCBwOyDjUEdZCREqbIdCJD93PBm/jA= +github.com/cockroachdb/datadriven v1.0.2/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= github.com/cockroachdb/errors v1.11.3 h1:5bA+k2Y6r+oz/6Z/RFlNeVCesGARKuC6YymtcDrbC/I= github.com/cockroachdb/errors v1.11.3/go.mod h1:m4UIW4CDjx+R5cybPsNrRbreomiFqt8o1h1wUVazSd8= github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE= @@ -651,6 +653,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.4.13/go.mod 
h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.etcd.io/raft/v3 v3.6.0 h1:5NtvbDVYpnfZWcIHgGRk9DyzkBIXOi8j+DDp1IcnUWQ= +go.etcd.io/raft/v3 v3.6.0/go.mod h1:nLvLevg6+xrVtHUmVaTcTz603gQPHfh7kUAwV6YpfGo= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/collector/pdata v1.32.0 h1:hBzlJV1rujr1UdD2CBy2gmaIKtC15ysg/z+x8F3McQA= diff --git a/operator/internal/lifecycle/testdata/cases.pools.golden.txtar b/operator/internal/lifecycle/testdata/cases.pools.golden.txtar index a0494a357..3fdf52b5d 100644 --- a/operator/internal/lifecycle/testdata/cases.pools.golden.txtar +++ b/operator/internal/lifecycle/testdata/cases.pools.golden.txtar @@ -1355,6 +1355,7 @@ - SYS_RESOURCE privileged: true runAsGroup: 0 + runAsNonRoot: false runAsUser: 0 volumeMounts: - mountPath: /etc/tls/certs/default diff --git a/pkg/go.mod b/pkg/go.mod index 42131cc5e..2d2fd03b5 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -27,6 +27,7 @@ require ( github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0 github.com/twmb/franz-go/pkg/sr v1.5.0 github.com/wk8/go-ordered-map/v2 v2.1.8 + go.etcd.io/raft/v3 v3.6.0 go.opentelemetry.io/collector/pdata v1.32.0 go.opentelemetry.io/contrib/bridges/prometheus v0.61.0 go.opentelemetry.io/otel v1.38.0 @@ -46,6 +47,7 @@ require ( golang.org/x/mod v0.30.0 golang.org/x/time v0.13.0 golang.org/x/tools v0.39.0 + google.golang.org/grpc v1.75.1 google.golang.org/protobuf v1.36.10 gopkg.in/yaml.v3 v3.0.1 helm.sh/helm/v3 v3.18.5 @@ -57,6 +59,7 @@ require ( k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 pgregory.net/rapid v1.1.0 sigs.k8s.io/controller-runtime v0.22.4 + sigs.k8s.io/multicluster-runtime v0.22.4-beta.1 sigs.k8s.io/yaml v1.6.0 ) @@ -152,6 +155,7 @@ require ( 
github.com/gobwas/glob v0.2.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v5 v5.3.0 // indirect + github.com/golang/protobuf v1.5.4 // indirect github.com/gonvenience/bunt v1.3.5 // indirect github.com/gonvenience/neat v1.3.13 // indirect github.com/gonvenience/term v1.0.2 // indirect @@ -290,7 +294,6 @@ require ( google.golang.org/genproto v0.0.0-20251111163417-95abcf5c77ba // indirect google.golang.org/genproto/googleapis/api v0.0.0-20251111163417-95abcf5c77ba // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba // indirect - google.golang.org/grpc v1.75.1 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/pkg/go.sum b/pkg/go.sum index ec6cfcaa0..8f54a9dba 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -128,6 +128,8 @@ github.com/chrisseto/rapid v0.0.0-20240815210052-cdeef406c65c h1:GZtcJAFTBCr16eM github.com/chrisseto/rapid v0.0.0-20240815210052-cdeef406c65c/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04= github.com/cloudhut/common v0.11.0 h1:N9yDk2fHhKjhzhKlZeMzrF5v1Q2kUm1EpnExAWAP+pc= github.com/cloudhut/common v0.11.0/go.mod h1:VVehSv0ZPulx35rzCXdwjkjQ3pRKZQAXk2/v3EE+6WU= +github.com/cockroachdb/datadriven v1.0.2 h1:H9MtNqVoVhvd9nCBwOyDjUEdZCREqbIdCJD93PBm/jA= +github.com/cockroachdb/datadriven v1.0.2/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= github.com/cockroachdb/errors v1.11.3 h1:5bA+k2Y6r+oz/6Z/RFlNeVCesGARKuC6YymtcDrbC/I= github.com/cockroachdb/errors v1.11.3/go.mod h1:m4UIW4CDjx+R5cybPsNrRbreomiFqt8o1h1wUVazSd8= github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE= @@ -265,6 +267,8 @@ github.com/google/gnostic-models v0.7.0/go.mod h1:whL5G0m6dmc5cPxKc5bdKdEN3UjI7O github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod 
h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20251114195745-4902fdda35c8 h1:3DsUAV+VNEQa2CUVLxCY3f87278uWfIDhJnbdvDjvmE= github.com/google/pprof v0.0.0-20251114195745-4902fdda35c8/go.mod h1:I6V7YzU0XDpsHqbsyrghnFZLO1gwK6NPTNvmetQIk9U= github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0= @@ -592,6 +596,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.etcd.io/raft/v3 v3.6.0 h1:5NtvbDVYpnfZWcIHgGRk9DyzkBIXOi8j+DDp1IcnUWQ= +go.etcd.io/raft/v3 v3.6.0/go.mod h1:nLvLevg6+xrVtHUmVaTcTz603gQPHfh7kUAwV6YpfGo= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/collector/pdata v1.32.0 h1:hBzlJV1rujr1UdD2CBy2gmaIKtC15ysg/z+x8F3McQA= @@ -807,6 +813,8 @@ sigs.k8s.io/kustomize/api v0.19.0 h1:F+2HB2mU1MSiR9Hp1NEgoU2q9ItNOaBJl0I4Dlus5SQ sigs.k8s.io/kustomize/api v0.19.0/go.mod h1:/BbwnivGVcBh1r+8m3tH1VNxJmHSk1PzP5fkP6lbL1o= sigs.k8s.io/kustomize/kyaml v0.19.0 h1:RFge5qsO1uHhwJsu3ipV7RNolC7Uozc0jUBC/61XSlA= sigs.k8s.io/kustomize/kyaml v0.19.0/go.mod h1:FeKD5jEOH+FbZPpqUghBP8mrLjJ3+zD3/rf9NNu1cwY= +sigs.k8s.io/multicluster-runtime v0.22.4-beta.1 h1:0XWbDINepM9UOyLkqhG4g7BtGBFKCDvZFyPsw1vufKE= +sigs.k8s.io/multicluster-runtime v0.22.4-beta.1/go.mod h1:zSMb4mC8MAZK42l8eE1ywkeX6vjuNRenYzJ1w+GPdfI= sigs.k8s.io/randfill v1.0.0 
h1:JfjMILfT8A6RbawdsK2JXGBR5AQVfd+9TbzrlneTyrU= sigs.k8s.io/randfill v1.0.0/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY= sigs.k8s.io/structured-merge-diff/v6 v6.3.0 h1:jTijUJbW353oVOd9oTlifJqOGEkUw2jB/fXCbTiQEco= diff --git a/pkg/multicluster/bootstrap/bootstrapper.go b/pkg/multicluster/bootstrap/bootstrapper.go new file mode 100644 index 000000000..9ca0341f3 --- /dev/null +++ b/pkg/multicluster/bootstrap/bootstrapper.go @@ -0,0 +1,136 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package bootstrap + +import ( + "context" + "strings" + + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type RemoteConfiguration struct { + ContextName string + APIServer string + ServiceAddress string +} + +func (r RemoteConfiguration) Client() (client.Client, error) { + config, err := configFromContext(r.ContextName) + if err != nil { + return nil, err + } + + return client.New(config, client.Options{}) +} + +func (r RemoteConfiguration) Config() (*rest.Config, error) { + return configFromContext(r.ContextName) +} + +func (r RemoteConfiguration) Address() (string, error) { + if r.APIServer != "" { + return r.APIServer, nil + } + config, err := r.Config() + if err != nil { + return "", err + } + + return config.Host, nil +} + +func (r RemoteConfiguration) FQDN(c BootstrapClusterConfiguration) (string, error) { + if r.ServiceAddress != "" { + return strings.Split(r.ServiceAddress, ":")[0], nil + } + + return c.ServiceName + "-" + r.ContextName, nil +} + +type BootstrapClusterConfiguration struct { + BootstrapTLS bool + BootstrapKubeconfigs bool + EnsureNamespace bool + OperatorNamespace string + ServiceName string + RemoteClusters []RemoteConfiguration +} + 
+func BootstrapKubernetesClusters(ctx context.Context, organization string, configuration BootstrapClusterConfiguration) error { + caCertificate, err := GenerateCA(organization, "Root CA", nil) + if err != nil { + return err + } + + kubeconfigs := [][]byte{} + certificates := []*Certificate{} + for _, cluster := range configuration.RemoteClusters { + if configuration.BootstrapKubeconfigs { + address, err := cluster.Address() + if err != nil { + return err + } + config, err := CreateRemoteKubeconfig(ctx, &RemoteKubernetesConfiguration{ + ContextName: cluster.ContextName, + EnsureNamespace: configuration.EnsureNamespace, + Namespace: configuration.OperatorNamespace, + Name: configuration.ServiceName, + APIServer: address, + }) + if err != nil { + return err + } + kubeconfigs = append(kubeconfigs, config) + } + if configuration.BootstrapTLS { + serviceFQDN, err := cluster.FQDN(configuration) + if err != nil { + return err + } + certificate, err := caCertificate.Sign(serviceFQDN) + if err != nil { + return err + } + certificates = append(certificates, certificate) + } + } + + for i, cluster := range configuration.RemoteClusters { + if configuration.BootstrapKubeconfigs { + for i := range kubeconfigs { + kubeconfig := kubeconfigs[i] + + if err := CreateKubeconfigSecret(ctx, kubeconfig, &RemoteKubernetesConfiguration{ + ContextName: cluster.ContextName, + Namespace: configuration.OperatorNamespace, + Name: configuration.ServiceName + "-" + configuration.RemoteClusters[i].ContextName, + EnsureNamespace: configuration.EnsureNamespace, + }); err != nil { + return err + } + } + } + if configuration.BootstrapTLS { + certificate := certificates[i] + if err := CreateTLSSecret(ctx, caCertificate, certificate, &RemoteKubernetesConfiguration{ + ContextName: cluster.ContextName, + Namespace: configuration.OperatorNamespace, + Name: configuration.ServiceName, + EnsureNamespace: configuration.EnsureNamespace, + }); err != nil { + return err + } + } + } + + return nil +} diff --git 
a/pkg/multicluster/bootstrap/certificates.go b/pkg/multicluster/bootstrap/certificates.go new file mode 100644 index 000000000..1f3f4716e --- /dev/null +++ b/pkg/multicluster/bootstrap/certificates.go @@ -0,0 +1,400 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package bootstrap + +import ( + "context" + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "errors" + "fmt" + "math/big" + "net" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +const ( + notBeforeGracePeriod = 1 * time.Hour + defaultCAExpiration = 10 * 365 * 24 * time.Hour // 10 years + defaultIntermediateCAExpiration = 5 * 365 * 24 * time.Hour // 5 year + defaultCertificateExpiration = 365 * 24 * time.Hour // 1 year +) + +type CertificateConfiguration struct { + Namespace string + Name string +} + +type CACertificate struct { + organization string + gracePeriod time.Duration + certificateExpiration time.Duration + intermediateCAExpiration time.Duration + + cert *x509.Certificate + pk *ecdsa.PrivateKey + + pem []byte + privateKeyPEM []byte +} + +func (c *CACertificate) Bytes() []byte { + return c.pem +} + +func (c *CACertificate) PrivateKeyBytes() []byte { + return c.privateKeyPEM +} + +func (c *CACertificate) Sign(names ...string) (*Certificate, error) { + if len(names) == 0 { + return nil, errors.New("must specify at least one name") + } + + now := time.Now() + notBefore := now.Add(-c.gracePeriod) + notAfter := now.Add(c.certificateExpiration) + + if c.cert.NotBefore.After(notBefore) { + return nil, 
errors.New("CA is not valid prior to the certificate not before time, cannot issue a new certificate") + } + + if c.cert.NotAfter.Before(notAfter) { + return nil, errors.New("CA expires before certificate expiration time, cannot issue a new certificate") + } + + priv, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + return nil, fmt.Errorf("generating key: %w", err) + } + + ips := []net.IP{} + dns := []string{} + for _, name := range names { + if ip := net.ParseIP(name); ip != nil { + ips = append(ips, ip) + } else { + dns = append(dns, name) + } + } + + serialNumber, err := rand.Int(rand.Reader, big.NewInt(1<<62)) + if err != nil { + return nil, fmt.Errorf("generating serial number: %w", err) + } + tmpl := x509.Certificate{ + SerialNumber: serialNumber, + Subject: pkix.Name{ + CommonName: names[0], + Organization: []string{c.organization}, + }, + NotBefore: notBefore, + NotAfter: notAfter, + + KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth}, + BasicConstraintsValid: true, + DNSNames: dns, + IPAddresses: ips, + } + + derBytes, err := x509.CreateCertificate(rand.Reader, &tmpl, c.cert, &priv.PublicKey, c.pk) + if err != nil { + return nil, fmt.Errorf("creating certificate:: %w", err) + } + certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: derBytes}) + + keyBytes, err := x509.MarshalECPrivateKey(priv) + if err != nil { + return nil, fmt.Errorf("marshaling certificate:: %w", err) + } + keyPEM := pem.EncodeToMemory(&pem.Block{Type: "EC PRIVATE KEY", Bytes: keyBytes}) + + return &Certificate{ + privateKey: keyPEM, + certificate: certPEM, + }, nil +} + +func (c *CACertificate) Intermediate(name string) (*CACertificate, error) { + now := time.Now() + notBefore := now.Add(-c.gracePeriod) + notAfter := now.Add(c.intermediateCAExpiration) + + if c.cert.NotBefore.After(notBefore) { + return nil, errors.New("CA is not 
valid prior to the certificate not before time, cannot issue a new certificate") + } + + if c.cert.NotAfter.Before(notAfter) { + return nil, errors.New("CA expires before certificate expiration time, cannot issue a new certificate") + } + + priv, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + return nil, fmt.Errorf("generating key: %w", err) + } + + serialNumber, err := rand.Int(rand.Reader, big.NewInt(1<<62)) + if err != nil { + return nil, fmt.Errorf("generating serial number: %w", err) + } + tmpl := x509.Certificate{ + SerialNumber: serialNumber, + Subject: pkix.Name{ + CommonName: name, + Organization: []string{c.organization}, + }, + NotBefore: time.Now(), + NotAfter: time.Now().Add(365 * 24 * time.Hour), + + KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageCRLSign, + BasicConstraintsValid: true, + IsCA: true, + } + + derBytes, err := x509.CreateCertificate(rand.Reader, &tmpl, c.cert, &priv.PublicKey, c.pk) + if err != nil { + return nil, fmt.Errorf("creating certificate:: %w", err) + } + certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: derBytes}) + + keyBytes, err := x509.MarshalECPrivateKey(priv) + if err != nil { + return nil, fmt.Errorf("marshaling certificate:: %w", err) + } + keyPEM := pem.EncodeToMemory(&pem.Block{Type: "EC PRIVATE KEY", Bytes: keyBytes}) + + cert, err := x509.ParseCertificate(derBytes) + if err != nil { + return nil, fmt.Errorf("parsing certificate:: %w", err) + } + + return &CACertificate{ + pk: priv, + cert: cert, + organization: c.organization, + certificateExpiration: c.certificateExpiration, + intermediateCAExpiration: c.intermediateCAExpiration, + privateKeyPEM: keyPEM, + pem: certPEM, + }, nil +} + +type Certificate struct { + privateKey []byte + certificate []byte +} + +func (c *Certificate) PrivateKeyBytes() []byte { + return c.privateKey +} + +func (c *Certificate) Bytes() []byte { + return c.certificate +} + +type CAConfiguration struct { + NotBeforeGracePeriod time.Duration + 
CALifetime time.Duration + IntermediateCALifetime time.Duration + CertificateLifetime time.Duration +} + +func GenerateCA(organization, name string, configuration *CAConfiguration) (*CACertificate, error) { + if configuration == nil { + configuration = &CAConfiguration{} + } + if configuration.CALifetime == 0 { + configuration.CALifetime = defaultCAExpiration + } + if configuration.IntermediateCALifetime == 0 { + configuration.IntermediateCALifetime = defaultIntermediateCAExpiration + } + if configuration.CertificateLifetime == 0 { + configuration.CertificateLifetime = defaultCertificateExpiration + } + if configuration.NotBeforeGracePeriod == 0 { + configuration.NotBeforeGracePeriod = notBeforeGracePeriod + } + + caKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + return nil, fmt.Errorf("generating key: %w", err) + } + + now := time.Now() + notBefore := now.Add(-configuration.NotBeforeGracePeriod) + notAfter := now.Add(configuration.CALifetime) + + serialNumber, err := rand.Int(rand.Reader, big.NewInt(1<<62)) + if err != nil { + return nil, fmt.Errorf("generating serial number: %w", err) + } + caTemplate := x509.Certificate{ + SerialNumber: serialNumber, + Subject: pkix.Name{ + CommonName: name, + Organization: []string{organization}, + }, + NotBefore: notBefore, + NotAfter: notAfter, + KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageCRLSign, + BasicConstraintsValid: true, + IsCA: true, + MaxPathLen: 1, + } + + derBytes, err := x509.CreateCertificate(rand.Reader, &caTemplate, &caTemplate, &caKey.PublicKey, caKey) + if err != nil { + return nil, fmt.Errorf("creating certificate:: %w", err) + } + certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: derBytes}) + + keyBytes, err := x509.MarshalECPrivateKey(caKey) + if err != nil { + return nil, fmt.Errorf("marshaling certificate:: %w", err) + } + keyPEM := pem.EncodeToMemory(&pem.Block{Type: "EC PRIVATE KEY", Bytes: keyBytes}) + + cert, err := 
x509.ParseCertificate(derBytes) + if err != nil { + return nil, fmt.Errorf("parsing certificate: %w", err) + } + + return &CACertificate{ + pk: caKey, + cert: cert, + organization: organization, + privateKeyPEM: keyPEM, + pem: certPEM, + certificateExpiration: configuration.CertificateLifetime, + intermediateCAExpiration: configuration.IntermediateCALifetime, + gracePeriod: configuration.NotBeforeGracePeriod, + }, nil +} + +func LoadCA(certPEM, keyPEM []byte, configuration *CAConfiguration) (*CACertificate, error) { + if configuration == nil { + configuration = &CAConfiguration{} + } + if configuration.CALifetime == 0 { + configuration.CALifetime = defaultCAExpiration + } + if configuration.IntermediateCALifetime == 0 { + configuration.IntermediateCALifetime = defaultIntermediateCAExpiration + } + if configuration.CertificateLifetime == 0 { + configuration.CertificateLifetime = defaultCertificateExpiration + } + if configuration.NotBeforeGracePeriod == 0 { + configuration.NotBeforeGracePeriod = notBeforeGracePeriod + } + + certBlock, _ := pem.Decode(certPEM) + if certBlock == nil { + return nil, errors.New("invalid cert PEM block") + } + + keyBlock, _ := pem.Decode(keyPEM) + if keyBlock == nil { + return nil, errors.New("invalid key PEM block") + } + + cert, err := x509.ParseCertificate(certBlock.Bytes) + if err != nil { + return nil, fmt.Errorf("parsing certificate: %w", err) + } + + pk, err := x509.ParseECPrivateKey(keyBlock.Bytes) + if err != nil { + return nil, fmt.Errorf("parsing private key: %w", err) + } + + if len(cert.Issuer.Organization) == 0 { + return nil, errors.New("no organization found in certificate") + } + + return &CACertificate{ + pk: pk, + cert: cert, + organization: cert.Issuer.Organization[0], + privateKeyPEM: keyPEM, + pem: certPEM, + certificateExpiration: configuration.CertificateLifetime, + intermediateCAExpiration: configuration.IntermediateCALifetime, + gracePeriod: configuration.NotBeforeGracePeriod, + }, nil +} + +func 
CreateTLSSecret(ctx context.Context, ca *CACertificate, certificate *Certificate, configuration *RemoteKubernetesConfiguration) error { + if configuration == nil { + configuration = &RemoteKubernetesConfiguration{} + } + if configuration.Namespace == "" { + configuration.Namespace = defaultServiceAccountNamespace + } + if configuration.Name == "" { + configuration.Name = defaultServiceAccountName + } + if configuration.RESTConfig == nil { + if configuration.ContextName == "" { + return errors.New("either the name of a kubernetes context of a rest Config must be specified") + } + config, err := configFromContext(configuration.ContextName) + if err != nil { + return fmt.Errorf("getting REST configuration: %v", err) + } + configuration.RESTConfig = config + } + + cl, err := client.New(configuration.RESTConfig, client.Options{}) + if err != nil { + return fmt.Errorf("initializing client: %v", err) + } + + if configuration.EnsureNamespace { + if err := EnsureNamespace(ctx, configuration.Namespace, cl); err != nil { + return fmt.Errorf("ensuring namespace exists: %v", err) + } + } + + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: configuration.Name + "-certificates", + Namespace: configuration.Namespace, + }, + Data: map[string][]byte{ + "ca.crt": ca.Bytes(), + "tls.crt": certificate.Bytes(), + "tls.key": certificate.PrivateKeyBytes(), + }, + } + _, err = controllerutil.CreateOrUpdate(ctx, cl, secret, func() error { + secret.Data = map[string][]byte{ + "ca.crt": ca.Bytes(), + "tls.crt": certificate.Bytes(), + "tls.key": certificate.PrivateKeyBytes(), + } + return nil + }) + if err != nil { + return fmt.Errorf("creating tls file: %v", err) + } + return nil +} diff --git a/pkg/multicluster/bootstrap/kubeconfig.go b/pkg/multicluster/bootstrap/kubeconfig.go new file mode 100644 index 000000000..a8f6a5242 --- /dev/null +++ b/pkg/multicluster/bootstrap/kubeconfig.go @@ -0,0 +1,234 @@ +// Copyright 2025 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package bootstrap + +import ( + "bytes" + "context" + "encoding/base64" + "errors" + "fmt" + "text/template" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +const ( + defaultServiceAccountNamespace = metav1.NamespaceDefault + defaultServiceAccountName = "multicluster-operator" +) + +type RemoteKubernetesConfiguration struct { + ContextName string + Namespace string + Name string + APIServer string + RESTConfig *rest.Config + EnsureNamespace bool +} + +func configFromContext(contextName string) (*rest.Config, error) { + return clientcmd.NewNonInteractiveDeferredLoadingClientConfig( + clientcmd.NewDefaultClientConfigLoadingRules(), + &clientcmd.ConfigOverrides{CurrentContext: contextName}, + ).ClientConfig() +} + +func CreateKubeconfigSecret(ctx context.Context, data []byte, configuration *RemoteKubernetesConfiguration) error { + if configuration == nil { + configuration = &RemoteKubernetesConfiguration{} + } + if configuration.Namespace == "" { + configuration.Namespace = defaultServiceAccountNamespace + } + if configuration.Name == "" { + configuration.Name = defaultServiceAccountName + } + if configuration.RESTConfig == nil { + if configuration.ContextName == "" { + return errors.New("either the name of a kubernetes context of a rest Config must be specified") + } + config, err := configFromContext(configuration.ContextName) + if err != nil { + return fmt.Errorf("getting REST configuration: %v", err) + } + 
configuration.RESTConfig = config + } + + cl, err := client.New(configuration.RESTConfig, client.Options{}) + if err != nil { + return fmt.Errorf("initializing client: %v", err) + } + + if configuration.EnsureNamespace { + if err := EnsureNamespace(ctx, configuration.Namespace, cl); err != nil { + return fmt.Errorf("ensuring namespace exists: %v", err) + } + } + + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: configuration.Name + "-kubeconfig", + Namespace: configuration.Namespace, + }, + Data: map[string][]byte{ + "kubeconfig.yaml": data, + }, + } + _, err = controllerutil.CreateOrUpdate(ctx, cl, secret, func() error { + secret.Data = map[string][]byte{ + "kubeconfig.yaml": data, + } + return nil + }) + if err != nil { + return fmt.Errorf("creating kubeconfig file: %v", err) + } + return nil +} + +func CreateRemoteKubeconfig(ctx context.Context, configuration *RemoteKubernetesConfiguration) ([]byte, error) { + if configuration == nil { + configuration = &RemoteKubernetesConfiguration{} + } + if configuration.Namespace == "" { + configuration.Namespace = defaultServiceAccountNamespace + } + if configuration.Name == "" { + configuration.Name = defaultServiceAccountName + } + if configuration.RESTConfig == nil { + if configuration.ContextName == "" { + return nil, errors.New("either the name of a kubernetes context of a rest Config must be specified") + } + config, err := configFromContext(configuration.ContextName) + if err != nil { + return nil, fmt.Errorf("getting REST configuration: %v", err) + } + configuration.RESTConfig = config + } + if configuration.APIServer == "" { + configuration.APIServer = configuration.RESTConfig.Host + } + + cl, err := client.New(configuration.RESTConfig, client.Options{}) + if err != nil { + return nil, fmt.Errorf("initializing client: %v", err) + } + + if configuration.EnsureNamespace { + if err := EnsureNamespace(ctx, configuration.Namespace, cl); err != nil { + return nil, fmt.Errorf("ensuring namespace 
exists: %v", err) + } + } + + _, err = controllerutil.CreateOrUpdate(ctx, cl, &corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: configuration.Name, + Namespace: configuration.Namespace, + }, + }, func() error { return nil }) + if err != nil { + return nil, fmt.Errorf("creating service account: %v", err) + } + + _, err = controllerutil.CreateOrUpdate(ctx, cl, &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: configuration.Name + "-token", + Namespace: configuration.Namespace, + Annotations: map[string]string{ + corev1.ServiceAccountNameKey: configuration.Name, + }, + }, + Type: corev1.SecretTypeServiceAccountToken, + }, func() error { return nil }) + if err != nil { + return nil, fmt.Errorf("creating service account token: %v", err) + } + + trigger := time.After(0) + for { + select { + case <-trigger: + var token corev1.Secret + if err := cl.Get(ctx, types.NamespacedName{ + Namespace: configuration.Namespace, + Name: configuration.Name + "-token", + }, &token); err != nil { + return nil, err + } + data := token.Data + if data != nil { + token, hasToken := data["token"] + certificate, hasCertificate := data["ca.crt"] + if hasToken && hasCertificate { + var buf bytes.Buffer + err = kubeconfigTemplate.Execute(&buf, struct { + CA string + APIServer string + Cluster string + User string + Token string + }{ + CA: base64.StdEncoding.EncodeToString(certificate), + APIServer: configuration.APIServer, + Token: string(token), + User: configuration.Name, + Cluster: configuration.ContextName, + }) + if err != nil { + return nil, fmt.Errorf("creating kubeconfig file: %v", err) + } + return buf.Bytes(), nil + } + } + trigger = time.After(1 * time.Second) + case <-ctx.Done(): + return nil, fmt.Errorf("unable to get populated token") + } + } +} + +var kubeconfigTemplate = template.Must(template.New("").Parse(` +apiVersion: v1 +clusters: +- cluster: + certificate-authority-data: {{.CA}} + server: {{.APIServer}} + name: {{.Cluster}} +contexts: +- context: + 
cluster: {{.Cluster}} + user: {{.User}} + name: {{.Cluster}} +current-context: {{.Cluster}} +kind: Config +preferences: {} +users: +- name: {{.User}} + user: + token: {{.Token}} +`)) + +func EnsureNamespace(ctx context.Context, namespace string, cl client.Client) error { + _, err := controllerutil.CreateOrUpdate(ctx, cl, &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + }, + }, func() error { return nil }) + return err +} diff --git a/pkg/multicluster/leaderelection/lock.go b/pkg/multicluster/leaderelection/lock.go new file mode 100644 index 000000000..3bf6154bd --- /dev/null +++ b/pkg/multicluster/leaderelection/lock.go @@ -0,0 +1,291 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package leaderelection + +import ( + "context" + "errors" + "io" + "log" + "sync" + "time" + + "go.etcd.io/raft/v3" +) + +const ( + defaultHeartbeatInterval = 1 * time.Second + defaultElectionTimeout = 10 * time.Second +) + +var discardLogger = &raft.DefaultLogger{Logger: log.New(io.Discard, "", 0)} + +type LeaderCallbacks struct { + SetLeader func(leader uint64) + OnStartedLeading func(ctx context.Context) + OnStoppedLeading func() +} + +type LockerNode struct { + ID uint64 + Address string +} + +type LockConfiguration struct { + ID uint64 + Address string + CA []byte + PrivateKey []byte + Certificate []byte + Meta []byte + Peers []LockerNode + Insecure bool + Fetcher KubeconfigFetcher + + ElectionTimeout time.Duration + HeartbeatInterval time.Duration + Logger raft.Logger +} + +func (c *LockConfiguration) validate() error { + if c.ID == 0 { + return errors.New("id must be specified") + } + if c.Address == "" { + return errors.New("address must be specified") + } + if 
!c.Insecure { + if len(c.CA) == 0 { + return errors.New("ca must be specified") + } + if len(c.PrivateKey) == 0 { + return errors.New("private key must be specified") + } + if len(c.Certificate) == 0 { + return errors.New("certificate must be specified") + } + } + if len(c.Peers) == 0 { + return errors.New("peers must be set") + } + + return nil +} + +func (n *LockerNode) asPeer() raft.Peer { + return raft.Peer{ + ID: n.ID, + Context: []byte(n.Address), + } +} + +func peersForNodes(nodes []LockerNode) map[uint64]string { + peers := make(map[uint64]string) + for _, node := range nodes { + peers[node.ID] = node.Address + } + return peers +} + +func asPeers(nodes []LockerNode) []raft.Peer { + peers := []raft.Peer{} + for _, node := range nodes { + peers = append(peers, node.asPeer()) + } + return peers +} + +func Run(ctx context.Context, config LockConfiguration, callbacks *LeaderCallbacks) error { + if err := config.validate(); err != nil { + return err + } + + nodes := peersForNodes(config.Peers) + var transport *grpcTransport + var err error + if config.Insecure { + transport, err = newInsecureGRPCTransport(config.Meta, config.Address, nodes, config.Fetcher) + } else { + transport, err = newGRPCTransport(config.Meta, config.Certificate, config.PrivateKey, config.CA, config.Address, nodes, config.Fetcher) + } + if err != nil { + return err + } + transport.logger = config.Logger + + for node, address := range nodes { + if config.Logger != nil { + config.Logger.Infof("node: %d, address: %s", node, address) + } + } + + errs := make(chan error, 2) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + if err := transport.Run(ctx); err != nil { + errs <- err + } + }() + go func() { + defer wg.Done() + if err := runRaft(ctx, transport, config, callbacks); err != nil { + errs <- err + } + }() + + select { + case err := <-errs: + cancel() + wg.Wait() + return err + case <-ctx.Done(): + 
config.Logger.Infof("context canceled, waiting for raft and transport to exit") + wg.Wait() + } + + return nil +} + +func runRaft(ctx context.Context, transport *grpcTransport, config LockConfiguration, callbacks *LeaderCallbacks) error { + defer config.Logger.Info("shutting down raft") + + storage := raft.NewMemoryStorage() + + if config.ElectionTimeout == 0 { + config.ElectionTimeout = defaultElectionTimeout + } + if config.HeartbeatInterval == 0 { + config.HeartbeatInterval = defaultHeartbeatInterval + } + if config.Logger == nil { + config.Logger = discardLogger + } + + config.Logger.Infof("starting node") + node := raft.StartNode(&raft.Config{ + ID: config.ID, + ElectionTick: int(config.ElectionTimeout.Milliseconds() / 10), + HeartbeatTick: int(config.HeartbeatInterval.Milliseconds() / 10), + Storage: storage, + MaxSizePerMsg: 1024 * 1024, + MaxInflightMsgs: 256, + CheckQuorum: true, + Logger: config.Logger, + }, asPeers(config.Peers)) + + transport.setNode(node) + + go func() { + compactions := 1000 // every 10 seconds + for { + select { + case <-ctx.Done(): + return + case <-time.After(10 * time.Millisecond): + node.Tick() + if compactions == 0 { + if err := storage.Compact(node.Status().Applied); err != nil { + config.Logger.Errorf("error compacting storage: %v", err) + } + compactions = 1000 + } + compactions-- + } + } + }() + + leaderCtx, leaderCancel := context.WithCancel(ctx) + + isLeader := false + initialized := false + for { + select { + case <-ctx.Done(): + leaderCancel() + if isLeader { + if callbacks.OnStoppedLeading != nil { + go callbacks.OnStoppedLeading() + } + } + config.Logger.Infof("context canceled, stopping node") + node.Stop() + return nil + case rd := <-node.Ready(): + // Observe soft state changes for leadership + var nowLeader bool + var leader uint64 + if rd.SoftState != nil { + leader = rd.SoftState.Lead + nowLeader = leader == config.ID || rd.SoftState.RaftState == raft.StateLeader + } else { + status := node.Status() + leader = 
status.Lead + nowLeader = leader == config.ID || status.RaftState == raft.StateLeader + } + + transport.leader.Store(leader) + transport.isLeader.Store(nowLeader) + + if callbacks != nil && callbacks.SetLeader != nil { + callbacks.SetLeader(leader) + } + + if nowLeader != isLeader || !initialized { + initialized = true + if nowLeader { + // just became leader, start things up + isLeader = true + if callbacks.OnStartedLeading != nil { + go callbacks.OnStartedLeading(leaderCtx) + } + } else { + // we became a follower + leaderCancel() + leaderCtx, leaderCancel = context.WithCancel(ctx) + if callbacks.OnStoppedLeading != nil { + go callbacks.OnStoppedLeading() + } + } + } + + // send out messages + _ = storage.Append(rd.Entries) + for _, msg := range rd.Messages { + if msg.To == config.ID { + if err := node.Step(ctx, msg); err != nil { + config.Logger.Errorf("error stepping node: %v", err) + } + continue + } + for { + applied, err := transport.DoSend(ctx, msg) + if err != nil { + config.Logger.Infof("unreachable %d: %v", msg.To, err) + node.ReportUnreachable(msg.To) + break + } + if !applied { + // attempt to apply again in a second + time.Sleep(1 * time.Second) + } else { + break + } + } + } + + node.Advance() + } + } +} diff --git a/pkg/multicluster/leaderelection/lock_test.go b/pkg/multicluster/leaderelection/lock_test.go new file mode 100644 index 000000000..d0f5f5e5d --- /dev/null +++ b/pkg/multicluster/leaderelection/lock_test.go @@ -0,0 +1,331 @@ +// Copyright 2025 Redpanda Data, Inc. 
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package leaderelection

import (
	"context"
	"fmt"
	"log"
	"net"
	"reflect"
	"sync/atomic"
	"testing"
	"time"

	"go.etcd.io/raft/v3"

	"github.com/redpanda-data/redpanda-operator/pkg/multicluster/bootstrap"
)

// TestLocker exercises the raft lock end-to-end: it stands up a cluster,
// repeatedly kills the current leader until only a sub-quorum of nodes
// remains, then restarts the killed nodes and checks a leader re-emerges.
func TestLocker(t *testing.T) {
	for name, tt := range map[string]struct {
		nodes int
	}{
		"13 node quorum": {
			nodes: 13,
		},
	} {
		t.Run(name, func(t *testing.T) {
			// Smallest majority for tt.nodes members.
			minQuorum := (tt.nodes + 1) / 2

			ctx, cancel := context.WithTimeout(t.Context(), 2*time.Minute)
			defer cancel()

			leaders := setupLockTest(t, ctx, tt.nodes)

			stopped := []*testLeader{}

			var currentLeader *testLeader
			currentLeaders := leaders
			// scale down til we get to the min followers
			for len(currentLeaders) != (minQuorum - 1) {
				currentLeader, currentLeaders = waitForAnyLeader(t, 15*time.Second, currentLeaders...)
				currentLeader.Stop()
				stopped = append(stopped, currentLeader)
				t.Log("killing leader", currentLeader.config.ID)
			}

			// restart and make sure that they become followers again
			for _, leader := range stopped {
				leader.Start(t, ctx)
			}

			_, _ = waitForAnyLeader(t, 15*time.Second, leaders...)
		})
	}
}

// testLeader wraps one election participant for the test, exposing its
// leadership transitions as channels and its lifecycle as Start/Stop.
type testLeader struct {
	config LockConfiguration

	// cancel stops the Run goroutine started by Start.
	cancel context.CancelFunc

	// Buffered (size 1) notification channels; sends never block.
	leader   chan struct{}
	follower chan struct{}
	onStop   chan struct{}
	err      chan error

	stopped     atomic.Bool
	isLeader    atomic.Bool
	initialized atomic.Bool
}

// newTestLeader constructs a testLeader with its notification channels
// buffered so callbacks never block.
func newTestLeader(config LockConfiguration) *testLeader {
	return &testLeader{
		config:   config,
		leader:   make(chan struct{}, 1),
		follower: make(chan struct{}, 1),
		err:      make(chan error, 1),
		onStop:   make(chan struct{}, 1),
	}
}

// OnStartedLeading records the leadership gain and signals non-blockingly.
func (t *testLeader) OnStartedLeading(ctx context.Context) {
	t.initialized.Store(true)
	t.isLeader.Store(true)
	select {
	case t.leader <- struct{}{}:
	default:
	}
}

// OnStoppedLeading records the demotion and signals non-blockingly.
func (t *testLeader) OnStoppedLeading() {
	t.initialized.Store(true)
	t.isLeader.Store(false)

	select {
	case t.follower <- struct{}{}:
	default:
	}
}

// IsLeader reports leadership only once a callback has fired.
func (t *testLeader) IsLeader() bool {
	return t.initialized.Load() && t.isLeader.Load()
}

// IsFollower reports followership only once a callback has fired.
func (t *testLeader) IsFollower() bool {
	return t.initialized.Load() && !t.isLeader.Load()
}

// HandleError forwards a non-nil error without blocking; extra errors beyond
// the channel's buffer are dropped.
func (t *testLeader) HandleError(err error) {
	if err == nil {
		return
	}

	select {
	case t.err <- err:
	default:
	}
}

func (t *testLeader) IsStopped() bool {
	return t.stopped.Load()
}

// Start launches Run in a goroutine, resetting the observed state first.
// Safe to call again after Stop to simulate a node restart.
func (t *testLeader) Start(tst *testing.T, ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	t.cancel = cancel

	t.stopped.Store(false)
	t.initialized.Store(false)
	t.isLeader.Store(false)

	go func() {
		defer t.Stop()
		defer tst.Log("leader election goroutine exiting")

		t.HandleError(Run(ctx, t.config, &LeaderCallbacks{
			OnStartedLeading: t.OnStartedLeading,
			OnStoppedLeading: t.OnStoppedLeading,
		}))
	}()
}

// Stop cancels the Run goroutine and signals onStop non-blockingly.
// NOTE(review): calling Stop before Start would nil-deref t.cancel.
func (t *testLeader) Stop() {
	t.cancel()

	select {
	case t.onStop <- struct{}{}:
	default:
	}
	t.stopped.Store(true)
}

// WaitForLeader blocks until this node reports leadership, fails on error,
// or times out.
func (t *testLeader) WaitForLeader(tst *testing.T, timeout time.Duration) {
	tst.Helper()

	select {
	case <-t.leader:
	case err := <-t.err:
		tst.Fatalf("leader election failed: %v", err)
	case <-time.After(timeout):
		tst.Fatalf("timed out waiting for leader election to start")
	}
}

// WaitForFollower blocks until this node reports demotion, fails on error,
// or times out.
func (t *testLeader) WaitForFollower(tst *testing.T, timeout time.Duration) {
	tst.Helper()

	select {
	case <-t.follower:
	case err := <-t.err:
		tst.Fatalf("leader election failed: %v", err)
	case <-time.After(timeout):
		tst.Fatalf("timed out waiting for leader election to stop")
	}
}

// WaitForStopped blocks until Stop has been signaled, fails on error, or
// times out.
func (t *testLeader) WaitForStopped(tst *testing.T, timeout time.Duration) {
	tst.Helper()

	select {
	case <-t.onStop:
	case err := <-t.err:
		tst.Fatalf("leader election failed: %v", err)
	case <-time.After(timeout):
		tst.Fatalf("timed out waiting for leader election to stop")
	}
}

// Error returns a pending error without blocking, or nil.
func (t *testLeader) Error() error {
	select {
	case err := <-t.err:
		return err
	default:
		return nil
	}
}

// waitForAnyLeader waits (via reflect.Select over every candidate's leader
// channel) for some node to win the election, then waits for all remaining
// not-yet-following nodes to observe their demotion. It returns the winner
// and the other nodes. The single timeout channel bounds both phases.
func waitForAnyLeader(t *testing.T, timeout time.Duration, leaders ...*testLeader) (*testLeader, []*testLeader) {
	cases := make([]reflect.SelectCase, len(leaders))
	for i, leader := range leaders {
		cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(leader.leader)}
	}
	timeoutCh := time.After(timeout)
	// The timeout is always the final case; selecting it means failure.
	cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(timeoutCh)})
	chosen, _, _ := reflect.Select(cases)
	if chosen == len(cases)-1 {
		t.Fatalf("timed out waiting for leader to be elected")
	}

	chosenLeader := leaders[chosen]

	followers := []*testLeader{}
	pending := []*testLeader{}
	for i, leader := range leaders {
		if chosen == i {
			continue
		}
		followers = append(followers, leader)
		if leader.IsFollower() {
			continue
		}
		// Not yet demoted; wait on its follower channel below.
		pending = append(pending, leader)
	}

	if len(pending) == 0 {
		return chosenLeader, followers
	}

	followed := 0
	cases = make([]reflect.SelectCase, len(pending))
	for i, leader := range pending {
		cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(leader.follower)}
	}
	cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(timeoutCh)})

	for {
		chosen, _, _ := reflect.Select(cases)
		if chosen == len(cases)-1 {
			t.Fatalf("timed out waiting for followers to become active")
		}
		followed++
		if followed >= len(pending) {
			return chosenLeader, followers
		}
	}
}

// setupLockTest builds and starts an n-node cluster on loopback ports,
// sharing one CA-signed certificate for 127.0.0.1 across all nodes.
func setupLockTest(t *testing.T, ctx context.Context, n int) []*testLeader {
	if n <= 0 {
		t.Fatalf("at least one lock configuration is required")
	}

	ports := getFreePorts(t, n)

	leaders := []*testLeader{}
	nodes := []LockerNode{}
	for i, port := range ports {
		nodes = append(nodes, LockerNode{
			ID:      uint64(i + 1),
			Address: fmt.Sprintf("127.0.0.1:%d", port),
		})
	}

	ca, err := bootstrap.GenerateCA("unit", "Root CA", nil)
	if err != nil {
		t.Fatalf("error generating ca: %v", err)
	}
	certificate, err := ca.Sign("127.0.0.1")
	if err != nil {
		t.Fatalf("error generating certificate: %v", err)
	}

	configs := []LockConfiguration{}
	for i, node := range nodes {
		// Every node lists the full membership, itself included.
		peers := []LockerNode{}
		peers = append(peers, nodes...)

		configs = append(configs, LockConfiguration{
			ID:                uint64(i + 1),
			Address:           node.Address,
			Peers:             peers,
			CA:                ca.Bytes(),
			PrivateKey:        certificate.PrivateKeyBytes(),
			Certificate:       certificate.Bytes(),
			ElectionTimeout:   1 * time.Second,
			HeartbeatInterval: 100 * time.Millisecond,
			Logger:            testLogger(t),
		})
	}

	for _, config := range configs {
		leader := newTestLeader(config)
		leaders = append(leaders, leader)

		leader.Start(t, ctx)
	}

	return leaders
}

// getFreePorts reserves n loopback ports by listening and immediately
// closing. NOTE(review): ports are released before use, so another process
// could grab one in the window — acceptable flake risk for a test helper.
func getFreePorts(t *testing.T, n int) []int {
	ports := make([]int, 0, n)
	listeners := make([]net.Listener, 0, n)

	for i := 0; i < n; i++ {
		l, err := net.Listen("tcp", "127.0.0.1:0")
		if err != nil {
			t.Fatalf("error getting free port: %v", err)
		}
		listeners = append(listeners, l)
		ports = append(ports, l.Addr().(*net.TCPAddr).Port)
	}

	for _, l := range listeners {
		l.Close()
	}

	return ports
}

// testLogger adapts the test's output stream to raft's logger interface,
// prefixing lines with the test name. (t.Output requires a recent Go
// testing package — confirm against the module's Go version.)
func testLogger(t *testing.T) raft.Logger {
	return &raft.DefaultLogger{Logger: log.New(t.Output(), t.Name()+" ", log.LUTC)}
}
diff --git a/pkg/multicluster/leaderelection/manager.go b/pkg/multicluster/leaderelection/manager.go
new file mode 100644
index 000000000..a0147046d
--- /dev/null
+++ b/pkg/multicluster/leaderelection/manager.go
@@ -0,0 +1,90 @@
// Copyright 2025 Redpanda Data, Inc.
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package leaderelection + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/go-logr/logr" +) + +type runFactory func(ctx context.Context) error + +type LeaderManager struct { + leaderRoutines []func(ctx context.Context) error + + logger logr.Logger + + mutex sync.RWMutex + + isLeader atomic.Bool + runner runFactory +} + +func NewRaftLockManager(configuration LockConfiguration, setLeader func(uint64)) *LeaderManager { + manager := &LeaderManager{} + manager.runner = func(ctx context.Context) error { + return Run(ctx, configuration, &LeaderCallbacks{ + OnStartedLeading: manager.runLeaderRoutines, + OnStoppedLeading: func() { + manager.isLeader.Store(false) + }, + SetLeader: setLeader, + }) + } + + return manager +} + +func (lm *LeaderManager) runLeaderRoutines(ctx context.Context) { + lm.isLeader.Store(true) + + lm.mutex.RLock() + defer lm.mutex.RUnlock() + + for _, fn := range lm.leaderRoutines { + go func() { + for { + err := fn(ctx) + select { + case <-ctx.Done(): + return + default: + if err != nil { + lm.logger.Error(err, "error encountered on leader routine, restarting in 10 seconds") + } + select { + case <-ctx.Done(): + return + case <-time.After(10 * time.Second): + } + } + } + }() + } +} + +func (lm *LeaderManager) RegisterRoutine(fn func(ctx context.Context) error) { + lm.mutex.Lock() + defer lm.mutex.Unlock() + + lm.leaderRoutines = append(lm.leaderRoutines, fn) +} + +func (lm *LeaderManager) IsLeader() bool { + return lm.isLeader.Load() +} + +func (lm *LeaderManager) Run(ctx context.Context) error { + return lm.runner(ctx) +} diff --git a/pkg/multicluster/leaderelection/proto/gen/transport/v1/message.pb.go 
b/pkg/multicluster/leaderelection/proto/gen/transport/v1/message.pb.go new file mode 100644 index 000000000..9e07eb6de --- /dev/null +++ b/pkg/multicluster/leaderelection/proto/gen/transport/v1/message.pb.go @@ -0,0 +1,380 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.11 +// protoc (unknown) +// source: transport/v1/message.proto + +package transportv1 + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type CheckRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + FromLeader bool `protobuf:"varint,1,opt,name=fromLeader,proto3" json:"fromLeader,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CheckRequest) Reset() { + *x = CheckRequest{} + mi := &file_transport_v1_message_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CheckRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CheckRequest) ProtoMessage() {} + +func (x *CheckRequest) ProtoReflect() protoreflect.Message { + mi := &file_transport_v1_message_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CheckRequest.ProtoReflect.Descriptor instead. 
+func (*CheckRequest) Descriptor() ([]byte, []int) { + return file_transport_v1_message_proto_rawDescGZIP(), []int{0} +} + +func (x *CheckRequest) GetFromLeader() bool { + if x != nil { + return x.FromLeader + } + return false +} + +type CheckResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + HasLeader bool `protobuf:"varint,1,opt,name=hasLeader,proto3" json:"hasLeader,omitempty"` + UnhealthyNodes []uint64 `protobuf:"varint,2,rep,packed,name=unhealthy_nodes,json=unhealthyNodes,proto3" json:"unhealthy_nodes,omitempty"` + Meta []byte `protobuf:"bytes,3,opt,name=meta,proto3" json:"meta,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CheckResponse) Reset() { + *x = CheckResponse{} + mi := &file_transport_v1_message_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CheckResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CheckResponse) ProtoMessage() {} + +func (x *CheckResponse) ProtoReflect() protoreflect.Message { + mi := &file_transport_v1_message_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CheckResponse.ProtoReflect.Descriptor instead. 
+func (*CheckResponse) Descriptor() ([]byte, []int) { + return file_transport_v1_message_proto_rawDescGZIP(), []int{1} +} + +func (x *CheckResponse) GetHasLeader() bool { + if x != nil { + return x.HasLeader + } + return false +} + +func (x *CheckResponse) GetUnhealthyNodes() []uint64 { + if x != nil { + return x.UnhealthyNodes + } + return nil +} + +func (x *CheckResponse) GetMeta() []byte { + if x != nil { + return x.Meta + } + return nil +} + +type SendRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *SendRequest) Reset() { + *x = SendRequest{} + mi := &file_transport_v1_message_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *SendRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SendRequest) ProtoMessage() {} + +func (x *SendRequest) ProtoReflect() protoreflect.Message { + mi := &file_transport_v1_message_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SendRequest.ProtoReflect.Descriptor instead. 
+func (*SendRequest) Descriptor() ([]byte, []int) { + return file_transport_v1_message_proto_rawDescGZIP(), []int{2} +} + +func (x *SendRequest) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +type SendResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Applied bool `protobuf:"varint,1,opt,name=applied,proto3" json:"applied,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *SendResponse) Reset() { + *x = SendResponse{} + mi := &file_transport_v1_message_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *SendResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SendResponse) ProtoMessage() {} + +func (x *SendResponse) ProtoReflect() protoreflect.Message { + mi := &file_transport_v1_message_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SendResponse.ProtoReflect.Descriptor instead. 
+func (*SendResponse) Descriptor() ([]byte, []int) { + return file_transport_v1_message_proto_rawDescGZIP(), []int{3} +} + +func (x *SendResponse) GetApplied() bool { + if x != nil { + return x.Applied + } + return false +} + +type KubeconfigRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *KubeconfigRequest) Reset() { + *x = KubeconfigRequest{} + mi := &file_transport_v1_message_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *KubeconfigRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*KubeconfigRequest) ProtoMessage() {} + +func (x *KubeconfigRequest) ProtoReflect() protoreflect.Message { + mi := &file_transport_v1_message_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use KubeconfigRequest.ProtoReflect.Descriptor instead. 
+func (*KubeconfigRequest) Descriptor() ([]byte, []int) { + return file_transport_v1_message_proto_rawDescGZIP(), []int{4} +} + +type KubeconfigResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *KubeconfigResponse) Reset() { + *x = KubeconfigResponse{} + mi := &file_transport_v1_message_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *KubeconfigResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*KubeconfigResponse) ProtoMessage() {} + +func (x *KubeconfigResponse) ProtoReflect() protoreflect.Message { + mi := &file_transport_v1_message_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use KubeconfigResponse.ProtoReflect.Descriptor instead. 
+func (*KubeconfigResponse) Descriptor() ([]byte, []int) { + return file_transport_v1_message_proto_rawDescGZIP(), []int{5} +} + +func (x *KubeconfigResponse) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +var File_transport_v1_message_proto protoreflect.FileDescriptor + +const file_transport_v1_message_proto_rawDesc = "" + + "\n" + + "\x1atransport/v1/message.proto\x12\ftransport.v1\".\n" + + "\fCheckRequest\x12\x1e\n" + + "\n" + + "fromLeader\x18\x01 \x01(\bR\n" + + "fromLeader\"j\n" + + "\rCheckResponse\x12\x1c\n" + + "\thasLeader\x18\x01 \x01(\bR\thasLeader\x12'\n" + + "\x0funhealthy_nodes\x18\x02 \x03(\x04R\x0eunhealthyNodes\x12\x12\n" + + "\x04meta\x18\x03 \x01(\fR\x04meta\"'\n" + + "\vSendRequest\x12\x18\n" + + "\apayload\x18\x01 \x01(\fR\apayload\"(\n" + + "\fSendResponse\x12\x18\n" + + "\aapplied\x18\x01 \x01(\bR\aapplied\"\x13\n" + + "\x11KubeconfigRequest\".\n" + + "\x12KubeconfigResponse\x12\x18\n" + + "\apayload\x18\x01 \x01(\fR\apayload2\xe4\x01\n" + + "\x10TransportService\x12@\n" + + "\x05Check\x12\x1a.transport.v1.CheckRequest\x1a\x1b.transport.v1.CheckResponse\x12=\n" + + "\x04Send\x12\x19.transport.v1.SendRequest\x1a\x1a.transport.v1.SendResponse\x12O\n" + + "\n" + + "Kubeconfig\x12\x1f.transport.v1.KubeconfigRequest\x1a .transport.v1.KubeconfigResponseB\xe0\x01\n" + + "\x10com.transport.v1B\fMessageProtoP\x01Zmgithub.com/redpanda-data/redpanda-operator/pkg/multicluster/leaderelection/proto/gen/transport/v1;transportv1\xa2\x02\x03TXX\xaa\x02\fTransport.V1\xca\x02\fTransport\\V1\xe2\x02\x18Transport\\V1\\GPBMetadata\xea\x02\rTransport::V1b\x06proto3" + +var ( + file_transport_v1_message_proto_rawDescOnce sync.Once + file_transport_v1_message_proto_rawDescData []byte +) + +func file_transport_v1_message_proto_rawDescGZIP() []byte { + file_transport_v1_message_proto_rawDescOnce.Do(func() { + file_transport_v1_message_proto_rawDescData = 
protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_transport_v1_message_proto_rawDesc), len(file_transport_v1_message_proto_rawDesc))) + }) + return file_transport_v1_message_proto_rawDescData +} + +var file_transport_v1_message_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_transport_v1_message_proto_goTypes = []any{ + (*CheckRequest)(nil), // 0: transport.v1.CheckRequest + (*CheckResponse)(nil), // 1: transport.v1.CheckResponse + (*SendRequest)(nil), // 2: transport.v1.SendRequest + (*SendResponse)(nil), // 3: transport.v1.SendResponse + (*KubeconfigRequest)(nil), // 4: transport.v1.KubeconfigRequest + (*KubeconfigResponse)(nil), // 5: transport.v1.KubeconfigResponse +} +var file_transport_v1_message_proto_depIdxs = []int32{ + 0, // 0: transport.v1.TransportService.Check:input_type -> transport.v1.CheckRequest + 2, // 1: transport.v1.TransportService.Send:input_type -> transport.v1.SendRequest + 4, // 2: transport.v1.TransportService.Kubeconfig:input_type -> transport.v1.KubeconfigRequest + 1, // 3: transport.v1.TransportService.Check:output_type -> transport.v1.CheckResponse + 3, // 4: transport.v1.TransportService.Send:output_type -> transport.v1.SendResponse + 5, // 5: transport.v1.TransportService.Kubeconfig:output_type -> transport.v1.KubeconfigResponse + 3, // [3:6] is the sub-list for method output_type + 0, // [0:3] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_transport_v1_message_proto_init() } +func file_transport_v1_message_proto_init() { + if File_transport_v1_message_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_transport_v1_message_proto_rawDesc), len(file_transport_v1_message_proto_rawDesc)), 
+ NumEnums: 0, + NumMessages: 6, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_transport_v1_message_proto_goTypes, + DependencyIndexes: file_transport_v1_message_proto_depIdxs, + MessageInfos: file_transport_v1_message_proto_msgTypes, + }.Build() + File_transport_v1_message_proto = out.File + file_transport_v1_message_proto_goTypes = nil + file_transport_v1_message_proto_depIdxs = nil +} diff --git a/pkg/multicluster/leaderelection/proto/gen/transport/v1/message_grpc.pb.go b/pkg/multicluster/leaderelection/proto/gen/transport/v1/message_grpc.pb.go new file mode 100644 index 000000000..8d96b194c --- /dev/null +++ b/pkg/multicluster/leaderelection/proto/gen/transport/v1/message_grpc.pb.go @@ -0,0 +1,197 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.6.0 +// - protoc (unknown) +// source: transport/v1/message.proto + +package transportv1 + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + TransportService_Check_FullMethodName = "/transport.v1.TransportService/Check" + TransportService_Send_FullMethodName = "/transport.v1.TransportService/Send" + TransportService_Kubeconfig_FullMethodName = "/transport.v1.TransportService/Kubeconfig" +) + +// TransportServiceClient is the client API for TransportService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. 
+type TransportServiceClient interface { + Check(ctx context.Context, in *CheckRequest, opts ...grpc.CallOption) (*CheckResponse, error) + Send(ctx context.Context, in *SendRequest, opts ...grpc.CallOption) (*SendResponse, error) + Kubeconfig(ctx context.Context, in *KubeconfigRequest, opts ...grpc.CallOption) (*KubeconfigResponse, error) +} + +type transportServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewTransportServiceClient(cc grpc.ClientConnInterface) TransportServiceClient { + return &transportServiceClient{cc} +} + +func (c *transportServiceClient) Check(ctx context.Context, in *CheckRequest, opts ...grpc.CallOption) (*CheckResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(CheckResponse) + err := c.cc.Invoke(ctx, TransportService_Check_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *transportServiceClient) Send(ctx context.Context, in *SendRequest, opts ...grpc.CallOption) (*SendResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(SendResponse) + err := c.cc.Invoke(ctx, TransportService_Send_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *transportServiceClient) Kubeconfig(ctx context.Context, in *KubeconfigRequest, opts ...grpc.CallOption) (*KubeconfigResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(KubeconfigResponse) + err := c.cc.Invoke(ctx, TransportService_Kubeconfig_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// TransportServiceServer is the server API for TransportService service. +// All implementations must embed UnimplementedTransportServiceServer +// for forward compatibility. 
+type TransportServiceServer interface { + Check(context.Context, *CheckRequest) (*CheckResponse, error) + Send(context.Context, *SendRequest) (*SendResponse, error) + Kubeconfig(context.Context, *KubeconfigRequest) (*KubeconfigResponse, error) + mustEmbedUnimplementedTransportServiceServer() +} + +// UnimplementedTransportServiceServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedTransportServiceServer struct{} + +func (UnimplementedTransportServiceServer) Check(context.Context, *CheckRequest) (*CheckResponse, error) { + return nil, status.Error(codes.Unimplemented, "method Check not implemented") +} +func (UnimplementedTransportServiceServer) Send(context.Context, *SendRequest) (*SendResponse, error) { + return nil, status.Error(codes.Unimplemented, "method Send not implemented") +} +func (UnimplementedTransportServiceServer) Kubeconfig(context.Context, *KubeconfigRequest) (*KubeconfigResponse, error) { + return nil, status.Error(codes.Unimplemented, "method Kubeconfig not implemented") +} +func (UnimplementedTransportServiceServer) mustEmbedUnimplementedTransportServiceServer() {} +func (UnimplementedTransportServiceServer) testEmbeddedByValue() {} + +// UnsafeTransportServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to TransportServiceServer will +// result in compilation errors. +type UnsafeTransportServiceServer interface { + mustEmbedUnimplementedTransportServiceServer() +} + +func RegisterTransportServiceServer(s grpc.ServiceRegistrar, srv TransportServiceServer) { + // If the following call panics, it indicates UnimplementedTransportServiceServer was + // embedded by pointer and is nil. 
This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&TransportService_ServiceDesc, srv) +} + +func _TransportService_Check_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CheckRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TransportServiceServer).Check(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: TransportService_Check_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TransportServiceServer).Check(ctx, req.(*CheckRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _TransportService_Send_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(SendRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TransportServiceServer).Send(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: TransportService_Send_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TransportServiceServer).Send(ctx, req.(*SendRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _TransportService_Kubeconfig_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(KubeconfigRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TransportServiceServer).Kubeconfig(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: 
srv, + FullMethod: TransportService_Kubeconfig_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TransportServiceServer).Kubeconfig(ctx, req.(*KubeconfigRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// TransportService_ServiceDesc is the grpc.ServiceDesc for TransportService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var TransportService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "transport.v1.TransportService", + HandlerType: (*TransportServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Check", + Handler: _TransportService_Check_Handler, + }, + { + MethodName: "Send", + Handler: _TransportService_Send_Handler, + }, + { + MethodName: "Kubeconfig", + Handler: _TransportService_Kubeconfig_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "transport/v1/message.proto", +} diff --git a/pkg/multicluster/leaderelection/proto/transport/v1/message.proto b/pkg/multicluster/leaderelection/proto/transport/v1/message.proto new file mode 100644 index 000000000..ec63f62cf --- /dev/null +++ b/pkg/multicluster/leaderelection/proto/transport/v1/message.proto @@ -0,0 +1,32 @@ +syntax = "proto3"; + +package transport.v1; + +message CheckRequest { + bool fromLeader = 1; +} +message CheckResponse { + bool hasLeader = 1; + repeated uint64 unhealthy_nodes = 2; + bytes meta = 3; +} + +message SendRequest { + bytes payload = 1; +} + +message SendResponse { + bool applied = 1; +} + +message KubeconfigRequest {} + +message KubeconfigResponse { + bytes payload = 1; +} + +service TransportService { + rpc Check(CheckRequest) returns (CheckResponse); + rpc Send(SendRequest) returns (SendResponse); + rpc Kubeconfig(KubeconfigRequest) returns (KubeconfigResponse); +} \ No newline at end of file diff --git a/pkg/multicluster/leaderelection/raft.go b/pkg/multicluster/leaderelection/raft.go new 
file mode 100644 index 000000000..f6fcb3a65 --- /dev/null +++ b/pkg/multicluster/leaderelection/raft.go @@ -0,0 +1,341 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package leaderelection + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "net" + "sync" + "sync/atomic" + + "go.etcd.io/raft/v3" + "go.etcd.io/raft/v3/raftpb" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + + transportv1 "github.com/redpanda-data/redpanda-operator/pkg/multicluster/leaderelection/proto/gen/transport/v1" +) + +type peer struct { + addr string + client transportv1.TransportServiceClient +} + +func newPeer(addr string, credentials credentials.TransportCredentials) (*peer, error) { + if credentials == nil { + credentials = insecure.NewCredentials() + } + conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(credentials)) + if err != nil { + return nil, err + } + + return &peer{ + addr: addr, + client: transportv1.NewTransportServiceClient(conn), + }, nil +} + +func InsecureClientFor(config LockConfiguration, node LockerNode) (transportv1.TransportServiceClient, error) { + var err error + var credentials credentials.TransportCredentials + + if config.Insecure { + credentials = insecure.NewCredentials() + } else { + credentials, err = clientTLSConfig(config.Certificate, config.PrivateKey, config.CA, true) + } + + if err != nil { + return nil, fmt.Errorf("unable to initialize client credentials: %w", err) + } + + conn, err := grpc.NewClient(node.Address, grpc.WithTransportCredentials(credentials)) + if err != nil { + return nil, err + } + return 
transportv1.NewTransportServiceClient(conn), nil +} + +func ClientFor(config LockConfiguration, node LockerNode) (transportv1.TransportServiceClient, error) { + var err error + var credentials credentials.TransportCredentials + + if config.Insecure { + credentials = insecure.NewCredentials() + } else { + credentials, err = clientTLSConfig(config.Certificate, config.PrivateKey, config.CA) + } + + if err != nil { + return nil, fmt.Errorf("unable to initialize client credentials: %w", err) + } + + conn, err := grpc.NewClient(node.Address, grpc.WithTransportCredentials(credentials)) + if err != nil { + return nil, err + } + return transportv1.NewTransportServiceClient(conn), nil +} + +type KubeconfigFetcher interface { + Fetch(context.Context) ([]byte, error) +} + +type KubeconfigFetcherFn func(context.Context) ([]byte, error) + +func (fn KubeconfigFetcherFn) Fetch(ctx context.Context) ([]byte, error) { + return fn(ctx) +} + +type grpcTransport struct { + addr string + peers map[uint64]*peer + meta []byte + + leader atomic.Uint64 + isLeader atomic.Bool + + node raft.Node + nodeLock sync.RWMutex + + kubeconfigFetcher KubeconfigFetcher + + serverCredentials credentials.TransportCredentials + clientCredentials credentials.TransportCredentials + + logger raft.Logger + + transportv1.UnimplementedTransportServiceServer +} + +func newGRPCTransport(meta []byte, certPEM, keyPEM, caPEM []byte, addr string, peers map[uint64]string, fetcher KubeconfigFetcher) (*grpcTransport, error) { + serverCredentials, err := serverTLSConfig(certPEM, keyPEM, caPEM) + if err != nil { + return nil, fmt.Errorf("unable to initialize server credentials: %w", err) + } + clientCredentials, err := clientTLSConfig(certPEM, keyPEM, caPEM) + if err != nil { + return nil, fmt.Errorf("unable to initialize client credentials: %w", err) + } + + initializedPeers := make(map[uint64]*peer, len(peers)) + for id, peer := range peers { + initialized, err := newPeer(peer, clientCredentials) + if err != nil { + 
return nil, err + } + initializedPeers[id] = initialized + } + + return &grpcTransport{ + meta: meta, + addr: addr, + peers: initializedPeers, + serverCredentials: serverCredentials, + clientCredentials: clientCredentials, + kubeconfigFetcher: fetcher, + }, nil +} + +func newInsecureGRPCTransport(meta []byte, addr string, peers map[uint64]string, fetcher KubeconfigFetcher) (*grpcTransport, error) { + initializedPeers := make(map[uint64]*peer, len(peers)) + for id, peer := range peers { + initialized, err := newPeer(peer, nil) + if err != nil { + return nil, err + } + initializedPeers[id] = initialized + } + + return &grpcTransport{ + meta: meta, + addr: addr, + peers: initializedPeers, + kubeconfigFetcher: fetcher, + }, nil +} + +func (t *grpcTransport) setNode(node raft.Node) { + t.nodeLock.Lock() + defer t.nodeLock.Unlock() + t.node = node +} + +func (t *grpcTransport) getNode() raft.Node { + t.nodeLock.RLock() + defer t.nodeLock.RUnlock() + return t.node +} + +func (t *grpcTransport) DoSend(ctx context.Context, msg raftpb.Message) (bool, error) { + peer, ok := t.peers[msg.To] + if !ok { + return false, fmt.Errorf("unknown peer %d", msg.To) + } + + data, err := msg.Marshal() + if err != nil { + return false, fmt.Errorf("marshaling message for peer %q: %w", peer.addr, err) + } + + resp, err := peer.client.Send(ctx, &transportv1.SendRequest{ + Payload: data, + }) + if err != nil { + return false, fmt.Errorf("sending to peer %q: %w", peer.addr, err) + } + + return resp.Applied, nil +} + +func (t *grpcTransport) Send(ctx context.Context, req *transportv1.SendRequest) (*transportv1.SendResponse, error) { + if node := t.getNode(); node != nil { + var msg raftpb.Message + if err := msg.Unmarshal(req.Payload); err != nil { + return &transportv1.SendResponse{Applied: false}, nil + } + if err := node.Step(ctx, msg); err == nil { + return &transportv1.SendResponse{Applied: true}, nil + } + } + return &transportv1.SendResponse{Applied: false}, nil +} + +func (t 
*grpcTransport) Check(ctx context.Context, req *transportv1.CheckRequest) (*transportv1.CheckResponse, error) { + leader := t.leader.Load() + isLeader := t.isLeader.Load() + if leader != 0 { + response := &transportv1.CheckResponse{HasLeader: true, Meta: t.meta} + if !req.FromLeader && isLeader { + for id, peer := range t.peers { + followerResp, err := peer.client.Check(ctx, &transportv1.CheckRequest{ + FromLeader: true, + }) + if err != nil || !followerResp.HasLeader { + response.UnhealthyNodes = append(response.UnhealthyNodes, id) + } + } + } else if !req.FromLeader { + peer, ok := t.peers[leader] + if !ok { + return &transportv1.CheckResponse{HasLeader: false, Meta: t.meta}, nil + } + return peer.client.Check(ctx, &transportv1.CheckRequest{}) + } + + return response, nil + } + + return &transportv1.CheckResponse{HasLeader: false, Meta: t.meta}, nil +} + +func (t *grpcTransport) Kubeconfig(ctx context.Context, req *transportv1.KubeconfigRequest) (*transportv1.KubeconfigResponse, error) { + if t.kubeconfigFetcher == nil { + return nil, status.Errorf(codes.FailedPrecondition, "no kubeconfig fetcher specified") + } + data, err := t.kubeconfigFetcher.Fetch(ctx) + if err != nil { + return nil, status.Errorf(codes.Internal, "%s", err.Error()) + } + + return &transportv1.KubeconfigResponse{Payload: data}, nil +} + +func (t *grpcTransport) Run(ctx context.Context) error { + defer t.logger.Info("shutting down grpc transport") + + credentials := t.serverCredentials + if credentials == nil { + credentials = insecure.NewCredentials() + } + + server := grpc.NewServer(grpc.Creds(credentials)) + transportv1.RegisterTransportServiceServer(server, t) + + lis, err := net.Listen("tcp", t.addr) + if err != nil { + return fmt.Errorf("failed to listen on %s: %w", t.addr, err) + } + defer lis.Close() + + done := make(chan struct{}) + errs := make(chan error, 1) + go func() { + defer close(done) + + if err := server.Serve(lis); err != nil { + errs <- err + } + }() + + select { + case 
<-ctx.Done(): + server.GracefulStop() + select { + case err := <-errs: + return err + default: + return nil + } + case err := <-errs: + server.Stop() + return err + } +} + +func serverTLSConfig(certPEM, keyPEM, caPEM []byte) (credentials.TransportCredentials, error) { + certificate, err := tls.X509KeyPair(certPEM, keyPEM) + if err != nil { + return nil, fmt.Errorf("failed to load server certificate: %w", err) + } + + capool := x509.NewCertPool() + if !capool.AppendCertsFromPEM(caPEM) { + return nil, fmt.Errorf("unable to append the CA certificate to CA pool") + } + + tlsConfig := &tls.Config{ // nolint:gosec // linter complains about TLS min version, we pin all our certs here though, so ignore it + ClientAuth: tls.RequireAndVerifyClientCert, + Certificates: []tls.Certificate{certificate}, + ClientCAs: capool, + } + return credentials.NewTLS(tlsConfig), nil +} + +func clientTLSConfig(certPEM, keyPEM, caPEM []byte, insecure ...bool) (credentials.TransportCredentials, error) { + isInsecure := false + if len(insecure) > 0 { + isInsecure = insecure[0] + } + + certificate, err := tls.X509KeyPair(certPEM, keyPEM) + if err != nil { + return nil, fmt.Errorf("failed to load client certificate: %w", err) + } + + capool := x509.NewCertPool() + if !capool.AppendCertsFromPEM(caPEM) { + return nil, fmt.Errorf("unable to append the CA certificate to CA pool") + } + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{certificate}, + RootCAs: capool, + InsecureSkipVerify: isInsecure, // nolint:gosec + } + return credentials.NewTLS(tlsConfig), nil +} diff --git a/pkg/multicluster/load.go b/pkg/multicluster/load.go new file mode 100644 index 000000000..26b625a80 --- /dev/null +++ b/pkg/multicluster/load.go @@ -0,0 +1,36 @@ +// Copyright 2025 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package multicluster + +import ( + "os" + + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +func loadKubeconfig(file string) (*rest.Config, error) { + kubeconfigYAML, err := os.ReadFile(file) + if err != nil { + return nil, err + } + + return loadKubeconfigFromBytes(kubeconfigYAML) +} + +func loadKubeconfigFromBytes(kubeconfigYAML []byte) (*rest.Config, error) { + kubeconfig, err := clientcmd.Load(kubeconfigYAML) + if err != nil { + return nil, err + } + + clientConfig := clientcmd.NewNonInteractiveClientConfig(*kubeconfig, kubeconfig.CurrentContext, &clientcmd.ConfigOverrides{}, nil) + return clientConfig.ClientConfig() +} diff --git a/pkg/multicluster/logger.go b/pkg/multicluster/logger.go new file mode 100644 index 000000000..4a4c6003a --- /dev/null +++ b/pkg/multicluster/logger.go @@ -0,0 +1,94 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package multicluster + +import ( + "errors" + "fmt" + "os" + + "github.com/go-logr/logr" + "go.etcd.io/raft/v3" +) + +type raftLogr struct { + logger logr.Logger +} + +func (r *raftLogr) Debug(v ...any) { + r.logger.V(1).Info("DEBUG", v...) +} + +func (r *raftLogr) Debugf(format string, v ...any) { + if format == "" { + r.logger.V(0).Info("DEBUG", v...) 
+ } else { + r.logger.V(0).Info(fmt.Sprintf("[DEBUG] %s", fmt.Sprintf(format, v...))) + } +} + +func (r *raftLogr) Error(v ...any) { + r.logger.Error(errors.New("an error occurred"), "ERROR", v...) +} + +func (r *raftLogr) Errorf(format string, v ...any) { + if format == "" { + r.logger.Error(errors.New("an error occurred"), "ERROR", v...) + } else { + text := fmt.Sprintf(format, v...) + r.logger.Error(errors.New(text), text) + } +} + +func (r *raftLogr) Info(v ...any) { + r.logger.V(0).Info("INFO", v...) +} + +func (r *raftLogr) Infof(format string, v ...any) { + if format == "" { + r.logger.V(0).Info("INFO", v...) + } else { + r.logger.V(0).Info(fmt.Sprintf("[INFO] %s", fmt.Sprintf(format, v...))) + } +} + +func (r *raftLogr) Warning(v ...any) { + r.logger.V(0).Info("WARN", v...) +} + +func (r *raftLogr) Warningf(format string, v ...any) { + if format == "" { + r.logger.V(0).Info("WARN", v...) + } else { + r.logger.V(0).Info(fmt.Sprintf("[WARN] %s", fmt.Sprintf(format, v...))) + } +} + +func (r *raftLogr) Fatal(v ...any) { + r.Error(v...) + os.Exit(1) +} + +func (r *raftLogr) Fatalf(format string, v ...any) { + r.Errorf(format, v...) + os.Exit(1) +} + +func (r *raftLogr) Panic(v ...any) { + r.Error(v...) + panic("unexpected error") +} + +func (r *raftLogr) Panicf(format string, v ...any) { + r.Errorf(format, v...) + panic(fmt.Sprintf(format, v...)) +} + +var _ raft.Logger = (*raftLogr)(nil) diff --git a/pkg/multicluster/manager.go b/pkg/multicluster/manager.go new file mode 100644 index 000000000..84b6b3de1 --- /dev/null +++ b/pkg/multicluster/manager.go @@ -0,0 +1,25 @@ +// Copyright 2025 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package multicluster + +import ( + "context" + + "sigs.k8s.io/controller-runtime/pkg/cluster" + mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager" +) + +type Manager interface { + mcmanager.Manager + GetLeader() string + GetClusterNames() []string + // the context passed here, when canceled will stop the cluster + AddOrReplaceCluster(ctx context.Context, clusterName string, cl cluster.Cluster) error +} diff --git a/pkg/multicluster/raft.go b/pkg/multicluster/raft.go new file mode 100644 index 000000000..1ec872d1f --- /dev/null +++ b/pkg/multicluster/raft.go @@ -0,0 +1,428 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package multicluster + +import ( + "context" + "errors" + "hash/fnv" + "os" + "sort" + "sync/atomic" + "time" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cluster" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager" + "sigs.k8s.io/multicluster-runtime/pkg/multicluster" + mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile" + "sigs.k8s.io/multicluster-runtime/providers/clusters" + + "github.com/redpanda-data/redpanda-operator/pkg/multicluster/bootstrap" + 
"github.com/redpanda-data/redpanda-operator/pkg/multicluster/leaderelection" + transportv1 "github.com/redpanda-data/redpanda-operator/pkg/multicluster/leaderelection/proto/gen/transport/v1" +) + +func stringToHash(s string) uint64 { + h := fnv.New64a() + _, _ = h.Write([]byte(s)) + return h.Sum64() +} + +type RaftCluster struct { + Name string + Address string + KubeconfigFile string +} + +type RaftConfiguration struct { + Name string + Address string + Peers []RaftCluster + ElectionTimeout time.Duration + HeartbeatInterval time.Duration + Meta []byte + + Scheme *runtime.Scheme + Logger logr.Logger + Metrics bool + RestConfig *rest.Config + + // the are only used when the Insecure flag is set to false + Insecure bool + CAFile string + PrivateKeyFile string + CertificateFile string + + // these are used when bootstrapping mode is enabled + Bootstrap bool + KubernetesAPIServer string + KubeconfigNamespace string + KubeconfigName string +} + +func (r RaftConfiguration) validate() error { + if r.Name == "" { + return errors.New("name must be specified") + } + if r.Address == "" { + return errors.New("address must be specified") + } + if !r.Insecure { + if len(r.CAFile) == 0 { + return errors.New("ca must be specified") + } + if len(r.PrivateKeyFile) == 0 { + return errors.New("private key must be specified") + } + if len(r.CertificateFile) == 0 { + return errors.New("certificate must be specified") + } + } + if len(r.Peers) == 0 { + return errors.New("peers must be set") + } + + return nil +} + +func NewRaftRuntimeManager(config RaftConfiguration) (Manager, error) { + if err := config.validate(); err != nil { + return nil, err + } + + restConfig := config.RestConfig + if restConfig == nil { + var err error + restConfig, err = ctrl.GetConfig() + if err != nil { + return nil, err + } + } + + peers := []string{} + peerAddresses := []string{} + for _, peer := range config.Peers { + peers = append(peers, peer.Name) + peerAddresses = append(peerAddresses, peer.Address) + } 
+ + config.Logger.Info("initializing raft-based runtime manager", "node", config.Name, "peers", peers, "peerAddresses", peerAddresses) + raftPeers := []leaderelection.LockerNode{} + idsToNames := map[uint64]string{} + clusterProvider := clusters.New() + for _, peer := range config.Peers { + id := stringToHash(peer.Name) + raftPeers = append(raftPeers, leaderelection.LockerNode{ + ID: id, + Address: peer.Address, + }) + idsToNames[id] = peer.Name + + if peer.Name == config.Name { + continue + } + + if peer.KubeconfigFile != "" { + kubeConfig, err := loadKubeconfig(peer.KubeconfigFile) + if err != nil { + return nil, err + } + c, err := cluster.New(kubeConfig) + if err != nil { + return nil, err + } + if err := clusterProvider.Add(context.Background(), peer.Name, c); err != nil { + return nil, err + } + } + } + + raftConfig := leaderelection.LockConfiguration{ + ID: stringToHash(config.Name), + Address: config.Address, + Peers: raftPeers, + Meta: config.Meta, + Insecure: config.Insecure, + ElectionTimeout: config.ElectionTimeout, + HeartbeatInterval: config.HeartbeatInterval, + Logger: &raftLogr{logger: config.Logger}, + } + + if config.Bootstrap { + raftConfig.Fetcher = leaderelection.KubeconfigFetcherFn(func(ctx context.Context) ([]byte, error) { + data, err := bootstrap.CreateRemoteKubeconfig(ctx, &bootstrap.RemoteKubernetesConfiguration{ + RESTConfig: restConfig, + APIServer: config.KubernetesAPIServer, + Namespace: config.KubeconfigNamespace, + Name: config.KubeconfigName, + }) + if err != nil { + return nil, err + } + + return data, nil + }) + } + + if !config.Insecure { + var err error + + raftConfig.CA, err = os.ReadFile(config.CAFile) + if err != nil { + return nil, err + } + + raftConfig.Certificate, err = os.ReadFile(config.CertificateFile) + if err != nil { + return nil, err + } + + raftConfig.PrivateKey, err = os.ReadFile(config.PrivateKeyFile) + if err != nil { + return nil, err + } + } + + opts := manager.Options{ + Scheme: config.Scheme, + 
LeaderElection: false, + Logger: config.Logger, + } + if !config.Metrics { + opts.Metrics = server.Options{ + BindAddress: "0", + } + } + + var currentLeader atomic.Uint64 + lockManager := leaderelection.NewRaftLockManager(raftConfig, func(leader uint64) { + currentLeader.Store(leader) + }) + + restart := make(chan struct{}, 1) + + if config.Bootstrap { + for i, peer := range config.Peers { + if peer.Name != config.Name && peer.KubeconfigFile == "" { + config.Logger.Info("registering leader routine", "peer", peer.Name) + lockManager.RegisterRoutine(func(ctx context.Context) error { + config.Logger.Info("fetching client for peer", "peer", peer.Name) + client, err := leaderelection.ClientFor(raftConfig, raftConfig.Peers[i]) + if err != nil { + config.Logger.Error(err, "fetching client for peer", "peer", peer.Name) + return err + } + config.Logger.Info("fetching kubeconfig for peer", "peer", peer.Name) + response, err := client.Kubeconfig(ctx, &transportv1.KubeconfigRequest{}) + if err != nil { + config.Logger.Error(err, "fetching kubeconfig for peer", "peer", peer.Name) + return err + } + + config.Logger.Info("loading kubeconfig for peer", "peer", peer.Name) + kubeConfig, err := loadKubeconfigFromBytes(response.Payload) + if err != nil { + config.Logger.Error(err, "loading kubeconfig for peer", "peer", peer.Name) + return err + } + config.Logger.Info("initializing cluster for peer", "peer", peer.Name) + c, err := cluster.New(kubeConfig, func(o *cluster.Options) { + o.Scheme = config.Scheme + o.Logger = config.Logger + }) + if err != nil { + config.Logger.Error(err, "initializing cluster for peer", "peer", peer.Name) + return err + } + + config.Logger.Info("adding cluster for peer", "peer", peer.Name) + if err := clusterProvider.AddOrReplace(ctx, peer.Name, c, nil); err != nil { + config.Logger.Error(err, "adding cluster for peer", "peer", peer.Name) + return err + } + select { + case restart <- struct{}{}: + default: + } + + <-ctx.Done() + return nil + }) + } + } + } 
+ + manager, err := newManager(config.Logger, restConfig, clusterProvider, restart, func() string { + return idsToNames[currentLeader.Load()] + }, func() map[string]cluster.Cluster { + clusters := map[string]cluster.Cluster{} + for _, name := range clusterProvider.ClusterNames() { + if c, err := clusterProvider.Get(context.Background(), name); err == nil { + clusters[name] = c + } + } + return clusters + }, func(ctx context.Context, clusterName string, cl cluster.Cluster) error { + if err := clusterProvider.AddOrReplace(ctx, clusterName, cl, nil); err != nil { + return err + } + select { + case restart <- struct{}{}: + default: + } + return nil + }, lockManager, opts) + if err != nil { + return nil, err + } + + return manager, nil +} + +type raftManager struct { + mcmanager.Manager + runnable *leaderRunnable + logger logr.Logger + getLeader func() string + getClusters func() map[string]cluster.Cluster + addOrReplaceCluster func(ctx context.Context, clusterName string, cl cluster.Cluster) error +} + +func (m *raftManager) AddOrReplaceCluster(ctx context.Context, clusterName string, cl cluster.Cluster) error { + return m.addOrReplaceCluster(ctx, clusterName, cl) +} + +func (m *raftManager) GetClusterNames() []string { + clusters := []string{mcmanager.LocalCluster} + if m.getClusters == nil { + return clusters + } + + for cluster := range m.getClusters() { + clusters = append(clusters, cluster) + } + sort.Strings(clusters) + return clusters +} + +func (m *raftManager) GetLeader() string { + if m.getLeader == nil { + return "" + } + return m.getLeader() +} + +func newManager(logger logr.Logger, config *rest.Config, provider multicluster.Provider, restart chan struct{}, getLeader func() string, getClusters func() map[string]cluster.Cluster, addOrReplaceCluster func(ctx context.Context, clusterName string, cl cluster.Cluster) error, manager *leaderelection.LeaderManager, opts manager.Options) (Manager, error) { + mgr, err := mcmanager.New(config, provider, opts) + if err 
!= nil { + return nil, err + } + + manager.RegisterRoutine(func(ctx context.Context) error { + logger.Info("got leader") + <-ctx.Done() + logger.Info("lost leader") + return nil + }) + + runnable := &leaderRunnable{manager: manager, logger: logger, restart: restart, getClusters: getClusters} + if err := mgr.Add(runnable); err != nil { + return nil, err + } + return &raftManager{Manager: mgr, runnable: runnable, logger: logger, getLeader: getLeader, getClusters: getClusters, addOrReplaceCluster: addOrReplaceCluster}, nil +} + +func (m *raftManager) Add(r mcmanager.Runnable) error { + if _, ok := r.(reconcile.TypedReconciler[mcreconcile.Request]); ok { + m.logger.Info("adding multicluster reconciler") + m.runnable.Add(r) + return nil + } + + if _, ok := r.(manager.LeaderElectionRunnable); ok { + m.logger.Info("adding leader election runnable") + m.runnable.Add(r) + return nil + } + + return m.Manager.Add(r) +} + +type warmupRunnable interface { + Warmup(context.Context) error +} + +type leaderRunnable struct { + runnables []mcmanager.Runnable + manager *leaderelection.LeaderManager + logger logr.Logger + restart chan struct{} + getClusters func() map[string]cluster.Cluster +} + +func (l *leaderRunnable) Add(r mcmanager.Runnable) { + doEngage := func() { + for name, cluster := range l.getClusters() { + // engage any static clusters + _ = r.Engage(context.Background(), name, cluster) + } + } + + l.runnables = append(l.runnables, r) + if warmup, ok := r.(warmupRunnable); ok { + // start caches and sources + l.manager.RegisterRoutine(l.wrapStart(doEngage, warmup.Warmup)) + } + l.manager.RegisterRoutine(l.wrapStart(doEngage, r.Start)) +} + +func (l *leaderRunnable) Engage(ctx context.Context, s string, c cluster.Cluster) error { + for _, runnable := range l.runnables { + if err := runnable.Engage(ctx, s, c); err != nil { + l.logger.Info("engaging runnable") + return err + } + } + return nil +} + +func (l *leaderRunnable) Start(ctx context.Context) error { + return 
l.manager.Run(ctx) +} + +func (l *leaderRunnable) wrapStart(doEngage func(), fn func(context.Context) error) func(context.Context) error { + return func(ctx context.Context) error { + cancelCtx, cancel := context.WithCancel(ctx) + defer cancel() + + doEngage() + + go func() { + for { + select { + case <-cancelCtx.Done(): + return + case <-l.restart: + // re-engage + doEngage() + } + } + }() + + return fn(cancelCtx) + } +} diff --git a/pkg/multicluster/singlecluster.go b/pkg/multicluster/singlecluster.go new file mode 100644 index 000000000..6896255f7 --- /dev/null +++ b/pkg/multicluster/singlecluster.go @@ -0,0 +1,51 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package multicluster + +import ( + "context" + "errors" + + "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cluster" + "sigs.k8s.io/controller-runtime/pkg/manager" + mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager" +) + +type singleClusterManager struct { + mcmanager.Manager +} + +func (s *singleClusterManager) GetLeader() string { + return mcmanager.LocalCluster +} + +func (s *singleClusterManager) GetClusterNames() []string { + return []string{mcmanager.LocalCluster} +} + +func (s *singleClusterManager) AddOrReplaceCluster(_ context.Context, _ string, _ cluster.Cluster) error { + return errors.New("adding a cluster not supported in single cluster mode") +} + +func NewSingleClusterManager(config *rest.Config, opts manager.Options) (Manager, error) { + mgr, err := ctrl.NewManager(config, opts) + if err != nil { + return nil, err + } + + mcmgr, err := mcmanager.WithMultiCluster(mgr, nil) + if err != nil { + return nil, err + } + + return 
&singleClusterManager{mcmgr}, nil +}