Skip to content

Commit a35e092

Browse files
Remove useless []*Changed
1 parent 212d3cd commit a35e092

File tree

3 files changed

+28
-28
lines changed

3 files changed

+28
-28
lines changed

aggregate/aggregate.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ type (
3939
}
4040

4141
eventSourced interface {
42-
replay(aggregate EventApplier, historyEvents []*Changed)
42+
replay(aggregate EventApplier, historyEvents goengine.EventStream) error
4343
recordThat(aggregate EventApplier, event *Changed)
4444
}
4545
)

aggregate/base_root.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package aggregate
22

33
import (
44
"sync"
5+
6+
"github.com/hellofresh/goengine"
57
)
68

79
var (
@@ -49,13 +51,32 @@ func (b *BaseRoot) popRecordedEvents() []*Changed {
4951
return pendingEvents
5052
}
5153

52-
func (b *BaseRoot) replay(aggregate EventApplier, historyEvents []*Changed) {
54+
func (b *BaseRoot) replay(aggregate EventApplier, streamEvents goengine.EventStream) error {
5355
b.Lock()
5456
defer b.Unlock()
5557

56-
for _, pastEvent := range historyEvents {
57-
b.version = pastEvent.Version()
58+
for streamEvents.Next() {
59+
msg, _, err := streamEvents.Message()
60+
if err != nil {
61+
return err
62+
}
63+
64+
changedEvent, ok := msg.(*Changed)
65+
if !ok {
66+
return ErrUnexpectedMessageType
67+
}
68+
69+
b.version = changedEvent.Version()
70+
aggregate.Apply(changedEvent)
71+
}
5872

59-
aggregate.Apply(pastEvent)
73+
if err := streamEvents.Err(); err != nil {
74+
return err
6075
}
76+
77+
if b.version == 0 {
78+
return ErrEmptyEventStream
79+
}
80+
81+
return nil
6182
}

aggregate/repository.go

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -94,32 +94,11 @@ func (r *Repository) GetAggregateRoot(ctx context.Context, aggregateID ID) (Root
9494
}
9595
defer streamEvents.Close()
9696

97-
var changedStream []*Changed
98-
for streamEvents.Next() {
99-
msg, _, err := streamEvents.Message()
100-
if err != nil {
101-
return nil, err
102-
}
103-
104-
changedEvent, ok := msg.(*Changed)
105-
if !ok {
106-
return nil, ErrUnexpectedMessageType
107-
}
108-
109-
changedStream = append(changedStream, changedEvent)
110-
}
111-
112-
if err := streamEvents.Err(); err != nil {
97+
root := r.aggregateType.CreateInstance()
98+
if err = root.replay(root, streamEvents); err != nil {
11399
return nil, err
114100
}
115101

116-
if len(changedStream) == 0 {
117-
return nil, ErrEmptyEventStream
118-
}
119-
120-
root := r.aggregateType.CreateInstance()
121-
root.replay(root, changedStream)
122-
123102
return root, nil
124103
}
125104

0 commit comments

Comments
 (0)