Skip to content

Commit db65a92

Browse files
committed
Misc cleanups:
* Remove empty lines * Use `:=` when appropriate * Start ensuring that connections are closed always * Use `Background` context in tests * No need to wait when establishing links * Use `Err` prefix for errors as suggested by gopls
1 parent 71ec38a commit db65a92

File tree

8 files changed

+38
-35
lines changed

8 files changed

+38
-35
lines changed

rabbitmq_amqp/amqp_binding.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ func (b *AMQPBinding) DestinationQueue(queueSpec IQueueSpecification) IBindingSp
3535
}
3636

3737
func (b *AMQPBinding) Bind(ctx context.Context) error {
38-
3938
path := bindingPath()
4039
kv := make(map[string]any)
4140
kv["binding_key"] = b.bindingKey
@@ -44,7 +43,6 @@ func (b *AMQPBinding) Bind(ctx context.Context) error {
4443
kv["arguments"] = make(map[string]any)
4544
_, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204})
4645
return err
47-
4846
}
4947

5048
func (b *AMQPBinding) Unbind(ctx context.Context) error {

rabbitmq_amqp/amqp_binding_test.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
)
88

99
var _ = Describe("AMQP Bindings test ", func() {
10-
1110
var connection IConnection
1211
var management IManagement
1312
BeforeEach(func() {
@@ -26,9 +25,9 @@ var _ = Describe("AMQP Bindings test ", func() {
2625
Expect(connection.Close(context.Background())).To(BeNil())
2726
})
2827

29-
It("AMQP Bindings between Exchange and Queue Should success ", func() {
30-
const exchangeName = "Exchange_AMQP Bindings between Exchange and Queue Should success"
31-
const queueName = "Queue_AMQP Bindings between Exchange and Queue Should success"
28+
It("AMQP Bindings between Exchange and Queue Should succeed", func() {
29+
const exchangeName = "Exchange_AMQP Bindings between Exchange and Queue should uccess"
30+
const queueName = "Queue_AMQP Bindings between Exchange and Queue should succeed"
3231
exchangeSpec := management.Exchange(exchangeName)
3332
exchangeInfo, err := exchangeSpec.Declare(context.TODO())
3433
Expect(err).To(BeNil())
@@ -50,7 +49,5 @@ var _ = Describe("AMQP Bindings test ", func() {
5049
Expect(err).To(BeNil())
5150
err = queueSpec.Delete(context.TODO())
5251
Expect(err).To(BeNil())
53-
5452
})
55-
5653
})

rabbitmq_amqp/amqp_connection.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ func (c *ConnectionSettings) Port(port int) IConnectionSettings {
4444
}
4545

4646
func (c *ConnectionSettings) User(userName string) IConnectionSettings {
47-
4847
c.user = userName
4948
return c
5049
}
@@ -71,7 +70,6 @@ func (c *ConnectionSettings) GetHost() string {
7170
func (c *ConnectionSettings) Host(hostName string) IConnectionSettings {
7271
c.host = hostName
7372
return c
74-
7573
}
7674

7775
func (c *ConnectionSettings) GetPort() int {
@@ -170,6 +168,7 @@ func (a *AmqpConnection) Open(ctx context.Context, connectionSettings IConnectio
170168

171169
err = a.Management().Open(ctx, a)
172170
if err != nil {
171+
// TODO close connection?
173172
return err
174173
}
175174
return nil

rabbitmq_amqp/amqp_connection_test.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ var _ = Describe("AMQP Connection Test", func() {
1717
Expect(connectionSettings).NotTo(BeNil())
1818
connectionSettings.SaslMechanism(SaslMechanism{Type: Anonymous})
1919
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
20-
err := amqpConnection.Open(context.TODO(), connectionSettings)
20+
21+
err := amqpConnection.Open(context.Background(), connectionSettings)
22+
Expect(err).To(BeNil())
23+
err = amqpConnection.Close(context.Background())
2124
Expect(err).To(BeNil())
2225
})
2326

@@ -30,7 +33,10 @@ var _ = Describe("AMQP Connection Test", func() {
3033
Expect(connectionSettings).NotTo(BeNil())
3134
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
3235
connectionSettings.SaslMechanism(SaslMechanism{Type: Plain})
33-
err := amqpConnection.Open(context.TODO(), connectionSettings)
36+
37+
err := amqpConnection.Open(context.Background(), connectionSettings)
38+
Expect(err).To(BeNil())
39+
err = amqpConnection.Close(context.Background())
3440
Expect(err).To(BeNil())
3541
})
3642

@@ -42,12 +48,12 @@ var _ = Describe("AMQP Connection Test", func() {
4248
Expect(connectionSettings).NotTo(BeNil())
4349
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
4450
connectionSettings.Host("localhost").Port(1234)
45-
err := amqpConnection.Open(context.TODO(), connectionSettings)
51+
52+
err := amqpConnection.Open(context.Background(), connectionSettings)
4653
Expect(err).NotTo(BeNil())
4754
})
4855

4956
It("AMQP Connection should fail due of wrong host", func() {
50-
5157
amqpConnection := NewAmqpConnection()
5258
Expect(amqpConnection).NotTo(BeNil())
5359
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))
@@ -56,7 +62,8 @@ var _ = Describe("AMQP Connection Test", func() {
5662
Expect(connectionSettings).NotTo(BeNil())
5763
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
5864
connectionSettings.Host("wronghost").Port(5672)
59-
err := amqpConnection.Open(context.TODO(), connectionSettings)
65+
66+
err := amqpConnection.Open(context.Background(), connectionSettings)
6067
Expect(err).NotTo(BeNil())
6168
})
6269

@@ -74,7 +81,7 @@ var _ = Describe("AMQP Connection Test", func() {
7481
Expect(amqpConnection).NotTo(BeNil())
7582
ch := make(chan *StatusChanged, 1)
7683
amqpConnection.NotifyStatusChange(ch)
77-
err := amqpConnection.Open(context.TODO(), NewConnectionSettings())
84+
err := amqpConnection.Open(context.Background(), NewConnectionSettings())
7885
Expect(err).To(BeNil())
7986
recv := <-ch
8087
Expect(recv).NotTo(BeNil())
@@ -88,7 +95,6 @@ var _ = Describe("AMQP Connection Test", func() {
8895

8996
Expect(recv.From).To(Equal(Open))
9097
Expect(recv.To).To(Equal(Closed))
91-
9298
})
9399

94100
//It("AMQP TLS Connection should success with SASLTypeAnonymous ", func() {
@@ -103,8 +109,7 @@ var _ = Describe("AMQP Connection Test", func() {
103109
// })
104110
// Expect(connectionSettings).NotTo(BeNil())
105111
// Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
106-
// err := amqpConnection.Open(context.TODO(), connectionSettings)
112+
// err := amqpConnection.Open(context.Background(), connectionSettings)
107113
// Expect(err).To(BeNil())
108114
//})
109-
110115
})

rabbitmq_amqp/amqp_exchange.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ func newAmqpExchange(management *AmqpManagement, name string) *AmqpExchange {
3434
}
3535

3636
func (e *AmqpExchange) Declare(ctx context.Context) (IExchangeInfo, error) {
37-
3837
path := exchangePath(e.name)
3938
kv := make(map[string]any)
4039
kv["auto_delete"] = e.isAutoDelete

rabbitmq_amqp/amqp_managent.go renamed to rabbitmq_amqp/amqp_management.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,9 @@ import (
77
"github.com/Azure/go-amqp"
88
"github.com/google/uuid"
99
"strconv"
10-
"time"
1110
)
1211

13-
var PreconditionFailed = errors.New("precondition Failed")
12+
var ErrPreconditionFailed = errors.New("precondition Failed")
1413

1514
type AmqpManagement struct {
1615
session *amqp.Session
@@ -110,19 +109,18 @@ func (a *AmqpManagement) Open(ctx context.Context, connection IConnection) error
110109
if err != nil {
111110
return err
112111
}
112+
113113
a.session = session
114114
err = a.ensureSenderLink(ctx)
115-
116115
if err != nil {
117116
return err
118117
}
119118

120-
time.Sleep(500 * time.Millisecond)
121119
err = a.ensureReceiverLink(ctx)
122-
time.Sleep(500 * time.Millisecond)
123120
if err != nil {
124121
return err
125122
}
123+
126124
a.lifeCycle.SetStatus(Open)
127125
return ctx.Err()
128126
}
@@ -139,13 +137,11 @@ func (a *AmqpManagement) Request(ctx context.Context, body any, path string, met
139137
expectedResponseCodes []int) (map[string]any, error) {
140138

141139
return a.request(ctx, uuid.New().String(), body, path, method, expectedResponseCodes)
142-
143140
}
144141

145142
func (a *AmqpManagement) validateResponseCode(responseCode int, expectedResponseCodes []int) error {
146-
147143
if responseCode == responseCode409 {
148-
return PreconditionFailed
144+
return ErrPreconditionFailed
149145
}
150146

151147
for _, code := range expectedResponseCodes {
@@ -154,34 +150,40 @@ func (a *AmqpManagement) validateResponseCode(responseCode int, expectedResponse
154150
}
155151
}
156152

157-
return errors.New(fmt.Sprintf("expected response code %d got %d", expectedResponseCodes, responseCode))
153+
return fmt.Errorf("expected response code %d got %d", expectedResponseCodes, responseCode)
158154
}
159155

160156
func (a *AmqpManagement) request(ctx context.Context, id string, body any, path string, method string,
161157
expectedResponseCodes []int) (map[string]any, error) {
162158
amqpMessage := &amqp.Message{
163159
Value: body,
164160
}
161+
165162
s := commandReplyTo
166163
amqpMessage.Properties = &amqp.MessageProperties{
167164
ReplyTo: &s,
168165
To: &path,
169166
Subject: &method,
170167
MessageID: &id,
171168
}
169+
172170
opts := &amqp.SendOptions{Settled: true}
171+
173172
err := a.sender.Send(ctx, amqpMessage, opts)
174173
if err != nil {
175174
return make(map[string]any), err
176175
}
176+
177177
msg, err := a.receiver.Receive(ctx, nil)
178178
if err != nil {
179179
return make(map[string]any), err
180180
}
181+
181182
err = a.receiver.AcceptMessage(ctx, msg)
182183
if err != nil {
183184
return nil, err
184185
}
186+
185187
if msg.Properties == nil {
186188
return make(map[string]any), fmt.Errorf("expected properties in the message")
187189
}
@@ -193,6 +195,7 @@ func (a *AmqpManagement) request(ctx context.Context, id string, body any, path
193195
if msg.Properties.CorrelationID != id {
194196
return make(map[string]any), fmt.Errorf("expected correlation id %s got %s", id, msg.Properties.CorrelationID)
195197
}
198+
196199
switch msg.Value.(type) {
197200
case map[string]interface{}:
198201
return msg.Value.(map[string]any), nil

rabbitmq_amqp/amqp_managent_test.go renamed to rabbitmq_amqp/amqp_management_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@ var _ = Describe("Management tests", func() {
1919
cancel()
2020
err = amqpConnection.Management().Open(ctx, amqpConnection)
2121
Expect(err).NotTo(BeNil())
22+
amqpConnection.Close(context.Background())
2223
})
2324

2425
It("AMQP Management should receive events ", func() {
2526
amqpConnection := NewAmqpConnection()
2627
Expect(amqpConnection).NotTo(BeNil())
2728
ch := make(chan *StatusChanged, 1)
2829
amqpConnection.Management().NotifyStatusChange(ch)
29-
err := amqpConnection.Open(context.TODO(), NewConnectionSettings())
30+
err := amqpConnection.Open(context.Background(), NewConnectionSettings())
3031
Expect(err).To(BeNil())
3132
recv := <-ch
3233
Expect(recv).NotTo(BeNil())
@@ -40,7 +41,7 @@ var _ = Describe("Management tests", func() {
4041

4142
Expect(recv.From).To(Equal(Open))
4243
Expect(recv.To).To(Equal(Closed))
43-
44+
amqpConnection.Close(context.Background())
4445
})
4546

4647
It("Request", func() {
@@ -51,7 +52,7 @@ var _ = Describe("Management tests", func() {
5152
connectionSettings := NewConnectionSettings()
5253
Expect(connectionSettings).NotTo(BeNil())
5354
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
54-
err := amqpConnection.Open(context.TODO(), connectionSettings)
55+
err := amqpConnection.Open(context.Background(), connectionSettings)
5556
Expect(err).To(BeNil())
5657

5758
management := amqpConnection.Management()
@@ -62,9 +63,10 @@ var _ = Describe("Management tests", func() {
6263
_queueArguments["x-queue-type"] = "quorum"
6364
kv["arguments"] = _queueArguments
6465
path := "/queues/test"
65-
result, err := management.Request(context.TODO(), kv, path, "PUT", []int{200})
66+
result, err := management.Request(context.Background(), kv, path, "PUT", []int{200})
6667
Expect(err).To(BeNil())
6768
Expect(result).NotTo(BeNil())
68-
Expect(management.Close(context.TODO())).To(BeNil())
69+
Expect(management.Close(context.Background())).To(BeNil())
70+
amqpConnection.Close(context.Background())
6971
})
7072
})

rabbitmq_amqp/amqp_queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ var _ = Describe("AMQP Queue test ", func() {
126126
queueSpecFail := management.Queue(queueName).QueueType(QueueType{Quorum})
127127
_, err = queueSpecFail.Declare(context.TODO())
128128
Expect(err).NotTo(BeNil())
129-
Expect(err).To(Equal(PreconditionFailed))
129+
Expect(err).To(Equal(ErrPreconditionFailed))
130130
err = queueSpec.Delete(context.TODO())
131131
Expect(err).To(BeNil())
132132
})

0 commit comments

Comments
 (0)