Skip to content

Commit d4eb0b1

Browse files
authored
Add vhost limits. (#977)
* Add vhost limits. * Remove outdated vhost limits.
1 parent 86a012a commit d4eb0b1

File tree

12 files changed

+748
-2
lines changed

12 files changed

+748
-2
lines changed

api/v1beta1/vhost_types.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ type VhostSpec struct {
3434
// +kubebuilder:validation:Enum=delete;retain
3535
// +kubebuilder:default:=delete
3636
DeletionPolicy string `json:"deletionPolicy,omitempty"`
37+
// Limits defines limits to be applied to the vhost.
38+
// Supported limits include max-connections and max-queues.
39+
// See https://www.rabbitmq.com/docs/vhosts#limits
40+
VhostLimits *VhostLimits `json:"limits,omitempty"`
3741
}
3842

3943
// VhostStatus defines the observed state of Vhost
@@ -67,6 +71,12 @@ type VhostList struct {
6771
Items []Vhost `json:"items"`
6872
}
6973

74+
// VhostLimits defines limits to be applied to the vhost.
75+
type VhostLimits struct {
76+
Connections *int32 `json:"connections,omitempty"`
77+
Queues *int32 `json:"queues,omitempty"`
78+
}
79+
7080
func (v *Vhost) GroupResource() schema.GroupResource {
7181
return schema.GroupResource{
7282
Group: v.GroupVersionKind().Group,

api/v1beta1/vhost_types_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,109 @@ var _ = Describe("Vhost", func() {
101101
}))
102102
})
103103

