diff --git a/Taskfile.yml b/Taskfile.yml index a10289200..ad960093b 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -218,14 +218,18 @@ tasks: TEST_REDPANDA_VERSION: '{{ .TEST_REDPANDA_VERSION | default .DEFAULT_TEST_REDPANDA_VERSION }}' TEST_UPGRADE_REDPANDA_VERSION: '{{ .TEST_UPGRADE_REDPANDA_VERSION | default .DEFAULT_TEST_UPGRADE_REDPANDA_VERSION }}' TEST_UPGRADE_OPERATOR_VERSION: '{{ .TEST_UPGRADE_OPERATOR_VERSION | default .DEFAULT_TEST_UPGRADE_OPERATOR_VERSION }}' + IMAGES: + - quay.io/jetstack/cert-manager-controller:{{.TEST_CERTMANAGER_VERSION}} + - quay.io/jetstack/cert-manager-cainjector:{{.TEST_CERTMANAGER_VERSION}} + - quay.io/jetstack/cert-manager-startupapicheck:{{.TEST_CERTMANAGER_VERSION}} + - quay.io/jetstack/cert-manager-webhook:{{.TEST_CERTMANAGER_VERSION}} + - docker.redpanda.com/redpandadata/redpanda:{{.TEST_REDPANDA_VERSION}} + - docker.redpanda.com/redpandadata/redpanda:{{.TEST_UPGRADE_REDPANDA_VERSION}} + - docker.redpanda.com/redpandadata/redpanda-operator:{{.TEST_UPGRADE_OPERATOR_VERSION}} + cmds: - - docker pull quay.io/jetstack/cert-manager-controller:{{.TEST_CERTMANAGER_VERSION}} - - docker pull quay.io/jetstack/cert-manager-cainjector:{{.TEST_CERTMANAGER_VERSION}} - - docker pull quay.io/jetstack/cert-manager-startupapicheck:{{.TEST_CERTMANAGER_VERSION}} - - docker pull quay.io/jetstack/cert-manager-webhook:{{.TEST_CERTMANAGER_VERSION}} - - docker pull docker.redpanda.com/redpandadata/redpanda:{{.TEST_REDPANDA_VERSION}} - - docker pull docker.redpanda.com/redpandadata/redpanda:{{.TEST_UPGRADE_REDPANDA_VERSION}} - - docker pull docker.redpanda.com/redpandadata/redpanda-operator:{{.TEST_UPGRADE_OPERATOR_VERSION}} + - for: {var: IMAGES} + cmd: docker inspect {{.ITEM}} > /dev/null || docker pull {{.ITEM}} pending-prs: desc: "Get all pending PRs for watched branches" diff --git a/harpoon/go.mod b/harpoon/go.mod index db695403a..7515aa869 100644 --- a/harpoon/go.mod +++ b/harpoon/go.mod @@ -94,6 +94,7 @@ require ( github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect + github.com/miekg/dns v1.1.58 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-ps v1.0.0 // indirect github.com/mitchellh/go-wordwrap v1.0.1 // indirect diff --git a/harpoon/go.sum b/harpoon/go.sum index 508a3b2d8..33da8647e 100644 --- a/harpoon/go.sum +++ b/harpoon/go.sum @@ -36,6 +36,8 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3 github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cenkalti/backoff/v5 v5.0.2 h1:rIfFVxEf1QsI7E1ZHfp/B4DF/6QBAUhmgkxc0H7Zss8= github.com/cenkalti/backoff/v5 v5.0.2/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= +github.com/cert-manager/cert-manager v1.14.5 h1:uuM1O2g2S80nxiH3eW2cZYMGiL2zmDFVdAzg8sibWuc= +github.com/cert-manager/cert-manager v1.14.5/go.mod h1:fmr/cU5jiLxWj69CroDggSOa49RljUK+dU583TaQUXM= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chai2010/gettext-go v1.0.2 h1:1Lwwip6Q2QGsAdl/ZKPCwTe9fe0CjlUbqj5bFNSjIRk= @@ -579,6 +581,8 @@ pgregory.net/rapid v1.1.0 h1:CMa0sjHSru3puNx+J0MIAuiiEV4N0qj8/cMWGBBCsjw= pgregory.net/rapid v1.1.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04= sigs.k8s.io/controller-runtime v0.20.4 h1:X3c+Odnxz+iPTRobG4tp092+CvBU9UK0t/bRf+n0DGU= sigs.k8s.io/controller-runtime 
v0.20.4/go.mod h1:xg2XB0K5ShQzAgsoujxuKN4LNXR2LfwwHsPj7Iaw+XY= +sigs.k8s.io/gateway-api v1.1.0 h1:DsLDXCi6jR+Xz8/xd0Z1PYl2Pn0TyaFMOPPZIj4inDM= +sigs.k8s.io/gateway-api v1.1.0/go.mod h1:ZH4lHrL2sDi0FHZ9jjneb8kKnGzFWyrTya35sWUTrRs= sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 h1:gBQPwqORJ8d8/YNZWEjoZs7npUVDpVXUUOFfW6CgAqE= sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg= sigs.k8s.io/kustomize/api v0.19.0 h1:F+2HB2mU1MSiR9Hp1NEgoU2q9ItNOaBJl0I4Dlus5SQ= diff --git a/operator/internal/controller/redpanda/resource_controller.go b/operator/internal/controller/redpanda/resource_controller.go index 9b43026fc..5aa6161b1 100644 --- a/operator/internal/controller/redpanda/resource_controller.go +++ b/operator/internal/controller/redpanda/resource_controller.go @@ -22,12 +22,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" redpandav1alpha2 "github.com/redpanda-data/redpanda-operator/operator/api/redpanda/v1alpha2" + "github.com/redpanda-data/redpanda-operator/operator/internal/lifecycle" internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client" "github.com/redpanda-data/redpanda-operator/pkg/otelutil/log" ) -const fieldOwner client.FieldOwner = "redpanda-operator" - type Resource[T any] interface { *T client.Object @@ -100,7 +99,7 @@ func (r *ResourceController[T, U]) Reconcile(ctx context.Context, req ctrl.Reque if !controllerutil.ContainsFinalizer(object, FinalizerKey) { patch := r.reconciler.FinalizerPatch(request) if patch != nil { - if err := r.Patch(ctx, object, patch, client.ForceOwnership, fieldOwner); err != nil { + if err := r.Patch(ctx, object, patch, client.ForceOwnership, lifecycle.DefaultFieldOwner); err != nil { return ctrl.Result{}, err } } @@ -109,7 +108,7 @@ func (r *ResourceController[T, U]) Reconcile(ctx context.Context, req ctrl.Reque patch, err := r.reconciler.SyncResource(ctx, request) var syncError error if patch != nil { - syncError = r.Status().Patch(ctx, object, patch, client.ForceOwnership, fieldOwner) + syncError = r.Status().Patch(ctx, object, patch, client.ForceOwnership, lifecycle.DefaultFieldOwner) } result := ctrl.Result{} diff --git a/operator/internal/lifecycle/client.go b/operator/internal/lifecycle/client.go index 4202fa627..8aa309495 100644 --- a/operator/internal/lifecycle/client.go +++ b/operator/internal/lifecycle/client.go @@ -11,19 +11,16 @@ package lifecycle import ( "context" - "errors" "fmt" + "maps" "slices" + "github.com/cockroachdb/errors" "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - k8sapierrors "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -49,11 +46,23 @@ func NewClusterObject[T any, U Cluster[T]]() U { // NewResourceClient creates a new instance of a ResourceClient for managing resources. 
func NewResourceClient[T any, U Cluster[T]](mgr ctrl.Manager, resourcesFn ResourceManagerFactory[T, U]) *ResourceClient[T, U] { ownershipResolver, statusUpdater, nodePoolRenderer, simpleResourceRenderer := resourcesFn(mgr) + ctl, err := kube.FromRESTConfig(mgr.GetConfig(), kube.Options{ + Options: client.Options{ + Scheme: mgr.GetScheme(), + Mapper: mgr.GetRESTMapper(), + }, + FieldManager: string(DefaultFieldOwner), + }) + if err != nil { + // NB: This is less than ideal but it's exceptionally unlikely that + // FromRESTConfig actually returns an error and this method is only + // ever called at initialization time, not runtime. + panic(err) + } + return &ResourceClient[T, U]{ + ctl: ctl, logger: mgr.GetLogger().WithName("ResourceClient"), - client: mgr.GetClient(), - scheme: mgr.GetScheme(), - mapper: mgr.GetRESTMapper(), ownershipResolver: ownershipResolver, statusUpdater: statusUpdater, nodePoolRenderer: nodePoolRenderer, @@ -64,10 +73,8 @@ func NewResourceClient[T any, U Cluster[T]](mgr ctrl.Manager, resourcesFn Resour // ResourceClient is a client used to manage dependent resources, // both simple and node pools, for a given cluster type. type ResourceClient[T any, U Cluster[T]] struct { - client client.Client + ctl *kube.Ctl logger logr.Logger - scheme *runtime.Scheme - mapper apimeta.RESTMapper ownershipResolver OwnershipResolver[T, U] statusUpdater ClusterStatusUpdater[T, U] nodePoolRenderer NodePoolRenderer[T, U] @@ -76,7 +83,12 @@ type ResourceClient[T any, U Cluster[T]] struct { // PatchNodePoolSet updates a StatefulSet for a specific node pool. func (r *ResourceClient[T, U]) PatchNodePoolSet(ctx context.Context, owner U, set *appsv1.StatefulSet) error { - return r.patchOwnedResource(ctx, owner, set) + if set.GetLabels() == nil { + set.SetLabels(map[string]string{}) + } + maps.Copy(set.GetLabels(), r.ownershipResolver.AddLabels(owner)) + set.SetOwnerReferences([]metav1.OwnerReference{*metav1.NewControllerRef(owner, owner.GetObjectKind().GroupVersionKind())}) + return r.ctl.Apply(ctx, set, client.ForceOwnership) } // SetClusterStatus sets the status of the given cluster. @@ -84,54 +96,47 @@ func (r *ResourceClient[T, U]) SetClusterStatus(cluster U, status *ClusterStatus return r.statusUpdater.Update(cluster, status) } -type gvkObject struct { - gvk schema.GroupVersionKind - nn types.NamespacedName +type renderer[T any, U Cluster[T]] struct { + SimpleResourceRenderer[T, U] + Cluster U } -// SyncAll synchronizes the simple resources associated with the given cluster, -// cleaning up any resources that should no longer exist.
-func (r *ResourceClient[T, U]) SyncAll(ctx context.Context, owner U) error { - // we don't sync node pools here - resources, err := r.listAllOwnedResources(ctx, owner, false) - if err != nil { - return err - } - toDelete := map[gvkObject]client.Object{} - for _, resource := range resources { - toDelete[gvkObject{ - gvk: resource.GetObjectKind().GroupVersionKind(), - nn: client.ObjectKeyFromObject(resource), - }] = resource - } - - toSync, err := r.simpleResourceRenderer.Render(ctx, owner) - if err != nil { - return err - } - - // attempt to create as many resources in one pass as we can - errs := []error{} +func (r *renderer[T, U]) Render(ctx context.Context) ([]kube.Object, error) { + return r.SimpleResourceRenderer.Render(ctx, r.Cluster) +} - for _, resource := range toSync { - if err := r.patchOwnedResource(ctx, owner, resource); err != nil { - errs = append(errs, err) - } - delete(toDelete, gvkObject{ - gvk: resource.GetObjectKind().GroupVersionKind(), - nn: client.ObjectKeyFromObject(resource), - }) - } +func (r *renderer[T, U]) Types() []kube.Object { + types := r.SimpleResourceRenderer.WatchedResourceTypes() + return slices.DeleteFunc(types, func(o kube.Object) bool { + _, ok := o.(*appsv1.StatefulSet) + return ok + }) +} - for _, resource := range toDelete { - if err := r.client.Delete(ctx, resource); err != nil { - if !k8sapierrors.IsNotFound(err) { - errs = append(errs, err) +func (r *ResourceClient[T, U]) syncer(owner U) *kube.Syncer { + return &kube.Syncer{ + Ctl: r.ctl, + Namespace: owner.GetNamespace(), + Renderer: &renderer[T, U]{ + Cluster: owner, + SimpleResourceRenderer: r.simpleResourceRenderer, + }, + OwnershipLabels: r.ownershipResolver.GetOwnerLabels(owner), + Preprocess: func(o kube.Object) { + if o.GetLabels() == nil { + o.SetLabels(map[string]string{}) } - } + maps.Copy(o.GetLabels(), r.ownershipResolver.AddLabels(owner)) + }, + Owner: *metav1.NewControllerRef(owner, owner.GetObjectKind().GroupVersionKind()), } +} - return errors.Join(errs...) +// SyncAll synchronizes the simple resources associated with the given cluster, +// cleaning up any resources that should no longer exist. +func (r *ResourceClient[T, U]) SyncAll(ctx context.Context, owner U) error { + _, err := r.syncer(owner).Sync(ctx) + return err } // FetchExistingAndDesiredPools fetches the existing and desired node pools for a given cluster, returning @@ -184,7 +189,12 @@ func (r *ResourceClient[T, U]) WatchResources(builder Builder, cluster client.Ob builder.Owns(&appsv1.StatefulSet{}) for _, resourceType := range r.simpleResourceRenderer.WatchedResourceTypes() { - mapping, err := getResourceScope(r.mapper, r.scheme, resourceType) + gvk, err := kube.GVKFor(r.ctl.Scheme(), resourceType) + if err != nil { + return err + } + + mapping, err := r.ctl.ScopeOf(gvk) if err != nil { if !apimeta.IsNoMatchError(err) { return err @@ -196,7 +206,7 @@ func (r *ResourceClient[T, U]) WatchResources(builder Builder, cluster client.Ob continue } - if mapping.Name() == apimeta.RESTScopeNamespace.Name() { + if mapping == apimeta.RESTScopeNameNamespace { // we're working with a namespace scoped resource, so we can work with ownership builder.Owns(resourceType) continue @@ -220,189 +230,52 @@ func (r *ResourceClient[T, U]) WatchResources(builder Builder, cluster client.Ob // DeleteAll deletes all resources owned by the given cluster, including node pools. 
func (r *ResourceClient[T, U]) DeleteAll(ctx context.Context, owner U) (bool, error) { - // since this is a widespread deletion, we can delete even stateful sets - resources, err := r.listAllOwnedResources(ctx, owner, true) - if err != nil { - return false, err - } - - alive := []client.Object{} - for _, o := range resources { - if o.GetDeletionTimestamp() == nil { - alive = append(alive, o) - } - } - - // attempt to delete as many resources in one pass as we can errs := []error{} - for _, resource := range alive { - if err := r.client.Delete(ctx, resource); err != nil { - errs = append(errs, err) - } - } - - return len(alive) > 0, errors.Join(errs...) -} - -// listResources lists resources of a specific type and object, returning them as an array. -func (r *ResourceClient[T, U]) listResources(ctx context.Context, object client.Object, opts ...client.ListOption) ([]client.Object, error) { - kind, err := getGroupVersionKind(r.client.Scheme(), object) - if err != nil { - return nil, err - } - - olist, err := r.client.Scheme().New(schema.GroupVersionKind{ - Group: kind.Group, - Version: kind.Version, - Kind: kind.Kind + "List", - }) + allDeleted, err := r.syncer(owner).DeleteAll(ctx) if err != nil { - return nil, fmt.Errorf("initializing list: %w", err) - } - list, ok := olist.(client.ObjectList) - if !ok { - return nil, fmt.Errorf("invalid object list type: %T", object) - } - - if err := r.client.List(ctx, list, opts...); err != nil { - // no-op list on unregistered resources, this happens when we - // don't actually have a CRD installed for some resource type - // we're trying to list - if !apimeta.IsNoMatchError(err) { - return nil, fmt.Errorf("listing resources: %w", err) - } + errs = append(errs, err) } - items, err := kube.Items[client.Object](list) + pools, err := r.fetchExistingPools(ctx, owner) if err != nil { - return nil, err - } - - for _, item := range items { - item.GetObjectKind().SetGroupVersionKind(*kind) + return false, err } - return sortCreation(items), nil -} - -// listAllOwnedResources lists all resources owned by a given cluster, optionally including node pools. -func (r *ResourceClient[T, U]) listAllOwnedResources(ctx context.Context, owner U, includeNodePools bool) ([]client.Object, error) { - resources := []client.Object{} - for _, resourceType := range append(r.simpleResourceRenderer.WatchedResourceTypes(), &appsv1.StatefulSet{}) { - matching, err := r.listResources(ctx, resourceType, client.MatchingLabels(r.ownershipResolver.GetOwnerLabels(owner))) - if err != nil { - return nil, err + for _, pool := range pools { + if pool.set.DeletionTimestamp != nil { + allDeleted = false } - filtered := []client.Object{} - for i := range matching { - object := matching[i] - - // filter out unowned resources - mapping, err := getResourceScope(r.mapper, r.scheme, object) - if err != nil { - if !apimeta.IsNoMatchError(err) { - return nil, err - } - - // we have an unknown mapping so just ignore this - continue - } - // isOwner defaults to true here because we don't set - // owner refs on ClusterScoped resources. We only check - // for ownership if it's namespace scoped. 
- isOwner := true - if mapping.Name() == apimeta.RESTScopeNameNamespace { - isOwner = slices.ContainsFunc(object.GetOwnerReferences(), func(ref metav1.OwnerReference) bool { - return ref.UID == owner.GetUID() - }) - } - - // special case the node pools - if (includeNodePools || !r.nodePoolRenderer.IsNodePool(object)) && isOwner { - filtered = append(filtered, object) - } + if err := r.ctl.Delete(ctx, pool.set); err != nil { + errs = append(errs, err) } - resources = append(resources, filtered...) } - return resources, nil -} -// patchOwnedResource applies a patch to a resource owned by the cluster. -func (r *ResourceClient[T, U]) patchOwnedResource(ctx context.Context, owner U, object client.Object, extraLabels ...map[string]string) error { - if err := r.normalize(object, owner, extraLabels...); err != nil { - return err - } - return r.client.Patch(ctx, object, client.Apply, defaultFieldOwner, client.ForceOwnership) + return allDeleted, errors.Join(errs...) } -// normalize normalizes an object by setting its labels and owner references. Any labels passed in as `extraLabels` -// will potentially override those set by the ownership resolver. -func (r *ResourceClient[T, U]) normalize(object client.Object, owner U, extraLabels ...map[string]string) error { - kind, err := getGroupVersionKind(r.scheme, object) - if err != nil { - return err - } - - unknownMapping := false - - mapping, err := getResourceScope(r.mapper, r.scheme, object) +// fetchExistingPools fetches the existing pools (StatefulSets) for a given cluster. +func (r *ResourceClient[T, U]) fetchExistingPools(ctx context.Context, cluster U) ([]*poolWithOrdinals, error) { + sets, err := kube.List[appsv1.StatefulSetList](ctx, r.ctl, cluster.GetNamespace(), client.MatchingLabels(r.ownershipResolver.GetOwnerLabels(cluster))) if err != nil { - if !apimeta.IsNoMatchError(err) { - return err - } - - // we have an unknown mapping so err on the side of not setting - // an owner reference - unknownMapping = true - } - - // nil out the managed fields since with some resources that actually do - // a fetch (i.e. secrets that are created only once), we get an error trying - // to patch a second time - object.SetManagedFields(nil) - - // This needs to be set explicitly in order for SSA to function properly. - // If an initialized pointer to a concrete CR has not specified its GVK - // explicitly, SSA will fail. - object.GetObjectKind().SetGroupVersionKind(*kind) - - labels := object.GetLabels() - if labels == nil { - labels = map[string]string{} - } - - for name, value := range r.ownershipResolver.AddLabels(owner) { - labels[name] = value + return nil, errors.Wrapf(err, "listing StatefulSets") } - for _, extra := range extraLabels { - for name, value := range extra { - labels[name] = value + i := 0 + for _, set := range sets.Items { + isOwned := slices.ContainsFunc(set.OwnerReferences, func(ref metav1.OwnerReference) bool { + return ref.UID == cluster.GetUID() + }) + if isOwned { + sets.Items[i] = set + i++ } } - - object.SetLabels(labels) - - if !unknownMapping && mapping.Name() == apimeta.RESTScopeNamespace.Name() { - object.SetOwnerReferences([]metav1.OwnerReference{*metav1.NewControllerRef(owner, owner.GetObjectKind().GroupVersionKind())}) - } - - return nil -} - -// fetchExistingPools fetches the existing pools (StatefulSets) for a given cluster. 
-func (r *ResourceClient[T, U]) fetchExistingPools(ctx context.Context, cluster U) ([]*poolWithOrdinals, error) { - sets, err := r.listResources(ctx, &appsv1.StatefulSet{}, client.MatchingLabels(r.ownershipResolver.GetOwnerLabels(cluster))) - if err != nil { - return nil, fmt.Errorf("listing StatefulSets: %w", err) - } + sets.Items = sets.Items[:i] existing := []*poolWithOrdinals{} - for _, set := range sets { - statefulSet := set.(*appsv1.StatefulSet) - - if !r.nodePoolRenderer.IsNodePool(statefulSet) { + for _, statefulSet := range sets.Items { + if !r.nodePoolRenderer.IsNodePool(&statefulSet) { continue } @@ -412,31 +285,32 @@ func (r *ResourceClient[T, U]) fetchExistingPools(ctx context.Context, cluster U } // based on https://github.com/kubernetes/kubernetes/blob/c90a4b16b6aa849ed362ee40997327db09e3a62d/pkg/controller/history/controller_history.go#L222 - revisions, err := r.listResources(ctx, &appsv1.ControllerRevision{}, client.MatchingLabelsSelector{ + revisions, err := kube.List[appsv1.ControllerRevisionList](ctx, r.ctl, cluster.GetNamespace(), client.MatchingLabelsSelector{ Selector: selector, }) if err != nil { - return nil, fmt.Errorf("listing ControllerRevisions: %w", err) + return nil, errors.Wrapf(err, "listing ControllerRevisions") } + ownedRevisions := []*appsv1.ControllerRevision{} - for i := range revisions { - ref := metav1.GetControllerOfNoCopy(revisions[i]) - if ref == nil || ref.UID == set.GetUID() { - ownedRevisions = append(ownedRevisions, revisions[i].(*appsv1.ControllerRevision)) + for i := range revisions.Items { + ref := metav1.GetControllerOfNoCopy(&revisions.Items[i]) + if ref == nil || ref.UID == statefulSet.GetUID() { + ownedRevisions = append(ownedRevisions, &revisions.Items[i]) } } - pods, err := r.listResources(ctx, &corev1.Pod{}, client.MatchingLabelsSelector{ + pods, err := kube.List[corev1.PodList](ctx, r.ctl, cluster.GetNamespace(), client.MatchingLabelsSelector{ Selector: selector, }) if err != nil { return nil, fmt.Errorf("listing Pods: %w", err) } - ownedPods := []*corev1.Pod{} - for i := range pods { - ownedPods = append(ownedPods, pods[i].(*corev1.Pod)) + ownedPods := make([]*corev1.Pod, len(pods.Items)) + for i := range pods.Items { + ownedPods[i] = &pods.Items[i] } withOrdinals, err := sortPodsByOrdinal(ownedPods...) 
@@ -445,7 +319,7 @@ func (r *ResourceClient[T, U]) fetchExistingPools(ctx context.Context, cluster U } existing = append(existing, &poolWithOrdinals{ - set: statefulSet, + set: &statefulSet, revisions: sortRevisions(ownedRevisions), pods: withOrdinals, }) diff --git a/operator/internal/lifecycle/client_test.go b/operator/internal/lifecycle/client_test.go index a2f9d9e46..c90700366 100644 --- a/operator/internal/lifecycle/client_test.go +++ b/operator/internal/lifecycle/client_test.go @@ -20,7 +20,6 @@ import ( "go.uber.org/zap/zapcore" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - rbacv1 "k8s.io/api/rbac/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" k8sapierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -393,94 +392,6 @@ func TestClientWatchResources(t *testing.T) { } } -func TestClientSyncAll(t *testing.T) { - for name, tt := range map[string]struct { - renderLoops [][]client.Object - testParams clientTest - }{ - "no-op": {}, - "render-error": { - testParams: clientTest{ - resourcesRenderError: errors.New("render"), - }, - }, - "overlapping-resource-names": { - renderLoops: [][]client.Object{ - { - &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "overlapping-resources-resource", - Namespace: metav1.NamespaceDefault, - }, - }, - }, - }, - testParams: clientTest{ - watchedResources: []client.Object{ - &corev1.ConfigMap{}, - &corev1.Secret{}, - &rbacv1.ClusterRole{}, - }, - resources: []client.Object{ - &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "overlapping-resources-resource", - Namespace: metav1.NamespaceDefault, - }, - }, - &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "overlapping-resources-resource", - Namespace: metav1.NamespaceDefault, - }, - }, - &rbacv1.ClusterRole{ - ObjectMeta: metav1.ObjectMeta{ - Name: "overlapping-resources-resource", - Namespace: metav1.NamespaceDefault, - }, - }, - }, - }, - }, - } { - tt.testParams.Run(parentCtx, t, name, func(t *testing.T, instances *clientTestInstances, cluster *MockCluster) { - ctx, cancel := setupContext() - defer cancel() - - ensureSynced := func(resources []client.Object) { - objects, err := instances.resourceClient.listAllOwnedResources(ctx, cluster, false) - require.NoError(t, err) - require.Len(t, objects, len(resources)) - } - - for _, resource := range tt.testParams.resources { - err := instances.checkObject(ctx, t, resource) - require.Error(t, err) - require.True(t, k8sapierrors.IsNotFound(err)) - } - - err := instances.resourceClient.SyncAll(ctx, cluster) - if tt.testParams.resourcesRenderError != nil { - require.Error(t, err) - require.ErrorIs(t, err, tt.testParams.resourcesRenderError) - return - } - - require.NoError(t, err) - - ensureSynced(tt.testParams.resources) - - for _, resources := range tt.renderLoops { - instances.resourceRenderer.SetResources(cluster, resources) - - require.NoError(t, instances.resourceClient.SyncAll(ctx, cluster)) - ensureSynced(resources) - } - }) - } -} - func TestClientFetchExistingAndDesiredPools(t *testing.T) { for name, tt := range map[string]*clientTest{ "no-op": {}, diff --git a/operator/internal/lifecycle/constants.go b/operator/internal/lifecycle/constants.go index bd8052f58..8aea25656 100644 --- a/operator/internal/lifecycle/constants.go +++ b/operator/internal/lifecycle/constants.go @@ -12,7 +12,7 @@ package lifecycle import "sigs.k8s.io/controller-runtime/pkg/client" const ( - defaultFieldOwner = client.FieldOwner("cluster.redpanda.com/operator") + 
DefaultFieldOwner = client.FieldOwner("cluster.redpanda.com/operator") defaultNamespaceLabel = "cluster.redpanda.com/namespace" defaultOperatorLabel = "cluster.redpanda.com/operator" defaultOwnerLabel = "cluster.redpanda.com/owner" diff --git a/operator/internal/lifecycle/helpers.go b/operator/internal/lifecycle/helpers.go index dc6075f25..601f03cff 100644 --- a/operator/internal/lifecycle/helpers.go +++ b/operator/internal/lifecycle/helpers.go @@ -25,13 +25,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -func isStatefulSet(object client.Object) bool { - gvk := object.GetObjectKind().GroupVersionKind() - return gvk.Group == appsv1.SchemeGroupVersion.Group && - gvk.Version == appsv1.SchemeGroupVersion.Version && - gvk.Kind == "StatefulSet" -} - // getGroupVersionKind gets a GVK for an object based on all // GVKs registered with a runtime scheme. func getGroupVersionKind(scheme *runtime.Scheme, object client.Object) (*schema.GroupVersionKind, error) { diff --git a/operator/internal/lifecycle/interfaces_test.go b/operator/internal/lifecycle/interfaces_test.go index a6c9b6d68..d25c1923b 100644 --- a/operator/internal/lifecycle/interfaces_test.go +++ b/operator/internal/lifecycle/interfaces_test.go @@ -364,10 +364,7 @@ func (r *MockNodePoolRenderer) Render(ctx context.Context, cluster *MockCluster) } func (r *MockNodePoolRenderer) IsNodePool(object client.Object) bool { - gvk := object.GetObjectKind().GroupVersionKind() - if gvk.Group != appsv1.SchemeGroupVersion.Group || - gvk.Version != appsv1.SchemeGroupVersion.Version || - gvk.Kind != "StatefulSet" { + if !isNodePool(object) { return false } diff --git a/operator/internal/lifecycle/testdata/cases.pools.golden.txtar b/operator/internal/lifecycle/testdata/cases.pools.golden.txtar index b64d9bdf1..01657718c 100644 --- a/operator/internal/lifecycle/testdata/cases.pools.golden.txtar +++ b/operator/internal/lifecycle/testdata/cases.pools.golden.txtar @@ -740,3 +740,370 @@ status: availableReplicas: 0 replicas: 0 +-- rack-awareness -- +- apiVersion: apps/v1 + kind: StatefulSet + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness + namespace: rack-awareness + spec: + podManagementPolicy: Parallel + replicas: 3 + selector: + matchLabels: + app.kubernetes.io/component: redpanda-statefulset + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/name: redpanda + serviceName: rack-awareness + template: + metadata: + annotations: + config.redpanda.com/checksum: a90b21628d89546d234075143f437a7118e87dca2eb009f7ffb653e7b8f09eca + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda-statefulset + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + helm.sh/chart: redpanda-5.10.4 + redpanda.com/poddisruptionbudget: rack-awareness + spec: + affinity: + podAntiAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchLabels: + app.kubernetes.io/component: redpanda-statefulset + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/name: redpanda + topologyKey: kubernetes.io/hostname + 
automountServiceAccountToken: false + containers: + - command: + - rpk + - redpanda + - start + - --advertise-rpc-addr=$(SERVICE_NAME).rack-awareness.rack-awareness.svc.cluster.local.:33145 + env: + - name: SERVICE_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: HOST_IP + valueFrom: + fieldRef: + fieldPath: status.hostIP + image: docker.redpanda.com/redpandadata/redpanda:v25.1.7 + lifecycle: + postStart: + exec: + command: + - bash + - -c + - 'timeout -v 45 bash -x /var/lifecycle/postStart.sh 2>&1 | sed "s/^/lifecycle-hook + post-start $(date): /" | tee /proc/1/fd/1; true' + preStop: + exec: + command: + - bash + - -c + - 'timeout -v 45 bash -x /var/lifecycle/preStop.sh 2>&1 | sed "s/^/lifecycle-hook + pre-stop $(date): /" | tee /proc/1/fd/1; true' + livenessProbe: + exec: + command: + - /bin/sh + - -c + - curl --silent --fail -k -m 5 --cacert /etc/tls/certs/default/ca.crt + "https://${SERVICE_NAME}.rack-awareness.rack-awareness.svc.cluster.local.:9644/v1/status/ready" + failureThreshold: 3 + initialDelaySeconds: 10 + periodSeconds: 10 + name: redpanda + ports: + - containerPort: 9644 + name: admin + - containerPort: 9645 + name: admin-default + - containerPort: 8082 + name: http + - containerPort: 8083 + name: http-default + - containerPort: 9093 + name: kafka + - containerPort: 9094 + name: kafka-default + - containerPort: 33145 + name: rpc + - containerPort: 8081 + name: schemaregistry + - containerPort: 8084 + name: schema-default + resources: + limits: + cpu: "1" + memory: 2560Mi + securityContext: + runAsGroup: 101 + runAsUser: 101 + startupProbe: + exec: + command: + - /bin/sh + - -c + - | + set -e + RESULT=$(curl --silent --fail -k -m 5 --cacert /etc/tls/certs/default/ca.crt "https://${SERVICE_NAME}.rack-awareness.rack-awareness.svc.cluster.local.:9644/v1/status/ready") + echo $RESULT + echo $RESULT | grep ready + failureThreshold: 120 + initialDelaySeconds: 1 + periodSeconds: 10 + volumeMounts: + - mountPath: /etc/tls/certs/default + name: redpanda-default-cert + - mountPath: /etc/tls/certs/external + name: redpanda-external-cert + - mountPath: /etc/redpanda + name: config + - mountPath: /tmp/base-config + name: base-config + - mountPath: /var/lifecycle + name: lifecycle-scripts + - mountPath: /var/lib/redpanda/data + name: datadir + - mountPath: /var/run/secrets/kubernetes.io/serviceaccount + name: kube-api-access + readOnly: true + - args: + - supervisor + - -- + - /redpanda-operator + - sidecar + - --redpanda-yaml + - /etc/redpanda/redpanda.yaml + - --redpanda-cluster-namespace + - rack-awareness + - --redpanda-cluster-name + - rack-awareness + - --run-broker-probe + - --broker-probe-broker-url + - $(SERVICE_NAME).rack-awareness.rack-awareness.svc.cluster.local.:9644 + command: + - /redpanda-operator + env: + - name: SERVICE_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: HOST_IP + valueFrom: + fieldRef: + fieldPath: status.hostIP + image: localhost/redpanda-operator:dev + name: sidecar + readinessProbe: + failureThreshold: 3 + httpGet: + path: /healthz + port: 8093 + initialDelaySeconds: 1 + periodSeconds: 10 + successThreshold: 1 + resources: {} + securityContext: {} + volumeMounts: + - mountPath: /etc/tls/certs/default + name: redpanda-default-cert + - mountPath: /etc/tls/certs/external + name: redpanda-external-cert + - mountPath: /etc/redpanda + name: config + - mountPath: 
/var/run/secrets/kubernetes.io/serviceaccount + name: kube-api-access + readOnly: true + initContainers: + - command: + - /bin/bash + - -c + - rpk redpanda tune all + image: docker.redpanda.com/redpandadata/redpanda:v25.1.7 + name: tuning + resources: {} + securityContext: + capabilities: + add: + - SYS_RESOURCE + privileged: true + runAsGroup: 0 + runAsUser: 0 + volumeMounts: + - mountPath: /etc/tls/certs/default + name: redpanda-default-cert + - mountPath: /etc/tls/certs/external + name: redpanda-external-cert + - mountPath: /etc/redpanda + name: base-config + - command: + - /bin/bash + - -c + - trap "exit 0" TERM; exec $CONFIGURATOR_SCRIPT "${SERVICE_NAME}" "${KUBERNETES_NODE_NAME}" + & wait $! + env: + - name: CONFIGURATOR_SCRIPT + value: /etc/secrets/configurator/scripts/configurator.sh + - name: SERVICE_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: KUBERNETES_NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + - name: HOST_IP_ADDRESS + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: status.hostIP + image: docker.redpanda.com/redpandadata/redpanda:v25.1.7 + name: redpanda-configurator + resources: {} + securityContext: + runAsGroup: 101 + runAsUser: 101 + volumeMounts: + - mountPath: /etc/tls/certs/default + name: redpanda-default-cert + - mountPath: /etc/tls/certs/external + name: redpanda-external-cert + - mountPath: /etc/redpanda + name: config + - mountPath: /tmp/base-config + name: base-config + - mountPath: /etc/secrets/configurator/scripts/ + name: rack-awareness-configurator + - mountPath: /var/run/secrets/kubernetes.io/serviceaccount + name: kube-api-access + readOnly: true + - command: + - /redpanda-operator + - bootstrap + - --in-dir + - /tmp/base-config + - --out-dir + - /tmp/config + image: localhost/redpanda-operator:dev + name: bootstrap-yaml-envsubst + resources: + limits: + cpu: 100m + memory: 125Mi + requests: + cpu: 100m + memory: 125Mi + securityContext: + allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + runAsNonRoot: true + volumeMounts: + - mountPath: /tmp/config/ + name: config + - mountPath: /tmp/base-config/ + name: base-config + securityContext: + fsGroup: 101 + fsGroupChangePolicy: OnRootMismatch + serviceAccountName: rack-awareness + terminationGracePeriodSeconds: 90 + topologySpreadConstraints: + - labelSelector: + matchLabels: + app.kubernetes.io/component: redpanda-statefulset + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/name: redpanda + maxSkew: 1 + topologyKey: topology.kubernetes.io/zone + whenUnsatisfiable: ScheduleAnyway + volumes: + - name: redpanda-default-cert + secret: + defaultMode: 288 + secretName: rack-awareness-default-cert + - name: redpanda-external-cert + secret: + defaultMode: 288 + secretName: rack-awareness-external-cert + - name: lifecycle-scripts + secret: + defaultMode: 509 + secretName: rack-awareness-sts-lifecycle + - configMap: + name: rack-awareness + name: base-config + - emptyDir: {} + name: config + - name: rack-awareness-configurator + secret: + defaultMode: 509 + secretName: rack-awareness-configurator + - name: datadir + persistentVolumeClaim: + claimName: datadir + - name: kube-api-access + projected: + defaultMode: 420 + sources: + - serviceAccountToken: + expirationSeconds: 3607 + path: token + - configMap: + items: + - key: ca.crt + path: ca.crt + name: kube-root-ca.crt + - downwardAPI: + items: + - fieldRef: + apiVersion: v1 + fieldPath: metadata.namespace + path: namespace + updateStrategy: + type: RollingUpdate + volumeClaimTemplates: + - 
metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/name: redpanda + name: datadir + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 20Gi + status: {} + status: + availableReplicas: 0 + replicas: 0 diff --git a/operator/internal/lifecycle/testdata/cases.resources.golden.txtar b/operator/internal/lifecycle/testdata/cases.resources.golden.txtar index d09684fb2..5e4c52729 100644 --- a/operator/internal/lifecycle/testdata/cases.resources.golden.txtar +++ b/operator/internal/lifecycle/testdata/cases.resources.golden.txtar @@ -2104,3 +2104,1109 @@ type: ClusterIP status: loadBalancer: {} +-- rack-awareness -- +- apiVersion: v1 + kind: Service + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness-external + namespace: rack-awareness + spec: + externalTrafficPolicy: Local + ports: + - name: admin-default + nodePort: 31644 + port: 9645 + protocol: TCP + targetPort: 0 + - name: kafka-default + nodePort: 31092 + port: 9094 + protocol: TCP + targetPort: 0 + - name: http-default + nodePort: 30082 + port: 8083 + protocol: TCP + targetPort: 0 + - name: schema-default + nodePort: 30081 + port: 8084 + protocol: TCP + targetPort: 0 + publishNotReadyAddresses: true + selector: + app.kubernetes.io/component: redpanda-statefulset + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/name: redpanda + sessionAffinity: None + type: NodePort + status: + loadBalancer: {} +- apiVersion: policy/v1 + kind: PodDisruptionBudget + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness + namespace: rack-awareness + spec: + maxUnavailable: 1 + selector: + matchLabels: + app.kubernetes.io/component: redpanda-statefulset + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/name: redpanda + redpanda.com/poddisruptionbudget: rack-awareness + status: + currentHealthy: 0 + desiredHealthy: 0 + disruptionsAllowed: 0 + expectedPods: 0 +- apiVersion: v1 + automountServiceAccountToken: false + kind: ServiceAccount + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness + namespace: rack-awareness +- apiVersion: v1 + kind: Service + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda 
+ app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + monitoring.redpanda.com/enabled: "false" + name: rack-awareness + namespace: rack-awareness + spec: + clusterIP: None + ports: + - name: admin + port: 9644 + protocol: TCP + targetPort: 9644 + - name: http + port: 8082 + protocol: TCP + targetPort: 8082 + - name: kafka + port: 9093 + protocol: TCP + targetPort: 9093 + - name: rpc + port: 33145 + protocol: TCP + targetPort: 33145 + - name: schemaregistry + port: 8081 + protocol: TCP + targetPort: 8081 + publishNotReadyAddresses: true + selector: + app.kubernetes.io/component: redpanda-statefulset + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/name: redpanda + type: ClusterIP + status: + loadBalancer: {} +- apiVersion: v1 + data: + .bootstrap.json.in: '{"audit_enabled":"false","cloud_storage_cache_size":"5368709120","cloud_storage_enable_remote_read":"true","cloud_storage_enable_remote_write":"true","cloud_storage_enabled":"false","compacted_log_segment_size":"67108864","default_topic_replications":"3","enable_rack_awareness":"true","enable_sasl":"false","kafka_connection_rate_limit":"1000","kafka_enable_authorization":"false","log_segment_size_max":"268435456","log_segment_size_min":"16777216","max_compacted_log_segment_size":"536870912","storage_min_free_bytes":"1073741824"}' + bootstrap.yaml.fixups: '[]' + redpanda.yaml: |- + config_file: /etc/redpanda/redpanda.yaml + pandaproxy: + pandaproxy_api: + - address: 0.0.0.0 + name: internal + port: 8082 + - address: 0.0.0.0 + name: default + port: 8083 + pandaproxy_api_tls: + - cert_file: /etc/tls/certs/default/tls.crt + enabled: true + key_file: /etc/tls/certs/default/tls.key + name: internal + require_client_auth: false + truststore_file: /etc/tls/certs/default/ca.crt + - cert_file: /etc/tls/certs/external/tls.crt + enabled: true + key_file: /etc/tls/certs/external/tls.key + name: default + require_client_auth: false + truststore_file: /etc/tls/certs/external/ca.crt + pandaproxy_client: + broker_tls: + enabled: true + require_client_auth: false + truststore_file: /etc/tls/certs/default/ca.crt + brokers: + - address: rack-awareness-0.rack-awareness.rack-awareness.svc.cluster.local. + port: 9093 + - address: rack-awareness-1.rack-awareness.rack-awareness.svc.cluster.local. + port: 9093 + - address: rack-awareness-2.rack-awareness.rack-awareness.svc.cluster.local. 
+ port: 9093 + redpanda: + admin: + - address: 0.0.0.0 + name: internal + port: 9644 + - address: 0.0.0.0 + name: default + port: 9645 + admin_api_tls: + - cert_file: /etc/tls/certs/default/tls.crt + enabled: true + key_file: /etc/tls/certs/default/tls.key + name: internal + require_client_auth: false + truststore_file: /etc/tls/certs/default/ca.crt + - cert_file: /etc/tls/certs/external/tls.crt + enabled: true + key_file: /etc/tls/certs/external/tls.key + name: default + require_client_auth: false + truststore_file: /etc/tls/certs/external/ca.crt + crash_loop_limit: 5 + empty_seed_starts_cluster: false + kafka_api: + - address: 0.0.0.0 + name: internal + port: 9093 + - address: 0.0.0.0 + name: default + port: 9094 + kafka_api_tls: + - cert_file: /etc/tls/certs/default/tls.crt + enabled: true + key_file: /etc/tls/certs/default/tls.key + name: internal + require_client_auth: false + truststore_file: /etc/tls/certs/default/ca.crt + - cert_file: /etc/tls/certs/external/tls.crt + enabled: true + key_file: /etc/tls/certs/external/tls.key + name: default + require_client_auth: false + truststore_file: /etc/tls/certs/external/ca.crt + rpc_server: + address: 0.0.0.0 + port: 33145 + rpc_server_tls: + cert_file: /etc/tls/certs/default/tls.crt + enabled: true + key_file: /etc/tls/certs/default/tls.key + require_client_auth: false + truststore_file: /etc/tls/certs/default/ca.crt + seed_servers: + - host: + address: rack-awareness-0.rack-awareness.rack-awareness.svc.cluster.local. + port: 33145 + - host: + address: rack-awareness-1.rack-awareness.rack-awareness.svc.cluster.local. + port: 33145 + - host: + address: rack-awareness-2.rack-awareness.rack-awareness.svc.cluster.local. + port: 33145 + rpk: + additional_start_flags: + - --default-log-level=info + - --memory=2048M + - --reserve-memory=205M + - --smp=1 + admin_api: + addresses: + - rack-awareness-0.rack-awareness.rack-awareness.svc.cluster.local.:9644 + - rack-awareness-1.rack-awareness.rack-awareness.svc.cluster.local.:9644 + - rack-awareness-2.rack-awareness.rack-awareness.svc.cluster.local.:9644 + tls: + ca_file: /etc/tls/certs/default/ca.crt + enable_memory_locking: false + kafka_api: + brokers: + - rack-awareness-0.rack-awareness.rack-awareness.svc.cluster.local.:9093 + - rack-awareness-1.rack-awareness.rack-awareness.svc.cluster.local.:9093 + - rack-awareness-2.rack-awareness.rack-awareness.svc.cluster.local.:9093 + tls: + ca_file: /etc/tls/certs/default/ca.crt + overprovisioned: false + schema_registry: + addresses: + - rack-awareness-0.rack-awareness.rack-awareness.svc.cluster.local.:8081 + - rack-awareness-1.rack-awareness.rack-awareness.svc.cluster.local.:8081 + - rack-awareness-2.rack-awareness.rack-awareness.svc.cluster.local.:8081 + tls: + ca_file: /etc/tls/certs/default/ca.crt + tune_aio_events: true + schema_registry: + schema_registry_api: + - address: 0.0.0.0 + name: internal + port: 8081 + - address: 0.0.0.0 + name: default + port: 8084 + schema_registry_api_tls: + - cert_file: /etc/tls/certs/default/tls.crt + enabled: true + key_file: /etc/tls/certs/default/tls.key + name: internal + require_client_auth: false + truststore_file: /etc/tls/certs/default/ca.crt + - cert_file: /etc/tls/certs/external/tls.crt + enabled: true + key_file: /etc/tls/certs/external/tls.key + name: default + require_client_auth: false + truststore_file: /etc/tls/certs/external/ca.crt + schema_registry_client: + broker_tls: + enabled: true + require_client_auth: false + truststore_file: /etc/tls/certs/default/ca.crt + brokers: + - address: 
rack-awareness-0.rack-awareness.rack-awareness.svc.cluster.local. + port: 9093 + - address: rack-awareness-1.rack-awareness.rack-awareness.svc.cluster.local. + port: 9093 + - address: rack-awareness-2.rack-awareness.rack-awareness.svc.cluster.local. + port: 9093 + kind: ConfigMap + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness + namespace: rack-awareness +- apiVersion: v1 + data: + profile: |- + admin_api: + addresses: + - rack-awareness-0:31644 + - rack-awareness-1:31644 + - rack-awareness-2:31644 + tls: + ca_file: ca.crt + kafka_api: + brokers: + - rack-awareness-0:31092 + - rack-awareness-1:31092 + - rack-awareness-2:31092 + tls: + ca_file: ca.crt + name: default + schema_registry: + addresses: + - rack-awareness-0:30081 + - rack-awareness-1:30081 + - rack-awareness-2:30081 + tls: + ca_file: ca.crt + kind: ConfigMap + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness-rpk + namespace: rack-awareness +- apiVersion: cert-manager.io/v1 + kind: Issuer + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness-default-selfsigned-issuer + namespace: rack-awareness + spec: + selfSigned: {} + status: {} +- apiVersion: cert-manager.io/v1 + kind: Issuer + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness-default-root-issuer + namespace: rack-awareness + spec: + ca: + secretName: rack-awareness-default-root-certificate + status: {} +- apiVersion: cert-manager.io/v1 + kind: Issuer + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + 
helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness-external-selfsigned-issuer + namespace: rack-awareness + spec: + selfSigned: {} + status: {} +- apiVersion: cert-manager.io/v1 + kind: Issuer + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness-external-root-issuer + namespace: rack-awareness + spec: + ca: + secretName: rack-awareness-external-root-certificate + status: {} +- apiVersion: cert-manager.io/v1 + kind: Certificate + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness-default-root-certificate + namespace: rack-awareness + spec: + commonName: rack-awareness-default-root-certificate + duration: 43800h0m0s + isCA: true + issuerRef: + group: cert-manager.io + kind: Issuer + name: rack-awareness-default-selfsigned-issuer + privateKey: + algorithm: ECDSA + size: 256 + secretName: rack-awareness-default-root-certificate + status: {} +- apiVersion: cert-manager.io/v1 + kind: Certificate + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness-external-root-certificate + namespace: rack-awareness + spec: + commonName: rack-awareness-external-root-certificate + duration: 43800h0m0s + isCA: true + issuerRef: + group: cert-manager.io + kind: Issuer + name: rack-awareness-external-selfsigned-issuer + privateKey: + algorithm: ECDSA + size: 256 + secretName: rack-awareness-external-root-certificate + status: {} +- apiVersion: cert-manager.io/v1 + kind: Certificate + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness-default-cert + namespace: rack-awareness + spec: + dnsNames: + - rack-awareness-cluster.rack-awareness.rack-awareness.svc.cluster.local + - rack-awareness-cluster.rack-awareness.rack-awareness.svc + - rack-awareness-cluster.rack-awareness.rack-awareness + - '*.rack-awareness-cluster.rack-awareness.rack-awareness.svc.cluster.local' + - 
'*.rack-awareness-cluster.rack-awareness.rack-awareness.svc' + - '*.rack-awareness-cluster.rack-awareness.rack-awareness' + - rack-awareness.rack-awareness.svc.cluster.local + - rack-awareness.rack-awareness.svc + - rack-awareness.rack-awareness + - '*.rack-awareness.rack-awareness.svc.cluster.local' + - '*.rack-awareness.rack-awareness.svc' + - '*.rack-awareness.rack-awareness' + duration: 43800h0m0s + issuerRef: + group: cert-manager.io + kind: Issuer + name: rack-awareness-default-root-issuer + privateKey: + algorithm: ECDSA + size: 256 + secretName: rack-awareness-default-cert + status: {} +- apiVersion: cert-manager.io/v1 + kind: Certificate + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness-external-cert + namespace: rack-awareness + spec: + dnsNames: + - rack-awareness-cluster.rack-awareness.rack-awareness.svc.cluster.local + - rack-awareness-cluster.rack-awareness.rack-awareness.svc + - rack-awareness-cluster.rack-awareness.rack-awareness + - '*.rack-awareness-cluster.rack-awareness.rack-awareness.svc.cluster.local' + - '*.rack-awareness-cluster.rack-awareness.rack-awareness.svc' + - '*.rack-awareness-cluster.rack-awareness.rack-awareness' + - rack-awareness.rack-awareness.svc.cluster.local + - rack-awareness.rack-awareness.svc + - rack-awareness.rack-awareness + - '*.rack-awareness.rack-awareness.svc.cluster.local' + - '*.rack-awareness.rack-awareness.svc' + - '*.rack-awareness.rack-awareness' + duration: 43800h0m0s + issuerRef: + group: cert-manager.io + kind: Issuer + name: rack-awareness-external-root-issuer + privateKey: + algorithm: ECDSA + size: 256 + secretName: rack-awareness-external-cert + status: {} +- apiVersion: rbac.authorization.k8s.io/v1 + kind: Role + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness-rpk-debug-bundle + namespace: rack-awareness + rules: + - apiGroups: + - "" + resources: + - configmaps + - endpoints + - events + - limitranges + - persistentvolumeclaims + - pods + - pods/log + - replicationcontrollers + - resourcequotas + - serviceaccounts + - services + verbs: + - get + - list +- apiVersion: rbac.authorization.k8s.io/v1 + kind: Role + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness-sidecar + namespace: rack-awareness + rules: + - apiGroups: + - 
coordination.k8s.io + resources: + - leases + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiVersion: rbac.authorization.k8s.io/v1 + kind: ClusterRole + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness-rack-awareness-rack-awareness + rules: + - apiGroups: + - "" + resources: + - nodes + verbs: + - get +- apiVersion: rbac.authorization.k8s.io/v1 + kind: RoleBinding + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness-rpk-debug-bundle + namespace: rack-awareness + roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: rack-awareness-rpk-debug-bundle + subjects: + - kind: ServiceAccount + name: rack-awareness + namespace: rack-awareness +- apiVersion: rbac.authorization.k8s.io/v1 + kind: RoleBinding + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness-sidecar + namespace: rack-awareness + roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: rack-awareness-sidecar + subjects: + - kind: ServiceAccount + name: rack-awareness + namespace: rack-awareness +- apiVersion: rbac.authorization.k8s.io/v1 + kind: ClusterRoleBinding + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness-rack-awareness-rack-awareness + roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: rack-awareness-rack-awareness-rack-awareness + subjects: + - kind: ServiceAccount + name: rack-awareness + namespace: rack-awareness +- apiVersion: v1 + kind: Secret + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + 
helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness-sts-lifecycle + namespace: rack-awareness + stringData: + common.sh: |- + #!/usr/bin/env bash + + # the SERVICE_NAME comes from the metadata.name of the pod, essentially the POD_NAME + CURL_URL="https://${SERVICE_NAME}.rack-awareness.rack-awareness.svc.cluster.local:9644" + + # commands used throughout + CURL_NODE_ID_CMD="curl --silent --fail --cacert /etc/tls/certs/default/ca.crt ${CURL_URL}/v1/node_config" + + CURL_MAINTENANCE_DELETE_CMD_PREFIX='curl -X DELETE --silent -o /dev/null -w "%{http_code}"' + CURL_MAINTENANCE_PUT_CMD_PREFIX='curl -X PUT --silent -o /dev/null -w "%{http_code}"' + CURL_MAINTENANCE_GET_CMD="curl -X GET --silent --cacert /etc/tls/certs/default/ca.crt ${CURL_URL}/v1/maintenance" + postStart.sh: |- + #!/usr/bin/env bash + # This code should be similar if not exactly the same as that found in the panda-operator, see + # https://github.com/redpanda-data/redpanda/blob/e51d5b7f2ef76d5160ca01b8c7a8cf07593d29b6/src/go/k8s/pkg/resources/secret.go + + # path below should match the path defined on the statefulset + source /var/lifecycle/common.sh + + postStartHook () { + set -x + + touch /tmp/postStartHookStarted + + until NODE_ID=$(${CURL_NODE_ID_CMD} | grep -o '\"node_id\":[^,}]*' | grep -o '[^: ]*$'); do + sleep 0.5 + done + + echo "Clearing maintenance mode on node ${NODE_ID}" + CURL_MAINTENANCE_DELETE_CMD="${CURL_MAINTENANCE_DELETE_CMD_PREFIX} --cacert /etc/tls/certs/default/ca.crt ${CURL_URL}/v1/brokers/${NODE_ID}/maintenance" + # a 400 here would mean not in maintenance mode + until [ "${status:-}" = '"200"' ] || [ "${status:-}" = '"400"' ]; do + status=$(${CURL_MAINTENANCE_DELETE_CMD}) + sleep 0.5 + done + + touch /tmp/postStartHookFinished + } + + postStartHook + true + preStop.sh: |- + #!/usr/bin/env bash + # This code should be similar if not exactly the same as that found in the panda-operator, see + # https://github.com/redpanda-data/redpanda/blob/e51d5b7f2ef76d5160ca01b8c7a8cf07593d29b6/src/go/k8s/pkg/resources/secret.go + + touch /tmp/preStopHookStarted + + # path below should match the path defined on the statefulset + source /var/lifecycle/common.sh + + set -x + + preStopHook () { + until NODE_ID=$(${CURL_NODE_ID_CMD} | grep -o '\"node_id\":[^,}]*' | grep -o '[^: ]*$'); do + sleep 0.5 + done + + echo "Setting maintenance mode on node ${NODE_ID}" + CURL_MAINTENANCE_PUT_CMD="${CURL_MAINTENANCE_PUT_CMD_PREFIX} --cacert /etc/tls/certs/default/ca.crt ${CURL_URL}/v1/brokers/${NODE_ID}/maintenance" + until [ "${status:-}" = '"200"' ]; do + status=$(${CURL_MAINTENANCE_PUT_CMD}) + sleep 0.5 + done + + until [ "${finished:-}" = "true" ] || [ "${draining:-}" = "false" ]; do + res=$(${CURL_MAINTENANCE_GET_CMD}) + finished=$(echo $res | grep -o '\"finished\":[^,}]*' | grep -o '[^: ]*$') + draining=$(echo $res | grep -o '\"draining\":[^,}]*' | grep -o '[^: ]*$') + sleep 0.5 + done + + touch /tmp/preStopHookFinished + } + preStopHook + true + type: Opaque +- apiVersion: v1 + kind: Secret + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: redpanda + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redpanda + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: redpanda-5.10.4 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: 
rack-awareness-configurator + namespace: rack-awareness + stringData: + configurator.sh: |- + set -xe + SERVICE_NAME=$1 + KUBERNETES_NODE_NAME=$2 + POD_ORDINAL=${SERVICE_NAME##*-} + BROKER_INDEX=`expr $POD_ORDINAL + 1` + + CONFIG=/etc/redpanda/redpanda.yaml + + # Setup config files + cp /tmp/base-config/redpanda.yaml "${CONFIG}" + + LISTENER="{\"address\":\"${SERVICE_NAME}.rack-awareness.rack-awareness.svc.cluster.local.\",\"name\":\"internal\",\"port\":9093}" + rpk redpanda config --config "$CONFIG" set redpanda.advertised_kafka_api[0] "$LISTENER" + + ADVERTISED_KAFKA_ADDRESSES=() + + PREFIX_TEMPLATE="" + ADVERTISED_KAFKA_ADDRESSES+=("{\"address\":\"${SERVICE_NAME}\",\"name\":\"default\",\"port\":31092}") + + PREFIX_TEMPLATE="" + ADVERTISED_KAFKA_ADDRESSES+=("{\"address\":\"${SERVICE_NAME}\",\"name\":\"default\",\"port\":31092}") + + PREFIX_TEMPLATE="" + ADVERTISED_KAFKA_ADDRESSES+=("{\"address\":\"${SERVICE_NAME}\",\"name\":\"default\",\"port\":31092}") + + rpk redpanda config --config "$CONFIG" set redpanda.advertised_kafka_api[1] "${ADVERTISED_KAFKA_ADDRESSES[$POD_ORDINAL]}" + + LISTENER="{\"address\":\"${SERVICE_NAME}.rack-awareness.rack-awareness.svc.cluster.local.\",\"name\":\"internal\",\"port\":8082}" + rpk redpanda config --config "$CONFIG" set pandaproxy.advertised_pandaproxy_api[0] "$LISTENER" + + ADVERTISED_HTTP_ADDRESSES=() + + PREFIX_TEMPLATE="" + ADVERTISED_HTTP_ADDRESSES+=("{\"address\":\"${SERVICE_NAME}\",\"name\":\"default\",\"port\":30082}") + + PREFIX_TEMPLATE="" + ADVERTISED_HTTP_ADDRESSES+=("{\"address\":\"${SERVICE_NAME}\",\"name\":\"default\",\"port\":30082}") + + PREFIX_TEMPLATE="" + ADVERTISED_HTTP_ADDRESSES+=("{\"address\":\"${SERVICE_NAME}\",\"name\":\"default\",\"port\":30082}") + + rpk redpanda config --config "$CONFIG" set pandaproxy.advertised_pandaproxy_api[1] "${ADVERTISED_HTTP_ADDRESSES[$POD_ORDINAL]}" + + # Configure Rack Awareness + set +x + RACK=$(curl --silent --cacert /run/secrets/kubernetes.io/serviceaccount/ca.crt --fail -H 'Authorization: Bearer '$(cat /run/secrets/kubernetes.io/serviceaccount/token) "https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT_HTTPS}/api/v1/nodes/${KUBERNETES_NODE_NAME}?pretty=true" | grep '"kubernetes.io/os"' | grep -v '\"key\":' | sed 's/.*": "\([^"]\+\).*/\1/') + set -x + rpk --config "$CONFIG" redpanda config set redpanda.rack "${RACK}" + type: Opaque +- apiVersion: v1 + data: + config.yaml: | + # from .Values.console.config + kafka: + brokers: + - rack-awareness-0.rack-awareness.rack-awareness.svc.cluster.local.:9093 + - rack-awareness-1.rack-awareness.rack-awareness.svc.cluster.local.:9093 + - rack-awareness-2.rack-awareness.rack-awareness.svc.cluster.local.:9093 + sasl: + enabled: false + schemaRegistry: + enabled: true + tls: + caFilepath: /etc/tls/certs/default/ca.crt + certFilepath: "" + enabled: true + insecureSkipTlsVerify: false + keyFilepath: "" + urls: + - https://rack-awareness-0.rack-awareness.rack-awareness.svc.cluster.local.:8081 + - https://rack-awareness-1.rack-awareness.rack-awareness.svc.cluster.local.:8081 + - https://rack-awareness-2.rack-awareness.rack-awareness.svc.cluster.local.:8081 + tls: + caFilepath: /etc/tls/certs/default/ca.crt + certFilepath: "" + enabled: true + insecureSkipTlsVerify: false + keyFilepath: "" + redpanda: + adminApi: + enabled: true + tls: + caFilepath: /etc/tls/certs/default/ca.crt + certFilepath: "" + enabled: true + insecureSkipTlsVerify: false + keyFilepath: "" + urls: + - https://rack-awareness.rack-awareness.svc.cluster.local.:9644 + kind: ConfigMap + 
metadata: + creationTimestamp: null + labels: + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: console + app.kubernetes.io/version: v2.8.0 + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: console-0.7.31 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness-console + namespace: rack-awareness +- apiVersion: apps/v1 + kind: Deployment + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: console + app.kubernetes.io/version: v2.8.0 + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: console-0.7.31 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness-console + namespace: rack-awareness + spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/name: console + strategy: {} + template: + metadata: + annotations: + checksum-redpanda-chart/config: 0d2fee8fc52d84f752cbe78a81c519531ed9227e33cd7ad8144f57250a121940 + checksum/config: 64f3b5984ecaf99055c6b45a56baef56a9f7889a06431d63526f92f8ad0192bc + creationTimestamp: null + labels: + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/name: console + spec: + affinity: {} + automountServiceAccountToken: false + containers: + - args: + - --config.filepath=/etc/console/configs/config.yaml + image: docker.redpanda.com/redpandadata/console:v2.8.0 + imagePullPolicy: IfNotPresent + livenessProbe: + failureThreshold: 3 + httpGet: + path: /admin/health + port: http + periodSeconds: 10 + successThreshold: 1 + timeoutSeconds: 1 + name: console + ports: + - containerPort: 8080 + name: http + protocol: TCP + readinessProbe: + failureThreshold: 3 + httpGet: + path: /admin/health + port: http + initialDelaySeconds: 10 + periodSeconds: 10 + successThreshold: 1 + timeoutSeconds: 1 + resources: {} + securityContext: + runAsNonRoot: true + volumeMounts: + - mountPath: /etc/console/configs + name: configs + readOnly: true + - mountPath: /etc/tls/certs/default + name: redpanda-default-cert + securityContext: + fsGroup: 99 + runAsUser: 99 + serviceAccountName: rack-awareness-console + volumes: + - configMap: + name: rack-awareness-console + name: configs + - name: redpanda-default-cert + secret: + defaultMode: 272 + secretName: rack-awareness-default-cert + status: {} +- apiVersion: v1 + automountServiceAccountToken: false + kind: ServiceAccount + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: console + app.kubernetes.io/version: v2.8.0 + cluster.redpanda.com/namespace: rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: console-0.7.31 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness-console + namespace: rack-awareness +- apiVersion: v1 + kind: Service + metadata: + creationTimestamp: null + labels: + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: console + app.kubernetes.io/version: v2.8.0 + cluster.redpanda.com/namespace: 
rack-awareness + cluster.redpanda.com/operator: v2 + cluster.redpanda.com/owner: rack-awareness + helm.sh/chart: console-0.7.31 + helm.toolkit.fluxcd.io/name: rack-awareness + helm.toolkit.fluxcd.io/namespace: rack-awareness + name: rack-awareness-console + namespace: rack-awareness + spec: + ports: + - name: http + port: 8080 + protocol: TCP + targetPort: 0 + selector: + app.kubernetes.io/instance: rack-awareness + app.kubernetes.io/name: console + type: ClusterIP + status: + loadBalancer: {} diff --git a/operator/internal/lifecycle/testdata/cases.txtar b/operator/internal/lifecycle/testdata/cases.txtar index 14d6ab8b9..0fb629b97 100644 --- a/operator/internal/lifecycle/testdata/cases.txtar +++ b/operator/internal/lifecycle/testdata/cases.txtar @@ -7,3 +7,11 @@ spec: initContainers: setDataDirOwnership: enabled: true + +-- rack-awareness -- +# This causes a ClusterRoleBinding to be generated. +spec: + clusterSpec: + rackAwareness: + enabled: true + nodeAnnotation: 'kubernetes.io/os' diff --git a/operator/internal/lifecycle/v2_node_pools.go b/operator/internal/lifecycle/v2_node_pools.go index c421e0564..a880af98e 100644 --- a/operator/internal/lifecycle/v2_node_pools.go +++ b/operator/internal/lifecycle/v2_node_pools.go @@ -133,7 +133,8 @@ func (m *V2NodePoolRenderer) Render(ctx context.Context, cluster *ClusterWithPoo // For now, this concrete implementation just looks for any StatefulSets and says that they are a // node pool. func isNodePool(object client.Object) bool { - return isStatefulSet(object) + _, ok := object.(*appsv1.StatefulSet) + return ok } // IsNodePool returns whether or not the object passed to it should be considered a node pool. diff --git a/operator/internal/lifecycle/v2_simple_resources.go b/operator/internal/lifecycle/v2_simple_resources.go index 51e8e32b3..7beb1ab3d 100644 --- a/operator/internal/lifecycle/v2_simple_resources.go +++ b/operator/internal/lifecycle/v2_simple_resources.go @@ -12,6 +12,7 @@ package lifecycle import ( "context" + rbacv1 "k8s.io/api/rbac/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -67,6 +68,14 @@ func (m *V2SimpleResourceRenderer) Render(ctx context.Context, cluster *ClusterW _, isHook = annotations["helm.sh/hook"] } + // Work around a bug where the helm chart incorrectly sets Namespace on + // ClusterRoleBindings which triggers a bug in kube.Syncer. + // This is fixed here to avoid having to juggle very specific redpanda + // chart versions. 
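+	// Leaving the namespace in place would either trip kube.Syncer's
+	// validation of cluster scoped objects or cause the binding to be
+	// garbage collected, since its ObjectKey would never match the
+	// namespace-less object stored by the API server.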
+ if _, ok := object.(*rbacv1.ClusterRoleBinding); ok { + object.SetNamespace("") + } + if !isNodePool(object) && !isHook { resources = append(resources, object) } diff --git a/operator/internal/lifecycle/v2_test.go b/operator/internal/lifecycle/v2_test.go index 495331f0e..bf8aad68d 100644 --- a/operator/internal/lifecycle/v2_test.go +++ b/operator/internal/lifecycle/v2_test.go @@ -19,6 +19,7 @@ import ( "go.uber.org/zap/zapcore" "golang.org/x/tools/txtar" corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" @@ -160,6 +161,9 @@ func TestV2ResourceClient(t *testing.T) { for _, resource := range resources { assertOwnership(resource) require.False(t, resourceClient.nodePoolRenderer.IsNodePool(resource)) + if _, ok := resource.(*rbacv1.ClusterRoleBinding); ok { + require.Empty(t, resource.GetNamespace()) + } } resourceBytes, err := yaml.Marshal(resources) diff --git a/pkg/go.mod b/pkg/go.mod index 62f9d5a3e..f452b5b85 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -5,6 +5,7 @@ go 1.24.3 replace pgregory.net/rapid => github.com/chrisseto/rapid v0.0.0-20240815210052-cdeef406c65c require ( + github.com/cert-manager/cert-manager v1.14.5 github.com/cockroachdb/errors v1.11.3 github.com/go-logr/logr v1.4.2 github.com/gonvenience/ytbx v1.4.4 @@ -37,6 +38,7 @@ require ( google.golang.org/protobuf v1.36.6 helm.sh/helm/v3 v3.18.5 k8s.io/api v0.33.3 + k8s.io/apiextensions-apiserver v0.33.3 k8s.io/apimachinery v0.33.3 k8s.io/client-go v0.33.3 k8s.io/klog/v2 v2.130.1 @@ -136,7 +138,6 @@ require ( github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect - github.com/miekg/dns v1.1.58 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-ps v1.0.0 // indirect github.com/mitchellh/go-wordwrap v1.0.1 // indirect @@ -208,13 +209,13 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/apiextensions-apiserver v0.33.3 // indirect k8s.io/apiserver v0.33.3 // indirect k8s.io/cli-runtime v0.33.3 // indirect k8s.io/component-base v0.33.3 // indirect k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect k8s.io/kubectl v0.33.3 // indirect oras.land/oras-go/v2 v2.6.0 // indirect + sigs.k8s.io/gateway-api v1.1.0 // indirect sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect sigs.k8s.io/kustomize/api v0.19.0 // indirect sigs.k8s.io/kustomize/kyaml v0.19.0 // indirect diff --git a/pkg/go.sum b/pkg/go.sum index ac0c895b8..797ca9ba5 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -40,6 +40,8 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3 github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cenkalti/backoff/v5 v5.0.2 h1:rIfFVxEf1QsI7E1ZHfp/B4DF/6QBAUhmgkxc0H7Zss8= github.com/cenkalti/backoff/v5 v5.0.2/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= +github.com/cert-manager/cert-manager v1.14.5 h1:uuM1O2g2S80nxiH3eW2cZYMGiL2zmDFVdAzg8sibWuc= +github.com/cert-manager/cert-manager v1.14.5/go.mod h1:fmr/cU5jiLxWj69CroDggSOa49RljUK+dU583TaQUXM= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chai2010/gettext-go v1.0.2 
h1:1Lwwip6Q2QGsAdl/ZKPCwTe9fe0CjlUbqj5bFNSjIRk= @@ -565,6 +567,8 @@ oras.land/oras-go/v2 v2.6.0 h1:X4ELRsiGkrbeox69+9tzTu492FMUu7zJQW6eJU+I2oc= oras.land/oras-go/v2 v2.6.0/go.mod h1:magiQDfG6H1O9APp+rOsvCPcW1GD2MM7vgnKY0Y+u1o= sigs.k8s.io/controller-runtime v0.20.4 h1:X3c+Odnxz+iPTRobG4tp092+CvBU9UK0t/bRf+n0DGU= sigs.k8s.io/controller-runtime v0.20.4/go.mod h1:xg2XB0K5ShQzAgsoujxuKN4LNXR2LfwwHsPj7Iaw+XY= +sigs.k8s.io/gateway-api v1.1.0 h1:DsLDXCi6jR+Xz8/xd0Z1PYl2Pn0TyaFMOPPZIj4inDM= +sigs.k8s.io/gateway-api v1.1.0/go.mod h1:ZH4lHrL2sDi0FHZ9jjneb8kKnGzFWyrTya35sWUTrRs= sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 h1:gBQPwqORJ8d8/YNZWEjoZs7npUVDpVXUUOFfW6CgAqE= sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg= sigs.k8s.io/kustomize/api v0.19.0 h1:F+2HB2mU1MSiR9Hp1NEgoU2q9ItNOaBJl0I4Dlus5SQ= diff --git a/pkg/kube/ctl.go b/pkg/kube/ctl.go index c2d2581c1..ef0a3ebbf 100644 --- a/pkg/kube/ctl.go +++ b/pkg/kube/ctl.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/errors" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -112,20 +113,20 @@ func FromRESTConfig(cfg *RESTConfig, opts ...Option) (*Ctl, error) { return nil, err } - fieldManager := options.FieldManager - if fieldManager == "" { - fieldManager = "*kube.Ctl" + fieldOwner := options.FieldManager + if fieldOwner == "" { + fieldOwner = "*kube.Ctl" } - return &Ctl{config: cfg, client: c, fieldManager: fieldManager}, nil + return &Ctl{config: cfg, client: c, fieldOwner: client.FieldOwner(fieldOwner)}, nil } // Ctl is a Kubernetes client inspired by the shape of the `kubectl` CLI with a // focus on being ergonomic. type Ctl struct { - config *rest.Config - client client.Client - fieldManager string + config *rest.Config + client client.Client + fieldOwner client.FieldOwner } // RestConfig returns a deep copy of the [rest.Config] used by this [Ctl]. @@ -133,6 +134,19 @@ func (c *Ctl) RestConfig() *rest.Config { return rest.CopyConfig(c.config) } +// Scheme returns the [runtime.Scheme] used by this instance. +func (c *Ctl) Scheme() *runtime.Scheme { + return c.client.Scheme() +} + +func (c *Ctl) ScopeOf(gvk schema.GroupVersionKind) (meta.RESTScopeName, error) { + mapping, err := c.client.RESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version) + if err != nil { + return meta.RESTScopeName(""), errors.WithStack(err) + } + return mapping.Scope.Name(), nil +} + // Get fetches the latest state of an object into `obj` from Kubernetes. // Usage: // @@ -172,17 +186,17 @@ func (c *Ctl) List(ctx context.Context, namespace string, objs ObjectList, opts } // Apply "applies" the provided [Object] via SSA (Server Side Apply). -func (c *Ctl) Apply(ctx context.Context, obj Object) error { - kinds, _, err := c.client.Scheme().ObjectKinds(obj) - if err != nil { - return errors.WithStack(err) - } - +func (c *Ctl) Apply(ctx context.Context, obj Object, opts ...client.PatchOption) error { obj.SetManagedFields(nil) - obj.GetObjectKind().SetGroupVersionKind(kinds[0]) + if err := setGVK(c.Scheme(), obj); err != nil { + return err + } + + // Prepend field owner to allow caller's to override it. + opts = append([]client.PatchOption{c.fieldOwner}, opts...) 
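+	// Callers may still pass their own options, e.g. client.ForceOwnership to
+	// take over fields held by another field manager (Syncer relies on this).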
-	if err := c.client.Patch(ctx, obj, client.Apply, client.FieldOwner(c.fieldManager)); err != nil {
+	if err := c.client.Patch(ctx, obj, client.Apply, opts...); err != nil {
 		return errors.WithStack(err)
 	}
 
@@ -248,9 +262,16 @@ func (c *Ctl) CreateAndWait(ctx context.Context, obj Object, cond CondFn[Object]
 	return c.WaitFor(ctx, obj, cond)
 }
 
-// Delete initiates the deletion the given [Object].
+// Delete declaratively initiates deletion of the given [Object].
+//
+// Unlike other Ctl methods, Delete does not update obj.
+//
+// If obj is already being deleted or has been successfully deleted (e.g.
+// returns a 404), Delete returns nil.
 func (c *Ctl) Delete(ctx context.Context, obj Object) error {
 	if err := c.client.Delete(ctx, obj); err != nil {
+		// Swallow not found errors to behave as a "declarative" delete.
+		_, err := IsDeleted(obj, err)
 		return errors.WithStack(err)
 	}
 	return nil
diff --git a/pkg/kube/ctl_test.go b/pkg/kube/ctl_test.go
index 2f469454a..6513d2dae 100644
--- a/pkg/kube/ctl_test.go
+++ b/pkg/kube/ctl_test.go
@@ -53,4 +53,35 @@ func TestCtl(t *testing.T) {
 	cms, err := kube.List[corev1.ConfigMapList](ctx, ctl, "hello-world")
 	require.NoError(t, err)
 	require.Len(t, cms.Items, 3)
+
+	t.Run("Delete", func(t *testing.T) {
+		s := &corev1.Secret{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "delete-me",
+				Namespace: "hello-world",
+				// Set a finalizer to stall deletion.
+				Finalizers: []string{"i-prevent.com/deletion"},
+			},
+		}
+
+		require.NoError(t, ctl.Apply(ctx, s))
+		require.NotZero(t, s.UID)
+
+		require.NoError(t, ctl.Delete(ctx, s))
+
+		// Refresh s and assert that it's now deleting.
+		require.NoError(t, ctl.Get(ctx, kube.AsKey(s), s))
+		require.NotNil(t, s.DeletionTimestamp)
+
+		// Re-issue delete with DeletionTimestamp already set to showcase that
+		// it doesn't result in errors.
+		require.NoError(t, ctl.Delete(ctx, s))
+
+		// Clear finalizers to allow deletion to progress.
+		s.Finalizers = nil
+		require.NoError(t, ctl.ApplyAndWait(ctx, s, kube.IsDeleted))
+
+		// No 404s if we attempt to delete an already deleted object.
+		require.NoError(t, ctl.Delete(ctx, s))
+	})
 }
diff --git a/pkg/kube/syncer.go b/pkg/kube/syncer.go
new file mode 100644
index 000000000..eb8ced5cb
--- /dev/null
+++ b/pkg/kube/syncer.go
@@ -0,0 +1,277 @@
+package kube
+
+import (
+	"context"
+	"maps"
+	"reflect"
+	"slices"
+
+	"github.com/cockroachdb/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	"github.com/redpanda-data/redpanda-operator/pkg/otelutil/log"
+)
+
+type Renderer interface {
+	Render(context.Context) ([]Object, error)
+	Types() []Object
+}
+
+// Syncer synchronizes a set of [Object]s into the Kubernetes API. Objects will
+// be updated via SSA and deleted when they are no longer returned from
+// [Renderer].
+type Syncer struct {
+	Ctl      *Ctl
+	Renderer Renderer
+
+	// Namespace is the namespace [Syncer] will use for listing [Object]s. If
+	// Renderer returns an Object in a namespace other than this one, it WILL
+	// NOT be garbage collected.
+	Namespace string
+
+	// Owner is the [metav1.OwnerReference] that will be set on all **Namespace
+	// scoped** Objects returned by Renderer.
+	// It is additionally used to filter **Namespace scoped** objects.
+	//
+	// Owner CAN NOT be changed without abandoning objects.
+	Owner metav1.OwnerReference
+
+	// OwnershipLabels functions similarly to Owner. They're applied to all
+	// objects and used for filtering.
+	// In the case of cluster wide objects, OwnershipLabels is the sole
+	// method of determining ownership.
+	//
+	// OwnershipLabels CAN NOT be changed without abandoning objects.
+	OwnershipLabels map[string]string
+
+	// Preprocess, if provided, is run ahead of applying Objects. It may be
+	// used to add additional labels, annotations, etc. uniformly.
+	Preprocess func(Object)
+}
+
+func (s *Syncer) Sync(ctx context.Context) ([]Object, error) {
+	toSync, err := s.toSync(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	existing, err := s.listInPurview(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	// Diff toSync and existing to create a list of Objects that should be GC'd.
+	toDelete := make(map[gvkObject]Object, len(existing))
+	for _, resource := range existing {
+		gvk, err := GVKFor(s.Ctl.Scheme(), resource)
+		if err != nil {
+			return nil, err
+		}
+
+		toDelete[gvkObject{
+			gvk: gvk,
+			key: AsKey(resource),
+		}] = resource
+	}
+
+	for _, resource := range toSync {
+		gvk, err := GVKFor(s.Ctl.Scheme(), resource)
+		if err != nil {
+			return nil, err
+		}
+
+		delete(toDelete, gvkObject{
+			gvk: gvk,
+			key: AsKey(resource),
+		})
+	}
+
+	for _, obj := range toSync {
+		if err := s.Ctl.Apply(ctx, obj, client.ForceOwnership); err != nil {
+			// Similarly to our list function, ignore unregistered values and log a warning.
+			if meta.IsNoMatchError(err) {
+				gvk, err := GVKFor(s.Ctl.Scheme(), obj)
+				if err != nil {
+					return nil, err
+				}
+
+				log.Error(ctx, err, "WARNING no registered value for resource type", "gvk", gvk.String(), "key", AsKey(obj))
+				continue
+			}
+			return nil, err
+		}
+	}
+
+	for key, obj := range toDelete {
+		log.Info(ctx, "GC'ing object", "key", key.key.String(), "gvk", key.gvk.String())
+		if err := s.Ctl.Delete(ctx, obj); err != nil {
+			return nil, err
+		}
+	}
+
+	// Return the applied objects. They're mutated in place by the Apply calls
+	// above, which allows callers to extract information from their statuses
+	// and the like.
+	return toSync, nil
+}
+
+func (s *Syncer) DeleteAll(ctx context.Context) (bool, error) {
+	toDelete, err := s.listInPurview(ctx)
+	if err != nil {
+		return true, err
+	}
+
+	alive := 0
+	for _, obj := range toDelete {
+		if obj.GetDeletionTimestamp() == nil {
+			alive++
+		}
+
+		if err := s.Ctl.Delete(ctx, obj); err != nil {
+			return true, err
+		}
+	}
+
+	return alive > 0, nil
+}
+
+func (s *Syncer) listInPurview(ctx context.Context) ([]Object, error) {
+	var objects []Object
+	for _, t := range s.Renderer.Types() {
+		gvk, err := GVKFor(s.Ctl.Scheme(), t)
+		if err != nil {
+			return nil, err
+		}
+
+		scope, err := s.Ctl.ScopeOf(gvk)
+		if err != nil {
+			// If we encounter an unknown type, e.g. someone hasn't installed
+			// cert-manager, don't block the entire sync process. Instead we'll
+			// log a warning and move on.
+			if meta.IsNoMatchError(err) {
+				log.Error(ctx, err, "WARNING no registered value for resource type", "gvk", gvk.String())
+				continue
+			}
+			return nil, err
+		}
+
+		list, err := listFor(s.Ctl.client.Scheme(), t)
+		if err != nil {
+			return nil, err
+		}
+
+		if err := s.Ctl.List(ctx, s.Namespace, list, client.MatchingLabels(s.OwnershipLabels)); err != nil {
+			return nil, err
+		}
+
+		items, err := Items[Object](list)
+		if err != nil {
+			return nil, err
+		}
+
+		// If resources are Namespace scoped, we additionally filter on whether
+		// or not OwnerRef is set correctly.
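+		// Cluster scoped objects can't carry an OwnerReference to a namespaced
+		// owner (Kubernetes only allows cluster scoped owners there), so for
+		// them OwnershipLabels remain the only ownership signal.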
+		if scope == meta.RESTScopeNameNamespace {
+			i := 0
+			for _, obj := range items {
+				owned := slices.ContainsFunc(obj.GetOwnerReferences(), func(ref metav1.OwnerReference) bool {
+					return ref.UID == s.Owner.UID
+				})
+
+				if owned {
+					items[i] = obj
+					i++
+				}
+			}
+
+			items = items[:i]
+		}
+
+		objects = append(objects, items...)
+	}
+
+	return objects, nil
+}
+
+func (s *Syncer) toSync(ctx context.Context) ([]Object, error) {
+	objs, err := s.Renderer.Render(ctx)
+	if err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	types := s.Renderer.Types()
+	expectedTypes := make(map[reflect.Type]struct{}, len(types))
+	for _, t := range types {
+		expectedTypes[reflect.TypeOf(t)] = struct{}{}
+	}
+
+	for _, obj := range objs {
+		// Ensure that all types returned are present in s.Types. If they aren't
+		// we'd potentially "leak" objects.
+		if _, ok := expectedTypes[reflect.TypeOf(obj)]; !ok {
+			return nil, errors.Newf(".Render returned %T which isn't present in .Types", obj)
+		}
+
+		// Run Preprocessors, if any.
+		if s.Preprocess != nil {
+			s.Preprocess(obj)
+		}
+
+		// Additionally apply OwnershipLabels and, for namespace scoped objects,
+		// the Owner reference.
+		s.applyOwnerLabels(obj)
+		if err := s.applyOwnerReferences(obj); err != nil {
+			return nil, err
+		}
+	}
+
+	return objs, nil
+}
+
+func (s *Syncer) applyOwnerLabels(obj Object) {
+	if obj.GetLabels() == nil {
+		obj.SetLabels(map[string]string{})
+	}
+	maps.Copy(obj.GetLabels(), s.OwnershipLabels)
+}
+
+func (s *Syncer) applyOwnerReferences(obj Object) error {
+	gvk, err := GVKFor(s.Ctl.Scheme(), obj)
+	if err != nil {
+		return err
+	}
+
+	scope, err := s.Ctl.ScopeOf(gvk)
+	if err != nil {
+		// Ignore no match errors that stem from ScopeOf. We'll handle them
+		// elsewhere. There's no risk of applying the object with a missing
+		// owner reference as the API server won't accept objects of this type.
+		if meta.IsNoMatchError(err) {
+			return nil
+		}
+		return err
+	}
+
+	// No owner references on cluster scoped items.
+	if scope == meta.RESTScopeNameRoot {
+		// Cluster scoped objects should NOT have a namespace specified. If they
+		// do, they'll be incorrectly deleted due to their ObjectKey containing a
+		// Namespace and the Kubernetes version not containing a Namespace.
+		// Validate that here to prevent confusing and unexpected behavior.
+ if obj.GetNamespace() != "" { + return errors.Newf("cluster scoped object incorrectly has namespace set: %T %s", obj, AsKey(obj)) + } + return nil + } + + obj.SetOwnerReferences([]metav1.OwnerReference{s.Owner}) + + return nil +} + +type gvkObject struct { + gvk schema.GroupVersionKind + key ObjectKey +} diff --git a/pkg/kube/syncer_test.go b/pkg/kube/syncer_test.go new file mode 100644 index 000000000..d3d590cdb --- /dev/null +++ b/pkg/kube/syncer_test.go @@ -0,0 +1,269 @@ +package kube_test + +import ( + "context" + "fmt" + "testing" + "time" + + certmanagerv1 "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/redpanda-data/redpanda-operator/pkg/kube" + "github.com/redpanda-data/redpanda-operator/pkg/kube/kubetest" +) + +func TestSyncer(t *testing.T) { + // Notably: + // - excludes apiextensions. + // - includes certmanagerv1 but CRDs are NOT installed. + scheme := runtime.NewScheme() + require.NoError(t, clientgoscheme.AddToScheme(scheme)) + require.NoError(t, certmanagerv1.AddToScheme(scheme)) + + ctl := kubetest.NewEnv(t, kube.Options{ + Options: client.Options{ + Scheme: scheme, + }, + }) + + // This namespace with hold our test resources and act as the "owner" of + // them. + ns, err := kube.Create[corev1.Namespace](t.Context(), ctl, corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-", + Labels: map[string]string{ + "owned_by": "no-one", + }, + }, + }) + require.NoError(t, err) + + r := &renderer{Prefix: "test", Namespace: ns.Name} + syncer := kube.Syncer{ + Ctl: ctl, + Renderer: r, + Namespace: ns.Name, + Owner: metav1.OwnerReference{ + Name: ns.Name, + APIVersion: "v1", + Kind: "Namespace", + UID: ns.UID, + }, + OwnershipLabels: map[string]string{ + "owned_by": "syncer", + }, + } + + t.Run("NotInTypes", func(t *testing.T) { + r.ObjNotInTypes = true + defer func() { r.ObjNotInTypes = false }() + + _, err := syncer.Sync(t.Context()) + require.EqualError(t, err, ".Render returned *v1.PersistentVolume which isn't present in .Types") + }) + + t.Run("NotInScheme", func(t *testing.T) { + r.ObjNotInScheme = true + defer func() { r.ObjNotInScheme = false }() + + _, err := syncer.Sync(t.Context()) + require.EqualError(t, err, `no kind is registered for the type v1.CustomResourceDefinition in scheme "pkg/runtime/scheme.go:110"`) + }) + + t.Run("NotInAPI", func(t *testing.T) { + r.ObjNotInAPI = true + defer func() { r.ObjNotInAPI = false }() + + // If we have a type that's not registered in the API, we'll swallow + // any such errors an instead log a warning. + // This may not be the optimal behavior. 
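+		// (The warning is emitted by Syncer.Sync / listInPurview when they hit
+		// meta.IsNoMatchError for the unregistered type.)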
+ _, err := syncer.Sync(t.Context()) + require.NoError(t, err) + }) + + t.Run("NamespaceSetOnClusterScope", func(t *testing.T) { + r.ClusterScopeWithNamespace = true + defer func() { r.ClusterScopeWithNamespace = false }() + + _, err := syncer.Sync(t.Context()) + require.EqualError(t, err, fmt.Sprintf("cluster scoped object incorrectly has namespace set: *v1.Namespace %s/test-ns-nsd", r.Namespace)) + }) + + t.Run("Sync", func(t *testing.T) { + defer func() { + r.Prefix = "test" + syncer.Preprocess = nil + }() + + // Test syncing by changing object names and labels. + for i := range 5 { + r.Prefix = fmt.Sprintf("test-%d", i) + + for j := range 5 { + syncer.Preprocess = func(o kube.Object) { + if o.GetLabels() == nil { + o.SetLabels(map[string]string{}) + } + o.GetLabels()["iteration"] = fmt.Sprintf("%d", j) + } + + objs, err := syncer.Sync(t.Context()) + require.NoError(t, err) + + uids := map[kube.ObjectKey]string{} + resourceVersions := map[kube.ObjectKey]string{} + for _, obj := range objs { + uids[kube.AsKey(obj)] = string(obj.GetUID()) + resourceVersions[kube.AsKey(obj)] = obj.GetResourceVersion() + } + + // Test idempotency by running a no-op and asserting that UID and + // resourceVersion remain stable. + for range 5 { + objs, err := syncer.Sync(t.Context()) + require.NoError(t, err) + + for _, obj := range objs { + // assert that objects are updated in place. + require.NotEmpty(t, obj.GetUID()) + require.True(t, obj.GetDeletionTimestamp().IsZero()) + require.Equal(t, uids[kube.AsKey(obj)], string(obj.GetUID())) + require.Equal(t, resourceVersions[kube.AsKey(obj)], obj.GetResourceVersion()) + // Ownership labels have been applied. + require.Equal(t, "syncer", obj.GetLabels()["owned_by"]) + // Iteration label has been applied. + require.Equal(t, fmt.Sprintf("%d", j), obj.GetLabels()["iteration"]) + + gvk, err := kube.GVKFor(ctl.Scheme(), obj) + require.NoError(t, err) + + scope, err := ctl.ScopeOf(gvk) + require.NoError(t, err) + + if scope == meta.RESTScopeNameNamespace { + require.Equal(t, []metav1.OwnerReference{{ + Name: ns.Name, + APIVersion: "v1", + Kind: "Namespace", + UID: ns.UID, + }}, obj.GetOwnerReferences()) + } + } + } + } + } + }) + + // Asserts that the syncer will take ownership of objects instead of + // returning errors. + t.Run("SSAConflicts", func(t *testing.T) { + objs, err := syncer.Sync(t.Context()) + require.NoError(t, err) + + // NB: This must be on the same key that's referenced in renderer to + // actually generate a conflict. + cm := objs[0].(*corev1.ConfigMap) + cm.Data["some"] = "update" + + err = ctl.Apply(t.Context(), objs[0], client.FieldOwner("conflicter")) + require.True(t, apierrors.IsConflict(err)) + + err = ctl.Apply(t.Context(), objs[0], client.FieldOwner("conflicter"), client.ForceOwnership) + require.NoError(t, err) + + _, err = syncer.Sync(t.Context()) + require.NoError(t, err) + }) + + t.Run("DeleteAll", func(t *testing.T) { + ctx := t.Context() + + require.NoError(t, ctl.Create(ctx, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "not-owned", + Namespace: ns.Name, + // No OwnerReference means this object will remain untouched. + Labels: syncer.OwnershipLabels, + }, + })) + + _, err := syncer.Sync(ctx) + require.NoError(t, err) + + _, err = syncer.DeleteAll(ctx) + require.NoError(t, err) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + // Our owning namespace hasn't been removed but the other one(s) + // have been cleaned up. NB: envtest namespaces never get fully + // deleted, so we filter to Active ones. 
+ nss, err := kube.List[corev1.NamespaceList](ctx, ctl, "", client.MatchingFields{ + "status.phase": "Active", + }, client.HasLabels{"owned_by"}) + require.NoError(t, err) + require.Len(t, nss.Items, 1) + require.Equal(t, ns.UID, nss.Items[0].UID) + + // The only left over configmap is our unowned one. + cms, err := kube.List[corev1.ConfigMapList](ctx, ctl, ns.Name, client.HasLabels{"owned_by"}) + require.NoError(t, err) + require.Len(t, cms.Items, 1) + require.Equal(t, "not-owned", cms.Items[0].Name) + }, 30*time.Second, time.Second) + }) +} + +type renderer struct { + Prefix string + Namespace string + ObjNotInScheme bool + ObjNotInAPI bool + ObjNotInTypes bool + ClusterScopeWithNamespace bool +} + +func (r *renderer) Render(_ context.Context) ([]kube.Object, error) { + objs := []kube.Object{ + // A Namespace scoped object. + &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: r.Prefix + "-cm", Namespace: r.Namespace}, Data: map[string]string{"some": "data"}}, + // A cluster scoped object. + &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: r.Prefix + "-ns"}}, + } + if r.ObjNotInAPI { + objs = append(objs, &certmanagerv1.Certificate{ObjectMeta: metav1.ObjectMeta{Name: r.Prefix + "-c", Namespace: r.Namespace}}) + } + if r.ObjNotInScheme { + objs = append(objs, &apiextensionsv1.CustomResourceDefinition{ObjectMeta: metav1.ObjectMeta{Name: r.Prefix + "-crd"}}) + } + if r.ObjNotInTypes { + objs = append(objs, &corev1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: r.Prefix + "-pv"}}) + } + if r.ClusterScopeWithNamespace { + objs = append(objs, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: r.Prefix + "-ns-nsd", Namespace: r.Namespace}}) + } + return objs, nil +} + +func (r *renderer) Types() []kube.Object { + ts := []kube.Object{ + &corev1.ConfigMap{}, + &corev1.Namespace{}, + } + if r.ObjNotInScheme { + ts = append(ts, &apiextensionsv1.CustomResourceDefinition{}) + } + if r.ObjNotInAPI { + ts = append(ts, &certmanagerv1.Certificate{}) + } + return ts +} diff --git a/pkg/kube/util.go b/pkg/kube/util.go new file mode 100644 index 000000000..2bb61af11 --- /dev/null +++ b/pkg/kube/util.go @@ -0,0 +1,54 @@ +package kube + +import ( + "github.com/cockroachdb/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func listFor(scheme *runtime.Scheme, obj Object) (ObjectList, error) { + gvk, err := GVKFor(scheme, obj) + if err != nil { + return nil, err + } + + olist, err := scheme.New(schema.GroupVersionKind{ + Group: gvk.Group, + Version: gvk.Version, + Kind: gvk.Kind + "List", + }) + if err != nil { + return nil, errors.WithStack(err) + } + + list, ok := olist.(ObjectList) + if !ok { + return nil, errors.Newf("type is not ObjectList: %T", obj) + } + + return list, nil +} + +func GVKFor(scheme *runtime.Scheme, object Object) (schema.GroupVersionKind, error) { + kinds, _, err := scheme.ObjectKinds(object) + if err != nil { + return schema.GroupVersionKind{}, errors.WithStack(err) + } + + if len(kinds) == 0 { + return schema.GroupVersionKind{}, errors.Newf("unable to determine object kind: %T", object) + } + + return kinds[0], nil +} + +func setGVK(scheme *runtime.Scheme, obj Object) error { + gvk, err := GVKFor(scheme, obj) + if err != nil { + return err + } + + obj.GetObjectKind().SetGroupVersionKind(gvk) + + return nil +}
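+
+// Usage sketch (the names ctl and ctx are assumed, not part of this package):
+// listFor pairs with [Ctl.List] to enumerate every object of the same kind as
+// a rendered object, which is how [Syncer.listInPurview] builds its view of
+// owned resources:
+//
+//	list, _ := listFor(ctl.Scheme(), &corev1.ConfigMap{})
+//	_ = ctl.List(ctx, "some-namespace", list, client.MatchingLabels{"owned_by": "syncer"})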