Skip to content

Commit 90fe9e4

Browse files
author
Sergiu Ghitea
authored
Merge pull request #36 from hellofresh/patch/sql-based-projector
Projector Locking strategies
2 parents 5567b32 + 6c66819 commit 90fe9e4

31 files changed

+1436
-845
lines changed

aggregate/aggregate_type.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,8 @@ import (
77
"github.com/hellofresh/goengine"
88
)
99

10-
var (
11-
// ErrInitiatorMustReturnRoot occurs when a aggregate.Initiator did not return a pointer
12-
ErrInitiatorMustReturnRoot = errors.New("goengine: the aggregate.Initiator must return a pointer to the aggregate.Root")
13-
)
10+
// ErrInitiatorMustReturnRoot occurs when a aggregate.Initiator did not return a pointer
11+
var ErrInitiatorMustReturnRoot = errors.New("goengine: the aggregate.Initiator must return a pointer to the aggregate.Root")
1412

1513
type (
1614
// Initiator creates a new empty instance of a aggregate.Root

aggregate/repository.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,12 @@ var (
2626
ErrEmptyEventStream = errors.New("goengine: unsupported empty event stream")
2727
)
2828

29-
type (
30-
// Repository a repository to save and load aggregate.Root's of a specific type
31-
Repository struct {
32-
aggregateType *Type
33-
eventStore goengine.EventStore
34-
streamName goengine.StreamName
35-
}
36-
)
29+
// Repository a repository to save and load aggregate.Root's of a specific type
30+
type Repository struct {
31+
aggregateType *Type
32+
eventStore goengine.EventStore
33+
streamName goengine.StreamName
34+
}
3735

3836
// NewRepository instantiates a new AggregateRepository
3937
func NewRepository(

driver/inmemory/internal/generate_matcher/main.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@ import (
2121
"github.com/hellofresh/goengine/metadata"
2222
)
2323
24-
var (
25-
ErrUnsupportedType = errors.New("the value is not a scalar type")
26-
)
24+
var ErrUnsupportedType = errors.New("the value is not a scalar type")
2725
2826
func asScalar(value interface{}) (interface{}, error) {
2927
switch value.(type) {

driver/inmemory/matcher_gen.go

Lines changed: 1 addition & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

driver/sql/internal/sql.go

Lines changed: 0 additions & 52 deletions
This file was deleted.
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package postgres
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"errors"
7+
8+
"github.com/hellofresh/goengine"
9+
driverSQL "github.com/hellofresh/goengine/driver/sql"
10+
)
11+
12+
var _ driverSQL.ProjectorTransaction = &advisoryLockProjectorTransaction{}
13+
14+
type advisoryLockProjectorTransaction struct {
15+
conn *sql.Conn
16+
queryPersistState string
17+
queryReleaseLock string
18+
19+
stateSerialization driverSQL.ProjectionStateSerialization
20+
rawState *driverSQL.ProjectionRawState
21+
22+
projectionID string
23+
projectionState driverSQL.ProjectionState
24+
25+
logger goengine.Logger
26+
}
27+
28+
func (t *advisoryLockProjectorTransaction) AcquireState(ctx context.Context) (driverSQL.ProjectionState, error) {
29+
if t.rawState == nil {
30+
return t.projectionState, nil
31+
}
32+
33+
var err error
34+
state := driverSQL.ProjectionState{
35+
Position: t.rawState.Position,
36+
}
37+
38+
// Decode or initialize projection state
39+
if state.Position == 0 {
40+
// This is the fist time the projection runs so initialize the state
41+
state.ProjectionState, err = t.stateSerialization.Init(ctx)
42+
} else {
43+
// Unmarshal the projection state
44+
state.ProjectionState, err = t.stateSerialization.DecodeState(t.rawState.ProjectionState)
45+
}
46+
47+
if err != nil {
48+
return state, err
49+
}
50+
51+
t.projectionState = state
52+
t.rawState = nil
53+
54+
return t.projectionState, err
55+
}
56+
57+
func (t *advisoryLockProjectorTransaction) CommitState(newState driverSQL.ProjectionState) error {
58+
encodedState, err := t.stateSerialization.EncodeState(newState.ProjectionState)
59+
if err != nil {
60+
return err
61+
}
62+
63+
_, err = t.conn.ExecContext(context.Background(), t.queryPersistState, t.projectionID, newState.Position, encodedState)
64+
if err != nil {
65+
return err
66+
}
67+
68+
t.projectionState = newState
69+
70+
t.logger.Debug("updated projection state", func(e goengine.LoggerEntry) {
71+
e.String("projection_id", t.projectionID)
72+
e.Int64("projection_position", newState.Position)
73+
e.Any("state", newState)
74+
})
75+
76+
return nil
77+
}
78+
79+
func (t *advisoryLockProjectorTransaction) Close() error {
80+
res := t.conn.QueryRowContext(context.Background(), t.queryReleaseLock, t.projectionID)
81+
82+
var unlocked bool
83+
if err := res.Scan(&unlocked); err != nil {
84+
return err
85+
}
86+
87+
if !unlocked {
88+
return errors.New("failed to release db connection projection lock")
89+
}
90+
91+
t.logger.Debug("released projection lock", func(e goengine.LoggerEntry) {
92+
e.String("projection_id", t.projectionID)
93+
})
94+
95+
return nil
96+
}
97+
98+
type advisoryLockWithUpdateProjectorTransaction struct {
99+
advisoryLockProjectorTransaction
100+
101+
querySetRowLocked string
102+
}
103+
104+
func (t *advisoryLockWithUpdateProjectorTransaction) AcquireState(ctx context.Context) (driverSQL.ProjectionState, error) {
105+
if t.rawState == nil {
106+
return t.projectionState, nil
107+
}
108+
109+
// Set the projection as row locked
110+
_, err := t.conn.ExecContext(ctx, t.querySetRowLocked, t.projectionID, true)
111+
if err != nil {
112+
return driverSQL.ProjectionState{
113+
Position: t.rawState.Position,
114+
}, err
115+
}
116+
117+
return t.advisoryLockProjectorTransaction.AcquireState(ctx)
118+
}
119+
120+
func (t *advisoryLockWithUpdateProjectorTransaction) Close() error {
121+
// Set the projection as row unlocked
122+
_, err := t.conn.ExecContext(context.Background(), t.querySetRowLocked, t.projectionID, false)
123+
if err != nil {
124+
return err
125+
}
126+
127+
return t.advisoryLockProjectorTransaction.Close()
128+
}

0 commit comments

Comments
 (0)