diff --git a/Taskfile.yml b/Taskfile.yml index 2bb95d477..47a07280a 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -218,14 +218,18 @@ tasks: TEST_REDPANDA_VERSION: '{{ .TEST_REDPANDA_VERSION | default .DEFAULT_TEST_REDPANDA_VERSION }}' TEST_UPGRADE_REDPANDA_VERSION: '{{ .TEST_UPGRADE_REDPANDA_VERSION | default .DEFAULT_TEST_UPGRADE_REDPANDA_VERSION }}' TEST_UPGRADE_OPERATOR_VERSION: '{{ .TEST_UPGRADE_OPERATOR_VERSION | default .DEFAULT_TEST_UPGRADE_OPERATOR_VERSION }}' + IMAGES: + - quay.io/jetstack/cert-manager-controller:{{.TEST_CERTMANAGER_VERSION}} + - quay.io/jetstack/cert-manager-cainjector:{{.TEST_CERTMANAGER_VERSION}} + - quay.io/jetstack/cert-manager-startupapicheck:{{.TEST_CERTMANAGER_VERSION}} + - quay.io/jetstack/cert-manager-webhook:{{.TEST_CERTMANAGER_VERSION}} + - docker.redpanda.com/redpandadata/redpanda:{{.TEST_REDPANDA_VERSION}} + - docker.redpanda.com/redpandadata/redpanda:{{.TEST_UPGRADE_REDPANDA_VERSION}} + - docker.redpanda.com/redpandadata/redpanda-operator:{{.TEST_UPGRADE_OPERATOR_VERSION}} + cmds: - - docker pull quay.io/jetstack/cert-manager-controller:{{.TEST_CERTMANAGER_VERSION}} - - docker pull quay.io/jetstack/cert-manager-cainjector:{{.TEST_CERTMANAGER_VERSION}} - - docker pull quay.io/jetstack/cert-manager-startupapicheck:{{.TEST_CERTMANAGER_VERSION}} - - docker pull quay.io/jetstack/cert-manager-webhook:{{.TEST_CERTMANAGER_VERSION}} - - docker pull docker.redpanda.com/redpandadata/redpanda:{{.TEST_REDPANDA_VERSION}} - - docker pull docker.redpanda.com/redpandadata/redpanda:{{.TEST_UPGRADE_REDPANDA_VERSION}} - - docker pull docker.redpanda.com/redpandadata/redpanda-operator:{{.TEST_UPGRADE_OPERATOR_VERSION}} + - for: {var: IMAGES} + cmd: docker inspect {{.ITEM}} > /dev/null || docker pull {{.ITEM}} pending-prs: desc: "Get all pending PRs for watched branches" diff --git a/harpoon/go.mod b/harpoon/go.mod index 2ad097b81..895b5a1d1 100644 --- a/harpoon/go.mod +++ b/harpoon/go.mod @@ -93,6 +93,7 @@ require ( github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect + github.com/miekg/dns v1.1.58 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-ps v1.0.0 // indirect github.com/mitchellh/go-wordwrap v1.0.1 // indirect diff --git a/harpoon/go.sum b/harpoon/go.sum index b3bb23da5..66878ea75 100644 --- a/harpoon/go.sum +++ b/harpoon/go.sum @@ -36,6 +36,8 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3 github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cenkalti/backoff/v5 v5.0.2 h1:rIfFVxEf1QsI7E1ZHfp/B4DF/6QBAUhmgkxc0H7Zss8= github.com/cenkalti/backoff/v5 v5.0.2/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= +github.com/cert-manager/cert-manager v1.14.5 h1:uuM1O2g2S80nxiH3eW2cZYMGiL2zmDFVdAzg8sibWuc= +github.com/cert-manager/cert-manager v1.14.5/go.mod h1:fmr/cU5jiLxWj69CroDggSOa49RljUK+dU583TaQUXM= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chai2010/gettext-go v1.0.2 h1:1Lwwip6Q2QGsAdl/ZKPCwTe9fe0CjlUbqj5bFNSjIRk= @@ -575,6 +577,8 @@ pgregory.net/rapid v1.1.0 h1:CMa0sjHSru3puNx+J0MIAuiiEV4N0qj8/cMWGBBCsjw= pgregory.net/rapid v1.1.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04= sigs.k8s.io/controller-runtime v0.20.4 h1:X3c+Odnxz+iPTRobG4tp092+CvBU9UK0t/bRf+n0DGU= sigs.k8s.io/controller-runtime 
v0.20.4/go.mod h1:xg2XB0K5ShQzAgsoujxuKN4LNXR2LfwwHsPj7Iaw+XY= +sigs.k8s.io/gateway-api v1.1.0 h1:DsLDXCi6jR+Xz8/xd0Z1PYl2Pn0TyaFMOPPZIj4inDM= +sigs.k8s.io/gateway-api v1.1.0/go.mod h1:ZH4lHrL2sDi0FHZ9jjneb8kKnGzFWyrTya35sWUTrRs= sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 h1:gBQPwqORJ8d8/YNZWEjoZs7npUVDpVXUUOFfW6CgAqE= sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg= sigs.k8s.io/kustomize/api v0.19.0 h1:F+2HB2mU1MSiR9Hp1NEgoU2q9ItNOaBJl0I4Dlus5SQ= diff --git a/operator/internal/controller/redpanda/resource_controller.go b/operator/internal/controller/redpanda/resource_controller.go index 9b43026fc..5aa6161b1 100644 --- a/operator/internal/controller/redpanda/resource_controller.go +++ b/operator/internal/controller/redpanda/resource_controller.go @@ -22,12 +22,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" redpandav1alpha2 "github.com/redpanda-data/redpanda-operator/operator/api/redpanda/v1alpha2" + "github.com/redpanda-data/redpanda-operator/operator/internal/lifecycle" internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client" "github.com/redpanda-data/redpanda-operator/pkg/otelutil/log" ) -const fieldOwner client.FieldOwner = "redpanda-operator" - type Resource[T any] interface { *T client.Object @@ -100,7 +99,7 @@ func (r *ResourceController[T, U]) Reconcile(ctx context.Context, req ctrl.Reque if !controllerutil.ContainsFinalizer(object, FinalizerKey) { patch := r.reconciler.FinalizerPatch(request) if patch != nil { - if err := r.Patch(ctx, object, patch, client.ForceOwnership, fieldOwner); err != nil { + if err := r.Patch(ctx, object, patch, client.ForceOwnership, lifecycle.DefaultFieldOwner); err != nil { return ctrl.Result{}, err } } @@ -109,7 +108,7 @@ func (r *ResourceController[T, U]) Reconcile(ctx context.Context, req ctrl.Reque patch, err := r.reconciler.SyncResource(ctx, request) var syncError error if patch != nil { - syncError = r.Status().Patch(ctx, object, patch, client.ForceOwnership, fieldOwner) + syncError = r.Status().Patch(ctx, object, patch, client.ForceOwnership, lifecycle.DefaultFieldOwner) } result := ctrl.Result{} diff --git a/operator/internal/lifecycle/client.go b/operator/internal/lifecycle/client.go index 4202fa627..a88615aa1 100644 --- a/operator/internal/lifecycle/client.go +++ b/operator/internal/lifecycle/client.go @@ -11,19 +11,16 @@ package lifecycle import ( "context" - "errors" "fmt" + "maps" "slices" + "github.com/cockroachdb/errors" "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - k8sapierrors "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -49,11 +46,23 @@ func NewClusterObject[T any, U Cluster[T]]() U { // NewResourceClient creates a new instance of a ResourceClient for managing resources. 
func NewResourceClient[T any, U Cluster[T]](mgr ctrl.Manager, resourcesFn ResourceManagerFactory[T, U]) *ResourceClient[T, U] { ownershipResolver, statusUpdater, nodePoolRenderer, simpleResourceRenderer := resourcesFn(mgr) + ctl, err := kube.FromRESTConfig(mgr.GetConfig(), kube.Options{ + Options: client.Options{ + Scheme: mgr.GetScheme(), + Mapper: mgr.GetRESTMapper(), + }, + FieldManager: string(DefaultFieldOwner), + }) + if err != nil { + // NB: This is less than ideal but it's exceptionally unlikely that + // FromRESTConfig actually returns an error and this method is only + // ever called at initializtion time, not runtime. + panic(err) + } + return &ResourceClient[T, U]{ + ctl: ctl, logger: mgr.GetLogger().WithName("ResourceClient"), - client: mgr.GetClient(), - scheme: mgr.GetScheme(), - mapper: mgr.GetRESTMapper(), ownershipResolver: ownershipResolver, statusUpdater: statusUpdater, nodePoolRenderer: nodePoolRenderer, @@ -64,10 +73,8 @@ func NewResourceClient[T any, U Cluster[T]](mgr ctrl.Manager, resourcesFn Resour // ResourceClient is a client used to manage dependent resources, // both simple and node pools, for a given cluster type. type ResourceClient[T any, U Cluster[T]] struct { - client client.Client + ctl *kube.Ctl logger logr.Logger - scheme *runtime.Scheme - mapper apimeta.RESTMapper ownershipResolver OwnershipResolver[T, U] statusUpdater ClusterStatusUpdater[T, U] nodePoolRenderer NodePoolRenderer[T, U] @@ -76,7 +83,12 @@ type ResourceClient[T any, U Cluster[T]] struct { // PatchNodePoolSet updates a StatefulSet for a specific node pool. func (r *ResourceClient[T, U]) PatchNodePoolSet(ctx context.Context, owner U, set *appsv1.StatefulSet) error { - return r.patchOwnedResource(ctx, owner, set) + if set.GetLabels() == nil { + set.SetLabels(map[string]string{}) + } + maps.Copy(set.GetLabels(), r.ownershipResolver.AddLabels(owner)) + set.SetOwnerReferences([]metav1.OwnerReference{*metav1.NewControllerRef(owner, owner.GetObjectKind().GroupVersionKind())}) + return r.ctl.Apply(ctx, set, client.ForceOwnership) } // SetClusterStatus sets the status of the given cluster. @@ -84,54 +96,47 @@ func (r *ResourceClient[T, U]) SetClusterStatus(cluster U, status *ClusterStatus return r.statusUpdater.Update(cluster, status) } -type gvkObject struct { - gvk schema.GroupVersionKind - nn types.NamespacedName +type renderer[T any, U Cluster[T]] struct { + SimpleResourceRenderer[T, U] + Cluster U } -// SyncAll synchronizes the simple resources associated with the given cluster, -// cleaning up any resources that should no longer exist. 
-func (r *ResourceClient[T, U]) SyncAll(ctx context.Context, owner U) error { - // we don't sync node pools here - resources, err := r.listAllOwnedResources(ctx, owner, false) - if err != nil { - return err - } - toDelete := map[gvkObject]client.Object{} - for _, resource := range resources { - toDelete[gvkObject{ - gvk: resource.GetObjectKind().GroupVersionKind(), - nn: client.ObjectKeyFromObject(resource), - }] = resource - } - - toSync, err := r.simpleResourceRenderer.Render(ctx, owner) - if err != nil { - return err - } - - // attempt to create as many resources in one pass as we can - errs := []error{} +func (r *renderer[T, U]) Render(ctx context.Context) ([]kube.Object, error) { + return r.SimpleResourceRenderer.Render(ctx, r.Cluster) +} - for _, resource := range toSync { - if err := r.patchOwnedResource(ctx, owner, resource); err != nil { - errs = append(errs, err) - } - delete(toDelete, gvkObject{ - gvk: resource.GetObjectKind().GroupVersionKind(), - nn: client.ObjectKeyFromObject(resource), - }) - } +func (r *renderer[T, U]) Types() []kube.Object { + types := r.SimpleResourceRenderer.WatchedResourceTypes() + return slices.DeleteFunc(types, func(o kube.Object) bool { + _, ok := o.(*appsv1.StatefulSet) + return ok + }) +} - for _, resource := range toDelete { - if err := r.client.Delete(ctx, resource); err != nil { - if !k8sapierrors.IsNotFound(err) { - errs = append(errs, err) +func (r *ResourceClient[T, U]) syncer(owner U) *kube.Syncer { + return &kube.Syncer{ + Ctl: r.ctl, + Namespace: owner.GetNamespace(), + Renderer: &renderer[T, U]{ + Cluster: owner, + SimpleResourceRenderer: r.simpleResourceRenderer, + }, + OwnershipLabels: r.ownershipResolver.GetOwnerLabels(owner), + Preprocess: func(o kube.Object) { + if o.GetLabels() == nil { + o.SetLabels(map[string]string{}) } - } + maps.Copy(o.GetLabels(), r.ownershipResolver.AddLabels(owner)) + }, + Owner: *metav1.NewControllerRef(owner, owner.GetObjectKind().GroupVersionKind()), } +} - return errors.Join(errs...) +// SyncAll synchronizes the simple resources associated with the given cluster, +// cleaning up any resources that should no longer exist. +func (r *ResourceClient[T, U]) SyncAll(ctx context.Context, owner U) error { + _, err := r.syncer(owner).Sync(ctx) + return err } // FetchExistingAndDesiredPools fetches the existing and desired node pools for a given cluster, returning @@ -184,7 +189,12 @@ func (r *ResourceClient[T, U]) WatchResources(builder Builder, cluster client.Ob builder.Owns(&appsv1.StatefulSet{}) for _, resourceType := range r.simpleResourceRenderer.WatchedResourceTypes() { - mapping, err := getResourceScope(r.mapper, r.scheme, resourceType) + gvk, err := kube.GVKFor(r.ctl.Scheme(), resourceType) + if err != nil { + return err + } + + mapping, err := r.ctl.ScopeOf(gvk) if err != nil { if !apimeta.IsNoMatchError(err) { return err @@ -196,7 +206,7 @@ func (r *ResourceClient[T, U]) WatchResources(builder Builder, cluster client.Ob continue } - if mapping.Name() == apimeta.RESTScopeNamespace.Name() { + if mapping == apimeta.RESTScopeNameNamespace { // we're working with a namespace scoped resource, so we can work with ownership builder.Owns(resourceType) continue @@ -220,189 +230,52 @@ func (r *ResourceClient[T, U]) WatchResources(builder Builder, cluster client.Ob // DeleteAll deletes all resources owned by the given cluster, including node pools. 
func (r *ResourceClient[T, U]) DeleteAll(ctx context.Context, owner U) (bool, error) { - // since this is a widespread deletion, we can delete even stateful sets - resources, err := r.listAllOwnedResources(ctx, owner, true) - if err != nil { - return false, err - } - - alive := []client.Object{} - for _, o := range resources { - if o.GetDeletionTimestamp() == nil { - alive = append(alive, o) - } - } - - // attempt to delete as many resources in one pass as we can errs := []error{} - for _, resource := range alive { - if err := r.client.Delete(ctx, resource); err != nil { - errs = append(errs, err) - } - } - - return len(alive) > 0, errors.Join(errs...) -} - -// listResources lists resources of a specific type and object, returning them as an array. -func (r *ResourceClient[T, U]) listResources(ctx context.Context, object client.Object, opts ...client.ListOption) ([]client.Object, error) { - kind, err := getGroupVersionKind(r.client.Scheme(), object) - if err != nil { - return nil, err - } - - olist, err := r.client.Scheme().New(schema.GroupVersionKind{ - Group: kind.Group, - Version: kind.Version, - Kind: kind.Kind + "List", - }) + allDeleted, err := r.syncer(owner).DeleteAll(ctx) if err != nil { - return nil, fmt.Errorf("initializing list: %w", err) - } - list, ok := olist.(client.ObjectList) - if !ok { - return nil, fmt.Errorf("invalid object list type: %T", object) - } - - if err := r.client.List(ctx, list, opts...); err != nil { - // no-op list on unregistered resources, this happens when we - // don't actually have a CRD installed for some resource type - // we're trying to list - if !apimeta.IsNoMatchError(err) { - return nil, fmt.Errorf("listing resources: %w", err) - } + errs = append(errs, err) } - items, err := kube.Items[client.Object](list) + pools, err := r.fetchExistingPools(ctx, owner) if err != nil { - return nil, err - } - - for _, item := range items { - item.GetObjectKind().SetGroupVersionKind(*kind) + return false, err } - return sortCreation(items), nil -} - -// listAllOwnedResources lists all resources owned by a given cluster, optionally including node pools. -func (r *ResourceClient[T, U]) listAllOwnedResources(ctx context.Context, owner U, includeNodePools bool) ([]client.Object, error) { - resources := []client.Object{} - for _, resourceType := range append(r.simpleResourceRenderer.WatchedResourceTypes(), &appsv1.StatefulSet{}) { - matching, err := r.listResources(ctx, resourceType, client.MatchingLabels(r.ownershipResolver.GetOwnerLabels(owner))) - if err != nil { - return nil, err + for _, pool := range pools { + if pool.set.DeletionTimestamp != nil { + allDeleted = false } - filtered := []client.Object{} - for i := range matching { - object := matching[i] - - // filter out unowned resources - mapping, err := getResourceScope(r.mapper, r.scheme, object) - if err != nil { - if !apimeta.IsNoMatchError(err) { - return nil, err - } - - // we have an unknown mapping so just ignore this - continue - } - // isOwner defaults to true here because we don't set - // owner refs on ClusterScoped resources. We only check - // for ownership if it's namespace scoped. 
- isOwner := true - if mapping.Name() == apimeta.RESTScopeNameNamespace { - isOwner = slices.ContainsFunc(object.GetOwnerReferences(), func(ref metav1.OwnerReference) bool { - return ref.UID == owner.GetUID() - }) - } - - // special case the node pools - if (includeNodePools || !r.nodePoolRenderer.IsNodePool(object)) && isOwner { - filtered = append(filtered, object) - } + if err := r.ctl.Delete(ctx, pool.set); err != nil { + errs = append(errs, err) } - resources = append(resources, filtered...) } - return resources, nil -} -// patchOwnedResource applies a patch to a resource owned by the cluster. -func (r *ResourceClient[T, U]) patchOwnedResource(ctx context.Context, owner U, object client.Object, extraLabels ...map[string]string) error { - if err := r.normalize(object, owner, extraLabels...); err != nil { - return err - } - return r.client.Patch(ctx, object, client.Apply, defaultFieldOwner, client.ForceOwnership) + return allDeleted, errors.Join(errs...) } -// normalize normalizes an object by setting its labels and owner references. Any labels passed in as `extraLabels` -// will potentially override those set by the ownership resolver. -func (r *ResourceClient[T, U]) normalize(object client.Object, owner U, extraLabels ...map[string]string) error { - kind, err := getGroupVersionKind(r.scheme, object) - if err != nil { - return err - } - - unknownMapping := false - - mapping, err := getResourceScope(r.mapper, r.scheme, object) +// fetchExistingPools fetches the existing pools (StatefulSets) for a given cluster. +func (r *ResourceClient[T, U]) fetchExistingPools(ctx context.Context, cluster U) ([]*poolWithOrdinals, error) { + sets, err := kube.List[appsv1.StatefulSetList](ctx, r.ctl, client.InNamespace(cluster.GetNamespace()), client.MatchingLabels(r.ownershipResolver.GetOwnerLabels(cluster))) if err != nil { - if !apimeta.IsNoMatchError(err) { - return err - } - - // we have an unknown mapping so err on the side of not setting - // an owner reference - unknownMapping = true - } - - // nil out the managed fields since with some resources that actually do - // a fetch (i.e. secrets that are created only once), we get an error trying - // to patch a second time - object.SetManagedFields(nil) - - // This needs to be set explicitly in order for SSA to function properly. - // If an initialized pointer to a concrete CR has not specified its GVK - // explicitly, SSA will fail. - object.GetObjectKind().SetGroupVersionKind(*kind) - - labels := object.GetLabels() - if labels == nil { - labels = map[string]string{} - } - - for name, value := range r.ownershipResolver.AddLabels(owner) { - labels[name] = value + return nil, errors.Wrapf(err, "listing StatefulSets") } - for _, extra := range extraLabels { - for name, value := range extra { - labels[name] = value + i := 0 + for _, set := range sets.Items { + isOwned := slices.ContainsFunc(set.OwnerReferences, func(ref metav1.OwnerReference) bool { + return ref.UID == cluster.GetUID() + }) + if isOwned { + sets.Items[i] = set + i++ } } - - object.SetLabels(labels) - - if !unknownMapping && mapping.Name() == apimeta.RESTScopeNamespace.Name() { - object.SetOwnerReferences([]metav1.OwnerReference{*metav1.NewControllerRef(owner, owner.GetObjectKind().GroupVersionKind())}) - } - - return nil -} - -// fetchExistingPools fetches the existing pools (StatefulSets) for a given cluster. 
-func (r *ResourceClient[T, U]) fetchExistingPools(ctx context.Context, cluster U) ([]*poolWithOrdinals, error) { - sets, err := r.listResources(ctx, &appsv1.StatefulSet{}, client.MatchingLabels(r.ownershipResolver.GetOwnerLabels(cluster))) - if err != nil { - return nil, fmt.Errorf("listing StatefulSets: %w", err) - } + sets.Items = sets.Items[:i] existing := []*poolWithOrdinals{} - for _, set := range sets { - statefulSet := set.(*appsv1.StatefulSet) - - if !r.nodePoolRenderer.IsNodePool(statefulSet) { + for _, statefulSet := range sets.Items { + if !r.nodePoolRenderer.IsNodePool(&statefulSet) { continue } @@ -412,31 +285,32 @@ func (r *ResourceClient[T, U]) fetchExistingPools(ctx context.Context, cluster U } // based on https://github.com/kubernetes/kubernetes/blob/c90a4b16b6aa849ed362ee40997327db09e3a62d/pkg/controller/history/controller_history.go#L222 - revisions, err := r.listResources(ctx, &appsv1.ControllerRevision{}, client.MatchingLabelsSelector{ + revisions, err := kube.List[appsv1.ControllerRevisionList](ctx, r.ctl, client.MatchingLabelsSelector{ Selector: selector, }) if err != nil { - return nil, fmt.Errorf("listing ControllerRevisions: %w", err) + return nil, errors.Wrapf(err, "listing ControllerRevisions") } + ownedRevisions := []*appsv1.ControllerRevision{} - for i := range revisions { - ref := metav1.GetControllerOfNoCopy(revisions[i]) - if ref == nil || ref.UID == set.GetUID() { - ownedRevisions = append(ownedRevisions, revisions[i].(*appsv1.ControllerRevision)) + for i := range revisions.Items { + ref := metav1.GetControllerOfNoCopy(&revisions.Items[i]) + if ref == nil || ref.UID == statefulSet.GetUID() { + ownedRevisions = append(ownedRevisions, &revisions.Items[i]) } } - pods, err := r.listResources(ctx, &corev1.Pod{}, client.MatchingLabelsSelector{ + pods, err := kube.List[corev1.PodList](ctx, r.ctl, client.MatchingLabelsSelector{ Selector: selector, }) if err != nil { return nil, fmt.Errorf("listing Pods: %w", err) } - ownedPods := []*corev1.Pod{} - for i := range pods { - ownedPods = append(ownedPods, pods[i].(*corev1.Pod)) + ownedPods := make([]*corev1.Pod, len(pods.Items)) + for i := range pods.Items { + ownedPods[i] = &pods.Items[i] } withOrdinals, err := sortPodsByOrdinal(ownedPods...) 
@@ -445,7 +319,7 @@ func (r *ResourceClient[T, U]) fetchExistingPools(ctx context.Context, cluster U } existing = append(existing, &poolWithOrdinals{ - set: statefulSet, + set: &statefulSet, revisions: sortRevisions(ownedRevisions), pods: withOrdinals, }) diff --git a/operator/internal/lifecycle/client_test.go b/operator/internal/lifecycle/client_test.go index a2f9d9e46..c90700366 100644 --- a/operator/internal/lifecycle/client_test.go +++ b/operator/internal/lifecycle/client_test.go @@ -20,7 +20,6 @@ import ( "go.uber.org/zap/zapcore" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - rbacv1 "k8s.io/api/rbac/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" k8sapierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -393,94 +392,6 @@ func TestClientWatchResources(t *testing.T) { } } -func TestClientSyncAll(t *testing.T) { - for name, tt := range map[string]struct { - renderLoops [][]client.Object - testParams clientTest - }{ - "no-op": {}, - "render-error": { - testParams: clientTest{ - resourcesRenderError: errors.New("render"), - }, - }, - "overlapping-resource-names": { - renderLoops: [][]client.Object{ - { - &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "overlapping-resources-resource", - Namespace: metav1.NamespaceDefault, - }, - }, - }, - }, - testParams: clientTest{ - watchedResources: []client.Object{ - &corev1.ConfigMap{}, - &corev1.Secret{}, - &rbacv1.ClusterRole{}, - }, - resources: []client.Object{ - &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "overlapping-resources-resource", - Namespace: metav1.NamespaceDefault, - }, - }, - &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "overlapping-resources-resource", - Namespace: metav1.NamespaceDefault, - }, - }, - &rbacv1.ClusterRole{ - ObjectMeta: metav1.ObjectMeta{ - Name: "overlapping-resources-resource", - Namespace: metav1.NamespaceDefault, - }, - }, - }, - }, - }, - } { - tt.testParams.Run(parentCtx, t, name, func(t *testing.T, instances *clientTestInstances, cluster *MockCluster) { - ctx, cancel := setupContext() - defer cancel() - - ensureSynced := func(resources []client.Object) { - objects, err := instances.resourceClient.listAllOwnedResources(ctx, cluster, false) - require.NoError(t, err) - require.Len(t, objects, len(resources)) - } - - for _, resource := range tt.testParams.resources { - err := instances.checkObject(ctx, t, resource) - require.Error(t, err) - require.True(t, k8sapierrors.IsNotFound(err)) - } - - err := instances.resourceClient.SyncAll(ctx, cluster) - if tt.testParams.resourcesRenderError != nil { - require.Error(t, err) - require.ErrorIs(t, err, tt.testParams.resourcesRenderError) - return - } - - require.NoError(t, err) - - ensureSynced(tt.testParams.resources) - - for _, resources := range tt.renderLoops { - instances.resourceRenderer.SetResources(cluster, resources) - - require.NoError(t, instances.resourceClient.SyncAll(ctx, cluster)) - ensureSynced(resources) - } - }) - } -} - func TestClientFetchExistingAndDesiredPools(t *testing.T) { for name, tt := range map[string]*clientTest{ "no-op": {}, diff --git a/operator/internal/lifecycle/constants.go b/operator/internal/lifecycle/constants.go index bd8052f58..8aea25656 100644 --- a/operator/internal/lifecycle/constants.go +++ b/operator/internal/lifecycle/constants.go @@ -12,7 +12,7 @@ package lifecycle import "sigs.k8s.io/controller-runtime/pkg/client" const ( - defaultFieldOwner = client.FieldOwner("cluster.redpanda.com/operator") + 
DefaultFieldOwner = client.FieldOwner("cluster.redpanda.com/operator") defaultNamespaceLabel = "cluster.redpanda.com/namespace" defaultOperatorLabel = "cluster.redpanda.com/operator" defaultOwnerLabel = "cluster.redpanda.com/owner" diff --git a/operator/internal/lifecycle/helpers.go b/operator/internal/lifecycle/helpers.go index dc6075f25..601f03cff 100644 --- a/operator/internal/lifecycle/helpers.go +++ b/operator/internal/lifecycle/helpers.go @@ -25,13 +25,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -func isStatefulSet(object client.Object) bool { - gvk := object.GetObjectKind().GroupVersionKind() - return gvk.Group == appsv1.SchemeGroupVersion.Group && - gvk.Version == appsv1.SchemeGroupVersion.Version && - gvk.Kind == "StatefulSet" -} - // getGroupVersionKind gets a GVK for an object based on all // GVKs registered with a runtime scheme. func getGroupVersionKind(scheme *runtime.Scheme, object client.Object) (*schema.GroupVersionKind, error) { diff --git a/operator/internal/lifecycle/interfaces_test.go b/operator/internal/lifecycle/interfaces_test.go index a6c9b6d68..d25c1923b 100644 --- a/operator/internal/lifecycle/interfaces_test.go +++ b/operator/internal/lifecycle/interfaces_test.go @@ -364,10 +364,7 @@ func (r *MockNodePoolRenderer) Render(ctx context.Context, cluster *MockCluster) } func (r *MockNodePoolRenderer) IsNodePool(object client.Object) bool { - gvk := object.GetObjectKind().GroupVersionKind() - if gvk.Group != appsv1.SchemeGroupVersion.Group || - gvk.Version != appsv1.SchemeGroupVersion.Version || - gvk.Kind != "StatefulSet" { + if !isNodePool(object) { return false } diff --git a/operator/internal/lifecycle/v2_node_pools.go b/operator/internal/lifecycle/v2_node_pools.go index 859947ca4..441cc47d2 100644 --- a/operator/internal/lifecycle/v2_node_pools.go +++ b/operator/internal/lifecycle/v2_node_pools.go @@ -124,7 +124,8 @@ func (m *V2NodePoolRenderer) Render(ctx context.Context, cluster *ClusterWithPoo // For now, this concrete implementation just looks for any StatefulSets and says that they are a // node pool. func isNodePool(object client.Object) bool { - return isStatefulSet(object) + _, ok := object.(*appsv1.StatefulSet) + return ok } // IsNodePool returns whether or not the object passed to it should be considered a node pool. 
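For context on the isNodePool change above, here is an illustrative, standalone sketch (not part of the change set) contrasting the removed GVK-comparison helper with the new type assertion. It uses only upstream apps/v1 and apimachinery types; the helper names are hypothetical and exist solely for this comparison.

// Sketch only: contrasts the removed GVK-based check with the type assertion
// now used by isNodePool.
package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// isStatefulSetByGVK mirrors the removed helper: it trusts whatever GVK is
// recorded on the object, which works for unstructured objects but returns
// false for a typed object whose TypeMeta was never populated.
func isStatefulSetByGVK(object client.Object) bool {
	gvk := object.GetObjectKind().GroupVersionKind()
	return gvk.Group == appsv1.SchemeGroupVersion.Group &&
		gvk.Version == appsv1.SchemeGroupVersion.Version &&
		gvk.Kind == "StatefulSet"
}

// isStatefulSetByType mirrors the new check: a plain type assertion that only
// matches concrete *appsv1.StatefulSet values, regardless of TypeMeta.
func isStatefulSetByType(object client.Object) bool {
	_, ok := object.(*appsv1.StatefulSet)
	return ok
}

func main() {
	typed := &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Name: "pool-a"}}
	fmt.Println(isStatefulSetByGVK(typed))  // false: a freshly constructed typed object carries an empty GVK
	fmt.Println(isStatefulSetByType(typed)) // true

	u := &unstructured.Unstructured{}
	u.SetGroupVersionKind(appsv1.SchemeGroupVersion.WithKind("StatefulSet"))
	fmt.Println(isStatefulSetByGVK(u))  // true
	fmt.Println(isStatefulSetByType(u)) // false
}

The assertion is the simpler and stricter check, and it appears sufficient here because the lifecycle renderers and the kube list helpers in this diff hand back concrete typed objects rather than unstructured ones.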
diff --git a/pkg/go.mod b/pkg/go.mod index d68c1a0d2..a0c4203aa 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -5,6 +5,7 @@ go 1.24.3 replace pgregory.net/rapid => github.com/chrisseto/rapid v0.0.0-20240815210052-cdeef406c65c require ( + github.com/cert-manager/cert-manager v1.14.5 github.com/cockroachdb/errors v1.11.3 github.com/go-logr/logr v1.4.2 github.com/gonvenience/ytbx v1.4.4 @@ -37,6 +38,7 @@ require ( google.golang.org/protobuf v1.36.6 helm.sh/helm/v3 v3.18.5 k8s.io/api v0.33.3 + k8s.io/apiextensions-apiserver v0.33.3 k8s.io/apimachinery v0.33.3 k8s.io/client-go v0.33.3 k8s.io/klog/v2 v2.130.1 @@ -134,7 +136,6 @@ require ( github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect - github.com/miekg/dns v1.1.58 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-ps v1.0.0 // indirect github.com/mitchellh/go-wordwrap v1.0.1 // indirect @@ -206,13 +207,13 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/apiextensions-apiserver v0.33.3 // indirect k8s.io/apiserver v0.33.3 // indirect k8s.io/cli-runtime v0.33.3 // indirect k8s.io/component-base v0.33.3 // indirect k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect k8s.io/kubectl v0.33.3 // indirect oras.land/oras-go/v2 v2.6.0 // indirect + sigs.k8s.io/gateway-api v1.1.0 // indirect sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect sigs.k8s.io/kustomize/api v0.19.0 // indirect sigs.k8s.io/kustomize/kyaml v0.19.0 // indirect diff --git a/pkg/go.sum b/pkg/go.sum index 23e4ed2cc..b36675267 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -40,6 +40,8 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3 github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cenkalti/backoff/v5 v5.0.2 h1:rIfFVxEf1QsI7E1ZHfp/B4DF/6QBAUhmgkxc0H7Zss8= github.com/cenkalti/backoff/v5 v5.0.2/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= +github.com/cert-manager/cert-manager v1.14.5 h1:uuM1O2g2S80nxiH3eW2cZYMGiL2zmDFVdAzg8sibWuc= +github.com/cert-manager/cert-manager v1.14.5/go.mod h1:fmr/cU5jiLxWj69CroDggSOa49RljUK+dU583TaQUXM= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chai2010/gettext-go v1.0.2 h1:1Lwwip6Q2QGsAdl/ZKPCwTe9fe0CjlUbqj5bFNSjIRk= @@ -574,6 +576,8 @@ oras.land/oras-go/v2 v2.6.0 h1:X4ELRsiGkrbeox69+9tzTu492FMUu7zJQW6eJU+I2oc= oras.land/oras-go/v2 v2.6.0/go.mod h1:magiQDfG6H1O9APp+rOsvCPcW1GD2MM7vgnKY0Y+u1o= sigs.k8s.io/controller-runtime v0.20.4 h1:X3c+Odnxz+iPTRobG4tp092+CvBU9UK0t/bRf+n0DGU= sigs.k8s.io/controller-runtime v0.20.4/go.mod h1:xg2XB0K5ShQzAgsoujxuKN4LNXR2LfwwHsPj7Iaw+XY= +sigs.k8s.io/gateway-api v1.1.0 h1:DsLDXCi6jR+Xz8/xd0Z1PYl2Pn0TyaFMOPPZIj4inDM= +sigs.k8s.io/gateway-api v1.1.0/go.mod h1:ZH4lHrL2sDi0FHZ9jjneb8kKnGzFWyrTya35sWUTrRs= sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 h1:gBQPwqORJ8d8/YNZWEjoZs7npUVDpVXUUOFfW6CgAqE= sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg= sigs.k8s.io/kustomize/api v0.19.0 h1:F+2HB2mU1MSiR9Hp1NEgoU2q9ItNOaBJl0I4Dlus5SQ= diff --git a/pkg/kube/ctl.go b/pkg/kube/ctl.go index 8cb9be5ad..fe4bc89d2 100644 --- a/pkg/kube/ctl.go +++ b/pkg/kube/ctl.go @@ -19,6 +19,7 @@ import ( 
"github.com/cockroachdb/errors" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes/scheme" @@ -108,20 +109,20 @@ func FromRESTConfig(cfg *RESTConfig, opts ...Option) (*Ctl, error) { return nil, err } - fieldManager := options.FieldManager - if fieldManager == "" { - fieldManager = "*kube.Ctl" + fieldOwner := options.FieldManager + if fieldOwner == "" { + fieldOwner = "*kube.Ctl" } - return &Ctl{config: cfg, client: c, fieldManager: fieldManager}, nil + return &Ctl{config: cfg, client: c, fieldOwner: client.FieldOwner(fieldOwner)}, nil } // Ctl is a Kubernetes client inspired by the shape of the `kubectl` CLI with a // focus on being ergonomic. type Ctl struct { - config *rest.Config - client client.Client - fieldManager string + config *rest.Config + client client.Client + fieldOwner client.FieldOwner } // RestConfig returns a deep copy of the [rest.Config] used by this [Ctl]. @@ -129,6 +130,19 @@ func (c *Ctl) RestConfig() *rest.Config { return rest.CopyConfig(c.config) } +// Scheme returns the [runtime.Scheme] used by this instance. +func (c *Ctl) Scheme() *runtime.Scheme { + return c.client.Scheme() +} + +func (c *Ctl) ScopeOf(gvk schema.GroupVersionKind) (meta.RESTScopeName, error) { + mapping, err := c.client.RESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version) + if err != nil { + return meta.RESTScopeName(""), errors.WithStack(err) + } + return mapping.Scope.Name(), nil +} + // Get fetches the latest state of an object into `obj` from Kubernetes. // Usage: // @@ -154,7 +168,7 @@ func (c *Ctl) GetAndWait(ctx context.Context, key ObjectKey, obj Object, cond Co // // var pods corev1.PodList // ctl.List(ctx, &pods) -func (c *Ctl) List(ctx context.Context, objs client.ObjectList, opts ...client.ListOption) error { +func (c *Ctl) List(ctx context.Context, objs ObjectList, opts ...client.ListOption) error { if err := c.client.List(ctx, objs, opts...); err != nil { return errors.WithStack(err) } @@ -162,17 +176,17 @@ func (c *Ctl) List(ctx context.Context, objs client.ObjectList, opts ...client.L } // Apply "applies" the provided [Object] via SSA (Server Side Apply). -func (c *Ctl) Apply(ctx context.Context, obj Object) error { - kinds, _, err := c.client.Scheme().ObjectKinds(obj) - if err != nil { - return errors.WithStack(err) - } - +func (c *Ctl) Apply(ctx context.Context, obj Object, opts ...client.PatchOption) error { obj.SetManagedFields(nil) - obj.GetObjectKind().SetGroupVersionKind(kinds[0]) + if err := setGVK(c.Scheme(), obj); err != nil { + return err + } + + // Prepend field owner to allow caller's to override it. + opts = append([]client.PatchOption{c.fieldOwner}, opts...) - if err := c.client.Patch(ctx, obj, client.Apply, client.FieldOwner(c.fieldManager)); err != nil { + if err := c.client.Patch(ctx, obj, client.Apply, opts...); err != nil { return errors.WithStack(err) } @@ -238,9 +252,16 @@ func (c *Ctl) CreateAndWait(ctx context.Context, obj Object, cond CondFn[Object] return c.WaitFor(ctx, obj, cond) } -// Delete initiates the deletion the given [Object]. +// Delete declaratively initiates deletion the given [Object]. +// +// Unlike other Ctl methods, Delete does not update obj. +// +// If obj is already being deleted or has been successfully delete (e.g. +// returns a 404), Delete returns nil. 
 func (c *Ctl) Delete(ctx context.Context, obj Object) error {
 	if err := c.client.Delete(ctx, obj); err != nil {
+		// Swallow not found errors to behave as a "declarative" delete.
+		_, err := IsDeleted(obj, err)
 		return errors.WithStack(err)
 	}
 	return nil
 }
diff --git a/pkg/kube/ctl_test.go b/pkg/kube/ctl_test.go
index 4e44b23a0..ec343257a 100644
--- a/pkg/kube/ctl_test.go
+++ b/pkg/kube/ctl_test.go
@@ -53,4 +53,35 @@ func TestCtl(t *testing.T) {
 	cms, err := kube.List[corev1.ConfigMapList](ctx, ctl, kube.InNamespace("hello-world"))
 	require.NoError(t, err)
 	require.Len(t, cms.Items, 3)
+
+	t.Run("Delete", func(t *testing.T) {
+		s := &corev1.Secret{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "delete-me",
+				Namespace: "hello-world",
+				// Set a finalizer to stall deletion.
+				Finalizers: []string{"i-prevent.com/deletion"},
+			},
+		}
+
+		require.NoError(t, ctl.Apply(ctx, s))
+		require.NotZero(t, s.UID)
+
+		require.NoError(t, ctl.Delete(ctx, s))
+
+		// Refresh s and assert that it's now deleting.
+		require.NoError(t, ctl.Get(ctx, kube.AsKey(s), s))
+		require.NotNil(t, s.DeletionTimestamp)
+
+		// Re-issue delete with DeletionTimestamp already set to showcase that it
+		// doesn't result in errors.
+		require.NoError(t, ctl.Delete(ctx, s))
+
+		// Clear finalizers to allow deletion to progress.
+		s.Finalizers = nil
+		require.NoError(t, ctl.ApplyAndWait(ctx, s, kube.IsDeleted))
+
+		// No 404s if we attempt to delete an already deleted object.
+		require.NoError(t, ctl.Delete(ctx, s))
+	})
 }
diff --git a/pkg/kube/syncer.go b/pkg/kube/syncer.go
new file mode 100644
index 000000000..61e483abf
--- /dev/null
+++ b/pkg/kube/syncer.go
@@ -0,0 +1,269 @@
+package kube
+
+import (
+	"context"
+	"maps"
+	"reflect"
+	"slices"
+
+	"github.com/cockroachdb/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	"github.com/redpanda-data/redpanda-operator/pkg/otelutil/log"
+)
+
+type Renderer interface {
+	Render(context.Context) ([]Object, error)
+	Types() []Object
+}
+
+// Syncer synchronizes a set of [Object]s into the Kubernetes API. Objects will
+// be updated via SSA and deleted when they are no longer returned from
+// [Renderer].
+type Syncer struct {
+	Ctl      *Ctl
+	Renderer Renderer
+
+	// Namespace is the namespace [Syncer] will use for listing [Object]s. If
+	// Renderer returns an Object in a namespace other than this one, it WILL
+	// NOT be garbage collected.
+	Namespace string
+
+	// Owner is the [metav1.OwnerReference] that will be set on all **Namespace
+	// scoped** Objects returned by Renderer.
+	// It is additionally used to filter **Namespace scoped** objects.
+	//
+	// Owner CAN NOT be changed without abandoning objects.
+	Owner metav1.OwnerReference
+
+	// OwnershipLabels functions similarly to Owner. They're applied to all
+	// objects and used for filtering. In the case of cluster-wide objects,
+	// OwnershipLabels is the sole method of determining ownership.
+	//
+	// OwnershipLabels CAN NOT be changed without abandoning objects.
+	OwnershipLabels map[string]string
+
+	// Preprocess, if provided, is run ahead of applying Objects. It may be
+	// used to add additional labels, annotations, etc. uniformly.
+ Preprocess func(Object) +} + +func (s *Syncer) Sync(ctx context.Context) ([]Object, error) { + toSync, err := s.toSync(ctx) + if err != nil { + return nil, err + } + + existing, err := s.listInPurview(ctx) + if err != nil { + return nil, err + } + + // Diff toSync and existing to create a list of Objects that should be GC'd. + toDelete := make(map[gvkObject]Object, len(existing)) + for _, resource := range existing { + gvk, err := GVKFor(s.Ctl.Scheme(), resource) + if err != nil { + return nil, err + } + + toDelete[gvkObject{ + gvk: gvk, + key: AsKey(resource), + }] = resource + } + + for _, resource := range toSync { + gvk, err := GVKFor(s.Ctl.Scheme(), resource) + if err != nil { + return nil, err + } + + delete(toDelete, gvkObject{ + gvk: gvk, + key: AsKey(resource), + }) + } + + for _, obj := range toSync { + if err := s.Ctl.Apply(ctx, obj, client.ForceOwnership); err != nil { + // Similarly to our list function, ignore unregistered values and log a warning. + if meta.IsNoMatchError(err) { + gvk, err := GVKFor(s.Ctl.Scheme(), obj) + if err != nil { + return nil, err + } + + log.Error(ctx, err, "WARNING no registered value for resource type", "gvk", gvk.String(), "key", AsKey(obj)) + continue + } + return nil, err + } + } + + for _, obj := range toDelete { + if err := s.Ctl.Delete(ctx, obj); err != nil { + return nil, err + } + } + + // Return the applied objects. They're mutated in place by ApplyAll + // which will allow callers to extract information from their + // statuses and the like. + return toSync, nil +} + +func (s *Syncer) DeleteAll(ctx context.Context) (bool, error) { + toDelete, err := s.listInPurview(ctx) + if err != nil { + return true, err + } + + alive := 0 + for _, obj := range toDelete { + if obj.GetDeletionTimestamp() == nil { + alive++ + } + + if err := s.Ctl.Delete(ctx, obj); err != nil { + return true, err + } + } + + return alive > 0, nil +} + +func (s *Syncer) listInPurview(ctx context.Context) ([]Object, error) { + var objects []Object + for _, t := range s.Renderer.Types() { + gvk, err := GVKFor(s.Ctl.Scheme(), t) + if err != nil { + return nil, err + } + + scope, err := s.Ctl.ScopeOf(gvk) + if err != nil { + // If we encounter an unknown type, e.g. someone hasn't installed + // cert-manager, don't block the entire sync process. Instead we'll + // log a warning and move on. + if meta.IsNoMatchError(err) { + log.Error(ctx, err, "WARNING no registered value for resource type", "gvk", gvk.String()) + continue + } + return nil, err + } + + list, err := listFor(s.Ctl.client.Scheme(), t) + if err != nil { + return nil, err + } + + if err := s.Ctl.List(ctx, list, client.InNamespace(s.Namespace), client.MatchingLabels(s.OwnershipLabels)); err != nil { + return nil, err + } + + items, err := Items[Object](list) + if err != nil { + return nil, err + } + + // If resources are Namespace scoped, we additionally filter on whether + // or not OwnerRef is set correctly. + if scope == meta.RESTScopeNameNamespace { + i := 0 + for _, obj := range items { + owned := slices.ContainsFunc(obj.GetOwnerReferences(), func(ref metav1.OwnerReference) bool { + return ref.UID == s.Owner.UID + }) + + if owned { + items[i] = obj + i++ + } + + } + + items = items[:i] + } + + objects = append(objects, items...) 
+ } + + return objects, nil +} + +func (s *Syncer) toSync(ctx context.Context) ([]Object, error) { + objs, err := s.Renderer.Render(ctx) + if err != nil { + return nil, errors.WithStack(err) + } + + types := s.Renderer.Types() + expectedTypes := make(map[reflect.Type]struct{}, len(types)) + for _, t := range types { + expectedTypes[reflect.TypeOf(t)] = struct{}{} + } + + for _, obj := range objs { + // Ensure that all types returned are present in s.Types. If they aren't + // we'd potentially "leak" objects. + if _, ok := expectedTypes[reflect.TypeOf(obj)]; !ok { + return nil, errors.Newf(".Render returned %T which isn't present in .Types", obj) + } + + // Run Preprocessors, if any. + if s.Preprocess != nil { + s.Preprocess(obj) + } + + // Additionally apply Owners (if non-namespace scoped) and OwnershipLabels + s.applyOwnerLabels(obj) + if err := s.applyOwnerReferences(obj); err != nil { + return nil, err + } + } + + return objs, nil +} + +func (s *Syncer) applyOwnerLabels(obj Object) { + if obj.GetLabels() == nil { + obj.SetLabels(map[string]string{}) + } + maps.Copy(obj.GetLabels(), s.OwnershipLabels) +} + +func (s *Syncer) applyOwnerReferences(obj Object) error { + gvk, err := GVKFor(s.Ctl.Scheme(), obj) + if err != nil { + return err + } + + scope, err := s.Ctl.ScopeOf(gvk) + if err != nil { + // Ignore no match errors that stem from ScopeOf. We'll handle them + // elsewhere. There's no risk of applying the object with a missing + // ownerreference as the API server won't accept objects of this type. + if meta.IsNoMatchError(err) { + return nil + } + return err + } + + // no owners on namespace scoped items. + if scope == meta.RESTScopeNameRoot { + return nil + } + + obj.SetOwnerReferences([]metav1.OwnerReference{s.Owner}) + + return nil +} + +type gvkObject struct { + gvk schema.GroupVersionKind + key ObjectKey +} diff --git a/pkg/kube/syncer_test.go b/pkg/kube/syncer_test.go new file mode 100644 index 000000000..5eef0f6ab --- /dev/null +++ b/pkg/kube/syncer_test.go @@ -0,0 +1,241 @@ +package kube_test + +import ( + "context" + "fmt" + "testing" + "time" + + certmanagerv1 "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/redpanda-data/redpanda-operator/pkg/kube" + "github.com/redpanda-data/redpanda-operator/pkg/kube/kubetest" +) + +func TestSyncer(t *testing.T) { + // Notably: + // - excludes apiextensions. + // - includes certmanagerv1 but CRDs are NOT installed. + scheme := runtime.NewScheme() + require.NoError(t, clientgoscheme.AddToScheme(scheme)) + require.NoError(t, certmanagerv1.AddToScheme(scheme)) + + ctl := kubetest.NewEnv(t, kube.Options{ + Options: client.Options{ + Scheme: scheme, + }, + }) + + // This namespace with hold our test resources and act as the "owner" of + // them. 
+ ns, err := kube.Create[corev1.Namespace](t.Context(), ctl, corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-", + Labels: map[string]string{ + "owned_by": "no-one", + }, + }, + }) + require.NoError(t, err) + + r := &renderer{Prefix: "test", Namespace: ns.Name} + syncer := kube.Syncer{ + Ctl: ctl, + Renderer: r, + Namespace: ns.Name, + Owner: metav1.OwnerReference{ + Name: ns.Name, + APIVersion: "v1", + Kind: "Namespace", + UID: ns.UID, + }, + OwnershipLabels: map[string]string{ + "owned_by": "syncer", + }, + } + + t.Run("NotInTypes", func(t *testing.T) { + r.ObjNotInTypes = true + defer func() { r.ObjNotInTypes = false }() + + _, err := syncer.Sync(t.Context()) + require.EqualError(t, err, ".Render returned *v1.PersistentVolume which isn't present in .Types") + }) + + t.Run("NotInScheme", func(t *testing.T) { + r.ObjNotInScheme = true + defer func() { r.ObjNotInScheme = false }() + + _, err := syncer.Sync(t.Context()) + require.EqualError(t, err, `no kind is registered for the type v1.CustomResourceDefinition in scheme "pkg/runtime/scheme.go:110"`) + }) + + t.Run("NotInAPI", func(t *testing.T) { + r.ObjNotInAPI = true + defer func() { r.ObjNotInAPI = false }() + + // If we have a type that's not registered in the API, we'll swallow + // any such errors an instead log a warning. + // This may not be the optimal behavior. + _, err := syncer.Sync(t.Context()) + require.NoError(t, err) + }) + + t.Run("Sync", func(t *testing.T) { + defer func() { + r.Prefix = "test" + syncer.Preprocess = nil + }() + + // Test syncing by changing object names and labels. + for i := range 5 { + r.Prefix = fmt.Sprintf("test-%d", i) + + for j := range 5 { + syncer.Preprocess = func(o kube.Object) { + if o.GetLabels() == nil { + o.SetLabels(map[string]string{}) + } + o.GetLabels()["iteration"] = fmt.Sprintf("%d", j) + } + + objs, err := syncer.Sync(t.Context()) + require.NoError(t, err) + + for _, obj := range objs { + // assert that objects are updated in place. + require.NotEmpty(t, obj.GetUID()) + // Ownership labels have been applied. + require.Equal(t, "syncer", obj.GetLabels()["owned_by"]) + + // + require.Equal(t, fmt.Sprintf("%d", j), obj.GetLabels()["iteration"]) + + gvk, err := kube.GVKFor(ctl.Scheme(), obj) + require.NoError(t, err) + + scope, err := ctl.ScopeOf(gvk) + require.NoError(t, err) + + if scope == meta.RESTScopeNameNamespace { + require.Equal(t, []metav1.OwnerReference{{ + Name: ns.Name, + APIVersion: "v1", + Kind: "Namespace", + UID: ns.UID, + }}, obj.GetOwnerReferences()) + } + } + } + } + }) + + // Asserts that the syncer will take ownership of objects instead of + // returning errors. + t.Run("SSAConflicts", func(t *testing.T) { + objs, err := syncer.Sync(t.Context()) + require.NoError(t, err) + + // NB: This must be on the same key that's referenced in renderer to + // actually generate a conflict. + cm := objs[0].(*corev1.ConfigMap) + cm.Data["some"] = "update" + + err = ctl.Apply(t.Context(), objs[0], client.FieldOwner("conflicter")) + require.True(t, apierrors.IsConflict(err)) + + err = ctl.Apply(t.Context(), objs[0], client.FieldOwner("conflicter"), client.ForceOwnership) + require.NoError(t, err) + + _, err = syncer.Sync(t.Context()) + require.NoError(t, err) + }) + + t.Run("DeleteAll", func(t *testing.T) { + ctx := t.Context() + + require.NoError(t, ctl.Create(ctx, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "not-owned", + Namespace: ns.Name, + // No OwnerReference means this object will remain untouched. 
+ Labels: syncer.OwnershipLabels, + }, + })) + + _, err := syncer.Sync(ctx) + require.NoError(t, err) + + _, err = syncer.DeleteAll(ctx) + require.NoError(t, err) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + // Our owning namespace hasn't been removed but the other one(s) + // have been cleaned up. NB: envtest namespaces never get fully + // deleted, so we filter to Active ones. + nss, err := kube.List[corev1.NamespaceList](ctx, ctl, client.MatchingFields{ + "status.phase": "Active", + }, client.HasLabels{"owned_by"}) + require.NoError(t, err) + require.Len(t, nss.Items, 1) + require.Equal(t, ns.UID, nss.Items[0].UID) + + // The only left over configmap is our unowned one. + cms, err := kube.List[corev1.ConfigMapList](ctx, ctl, client.HasLabels{"owned_by"}, client.InNamespace(ns.Name)) + require.NoError(t, err) + require.Len(t, cms.Items, 1) + require.Equal(t, "not-owned", cms.Items[0].Name) + }, 30*time.Second, time.Second) + }) +} + +type renderer struct { + Prefix string + Namespace string + ObjNotInScheme bool + ObjNotInAPI bool + ObjNotInTypes bool +} + +func (r *renderer) Render(_ context.Context) ([]kube.Object, error) { + objs := []kube.Object{ + // A Namespace scoped object. + &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: r.Prefix + "-cm", Namespace: r.Namespace}, Data: map[string]string{"some": "data"}}, + // A cluster scoped object. + &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: r.Prefix + "-ns"}}, + } + if r.ObjNotInAPI { + objs = append(objs, &certmanagerv1.Certificate{ObjectMeta: metav1.ObjectMeta{Name: r.Prefix + "-c", Namespace: r.Namespace}}) + } + if r.ObjNotInScheme { + objs = append(objs, &apiextensionsv1.CustomResourceDefinition{ObjectMeta: metav1.ObjectMeta{Name: r.Prefix + "-crd"}}) + } + if r.ObjNotInTypes { + objs = append(objs, &corev1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: r.Prefix + "-pv"}}) + } + return objs, nil +} + +func (r *renderer) Types() []kube.Object { + ts := []kube.Object{ + &corev1.ConfigMap{}, + &corev1.Namespace{}, + } + if r.ObjNotInScheme { + ts = append(ts, &apiextensionsv1.CustomResourceDefinition{}) + } + if r.ObjNotInAPI { + ts = append(ts, &certmanagerv1.Certificate{}) + } + return ts +} diff --git a/pkg/kube/util.go b/pkg/kube/util.go new file mode 100644 index 000000000..2bb61af11 --- /dev/null +++ b/pkg/kube/util.go @@ -0,0 +1,54 @@ +package kube + +import ( + "github.com/cockroachdb/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func listFor(scheme *runtime.Scheme, obj Object) (ObjectList, error) { + gvk, err := GVKFor(scheme, obj) + if err != nil { + return nil, err + } + + olist, err := scheme.New(schema.GroupVersionKind{ + Group: gvk.Group, + Version: gvk.Version, + Kind: gvk.Kind + "List", + }) + if err != nil { + return nil, errors.WithStack(err) + } + + list, ok := olist.(ObjectList) + if !ok { + return nil, errors.Newf("type is not ObjectList: %T", obj) + } + + return list, nil +} + +func GVKFor(scheme *runtime.Scheme, object Object) (schema.GroupVersionKind, error) { + kinds, _, err := scheme.ObjectKinds(object) + if err != nil { + return schema.GroupVersionKind{}, errors.WithStack(err) + } + + if len(kinds) == 0 { + return schema.GroupVersionKind{}, errors.Newf("unable to determine object kind: %T", object) + } + + return kinds[0], nil +} + +func setGVK(scheme *runtime.Scheme, obj Object) error { + gvk, err := GVKFor(scheme, obj) + if err != nil { + return err + } + + obj.GetObjectKind().SetGroupVersionKind(gvk) + + return nil +}
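To make the intended call pattern of the new Syncer concrete, here is a hedged, illustrative sketch (not part of the change set). It assumes an already constructed *kube.Ctl and an owning Namespace; the staticRenderer type, the syncOnce helper, the "owned_by" label key, and the object names are invented for illustration, while the Syncer, Renderer, and Ctl APIs are the ones introduced above.

// Sketch only: minimal wiring of kube.Syncer with a custom Renderer.
package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/redpanda-data/redpanda-operator/pkg/kube"
)

// staticRenderer renders a fixed set of objects. Types must cover every
// concrete type Render can return; Sync rejects anything it would be unable
// to garbage collect later.
type staticRenderer struct {
	namespace string
}

func (r *staticRenderer) Render(context.Context) ([]kube.Object, error) {
	return []kube.Object{
		&corev1.ConfigMap{
			ObjectMeta: metav1.ObjectMeta{Name: "example-cm", Namespace: r.namespace},
			Data:       map[string]string{"hello": "world"},
		},
	}, nil
}

func (r *staticRenderer) Types() []kube.Object {
	return []kube.Object{&corev1.ConfigMap{}}
}

// syncOnce applies the rendered objects via SSA and garbage collects anything
// carrying the ownership labels (and, for namespace-scoped objects, the owner
// reference) that is no longer rendered.
func syncOnce(ctx context.Context, ctl *kube.Ctl, owner *corev1.Namespace) ([]kube.Object, error) {
	syncer := &kube.Syncer{
		Ctl:       ctl,
		Renderer:  &staticRenderer{namespace: owner.Name},
		Namespace: owner.Name,
		Owner: metav1.OwnerReference{
			APIVersion: "v1",
			Kind:       "Namespace",
			Name:       owner.Name,
			UID:        owner.UID,
		},
		// Changing these labels later would abandon previously synced objects.
		OwnershipLabels: map[string]string{"owned_by": "example-syncer"},
	}
	return syncer.Sync(ctx)
}

Repeated Sync calls are the steady state: objects are applied under the Ctl's field manager with SSA and ForceOwnership, and DeleteAll tears down everything in purview, which is what ResourceClient.SyncAll and DeleteAll in operator/internal/lifecycle now largely delegate to.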