Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions api/v1beta1/user_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ type UserSpec struct {
//
// Note that this import only occurs at creation time, and is ignored once a password has been set on a User.
ImportCredentialsSecret *corev1.LocalObjectReference `json:"importCredentialsSecret,omitempty"`
// Limits to apply to a user to restrict the number of connections and channels
// the user can create. These limits can be used as guard rails in environments
// where applications cannot be trusted and monitored in detail, for example,
// when RabbitMQ clusters are offered as a service. See https://www.rabbitmq.com/docs/user-limits.
UserLimits *UserLimits `json:"limits,omitempty"`
}

// UserStatus defines the observed state of User.
Expand All @@ -56,6 +61,17 @@ type UserStatus struct {
// +kubebuilder:validation:Enum=management;policymaker;monitoring;administrator
type UserTag string

// Limits to apply to a user to restrict the number of connections and channels
// the user can create. These limits can be used as guard rails in environments
// where applications cannot be trusted and monitored in detail, for example,
// when RabbitMQ clusters are offered as a service. See https://www.rabbitmq.com/docs/user-limits.
type UserLimits struct {
// Limits how many connections the user can open.
Connections *int32 `json:"connections,omitempty"`
// Limits how many AMQP 0.9.1 channels the user can open.
Channels *int32 `json:"channels,omitempty"`
}

// +genclient
// +kubebuilder:object:root=true
// +kubebuilder:resource:categories=rabbitmq
Expand Down
47 changes: 47 additions & 0 deletions api/v1beta1/user_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,51 @@ var _ = Describe("user spec", func() {
})
})

When("creating a user with limits", func() {
var user User
var username string
var userLimits UserLimits
var connections, channels int32

JustBeforeEach(func() {
user = User{
ObjectMeta: metav1.ObjectMeta{
Name: username,
Namespace: namespace,
},
Spec: UserSpec{
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "some-cluster",
},
UserLimits: &userLimits,
},
}
})

When("creating a user with valid limits", func() {
BeforeEach(func() {
username = "limits-user"
connections = 5
channels = 10
userLimits = UserLimits{
Connections: &connections,
Channels: &channels,
}
})
It("successfully creates the user", func() {
Expect(k8sClient.Create(ctx, &user)).To(Succeed())
fetchedUser := &User{}
Expect(k8sClient.Get(ctx, types.NamespacedName{
Name: user.Name,
Namespace: user.Namespace,
}, fetchedUser)).To(Succeed())
Expect(fetchedUser.Spec.RabbitmqClusterReference).To(Equal(RabbitmqClusterReference{
Name: "some-cluster",
}))
Expect(fetchedUser.Spec.UserLimits).NotTo(BeNil())
Expect(*fetchedUser.Spec.UserLimits.Connections).To(Equal(connections))
Expect(*fetchedUser.Spec.UserLimits.Channels).To(Equal(channels))
})
})
})
})
10 changes: 10 additions & 0 deletions api/v1beta1/vhost_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ type VhostSpec struct {
// +kubebuilder:validation:Enum=delete;retain
// +kubebuilder:default:=delete
DeletionPolicy string `json:"deletionPolicy,omitempty"`
// Limits defines limits to be applied to the vhost.
// Supported limits include max-connections and max-queues.
// See https://www.rabbitmq.com/docs/vhosts#limits
VhostLimits *VhostLimits `json:"limits,omitempty"`
}

// VhostStatus defines the observed state of Vhost
Expand Down Expand Up @@ -67,6 +71,12 @@ type VhostList struct {
Items []Vhost `json:"items"`
}

// VhostLimits defines limits to be applied to the vhost.
type VhostLimits struct {
Connections *int32 `json:"connections,omitempty"`
Queues *int32 `json:"queues,omitempty"`
}

func (v *Vhost) GroupResource() schema.GroupResource {
return schema.GroupResource{
Group: v.GroupVersionKind().Group,
Expand Down
103 changes: 103 additions & 0 deletions api/v1beta1/vhost_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,109 @@ var _ = Describe("Vhost", func() {
}))
})

Context("Vhost Limits", func() {
var connections, queues int32

When("vhost limits are configured", func() {
It("creates a vhost with limits", func() {
connections = 1000
queues = 500
vhost := Vhost{
ObjectMeta: metav1.ObjectMeta{
Name: "vhost-with-limits",
Namespace: namespace,
},
Spec: VhostSpec{
Name: "vhost-with-limits",
VhostLimits: &VhostLimits{
Connections: &connections,
Queues: &queues,
},
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "random-cluster",
},
},
}
Expect(k8sClient.Create(ctx, &vhost)).To(Succeed())
fetched := &Vhost{}
Expect(k8sClient.Get(ctx, types.NamespacedName{
Name: vhost.Name,
Namespace: vhost.Namespace,
}, fetched)).To(Succeed())

Expect(*fetched.Spec.VhostLimits.Connections).To(Equal(connections))
Expect(*fetched.Spec.VhostLimits.Queues).To(Equal(queues))
Expect(fetched.Spec.Name).To(Equal("vhost-with-limits"))
Expect(fetched.Spec.RabbitmqClusterReference).To(Equal(RabbitmqClusterReference{
Name: "random-cluster",
}))
})
})

When("No vhost limits are provided", func() {
It("Does not set VhostLimits", func() {
vhost := Vhost{
ObjectMeta: metav1.ObjectMeta{
Name: "vhost-with-no-provided-limits",
Namespace: namespace,
},
Spec: VhostSpec{
Name: "vhost-with-no-provided-limits",
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "random-cluster",
},
},
}
Expect(k8sClient.Create(ctx, &vhost)).To(Succeed())
fetched := &Vhost{}
Expect(k8sClient.Get(ctx, types.NamespacedName{
Name: vhost.Name,
Namespace: vhost.Namespace,
}, fetched)).To(Succeed())

Expect(fetched.Spec.VhostLimits).To(BeNil())
Expect(fetched.Spec.Name).To(Equal("vhost-with-no-provided-limits"))
Expect(fetched.Spec.RabbitmqClusterReference).To(Equal(RabbitmqClusterReference{
Name: "random-cluster",
}))
})
})

