Skip to content

Commit 094c920

Browse files
authored
Expose MQTT and STOMP endpoints (#292)
* Expose MQTT and STOMP ports when these plugins are enabled Resolves #271
1 parent 3e52b74 commit 094c920

File tree

14 files changed

+294
-134
lines changed

14 files changed

+294
-134
lines changed

Makefile

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ list: ## list Makefile targets
1414
unit-tests: generate fmt vet manifests ## Run unit tests
1515
ginkgo -r api/ internal/
1616

17-
1817
integration-tests: generate fmt vet manifests ## Run integration tests
1918
ginkgo -r controllers/
2019

api/v1beta1/rabbitmqcluster_types.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,10 @@ type TLSSpec struct {
233233
SecretName string `json:"secretName,omitempty"`
234234
// Name of a Secret in the same Namespace as the RabbitmqCluster, containing the Certificate Authority's public certificate for TLS.
235235
// This can be the same as SecretName.
236+
// Used for mTLS.
236237
CaSecretName string `json:"caSecretName,omitempty"`
237238
// The Secret defined in CaSecretName must store the Certificate Authority's public certificate under the key specified in CaCertName.
239+
// Used for mTLS.
238240
CaCertName string `json:"caCertName,omitempty"`
239241
}
240242

@@ -314,6 +316,15 @@ func (cluster *RabbitmqCluster) SingleTLSSecret() bool {
314316
return cluster.MutualTLSEnabled() && cluster.Spec.TLS.CaSecretName == cluster.Spec.TLS.SecretName
315317
}
316318

319+
func (cluster *RabbitmqCluster) AdditionalPluginEnabled(plugin Plugin) bool {
320+
for _, p := range cluster.Spec.Rabbitmq.AdditionalPlugins {
321+
if p == plugin {
322+
return true
323+
}
324+
}
325+
return false
326+
}
327+
317328
func (rmqStatus *RabbitmqClusterStatus) SetConditions(resources []runtime.Object) {
318329
var oldAllPodsReadyCondition *status.RabbitmqClusterCondition
319330
var oldClusterAvailableCondition *status.RabbitmqClusterCondition

config/crd/bases/rabbitmq.com_rabbitmqclusters.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8254,12 +8254,12 @@ spec:
82548254
caCertName:
82558255
description: The Secret defined in CaSecretName must store the
82568256
Certificate Authority's public certificate under the key specified
8257-
in CaCertName.
8257+
in CaCertName. Used for mTLS.
82588258
type: string
82598259
caSecretName:
82608260
description: Name of a Secret in the same Namespace as the RabbitmqCluster,
82618261
containing the Certificate Authority's public certificate for
8262-
TLS. This can be the same as SecretName.
8262+
TLS. This can be the same as SecretName. Used for mTLS.
82638263
type: string
82648264
secretName:
82658265
description: Name of a Secret in the same Namespace as the RabbitmqCluster,

controllers/rabbitmqcluster_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ func (r *RabbitmqClusterReconciler) allReplicasReady(ctx context.Context, rmq *r
355355
// enablePlugins - helper function to set the list of enabled plugins in a given RabbitmqCluster pods
356356
// `rabbitmq-plugins set` disables plugins that are not in the provided list
357357
func (r *RabbitmqClusterReconciler) enablePlugins(rmq *rabbitmqv1beta1.RabbitmqCluster) error {
358-
plugins := resource.NewRabbitMQPlugins(rmq.Spec.Rabbitmq.AdditionalPlugins)
358+
plugins := resource.NewRabbitmqPlugins(rmq.Spec.Rabbitmq.AdditionalPlugins)
359359
for i := int32(0); i < *rmq.Spec.Replicas; i++ {
360360
podName := fmt.Sprintf("%s-%d", rmq.ChildResourceName("server"), i)
361361
rabbitCommand := fmt.Sprintf("rabbitmq-plugins set %s", plugins.AsString(" "))

go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,16 @@ require (
88
github.com/Azure/go-autorest/autorest/adal v0.8.0 // indirect
99
github.com/cespare/xxhash/v2 v2.1.1 // indirect
1010
github.com/cloudflare/cfssl v1.4.1
11+
github.com/eclipse/paho.mqtt.golang v1.2.0
1112
github.com/go-logr/logr v0.1.0
1213
github.com/go-logr/zapr v0.1.1 // indirect
14+
github.com/go-stomp/stomp v2.0.6+incompatible
1315
github.com/golang/groupcache v0.0.0-20191027212112-611e8accdfc9 // indirect
1416
github.com/gophercloud/gophercloud v0.5.0 // indirect
1517
github.com/onsi/ginkgo v1.14.0
1618
github.com/onsi/gomega v1.10.1
1719
github.com/prometheus/client_golang v1.2.1 // indirect
20+
github.com/smartystreets/goconvey v1.6.4 // indirect
1821
github.com/streadway/amqp v0.0.0-20200108173154-1c71cc93ed71
1922
go.uber.org/multierr v1.2.0 // indirect
2023
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7
@@ -26,5 +29,4 @@ require (
2629
k8s.io/apimachinery v0.18.6
2730
k8s.io/client-go v0.18.6
2831
sigs.k8s.io/controller-runtime v0.6.2
29-
sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e // indirect
3032
)

go.sum

Lines changed: 14 additions & 67 deletions
Large diffs are not rendered by default.

internal/resource/client_service.go

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (builder *ClientServiceBuilder) Update(object runtime.Object) error {
5555
service.Spec.Type = corev1.ServiceType(builder.Instance.Spec.Service.Type)
5656
service.Spec.Selector = metadata.LabelSelector(builder.Instance.Name)
5757

58-
service.Spec.Ports = updatePorts(service.Spec.Ports, builder.Instance.TLSEnabled())
58+
service.Spec.Ports = builder.updatePorts(service.Spec.Ports)
5959

6060
if builder.Instance.Spec.Service.Type == "ClusterIP" || builder.Instance.Spec.Service.Type == "" {
6161
for i := range service.Spec.Ports {
@@ -108,20 +108,48 @@ func applySvcOverride(svc *corev1.Service, override *rabbitmqv1beta1.ClientServi
108108
return nil
109109
}
110110

111-
func updatePorts(servicePorts []corev1.ServicePort, tlsEnabled bool) []corev1.ServicePort {
111+
func (builder *ClientServiceBuilder) updatePorts(servicePorts []corev1.ServicePort) []corev1.ServicePort {
112112
servicePortsMap := map[string]corev1.ServicePort{
113-
"amqp": corev1.ServicePort{
113+
"amqp": {
114114
Protocol: corev1.ProtocolTCP,
115115
Port: 5672,
116116
Name: "amqp",
117117
},
118-
"management": corev1.ServicePort{
118+
"management": {
119119
Protocol: corev1.ProtocolTCP,
120120
Port: 15672,
121121
Name: "management",
122122
},
123123
}
124-
if tlsEnabled {
124+
if builder.Instance.AdditionalPluginEnabled("rabbitmq_mqtt") {
125+
servicePortsMap["mqtt"] = corev1.ServicePort{
126+
Protocol: corev1.ProtocolTCP,
127+
Port: 1883,
128+
Name: "mqtt",
129+
}
130+
}
131+
if builder.Instance.AdditionalPluginEnabled("rabbitmq_web_mqtt") {
132+
servicePortsMap["web-mqtt"] = corev1.ServicePort{
133+
Protocol: corev1.ProtocolTCP,
134+
Port: 15675,
135+
Name: "web-mqtt",
136+
}
137+
}
138+
if builder.Instance.AdditionalPluginEnabled("rabbitmq_stomp") {
139+
servicePortsMap["stomp"] = corev1.ServicePort{
140+
Protocol: corev1.ProtocolTCP,
141+
Port: 61613,
142+
Name: "stomp",
143+
}
144+
}
145+
if builder.Instance.AdditionalPluginEnabled("rabbitmq_web_stomp") {
146+
servicePortsMap["web-stomp"] = corev1.ServicePort{
147+
Protocol: corev1.ProtocolTCP,
148+
Port: 15674,
149+
Name: "web-stomp",
150+
}
151+
}
152+
if builder.Instance.TLSEnabled() {
125153
servicePortsMap["amqps"] = corev1.ServicePort{
126154
Protocol: corev1.ProtocolTCP,
127155
Port: 5671,

internal/resource/client_service_test.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ package resource_test
1111

1212
import (
1313
. "github.com/onsi/ginkgo"
14+
. "github.com/onsi/ginkgo/extensions/table"
1415
. "github.com/onsi/gomega"
1516
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
1617
"github.com/rabbitmq/cluster-operator/internal/resource"
@@ -28,7 +29,7 @@ var _ = Context("ClientServices", func() {
2829
scheme *runtime.Scheme
2930
)
3031

31-
Context("Build", func() {
32+
Describe("Build", func() {
3233
BeforeEach(func() {
3334
scheme = runtime.NewScheme()
3435
Expect(rabbitmqv1beta1.AddToScheme(scheme)).To(Succeed())
@@ -57,7 +58,7 @@ var _ = Context("ClientServices", func() {
5758
})
5859
})
5960

60-
Context("Update", func() {
61+
Describe("Update", func() {
6162
BeforeEach(func() {
6263
scheme = runtime.NewScheme()
6364
Expect(rabbitmqv1beta1.AddToScheme(scheme)).To(Succeed())
@@ -299,9 +300,27 @@ var _ = Context("ClientServices", func() {
299300
Port: 15672,
300301
Protocol: corev1.ProtocolTCP,
301302
}
302-
Expect(svc.Spec.Ports).Should(ConsistOf(amqpPort, managementPort))
303+
Expect(svc.Spec.Ports).To(ConsistOf(amqpPort, managementPort))
303304
})
304305

306+
DescribeTable("plugins exposing ports",
307+
func(plugin, servicePortName string, port int) {
308+
instance.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{rabbitmqv1beta1.Plugin(plugin)}
309+
Expect(serviceBuilder.Update(svc)).To(Succeed())
310+
311+
expectedPort := corev1.ServicePort{
312+
Name: servicePortName,
313+
Port: int32(port),
314+
Protocol: corev1.ProtocolTCP,
315+
}
316+
Expect(svc.Spec.Ports).To(ContainElement(expectedPort))
317+
},
318+
Entry("MQTT", "rabbitmq_mqtt", "mqtt", 1883),
319+
Entry("MQTT-over-WebSockets", "rabbitmq_web_mqtt", "web-mqtt", 15675),
320+
Entry("STOMP", "rabbitmq_stomp", "stomp", 61613),
321+
Entry("STOMP-over-WebSockets", "rabbitmq_web_stomp", "web-stomp", 15674),
322+
)
323+
305324
It("updates the service type from ClusterIP to NodePort", func() {
306325
svc.Spec.Type = corev1.ServiceTypeClusterIP
307326
serviceBuilder.Instance.Spec.Service.Type = "NodePort"

internal/resource/rabbitmq_plugins.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ var requiredPlugins = []string{
1818

1919
const pluginsConfig = "plugins-conf"
2020

21-
type RabbitMQPlugins struct {
21+
type RabbitmqPlugins struct {
2222
requiredPlugins []string
2323
additionalPlugins []string
2424
}
@@ -27,19 +27,19 @@ type RabbitmqPluginsConfigMapBuilder struct {
2727
Instance *rabbitmqv1beta1.RabbitmqCluster
2828
}
2929

30-
func NewRabbitMQPlugins(plugins []rabbitmqv1beta1.Plugin) RabbitMQPlugins {
30+
func NewRabbitmqPlugins(plugins []rabbitmqv1beta1.Plugin) RabbitmqPlugins {
3131
additionalPlugins := make([]string, len(plugins))
3232
for i := range additionalPlugins {
3333
additionalPlugins[i] = string(plugins[i])
3434
}
3535

36-
return RabbitMQPlugins{
36+
return RabbitmqPlugins{
3737
requiredPlugins: requiredPlugins,
3838
additionalPlugins: additionalPlugins,
3939
}
4040
}
4141

42-
func (r *RabbitMQPlugins) DesiredPlugins() []string {
42+
func (r *RabbitmqPlugins) DesiredPlugins() []string {
4343
allPlugins := append(r.requiredPlugins, r.additionalPlugins...)
4444

4545
check := make(map[string]bool)
@@ -53,7 +53,7 @@ func (r *RabbitMQPlugins) DesiredPlugins() []string {
5353
return enabledPlugins
5454
}
5555

56-
func (r *RabbitMQPlugins) AsString(sep string) string {
56+
func (r *RabbitmqPlugins) AsString(sep string) string {
5757
return strings.Join(r.DesiredPlugins(), sep)
5858
}
5959

@@ -92,6 +92,6 @@ func (builder *RabbitmqPluginsConfigMapBuilder) Build() (runtime.Object, error)
9292
}
9393

9494
func desiredPluginsAsString(additionalPlugins []rabbitmqv1beta1.Plugin) string {
95-
plugins := NewRabbitMQPlugins(additionalPlugins)
95+
plugins := NewRabbitmqPlugins(additionalPlugins)
9696
return "[" + plugins.AsString(",") + "]."
9797
}

internal/resource/rabbitmq_plugins_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,15 @@ var _ = Describe("RabbitMQPlugins", func() {
2525
Context("DesiredPlugins", func() {
2626
When("AdditionalPlugins is empty", func() {
2727
It("returns list of required plugins", func() {
28-
plugins := NewRabbitMQPlugins(nil)
28+
plugins := NewRabbitmqPlugins(nil)
2929
Expect(plugins.DesiredPlugins()).To(ConsistOf([]string{"rabbitmq_peer_discovery_k8s", "rabbitmq_prometheus", "rabbitmq_management"}))
3030
})
3131
})
3232

3333
When("AdditionalPlugins are provided", func() {
3434
It("returns a concatenated list of plugins", func() {
3535
morePlugins := []rabbitmqv1beta1.Plugin{"rabbitmq_shovel", "my_great_plugin"}
36-
plugins := NewRabbitMQPlugins(morePlugins)
36+
plugins := NewRabbitmqPlugins(morePlugins)
3737

3838
Expect(plugins.DesiredPlugins()).To(ConsistOf([]string{"rabbitmq_peer_discovery_k8s",
3939
"rabbitmq_prometheus",
@@ -47,7 +47,7 @@ var _ = Describe("RabbitMQPlugins", func() {
4747
When("AdditionalPlugins are provided with duplicates", func() {
4848
It("returns a unique list of plugins", func() {
4949
morePlugins := []rabbitmqv1beta1.Plugin{"rabbitmq_management", "rabbitmq_shovel", "my_great_plugin", "rabbitmq_shovel"}
50-
plugins := NewRabbitMQPlugins(morePlugins)
50+
plugins := NewRabbitmqPlugins(morePlugins)
5151

5252
Expect(plugins.DesiredPlugins()).To(ConsistOf([]string{"rabbitmq_peer_discovery_k8s",
5353
"rabbitmq_prometheus",

0 commit comments

Comments
 (0)