Skip to content

Commit 6aa1ebe

Browse files
committed
Added configurable context
1 parent d60bc90 commit 6aa1ebe

File tree

6 files changed

+262
-12
lines changed

6 files changed

+262
-12
lines changed

goengine_suite_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
)
99

1010
func TestGoengine(t *testing.T) {
11+
SetLogHandler(func(msg string, fields map[string]interface{}, err error) {})
12+
1113
RegisterFailHandler(Fail)
1214
RunSpecs(t, "GO Engine Suite")
1315
}

mongodb/context_strategy.go

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package mongodb
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
// ContextStrategy is an interface that represents strategy for providing contexts for running MongoDB requests
9+
type ContextStrategy interface {
10+
// Append is the context used for EventStore.Append() calls
11+
Append() (context.Context, context.CancelFunc)
12+
// GetEventsFor is the context used for EventStore.GetEventsFor() calls
13+
GetEventsFor() (context.Context, context.CancelFunc)
14+
// FromVersion is the context used for EventStore.FromVersion() calls
15+
FromVersion() (context.Context, context.CancelFunc)
16+
// CountEventsFor is the context used for EventStore.CountEventsFor() calls
17+
CountEventsFor() (context.Context, context.CancelFunc)
18+
// CreateIndices is the context used for MongoDB EventStore implementation indices creation
19+
CreateIndices() (context.Context, context.CancelFunc)
20+
}
21+
22+
// BackgroundContextStrategy is the ContextStrategy implementation that always returns Background context and noop cancel
23+
type BackgroundContextStrategy struct {
24+
ctx context.Context
25+
}
26+
27+
// NewBackgroundContextStrategy instantiates new BackgroundContextStrategy
28+
func NewBackgroundContextStrategy() *BackgroundContextStrategy {
29+
return &BackgroundContextStrategy{ctx: context.Background()}
30+
}
31+
32+
// Append is the context used for EventStore.Append() calls
33+
func (s *BackgroundContextStrategy) Append() (context.Context, context.CancelFunc) {
34+
return s.ctx, func() {}
35+
}
36+
37+
// GetEventsFor is the context used for EventStore.GetEventsFor() calls
38+
func (s *BackgroundContextStrategy) GetEventsFor() (context.Context, context.CancelFunc) {
39+
return s.ctx, func() {}
40+
}
41+
42+
// FromVersion is the context used for EventStore.FromVersion() calls
43+
func (s *BackgroundContextStrategy) FromVersion() (context.Context, context.CancelFunc) {
44+
return s.ctx, func() {}
45+
}
46+
47+
// CountEventsFor is the context used for EventStore.CountEventsFor() calls
48+
func (s *BackgroundContextStrategy) CountEventsFor() (context.Context, context.CancelFunc) {
49+
return s.ctx, func() {}
50+
}
51+
52+
// CreateIndices is the context used for MongoDB EventStore implementation indices creation
53+
func (s *BackgroundContextStrategy) CreateIndices() (context.Context, context.CancelFunc) {
54+
return s.ctx, func() {}
55+
}
56+
57+
// TimeoutContextStrategy is the ContextStrategy implementation that returns configurable WithTimeout context and its cancel
58+
type TimeoutContextStrategy struct {
59+
append time.Duration
60+
getEventsFor time.Duration
61+
fromVersion time.Duration
62+
countEventsFor time.Duration
63+
createIndices time.Duration
64+
}
65+
66+
// TimeoutContextStrategyOption is the options type to configure TimeoutContextStrategy creation
67+
type TimeoutContextStrategyOption func(s *TimeoutContextStrategy)
68+
69+
// NewTimeoutContextStrategy instantiates new TimeoutContextStrategy
70+
func NewTimeoutContextStrategy(options ...TimeoutContextStrategyOption) *TimeoutContextStrategy {
71+
s := &TimeoutContextStrategy{
72+
append: 5 * time.Second,
73+
getEventsFor: 30 * time.Second,
74+
fromVersion: 30 * time.Second,
75+
countEventsFor: 5 * time.Second,
76+
createIndices: 5 * time.Second,
77+
}
78+
79+
for _, o := range options {
80+
o(s)
81+
}
82+
83+
return s
84+
}
85+
86+
// Append is the context used for EventStore.Append() calls
87+
func (s *TimeoutContextStrategy) Append() (context.Context, context.CancelFunc) {
88+
return context.WithTimeout(context.Background(), s.append)
89+
}
90+
91+
// GetEventsFor is the context used for EventStore.GetEventsFor() calls
92+
func (s *TimeoutContextStrategy) GetEventsFor() (context.Context, context.CancelFunc) {
93+
return context.WithTimeout(context.Background(), s.getEventsFor)
94+
}
95+
96+
// FromVersion is the context used for EventStore.FromVersion() calls
97+
func (s *TimeoutContextStrategy) FromVersion() (context.Context, context.CancelFunc) {
98+
return context.WithTimeout(context.Background(), s.fromVersion)
99+
}
100+
101+
// CountEventsFor is the context used for EventStore.CountEventsFor() calls
102+
func (s *TimeoutContextStrategy) CountEventsFor() (context.Context, context.CancelFunc) {
103+
return context.WithTimeout(context.Background(), s.countEventsFor)
104+
}
105+
106+
// CreateIndices is the context used for MongoDB EventStore implementation indices creation
107+
func (s *TimeoutContextStrategy) CreateIndices() (context.Context, context.CancelFunc) {
108+
return context.WithTimeout(context.Background(), s.createIndices)
109+
}
110+
111+
// NewAppendTimeout is the TimeoutContextStrategy configuration option to set a timeout for Append call
112+
func NewAppendTimeout(timeout time.Duration) TimeoutContextStrategyOption {
113+
return func(s *TimeoutContextStrategy) {
114+
s.append = timeout
115+
}
116+
}
117+
118+
// NewGetEventsForTimeout is the TimeoutContextStrategy configuration option to set a timeout for GetEventsFor call
119+
func NewGetEventsForTimeout(timeout time.Duration) TimeoutContextStrategyOption {
120+
return func(s *TimeoutContextStrategy) {
121+
s.getEventsFor = timeout
122+
}
123+
}
124+
125+
// NewFromVersionTimeout is the TimeoutContextStrategy configuration option to set a timeout for FromVersion call
126+
func NewFromVersionTimeout(timeout time.Duration) TimeoutContextStrategyOption {
127+
return func(s *TimeoutContextStrategy) {
128+
s.fromVersion = timeout
129+
}
130+
}
131+
132+
// NewCountEventsForTimeout is the TimeoutContextStrategy configuration option to set a timeout for CountEventsFor call
133+
func NewCountEventsForTimeout(timeout time.Duration) TimeoutContextStrategyOption {
134+
return func(s *TimeoutContextStrategy) {
135+
s.countEventsFor = timeout
136+
}
137+
}
138+
139+
// NewCreateIndicesTimeout is the TimeoutContextStrategy configuration option to set a timeout for CreateIndices call
140+
func NewCreateIndicesTimeout(timeout time.Duration) TimeoutContextStrategyOption {
141+
return func(s *TimeoutContextStrategy) {
142+
s.createIndices = timeout
143+
}
144+
}

