Skip to content

Commit 9b41b13

Browse files
committed
Auto-gen queue spec should return consistent name
Because a concrete queue specification should always the same name, otherwise the queue spec is not really a spec, but a random generator.
1 parent a0bf21a commit 9b41b13

File tree

3 files changed

+108
-53
lines changed

3 files changed

+108
-53
lines changed

pkg/rabbitmqamqp/amqp_connection_recovery_test.go

Lines changed: 81 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1038,32 +1038,7 @@ var _ = Describe("Recovery connection test", func() {
10381038
Expect(msgReceived.Message().GetData()).To(Equal([]byte("hello")))
10391039
Expect(msgReceived.Accept(ctx)).To(Succeed())
10401040

1041-
// Drop connection
1042-
Eventually(func() error {
1043-
return testhelper.DropConnectionContainerID(containerId)
1044-
}).WithTimeout(5*time.Second).WithPolling(400*time.Millisecond).
1045-
Should(Succeed(), "expected connection to be closed")
1046-
stateChange := new(StateChanged)
1047-
Eventually(ch).Within(5 * time.Second).WithPolling(400 * time.Millisecond).
1048-
Should(Receive(&stateChange))
1049-
Expect(stateChange.From).To(Equal(&StateOpen{}))
1050-
Expect(stateChange.To).To(BeAssignableToTypeOf(&StateClosed{}))
1051-
1052-
// Receive reconnecting state
1053-
Eventually(ch).Within(5 * time.Second).WithPolling(400 * time.Millisecond).
1054-
Should(Receive())
1055-
1056-
By("recovering the connection")
1057-
// Await reconnection
1058-
Eventually(func() (bool, error) {
1059-
conn, err := testhelper.GetConnectionByContainerID(containerId)
1060-
return conn != nil, err
1061-
}).WithTimeout(6 * time.Second).WithPolling(400 * time.Millisecond).
1062-
Should(BeTrueBecause("expected connection to be reconnected"))
1063-
stateChange = new(StateChanged)
1064-
Eventually(ch).Within(5 * time.Second).WithPolling(400 * time.Millisecond).
1065-
Should(Receive(&stateChange))
1066-
Expect(stateChange.To).To(Equal(&StateOpen{}))
1041+
dropConnectionAndAwaitReconnectionByContainerID(containerId, ch)
10671042

10681043
By("publishing and consuming again")
10691044
// Publish again
@@ -1112,39 +1087,93 @@ var _ = Describe("Recovery connection test", func() {
11121087
ch := make(chan *StateChanged, 1)
11131088
conn.NotifyStatusChange(ch)
11141089

1115-
// Drop connection
1116-
Eventually(func() error {
1117-
return testhelper.DropConnectionContainerID(containerId)
1118-
}).WithTimeout(5*time.Second).WithPolling(400*time.Millisecond).
1119-
Should(Succeed(), "expected connection to be closed")
1120-
stateChange := new(StateChanged)
1121-
Eventually(ch).Within(5 * time.Second).WithPolling(400 * time.Millisecond).
1122-
Should(Receive(&stateChange))
1123-
Expect(stateChange.From).To(Equal(&StateOpen{}))
1124-
Expect(stateChange.To).To(BeAssignableToTypeOf(&StateClosed{}))
1125-
1126-
// Receive reconnecting state
1127-
Eventually(ch).Within(5 * time.Second).WithPolling(400 * time.Millisecond).
1128-
Should(Receive())
1129-
1130-
By("recovering the connection")
1131-
// Await reconnection
1132-
Eventually(func() (bool, error) {
1133-
conn, err := testhelper.GetConnectionByContainerID(containerId)
1134-
return conn != nil, err
1135-
}).WithTimeout(6 * time.Second).WithPolling(400 * time.Millisecond).
1136-
Should(BeTrueBecause("expected connection to be reconnected"))
1137-
stateChange = new(StateChanged)
1138-
Eventually(ch).Within(5 * time.Second).WithPolling(400 * time.Millisecond).
1139-
Should(Receive(&stateChange))
1140-
Expect(stateChange.To).To(Equal(&StateOpen{}))
1090+
dropConnectionAndAwaitReconnectionByContainerID(containerId, ch)
11411091

11421092
By("verifying that the recovery records are not duplicated")
11431093
Expect(conn.topologyRecoveryRecords.queues).To(HaveLen(1))
11441094
Expect(conn.topologyRecoveryRecords.bindings).To(HaveLen(1))
11451095

11461096
_ = conn.Close(ctx)
11471097
})
1098+
1099+
It("should recover auto-generated queues bound to persistent exchanges", func(ctx context.Context) {
1100+
conn, err := env.NewConnection(ctx)
1101+
Expect(err).ToNot(HaveOccurred())
1102+
ch := make(chan *StateChanged, 1)
1103+
conn.NotifyStatusChange(ch)
1104+
1105+
const exchange = "amq.fanout"
1106+
1107+
queue, err := conn.Management().DeclareQueue(ctx, &AutoGeneratedQueueSpecification{
1108+
IsAutoDelete: true,
1109+
IsExclusive: true,
1110+
})
1111+
Expect(err).ToNot(HaveOccurred())
1112+
1113+
_, err = conn.Management().Bind(ctx, &ExchangeToQueueBindingSpecification{
1114+
SourceExchange: exchange,
1115+
DestinationQueue: queue.Name(),
1116+
BindingKey: "auto-generated-queue-bound-to-persistent-exchange",
1117+
})
1118+
Expect(err).ToNot(HaveOccurred())
1119+
1120+
producer, err := conn.NewPublisher(ctx, &ExchangeAddress{Exchange: exchange, Key: "auto-generated-queue-bound-to-persistent-exchange"}, nil)
1121+
Expect(err).ToNot(HaveOccurred())
1122+
1123+
msg := NewMessage([]byte("hello"))
1124+
result, err := producer.Publish(ctx, msg)
1125+
Expect(err).ToNot(HaveOccurred())
1126+
Expect(result.Outcome).To(Equal(&StateAccepted{}))
1127+
1128+
consumer, err := conn.NewConsumer(ctx, queue.Name(), nil)
1129+
Expect(err).ToNot(HaveOccurred())
1130+
1131+
msgReceived, err := consumer.Receive(ctx)
1132+
Expect(err).ToNot(HaveOccurred())
1133+
Expect(msgReceived.Message().GetData()).To(Equal([]byte("hello")))
1134+
Expect(msgReceived.Accept(ctx)).To(Succeed())
1135+
1136+
dropConnectionAndAwaitReconnectionByContainerID(containerId, ch)
1137+
1138+
msg = NewMessage([]byte("hello again"))
1139+
result, err = producer.Publish(ctx, msg)
1140+
Expect(err).ToNot(HaveOccurred())
1141+
Expect(result.Outcome).To(Equal(&StateAccepted{}))
1142+
1143+
msgReceived, err = consumer.Receive(ctx)
1144+
Expect(err).ToNot(HaveOccurred())
1145+
Expect(msgReceived.Message().GetData()).To(Equal([]byte("hello again")))
1146+
Expect(msgReceived.Accept(ctx)).To(Succeed())
1147+
})
11481148
})
11491149
})
11501150
})
1151+
1152+
func dropConnectionAndAwaitReconnectionByContainerID(containerID string, ch <-chan *StateChanged) {
1153+
// Drop connection
1154+
Eventually(func() error {
1155+
return testhelper.DropConnectionContainerID(containerID)
1156+
}).WithTimeout(5*time.Second).WithPolling(400*time.Millisecond).WithOffset(1).
1157+
Should(Succeed(), "expected connection to be closed")
1158+
stateChange := new(StateChanged)
1159+
Eventually(ch).Within(5 * time.Second).WithPolling(400 * time.Millisecond).WithOffset(1).
1160+
Should(Receive(&stateChange))
1161+
Expect(stateChange.From).To(Equal(&StateOpen{}))
1162+
Expect(stateChange.To).To(BeAssignableToTypeOf(&StateClosed{}))
1163+
1164+
// Receive reconnecting state
1165+
Eventually(ch).Within(5 * time.Second).WithPolling(400 * time.Millisecond).WithOffset(1).
1166+
Should(Receive())
1167+
1168+
By("recovering the connection")
1169+
// Await reconnection
1170+
Eventually(func() (bool, error) {
1171+
conn, err := testhelper.GetConnectionByContainerID(containerID)
1172+
return conn != nil, err
1173+
}).WithTimeout(6 * time.Second).WithPolling(400 * time.Millisecond).WithOffset(1).
1174+
Should(BeTrueBecause("expected connection to be reconnected"))
1175+
stateChange = new(StateChanged)
1176+
Eventually(ch).Within(5 * time.Second).WithPolling(400 * time.Millisecond).WithOffset(1).
1177+
Should(Receive(&stateChange))
1178+
Expect(stateChange.To).To(Equal(&StateOpen{}))
1179+
}

