Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions api/v1beta1/vhost_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ type VhostSpec struct {
// +kubebuilder:validation:Enum=delete;retain
// +kubebuilder:default:=delete
DeletionPolicy string `json:"deletionPolicy,omitempty"`
// Limits defines limits to be applied to the vhost.
// Supported limits include max-connections and max-queues.
// See https://www.rabbitmq.com/docs/vhosts#limits
VhostLimits *VhostLimits `json:"limits,omitempty"`
}

// VhostStatus defines the observed state of Vhost
Expand Down Expand Up @@ -67,6 +71,12 @@ type VhostList struct {
Items []Vhost `json:"items"`
}

// VhostLimits defines limits to be applied to the vhost.
type VhostLimits struct {
Connections *int32 `json:"connections,omitempty"`
Queues *int32 `json:"queues,omitempty"`
}

func (v *Vhost) GroupResource() schema.GroupResource {
return schema.GroupResource{
Group: v.GroupVersionKind().Group,
Expand Down
103 changes: 103 additions & 0 deletions api/v1beta1/vhost_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,109 @@ var _ = Describe("Vhost", func() {
}))
})

Context("Vhost Limits", func() {
var connections, queues int32

When("vhost limits are configured", func() {
It("creates a vhost with limits", func() {
connections = 1000
queues = 500
vhost := Vhost{
ObjectMeta: metav1.ObjectMeta{
Name: "vhost-with-limits",
Namespace: namespace,
},
Spec: VhostSpec{
Name: "vhost-with-limits",
VhostLimits: &VhostLimits{
Connections: &connections,
Queues: &queues,
},
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "random-cluster",
},
},
}
Expect(k8sClient.Create(ctx, &vhost)).To(Succeed())
fetched := &Vhost{}
Expect(k8sClient.Get(ctx, types.NamespacedName{
Name: vhost.Name,
Namespace: vhost.Namespace,
}, fetched)).To(Succeed())

Expect(*fetched.Spec.VhostLimits.Connections).To(Equal(connections))
Expect(*fetched.Spec.VhostLimits.Queues).To(Equal(queues))
Expect(fetched.Spec.Name).To(Equal("vhost-with-limits"))
Expect(fetched.Spec.RabbitmqClusterReference).To(Equal(RabbitmqClusterReference{
Name: "random-cluster",
}))
})
})

When("No vhost limits are provided", func() {
It("Does not set VhostLimits", func() {
vhost := Vhost{
ObjectMeta: metav1.ObjectMeta{
Name: "vhost-with-no-provided-limits",
Namespace: namespace,
},
Spec: VhostSpec{
Name: "vhost-with-no-provided-limits",
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "random-cluster",
},
},
}
Expect(k8sClient.Create(ctx, &vhost)).To(Succeed())
fetched := &Vhost{}
Expect(k8sClient.Get(ctx, types.NamespacedName{
Name: vhost.Name,
Namespace: vhost.Namespace,
}, fetched)).To(Succeed())

Expect(fetched.Spec.VhostLimits).To(BeNil())
Expect(fetched.Spec.Name).To(Equal("vhost-with-no-provided-limits"))
Expect(fetched.Spec.RabbitmqClusterReference).To(Equal(RabbitmqClusterReference{
Name: "random-cluster",
}))
})
})

When("Only some vhost limits are provided", func() {
It("Configures those limits and lifts other limits", func() {
queues = 800
vhost := Vhost{
ObjectMeta: metav1.ObjectMeta{
Name: "vhost-some-limits",
Namespace: namespace,
},
Spec: VhostSpec{
Name: "vhost-some-limits",
VhostLimits: &VhostLimits{
Queues: &queues,
},
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "random-cluster",
},
},
}
Expect(k8sClient.Create(ctx, &vhost)).To(Succeed())
fetched := &Vhost{}
Expect(k8sClient.Get(ctx, types.NamespacedName{
Name: vhost.Name,
Namespace: vhost.Namespace,
}, fetched)).To(Succeed())

Expect(fetched.Spec.VhostLimits.Connections).To(BeNil())
Expect(*fetched.Spec.VhostLimits.Queues).To(Equal(queues))
Expect(fetched.Spec.Name).To(Equal("vhost-some-limits"))
Expect(fetched.Spec.RabbitmqClusterReference).To(Equal(RabbitmqClusterReference{
Name: "random-cluster",
}))
})
})
})

