Skip to content

Commit b1bfa9d

Browse files
authored
Merge pull request #52 from hellofresh/feature/amqp-publisher
Add amqp publisher
2 parents 87fc02c + 20c033c commit b1bfa9d

File tree

7 files changed

+243
-3
lines changed

7 files changed

+243
-3
lines changed

driver/sql/projection.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
"github.com/mailru/easyjson/jlexer"
9+
"github.com/mailru/easyjson/jwriter"
910
"github.com/pkg/errors"
1011

1112
"github.com/hellofresh/goengine"
@@ -129,6 +130,16 @@ func (p *ProjectionNotification) UnmarshalEasyJSON(in *jlexer.Lexer) {
129130
}
130131
}
131132

133+
// MarshalEasyJSON supports easyjson.Marshaler interface
134+
func (p *ProjectionNotification) MarshalEasyJSON(w *jwriter.Writer) {
135+
w.RawByte('{')
136+
w.RawString("\"no\":")
137+
w.Int64(p.No)
138+
w.RawString(",\"aggregate_id\":")
139+
w.String(p.AggregateID)
140+
w.RawByte('}')
141+
}
142+
132143
// GetProjectionStateSerialization returns a ProjectionStateSerialization based on the provided projection
133144
func GetProjectionStateSerialization(projection goengine.Projection) ProjectionStateSerialization {
134145
if saga, ok := projection.(ProjectionStateSerialization); ok {

extension/amqp/amqp.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package amqp
2+
3+
import (
4+
"io"
5+
6+
"github.com/streadway/amqp"
7+
)
8+
9+
// setup returns a connection and channel to be used for the Queue setup
10+
func setup(url, queue string) (io.Closer, NotificationChannel, error) {
11+
conn, err := amqp.Dial(url)
12+
if err != nil {
13+
return nil, nil, err
14+
}
15+
16+
ch, err := conn.Channel()
17+
if err != nil {
18+
return nil, nil, err
19+
}
20+
21+
if _, err := ch.QueueDeclare(queue, true, false, false, false, nil); err != nil {
22+
return nil, nil, err
23+
}
24+
25+
return conn, ch, nil
26+
}
27+
28+
// NotificationChannel represents a channel for notifications
29+
type NotificationChannel interface {
30+
Publish(exchange, queue string, mandatory, immediate bool, msg amqp.Publishing) error
31+
}

extension/amqp/amqp_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package amqp_test
2+
3+
import "github.com/streadway/amqp"
4+
5+
type mockConnection struct {
6+
}
7+
8+
type mockChannel struct {
9+
}
10+
11+
func (cn mockConnection) Close() error {
12+
return nil
13+
}
14+
15+
func (ch mockChannel) Publish(
16+
exchange string,
17+
queue string,
18+
mandatory bool,
19+
immediate bool,
20+
msg amqp.Publishing,
21+
) error {
22+
return nil
23+
}

extension/amqp/publisher.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package amqp
2+
3+
import (
4+
"context"
5+
"io"
6+
"sync"
7+
"time"
8+
9+
"github.com/hellofresh/goengine"
10+
"github.com/hellofresh/goengine/driver/sql"
11+
"github.com/mailru/easyjson"
12+
"github.com/streadway/amqp"
13+
)
14+
15+
var _ sql.ProjectionTrigger = (&NotificationPublisher{}).Publish
16+
17+
// NotificationPublisher is responsible of publishing a notification to queue
18+
type NotificationPublisher struct {
19+
amqpDSN string
20+
queue string
21+
minReconnectInterval time.Duration
22+
maxReconnectInterval time.Duration
23+
logger goengine.Logger
24+
25+
connection io.Closer
26+
channel NotificationChannel
27+
28+
mux sync.Mutex
29+
}
30+
31+
// NewNotificationPublisher returns an instance of NotificationPublisher
32+
func NewNotificationPublisher(
33+
amqpDSN,
34+
queue string,
35+
minReconnectInterval time.Duration,
36+
maxReconnectInterval time.Duration,
37+
logger goengine.Logger,
38+
connection io.Closer,
39+
channel NotificationChannel,
40+
) (*NotificationPublisher, error) {
41+
42+
if _, err := amqp.ParseURI(amqpDSN); err != nil {
43+
return nil, goengine.InvalidArgumentError("amqpDSN")
44+
}
45+
if len(queue) == 0 {
46+
return nil, goengine.InvalidArgumentError("queue")
47+
}
48+
return &NotificationPublisher{
49+
amqpDSN: amqpDSN,
50+
queue: queue,
51+
minReconnectInterval: minReconnectInterval,
52+
maxReconnectInterval: maxReconnectInterval,
53+
logger: logger,
54+
connection: connection,
55+
channel: channel,
56+
}, nil
57+
}
58+
59+
// Publish sends a ProjectionNotification to Queue
60+
func (p *NotificationPublisher) Publish(ctx context.Context, notification *sql.ProjectionNotification) error {
61+
reconnectInterval := p.minReconnectInterval
62+
// Ignore nil notifications since this is not supported
63+
// Skipping as we may receive a nil notification from dispatcher for the first time
64+
if notification == nil {
65+
p.logger.Warn("unable to handle nil notification, skipping", nil)
66+
return nil
67+
}
68+
69+
msgBody, err := easyjson.Marshal(notification)
70+
if err != nil {
71+
return err
72+
}
73+
74+
for {
75+
p.mux.Lock()
76+
if p.connection == nil {
77+
p.connection, p.channel, err = setup(p.amqpDSN, p.queue)
78+
if err != nil {
79+
return err
80+
}
81+
}
82+
p.mux.Unlock()
83+
84+
err = p.channel.Publish("", p.queue, true, false, amqp.Publishing{
85+
Body: msgBody,
86+
})
87+
if err == amqp.ErrClosed || err == amqp.ErrFrame || err == amqp.ErrUnexpectedFrame {
88+
if err := p.connection.Close(); err != nil {
89+
p.logger.Error("failed to close amqp connection", func(entry goengine.LoggerEntry) {
90+
entry.Error(err)
91+
})
92+
}
93+
94+
time.Sleep(reconnectInterval)
95+
reconnectInterval *= 2
96+
if reconnectInterval > p.maxReconnectInterval {
97+
reconnectInterval = p.maxReconnectInterval
98+
}
99+
100+
p.connection = nil
101+
p.channel = nil
102+
continue
103+
}
104+
105+
return err
106+
}
107+
}

extension/amqp/publisher_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package amqp_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/hellofresh/goengine"
9+
"github.com/hellofresh/goengine/driver/sql"
10+
goengineAmqp "github.com/hellofresh/goengine/extension/amqp"
11+
goengineLogger "github.com/hellofresh/goengine/extension/logrus"
12+
"github.com/sirupsen/logrus"
13+
"github.com/sirupsen/logrus/hooks/test"
14+
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/require"
16+
)
17+
18+
func TestNotificationPublisher_Publish(t *testing.T) {
19+
20+
ctx, ctxCancel := context.WithTimeout(context.Background(), time.Second)
21+
defer ctxCancel()
22+
23+
channel := &mockChannel{}
24+
connection := &mockConnection{}
25+
26+
t.Run("Invalid arguments", func(t *testing.T) {
27+
logger, _ := getLogger()
28+
29+
_, err := goengineAmqp.NewNotificationPublisher("http://localhost:5672/", "my-queue", 3, 4, logger, connection, channel)
30+
assert.Equal(t, goengine.InvalidArgumentError("amqpDSN"), err)
31+
32+
_, err = goengineAmqp.NewNotificationPublisher("amqp://localhost:5672/", "", 3, 4, logger, connection, channel)
33+
assert.Equal(t, goengine.InvalidArgumentError("queue"), err)
34+
35+
})
36+
37+
t.Run("Publish Nil Notification Message", func(t *testing.T) {
38+
ensure := require.New(t)
39+
logger, loggerHook := getLogger()
40+
41+
publisher, err := goengineAmqp.NewNotificationPublisher("amqp://localhost:5672/", "my-queue", 3, 4, logger, connection, channel)
42+
ensure.NoError(err)
43+
err = publisher.Publish(ctx, nil)
44+
ensure.Nil(err)
45+
ensure.Len(loggerHook.Entries, 1)
46+
ensure.Equal("unable to handle nil notification, skipping", loggerHook.LastEntry().Message)
47+
})
48+
49+
t.Run("Publish Message", func(t *testing.T) {
50+
ensure := require.New(t)
51+
logger, loggerHook := getLogger()
52+
53+
publisher, err := goengineAmqp.NewNotificationPublisher("amqp://localhost:5672/", "my-queue", 3, 4, logger, connection, channel)
54+
ensure.NoError(err)
55+
56+
err = publisher.Publish(ctx, &sql.ProjectionNotification{No: 1, AggregateID: "8150276e-34fe-49d9-aeae-a35af0040a4f"})
57+
58+
ensure.NoError(err)
59+
ensure.Len(loggerHook.Entries, 0)
60+
})
61+
}
62+
63+
func getLogger() (goengine.Logger, *test.Hook) {
64+
logger, loggerHook := test.NewNullLogger()
65+
logger.SetLevel(logrus.DebugLevel)
66+
67+
return goengineLogger.Wrap(logger), loggerHook
68+
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111
github.com/pkg/errors v0.8.0
1212
github.com/prometheus/client_golang v0.9.4
1313
github.com/sirupsen/logrus v1.2.0
14+
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
1415
github.com/stretchr/testify v1.3.0
1516
go.uber.org/atomic v1.3.2 // indirect
1617
go.uber.org/multierr v1.1.0 // indirect

go.sum

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,10 @@ github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8
4848
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
4949
github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs=
5050
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
51-
github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
5251
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
5352
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
53+
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 h1:WhxRHzgeVGETMlmVfqhRn8RIeeNoPr2Czh33I4Zdccw=
54+
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
5455
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
5556
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
5657
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
@@ -62,13 +63,11 @@ go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
6263
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
6364
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
6465
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
65-
golang.org/x/crypto v0.0.0-20180808211826-de0752318171/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
6666
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
6767
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
6868
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
6969
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
7070
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
71-
golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
7271
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
7372
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5 h1:mzjBh+S5frKOsOBobWIMAbXavqjmgO17k/2puhcFR94=
7473
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

0 commit comments

Comments
 (0)