Skip to content

Commit f786a0f

Browse files
Extend FederationSpec with QueueType and ResourceCleanupMode
1 parent baa57a2 commit f786a0f

39 files changed

+128
-53
lines changed

api/v1beta1/federation_types.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,19 @@ type FederationSpec struct {
4040
// +kubebuilder:validation:Enum=delete;retain
4141
// +kubebuilder:default:=delete
4242
DeletionPolicy string `json:"deletionPolicy,omitempty"`
43+
// The queue type of the internal upstream queue used by exchange federation.
44+
// Defaults to classic (a single replica queue type). Set to quorum to use a replicated queue type.
45+
// Changing the queue type will delete and recreate the upstream queue by default.
46+
// This may lead to messages getting lost or not routed anywhere during the re-declaration.
47+
// To avoid that, set resource-cleanup-mode key to never.
48+
// This requires manually deleting the old upstream queue so that it can be recreated with the new type.
49+
// +kubebuilder:validation:Enum=classic;quorum
50+
QueueType string `json:"queueType,omitempty"`
51+
// Whether to delete the internal upstream queue when federation links stop.
52+
// By default, the internal upstream queue is deleted immediately when a federation link stops.
53+
// Set to never to keep the upstream queue around and collect messages even when changing federation configuration.
54+
// +kubebuilder:validation:Enum=default;never
55+
ResourceCleanupMode string `json:"resourceCleanupMode,omitempty"`
4356
}
4457

4558
// FederationStatus defines the observed state of Federation

