Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 0 additions & 49 deletions .ci/publish-documentation-to-github-pages.sh

This file was deleted.

11 changes: 9 additions & 2 deletions .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
name: Test against supported go-version

on:
- workflow_call
push:
branches: [ main ]
pull_request:
branches: [ main ]

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
build-ubuntu:
runs-on: ubuntu-latest
strategy:
fail-fast: true
fail-fast: false
matrix:
go:
- stable
Expand Down
11 changes: 0 additions & 11 deletions .github/workflows/main.yaml

This file was deleted.

57 changes: 39 additions & 18 deletions pkg/rabbitmqamqp/amqp_connection_recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,14 +966,19 @@ var _ = Describe("Recovery connection test", func() {
})
})

Context("end-to-end tests", func() {
Context("end-to-end tests", FlakeAttempts(3), func() {
var (
env *Environment
containerId string
env *Environment
)

BeforeEach(func() {
containerId = CurrentSpecReport().LeafNodeText
AfterEach(func(ctx context.Context) {
if env != nil {
env.CloseConnections(ctx)
}
})

It("should recover the topology", func(ctx context.Context) {
const containerId = "recover-topology"
env = NewEnvironment("amqp://", &AmqpConnOptions{
TopologyRecoveryOptions: TopologyRecoveryOnlyTransient,
ContainerID: containerId,
Expand All @@ -983,15 +988,8 @@ var _ = Describe("Recovery connection test", func() {
BackOffReconnectInterval: 2 * time.Second,
MaxReconnectAttempts: 5,
},
Id: containerId,
})
})

AfterEach(func(ctx context.Context) {
env.CloseConnections(ctx)
})

It("should recover the topology", func(ctx context.Context) {
conn, err := env.NewConnection(ctx)
Expect(err).ToNot(HaveOccurred())

Expand Down Expand Up @@ -1057,6 +1055,18 @@ var _ = Describe("Recovery connection test", func() {
})

It("should not duplicate recovery records", func(ctx context.Context) {
const containerId = "not-duplicate-recovery-records"
env = NewEnvironment("amqp://", &AmqpConnOptions{
TopologyRecoveryOptions: TopologyRecoveryOnlyTransient,
ContainerID: containerId,
SASLType: amqp.SASLTypeAnonymous(),
RecoveryConfiguration: &RecoveryConfiguration{
ActiveRecovery: true,
BackOffReconnectInterval: 2 * time.Second,
MaxReconnectAttempts: 5,
},
})

conn, err := env.NewConnection(ctx)
Expect(err).ToNot(HaveOccurred())

Expand Down Expand Up @@ -1097,6 +1107,18 @@ var _ = Describe("Recovery connection test", func() {
})

It("recovers auto-gen queues", func(ctx context.Context) {
const containerId = "recover-auto-gen-queues"
env = NewEnvironment("amqp://", &AmqpConnOptions{
TopologyRecoveryOptions: TopologyRecoveryOnlyTransient,
ContainerID: containerId,
SASLType: amqp.SASLTypeAnonymous(),
RecoveryConfiguration: &RecoveryConfiguration{
ActiveRecovery: true,
BackOffReconnectInterval: 2 * time.Second,
MaxReconnectAttempts: 5,
},
})

conn, err := env.NewConnection(ctx)
Expect(err).ToNot(HaveOccurred())
ch := make(chan *StateChanged, 1)
Expand Down Expand Up @@ -1153,27 +1175,26 @@ func dropConnectionAndAwaitReconnectionByContainerID(containerID string, ch <-ch
// Drop connection
Eventually(func() error {
return testhelper.DropConnectionContainerID(containerID)
}).WithTimeout(5*time.Second).WithPolling(400*time.Millisecond).WithOffset(1).
}).WithTimeout(10*time.Second).WithPolling(500*time.Millisecond).WithOffset(1).
Should(Succeed(), "expected connection to be closed")
stateChange := new(StateChanged)
Eventually(ch).Within(5 * time.Second).WithPolling(400 * time.Millisecond).WithOffset(1).
Eventually(ch).Within(10 * time.Second).WithPolling(500 * time.Millisecond).WithOffset(1).
Should(Receive(&stateChange))
Expect(stateChange.From).To(Equal(&StateOpen{}))
Expect(stateChange.To).To(BeAssignableToTypeOf(&StateClosed{}))

// Receive reconnecting state
Eventually(ch).Within(5 * time.Second).WithPolling(400 * time.Millisecond).WithOffset(1).
Eventually(ch).Within(10 * time.Second).WithPolling(500 * time.Millisecond).WithOffset(1).
Should(Receive())

By("recovering the connection")
// Await reconnection
Eventually(func() (bool, error) {
conn, err := testhelper.GetConnectionByContainerID(containerID)
return conn != nil, err
}).WithTimeout(6 * time.Second).WithPolling(400 * time.Millisecond).WithOffset(1).
}).WithTimeout(10 * time.Second).WithPolling(500 * time.Millisecond).WithOffset(1).
Should(BeTrueBecause("expected connection to be reconnected"))
stateChange = new(StateChanged)
Eventually(ch).Within(5 * time.Second).WithPolling(400 * time.Millisecond).WithOffset(1).
Eventually(ch).Within(10 * time.Second).WithPolling(500 * time.Millisecond).WithOffset(1).
Should(Receive(&stateChange))
Expect(stateChange.To).To(Equal(&StateOpen{}))
}