Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions apis/bases/rabbitmq.openstack.org_rabbitmqs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1401,13 +1401,9 @@ spec:
type: string
type: object
queueType:
default: Mirrored
description: QueueType to eventually apply the ha-all policy or configure
default queue type for the cluster
enum:
- None
- Mirrored
- Quorum
description: |-
QueueType to eventually apply the ha-all policy or configure default queue type for the cluster.
Allowed values are: None, Mirrored, Quorum. Defaults to Quorum if not specified.
type: string
rabbitmq:
description: Configuration options for RabbitMQ Pods created in the
Expand Down
8 changes: 4 additions & 4 deletions apis/rabbitmq/v1beta1/rabbitmq_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ type RabbitMqSpecCore struct {
// by name
TopologyRef *topologyv1.TopoRef `json:"topologyRef,omitempty"`
// +kubebuilder:validation:Optional
// +kubebuilder:validation:Enum=None;Mirrored;Quorum
// +kubebuilder:default=Mirrored
// QueueType to eventually apply the ha-all policy or configure default queue type for the cluster
QueueType string `json:"queueType"`
// +operator-sdk:csv:customresourcedefinitions:type=spec
// QueueType to eventually apply the ha-all policy or configure default queue type for the cluster.
// Allowed values are: None, Mirrored, Quorum. Defaults to Quorum if not specified.
QueueType *string `json:"queueType,omitempty"`
}

// Method to convert RabbitMqSpec to RabbitmqClusterSpec
Expand Down
37 changes: 36 additions & 1 deletion apis/rabbitmq/v1beta1/rabbitmq_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ func (spec *RabbitMqSpec) Default() {

// Default - common validations go here (for the OpenStackControlplane which uses this one)
func (spec *RabbitMqSpecCore) Default() {
//nothing to validate yet
// Set a sensible default for QueueType only when not explicitly provided
if spec.QueueType == nil || *spec.QueueType == "" {
queueType := "Quorum"
spec.QueueType = &queueType
}
}

// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation.
Expand Down Expand Up @@ -101,6 +105,9 @@ func (r *RabbitMq) ValidateCreate() (admission.Warnings, error) {
CrMaxLengthCorrection,
)...) // omit issue with statefulset pod label "controller-revision-hash": "<statefulset_name>-<hash>"

// Validate QueueType if specified
allErrs = append(allErrs, r.Spec.ValidateQueueType(basePath)...)