Context("Default queue types", func() {
var qTypeVhost = &Vhost{
ObjectMeta: metav1.ObjectMeta{
Expand Down
30 changes: 30 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions config/crd/bases/rabbitmq.com_vhosts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,19 @@ spec:
- delete
- retain
type: string
limits:
description: |-
Limits defines limits to be applied to the vhost.
Supported limits include max-connections and max-queues.
See https://www.rabbitmq.com/docs/vhosts#limits
properties:
connections:
format: int32
type: integer
queues:
format: int32
type: integer
type: object
name:
description: Name of the vhost; see https://www.rabbitmq.com/vhosts.html.
type: string
Expand Down
17 changes: 15 additions & 2 deletions controllers/vhost_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,22 @@ type VhostReconciler struct {
client.Client
}

func (r *VhostReconciler) DeclareFunc(_ context.Context, client rabbitmqclient.Client, obj topology.TopologyResource) error {
func (r *VhostReconciler) DeclareFunc(ctx context.Context, client rabbitmqclient.Client, obj topology.TopologyResource) error {
logger := ctrl.LoggerFrom(ctx)
vhost := obj.(*topology.Vhost)
return validateResponse(client.PutVhost(vhost.Spec.Name, *internal.GenerateVhostSettings(vhost)))
settings := internal.GenerateVhostSettings(vhost)
logger.Info("generated vhost settings", "vhost", vhost.Spec.Name, "settings", settings)
err := validateResponse(client.PutVhost(vhost.Spec.Name, *settings))
if err != nil {
return err
}

vhostLimits := internal.GenerateVhostLimits(vhost.Spec.VhostLimits)
logger.Info("generated vhost limits", "vhost", vhost.Spec.Name, "limits", vhostLimits)
if len(vhostLimits) > 0 {
err = validateResponse(client.PutVhostLimits(vhost.Spec.Name, vhostLimits))
}
return err
}

// DeleteFunc deletes vhost from server
Expand Down
67 changes: 67 additions & 0 deletions controllers/vhost_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var _ = Describe("vhost-controller", func() {
managerCtx context.Context
managerCancel context.CancelFunc
k8sClient runtimeClient.Client
vhostLimits *topology.VhostLimits
)

BeforeEach(func() {
Expand Down Expand Up @@ -90,6 +91,7 @@ var _ = Describe("vhost-controller", func() {
RabbitmqClusterReference: topology.RabbitmqClusterReference{
Name: "example-rabbit",
},
VhostLimits: vhostLimits,
},
}
})
Expand Down Expand Up @@ -248,4 +250,69 @@ var _ = Describe("vhost-controller", func() {
Expect(fakeRabbitMQClient.DeleteVhostCallCount()).To(Equal(0))
})
})

Context("vhost limits", func() {
When("vhost limits are provided", func() {
var connections, queues int32

BeforeEach(func() {
connections = 708
queues = 509
vhostName = "vhost-with-limits"
vhostLimits = &topology.VhostLimits{
Connections: &connections,
Queues: &queues,
}
fakeRabbitMQClient.PutVhostReturns(&http.Response{
Status: "201 Created",
StatusCode: http.StatusCreated,
}, nil)
fakeRabbitMQClient.PutVhostLimitsReturns(&http.Response{
Status: "200 OK",
StatusCode: http.StatusOK,
}, nil)
})

It("puts the vhost limits", func() {
Expect(k8sClient.Create(ctx, &vhost)).To(Succeed())
Eventually(func() []topology.Condition {
_ = k8sClient.Get(
ctx,
types.NamespacedName{Name: vhost.Name, Namespace: vhost.Namespace},
&vhost,
)

return vhost.Status.Conditions
}).
Within(statusEventsUpdateTimeout).
WithPolling(time.Second).
Should(ContainElement(MatchFields(IgnoreExtras, Fields{
"Type": Equal(topology.ConditionType("Ready")),
"Reason": Equal("SuccessfulCreateOrUpdate"),
"Status": Equal(corev1.ConditionTrue),
})))

Expect(fakeRabbitMQClient.PutVhostLimitsCallCount()).To(BeNumerically(">", 0))
_, vhostLimitsValues := fakeRabbitMQClient.PutVhostLimitsArgsForCall(0)
Expect(len(vhostLimitsValues)).To(Equal(2))
connectionLimit, ok := vhostLimitsValues["max-connections"]
Expect(ok).To(BeTrue())
Expect(connectionLimit).To(Equal(int(connections)))
queueLimit, ok := vhostLimitsValues["max-queues"]
Expect(ok).To(BeTrue())
Expect(queueLimit).To(Equal(int(queues)))
})
})

When("vhost limits are not provided", func() {
BeforeEach(func() {
vhostName = "vhost-without-limits"
})

It("does not set vhost limits", func() {
Expect(k8sClient.Create(ctx, &vhost)).To(Succeed())
Expect(fakeRabbitMQClient.PutVhostLimitsCallCount()).To(Equal(0))
})
})
})
})
21 changes: 21 additions & 0 deletions docs/api/rabbitmq.com.ref.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1516,6 +1516,24 @@ More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-
|===


[id="{anchor_prefix}-github-com-rabbitmq-messaging-topology-operator-api-v1beta1-vhostlimits"]
==== VhostLimits

VhostLimits defines limits to be applied to the vhost.

.Appears In:
****
- xref:{anchor_prefix}-github-com-rabbitmq-messaging-topology-operator-api-v1beta1-vhostspec[$$VhostSpec$$]
****

[cols="25a,75a", options="header"]
|===
| Field | Description
| *`connections`* __integer__ |
| *`queues`* __integer__ |
|===


[id="{anchor_prefix}-github-com-rabbitmq-messaging-topology-operator-api-v1beta1-vhostlist"]
==== VhostList

Expand Down Expand Up @@ -1585,6 +1603,9 @@ Supported in RabbitMQ 3.11.12 or above.
Required property.
| *`deletionPolicy`* __string__ | DeletionPolicy defines the behavior of vhost in the RabbitMQ cluster when the corresponding custom resource is deleted.
Can be set to 'delete' or 'retain'. Default is 'delete'.
| *`limits`* __xref:{anchor_prefix}-github-com-rabbitmq-messaging-topology-operator-api-v1beta1-vhostlimits[$$VhostLimits$$]__ | Limits defines limits to be applied to the vhost.
Supported limits include max-connections and max-queues.
See https://www.rabbitmq.com/docs/vhosts#limits
|===


Expand Down
13 changes: 13 additions & 0 deletions internal/vhost_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,16 @@ func GenerateVhostSettings(v *topology.Vhost) *rabbithole.VhostSettings {
DefaultQueueType: v.Spec.DefaultQueueType,
}
}

func GenerateVhostLimits(limits *topology.VhostLimits) rabbithole.VhostLimitsValues {
vhostLimitsValues := rabbithole.VhostLimitsValues{}
if limits != nil {
if limits.Connections != nil {
vhostLimitsValues["max-connections"] = int(*limits.Connections)
}
if limits.Queues != nil {
vhostLimitsValues["max-queues"] = int(*limits.Queues)
}
}
return vhostLimitsValues
}
Loading
Loading