Skip to content

Commit 6780eb0

Browse files
committed
Remove outdated vhost limits.
1 parent 918bfb3 commit 6780eb0

File tree

4 files changed

+261
-70
lines changed

4 files changed

+261
-70
lines changed

controllers/vhost_controller.go

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66

7+
rabbithole "github.com/michaelklishin/rabbit-hole/v3"
78
"github.com/rabbitmq/messaging-topology-operator/internal"
89
"github.com/rabbitmq/messaging-topology-operator/rabbitmqclient"
910
ctrl "sigs.k8s.io/controller-runtime"
@@ -30,12 +31,25 @@ func (r *VhostReconciler) DeclareFunc(ctx context.Context, client rabbitmqclient
3031
return err
3132
}
3233

33-
vhostLimits := internal.GenerateVhostLimits(vhost.Spec.VhostLimits)
34-
logger.Info("generated vhost limits", "vhost", vhost.Spec.Name, "limits", vhostLimits)
35-
if len(vhostLimits) > 0 {
36-
err = validateResponse(client.PutVhostLimits(vhost.Spec.Name, vhostLimits))
34+
newVhostLimits := internal.GenerateVhostLimits(vhost.Spec.VhostLimits)
35+
logger.Info("getting existing vhost limits", vhost, vhost.Spec.Name)
36+
existingVhostLimits, err := r.getVhostLimits(client, vhost.Spec.Name)
37+
if err != nil {
38+
return err
39+
}
40+
limitsToDelete := r.vhostLimitsToDelete(existingVhostLimits, newVhostLimits)
41+
if len(limitsToDelete) > 0 {
42+
logger.Info("Deleting outdated vhost limits", "vhost", vhost.Spec.Name, "limits", limitsToDelete)
43+
err = validateResponseForDeletion(client.DeleteVhostLimits(vhost.Spec.Name, limitsToDelete))
44+
if err != nil && !errors.Is(err, NotFound) {
45+
return err
46+
}
47+
}
48+
if len(newVhostLimits) > 0 {
49+
logger.Info("creating new vhost limits", "vhost", vhost.Spec.Name, "limits", newVhostLimits)
50+
return validateResponse(client.PutVhostLimits(vhost.Spec.Name, newVhostLimits))
3751
}
38-
return err
52+
return nil
3953
}
4054

4155
// DeleteFunc deletes vhost from server
@@ -54,3 +68,28 @@ func (r *VhostReconciler) DeleteFunc(ctx context.Context, client rabbitmqclient.
5468
}
5569
return nil
5670
}
71+
72+
func (r *VhostReconciler) vhostLimitsToDelete(existingVhostLimits, newVhostLimits rabbithole.VhostLimitsValues) (limitsToDelete rabbithole.VhostLimits) {
73+
vhostLimitKeys := []string{"max-connections", "max-queues"}
74+
for _, limit := range vhostLimitKeys {
75+
_, oldExists := existingVhostLimits[limit]
76+
_, newExists := newVhostLimits[limit]
77+
if oldExists && !newExists {
78+
limitsToDelete = append(limitsToDelete, limit)
79+
}
80+
}
81+
return limitsToDelete
82+
}
83+
84+
func (r *VhostReconciler) getVhostLimits(client rabbitmqclient.Client, vhost string) (rabbithole.VhostLimitsValues, error) {
85+
vhostLimitsInfo, err := client.GetVhostLimits(vhost)
86+
if errors.Is(err, error(rabbithole404)) {
87+
return rabbithole.VhostLimitsValues{}, nil
88+
} else if err != nil {
89+
return rabbithole.VhostLimitsValues{}, err
90+
}
91+
if len(vhostLimitsInfo) == 0 {
92+
return rabbithole.VhostLimitsValues{}, nil
93+
}
94+
return vhostLimitsInfo[0].Value, nil
95+
}

controllers/vhost_controller_test.go

Lines changed: 137 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
1515
"time"
1616