if len(allErrs) != 0 {
return allWarn, apierrors.NewInvalid(
schema.GroupKind{Group: "rabbitmq.openstack.org", Kind: "RabbitMq"},
Expand Down Expand Up @@ -128,6 +135,9 @@ func (r *RabbitMq) ValidateUpdate(_ runtime.Object) (admission.Warnings, error)
allWarn = append(allWarn, warn...)
allErrs = append(allErrs, errs...)

// Validate QueueType if specified
allErrs = append(allErrs, r.Spec.ValidateQueueType(basePath)...)

if len(allErrs) != 0 {
return allWarn, apierrors.NewInvalid(
schema.GroupKind{Group: "rabbitmq.openstack.org", Kind: "RabbitMq"},
Expand Down Expand Up @@ -173,3 +183,28 @@ func (r *RabbitMq) ValidateDelete() (admission.Warnings, error) {
// TODO(user): fill in your validation logic upon object deletion.
return nil, nil
}

// ValidateQueueType validates that QueueType is one of the allowed values
func (spec *RabbitMqSpec) ValidateQueueType(basePath *field.Path) field.ErrorList {
var allErrs field.ErrorList

if spec.QueueType != nil {
allowedValues := []string{"None", "Mirrored", "Quorum"}
isValid := false
for _, allowed := range allowedValues {
if *spec.QueueType == allowed {
isValid = true
break
}
}
if !isValid {
allErrs = append(allErrs, field.NotSupported(
basePath.Child("queueType"),
*spec.QueueType,
allowedValues,
))
}
}

return allErrs
}
5 changes: 5 additions & 0 deletions apis/rabbitmq/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 3 additions & 7 deletions config/crd/bases/rabbitmq.openstack.org_rabbitmqs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1401,13 +1401,9 @@ spec:
type: string
type: object
queueType:
default: Mirrored
description: QueueType to eventually apply the ha-all policy or configure
default queue type for the cluster
enum:
- None
- Mirrored
- Quorum
description: |-
QueueType to eventually apply the ha-all policy or configure default queue type for the cluster.
Allowed values are: None, Mirrored, Quorum. Defaults to Quorum if not specified.
type: string
rabbitmq:
description: Configuration options for RabbitMQ Pods created in the
Expand Down
63 changes: 33 additions & 30 deletions controllers/rabbitmq/rabbitmq_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,40 +425,43 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ct
instance.Status.Conditions.MarkTrue(condition.PDBReadyCondition, condition.PDBReadyMessage)

// Let's wait DeploymentReadyCondition=True to apply the policy
if instance.Spec.QueueType == "Mirrored" && *instance.Spec.Replicas > 1 && instance.Status.QueueType != "Mirrored" {
Log.Info("ha-all policy not present. Applying.")
err := updateMirroredPolicy(ctx, helper, instance, r.config, true)
if err != nil {
Log.Error(err, "Could not apply ha-all policy")
instance.Status.Conditions.Set(condition.FalseCondition(
condition.DeploymentReadyCondition,
condition.ErrorReason,
condition.SeverityWarning,
condition.DeploymentReadyErrorMessage, err.Error()))
return ctrl.Result{}, err
// QueueType should never be nil due to webhook defaulting, but add safety check
if instance.Spec.QueueType != nil {
if *instance.Spec.QueueType == "Mirrored" && *instance.Spec.Replicas > 1 && instance.Status.QueueType != "Mirrored" {
Log.Info("ha-all policy not present. Applying.")
err := updateMirroredPolicy(ctx, helper, instance, r.config, true)
if err != nil {
Log.Error(err, "Could not apply ha-all policy")
instance.Status.Conditions.Set(condition.FalseCondition(
condition.DeploymentReadyCondition,
condition.ErrorReason,
condition.SeverityWarning,
condition.DeploymentReadyErrorMessage, err.Error()))
return ctrl.Result{}, err
}
} else if *instance.Spec.QueueType != "Mirrored" && instance.Status.QueueType == "Mirrored" {
Log.Info("Removing ha-all policy")
err := updateMirroredPolicy(ctx, helper, instance, r.config, false)
if err != nil {
Log.Error(err, "Could not remove ha-all policy")
instance.Status.Conditions.Set(condition.FalseCondition(
condition.DeploymentReadyCondition,
condition.ErrorReason,
condition.SeverityWarning,
condition.DeploymentReadyErrorMessage, err.Error()))
return ctrl.Result{}, err
}
}
} else if instance.Spec.QueueType != "Mirrored" && instance.Status.QueueType == "Mirrored" {
Log.Info("Removing ha-all policy")
err := updateMirroredPolicy(ctx, helper, instance, r.config, false)
if err != nil {
Log.Error(err, "Could not remove ha-all policy")
instance.Status.Conditions.Set(condition.FalseCondition(
condition.DeploymentReadyCondition,
condition.ErrorReason,
condition.SeverityWarning,
condition.DeploymentReadyErrorMessage, err.Error()))
return ctrl.Result{}, err

// Update status for Quorum queue type
if *instance.Spec.QueueType == "Quorum" && instance.Status.QueueType != "Quorum" {
Log.Info("Setting queue type status to quorum")
} else if *instance.Spec.QueueType != "Quorum" && instance.Status.QueueType == "Quorum" {
Log.Info("Removing quorum queue type status")
}
}

// Update status for Quorum queue type
if instance.Spec.QueueType == "Quorum" && instance.Status.QueueType != "Quorum" {
Log.Info("Setting queue type status to quorum")
} else if instance.Spec.QueueType != "Quorum" && instance.Status.QueueType == "Quorum" {
Log.Info("Removing quorum queue type status")
instance.Status.QueueType = *instance.Spec.QueueType
}

instance.Status.QueueType = instance.Spec.QueueType
}

if instance.Status.Conditions.AllSubConditionIsTrue() {
Expand Down
27 changes: 27 additions & 0 deletions tests/functional/rabbitmq_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,33 @@ var _ = Describe("RabbitMQ Controller", func() {
DeferCleanup(th.DeleteConfigMap, clusterCm)
})

When("QueueType defaulting and explicit values", func() {
It("defaults QueueType to Quorum when unspecified", func() {
spec := GetDefaultRabbitMQSpec()
rabbitmq := CreateRabbitMQ(rabbitmqName, spec)
DeferCleanup(th.DeleteInstance, rabbitmq)

Eventually(func(g Gomega) {
instance := GetRabbitMQ(rabbitmqName)
g.Expect(instance.Spec.QueueType).ToNot(BeNil())
g.Expect(*instance.Spec.QueueType).To(Equal("Quorum"))
}, timeout, interval).Should(Succeed())
})

It("preserves explicitly set QueueType", func() {
spec := GetDefaultRabbitMQSpec()
spec["queueType"] = "Mirrored"
rabbitmq := CreateRabbitMQ(rabbitmqName, spec)
DeferCleanup(th.DeleteInstance, rabbitmq)

Eventually(func(g Gomega) {
instance := GetRabbitMQ(rabbitmqName)
g.Expect(instance.Spec.QueueType).ToNot(BeNil())
g.Expect(*instance.Spec.QueueType).To(Equal("Mirrored"))
}, timeout, interval).Should(Succeed())
})
})

When("a default RabbitMQ gets created", func() {
BeforeEach(func() {
rabbitmq := CreateRabbitMQ(rabbitmqName, GetDefaultRabbitMQSpec())
Expand Down