Skip to content

Commit 5fe9093

Browse files
authored
Merge pull request #2 from hellofresh/feature/event-bus-rabbit
Initial rabbit event dispatcher
2 parents 16a2728 + a06f75c commit 5fe9093

File tree

5 files changed

+290
-11
lines changed

5 files changed

+290
-11
lines changed

cmd/goengine/main.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ import (
55

66
log "github.com/Sirupsen/logrus"
77
"github.com/hellofresh/goengine"
8-
"github.com/hellofresh/goengine/inmemory"
98
"github.com/hellofresh/goengine/mongodb"
9+
"github.com/hellofresh/goengine/rabbit"
1010

1111
"gopkg.in/mgo.v2"
1212
)
@@ -32,16 +32,13 @@ func main() {
3232
registry.RegisterType(&RecipeRated{})
3333

3434
log.Info("Setting up the event bus")
35-
bus := inmemory.NewInMemoryEventBus()
35+
// bus := inmemory.NewInMemoryEventBus()
36+
bus := rabbit.NewEventBus(os.Getenv("BROKER_DSN"), "events", "events")
3637

3738
log.Info("Setting up the event store")
3839
es := mongodb.NewEventStore(session, registry)
3940

40-
log.Info("Creating a recipe")
41-
repository := goengine.NewPublisherRepository(es, bus)
42-
4341
eventDispatcher := goengine.NewVersionedEventDispatchManager(bus, registry)
44-
4542
eventDispatcher.RegisterEventHandler(&RecipeCreated{}, func(event *goengine.DomainMessage) error {
4643
log.Debug("Event received")
4744
return nil
@@ -53,17 +50,16 @@ func main() {
5350
log.Info("Creating a recipe")
5451
aggregateRoot := CreateScenario(streamName)
5552

53+
repository := goengine.NewPublisherRepository(es, bus)
5654
repository.Save(aggregateRoot, streamName)
5755

58-
history, err := NewRecipeFromHisotry(aggregateRoot.ID, streamName, repository)
56+
_, err = NewRecipeFromHisotry(aggregateRoot.ID, streamName, repository)
5957
if err != nil {
6058
log.Panic(err)
6159
}
6260

6361
log.Println("Stop channel")
6462
stopChannel <- true
65-
66-
log.Info(history)
6763
}
6864

6965
func CreateScenario(streamName goengine.StreamName) *Recipe {

docker-compose.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
version: '2'
22
services:
33

4+
rabbitmq:
5+
image: rabbitmq:3.6.1-management
6+
ports:
7+
- "15672:15672"
8+
- "5672:5672"
9+
410
redis:
511
image: redis
612

event_dispatcher_manager.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import (
44
log "github.com/Sirupsen/logrus"
55
)
66

7-
// VersionedEventDispatchManager is responsible for coordinating receiving messages from event receivers and dispatching them to the event dispatcher.
7+
// VersionedEventDispatchManager is responsible for coordinating receiving messages
8+
// from event receivers and dispatching them to the event dispatcher.
89
type VersionedEventDispatchManager struct {
910
versionedEventDispatcher *MapBasedVersionedEventDispatcher
1011
typeRegistry TypeRegistry
@@ -47,7 +48,8 @@ func (m *VersionedEventDispatchManager) Listen(stop <-chan bool, exclusive bool)
4748
for {
4849
// Wait on multiple channels using the select control flow.
4950
select {
50-
// Version event received channel receives a result with a channel to respond to, signifying successful processing of the message.
51+
// Version event received channel receives a result with a channel to respond to,
52+
// signifying successful processing of the message.
5153
// This should eventually call an event handler. See NewVersionedEventDispatcher()
5254
case event := <-receiveEventChannel:
5355
log.Debugf("EventDispatchManager.DispatchEvent: %s", event.Event)

rabbit/eventbus.go

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
package rabbit
2+
3+
import (
4+
"encoding/json"
5+
"errors"
6+
"fmt"
7+
"time"
8+
9+
log "github.com/Sirupsen/logrus"
10+
"github.com/hellofresh/goengine"
11+
"github.com/hellofresh/goengine/reflection"
12+
"github.com/streadway/amqp"
13+
)
14+
15+
// RawVersionedEvent represents the event that goes into rabbitmq
16+
type RawVersionedEvent struct {
17+
ID string `bson:"aggregate_id,omitempty"`
18+
Version int `bson:"version"`
19+
Type string `bson:"type"`
20+
Payload json.RawMessage `bson:"payload"`
21+
RecordedOn time.Time `bson:"recorded_on"`
22+
}
23+
24+
// EventBus ...
25+
type EventBus struct {
26+
brokerDSN string
27+
name string
28+
exchange string
29+
}
30+
31+
// NewEventBus ...
32+
func NewEventBus(brokerDSN string, name string, exchange string) *EventBus {
33+
return &EventBus{brokerDSN, name, exchange}
34+
}
35+
36+
// PublishEvents will publish events
37+
func (bus *EventBus) PublishEvents(events []*goengine.DomainMessage) error {
38+
// Connects opens an AMQP connection from the credentials in the URL.
39+
conn, err := amqp.Dial(bus.brokerDSN)
40+
if err != nil {
41+
return err
42+
}
43+
44+
// This waits for a server acknowledgment which means the sockets will have
45+
// flushed all outbound publishings prior to returning. It's important to
46+
// block on Close to not lose any publishings.
47+
defer conn.Close()
48+
49+
c, err := conn.Channel()
50+
if err != nil {
51+
return fmt.Errorf("channel.open: %s", err)
52+
}
53+
54+
// We declare our topology on both the publisher and consumer to ensure they
55+
// are the same. This is part of AMQP being a programmable messaging model.
56+
//
57+
// See the Channel.Consume example for the complimentary declare.
58+
err = c.ExchangeDeclare(bus.exchange, "fanout", true, false, false, false, nil)
59+
if err != nil {
60+
return fmt.Errorf("exchange.declare: %v", err)
61+
}
62+
63+
for _, event := range events {
64+
rabbitEvent, err := bus.toRawEvent(event)
65+
if nil != err {
66+
return err
67+
}
68+
69+
encodedEvent, err := json.Marshal(rabbitEvent)
70+
if nil != err {
71+
return err
72+
}
73+
74+
// Prepare this message to be persistent. Your publishing requirements may
75+
// be different.
76+
msg := amqp.Publishing{
77+
DeliveryMode: amqp.Persistent,
78+
Timestamp: time.Now(),
79+
ContentEncoding: "UTF-8",
80+
ContentType: "application/json",
81+
Body: encodedEvent,
82+
}
83+
84+
err = c.Publish(bus.exchange, "", true, false, msg)
85+
if err != nil {
86+
// Since publish is asynchronous this can happen if the network connection
87+
// is reset or if the server has run out of resources.
88+
return fmt.Errorf("basic.publish: %v", err)
89+
}
90+
}
91+
92+
return nil
93+
}
94+
95+
// ReceiveEvents will receive events
96+
func (bus *EventBus) ReceiveEvents(options goengine.VersionedEventReceiverOptions) error {
97+
conn, c, events, err := bus.consumeEventsQueue(options.Exclusive)
98+
if err != nil {
99+
return err
100+
}
101+
102+
go func() {
103+
for {
104+
select {
105+
case ch := <-options.Close:
106+
defer conn.Close()
107+
ch <- c.Cancel(bus.name, false)
108+
return
109+
110+
case message, more := <-events:
111+
if more {
112+
var raw RawVersionedEvent
113+
if err := json.Unmarshal(message.Body, &raw); err != nil {
114+
options.Error <- fmt.Errorf("json.Unmarshal received event: %v", err)
115+
} else {
116+
domainEvent, err := bus.fromRawEvent(options.TypeRegistry, &raw)
117+
if nil != err {
118+
log.Debugf("EventBus.Cannot find event type %s", raw.Type)
119+
options.Error <- errors.New("Cannot find event type " + raw.Type)
120+
message.Ack(true)
121+
} else {
122+
ackCh := make(chan bool)
123+
124+
log.Debug("EventBus.Dispatching Message")
125+
options.ReceiveEvent <- goengine.VersionedEventTransactedAccept{Event: domainEvent, ProcessedSuccessfully: ackCh}
126+
result := <-ackCh
127+
message.Ack(result)
128+
}
129+
}
130+
} else {
131+
log.Debug("RabbitMQ: Could have been disconnected")
132+
for {
133+
retryError := exponential(func() error {
134+
connR, cR, eventsR, errR := bus.consumeEventsQueue(options.Exclusive)
135+
if errR == nil {
136+
conn, c, events, err = connR, cR, eventsR, errR
137+
}
138+
139+
log.Debug(err)
140+
141+
return errR
142+
}, 5)
143+
144+
if retryError == nil {
145+
break
146+
}
147+
}
148+
}
149+
}
150+
}
151+
}()
152+
153+
return nil
154+
}
155+
156+
// DeleteQueue will delete a queue
157+
func (bus *EventBus) DeleteQueue(name string) error {
158+
conn, err := amqp.Dial(bus.brokerDSN)
159+
if err != nil {
160+
return err
161+
}
162+
163+
c, err := conn.Channel()
164+
if err != nil {
165+
return fmt.Errorf("channel.open: %s", err)
166+
}
167+
168+
_, err = c.QueueDelete(name, false, false, true)
169+
return err
170+
}
171+
172+
func (bus *EventBus) consumeEventsQueue(exclusive bool) (*amqp.Connection, *amqp.Channel, <-chan amqp.Delivery, error) {
173+
conn, err := amqp.Dial(bus.brokerDSN)
174+
if err != nil {
175+
return nil, nil, nil, err
176+
}
177+
178+
c, err := conn.Channel()
179+
if err != nil {
180+
return nil, nil, nil, fmt.Errorf("channel.open: %s", err)
181+
}
182+
183+
// We declare our topology on both the publisher and consumer to ensure they
184+
// are the same. This is part of AMQP being a programmable messaging model.
185+
//
186+
// See the Channel.Consume example for the complimentary declare.
187+
err = c.ExchangeDeclare(bus.exchange, "fanout", true, false, false, false, nil)
188+
if err != nil {
189+
return nil, nil, nil, fmt.Errorf("exchange.declare: %v", err)
190+
}
191+
192+
if _, err = c.QueueDeclare(bus.name, true, false, false, false, nil); err != nil {
193+
return nil, nil, nil, fmt.Errorf("queue.declare: %v", err)
194+
}
195+
196+
if err = c.QueueBind(bus.name, bus.name, bus.exchange, false, nil); err != nil {
197+
return nil, nil, nil, fmt.Errorf("queue.bind: %v", err)
198+
}
199+
200+
events, err := c.Consume(bus.name, bus.name, false, exclusive, false, false, nil)
201+
if err != nil {
202+
return nil, nil, nil, fmt.Errorf("basic.consume: %v", err)
203+
}
204+
205+
if err := c.Qos(1, 0, false); err != nil {
206+
return nil, nil, nil, fmt.Errorf("Qos: %v", err)
207+
}
208+
209+
return conn, c, events, nil
210+
}
211+
212+
func (bus *EventBus) toRawEvent(event *goengine.DomainMessage) (*RawVersionedEvent, error) {
213+
serializedPayload, err := json.Marshal(event.Payload)
214+
if nil != err {
215+
return nil, err
216+
}
217+
218+
typeName := reflection.TypeOf(event.Payload)
219+
return &RawVersionedEvent{
220+
ID: event.ID,
221+
Version: event.Version,
222+
Type: typeName.String(),
223+
Payload: serializedPayload,
224+
RecordedOn: event.RecordedOn,
225+
}, nil
226+
}
227+
228+
func (bus *EventBus) fromRawEvent(typeRegistry goengine.TypeRegistry, rabbitEvent *RawVersionedEvent) (*goengine.DomainMessage, error) {
229+
event, err := typeRegistry.Get(rabbitEvent.Type)
230+
if nil != err {
231+
return nil, err
232+
}
233+
234+
err = json.Unmarshal(rabbitEvent.Payload, event)
235+
if nil != err {
236+
return nil, err
237+
}
238+
239+
return goengine.NewDomainMessage(
240+
rabbitEvent.ID,
241+
rabbitEvent.Version,
242+
event.(goengine.DomainEvent),
243+
rabbitEvent.RecordedOn,
244+
), nil
245+
}

rabbit/retry.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package rabbit
2+
3+
import (
4+
"math"
5+
"time"
6+
7+
log "github.com/Sirupsen/logrus"
8+
)
9+
10+
type function func() error
11+
12+
func exponential(operation function, maxRetries int) error {
13+
var err error
14+
var sleepTime int
15+
for i := 0; i < maxRetries; i++ {
16+
err = operation()
17+
if err == nil {
18+
return nil
19+
}
20+
if i == 0 {
21+
sleepTime = 1
22+
} else {
23+
sleepTime = int(math.Exp2(float64(i)) * 100)
24+
}
25+
time.Sleep(time.Duration(sleepTime) * time.Millisecond)
26+
log.Debugf("Retry exponential: Attempt %d, sleep %d", i, sleepTime)
27+
}
28+
29+
return err
30+
}

0 commit comments

Comments
 (0)