api/v1beta1/federation_types_test.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,16 @@ var _ = Describe("Federation spec", func() {
6464
UriSecret: &corev1.LocalObjectReference{
6565
Name: "a-secret",
6666
},
67-
Expires: 1000,
68-
MessageTTL: 1000,
69-
MaxHops: 100,
70-
PrefetchCount: 50,
71-
ReconnectDelay: 10,
72-
TrustUserId: true,
73-
Exchange: "an-exchange",
74-
AckMode: "no-ack",
67+
Expires: 1000,
68+
MessageTTL: 1000,
69+
MaxHops: 100,
70+
PrefetchCount: 50,
71+
ReconnectDelay: 10,
72+
TrustUserId: true,
73+
Exchange: "an-exchange",
74+
AckMode: "no-ack",
75+
QueueType: "quorum",
76+
ResourceCleanupMode: "never",
7577
RabbitmqClusterReference: RabbitmqClusterReference{
7678
Name: "some-cluster",
7779
},
@@ -99,6 +101,8 @@ var _ = Describe("Federation spec", func() {
99101
Expect(fetched.Spec.MaxHops).To(Equal(100))
100102
Expect(fetched.Spec.PrefetchCount).To(Equal(50))
101103
Expect(fetched.Spec.ReconnectDelay).To(Equal(10))
104+
Expect(fetched.Spec.QueueType).To(Equal("quorum"))
105+
Expect(fetched.Spec.ResourceCleanupMode).To(Equal("never"))
102106
})
103107

104108
When("creating a federation with an invalid 'AckMode' value", func() {

config/crd/bases/rabbitmq.com_federations.yaml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,18 @@ spec:
7373
type: integer
7474
queue:
7575
type: string
76+
queueType:
77+
description: |-
78+
The queue type of the internal upstream queue used by exchange federation.
79+
Defaults to classic (a single replica queue type). Set to quorum to use a replicated queue type.
80+
Changing the queue type will delete and recreate the upstream queue by default.
81+
This may lead to messages getting lost or not routed anywhere during the re-declaration.
82+
To avoid that, set resource-cleanup-mode key to never.
83+
This requires manually deleting the old upstream queue so that it can be recreated with the new type.
84+
enum:
85+
- classic
86+
- quorum
87+
type: string
7688
rabbitmqClusterReference:
7789
description: |-
7890
Reference to the RabbitmqCluster that this federation upstream will be created in.
@@ -108,6 +120,15 @@ spec:
108120
type: object
109121
reconnectDelay:
110122
type: integer
123+
resourceCleanupMode:
124+
description: |-
125+
Whether to delete the internal upstream queue when federation links stop.
126+
By default, the internal upstream queue is deleted immediately when a federation link stops.
127+
Set to never to keep the upstream queue around and collect messages even when changing federation configuration.
128+
enum:
129+
- default
130+
- never
131+
type: string
111132
trustUserId:
112133
type: boolean
113134
uriSecret:

controllers/binding_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
"reflect"
1818

1919
"github.com/go-logr/logr"
20-
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
20+
rabbithole "github.com/michaelklishin/rabbit-hole/v3"
2121
topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1"
2222
"github.com/rabbitmq/messaging-topology-operator/internal"
2323
"github.com/rabbitmq/messaging-topology-operator/rabbitmqclient"

docs/api/rabbitmq.com.ref.asciidoc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,15 @@ Required property.
496496
| *`queue`* __string__ |
497497
| *`deletionPolicy`* __string__ | DeletionPolicy defines the behavior of federation in the RabbitMQ cluster when the corresponding custom resource is deleted.
498498
Can be set to 'delete' or 'retain'. Default is 'delete'.
499+
| *`queueType`* __string__ | The queue type of the internal upstream queue used by exchange federation.
500+
Defaults to classic (a single replica queue type). Set to quorum to use a replicated queue type.
501+
Changing the queue type will delete and recreate the upstream queue by default.
502+
This may lead to messages getting lost or not routed anywhere during the re-declaration.
503+
To avoid that, set resource-cleanup-mode key to never.
504+
This requires manually deleting the old upstream queue so that it can be recreated with the new type.
505+
| *`resourceCleanupMode`* __string__ | Whether to delete the internal upstream queue when federation links stop.
506+
By default, the internal upstream queue is deleted immediately when a federation link stops.
507+
Set to never to keep the upstream queue around and collect messages even when changing federation configuration.
499508
|===
500509

501510

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ require (
99
github.com/go-logr/logr v1.4.2
1010
github.com/google/uuid v1.6.0
1111
github.com/hashicorp/vault/api v1.16.0
12-
github.com/michaelklishin/rabbit-hole/v2 v2.16.0
12+
github.com/michaelklishin/rabbit-hole/v3 v3.1.1-0.20250329075555-f252b4e04969
1313
github.com/onsi/ginkgo/v2 v2.23.3
1414
github.com/onsi/gomega v1.36.3
1515
github.com/rabbitmq/cluster-operator/v2 v2.12.1

go.sum

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,10 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk
148148
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
149149
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
150150
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
151-
github.com/michaelklishin/rabbit-hole/v2 v2.16.0 h1:RvTPW3DnmyR7R1XTbET0ILRneU1Ou3vzIVj4YHBIE/g=
152-
github.com/michaelklishin/rabbit-hole/v2 v2.16.0/go.mod h1:wnHAhXYVncuXKdF/3mdywRE4vzBgn4k07Z+HjdNGMpM=
151+
github.com/michaelklishin/rabbit-hole/v3 v3.1.0 h1:ar9/AQedbJzmCaPL8axu18Gm+zvpQk9xjzJQ+daKlHQ=
152+
github.com/michaelklishin/rabbit-hole/v3 v3.1.0/go.mod h1:3pmYEJhHyKcMMYkmWBz1kX4gD0o/ptUgbZqAsbOygQI=
153+
github.com/michaelklishin/rabbit-hole/v3 v3.1.1-0.20250329075555-f252b4e04969 h1:Z49FYlRTmMN8J8No3biLcPLbREsGrV4sz8OaRUHhXFY=
154+
github.com/michaelklishin/rabbit-hole/v3 v3.1.1-0.20250329075555-f252b4e04969/go.mod h1:mhnrwEn7fweTZXG/bOf82YqM/u+u7pWZKghploqREHQ=
153155
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
154156
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
155157
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
@@ -293,6 +295,8 @@ golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98y
293295
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
294296
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
295297
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
298+
golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs=
299+
golang.org/x/crypto v0.35.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ=
296300
golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34=
297301
golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc=
298302
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
@@ -334,6 +338,8 @@ golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
334338
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
335339
golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
336340
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
341+
golang.org/x/net v0.36.0 h1:vWF2fRbw4qslQsQzgFqZff+BItCvGFQqKzKIzx1rmoA=
342+
golang.org/x/net v0.36.0/go.mod h1:bFmbeoIPfrw4sMHNhb4J9f6+tPziuGjq7Jk/38fxi1I=
337343
golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c=
338344
golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
339345
golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw=
@@ -389,6 +395,8 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
389395
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
390396
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
391397
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
398+
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
399+
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
392400
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
393401
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
394402
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
@@ -406,6 +414,8 @@ golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU=
406414
golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU=
407415
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
408416
golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
417+
golang.org/x/term v0.29.0 h1:L6pJp37ocefwRRtYPKSWOWzOtWSxVajvz2ldH/xi3iU=
418+
golang.org/x/term v0.29.0/go.mod h1:6bl4lRlvVuDgSf3179VpIxBF0o10JUpXWOnI7nErv7s=
409419
golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y=
410420
golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g=
411421
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -424,6 +434,8 @@ golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
424434
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
425435
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
426436
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
437+
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
438+
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
427439
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
428440
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
429441
golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY=
@@ -464,6 +476,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
464476
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
465477
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
466478
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
479+
google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
480+
google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
467481
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
468482
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
469483
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

internal/binding.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ package internal
1212
import (
1313
"encoding/json"
1414
"fmt"
15-
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
15+
rabbithole "github.com/michaelklishin/rabbit-hole/v3"
1616
topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1"
1717
"strings"
1818
)

internal/exchange_settings.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ package internal
1212
import (
1313
"encoding/json"
1414
"fmt"
15-
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
15+
rabbithole "github.com/michaelklishin/rabbit-hole/v3"
1616
topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1"
1717
)
1818

internal/federation_definition.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,24 @@ This product may include a number of subcomponents with separate copyright notic
1010
package internal
1111

1212
import (
13-
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
13+
rabbithole "github.com/michaelklishin/rabbit-hole/v3"
1414
topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1"
1515
"strings"
1616
)
1717

1818
func GenerateFederationDefinition(f *topology.Federation, uri string) rabbithole.FederationDefinition {
1919
return rabbithole.FederationDefinition{
20-
Uri: strings.Split(uri, ","),
21-
Expires: f.Spec.Expires,
22-
MessageTTL: int32(f.Spec.MessageTTL),
23-
MaxHops: f.Spec.MaxHops,
24-
PrefetchCount: f.Spec.PrefetchCount,
25-
ReconnectDelay: f.Spec.ReconnectDelay,
26-
AckMode: f.Spec.AckMode,
27-
TrustUserId: f.Spec.TrustUserId,
28-
Exchange: f.Spec.Exchange,
29-
Queue: f.Spec.Queue,
20+
Uri: strings.Split(uri, ","),
21+
Expires: f.Spec.Expires,
22+
MessageTTL: int32(f.Spec.MessageTTL),
23+
MaxHops: f.Spec.MaxHops,
24+
PrefetchCount: f.Spec.PrefetchCount,
25+
ReconnectDelay: f.Spec.ReconnectDelay,
26+
AckMode: f.Spec.AckMode,
27+
TrustUserId: f.Spec.TrustUserId,
28+
Exchange: f.Spec.Exchange,
29+
Queue: f.Spec.Queue,
30+
QueueType: f.Spec.QueueType,
31+
ResourceCleanupMode: f.Spec.ResourceCleanupMode,
3032
}
3133
}

0 commit comments

Comments
 (0)