Skip to content

Commit 77734a9

Browse files
committed
Fixed mongo usage
1 parent aceeab0 commit 77734a9

File tree

3 files changed

+27
-30
lines changed

3 files changed

+27
-30
lines changed

cmd/goengine/main.go

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,6 @@ import (
1111
"github.com/hellofresh/goengine/rabbit"
1212
"github.com/mongodb/mongo-go-driver/mongo"
1313
"github.com/mongodb/mongo-go-driver/mongo/options"
14-
"github.com/mongodb/mongo-go-driver/mongo/readconcern"
15-
"github.com/mongodb/mongo-go-driver/mongo/readpref"
16-
"github.com/mongodb/mongo-go-driver/mongo/writeconcern"
1714
)
1815

1916
func main() {
@@ -31,11 +28,7 @@ func main() {
3128
goengine.Log("Connecting to the database", map[string]interface{}{"dsn": mongoDSN}, nil)
3229
mongoClient, err := mongo.NewClientWithOptions(
3330
mongoDSN,
34-
options.Client().
35-
SetAppName("goengine").
36-
SetReadConcern(readconcern.Linearizable()).
37-
SetReadPreference(readpref.Nearest()).
38-
SetWriteConcern(writeconcern.New(writeconcern.WMajority())),
31+
options.Client().SetAppName("goengine"),
3932
)
4033
if err != nil {
4134
goengine.Log("Failed to create new Mongo mongoClient", nil, err)
@@ -68,11 +61,11 @@ func main() {
6861
bus := rabbit.NewEventBus(brokerDSN, "events", "events")
6962

7063
goengine.Log("Setting up the event store", nil, nil)
71-
es := mongodb.NewEventStore(mongoClient.Database("event_store"), registry)
64+
eventStore := mongodb.NewEventStore(mongoClient.Database("event_store"), registry)
7265

7366
eventDispatcher := goengine.NewVersionedEventDispatchManager(bus, registry)
7467
eventDispatcher.RegisterEventHandler(&RecipeCreated{}, func(event *goengine.DomainMessage) error {
75-
goengine.Log("Event received", nil, nil)
68+
goengine.Log("Event received", map[string]interface{}{"event": event}, nil)
7669
return nil
7770
})
7871

@@ -82,12 +75,15 @@ func main() {
8275
goengine.Log("Creating a recipe", nil, nil)
8376
aggregateRoot := CreateScenario()
8477

85-
repository := goengine.NewPublisherRepository(es, bus)
86-
repository.Save(aggregateRoot, streamName)
78+
repository := goengine.NewPublisherRepository(eventStore, bus)
79+
if err := repository.Save(aggregateRoot, streamName); err != nil {
80+
goengine.Log("Failed to save aggregate to stream", nil, err)
81+
panic(err)
82+
}
8783

88-
_, err = NewRecipeFromHisotry(aggregateRoot.ID, streamName, repository)
84+
_, err = NewRecipeFromHistory(aggregateRoot.ID, streamName, repository)
8985
if err != nil {
90-
goengine.Log("Failed to connect to Mongo", nil, err)
86+
goengine.Log("Failed get a recipe from history", nil, err)
9187
panic(err)
9288
}
9389

cmd/goengine/model.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,21 @@ import (
77
)
88

99
type RecipeCreated struct {
10-
ocurredOn time.Time
11-
Name string
10+
occurredOn time.Time
11+
Name string
1212
}
1313

14-
func (e RecipeCreated) OcurredOn() time.Time {
15-
return e.ocurredOn
14+
func (e RecipeCreated) OccurredOn() time.Time {
15+
return e.occurredOn
1616
}
1717

1818
type RecipeRated struct {
19-
ocurredOn time.Time
20-
Rating int
19+
occurredOn time.Time
20+
Rating int
2121
}
2222

23-
func (e RecipeRated) OcurredOn() time.Time {
24-
return e.ocurredOn
23+
func (e RecipeRated) OccurredOn() time.Time {
24+
return e.occurredOn
2525
}
2626

2727
type Recipe struct {
@@ -38,7 +38,7 @@ func NewRecipe(name string) *Recipe {
3838
return recipe
3939
}
4040

41-
func NewRecipeFromHisotry(id string, streamName goengine.StreamName, repo goengine.AggregateRepository) (*Recipe, error) {
41+
func NewRecipeFromHistory(id string, streamName goengine.StreamName, repo goengine.AggregateRepository) (*Recipe, error) {
4242
recipe := new(Recipe)
4343
recipe.AggregateRootBased = goengine.NewEventSourceBasedWithID(recipe, id)
4444
err := repo.Reconstitute(id, recipe, streamName)

mongodb/eventstore.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/mongodb/mongo-go-driver/bson"
1111
"github.com/mongodb/mongo-go-driver/mongo"
1212
"github.com/mongodb/mongo-go-driver/mongo/options"
13+
"github.com/mongodb/mongo-go-driver/x/bsonx"
1314
)
1415

1516
// MongoEvent represents an event on mongodb
@@ -62,7 +63,7 @@ func (s *MongoDBEventStore) Append(events *goengine.EventStream) error {
6263

6364
// GetEventsFor gets events for an id on the specified stream
6465
func (s *MongoDBEventStore) GetEventsFor(streamName goengine.StreamName, id string) (*goengine.EventStream, error) {
65-
var mongoEvents []*MongoEvent
66+
var mongoEvents []MongoEvent
6667
coll := s.mongoDB.Collection(string(streamName))
6768

6869
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
@@ -74,8 +75,8 @@ func (s *MongoDBEventStore) GetEventsFor(streamName goengine.StreamName, id stri
7475
}
7576

7677
for cur.Next(ctx) {
77-
var mongoEvent *MongoEvent
78-
err := cur.Decode(mongoEvent)
78+
var mongoEvent MongoEvent
79+
err := cur.Decode(&mongoEvent)
7980
if err != nil {
8081
return nil, err
8182
}
@@ -102,7 +103,7 @@ func (s *MongoDBEventStore) GetEventsFor(streamName goengine.StreamName, id stri
102103

103104
// FromVersion gets events for an id and version on the specified stream
104105
func (s *MongoDBEventStore) FromVersion(streamName goengine.StreamName, id string, version int) (*goengine.EventStream, error) {
105-
var mongoEvents []*MongoEvent
106+
var mongoEvents []MongoEvent
106107
coll := s.mongoDB.Collection(string(streamName))
107108

108109
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
@@ -121,7 +122,7 @@ func (s *MongoDBEventStore) FromVersion(streamName goengine.StreamName, id strin
121122
}
122123

123124
for cur.Next(ctx) {
124-
var mongoEvent *MongoEvent
125+
var mongoEvent MongoEvent
125126
err := cur.Decode(mongoEvent)
126127
if err != nil {
127128
return nil, err
@@ -162,7 +163,7 @@ func (s *MongoDBEventStore) createIndexes(c *mongo.Collection) error {
162163
_, err := c.Indexes().CreateOne(
163164
ctx,
164165
mongo.IndexModel{
165-
Keys: []string{"aggregate_id", "version"},
166+
Keys: bsonx.Doc{{"aggregate_id", bsonx.Int32(1)}, {"version", bsonx.Int32(-1)}},
166167
Options: options.Index().SetUnique(true).SetBackground(true),
167168
},
168169
)
@@ -185,7 +186,7 @@ func (s *MongoDBEventStore) toMongoEvent(event *goengine.DomainMessage) (*MongoE
185186
}, nil
186187
}
187188

188-
func (s *MongoDBEventStore) fromMongoEvent(mongoEvent *MongoEvent) (*goengine.DomainMessage, error) {
189+
func (s *MongoDBEventStore) fromMongoEvent(mongoEvent MongoEvent) (*goengine.DomainMessage, error) {
189190
event, err := s.registry.Get(mongoEvent.Type)
190191
if nil != err {
191192
return nil, err

0 commit comments

Comments
 (0)