When("Only some vhost limits are provided", func() {
It("Configures those limits and lifts other limits", func() {
queues = 800
vhost := Vhost{
ObjectMeta: metav1.ObjectMeta{
Name: "vhost-some-limits",
Namespace: namespace,
},
Spec: VhostSpec{
Name: "vhost-some-limits",
VhostLimits: &VhostLimits{
Queues: &queues,
},
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "random-cluster",
},
},
}
Expect(k8sClient.Create(ctx, &vhost)).To(Succeed())
fetched := &Vhost{}
Expect(k8sClient.Get(ctx, types.NamespacedName{
Name: vhost.Name,
Namespace: vhost.Namespace,
}, fetched)).To(Succeed())

Expect(fetched.Spec.VhostLimits.Connections).To(BeNil())
Expect(*fetched.Spec.VhostLimits.Queues).To(Equal(queues))
Expect(fetched.Spec.Name).To(Equal("vhost-some-limits"))
Expect(fetched.Spec.RabbitmqClusterReference).To(Equal(RabbitmqClusterReference{
Name: "random-cluster",
}))
})
})
})

Context("Default queue types", func() {
var qTypeVhost = &Vhost{
ObjectMeta: metav1.ObjectMeta{
Expand Down
60 changes: 60 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions config/crd/bases/rabbitmq.com_users.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,23 @@ spec:
type: string
type: object
x-kubernetes-map-type: atomic
limits:
description: |-
Limits to apply to a user to restrict the number of connections and channels
the user can create. These limits can be used as guard rails in environments
where applications cannot be trusted and monitored in detail, for example,
when RabbitMQ clusters are offered as a service. See https://www.rabbitmq.com/docs/user-limits.
properties:
channels:
description: Limits how many AMQP 0.9.1 channels the user can
open.
format: int32
type: integer
connections:
description: Limits how many connections the user can open.
format: int32
type: integer
type: object
rabbitmqClusterReference:
description: |-
Reference to the RabbitmqCluster that the user will be created for. This cluster must
Expand Down
13 changes: 13 additions & 0 deletions config/crd/bases/rabbitmq.com_vhosts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,19 @@ spec:
- delete
- retain
type: string
limits:
description: |-
Limits defines limits to be applied to the vhost.
Supported limits include max-connections and max-queues.
See https://www.rabbitmq.com/docs/vhosts#limits
properties:
connections:
format: int32
type: integer
queues:
format: int32
type: integer
type: object
name:
description: Name of the vhost; see https://www.rabbitmq.com/vhosts.html.
type: string
Expand Down
4 changes: 4 additions & 0 deletions controllers/binding_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ var _ = Describe("bindingController", func() {
})

When("creating a binding", func() {
AfterEach(func() {
Expect(k8sClient.Delete(ctx, &binding)).To(Succeed())
})

When("the RabbitMQ Client returns a HTTP error response", func() {
BeforeEach(func() {
bindingName = "test-binding-http-error"
Expand Down
10 changes: 10 additions & 0 deletions controllers/exchange_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ var _ = Describe("exchange-controller", func() {
})

Context("creation", func() {
AfterEach(func() {
Expect(k8sClient.Delete(ctx, &exchange)).To(Succeed())
})

When("the RabbitMQ Client returns a HTTP error response", func() {
BeforeEach(func() {
exchangeName = "test-http-error"
Expand Down Expand Up @@ -150,6 +154,7 @@ var _ = Describe("exchange-controller", func() {
})
})
})

Context("LastTransitionTime", func() {
BeforeEach(func() {
exchangeName = "test-last-transition-time"
Expand All @@ -158,6 +163,11 @@ var _ = Describe("exchange-controller", func() {
StatusCode: http.StatusCreated,
}, nil)
})

AfterEach(func() {
Expect(k8sClient.Delete(ctx, &exchange)).To(Succeed())
})

It("changes only if status changes", func() {
By("setting LastTransitionTime when transitioning to status Ready=true")
Expect(k8sClient.Create(ctx, &exchange)).To(Succeed())
Expand Down
Loading
Loading