Skip to content

Commit 96fbddd

Browse files
committed
Tests for publisher.Publish
1 parent 8d87332 commit 96fbddd

File tree

4 files changed

+122
-8
lines changed

4 files changed

+122
-8
lines changed

extension/amqp/amqp.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import (
44
"github.com/streadway/amqp"
55
)
66

7-
// setup returns a connection and channel to be used for consumption with the Queue setup
8-
func setup(url, queue string) (*amqp.Connection, *amqp.Channel, error) {
7+
// setup returns a connection and channel to be used for the Queue setup
8+
func setup(url, queue string) (NotificationConnection, NotificationChannel, error) {
99
conn, err := amqp.Dial(url)
1010
if err != nil {
1111
return nil, nil, err
@@ -22,3 +22,13 @@ func setup(url, queue string) (*amqp.Connection, *amqp.Channel, error) {
2222

2323
return conn, ch, nil
2424
}
25+
26+
// NotificationChannel represents a channel for notifications
27+
type NotificationChannel interface {
28+
Publish(exchange, queue string, mandatory, immediate bool, msg amqp.Publishing) error
29+
}
30+
31+
// NotificationConnection represents a connection to the notification queue
32+
type NotificationConnection interface {
33+
Close() error
34+
}

extension/amqp/amqp_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package amqp_test
2+
3+
import "github.com/streadway/amqp"
4+
5+
type mockConnection struct {
6+
}
7+
8+
type mockChannel struct {
9+
}
10+
11+
func (cn mockConnection) Close() error {
12+
return nil
13+
}
14+
15+
func (ch mockChannel) Publish(
16+
exchange string,
17+
queue string,
18+
mandatory bool,
19+
immediate bool,
20+
msg amqp.Publishing,
21+
) error {
22+
return nil
23+
}

extension/amqp/publisher.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,29 @@ type NotificationPublisher struct {
1717
queue string
1818
logger goengine.Logger
1919

20-
connection *amqp.Connection
21-
channel *amqp.Channel
20+
connection NotificationConnection
21+
channel NotificationChannel
2222
}
2323

2424
// NewNotificationPublisher returns an instance of NotificationPublisher
25-
func NewNotificationPublisher(amqpDSN, queue string, logger goengine.Logger) (*NotificationPublisher, error) {
25+
func NewNotificationPublisher(amqpDSN, queue string,
26+
logger goengine.Logger,
27+
connection NotificationConnection,
28+
channel NotificationChannel,
29+
) (*NotificationPublisher, error) {
30+
31+
if _, err := amqp.ParseURI(amqpDSN); err != nil {
32+
return nil, goengine.InvalidArgumentError("amqpDSN")
33+
}
34+
if len(queue) == 0 {
35+
return nil, goengine.InvalidArgumentError("queue")
36+
}
2637
return &NotificationPublisher{
27-
amqpDSN: amqpDSN,
28-
queue: queue,
29-
logger: logger,
38+
amqpDSN: amqpDSN,
39+
queue: queue,
40+
logger: logger,
41+
connection: connection,
42+
channel: channel,
3043
}, nil
3144
}
3245

extension/amqp/publisher_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package amqp_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/hellofresh/goengine"
9+
"github.com/hellofresh/goengine/driver/sql"
10+
goengineAmqp "github.com/hellofresh/goengine/extension/amqp"
11+
goengineLogger "github.com/hellofresh/goengine/extension/logrus"
12+
"github.com/sirupsen/logrus"
13+
"github.com/sirupsen/logrus/hooks/test"
14+
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/require"
16+
)
17+
18+
func TestNotificationPublisher_Publish(t *testing.T) {
19+
20+
ctx, ctxCancel := context.WithTimeout(context.Background(), time.Second)
21+
defer ctxCancel()
22+
23+
channel := &mockChannel{}
24+
connection := &mockConnection{}
25+
26+
t.Run("Invalid arguments", func(t *testing.T) {
27+
logger, _ := getLogger()
28+
29+
_, err := goengineAmqp.NewNotificationPublisher("http://localhost:5672/", "my-queue", logger, connection, channel)
30+
assert.Equal(t, goengine.InvalidArgumentError("amqpDSN"), err)
31+
32+
_, err = goengineAmqp.NewNotificationPublisher("amqp://localhost:5672/", "", logger, connection, channel)
33+
assert.Equal(t, goengine.InvalidArgumentError("queue"), err)
34+
35+
})
36+
37+
t.Run("Publish Nil Notification Message", func(t *testing.T) {
38+
ensure := require.New(t)
39+
logger, loggerHook := getLogger()
40+
41+
publisher, err := goengineAmqp.NewNotificationPublisher("amqp://localhost:5672/", "my-queue", logger, connection, channel)
42+
ensure.NoError(err)
43+
err = publisher.Publish(ctx, nil)
44+
ensure.Nil(err)
45+
ensure.Len(loggerHook.Entries, 1)
46+
ensure.Equal("unable to handle nil notification, skipping", loggerHook.LastEntry().Message)
47+
})
48+
49+
t.Run("Publish Message", func(t *testing.T) {
50+
ensure := require.New(t)
51+
logger, loggerHook := getLogger()
52+
53+
publisher, err := goengineAmqp.NewNotificationPublisher("amqp://localhost:5672/", "my-queue", logger, connection, channel)
54+
ensure.NoError(err)
55+
56+
err = publisher.Publish(ctx, &sql.ProjectionNotification{No: 1, AggregateID: "8150276e-34fe-49d9-aeae-a35af0040a4f"})
57+
58+
ensure.NoError(err)
59+
ensure.Len(loggerHook.Entries, 0)
60+
})
61+
}
62+
63+
func getLogger() (goengine.Logger, *test.Hook) {
64+
logger, loggerHook := test.NewNullLogger()
65+
logger.SetLevel(logrus.DebugLevel)
66+
67+
return goengineLogger.Wrap(logger), loggerHook
68+
}

0 commit comments

Comments
 (0)