104+
Context("Vhost Limits", func() {
105+
var connections, queues int32
106+
107+
When("vhost limits are configured", func() {
108+
It("creates a vhost with limits", func() {
109+
connections = 1000
110+
queues = 500
111+
vhost := Vhost{
112+
ObjectMeta: metav1.ObjectMeta{
113+
Name: "vhost-with-limits",
114+
Namespace: namespace,
115+
},
116+
Spec: VhostSpec{
117+
Name: "vhost-with-limits",
118+
VhostLimits: &VhostLimits{
119+
Connections: &connections,
120+
Queues: &queues,
121+
},
122+
RabbitmqClusterReference: RabbitmqClusterReference{
123+
Name: "random-cluster",
124+
},
125+
},
126+
}
127+
Expect(k8sClient.Create(ctx, &vhost)).To(Succeed())
128+
fetched := &Vhost{}
129+
Expect(k8sClient.Get(ctx, types.NamespacedName{
130+
Name: vhost.Name,
131+
Namespace: vhost.Namespace,
132+
}, fetched)).To(Succeed())
133+
134+
Expect(*fetched.Spec.VhostLimits.Connections).To(Equal(connections))
135+
Expect(*fetched.Spec.VhostLimits.Queues).To(Equal(queues))
136+
Expect(fetched.Spec.Name).To(Equal("vhost-with-limits"))
137+
Expect(fetched.Spec.RabbitmqClusterReference).To(Equal(RabbitmqClusterReference{
138+
Name: "random-cluster",
139+
}))
140+
})
141+
})
142+
143+
When("No vhost limits are provided", func() {
144+
It("Does not set VhostLimits", func() {
145+
vhost := Vhost{
146+
ObjectMeta: metav1.ObjectMeta{
147+
Name: "vhost-with-no-provided-limits",
148+
Namespace: namespace,
149+
},
150+
Spec: VhostSpec{
151+
Name: "vhost-with-no-provided-limits",
152+
RabbitmqClusterReference: RabbitmqClusterReference{
153+
Name: "random-cluster",
154+
},
155+
},
156+
}
157+
Expect(k8sClient.Create(ctx, &vhost)).To(Succeed())
158+
fetched := &Vhost{}
159+
Expect(k8sClient.Get(ctx, types.NamespacedName{
160+
Name: vhost.Name,
161+
Namespace: vhost.Namespace,
162+
}, fetched)).To(Succeed())
163+
164+
Expect(fetched.Spec.VhostLimits).To(BeNil())
165+
Expect(fetched.Spec.Name).To(Equal("vhost-with-no-provided-limits"))
166+
Expect(fetched.Spec.RabbitmqClusterReference).To(Equal(RabbitmqClusterReference{
167+
Name: "random-cluster",
168+
}))
169+
})
170+
})
171+
172+
When("Only some vhost limits are provided", func() {
173+
It("Configures those limits and lifts other limits", func() {
174+
queues = 800
175+
vhost := Vhost{
176+
ObjectMeta: metav1.ObjectMeta{
177+
Name: "vhost-some-limits",
178+
Namespace: namespace,
179+
},
180+
Spec: VhostSpec{
181+
Name: "vhost-some-limits",
182+
VhostLimits: &VhostLimits{
183+
Queues: &queues,
184+
},
185+
RabbitmqClusterReference: RabbitmqClusterReference{
186+
Name: "random-cluster",
187+
},
188+
},
189+
}
190+
Expect(k8sClient.Create(ctx, &vhost)).To(Succeed())
191+
fetched := &Vhost{}
192+
Expect(k8sClient.Get(ctx, types.NamespacedName{
193+
Name: vhost.Name,
194+
Namespace: vhost.Namespace,
195+
}, fetched)).To(Succeed())
196+
197+
Expect(fetched.Spec.VhostLimits.Connections).To(BeNil())
198+
Expect(*fetched.Spec.VhostLimits.Queues).To(Equal(queues))
199+
Expect(fetched.Spec.Name).To(Equal("vhost-some-limits"))
200+
Expect(fetched.Spec.RabbitmqClusterReference).To(Equal(RabbitmqClusterReference{
201+
Name: "random-cluster",
202+
}))
203+
})
204+
})
205+
})
206+
104207
Context("Default queue types", func() {
105208
var qTypeVhost = &Vhost{
106209
ObjectMeta: metav1.ObjectMeta{

api/v1beta1/zz_generated.deepcopy.go

Lines changed: 30 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/crd/bases/rabbitmq.com_vhosts.yaml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,19 @@ spec:
5959
- delete
6060
- retain
6161
type: string
62+
limits:
63+
description: |-
64+
Limits defines limits to be applied to the vhost.
65+
Supported limits include max-connections and max-queues.
66+
See https://www.rabbitmq.com/docs/vhosts#limits
67+
properties:
68+
connections:
69+
format: int32
70+
type: integer
71+
queues:
72+
format: int32
73+
type: integer
74+
type: object
6275
name:
6376
description: Name of the vhost; see https://www.rabbitmq.com/vhosts.html.
6477
type: string

controllers/vhost_controller.go

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66

7+
rabbithole "github.com/michaelklishin/rabbit-hole/v3"
78
"github.com/rabbitmq/messaging-topology-operator/internal"
89
"github.com/rabbitmq/messaging-topology-operator/rabbitmqclient"
910
ctrl "sigs.k8s.io/controller-runtime"
@@ -20,9 +21,35 @@ type VhostReconciler struct {
2021
client.Client
2122
}
2223

23-
func (r *VhostReconciler) DeclareFunc(_ context.Context, client rabbitmqclient.Client, obj topology.TopologyResource) error {
24+
func (r *VhostReconciler) DeclareFunc(ctx context.Context, client rabbitmqclient.Client, obj topology.TopologyResource) error {
25+
logger := ctrl.LoggerFrom(ctx)
2426
vhost := obj.(*topology.Vhost)
25-
return validateResponse(client.PutVhost(vhost.Spec.Name, *internal.GenerateVhostSettings(vhost)))
27+
settings := internal.GenerateVhostSettings(vhost)
28+
logger.Info("generated vhost settings", "vhost", vhost.Spec.Name, "settings", settings)
29+
err := validateResponse(client.PutVhost(vhost.Spec.Name, *settings))
30+
if err != nil {
31+
return err
32+
}
33+
34+
newVhostLimits := internal.GenerateVhostLimits(vhost.Spec.VhostLimits)
35+
logger.Info("getting existing vhost limits", vhost, vhost.Spec.Name)
36+
existingVhostLimits, err := r.getVhostLimits(client, vhost.Spec.Name)
37+
if err != nil {
38+
return err
39+
}
40+
limitsToDelete := r.vhostLimitsToDelete(existingVhostLimits, newVhostLimits)
41+
if len(limitsToDelete) > 0 {
42+
logger.Info("Deleting outdated vhost limits", "vhost", vhost.Spec.Name, "limits", limitsToDelete)
43+
err = validateResponseForDeletion(client.DeleteVhostLimits(vhost.Spec.Name, limitsToDelete))
44+
if err != nil && !errors.Is(err, NotFound) {
45+
return err
46+
}
47+
}
48+
if len(newVhostLimits) > 0 {
49+
logger.Info("creating new vhost limits", "vhost", vhost.Spec.Name, "limits", newVhostLimits)
50+
return validateResponse(client.PutVhostLimits(vhost.Spec.Name, newVhostLimits))
51+
}
52+
return nil
2653
}
2754

2855
// DeleteFunc deletes vhost from server
@@ -41,3 +68,28 @@ func (r *VhostReconciler) DeleteFunc(ctx context.Context, client rabbitmqclient.
4168
}
4269
return nil
4370
}
71+
72+
func (r *VhostReconciler) vhostLimitsToDelete(existingVhostLimits, newVhostLimits rabbithole.VhostLimitsValues) (limitsToDelete rabbithole.VhostLimits) {
73+
vhostLimitKeys := []string{"max-connections", "max-queues"}
74+
for _, limit := range vhostLimitKeys {
75+
_, oldExists := existingVhostLimits[limit]
76+
_, newExists := newVhostLimits[limit]
77+
if oldExists && !newExists {
78+
limitsToDelete = append(limitsToDelete, limit)
79+
}
80+
}
81+
return limitsToDelete
82+
}
83+
84+
func (r *VhostReconciler) getVhostLimits(client rabbitmqclient.Client, vhost string) (rabbithole.VhostLimitsValues, error) {
85+
vhostLimitsInfo, err := client.GetVhostLimits(vhost)
86+
if errors.Is(err, error(rabbithole404)) {
87+
return rabbithole.VhostLimitsValues{}, nil
88+
} else if err != nil {
89+
return rabbithole.VhostLimitsValues{}, err
90+
}
91+
if len(vhostLimitsInfo) == 0 {
92+
return rabbithole.VhostLimitsValues{}, nil
93+
}
94+
return vhostLimitsInfo[0].Value, nil
95+
}

0 commit comments

Comments
 (0)