17+
rabbithole "github.com/michaelklishin/rabbit-hole/v3"
1718
. "github.com/onsi/ginkgo/v2"
1819
. "github.com/onsi/gomega"
1920
. "github.com/onsi/gomega/gstruct"
@@ -88,6 +89,7 @@ var _ = Describe("vhost-controller", func() {
8889
Namespace: vhostNamespace,
8990
},
9091
Spec: topology.VhostSpec{
92+
Name: vhostName,
9193
RabbitmqClusterReference: topology.RabbitmqClusterReference{
9294
Name: "example-rabbit",
9395
},
@@ -159,6 +161,141 @@ var _ = Describe("vhost-controller", func() {
159161
})))
160162
})
161163
})
164+
165+
Context("vhost limits", func() {
166+
var connections, queues int32
167+
168+
When("vhost limits are provided", func() {
169+
BeforeEach(func() {
170+
connections = 708
171+
queues = 509
172+
vhostName = "vhost-with-limits"
173+
vhostLimits = &topology.VhostLimits{
174+
Connections: &connections,
175+
Queues: &queues,
176+
}
177+
fakeRabbitMQClient.PutVhostReturns(&http.Response{
178+
Status: "201 Created",
179+
StatusCode: http.StatusCreated,
180+
}, nil)
181+
fakeRabbitMQClient.PutVhostLimitsReturns(&http.Response{
182+
Status: "200 OK",
183+
StatusCode: http.StatusOK,
184+
}, nil)
185+
fakeRabbitMQClient.GetVhostLimitsReturns(nil, rabbithole.ErrorResponse{
186+
StatusCode: 404,
187+
Message: "Object Not Found",
188+
Reason: "Not Found",
189+
})
190+
})
191+
192+
It("puts the vhost limits", func() {
193+
Expect(k8sClient.Create(ctx, &vhost)).To(Succeed())
194+
Eventually(func() []topology.Condition {
195+
_ = k8sClient.Get(
196+
ctx,
197+
types.NamespacedName{Name: vhost.Name, Namespace: vhost.Namespace},
198+
&vhost,
199+
)
200+
201+
return vhost.Status.Conditions
202+
}).
203+
Within(statusEventsUpdateTimeout).
204+
WithPolling(time.Second).
205+
Should(ContainElement(MatchFields(IgnoreExtras, Fields{
206+
"Type": Equal(topology.ConditionType("Ready")),
207+
"Reason": Equal("SuccessfulCreateOrUpdate"),
208+
"Status": Equal(corev1.ConditionTrue),
209+
})))
210+
211+
Expect(fakeRabbitMQClient.PutVhostLimitsCallCount()).To(BeNumerically(">", 0))
212+
_, vhostLimitsValues := fakeRabbitMQClient.PutVhostLimitsArgsForCall(0)
213+
Expect(len(vhostLimitsValues)).To(Equal(2))
214+
Expect(vhostLimitsValues).To(HaveKeyWithValue("max-connections", int(connections)))
215+
Expect(vhostLimitsValues).To(HaveKeyWithValue("max-queues", int(queues)))
216+
})
217+
})
218+
219+
When("vhost limits are not provided", func() {
220+
BeforeEach(func() {
221+
vhostName = "vhost-without-limits"
222+
fakeRabbitMQClient.GetVhostLimitsReturns(nil, rabbithole.ErrorResponse{
223+
StatusCode: 404,
224+
Message: "Object Not Found",
225+
Reason: "Not Found",
226+
})
227+
})
228+
229+
It("does not set vhost limits", func() {
230+
Expect(k8sClient.Create(ctx, &vhost)).To(Succeed())
231+
Expect(fakeRabbitMQClient.PutVhostLimitsCallCount()).To(Equal(0))
232+
})
233+
})
234+
235+
When("vhost limits are updated", func() {
236+
BeforeEach(func() {
237+
vhostName = "vhost-updated-limits"
238+
queues = 613
239+
vhostLimits = &topology.VhostLimits{
240+
Connections: nil,
241+
Queues: &queues,
242+
}
243+
244+
var vhostLimitsInfo []rabbithole.VhostLimitsInfo
245+
vhostLimitsInfo = append(vhostLimitsInfo, rabbithole.VhostLimitsInfo{
246+
Vhost: vhostName,
247+
Value: rabbithole.VhostLimitsValues{"max-queues": 10, "max-connections": 300},
248+
})
249+
250+
fakeRabbitMQClient.PutVhostReturns(&http.Response{
251+
Status: "201 Created",
252+
StatusCode: http.StatusCreated,
253+
}, nil)
254+
fakeRabbitMQClient.PutVhostLimitsReturns(&http.Response{
255+
Status: "201 Created",
256+
StatusCode: http.StatusCreated,
257+
}, nil)
258+
fakeRabbitMQClient.GetVhostLimitsReturns(vhostLimitsInfo, nil)
259+
fakeRabbitMQClient.DeleteVhostLimitsReturns(&http.Response{
260+
Status: "204 No Content",
261+
StatusCode: http.StatusNoContent,
262+
}, nil)
263+
})
264+
265+
It("updates the provided limits and removes unspecified limits", func() {
266+
Expect(k8sClient.Create(ctx, &vhost)).To(Succeed())
267+
Eventually(func() []topology.Condition {
268+
_ = k8sClient.Get(
269+
ctx,
270+
types.NamespacedName{Name: vhost.Name, Namespace: vhost.Namespace},
271+
&vhost,
272+
)
273+
274+
return vhost.Status.Conditions
275+
}).
276+
Within(statusEventsUpdateTimeout).
277+
WithPolling(time.Second).
278+
Should(ContainElement(MatchFields(IgnoreExtras, Fields{
279+
"Type": Equal(topology.ConditionType("Ready")),
280+
"Reason": Equal("SuccessfulCreateOrUpdate"),
281+
"Status": Equal(corev1.ConditionTrue),
282+
})))
283+
284+
By("deleting the outdated limits")
285+
Expect(fakeRabbitMQClient.DeleteVhostLimitsCallCount()).To(BeNumerically(">", 0))
286+
vhostname, limits := fakeRabbitMQClient.DeleteVhostLimitsArgsForCall(0)
287+
Expect(vhostname).To(Equal(vhostName))
288+
Expect(len(limits)).To(Equal(1))
289+
Expect(limits).To(ContainElement("max-connections"))
290+
291+
By("updating the new limits")
292+
Expect(fakeRabbitMQClient.PutVhostLimitsCallCount()).To(BeNumerically(">", 0))
293+
_, vhostLimitsValues := fakeRabbitMQClient.PutVhostLimitsArgsForCall(0)
294+
Expect(len(vhostLimitsValues)).To(Equal(1))
295+
Expect(vhostLimitsValues).To(HaveKeyWithValue("max-queues", int(queues)))
296+
})
297+
})
298+
})
162299
})
163300