pkg/rabbitmqamqp/entities.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ It is a classic queue with auto-generated name.
254254
It is useful in context like RPC or when you need a temporary queue.
255255
*/
256256
type AutoGeneratedQueueSpecification struct {
257+
genName string
257258
IsAutoDelete bool
258259
IsExclusive bool
259260
MaxLength int64
@@ -262,7 +263,10 @@ type AutoGeneratedQueueSpecification struct {
262263
}
263264

264265
func (a *AutoGeneratedQueueSpecification) name() string {
265-
return generateNameWithDefaultPrefix()
266+
if a.genName == "" {
267+
a.genName = generateNameWithDefaultPrefix()
268+
}
269+
return a.genName
266270
}
267271

268272
func (a *AutoGeneratedQueueSpecification) isAutoDelete() bool {

pkg/rabbitmqamqp/entities_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package rabbitmqamqp
2+
3+
import (
4+
. "github.com/onsi/ginkgo/v2"
5+
. "github.com/onsi/gomega"
6+
)
7+
8+
var _ = Describe("Entities", func() {
9+
Describe("AutoGeneratedQueueSpecification", func() {
10+
It("should generate a unique name consistently", func() {
11+
queue := &AutoGeneratedQueueSpecification{}
12+
generatedName := queue.name()
13+
Expect(queue.name()).To(Equal(generatedName), "same instance should generate the same name")
14+
15+
anotherQueue := &AutoGeneratedQueueSpecification{}
16+
anotherGeneratedName := anotherQueue.name()
17+
Expect(anotherQueue.name()).To(Equal(anotherGeneratedName), "same instance should generate the same name")
18+
19+
Expect(generatedName).ToNot(Equal(anotherGeneratedName), "different instances should generate different names")
20+
})
21+
})
22+
})

0 commit comments

Comments
 (0)