Commit 4cb7a0d

Mark topology E2E test as flaky
The management helper sometimes fails to find the connection string. This seems to be alleviated by increasing the timeouts to 10 seconds and the polling interval to 500 ms. This makes sense because the management API in the browser refreshes every 5 seconds, and RabbitMQ probably needs some time to update the management DB with connection information. With this commit, the E2E test suite ran successfully 26 times in a row.
1 parent daff3a8 commit 4cb7a0d
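
As a rough illustration of the timing change, the sketch below uses Gomega's Eventually with the new 10-second timeout and 500 ms polling interval. The package name, the awaitConnectionVisible wrapper, and lookupConnection are illustrative assumptions; lookupConnection merely stands in for the suite's testhelper.GetConnectionByContainerID call shown in the diff below.

// Sketch: retry a management-API lookup for up to 10 seconds, polling every
// 500 ms, mirroring the timeouts this commit introduces. Assumes Gomega; the
// lookup is a placeholder for testhelper.GetConnectionByContainerID.
package example

import (
    "time"

    . "github.com/onsi/gomega"
)

// lookupConnection is a hypothetical stand-in for the real management helper.
func lookupConnection(containerID string) (any, error) {
    return nil, nil // placeholder: the real helper queries the management API
}

// awaitConnectionVisible waits until the management DB reports a connection
// for the given container ID, or fails after the 10-second timeout.
func awaitConnectionVisible(containerID string) {
    Eventually(func() (bool, error) {
        conn, err := lookupConnection(containerID)
        return conn != nil, err
    }).WithTimeout(10 * time.Second).WithPolling(500 * time.Millisecond).
        Should(BeTrueBecause("management DB should eventually list the connection"))
}

A 10-second ceiling comfortably covers two refresh cycles of the management API (one every 5 seconds) plus some slack for RabbitMQ to populate the management DB.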

1 file changed: 39 additions, 18 deletions
pkg/rabbitmqamqp/amqp_connection_recovery_test.go (39 additions, 18 deletions)

@@ -966,14 +966,19 @@ var _ = Describe("Recovery connection test", func() {
         })
     })
 
-    Context("end-to-end tests", func() {
+    Context("end-to-end tests", FlakeAttempts(3), func() {
         var (
-            env         *Environment
-            containerId string
+            env *Environment
         )
 
-        BeforeEach(func() {
-            containerId = CurrentSpecReport().LeafNodeText
+        AfterEach(func(ctx context.Context) {
+            if env != nil {
+                env.CloseConnections(ctx)
+            }
+        })
+
+        It("should recover the topology", func(ctx context.Context) {
+            const containerId = "recover-topology"
             env = NewEnvironment("amqp://", &AmqpConnOptions{
                 TopologyRecoveryOptions: TopologyRecoveryOnlyTransient,
                 ContainerID: containerId,
@@ -983,15 +988,8 @@ var _ = Describe("Recovery connection test", func() {
                     BackOffReconnectInterval: 2 * time.Second,
                     MaxReconnectAttempts: 5,
                 },
-                Id: containerId,
             })
-        })
 
-        AfterEach(func(ctx context.Context) {
-            env.CloseConnections(ctx)
-        })
-
-        It("should recover the topology", func(ctx context.Context) {
             conn, err := env.NewConnection(ctx)
             Expect(err).ToNot(HaveOccurred())
 
@@ -1057,6 +1055,18 @@ var _ = Describe("Recovery connection test", func() {
         })
 
         It("should not duplicate recovery records", func(ctx context.Context) {
+            const containerId = "not-duplicate-recovery-records"
+            env = NewEnvironment("amqp://", &AmqpConnOptions{
+                TopologyRecoveryOptions: TopologyRecoveryOnlyTransient,
+                ContainerID: containerId,
+                SASLType: amqp.SASLTypeAnonymous(),
+                RecoveryConfiguration: &RecoveryConfiguration{
+                    ActiveRecovery: true,
+                    BackOffReconnectInterval: 2 * time.Second,
+                    MaxReconnectAttempts: 5,
+                },
+            })
+
             conn, err := env.NewConnection(ctx)
             Expect(err).ToNot(HaveOccurred())
 
@@ -1097,6 +1107,18 @@ var _ = Describe("Recovery connection test", func() {
         })
 
         It("recovers auto-gen queues", func(ctx context.Context) {
+            const containerId = "recover-auto-gen-queues"
+            env = NewEnvironment("amqp://", &AmqpConnOptions{
+                TopologyRecoveryOptions: TopologyRecoveryOnlyTransient,
+                ContainerID: containerId,
+                SASLType: amqp.SASLTypeAnonymous(),
+                RecoveryConfiguration: &RecoveryConfiguration{
+                    ActiveRecovery: true,
+                    BackOffReconnectInterval: 2 * time.Second,
+                    MaxReconnectAttempts: 5,
+                },
+            })
+
             conn, err := env.NewConnection(ctx)
             Expect(err).ToNot(HaveOccurred())
             ch := make(chan *StateChanged, 1)
@@ -1153,27 +1175,26 @@ func dropConnectionAndAwaitReconnectionByContainerID(containerID string, ch <-ch
     // Drop connection
     Eventually(func() error {
         return testhelper.DropConnectionContainerID(containerID)
-    }).WithTimeout(5*time.Second).WithPolling(400*time.Millisecond).WithOffset(1).
+    }).WithTimeout(10*time.Second).WithPolling(500*time.Millisecond).WithOffset(1).
         Should(Succeed(), "expected connection to be closed")
     stateChange := new(StateChanged)
-    Eventually(ch).Within(5 * time.Second).WithPolling(400 * time.Millisecond).WithOffset(1).
+    Eventually(ch).Within(10 * time.Second).WithPolling(500 * time.Millisecond).WithOffset(1).
         Should(Receive(&stateChange))
     Expect(stateChange.From).To(Equal(&StateOpen{}))
     Expect(stateChange.To).To(BeAssignableToTypeOf(&StateClosed{}))
 
     // Receive reconnecting state
-    Eventually(ch).Within(5 * time.Second).WithPolling(400 * time.Millisecond).WithOffset(1).
+    Eventually(ch).Within(10 * time.Second).WithPolling(500 * time.Millisecond).WithOffset(1).
         Should(Receive())
 
-    By("recovering the connection")
     // Await reconnection
     Eventually(func() (bool, error) {
         conn, err := testhelper.GetConnectionByContainerID(containerID)
         return conn != nil, err
-    }).WithTimeout(6 * time.Second).WithPolling(400 * time.Millisecond).WithOffset(1).
+    }).WithTimeout(10 * time.Second).WithPolling(500 * time.Millisecond).WithOffset(1).
        Should(BeTrueBecause("expected connection to be reconnected"))
     stateChange = new(StateChanged)
-    Eventually(ch).Within(5 * time.Second).WithPolling(400 * time.Millisecond).WithOffset(1).
+    Eventually(ch).Within(10 * time.Second).WithPolling(500 * time.Millisecond).WithOffset(1).
         Should(Receive(&stateChange))
     Expect(stateChange.To).To(Equal(&StateOpen{}))
 }
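
For reference, Ginkgo's FlakeAttempts decorator (applied above to the end-to-end Context) reruns a failing spec up to the given number of times before reporting a failure. The following is a minimal, self-contained sketch under that assumption; the suite name, the attempts counter, and the flakyOnce helper are illustrative and not taken from the diff.

// Sketch of FlakeAttempts(3) on a Ginkgo container: each spec in the Context
// is retried up to 3 times before being reported as failed.
package example_test

import (
    "errors"
    "testing"

    . "github.com/onsi/ginkgo/v2"
    . "github.com/onsi/gomega"
)

func TestExample(t *testing.T) {
    RegisterFailHandler(Fail)
    RunSpecs(t, "FlakeAttempts example")
}

var attempts int

// flakyOnce fails on the first call and succeeds afterwards, so the spec
// below passes on its second attempt.
func flakyOnce() error {
    attempts++
    if attempts == 1 {
        return errors.New("transient failure")
    }
    return nil
}

var _ = Describe("recovery examples", func() {
    Context("end-to-end tests", FlakeAttempts(3), func() {
        It("passes on a retry", func() {
            Expect(flakyOnce()).To(Succeed())
        })
    })
})

Note that the diff also moves environment construction out of BeforeEach into each It with a fixed per-spec container ID, and nil-guards the shared AfterEach so cleanup is skipped when a spec never created an environment.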