mongodb/eventstore.go

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package mongodb
22

33
import (
4-
"context"
54
"encoding/json"
65
"time"
76

@@ -24,14 +23,25 @@ type MongoEvent struct {
2423

2524
// EventStore The mongodb event store
2625
type EventStore struct {
27-
mongoDB *mongo.Database
28-
26+
mongoDB *mongo.Database
2927
registry goengine.TypeRegistry
28+
29+
cs ContextStrategy
3030
}
3131

3232
// NewEventStore creates new MongoDB based event store
33-
func NewEventStore(mongoDB *mongo.Database, r goengine.TypeRegistry) *EventStore {
34-
return &EventStore{mongoDB, r}
33+
func NewEventStore(mongoDB *mongo.Database, registry goengine.TypeRegistry, options ...Option) *EventStore {
34+
es := &EventStore{
35+
mongoDB: mongoDB,
36+
registry: registry,
37+
cs: NewBackgroundContextStrategy(),
38+
}
39+
40+
for _, o := range options {
41+
o(es)
42+
}
43+
44+
return es
3545
}
3646

3747
// Append adds an event to the event store
@@ -44,12 +54,12 @@ func (s *EventStore) Append(events *goengine.EventStream) error {
4454
}
4555

4656
coll := s.mongoDB.Collection(streamName)
47-
err = s.createIndexes(coll)
57+
err = s.createIndices(coll)
4858
if nil != err {
4959
return err
5060
}
5161

52-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
62+
ctx, cancel := s.cs.Append()
5363
_, err = coll.InsertOne(ctx, mongoEvent)
5464
cancel()
5565

@@ -66,7 +76,7 @@ func (s *EventStore) GetEventsFor(streamName goengine.StreamName, id string) (*g
6676
var mongoEvents []MongoEvent
6777
coll := s.mongoDB.Collection(string(streamName))
6878

69-
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
79+
ctx, cancel := s.cs.GetEventsFor()
7080
defer cancel()
7181

7282
cur, err := coll.Find(ctx, bson.M{"aggregate_id": id})
@@ -106,7 +116,7 @@ func (s *EventStore) FromVersion(streamName goengine.StreamName, id string, vers
106116
var mongoEvents []MongoEvent
107117
coll := s.mongoDB.Collection(string(streamName))
108118

109-
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
119+
ctx, cancel := s.cs.FromVersion()
110120
defer cancel()
111121

112122
cur, err := coll.Find(
@@ -150,14 +160,14 @@ func (s *EventStore) FromVersion(streamName goengine.StreamName, id string, vers
150160

151161
// CountEventsFor counts events for an id on the specified stream
152162
func (s *EventStore) CountEventsFor(streamName goengine.StreamName, id string) (int64, error) {
153-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
163+
ctx, cancel := s.cs.CountEventsFor()
154164
defer cancel()
155165

156166
return s.mongoDB.Collection(string(streamName)).Count(ctx, bson.M{"aggregate_id": string(streamName)})
157167
}
158168

159-
func (s *EventStore) createIndexes(c *mongo.Collection) error {
160-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
169+
func (s *EventStore) createIndices(c *mongo.Collection) error {
170+
ctx, cancel := s.cs.CreateIndices()
161171
defer cancel()
162172

163173
_, err := c.Indexes().CreateOne(

mongodb/mongodb_suite_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package mongodb
2+
3+
import (
4+
"testing"
5+
6+
. "github.com/onsi/ginkgo"
7+
. "github.com/onsi/gomega"
8+
)
9+
10+
func TestGoengine(t *testing.T) {
11+
RegisterFailHandler(Fail)
12+
RunSpecs(t, "MongoDB EventStore Suite")
13+
}

mongodb/options.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package mongodb
2+
3+
// Option is the options type to configure MongoDB EventStore implementation creation
4+
type Option func(eventStore *EventStore)
5+
6+
// ContextBackground sets Context Strategy for EventStore to BackgroundContextStrategy
7+
func ContextBackground() Option {
8+
return func(eventStore *EventStore) {
9+
eventStore.cs = NewBackgroundContextStrategy()
10+
}
11+
}
12+
13+
// ContextTimeout sets Context Strategy for EventStore to TimeoutContextStrategy
14+
func ContextTimeout(options ...TimeoutContextStrategyOption) Option {
15+
return func(eventStore *EventStore) {
16+
eventStore.cs = NewTimeoutContextStrategy(options...)
17+
}
18+
}

mongodb/options_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package mongodb
2+
3+
import (
4+
"math/rand"
5+
"time"
6+
7+
. "github.com/onsi/ginkgo"
8+
. "github.com/onsi/gomega"
9+
)
10+
11+
var _ = Describe("MongoDB Event Store Context Background", func() {
12+
Describe("when I use ContextBackground option", func() {
13+
es := NewEventStore(nil, nil, ContextBackground())
14+
15+
It("should have BackgroundContextStrategy context strategy", func() {
16+
Expect(es.cs).Should(BeAssignableToTypeOf(NewBackgroundContextStrategy()))
17+
})
18+
})
19+
})
20+
21+
var _ = Describe("MongoDB Event Store Context Timeout", func() {
22+
Describe("when I use ContextTimeout option", func() {
23+
es := NewEventStore(nil, nil, ContextTimeout())
24+
25+
It("should have TimeoutContextStrategy context strategy with some predefined values", func() {
26+
Expect(es.cs).Should(BeAssignableToTypeOf(NewTimeoutContextStrategy()))
27+
28+
csTimeout, _ := es.cs.(*TimeoutContextStrategy)
29+
Expect(csTimeout.append).Should(BeNumerically(">", 0))
30+
Expect(csTimeout.getEventsFor).Should(BeNumerically(">", 0))
31+
Expect(csTimeout.fromVersion).Should(BeNumerically(">", 0))
32+
Expect(csTimeout.countEventsFor).Should(BeNumerically(">", 0))
33+
Expect(csTimeout.createIndices).Should(BeNumerically(">", 0))
34+
})
35+
})
36+
37+
Describe("when I use ContextTimeout option with creation options", func() {
38+
appendTimeout := rand.Uint64()
39+
getEventsFor := rand.Uint64()
40+
fromVersion := rand.Uint64()
41+
countEventsFor := rand.Uint64()
42+
createIndices := rand.Uint64()
43+
44+
es := NewEventStore(nil, nil, ContextTimeout(
45+
NewAppendTimeout(time.Duration(appendTimeout)),
46+
NewGetEventsForTimeout(time.Duration(getEventsFor)),
47+
NewFromVersionTimeout(time.Duration(fromVersion)),
48+
NewCountEventsForTimeout(time.Duration(countEventsFor)),
49+
NewCreateIndicesTimeout(time.Duration(createIndices)),
50+
))
51+
52+
It("should have all the timeouts set respectively", func() {
53+
Expect(es.cs).Should(BeAssignableToTypeOf(NewTimeoutContextStrategy()))
54+
55+
csTimeout, _ := es.cs.(*TimeoutContextStrategy)
56+
Expect(csTimeout.append).Should(BeEquivalentTo(appendTimeout))
57+
Expect(csTimeout.getEventsFor).Should(BeEquivalentTo(getEventsFor))
58+
Expect(csTimeout.fromVersion).Should(BeEquivalentTo(fromVersion))
59+
Expect(csTimeout.countEventsFor).Should(BeEquivalentTo(countEventsFor))
60+
Expect(csTimeout.createIndices).Should(BeEquivalentTo(createIndices))
61+
})
62+
})
63+
})

0 commit comments

Comments
 (0)