Skip to content

Commit 8d87332

Browse files
committed
Add amqp publisher
1 parent cb67f42 commit 8d87332

File tree

5 files changed

+108
-3
lines changed

5 files changed

+108
-3
lines changed

driver/sql/projection.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/hellofresh/goengine"
99
"github.com/mailru/easyjson/jlexer"
10+
"github.com/mailru/easyjson/jwriter"
1011
"github.com/pkg/errors"
1112
)
1213

@@ -128,6 +129,16 @@ func (p *ProjectionNotification) UnmarshalEasyJSON(in *jlexer.Lexer) {
128129
}
129130
}
130131

132+
// MarshalEasyJSON supports easyjson.Marshaler interface
133+
func (p *ProjectionNotification) MarshalEasyJSON(w *jwriter.Writer) {
134+
w.RawByte('{')
135+
w.RawString("\"no\":")
136+
w.Int64(p.No)
137+
w.RawString(",\"aggregate_id\":")
138+
w.String(p.AggregateID)
139+
w.RawByte('}')
140+
}
141+
131142
// GetProjectionStateSerialization returns a ProjectionStateSerialization based on the provided projection
132143
func GetProjectionStateSerialization(projection goengine.Projection) ProjectionStateSerialization {
133144
if saga, ok := projection.(ProjectionStateSerialization); ok {

extension/amqp/amqp.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package amqp
2+
3+
import (
4+
"github.com/streadway/amqp"
5+
)
6+
7+
// setup returns a connection and channel to be used for consumption with the Queue setup
8+
func setup(url, queue string) (*amqp.Connection, *amqp.Channel, error) {
9+
conn, err := amqp.Dial(url)
10+
if err != nil {
11+
return nil, nil, err
12+
}
13+
14+
ch, err := conn.Channel()
15+
if err != nil {
16+
return nil, nil, err
17+
}
18+
19+
if _, err := ch.QueueDeclare(queue, true, false, false, false, nil); err != nil {
20+
return nil, nil, err
21+
}
22+
23+
return conn, ch, nil
24+
}

extension/amqp/publisher.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package amqp
2+
3+
import (
4+
"context"
5+
6+
"github.com/hellofresh/goengine"
7+
"github.com/hellofresh/goengine/driver/sql"
8+
"github.com/mailru/easyjson"
9+
"github.com/streadway/amqp"
10+
)
11+
12+
var _ sql.ProjectionTrigger = (&NotificationPublisher{}).Publish
13+
14+
// NotificationPublisher is responsible of publishing a notification to queue
15+
type NotificationPublisher struct {
16+
amqpDSN string
17+
queue string
18+
logger goengine.Logger
19+
20+
connection *amqp.Connection
21+
channel *amqp.Channel
22+
}
23+
24+
// NewNotificationPublisher returns an instance of NotificationPublisher
25+
func NewNotificationPublisher(amqpDSN, queue string, logger goengine.Logger) (*NotificationPublisher, error) {
26+
return &NotificationPublisher{
27+
amqpDSN: amqpDSN,
28+
queue: queue,
29+
logger: logger,
30+
}, nil
31+
}
32+
33+
// Publish sends a ProjectionNotification to Queue
34+
func (p *NotificationPublisher) Publish(ctx context.Context, notification *sql.ProjectionNotification) error {
35+
// Ignore nil notifications since this is not supported
36+
if notification == nil {
37+
p.logger.Warn("unable to handle nil notification, skipping", nil)
38+
return nil
39+
}
40+
41+
msgBody, err := easyjson.Marshal(notification)
42+
if err != nil {
43+
return err
44+
}
45+
46+
for {
47+
if p.connection == nil {
48+
p.connection, p.channel, err = setup(p.amqpDSN, p.queue)
49+
if err != nil {
50+
return err
51+
}
52+
}
53+
54+
err = p.channel.Publish("", p.queue, true, false, amqp.Publishing{
55+
Body: msgBody,
56+
})
57+
if err == amqp.ErrClosed || err == amqp.ErrFrame || err == amqp.ErrUnexpectedFrame {
58+
if err := p.connection.Close(); err != nil {
59+
p.logger.Error("failed to close amqp connection", func(entry goengine.LoggerEntry) {
60+
entry.Error(err)
61+
})
62+
}
63+
p.connection = nil
64+
p.channel = nil
65+
continue
66+
}
67+
68+
return err
69+
}
70+
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111
github.com/pkg/errors v0.8.0
1212
github.com/prometheus/client_golang v0.9.4
1313
github.com/sirupsen/logrus v1.2.0
14+
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
1415
github.com/stretchr/testify v1.3.0
1516
go.uber.org/atomic v1.3.2 // indirect
1617
go.uber.org/multierr v1.1.0 // indirect

go.sum

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,10 @@ github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8
4848
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
4949
github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs=
5050
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
51-
github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
5251
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
5352
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
53+
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 h1:WhxRHzgeVGETMlmVfqhRn8RIeeNoPr2Czh33I4Zdccw=
54+
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
5455
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
5556
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
5657
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
@@ -62,13 +63,11 @@ go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
6263
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
6364
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
6465
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
65-
golang.org/x/crypto v0.0.0-20180808211826-de0752318171/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
6666
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
6767
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
6868
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
6969
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
7070
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
71-
golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
7271
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
7372
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5 h1:mzjBh+S5frKOsOBobWIMAbXavqjmgO17k/2puhcFR94=
7473
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

0 commit comments

Comments
 (0)