Skip to content

Commit 1d321a7

Browse files
committed
add tests
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 21406bd commit 1d321a7

File tree

5 files changed

+131
-43
lines changed

5 files changed

+131
-43
lines changed

pkg/rabbitmq_amqp/amqp_connection.go

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ func Dial(ctx context.Context, addresses []string, connOptions *AmqpConnOptions,
136136
}
137137
conn.amqpConnOptions = connOptions
138138
conn.amqpConnOptions.addresses = addresses
139+
conn.lifeCycle.SetState(&StateOpen{})
139140
return conn, nil
140141

141142
}
@@ -204,6 +205,7 @@ func (a *AmqpConnection) open(ctx context.Context, addresses []string, connOptio
204205
if azureConnection.Err() != nil {
205206
Error("connection closed unexpectedly", "error", azureConnection.Err())
206207
a.maybeReconnect()
208+
207209
return
208210
}
209211
Debug("connection closed successfully")
@@ -220,7 +222,6 @@ func (a *AmqpConnection) open(ctx context.Context, addresses []string, connOptio
220222
return err
221223
}
222224

223-
a.lifeCycle.SetState(&StateOpen{})
224225
return nil
225226
}
226227
func (a *AmqpConnection) maybeReconnect() {
@@ -232,6 +233,7 @@ func (a *AmqpConnection) maybeReconnect() {
232233
a.lifeCycle.SetState(&StateReconnecting{})
233234
numberOfAttempts := 1
234235
waitTime := a.amqpConnOptions.RecoveryConfiguration.BackOffReconnectInterval
236+
reconnected := false
235237
for numberOfAttempts <= a.amqpConnOptions.RecoveryConfiguration.MaxReconnectAttempts {
236238
///wait for before reconnecting
237239
Info("Waiting before reconnecting", "in", waitTime, "attempt", numberOfAttempts)
@@ -247,25 +249,29 @@ func (a *AmqpConnection) maybeReconnect() {
247249
waitTime = waitTime * 2
248250
Error("Failed to connection. ", "id", a.Id(), "error", err)
249251
} else {
252+
reconnected = true
250253
break
251254
}
252255
}
253256

254-
var fails int32
255-
Info("Reconnected successfully, restarting publishers and consumers")
256-
a.entitiesTracker.publishers.Range(func(key, value any) bool {
257-
publisher := value.(*Publisher)
258-
// try to createSender
259-
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
260-
err := publisher.createSender(ctx)
261-
if err != nil {
262-
atomic.AddInt32(&fails, 1)
263-
Error("Failed to createSender publisher", "ID", publisher.Id(), "error", err)
264-
}
265-
cancel()
266-
return true
267-
})
268-
Info("Restarted publishers", "number of fails", fails)
257+
if reconnected {
258+
var fails int32
259+
Info("Reconnected successfully, restarting publishers and consumers")
260+
a.entitiesTracker.publishers.Range(func(key, value any) bool {
261+
publisher := value.(*Publisher)
262+
// try to createSender
263+
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
264+
err := publisher.createSender(ctx)
265+
if err != nil {
266+
atomic.AddInt32(&fails, 1)
267+
Error("Failed to createSender publisher", "ID", publisher.Id(), "error", err)
268+
}
269+
cancel()
270+
return true
271+
})
272+
Info("Restarted publishers", "number of fails", fails)
273+
a.lifeCycle.SetState(&StateOpen{})
274+
}
269275

270276
}
271277

pkg/rabbitmq_amqp/amqp_connection_recovery_test.go

Lines changed: 79 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,27 @@ import (
55
"github.com/Azure/go-amqp"
66
. "github.com/onsi/ginkgo/v2"
77
. "github.com/onsi/gomega"
8-
test_helper "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/test-helper"
8+
testhelper "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/test-helper"
99
"time"
1010
)
1111

1212
var _ = Describe("Recovery connection test", func() {
13-
It("connection should reconnect if dropped by via REST API", func() {
14-
containerID := "connection should reconnect if dropped by via REST API"
13+
It("connection should reconnect producers and consumers if dropped by via REST API", func() {
14+
/*
15+
The test is a bit complex since it requires to drop the connection by REST API
16+
Then wait for the connection to be reconnected.
17+
The scope of the test is to verify that the connection is reconnected and the
18+
producers and consumers are able to send and receive messages.
19+
It is more like an integration test.
20+
This kind of the tests requires time in terms of execution it has to wait for the
21+
connection to be reconnected, so to speed up the test I aggregated the tests in one.
22+
*/
23+
24+
name := "connection should reconnect producers and consumers if dropped by via REST API"
1525
connection, err := Dial(context.Background(), []string{"amqp://"}, &AmqpConnOptions{
1626
SASLType: amqp.SASLTypeAnonymous(),
17-
ContainerID: containerID,
27+
ContainerID: name,
28+
// reduced the reconnect interval to speed up the test
1829
RecoveryConfiguration: &RecoveryConfiguration{
1930
ActiveRecovery: true,
2031
BackOffReconnectInterval: 2 * time.Second,
@@ -25,19 +36,77 @@ var _ = Describe("Recovery connection test", func() {
2536
ch := make(chan *StateChanged, 1)
2637
connection.NotifyStatusChange(ch)
2738

39+
qName := generateName(name)
40+
queueInfo, err := connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{
41+
Name: qName,
42+
})
43+
Expect(err).To(BeNil())
44+
Expect(queueInfo).NotTo(BeNil())
45+
46+
publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{
47+
Queue: qName,
48+
}, "test")
49+
50+
Expect(err).To(BeNil())
51+
Expect(publisher).NotTo(BeNil())
52+
for i := 0; i < 5; i++ {
53+
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello")))
54+
Expect(err).To(BeNil())
55+
Expect(publishResult).NotTo(BeNil())
56+
Expect(publishResult.Outcome).To(Equal(&amqp.StateAccepted{}))
57+
}
58+
2859
Eventually(func() bool {
29-
err := test_helper.DropConnectionContainerID(containerID)
60+
err := testhelper.DropConnectionContainerID(name)
3061
return err == nil
3162
}).WithTimeout(5 * time.Second).WithPolling(400 * time.Millisecond).Should(BeTrue())
32-
<-ch
33-
time.Sleep(2 * time.Second)
63+
st1 := <-ch
64+
Expect(st1.From).To(Equal(&StateOpen{}))
65+
Expect(st1.To).To(BeAssignableToTypeOf(&StateClosed{}))
66+
/// Closed state should have an error
67+
// Since it is forced closed by the REST API
68+
err = st1.To.(*StateClosed).GetError()
69+
Expect(err).NotTo(BeNil())
70+
Expect(err.Error()).To(ContainSubstring("Connection forced"))
71+
72+
time.Sleep(1 * time.Second)
3473
Eventually(func() bool {
35-
conn, err := test_helper.GetConnectionByContainerID(containerID)
74+
conn, err := testhelper.GetConnectionByContainerID(name)
3675
return err == nil && conn != nil
3776
}).WithTimeout(5 * time.Second).WithPolling(400 * time.Millisecond).Should(BeTrue())
38-
<-ch
77+
st2 := <-ch
78+
Expect(st2.From).To(BeAssignableToTypeOf(&StateClosed{}))
79+
Expect(st2.To).To(Equal(&StateReconnecting{}))
80+
81+
st3 := <-ch
82+
Expect(st3.From).To(BeAssignableToTypeOf(&StateReconnecting{}))
83+
Expect(st3.To).To(Equal(&StateOpen{}))
84+
85+
for i := 0; i < 5; i++ {
86+
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello")))
87+
Expect(err).To(BeNil())
88+
Expect(publishResult).NotTo(BeNil())
89+
Expect(publishResult.Outcome).To(Equal(&amqp.StateAccepted{}))
90+
}
91+
92+
time.Sleep(500 * time.Millisecond)
93+
purged, err := connection.Management().PurgeQueue(context.Background(), qName)
94+
Expect(err).To(BeNil())
95+
Expect(purged).To(Equal(5 + 5))
96+
97+
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
98+
3999
err = connection.Close(context.Background())
40-
<-ch
100+
Expect(err).To(BeNil())
101+
st4 := <-ch
102+
Expect(st4.From).To(Equal(&StateOpen{}))
103+
Expect(st4.To).To(BeAssignableToTypeOf(&StateClosed{}))
104+
err = st4.To.(*StateClosed).GetError()
105+
// the flow status should be:
106+
// from open to closed (with error)
107+
// from closed to reconnecting
108+
// from reconnecting to open
109+
// from open to closed (without error)
41110
Expect(err).To(BeNil())
42111
})
43112
})

pkg/rabbitmq_amqp/amqp_management.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ import (
1313
var ErrPreconditionFailed = errors.New("precondition Failed")
1414
var ErrDoesNotExist = errors.New("does not exist")
1515

16+
/*
17+
AmqpManagement is the interface to the RabbitMQ /management endpoint
18+
The management interface is used to declare/delete exchanges, queues, and bindings
19+
*/
1620
type AmqpManagement struct {
1721
session *amqp.Session
1822
sender *amqp.Sender
@@ -83,6 +87,11 @@ func (a *AmqpManagement) Close(ctx context.Context) error {
8387
return err
8488
}
8589

90+
/*
91+
Request sends a request to the /management endpoint.
92+
It is a generic method that can be used to send any request to the management endpoint.
93+
In most of the cases you don't need to use this method directly, instead use the standard methods
94+
*/
8695
func (a *AmqpManagement) Request(ctx context.Context, body any, path string, method string,
8796
expectedResponseCodes []int) (map[string]any, error) {
8897
return a.request(ctx, uuid.New().String(), body, path, method, expectedResponseCodes)

pkg/rabbitmq_amqp/entities.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ func (e QueueType) String() string {
2020
return string(e.Type)
2121
}
2222

23+
/*
24+
QueueSpecification represents the specification of a queue
25+
*/
2326
type QueueSpecification interface {
2427
name() string
2528
isAutoDelete() bool
@@ -28,8 +31,6 @@ type QueueSpecification interface {
2831
buildArguments() map[string]any
2932
}
3033

31-
// QuorumQueueSpecification represents the specification of the quorum queue
32-
3334
type OverflowStrategy interface {
3435
overflowStrategy() string
3536
}
@@ -73,6 +74,10 @@ func (r *ClientLocalLeaderLocator) leaderLocator() string {
7374
return "client-local"
7475
}
7576

77+
/*
78+
QuorumQueueSpecification represents the specification of the quorum queue
79+
*/
80+
7681
type QuorumQueueSpecification struct {
7782
Name string
7883
AutoExpire int64
@@ -154,7 +159,9 @@ func (q *QuorumQueueSpecification) buildArguments() map[string]any {
154159
return result
155160
}
156161

157-
// ClassicQueueSpecification represents the specification of the classic queue
162+
/*
163+
ClassicQueueSpecification represents the specification of the classic queue
164+
*/
158165
type ClassicQueueSpecification struct {
159166
Name string
160167
IsAutoDelete bool
@@ -235,6 +242,11 @@ func (q *ClassicQueueSpecification) buildArguments() map[string]any {
235242
return result
236243
}
237244

245+
/*
246+
AutoGeneratedQueueSpecification represents the specification of the auto-generated queue.
247+
It is a classic queue with auto-generated name.
248+
It is useful in context like RPC or when you need a temporary queue.
249+
*/
238250
type AutoGeneratedQueueSpecification struct {
239251
IsAutoDelete bool
240252
IsExclusive bool

pkg/rabbitmq_amqp/life_cycle.go

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,13 @@ func statusToString(status LifeCycleState) string {
6565
}
6666

6767
type StateChanged struct {
68-
From LifeCycleState
69-
To LifeCycleState
70-
Error error
68+
From LifeCycleState
69+
To LifeCycleState
7170
}
7271

7372
func (s StateChanged) String() string {
74-
return fmt.Sprintf("From: %s, To: %s, Error: %v", statusToString(s.From), statusToString(s.To), s.Error)
73+
74+
return fmt.Sprintf("From: %s, To: %s", statusToString(s.From), statusToString(s.To))
7575
}
7676

7777
type LifeCycle struct {
@@ -107,16 +107,8 @@ func (l *LifeCycle) SetState(value LifeCycleState) {
107107
return
108108
}
109109

110-
var stateError error
111-
switch value.(type) {
112-
case *StateClosed:
113-
stateError = value.(*StateClosed).GetError()
114-
115-
}
116-
117110
l.chStatusChanged <- &StateChanged{
118-
From: oldState,
119-
To: value,
120-
Error: stateError,
111+
From: oldState,
112+
To: value,
121113
}
122114
}

0 commit comments

Comments
 (0)