Skip to content

Commit b9b0848

Browse files
committed
Adde comment stubs, made nicer naming
1 parent 7eb4de2 commit b9b0848

File tree

16 files changed

+88
-41
lines changed

16 files changed

+88
-41
lines changed

aggregate_repository.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,26 @@
11
package goengine
22

3-
import (
4-
"fmt"
5-
)
3+
import "fmt"
64

5+
// AggregateRepository ...
76
type AggregateRepository interface {
87
Load(string, StreamName) (*EventStream, error)
98
Save(AggregateRoot, StreamName) error
109
Reconstitute(string, AggregateRoot, StreamName) error
1110
}
1211

12+
// PublisherRepository ...
1313
type PublisherRepository struct {
1414
EventStore EventStore
1515
EventBus VersionedEventPublisher
1616
}
1717

18+
// NewPublisherRepository ...
1819
func NewPublisherRepository(eventStore EventStore, eventBus VersionedEventPublisher) *PublisherRepository {
1920
return &PublisherRepository{eventStore, eventBus}
2021
}
2122

23+
// Load ...
2224
func (r *PublisherRepository) Load(id string, streamName StreamName) (*EventStream, error) {
2325
Log("Loading events from stream for aggregate", map[string]interface{}{"stream": streamName, "id": id}, nil)
2426
stream, err := r.EventStore.GetEventsFor(streamName, id)
@@ -29,6 +31,7 @@ func (r *PublisherRepository) Load(id string, streamName StreamName) (*EventStre
2931
return stream, nil
3032
}
3133

34+
// Save ...
3235
func (r *PublisherRepository) Save(aggregateRoot AggregateRoot, streamName StreamName) error {
3336
events := aggregateRoot.GetUncommittedEvents()
3437
eventStream := NewEventStream(streamName, events)
@@ -51,6 +54,7 @@ func (r *PublisherRepository) Save(aggregateRoot AggregateRoot, streamName Strea
5154
return nil
5255
}
5356

57+
// Reconstitute ...
5458
func (r *PublisherRepository) Reconstitute(id string, source AggregateRoot, streamName StreamName) error {
5559
Log("Reconstituting aggregate from stream", map[string]interface{}{"stream": streamName, "id": id}, nil)
5660

@@ -61,7 +65,7 @@ func (r *PublisherRepository) Reconstitute(id string, source AggregateRoot, stre
6165
events := stream.Events
6266

6367
if len(events) == 0 {
64-
return fmt.Errorf("No events found for id: %s", id)
68+
return fmt.Errorf("no events found for id: %s", id)
6569
}
6670

6771
for _, event := range events {

aggregate_root.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/hellofresh/goengine/reflection"
88
)
99

10+
// AggregateRoot ...
1011
type AggregateRoot interface {
1112
GetID() string
1213
GetVersion() int
@@ -15,35 +16,40 @@ type AggregateRoot interface {
1516
GetUncommittedEvents() []*DomainMessage
1617
}
1718

19+
// AggregateRootBased ...
1820
type AggregateRootBased struct {
1921
ID string
2022
version int
2123
source interface{}
2224
uncommittedEvents []*DomainMessage
2325
}
2426

25-
// NewAggregateRootBased constructor
27+
// NewAggregateRootBased ...
2628
func NewAggregateRootBased(source interface{}) *AggregateRootBased {
2729
return NewEventSourceBasedWithID(source, uuid.Must(uuid.NewV4()).String())
2830
}
2931

30-
// NewEventSourceBasedWithID constructor
32+
// NewEventSourceBasedWithID ...
3133
func NewEventSourceBasedWithID(source interface{}, id string) *AggregateRootBased {
3234
return &AggregateRootBased{id, 0, source, []*DomainMessage{}}
3335
}
3436

37+
// GetID ...
3538
func (r *AggregateRootBased) GetID() string {
3639
return r.ID
3740
}
3841

42+
// GetVersion ...
3943
func (r *AggregateRootBased) GetVersion() int {
4044
return r.version
4145
}
4246

47+
// SetVersion ...
4348
func (r *AggregateRootBased) SetVersion(version int) {
4449
r.version = version
4550
}
4651

52+
// GetUncommittedEvents ...
4753
func (r *AggregateRootBased) GetUncommittedEvents() []*DomainMessage {
4854
stream := r.uncommittedEvents
4955
r.uncommittedEvents = nil
@@ -52,6 +58,7 @@ func (r *AggregateRootBased) GetUncommittedEvents() []*DomainMessage {
5258
return stream
5359
}
5460

61+
// Apply ...
5562
func (r *AggregateRootBased) Apply(event DomainEvent) {
5663
t := reflection.TypeOf(event)
5764
methodName := fmt.Sprintf("When%s", t.Name())
@@ -62,12 +69,14 @@ func (r *AggregateRootBased) Apply(event DomainEvent) {
6269
Log("Event applied", fields, nil)
6370
}
6471

72+
// RecordThat ...
6573
func (r *AggregateRootBased) RecordThat(event DomainEvent) {
6674
r.version++
6775
r.Apply(event)
6876
r.Record(event)
6977
}
7078

79+
// Record ...
7180
func (r *AggregateRootBased) Record(event DomainEvent) {
7281
message := RecordNow(r.ID, r.version, event)
7382
r.uncommittedEvents = append(r.uncommittedEvents, message)

cmd/goengine/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func main() {
7373
go eventDispatcher.Listen(stopChannel, false)
7474

7575
goengine.Log("Creating a recipe", nil, nil)
76-
aggregateRoot := CreateScenario()
76+
aggregateRoot := createScenario()
7777

7878
repository := goengine.NewPublisherRepository(eventStore, bus)
7979
if err := repository.Save(aggregateRoot, streamName); err != nil {
@@ -91,7 +91,7 @@ func main() {
9191
stopChannel <- true
9292
}
9393

94-
func CreateScenario() *Recipe {
94+
func createScenario() *Recipe {
9595
recipe := NewRecipe("Test Recipe")
9696
recipe.Rate(4)
9797
return recipe

cmd/goengine/model.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,30 +6,36 @@ import (
66
"github.com/hellofresh/goengine"
77
)
88

9+
// RecipeCreated ...
910
type RecipeCreated struct {
1011
occurredOn time.Time
1112
Name string
1213
}
1314

15+
// OccurredOn ...
1416
func (e RecipeCreated) OccurredOn() time.Time {
1517
return e.occurredOn
1618
}
1719

20+
// RecipeRated ...
1821
type RecipeRated struct {
1922
occurredOn time.Time
2023
Rating int
2124
}
2225

26+
// OccurredOn ...
2327
func (e RecipeRated) OccurredOn() time.Time {
2428
return e.occurredOn
2529
}
2630

31+
// Recipe ...
2732
type Recipe struct {
2833
*goengine.AggregateRootBased
2934
Name string
3035
Rating int
3136
}
3237

38+
// NewRecipe ...
3339
func NewRecipe(name string) *Recipe {
3440
recipe := new(Recipe)
3541
recipe.AggregateRootBased = goengine.NewAggregateRootBased(recipe)
@@ -38,6 +44,7 @@ func NewRecipe(name string) *Recipe {
3844
return recipe
3945
}
4046

47+
// NewRecipeFromHistory ...
4148
func NewRecipeFromHistory(id string, streamName goengine.StreamName, repo goengine.AggregateRepository) (*Recipe, error) {
4249
recipe := new(Recipe)
4350
recipe.AggregateRootBased = goengine.NewEventSourceBasedWithID(recipe, id)
@@ -46,14 +53,17 @@ func NewRecipeFromHistory(id string, streamName goengine.StreamName, repo goengi
4653
return recipe, err
4754
}
4855

56+
// Rate ...
4957
func (r *Recipe) Rate(rate int) {
5058
r.RecordThat(&RecipeRated{time.Now(), rate})
5159
}
5260

61+
// WhenRecipeCreated ...
5362
func (r *Recipe) WhenRecipeCreated(event *RecipeCreated) {
5463
r.Name = event.Name
5564
}
5665

66+
// WhenRecipeRated ...
5767
func (r *Recipe) WhenRecipeRated(event *RecipeRated) {
5868
r.Rating = event.Rating
5969
}

domain_message.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,30 @@ import (
55
"time"
66
)
77

8+
// DomainEvent ...
89
type DomainEvent interface {
910
OccurredOn() time.Time
1011
}
1112

13+
// DomainMessage ...
1214
type DomainMessage struct {
1315
ID string `json:"aggregate_id,omitempty"`
1416
Version int `json:"version"`
1517
Payload DomainEvent `json:"payload"`
1618
RecordedOn time.Time `json:"recorded_on"`
1719
}
1820

21+
// String ...
1922
func (dm *DomainMessage) String() string {
2023
return fmt.Sprintf("DomainMessage{ ID: %s, Version: %d }", dm.ID, dm.Version)
2124
}
2225

26+
// NewDomainMessage ...
2327
func NewDomainMessage(id string, version int, payload DomainEvent, recordedOn time.Time) *DomainMessage {
2428
return &DomainMessage{id, version, payload, recordedOn}
2529
}
2630

31+
// RecordNow ...
2732
func RecordNow(id string, version int, payload DomainEvent) *DomainMessage {
2833
recordedTime := time.Now()
2934
return NewDomainMessage(id, version, payload, recordedTime)

errors/type.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ package errors
33
import "errors"
44

55
var (
6-
ErrorTypeNotRegistred = errors.New("The type is not registereds")
7-
ErrorTypeNotStruct = errors.New("Input param is not a struct")
8-
ErrorTypeNotFound = errors.New("The type was not found")
6+
// ErrorTypeNotRegistered ...
7+
ErrorTypeNotRegistered = errors.New("the type is not registered")
8+
// ErrorTypeNotStruct ...
9+
ErrorTypeNotStruct = errors.New("input param is not a struct")
10+
// ErrorTypeNotFound ...
11+
ErrorTypeNotFound = errors.New("the type was not found")
912
)

event_store.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package goengine
22

3+
// EventStore ...
34
type EventStore interface {
45
Append(events *EventStream) error
56
GetEventsFor(streamName StreamName, id string) (*EventStream, error)

event_stream.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package goengine
22

3+
// StreamName ...
34
type StreamName string
45

6+
// EventStream ...
57
type EventStream struct {
68
Name StreamName
79
Events []*DomainMessage
810
}
911

12+
// NewEventStream ...
1013
func NewEventStream(name StreamName, events []*DomainMessage) *EventStream {
1114
return &EventStream{name, events}
1215
}

inmemory/eventbus.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,21 @@
11
package inmemory
22

3-
import (
4-
"github.com/hellofresh/goengine"
5-
)
3+
import "github.com/hellofresh/goengine"
64

7-
// InMemoryEventBus provides an inmemory implementation of the VersionedEventPublisher VersionedEventReceiver interfaces
8-
type InMemoryEventBus struct {
5+
// EventBus provides an in memory implementation of the VersionedEventPublisher VersionedEventReceiver interfaces
6+
type EventBus struct {
97
publishedEventsChannel chan *goengine.DomainMessage
108
startReceiving bool
119
}
1210

13-
// NewInMemoryEventBus constructor
14-
func NewInMemoryEventBus() *InMemoryEventBus {
11+
// NewEventBus ...
12+
func NewEventBus() *EventBus {
1513
publishedEventsChannel := make(chan *goengine.DomainMessage, 0)
16-
return &InMemoryEventBus{publishedEventsChannel, false}
14+
return &EventBus{publishedEventsChannel, false}
1715
}
1816

1917
// PublishEvents publishes events to the event bus
20-
func (bus *InMemoryEventBus) PublishEvents(events []*goengine.DomainMessage) error {
18+
func (bus *EventBus) PublishEvents(events []*goengine.DomainMessage) error {
2119
if !bus.startReceiving {
2220
return nil
2321
}
@@ -30,7 +28,7 @@ func (bus *InMemoryEventBus) PublishEvents(events []*goengine.DomainMessage) err
3028
}
3129

3230
// ReceiveEvents starts a go routine that monitors incoming events and routes them to a receiver channel specified within the options
33-
func (bus *InMemoryEventBus) ReceiveEvents(options goengine.VersionedEventReceiverOptions) error {
31+
func (bus *EventBus) ReceiveEvents(options goengine.VersionedEventReceiverOptions) error {
3432
bus.startReceiving = true
3533

3634
go func() {

inmemory/eventstore.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@ package inmemory
22

33
import "github.com/hellofresh/goengine"
44

5-
type InMemoryEventStore struct {
5+
// EventStore ...
6+
type EventStore struct {
67
events map[goengine.StreamName]map[string][]*goengine.DomainMessage
78
}
89

9-
func NewEventStore() *InMemoryEventStore {
10-
return &InMemoryEventStore{make(map[goengine.StreamName]map[string][]*goengine.DomainMessage)}
10+
// NewEventStore ...
11+
func NewEventStore() *EventStore {
12+
return &EventStore{make(map[goengine.StreamName]map[string][]*goengine.DomainMessage)}
1113
}
1214

13-
func (s *InMemoryEventStore) Append(events *goengine.EventStream) error {
15+
// Append ...
16+
func (s *EventStore) Append(events *goengine.EventStream) error {
1417
name := events.Name
1518
for _, event := range events.Events {
1619
err := s.save(name, event)
@@ -22,11 +25,13 @@ func (s *InMemoryEventStore) Append(events *goengine.EventStream) error {
2225
return nil
2326
}
2427

25-
func (s *InMemoryEventStore) GetEventsFor(streamName goengine.StreamName, id string) (*goengine.EventStream, error) {
28+
// GetEventsFor ...
29+
func (s *EventStore) GetEventsFor(streamName goengine.StreamName, id string) (*goengine.EventStream, error) {
2630
return goengine.NewEventStream(streamName, s.events[streamName][id]), nil
2731
}
2832

29-
func (s *InMemoryEventStore) FromVersion(streamName goengine.StreamName, id string, version int) (*goengine.EventStream, error) {
33+
// FromVersion ...
34+
func (s *EventStore) FromVersion(streamName goengine.StreamName, id string, version int) (*goengine.EventStream, error) {
3035
events, _ := s.GetEventsFor(streamName, id)
3136
var filtered []*goengine.DomainMessage
3237

@@ -39,12 +44,13 @@ func (s *InMemoryEventStore) FromVersion(streamName goengine.StreamName, id stri
3944
return goengine.NewEventStream(streamName, filtered), nil
4045
}
4146

42-
func (s *InMemoryEventStore) CountEventsFor(streamName goengine.StreamName, id string) (int64, error) {
47+
// CountEventsFor ...
48+
func (s *EventStore) CountEventsFor(streamName goengine.StreamName, id string) (int64, error) {
4349
stream, _ := s.GetEventsFor(streamName, id)
4450
return int64(len(stream.Events)), nil
4551
}
4652

47-
func (s *InMemoryEventStore) save(streamName goengine.StreamName, event *goengine.DomainMessage) error {
53+
func (s *EventStore) save(streamName goengine.StreamName, event *goengine.DomainMessage) error {
4854
id := event.ID
4955
events, exists := s.events[streamName][id]
5056

0 commit comments

Comments
 (0)