Skip to content

Commit 28aeb6b

Browse files
boekkooi-freshbilaljaved
authored andcommitted
Parse amqpDSN in order to fail early
1 parent 89c6aee commit 28aeb6b

File tree

3 files changed

+31
-12
lines changed

3 files changed

+31
-12
lines changed

extension/amqp/amqp.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ func setup(url, queue string) (io.Closer, NotificationChannel, error) {
3636

3737
// DirectQueueConsume returns a Consume func that will connect to the provided AMQP server and create a queue for direct message delivery
3838
func DirectQueueConsume(amqpDSN, queue string) (Consume, error) {
39-
switch {
40-
case len(amqpDSN) == 0:
39+
if _, err := amqp.ParseURI(amqpDSN); err != nil {
4140
return nil, goengine.InvalidArgumentError("amqpDSN")
42-
case len(queue) == 0:
41+
}
42+
if len(queue) == 0 {
4343
return nil, goengine.InvalidArgumentError("queue")
4444
}
4545

@@ -54,7 +54,7 @@ func DirectQueueConsume(amqpDSN, queue string) (Consume, error) {
5454
return nil, nil, err
5555
}
5656

57-
// Exclusive consumer
57+
// Since there can be multiple consumers, fair distribution of deliveries is required
5858
deliveries, err := ch.Consume(queue, "", true, false, false, false, nil)
5959

6060
return conn, deliveries, err

extension/amqp/amqp_test.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,15 @@
1+
// +build unit
2+
13
package amqp_test
24

3-
import "github.com/streadway/amqp"
5+
import (
6+
"testing"
7+
8+
"github.com/hellofresh/goengine"
9+
goengineAmqp "github.com/hellofresh/goengine/extension/amqp"
10+
"github.com/streadway/amqp"
11+
"github.com/stretchr/testify/assert"
12+
)
413

514
type mockConnection struct {
615
}
@@ -36,3 +45,20 @@ func (ch mockChannel) Consume(
3645
func (ch mockChannel) Qos(prefetchCount, prefetchSize int, global bool) error {
3746
return nil
3847
}
48+
49+
func TestDirectQueueConsume(t *testing.T) {
50+
t.Run("Invalid arguments", func(t *testing.T) {
51+
_, err := goengineAmqp.DirectQueueConsume("http://localhost:5672/", "my-queue")
52+
assert.Equal(t, goengine.InvalidArgumentError("amqpDSN"), err)
53+
54+
_, err = goengineAmqp.DirectQueueConsume("amqp://localhost:5672/", "")
55+
assert.Equal(t, goengine.InvalidArgumentError("queue"), err)
56+
})
57+
58+
t.Run("Returns amqp.Consume", func(t *testing.T) {
59+
c, err := goengineAmqp.DirectQueueConsume("amqp://localhost:5672/", "my-queue")
60+
assert.NoError(t, err)
61+
assert.NotNil(t, c)
62+
assert.IsType(t, (goengineAmqp.Consume)(nil), c)
63+
})
64+
}

extension/amqp/publisher_test.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,3 @@ func TestNotificationPublisher_Publish(t *testing.T) {
5858
ensure.Len(loggerHook.Entries, 0)
5959
})
6060
}
61-
62-
//func getLogger() (goengine.Logger, *test.Hook) {
63-
// logger, loggerHook := test.NewNullLogger()
64-
// logger.SetLevel(logrus.DebugLevel)
65-
//
66-
// return goengineLogger.Wrap(logger), loggerHook
67-
//}

0 commit comments

Comments
 (0)