diff --git a/api/v1beta1/federation_types.go b/api/v1beta1/federation_types.go index 79b1c9ee..c0d05159 100644 --- a/api/v1beta1/federation_types.go +++ b/api/v1beta1/federation_types.go @@ -35,6 +35,11 @@ type FederationSpec struct { TrustUserId bool `json:"trustUserId,omitempty"` Exchange string `json:"exchange,omitempty"` Queue string `json:"queue,omitempty"` + // DeletionPolicy defines the behavior of federation in the RabbitMQ cluster when the corresponding custom resource is deleted. + // Can be set to 'delete' or 'retain'. Default is 'delete'. + // +kubebuilder:validation:Enum=delete;retain + // +kubebuilder:default:=delete + DeletionPolicy string `json:"deletionPolicy,omitempty"` } // FederationStatus defines the observed state of Federation diff --git a/api/v1beta1/federation_types_test.go b/api/v1beta1/federation_types_test.go index a6ab05a0..c8c12bef 100644 --- a/api/v1beta1/federation_types_test.go +++ b/api/v1beta1/federation_types_test.go @@ -25,6 +25,7 @@ var _ = Describe("Federation spec", func() { UriSecret: &corev1.LocalObjectReference{ Name: "a-secret", }, + DeletionPolicy: "delete", } federation := Federation{ @@ -122,4 +123,32 @@ var _ = Describe("Federation spec", func() { Expect(k8sClient.Create(ctx, &federation)).To(MatchError(`Federation.rabbitmq.com "invalid-federation" is invalid: spec.ackMode: Unsupported value: "non-existing-ackmode": supported values: "on-confirm", "on-publish", "no-ack"`)) }) }) + + It("creates a federation with non-default DeletionPolicy", func() { + federation := Federation{ + ObjectMeta: metav1.ObjectMeta{ + Name: "federation-with-retain-policy", + Namespace: namespace, + }, + Spec: FederationSpec{ + Name: "federation-with-retain-policy", + DeletionPolicy: "retain", + UriSecret: &corev1.LocalObjectReference{ + Name: "a-secret", + }, + RabbitmqClusterReference: RabbitmqClusterReference{ + Name: "some-cluster", + }, + }, + } + Expect(k8sClient.Create(ctx, &federation)).To(Succeed()) + fetched := &Federation{} + Expect(k8sClient.Get(ctx, types.NamespacedName{ + Name: federation.Name, + Namespace: federation.Namespace, + }, fetched)).To(Succeed()) + + Expect(fetched.Spec.DeletionPolicy).To(Equal("retain")) + Expect(fetched.Spec.Name).To(Equal("federation-with-retain-policy")) + }) }) diff --git a/api/v1beta1/federation_webhook_test.go b/api/v1beta1/federation_webhook_test.go index 766b1b09..7a98deed 100644 --- a/api/v1beta1/federation_webhook_test.go +++ b/api/v1beta1/federation_webhook_test.go @@ -115,5 +115,12 @@ var _ = Describe("federation webhook", func() { _, err := newFederation.ValidateUpdate(rootCtx, &federation, newFederation) Expect(err).To(Succeed()) }) + + It("allows updates on federation.spec.deletionPolicy", func() { + newFederation := federation.DeepCopy() + newFederation.Spec.DeletionPolicy = "retain" + _, err := newFederation.ValidateUpdate(rootCtx, &federation, newFederation) + Expect(err).ToNot(HaveOccurred()) + }) }) }) diff --git a/api/v1beta1/queue_types.go b/api/v1beta1/queue_types.go index 11e3dcfc..0cc85b20 100644 --- a/api/v1beta1/queue_types.go +++ b/api/v1beta1/queue_types.go @@ -45,6 +45,11 @@ type QueueSpec struct { // Required property. // +kubebuilder:validation:Required RabbitmqClusterReference RabbitmqClusterReference `json:"rabbitmqClusterReference"` + // DeletionPolicy defines the behavior of queue in the RabbitMQ cluster when the corresponding custom resource is deleted. + // Can be set to 'delete' or 'retain'. Default is 'delete'. + // +kubebuilder:validation:Enum=delete;retain + // +kubebuilder:default:=delete + DeletionPolicy string `json:"deletionPolicy,omitempty"` } // QueueStatus defines the observed state of Queue diff --git a/api/v1beta1/queue_types_test.go b/api/v1beta1/queue_types_test.go index e6980df2..91da4bd5 100644 --- a/api/v1beta1/queue_types_test.go +++ b/api/v1beta1/queue_types_test.go @@ -17,10 +17,11 @@ var _ = Describe("Queue spec", func() { It("creates a queue with default settings", func() { expectedSpec := QueueSpec{ - Name: "test-queue", - Vhost: "/", - Durable: false, - AutoDelete: false, + Name: "test-queue", + Vhost: "/", + Durable: false, + AutoDelete: false, + DeletionPolicy: "delete", RabbitmqClusterReference: RabbitmqClusterReference{ Name: "some-cluster", }, @@ -85,4 +86,29 @@ var _ = Describe("Queue spec", func() { })) Expect(fetchedQ.Spec.Arguments.Raw).To(Equal([]byte(`{"yoyo":10}`))) }) + + It("creates a queue with non-default DeletionPolicy", func() { + q := Queue{ + ObjectMeta: metav1.ObjectMeta{ + Name: "queue-with-retain-policy", + Namespace: namespace, + }, + Spec: QueueSpec{ + Name: "queue-with-retain-policy", + DeletionPolicy: "retain", + RabbitmqClusterReference: RabbitmqClusterReference{ + Name: "random-cluster", + }, + }, + } + Expect(k8sClient.Create(ctx, &q)).To(Succeed()) + fetchedQ := &Queue{} + Expect(k8sClient.Get(ctx, types.NamespacedName{ + Name: q.Name, + Namespace: q.Namespace, + }, fetchedQ)).To(Succeed()) + + Expect(fetchedQ.Spec.DeletionPolicy).To(Equal("retain")) + Expect(fetchedQ.Spec.Name).To(Equal("queue-with-retain-policy")) + }) }) diff --git a/api/v1beta1/queue_webhook_test.go b/api/v1beta1/queue_webhook_test.go index a0bacc46..ff2fb71b 100644 --- a/api/v1beta1/queue_webhook_test.go +++ b/api/v1beta1/queue_webhook_test.go @@ -154,5 +154,12 @@ var _ = Describe("queue webhook", func() { _, err = newQueue.ValidateUpdate(rootCtx, &queue, newQueue) Expect(err).To(MatchError(ContainSubstring("queue arguments cannot be updated"))) }) + + It("allows updates on queue.spec.deletionPolicy", func() { + newQueue := queue.DeepCopy() + newQueue.Spec.DeletionPolicy = "retain" + _, err := newQueue.ValidateUpdate(rootCtx, &queue, newQueue) + Expect(err).ToNot(HaveOccurred()) + }) }) }) diff --git a/api/v1beta1/shovel_types.go b/api/v1beta1/shovel_types.go index 317ad967..1e7c01e3 100644 --- a/api/v1beta1/shovel_types.go +++ b/api/v1beta1/shovel_types.go @@ -84,6 +84,11 @@ type ShovelSpec struct { SourceConsumerArgs *runtime.RawExtension `json:"srcConsumerArgs,omitempty"` // amqp10 configuration; required if srcProtocol is amqp10 SourceAddress string `json:"srcAddress,omitempty"` + // DeletionPolicy defines the behavior of shovel in the RabbitMQ cluster when the corresponding custom resource is deleted. + // Can be set to 'delete' or 'retain'. Default is 'delete'. + // +kubebuilder:validation:Enum=delete;retain + // +kubebuilder:default:=delete + DeletionPolicy string `json:"deletionPolicy,omitempty"` } // ShovelStatus defines the observed state of Shovel diff --git a/api/v1beta1/shovel_types_test.go b/api/v1beta1/shovel_types_test.go index ab275705..b179adab 100644 --- a/api/v1beta1/shovel_types_test.go +++ b/api/v1beta1/shovel_types_test.go @@ -193,4 +193,36 @@ var _ = Describe("Shovel spec", func() { Expect(k8sClient.Create(ctx, &shovel)).To(MatchError(`Shovel.rabbitmq.com "an-invalid-srcprotocol" is invalid: spec.srcProtocol: Unsupported value: "mqtt": supported values: "amqp091", "amqp10"`)) }) }) + + It("creates a shovel with non-default DeletionPolicy", func() { + shovel := Shovel{ + ObjectMeta: metav1.ObjectMeta{ + Name: "shovel-with-retain-policy", + Namespace: namespace, + }, + Spec: ShovelSpec{ + Name: "shovel-with-retain-policy", + DeletionPolicy: "retain", + RabbitmqClusterReference: RabbitmqClusterReference{ + Name: "some-cluster", + }, + UriSecret: &corev1.LocalObjectReference{ + Name: "a-secret", + }, + }, + } + Expect(k8sClient.Create(ctx, &shovel)).To(Succeed()) + fetched := &Shovel{} + Expect(k8sClient.Get(ctx, types.NamespacedName{ + Name: shovel.Name, + Namespace: shovel.Namespace, + }, fetched)).To(Succeed()) + + Expect(fetched.Spec.DeletionPolicy).To(Equal("retain")) + Expect(fetched.Spec.Name).To(Equal("shovel-with-retain-policy")) + Expect(fetched.Spec.RabbitmqClusterReference).To(Equal(RabbitmqClusterReference{ + Name: "some-cluster", + })) + Expect(fetched.Spec.UriSecret.Name).To(Equal("a-secret")) + }) }) diff --git a/api/v1beta1/shovel_webhook_test.go b/api/v1beta1/shovel_webhook_test.go index 0c5f90fd..8b8090da 100644 --- a/api/v1beta1/shovel_webhook_test.go +++ b/api/v1beta1/shovel_webhook_test.go @@ -179,5 +179,12 @@ var _ = Describe("shovel webhook", func() { _, err := newShovel.ValidateUpdate(rootCtx, &shovel, newShovel) Expect(err).ToNot(HaveOccurred()) }) + + It("allows updates on shovel.spec.deletionPolicy", func() { + newShovel := shovel.DeepCopy() + newShovel.Spec.DeletionPolicy = "retain" + _, err := newShovel.ValidateUpdate(rootCtx, &shovel, newShovel) + Expect(err).ToNot(HaveOccurred()) + }) }) }) diff --git a/api/v1beta1/vhost_types.go b/api/v1beta1/vhost_types.go index 207f1fa1..f152b871 100644 --- a/api/v1beta1/vhost_types.go +++ b/api/v1beta1/vhost_types.go @@ -29,6 +29,11 @@ type VhostSpec struct { // Required property. // +kubebuilder:validation:Required RabbitmqClusterReference RabbitmqClusterReference `json:"rabbitmqClusterReference"` + // DeletionPolicy defines the behavior of vhost in the RabbitMQ cluster when the corresponding custom resource is deleted. + // Can be set to 'delete' or 'retain'. Default is 'delete'. + // +kubebuilder:validation:Enum=delete;retain + // +kubebuilder:default:=delete + DeletionPolicy string `json:"deletionPolicy,omitempty"` } // VhostStatus defines the observed state of Vhost diff --git a/api/v1beta1/vhost_types_test.go b/api/v1beta1/vhost_types_test.go index c838bfcf..dd9aefff 100644 --- a/api/v1beta1/vhost_types_test.go +++ b/api/v1beta1/vhost_types_test.go @@ -16,8 +16,9 @@ var _ = Describe("Vhost", func() { It("creates a vhost", func() { expectedSpec := VhostSpec{ - Name: "test-vhost", - Tracing: false, + Name: "test-vhost", + Tracing: false, + DeletionPolicy: "delete", RabbitmqClusterReference: RabbitmqClusterReference{ Name: "some-cluster", }, @@ -134,4 +135,29 @@ var _ = Describe("Vhost", func() { Expect(k8sClient.Create(ctx, qTypeVhost)).To(MatchError(`Vhost.rabbitmq.com "some-vhost" is invalid: spec.defaultQueueType: Unsupported value: "aqueuetype": supported values: "quorum", "classic", "stream"`)) }) }) + + It("creates a vhost with non-default DeletionPolicy", func() { + vhost := Vhost{ + ObjectMeta: metav1.ObjectMeta{ + Name: "vhost-with-retain-policy", + Namespace: namespace, + }, + Spec: VhostSpec{ + Name: "vhost-with-retain-policy", + DeletionPolicy: "retain", + RabbitmqClusterReference: RabbitmqClusterReference{ + Name: "random-cluster", + }, + }, + } + Expect(k8sClient.Create(ctx, &vhost)).To(Succeed()) + fetched := &Vhost{} + Expect(k8sClient.Get(ctx, types.NamespacedName{ + Name: vhost.Name, + Namespace: vhost.Namespace, + }, fetched)).To(Succeed()) + + Expect(fetched.Spec.DeletionPolicy).To(Equal("retain")) + Expect(fetched.Spec.Name).To(Equal("vhost-with-retain-policy")) + }) }) diff --git a/api/v1beta1/vhost_webhook_test.go b/api/v1beta1/vhost_webhook_test.go index 3390ab3e..4eb82ef2 100644 --- a/api/v1beta1/vhost_webhook_test.go +++ b/api/v1beta1/vhost_webhook_test.go @@ -102,5 +102,12 @@ var _ = Describe("vhost webhook", func() { _, err := newVhost.ValidateUpdate(rootCtx, &vhost, newVhost) Expect(err).ToNot(HaveOccurred()) }) + + It("allows updates on vhost.spec.deletionPolicy", func() { + newVhost := vhost.DeepCopy() + newVhost.Spec.DeletionPolicy = "retain" + _, err := newVhost.ValidateUpdate(rootCtx, &vhost, newVhost) + Expect(err).ToNot(HaveOccurred()) + }) }) }) diff --git a/config/crd/bases/rabbitmq.com_federations.yaml b/config/crd/bases/rabbitmq.com_federations.yaml index 04850de3..bcbf05cc 100644 --- a/config/crd/bases/rabbitmq.com_federations.yaml +++ b/config/crd/bases/rabbitmq.com_federations.yaml @@ -49,6 +49,15 @@ spec: - on-publish - no-ack type: string + deletionPolicy: + default: delete + description: |- + DeletionPolicy defines the behavior of federation in the RabbitMQ cluster when the corresponding custom resource is deleted. + Can be set to 'delete' or 'retain'. Default is 'delete'. + enum: + - delete + - retain + type: string exchange: type: string expires: diff --git a/config/crd/bases/rabbitmq.com_queues.yaml b/config/crd/bases/rabbitmq.com_queues.yaml index 530a2d69..8e7cfdfa 100644 --- a/config/crd/bases/rabbitmq.com_queues.yaml +++ b/config/crd/bases/rabbitmq.com_queues.yaml @@ -58,6 +58,15 @@ spec: description: when set to true, queues are delete only if they have no consumer. type: boolean + deletionPolicy: + default: delete + description: |- + DeletionPolicy defines the behavior of queue in the RabbitMQ cluster when the corresponding custom resource is deleted. + Can be set to 'delete' or 'retain'. Default is 'delete'. + enum: + - delete + - retain + type: string durable: description: When set to false queues does not survive server restart. type: boolean diff --git a/config/crd/bases/rabbitmq.com_shovels.yaml b/config/crd/bases/rabbitmq.com_shovels.yaml index d3f6d8d8..b7b4bfd0 100644 --- a/config/crd/bases/rabbitmq.com_shovels.yaml +++ b/config/crd/bases/rabbitmq.com_shovels.yaml @@ -53,6 +53,15 @@ spec: type: boolean deleteAfter: type: string + deletionPolicy: + default: delete + description: |- + DeletionPolicy defines the behavior of shovel in the RabbitMQ cluster when the corresponding custom resource is deleted. + Can be set to 'delete' or 'retain'. Default is 'delete'. + enum: + - delete + - retain + type: string destAddForwardHeaders: type: boolean destAddTimestampHeader: diff --git a/config/crd/bases/rabbitmq.com_vhosts.yaml b/config/crd/bases/rabbitmq.com_vhosts.yaml index 78757eb7..7537ddd8 100644 --- a/config/crd/bases/rabbitmq.com_vhosts.yaml +++ b/config/crd/bases/rabbitmq.com_vhosts.yaml @@ -50,6 +50,15 @@ spec: - classic - stream type: string + deletionPolicy: + default: delete + description: |- + DeletionPolicy defines the behavior of vhost in the RabbitMQ cluster when the corresponding custom resource is deleted. + Can be set to 'delete' or 'retain'. Default is 'delete'. + enum: + - delete + - retain + type: string name: description: Name of the vhost; see https://www.rabbitmq.com/vhosts.html. type: string diff --git a/controllers/federation_controller.go b/controllers/federation_controller.go index 17a7684d..ad347a88 100644 --- a/controllers/federation_controller.go +++ b/controllers/federation_controller.go @@ -54,6 +54,9 @@ func (r *FederationReconciler) getUri(ctx context.Context, federation *topology. func (r *FederationReconciler) DeleteFunc(ctx context.Context, client rabbitmqclient.Client, obj topology.TopologyResource) error { logger := ctrl.LoggerFrom(ctx) federation := obj.(*topology.Federation) + if shouldSkipDeletion(ctx, federation.Spec.DeletionPolicy, federation.Spec.Name) { + return nil + } err := validateResponseForDeletion(client.DeleteFederationUpstream(federation.Spec.Vhost, federation.Spec.Name)) if errors.Is(err, NotFound) { logger.Info("cannot find federation upstream parameter; no need to delete it", "federation", federation.Spec.Name) diff --git a/controllers/federation_controller_test.go b/controllers/federation_controller_test.go index c422fa05..de9f5a17 100644 --- a/controllers/federation_controller_test.go +++ b/controllers/federation_controller_test.go @@ -225,4 +225,30 @@ var _ = Describe("federation-controller", func() { }) }) }) + + When("the Federation has DeletionPolicy set to retain", func() { + BeforeEach(func() { + federationName = "federation-with-retain-policy" + federation.Spec.DeletionPolicy = "retain" + fakeRabbitMQClient.DeleteFederationUpstreamReturns(&http.Response{ + Status: "200 OK", + StatusCode: http.StatusOK, + }, nil) + }) + + It("deletes the k8s resource but preserves the federation in RabbitMQ server", func() { + Expect(k8sClient.Create(ctx, &federation)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &federation)).To(Succeed()) + + Eventually(func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{Name: federation.Name, Namespace: federation.Namespace}, &federation) + return apierrors.IsNotFound(err) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeTrue()) + + Expect(fakeRabbitMQClient.DeleteFederationUpstreamCallCount()).To(Equal(0)) + }) + }) }) diff --git a/controllers/queue_controller.go b/controllers/queue_controller.go index 9f473fcc..8032ce23 100644 --- a/controllers/queue_controller.go +++ b/controllers/queue_controller.go @@ -47,6 +47,9 @@ func (r *QueueReconciler) DeleteFunc(ctx context.Context, client rabbitmqclient. logger.Info("Deleting queues from ReconcilerFunc DeleteObj") queue := obj.(*topology.Queue) + if shouldSkipDeletion(ctx, queue.Spec.DeletionPolicy, queue.Spec.Name) { + return nil + } queueDeleteOptions, err := internal.GenerateQueueDeleteOptions(queue) if err != nil { return fmt.Errorf("failed to generate queue delete options: %w", err) diff --git a/controllers/queue_controller_test.go b/controllers/queue_controller_test.go index d2e740b8..9cad0525 100644 --- a/controllers/queue_controller_test.go +++ b/controllers/queue_controller_test.go @@ -221,4 +221,30 @@ var _ = Describe("queue-controller", func() { }) }) }) + + When("the Queue has DeletionPolicy set to retain", func() { + BeforeEach(func() { + queueName = "queue-with-retain-policy" + queue.Spec.DeletionPolicy = "retain" + fakeRabbitMQClient.DeleteQueueReturns(&http.Response{ + Status: "200 OK", + StatusCode: http.StatusOK, + }, nil) + }) + + It("deletes the k8s resource but preserves the queue in RabbitMQ server", func() { + Expect(k8sClient.Create(ctx, &queue)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &queue)).To(Succeed()) + + Eventually(func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{Name: queue.Name, Namespace: queue.Namespace}, &queue) + return apierrors.IsNotFound(err) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeTrue()) + + Expect(fakeRabbitMQClient.DeleteQueueCallCount()).To(Equal(0)) + }) + }) }) diff --git a/controllers/shovel_controller.go b/controllers/shovel_controller.go index 2ca11399..99ba7ca2 100644 --- a/controllers/shovel_controller.go +++ b/controllers/shovel_controller.go @@ -62,6 +62,9 @@ func (r *ShovelReconciler) getUris(ctx context.Context, shovel *topology.Shovel) func (r *ShovelReconciler) DeleteFunc(ctx context.Context, client rabbitmqclient.Client, obj topology.TopologyResource) error { logger := ctrl.LoggerFrom(ctx) shovel := obj.(*topology.Shovel) + if shouldSkipDeletion(ctx, shovel.Spec.DeletionPolicy, shovel.Spec.Name) { + return nil + } err := validateResponseForDeletion(client.DeleteShovel(shovel.Spec.Vhost, shovel.Spec.Name)) if errors.Is(err, NotFound) { logger.Info("cannot find shovel parameter; no need to delete it", "shovel", shovel.Spec.Name) diff --git a/controllers/shovel_controller_test.go b/controllers/shovel_controller_test.go index 520aadc7..df18a61c 100644 --- a/controllers/shovel_controller_test.go +++ b/controllers/shovel_controller_test.go @@ -286,4 +286,29 @@ var _ = Describe("shovel-controller", func() { }) }) }) + When("the Shovel has DeletionPolicy set to retain", func() { + BeforeEach(func() { + shovelName = "shovel-with-retain-policy" + shovel.Spec.DeletionPolicy = "retain" + fakeRabbitMQClient.DeleteShovelReturns(&http.Response{ + Status: "200 OK", + StatusCode: http.StatusOK, + }, nil) + }) + + It("deletes the k8s resource but preserves the shovel in RabbitMQ server", func() { + Expect(k8sClient.Create(ctx, &shovel)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &shovel)).To(Succeed()) + + Eventually(func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, &shovel) + return apierrors.IsNotFound(err) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeTrue()) + + Expect(fakeRabbitMQClient.DeleteShovelCallCount()).To(Equal(0)) + }) + }) }) diff --git a/controllers/utils.go b/controllers/utils.go index c22c0a05..14015047 100644 --- a/controllers/utils.go +++ b/controllers/utils.go @@ -128,3 +128,13 @@ func handleRMQReferenceParseError(ctx context.Context, client client.Client, eve logger.Error(err, failedParseClusterRef) return reconcile.Result{}, err } + +// shouldSkipDeletion checks the DeletionPolicy and logs if deletion should be skipped. +func shouldSkipDeletion(ctx context.Context, deletionPolicy string, resourceName string) bool { + logger := ctrl.LoggerFrom(ctx) + if deletionPolicy == "retain" { + logger.Info("Skipping deletion in RabbitMQ server due to deletionPolicy=retain", "resource", resourceName) + return true + } + return false +} diff --git a/controllers/vhost_controller.go b/controllers/vhost_controller.go index d7aa0875..4bcfbc32 100644 --- a/controllers/vhost_controller.go +++ b/controllers/vhost_controller.go @@ -30,6 +30,9 @@ func (r *VhostReconciler) DeclareFunc(_ context.Context, client rabbitmqclient.C func (r *VhostReconciler) DeleteFunc(ctx context.Context, client rabbitmqclient.Client, obj topology.TopologyResource) error { logger := ctrl.LoggerFrom(ctx) vhost := obj.(*topology.Vhost) + if shouldSkipDeletion(ctx, vhost.Spec.DeletionPolicy, vhost.Spec.Name) { + return nil + } err := validateResponseForDeletion(client.DeleteVhost(vhost.Spec.Name)) if errors.Is(err, NotFound) { logger.Info("cannot find vhost in rabbitmq server; already deleted", "vhost", vhost.Spec.Name) diff --git a/controllers/vhost_controller_test.go b/controllers/vhost_controller_test.go index 3404fbda..c5132655 100644 --- a/controllers/vhost_controller_test.go +++ b/controllers/vhost_controller_test.go @@ -222,4 +222,30 @@ var _ = Describe("vhost-controller", func() { }) }) }) + + When("the Vhost has DeletionPolicy set to retain", func() { + BeforeEach(func() { + vhostName = "vhost-with-retain-policy" + vhost.Spec.DeletionPolicy = "retain" + fakeRabbitMQClient.DeleteVhostReturns(&http.Response{ + Status: "200 OK", + StatusCode: http.StatusOK, + }, nil) + }) + + It("deletes the k8s resource but preserves the vhost in RabbitMQ server", func() { + Expect(k8sClient.Create(ctx, &vhost)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &vhost)).To(Succeed()) + + Eventually(func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{Name: vhost.Name, Namespace: vhost.Namespace}, &vhost) + return apierrors.IsNotFound(err) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeTrue()) + + Expect(fakeRabbitMQClient.DeleteVhostCallCount()).To(Equal(0)) + }) + }) }) diff --git a/docs/api/rabbitmq.com.ref.asciidoc b/docs/api/rabbitmq.com.ref.asciidoc index a854cae5..65280d70 100644 --- a/docs/api/rabbitmq.com.ref.asciidoc +++ b/docs/api/rabbitmq.com.ref.asciidoc @@ -494,6 +494,8 @@ Required property. | *`trustUserId`* __boolean__ | | *`exchange`* __string__ | | *`queue`* __string__ | +| *`deletionPolicy`* __string__ | DeletionPolicy defines the behavior of federation in the RabbitMQ cluster when the corresponding custom resource is deleted. +Can be set to 'delete' or 'retain'. Default is 'delete'. |=== @@ -908,6 +910,8 @@ QueueSpec defines the desired state of Queue Configuring queues through arguments is not recommended because they cannot be updated once set; we recommend configuring queues through policies instead. | *`rabbitmqClusterReference`* __xref:{anchor_prefix}-github-com-rabbitmq-messaging-topology-operator-api-v1beta1-rabbitmqclusterreference[$$RabbitmqClusterReference$$]__ | Reference to the RabbitmqCluster that the queue will be created in. Required property. +| *`deletionPolicy`* __string__ | DeletionPolicy defines the behavior of queue in the RabbitMQ cluster when the corresponding custom resource is deleted. +Can be set to 'delete' or 'retain'. Default is 'delete'. |=== @@ -1193,6 +1197,8 @@ Required property. | *`srcExchangeKey`* __string__ | amqp091 configuration | *`srcConsumerArgs`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#rawextension-runtime-pkg[$$RawExtension$$]__ | amqp091 configuration | *`srcAddress`* __string__ | amqp10 configuration; required if srcProtocol is amqp10 +| *`deletionPolicy`* __string__ | DeletionPolicy defines the behavior of shovel in the RabbitMQ cluster when the corresponding custom resource is deleted. +Can be set to 'delete' or 'retain'. Default is 'delete'. |=== @@ -1577,6 +1583,8 @@ VhostSpec defines the desired state of Vhost Supported in RabbitMQ 3.11.12 or above. | *`rabbitmqClusterReference`* __xref:{anchor_prefix}-github-com-rabbitmq-messaging-topology-operator-api-v1beta1-rabbitmqclusterreference[$$RabbitmqClusterReference$$]__ | Reference to the RabbitmqCluster that the vhost will be created in. Required property. +| *`deletionPolicy`* __string__ | DeletionPolicy defines the behavior of vhost in the RabbitMQ cluster when the corresponding custom resource is deleted. +Can be set to 'delete' or 'retain'. Default is 'delete'. |=== diff --git a/docs/examples/federations/federation.yaml b/docs/examples/federations/federation.yaml index 7bad870c..286fc97e 100644 --- a/docs/examples/federations/federation.yaml +++ b/docs/examples/federations/federation.yaml @@ -9,5 +9,6 @@ spec: uriSecret: name: federation-uri ackMode: "on-confirm" + deletionPolicy: retain # delete or retain; default to delete; rabbitmqClusterReference: name: example-rabbit diff --git a/docs/examples/queues/quorum-queue.yaml b/docs/examples/queues/quorum-queue.yaml index db54771c..06cff777 100644 --- a/docs/examples/queues/quorum-queue.yaml +++ b/docs/examples/queues/quorum-queue.yaml @@ -11,6 +11,7 @@ spec: type: quorum # without providing a queue type, rabbitmq creates a classic queue autoDelete: false durable: true # seting 'durable' to false means this queue won't survive a server restart + deletionPolicy: retain # delete or retain; default to delete; rabbitmqClusterReference: name: test # rabbitmqCluster must exist in the same namespace as this resource # status: diff --git a/docs/examples/queues/stream-queue.yaml b/docs/examples/queues/stream-queue.yaml index b3198a85..1f4fc999 100644 --- a/docs/examples/queues/stream-queue.yaml +++ b/docs/examples/queues/stream-queue.yaml @@ -9,5 +9,6 @@ spec: durable: true # mandatory for stream queues arguments: x-max-length-bytes: 1000000000 ## setting the retention policy + deletionPolicy: retain # delete or retain; default to delete; rabbitmqClusterReference: name: test # rabbitmqCluster must exist in the same namespace as this resource diff --git a/docs/examples/shovels/shovel.yaml b/docs/examples/shovels/shovel.yaml index 6af5590a..2c3a40cd 100644 --- a/docs/examples/shovels/shovel.yaml +++ b/docs/examples/shovels/shovel.yaml @@ -14,5 +14,6 @@ spec: destQueue: "destination-queue" destPublishProperties: # optional map of properties to overwrite when shovelling messages delivery_mode: 2 + deletionPolicy: retain # delete or retain; default to delete; rabbitmqClusterReference: name: example-rabbit diff --git a/docs/examples/vhosts/vhost.yaml b/docs/examples/vhosts/vhost.yaml index 4566d1ac..a0cc6611 100644 --- a/docs/examples/vhosts/vhost.yaml +++ b/docs/examples/vhosts/vhost.yaml @@ -5,6 +5,7 @@ metadata: spec: name: test-vhost # vhost name; required and cannot be updated defaultQueueType: quorum # default queue type for this vhost; require RabbitMQ version 3.11.12 or above + deletionPolicy: retain # delete or retain; default to delete; rabbitmqClusterReference: name: test # rabbitmqCluster must exist in the same namespace as this resource # status: diff --git a/system_tests/federation_system_test.go b/system_tests/federation_system_test.go index f87ee7f6..00d2bbda 100644 --- a/system_tests/federation_system_test.go +++ b/system_tests/federation_system_test.go @@ -11,6 +11,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" ) var _ = Describe("federation", func() { @@ -124,4 +125,45 @@ var _ = Describe("federation", func() { }, 10).Should(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("Object Not Found")) }) + + When("deletion policy is retain", func() { + It("deletes k8s resource but keeps the federation in RabbitMQ", func() { + federationWithRetain := &topology.Federation{ + ObjectMeta: metav1.ObjectMeta{ + Name: "retain-policy-test", + Namespace: namespace, + }, + Spec: topology.FederationSpec{ + Name: "retain-policy-test", + UriSecret: &corev1.LocalObjectReference{Name: federationUriSecret.Name}, + DeletionPolicy: "retain", + RabbitmqClusterReference: topology.RabbitmqClusterReference{ + Name: rmq.Name, + }, + }, + } + + By("creating a federation with retain policy") + Expect(k8sClient.Create(ctx, federationWithRetain, &client.CreateOptions{})).To(Succeed()) + + By("waiting for the federation to be created in RabbitMQ") + Eventually(func() error { + _, err := rabbitClient.GetFederationUpstream("/", federationWithRetain.Spec.Name) + return err + }, 30, 2).ShouldNot(HaveOccurred()) + + By("deleting the k8s resource") + Expect(k8sClient.Delete(ctx, federationWithRetain)).To(Succeed()) + + By("verifying k8s resource is gone") + Eventually(func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{Name: federationWithRetain.Name, Namespace: federationWithRetain.Namespace}, &topology.Federation{}) + return apierrors.IsNotFound(err) + }, 30, 2).Should(BeTrue()) + + By("verifying federation still exists in RabbitMQ") + _, err := rabbitClient.GetFederationUpstream("/", federationWithRetain.Spec.Name) + Expect(err).NotTo(HaveOccurred()) + }) + }) }) diff --git a/system_tests/queue_system_test.go b/system_tests/queue_system_test.go index 3a0a9b86..eb1dfe41 100644 --- a/system_tests/queue_system_test.go +++ b/system_tests/queue_system_test.go @@ -14,6 +14,7 @@ import ( . "github.com/onsi/gomega/gstruct" topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" ) var _ = Describe("Queue Controller", func() { @@ -101,4 +102,46 @@ var _ = Describe("Queue Controller", func() { }, 30).Should(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("Object Not Found")) }) + + When("deletion policy is retain", func() { + It("deletes k8s resource but keeps the queue in RabbitMQ", func() { + queueWithRetain := &topology.Queue{ + ObjectMeta: metav1.ObjectMeta{ + Name: "retain-policy-test", + Namespace: namespace, + }, + Spec: topology.QueueSpec{ + Name: "retain-policy-test", + Type: "classic", + Durable: true, + DeletionPolicy: "retain", + RabbitmqClusterReference: topology.RabbitmqClusterReference{ + Name: rmq.Name, + }, + }, + } + + By("creating a queue with retain policy") + Expect(k8sClient.Create(ctx, queueWithRetain, &client.CreateOptions{})).To(Succeed()) + + By("waiting for the queue to be created in RabbitMQ") + Eventually(func() error { + _, err := rabbitClient.GetQueue("/", queueWithRetain.Spec.Name) + return err + }, 30, 2).ShouldNot(HaveOccurred()) + + By("deleting the k8s resource") + Expect(k8sClient.Delete(ctx, queueWithRetain)).To(Succeed()) + + By("verifying k8s resource is gone") + Eventually(func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{Name: queueWithRetain.Name, Namespace: queueWithRetain.Namespace}, &topology.Queue{}) + return apierrors.IsNotFound(err) + }, 30, 2).Should(BeTrue()) + + By("verifying queue still exists in RabbitMQ") + _, err := rabbitClient.GetQueue("/", queueWithRetain.Spec.Name) + Expect(err).NotTo(HaveOccurred()) + }) + }) }) diff --git a/system_tests/shovel_system_test.go b/system_tests/shovel_system_test.go index ece77d9e..30ca154a 100644 --- a/system_tests/shovel_system_test.go +++ b/system_tests/shovel_system_test.go @@ -12,6 +12,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" ) var _ = Describe("Shovel", func() { @@ -145,6 +146,48 @@ var _ = Describe("Shovel", func() { Expect(k8sClient.Delete(ctx, shovel)).To(Succeed()) assertShovelDeleted(shovel) }) + + When("deletion policy is retain", func() { + It("deletes k8s resource but keeps the shovel in RabbitMQ", func() { + shovelWithRetain := &topology.Shovel{ + ObjectMeta: metav1.ObjectMeta{ + Name: "retain-policy-test", + Namespace: namespace, + }, + Spec: topology.ShovelSpec{ + Name: "retain-policy-test", + UriSecret: &corev1.LocalObjectReference{Name: shovelSecret.Name}, + DeletionPolicy: "retain", + SourceQueue: "test-queue", + RabbitmqClusterReference: topology.RabbitmqClusterReference{ + Name: rmq.Name, + }, + }, + } + + By("creating a shovel with retain policy") + Expect(k8sClient.Create(ctx, shovelWithRetain, &client.CreateOptions{})).To(Succeed()) + + By("waiting for the shovel to be created in RabbitMQ") + Eventually(func() error { + _, err := rabbitClient.GetShovel("/", shovelWithRetain.Spec.Name) + return err + }, 30, 2).ShouldNot(HaveOccurred()) + + By("deleting the k8s resource") + Expect(k8sClient.Delete(ctx, shovelWithRetain)).To(Succeed()) + + By("verifying k8s resource is gone") + Eventually(func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{Name: shovelWithRetain.Name, Namespace: shovelWithRetain.Namespace}, &topology.Shovel{}) + return apierrors.IsNotFound(err) + }, 30, 2).Should(BeTrue()) + + By("verifying shovel still exists in RabbitMQ") + _, err := rabbitClient.GetShovel("/", shovelWithRetain.Spec.Name) + Expect(err).NotTo(HaveOccurred()) + }) + }) }) func declareAssertShovelCommonProperties(ctx context.Context, shovel *topology.Shovel) *rabbithole.ShovelInfo { diff --git a/system_tests/vhost_system_test.go b/system_tests/vhost_system_test.go index 839e765f..fa8ccc60 100644 --- a/system_tests/vhost_system_test.go +++ b/system_tests/vhost_system_test.go @@ -11,6 +11,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" ) var _ = Describe("vhost", func() { @@ -98,4 +99,44 @@ var _ = Describe("vhost", func() { }, 30).Should(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("Object Not Found")) }) + + When("deletion policy is retain", func() { + It("deletes k8s resource but keeps the vhost in RabbitMQ", func() { + vhostWithRetain := &topology.Vhost{ + ObjectMeta: metav1.ObjectMeta{ + Name: "retain-policy-test", + Namespace: namespace, + }, + Spec: topology.VhostSpec{ + Name: "retain-policy-test", + DeletionPolicy: "retain", + RabbitmqClusterReference: topology.RabbitmqClusterReference{ + Name: rmq.Name, + }, + }, + } + + By("creating a vhost with retain policy") + Expect(k8sClient.Create(ctx, vhostWithRetain, &client.CreateOptions{})).To(Succeed()) + + By("waiting for the vhost to be created in RabbitMQ") + Eventually(func() error { + _, err := rabbitClient.GetVhost(vhostWithRetain.Spec.Name) + return err + }, 30, 2).ShouldNot(HaveOccurred()) + + By("deleting the k8s resource") + Expect(k8sClient.Delete(ctx, vhostWithRetain)).To(Succeed()) + + By("verifying k8s resource is gone") + Eventually(func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{Name: vhostWithRetain.Name, Namespace: vhostWithRetain.Namespace}, &topology.Vhost{}) + return apierrors.IsNotFound(err) + }, 30, 2).Should(BeTrue()) + + By("verifying vhost still exists in RabbitMQ") + _, err := rabbitClient.GetVhost(vhostWithRetain.Spec.Name) + Expect(err).NotTo(HaveOccurred()) + }) + }) })