164301
Context("deletion", func() {
@@ -254,69 +391,4 @@ var _ = Describe("vhost-controller", func() {
254391
Expect(fakeRabbitMQClient.DeleteVhostCallCount()).To(Equal(0))
255392
})
256393
})
257-
258-
Context("vhost limits", func() {
259-
When("vhost limits are provided", func() {
260-
var connections, queues int32
261-
262-
BeforeEach(func() {
263-
connections = 708
264-
queues = 509
265-
vhostName = "vhost-with-limits"
266-
vhostLimits = &topology.VhostLimits{
267-
Connections: &connections,
268-
Queues: &queues,
269-
}
270-
fakeRabbitMQClient.PutVhostReturns(&http.Response{
271-
Status: "201 Created",
272-
StatusCode: http.StatusCreated,
273-
}, nil)
274-
fakeRabbitMQClient.PutVhostLimitsReturns(&http.Response{
275-
Status: "200 OK",
276-
StatusCode: http.StatusOK,
277-
}, nil)
278-
})
279-
280-
It("puts the vhost limits", func() {
281-
Expect(k8sClient.Create(ctx, &vhost)).To(Succeed())
282-
Eventually(func() []topology.Condition {
283-
_ = k8sClient.Get(
284-
ctx,
285-
types.NamespacedName{Name: vhost.Name, Namespace: vhost.Namespace},
286-
&vhost,
287-
)
288-
289-
return vhost.Status.Conditions
290-
}).
291-
Within(statusEventsUpdateTimeout).
292-
WithPolling(time.Second).
293-
Should(ContainElement(MatchFields(IgnoreExtras, Fields{
294-
"Type": Equal(topology.ConditionType("Ready")),
295-
"Reason": Equal("SuccessfulCreateOrUpdate"),
296-
"Status": Equal(corev1.ConditionTrue),
297-
})))
298-
299-
Expect(fakeRabbitMQClient.PutVhostLimitsCallCount()).To(BeNumerically(">", 0))
300-
_, vhostLimitsValues := fakeRabbitMQClient.PutVhostLimitsArgsForCall(0)
301-
Expect(len(vhostLimitsValues)).To(Equal(2))
302-
connectionLimit, ok := vhostLimitsValues["max-connections"]
303-
Expect(ok).To(BeTrue())
304-
Expect(connectionLimit).To(Equal(int(connections)))
305-
queueLimit, ok := vhostLimitsValues["max-queues"]
306-
Expect(ok).To(BeTrue())
307-
Expect(queueLimit).To(Equal(int(queues)))
308-
})
309-
})
310-
311-
When("vhost limits are not provided", func() {
312-
BeforeEach(func() {
313-
vhostName = "vhost-without-limits"
314-
})
315-
316-
It("does not set vhost limits", func() {
317-
Expect(k8sClient.Create(ctx, &vhost)).To(Succeed())
318-
Expect(fakeRabbitMQClient.PutVhostLimitsCallCount()).To(Equal(0))
319-
})
320-
})
321-
})
322394
})

rabbitmqclient/rabbitmq_client_factory.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type Client interface {
4141
PutVhost(string, rabbithole.VhostSettings) (*http.Response, error)
4242
DeleteVhost(string) (*http.Response, error)
4343
PutVhostLimits(string, rabbithole.VhostLimitsValues) (*http.Response, error)
44+
GetVhostLimits(string) ([]rabbithole.VhostLimitsInfo, error)
4445
DeleteVhostLimits(string, rabbithole.VhostLimits) (*http.Response, error)
4546
PutGlobalParameter(name string, value interface{}) (*http.Response, error)
4647
DeleteGlobalParameter(name string) (*http.Response, error)

0 commit comments

Comments
 (0)