Skip to content

Commit 6ece76d

Browse files
committed
Removed logrus dependency from the project
1 parent 88b2f1a commit 6ece76d

File tree

9 files changed

+90
-51
lines changed

9 files changed

+90
-51
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ Engine is divided in a few small independent components.
2626
## Install
2727

2828
```sh
29-
go get github.com/hellofresh/goengine
29+
go get -u github.com/hellofresh/goengine
3030
```
3131

3232
## Usage

aggregate_repository.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package goengine
22

33
import (
44
"fmt"
5-
6-
log "github.com/sirupsen/logrus"
75
)
86

97
type AggregateRepository interface {
@@ -22,7 +20,7 @@ func NewPublisherRepository(eventStore EventStore, eventBus VersionedEventPublis
2220
}
2321

2422
func (r *PublisherRepository) Load(id string, streamName StreamName) (*EventStream, error) {
25-
log.WithFields(log.Fields{"stream": streamName, "id": id}).Debug("Loading events from stream for aggregate")
23+
Log("Loading events from stream for aggregate", map[string]interface{}{"stream": streamName, "id": id}, nil)
2624
stream, err := r.EventStore.GetEventsFor(streamName, id)
2725
if nil != err {
2826
return nil, err
@@ -34,14 +32,15 @@ func (r *PublisherRepository) Load(id string, streamName StreamName) (*EventStre
3432
func (r *PublisherRepository) Save(aggregateRoot AggregateRoot, streamName StreamName) error {
3533
events := aggregateRoot.GetUncommittedEvents()
3634
eventStream := NewEventStream(streamName, events)
37-
log.WithFields(log.Fields{"count": len(events), "stream": streamName}).Debug("Saving events to stream")
35+
Log("Saving events to stream", map[string]interface{}{"count": len(events), "stream": streamName}, nil)
36+
3837
err := r.EventStore.Append(eventStream)
3938
if nil != err {
4039
return err
4140
}
4241

4342
if nil == r.EventBus {
44-
log.Debug("Event bus not detected, skipping publishing events")
43+
Log("Event bus not detected, skipping publishing events", nil, nil)
4544
return nil
4645
}
4746

@@ -53,7 +52,8 @@ func (r *PublisherRepository) Save(aggregateRoot AggregateRoot, streamName Strea
5352
}
5453

5554
func (r *PublisherRepository) Reconstitute(id string, source AggregateRoot, streamName StreamName) error {
56-
log.WithFields(log.Fields{"stream": streamName, "id": id}).Debug("Reconstituting aggregate from stream")
55+
Log("Reconstituting aggregate from stream", map[string]interface{}{"stream": streamName, "id": id}, nil)
56+
5757
stream, err := r.Load(id, streamName)
5858
if nil != err {
5959
return err
@@ -69,6 +69,6 @@ func (r *PublisherRepository) Reconstitute(id string, source AggregateRoot, stre
6969
}
7070

7171
source.SetVersion(events[len(events)-1].Version)
72-
log.WithField("id", id).Debug("Aggregate reconstituted")
72+
Log("Aggregate reconstituted", map[string]interface{}{"id": id}, nil)
7373
return nil
7474
}

aggregate_root.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55

66
"github.com/hellofresh/goengine/reflection"
77
"github.com/pborman/uuid"
8-
log "github.com/sirupsen/logrus"
98
)
109

1110
type AggregateRoot interface {
@@ -48,7 +47,7 @@ func (r *AggregateRootBased) SetVersion(version int) {
4847
func (r *AggregateRootBased) GetUncommittedEvents() []*DomainMessage {
4948
stream := r.uncommittedEvents
5049
r.uncommittedEvents = nil
51-
log.WithField("count", len(stream)).Debug("Uncommitted events cleaned")
50+
Log("Uncommitted events cleaned", map[string]interface{}{"count": len(stream)}, nil)
5251

5352
return stream
5453
}
@@ -57,10 +56,10 @@ func (r *AggregateRootBased) Apply(event DomainEvent) {
5756
t := reflection.TypeOf(event)
5857
methodName := fmt.Sprintf("When%s", t.Name())
5958

60-
entry := log.WithFields(log.Fields{"source": fmt.Sprintf("%+v", r.source), "method": methodName, "event": fmt.Sprintf("%+v", event)})
61-
entry.Debug("Applying event")
59+
fields := map[string]interface{}{"source": fmt.Sprintf("%+v", r.source), "method": methodName, "event": fmt.Sprintf("%+v", event)}
60+
Log("Applying event", fields, nil)
6261
reflection.CallMethod(r.source, methodName, event)
63-
entry.Debug("Event applied")
62+
Log("Event applied", fields, nil)
6463
}
6564

6665
func (r *AggregateRootBased) RecordThat(event DomainEvent) {
@@ -72,5 +71,5 @@ func (r *AggregateRootBased) RecordThat(event DomainEvent) {
7271
func (r *AggregateRootBased) Record(event DomainEvent) {
7372
message := RecordNow(r.ID, r.version, event)
7473
r.uncommittedEvents = append(r.uncommittedEvents, message)
75-
log.Debug("Event recorded")
74+
Log("Event recorded", nil, nil)
7675
}

cmd/goengine/main.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,58 +6,59 @@ import (
66
"github.com/hellofresh/goengine"
77
"github.com/hellofresh/goengine/mongodb"
88
"github.com/hellofresh/goengine/rabbit"
9-
log "github.com/sirupsen/logrus"
109
"gopkg.in/mgo.v2"
1110
)
1211

1312
func main() {
14-
log.SetLevel(log.DebugLevel)
1513
var streamName goengine.StreamName = "test"
1614

1715
mongoDSN := os.Getenv("STORAGE_DSN")
18-
log.WithField("dsn", mongoDSN).Debug("Connecting to the database")
16+
goengine.Log("Connecting to the database", map[string]interface{}{"dsn": mongoDSN}, nil)
1917
session, err := mgo.Dial(mongoDSN)
2018
if err != nil {
21-
log.Panic(err)
19+
goengine.Log("Failed to connect to Mongo", nil, err)
20+
panic(err)
2221
}
2322
defer session.Close()
2423

2524
// Optional. Switch the session to a monotonic behavior.
2625
session.SetMode(mgo.Monotonic, true)
2726

28-
log.Info("Setting up the registry")
27+
goengine.Log("Setting up the registry", nil, nil)
2928
registry := goengine.NewInMemoryTypeRegistry()
3029
registry.RegisterType(&RecipeCreated{})
3130
registry.RegisterType(&RecipeRated{})
3231

33-
log.Info("Setting up the event bus")
3432
// bus := inmemory.NewInMemoryEventBus()
35-
bus := rabbit.NewEventBus(os.Getenv("BROKER_DSN"), "events", "events")
33+
brokerDSN := os.Getenv("BROKER_DSN")
34+
goengine.Log("Setting up the event bus", map[string]interface{}{"dsn": brokerDSN}, nil)
35+
bus := rabbit.NewEventBus(brokerDSN, "events", "events")
3636

37-
log.Info("Setting up the event store")
37+
goengine.Log("Setting up the event store", nil, nil)
3838
es := mongodb.NewEventStore(session, registry)
3939

4040
eventDispatcher := goengine.NewVersionedEventDispatchManager(bus, registry)
4141
eventDispatcher.RegisterEventHandler(&RecipeCreated{}, func(event *goengine.DomainMessage) error {
42-
log.Debug("Event received")
42+
goengine.Log("Event received", nil, nil)
4343
return nil
4444
})
4545

4646
stopChannel := make(chan bool)
4747
go eventDispatcher.Listen(stopChannel, false)
4848

49-
log.Info("Creating a recipe")
49+
goengine.Log("Creating a recipe", nil, nil)
5050
aggregateRoot := CreateScenario(streamName)
5151

5252
repository := goengine.NewPublisherRepository(es, bus)
5353
repository.Save(aggregateRoot, streamName)
5454

5555
_, err = NewRecipeFromHisotry(aggregateRoot.ID, streamName, repository)
5656
if err != nil {
57-
log.Panic(err)
57+
goengine.Log("Failed to connect to Mongo", nil, err)
58+
panic(err)
5859
}
5960

60-
log.Println("Stop channel")
61+
goengine.Log("Stopping channel", nil, err)
6162
stopChannel <- true
6263
}
6364

event_dispatcher_manager.go

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
package goengine
22

3-
import (
4-
log "github.com/sirupsen/logrus"
5-
)
6-
73
// VersionedEventDispatchManager is responsible for coordinating receiving messages
84
// from event receivers and dispatching them to the event dispatcher.
95
type VersionedEventDispatchManager struct {
@@ -52,25 +48,25 @@ func (m *VersionedEventDispatchManager) Listen(stop <-chan bool, exclusive bool)
5248
// signifying successful processing of the message.
5349
// This should eventually call an event handler. See NewVersionedEventDispatcher()
5450
case event := <-receiveEventChannel:
55-
entry := log.WithField("event", event.Event)
56-
entry.Debugf("EventDispatchManager.DispatchEvent")
51+
fields := map[string]interface{}{"event": event.Event}
52+
Log("EventDispatchManager.DispatchEvent", fields, nil)
5753
if err := m.versionedEventDispatcher.DispatchEvent(event.Event); err != nil {
58-
entry.WithError(err).Error("Error dispatching event")
54+
Log("Error dispatching event", fields, err)
5955
}
6056

6157
event.ProcessedSuccessfully <- true
62-
entry.Debug("EventDispatchManager.DispatchSuccessful")
58+
Log("EventDispatchManager.DispatchSuccessful", fields, nil)
6359

6460
case <-stop:
65-
log.Debug("EventDispatchManager.Stopping")
61+
Log("EventDispatchManager.Stopping", nil, nil)
6662
closeSignal := make(chan error)
6763
closeChannel <- closeSignal
68-
log.Debug("EventDispatchManager.Stopped")
64+
Log("EventDispatchManager.Stopped", nil, nil)
6965
return <-closeSignal
7066

7167
// Receiving on this channel signifys an error has occurred worker processor side
7268
case err := <-errorChannel:
73-
log.WithError(err).Debugf("EventDispatchManager.ErrorReceived")
69+
Log("EventDispatchManager.ErrorReceived", nil, err)
7470
return err
7571
}
7672
}

logging.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package goengine
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"os"
7+
"strings"
8+
)
9+
10+
type LogHandler func(msg string, fields map[string]interface{}, err error)
11+
12+
var logHandler LogHandler
13+
14+
func init() {
15+
l := log.New(os.Stderr, "[GoEngine] ", log.LstdFlags)
16+
17+
SetLogHandler(func(msg string, fields map[string]interface{}, err error) {
18+
if nil != err {
19+
if nil == fields {
20+
fields = make(map[string]interface{})
21+
}
22+
fields["error"] = err.Error()
23+
}
24+
25+
msgParts := make([]string, len(fields)+1)
26+
27+
msgParts[0] = msg
28+
idx := 1
29+
for k, v := range fields {
30+
msgParts[idx] = fmt.Sprintf("%s=%v", k, v)
31+
idx++
32+
}
33+
34+
l.Println(strings.Join(msgParts, "\t"))
35+
})
36+
}
37+
38+
func SetLogHandler(handler LogHandler) {
39+
logHandler = handler
40+
}
41+
42+
func Log(msg string, fields map[string]interface{}, err error) {
43+
logHandler(msg, fields, err)
44+
}

rabbit/eventbus.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88

99
"github.com/hellofresh/goengine"
1010
"github.com/hellofresh/goengine/reflection"
11-
log "github.com/sirupsen/logrus"
1211
"github.com/streadway/amqp"
1312
)
1413

@@ -115,29 +114,28 @@ func (bus *EventBus) ReceiveEvents(options goengine.VersionedEventReceiverOption
115114
} else {
116115
domainEvent, err := bus.fromRawEvent(options.TypeRegistry, &raw)
117116
if nil != err {
118-
log.Debugf("EventBus.Cannot find event type %s", raw.Type)
117+
goengine.Log("EventBus.Cannot find event type", map[string]interface{}{"type": raw.Type}, nil)
119118
options.Error <- errors.New("Cannot find event type " + raw.Type)
120119
message.Ack(true)
121120
} else {
122121
ackCh := make(chan bool)
123122

124-
log.Debug("EventBus.Dispatching Message")
123+
goengine.Log("EventBus.Dispatching Message", nil, nil)
125124
options.ReceiveEvent <- goengine.VersionedEventTransactedAccept{Event: domainEvent, ProcessedSuccessfully: ackCh}
126125
result := <-ackCh
127126
message.Ack(result)
128127
}
129128
}
130129
} else {
131-
log.Debug("RabbitMQ: Could have been disconnected")
130+
goengine.Log("RabbitMQ: Could have been disconnected", nil, nil)
132131
for {
133132
retryError := exponential(func() error {
134133
connR, cR, eventsR, errR := bus.consumeEventsQueue(options.Exclusive)
135134
if errR == nil {
136135
conn, c, events, err = connR, cR, eventsR, errR
137136
}
138137

139-
log.WithError(err).Debug("Failed to reconnect to RabbitMQ")
140-
138+
goengine.Log("Failed to reconnect to RabbitMQ", nil, err)
141139
return errR
142140
}, 5)
143141

rabbit/retry.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"math"
55
"time"
66

7-
log "github.com/sirupsen/logrus"
7+
"github.com/hellofresh/goengine"
88
)
99

1010
type function func() error
@@ -26,7 +26,7 @@ func exponential(operation function, maxRetries int) error {
2626

2727
sleepTime := time.Duration(sleepMs) * time.Millisecond
2828
time.Sleep(sleepTime)
29-
log.WithFields(log.Fields{"attempt": i, "sleep": sleepTime.String()}).Debug("Retry exponential")
29+
goengine.Log("Retry exponential", map[string]interface{}{"attempt": i, "sleep": sleepTime.String()}, err)
3030
}
3131

3232
return err

type_registry.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55

66
"github.com/hellofresh/goengine/errors"
77
"github.com/hellofresh/goengine/reflection"
8-
log "github.com/sirupsen/logrus"
98
)
109

1110
// TypeRegistry is a registry for go types
@@ -35,16 +34,18 @@ func NewInMemoryTypeRegistry() *InMemoryTypeRegistry {
3534
func (r *InMemoryTypeRegistry) RegisterType(i interface{}) {
3635
rawType := reflection.TypeOf(i)
3736
r.types[rawType.String()] = rawType
38-
log.WithField("type", rawType.String()).Debug("Type was registered")
37+
Log("Type was registered", map[string]interface{}{"type": rawType.String()}, nil)
3938
}
4039

4140
func (r *InMemoryTypeRegistry) RegisterAggregate(aggregate AggregateRoot, events ...interface{}) {
4241
r.RegisterType(aggregate)
43-
entry := log.WithField("aggregate", aggregate.GetID())
44-
entry.Debug("Aggregate was registered")
42+
43+
fields := map[string]interface{}{"aggregate": aggregate.GetID()}
44+
Log("Aggregate was registered", fields, nil)
4545

4646
r.RegisterEvents(events)
47-
entry.WithField("count", len(events)).Debug("Events were registered for aggregate")
47+
fields["count"] = len(events)
48+
Log("Events were registered for aggregate", fields, nil)
4849
}
4950

5051
func (r *InMemoryTypeRegistry) RegisterEvents(events ...interface{}) {
@@ -67,6 +68,6 @@ func (r *InMemoryTypeRegistry) Get(name string) (interface{}, error) {
6768
return reflect.New(typ).Interface(), nil
6869
}
6970

70-
log.WithField("type", name).Debug("Type not found")
71+
Log("Type not found", map[string]interface{}{"type": name}, nil)
7172
return nil, errors.ErrorTypeNotFound
7273
}

0 commit comments

Comments
 (0)