Skip to content

Commit 2b94a3d

Browse files
Merge pull request #19 from hellofresh/patch/projection-init
Ensure Projection Init is called
2 parents 9cb38b1 + 9935b2b commit 2b94a3d

File tree

6 files changed

+84
-69
lines changed

6 files changed

+84
-69
lines changed

driver/sql/internal/notification_projector.go

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ type NotificationProjector struct {
1818

1919
storage driverSQL.ProjectionStorage
2020

21-
decodeProjectionState driverSQL.ProjectionStateDecoder
21+
projectionStateInit driverSQL.ProjectionStateInitializer
22+
projectionStateDecode driverSQL.ProjectionStateDecoder
2223
handlers map[string]goengine.MessageHandler
2324

2425
eventLoader driverSQL.EventStreamLoader
@@ -31,23 +32,26 @@ type NotificationProjector struct {
3132
func NewNotificationProjector(
3233
db *sql.DB,
3334
storage driverSQL.ProjectionStorage,
34-
acquireUnmarshalState driverSQL.ProjectionStateDecoder,
35+
projectionStateInit driverSQL.ProjectionStateInitializer,
36+
projectionStateDecode driverSQL.ProjectionStateDecoder,
3537
eventHandlers map[string]goengine.MessageHandler,
3638
eventLoader driverSQL.EventStreamLoader,
3739
resolver goengine.MessagePayloadResolver,
3840
logger goengine.Logger,
3941
) (*NotificationProjector, error) {
4042
switch {
4143
case db == nil:
42-
return nil, errors.New("db cannot be nil")
44+
return nil, goengine.InvalidArgumentError("db")
4345
case storage == nil:
44-
return nil, errors.New("storage cannot be nil")
46+
return nil, goengine.InvalidArgumentError("storage")
47+
case projectionStateInit == nil:
48+
return nil, goengine.InvalidArgumentError("projectionStateInit")
4549
case len(eventHandlers) == 0:
46-
return nil, errors.New("eventHandlers cannot be empty")
50+
return nil, goengine.InvalidArgumentError("eventHandlers")
4751
case eventLoader == nil:
48-
return nil, errors.New("eventLoader cannot be nil")
52+
return nil, goengine.InvalidArgumentError("eventLoader")
4953
case resolver == nil:
50-
return nil, errors.New("resolver cannot be nil")
54+
return nil, goengine.InvalidArgumentError("resolver")
5155
}
5256

5357
if logger == nil {
@@ -57,7 +61,8 @@ func NewNotificationProjector(
5761
return &NotificationProjector{
5862
db: db,
5963
storage: storage,
60-
decodeProjectionState: acquireUnmarshalState,
64+
projectionStateInit: projectionStateInit,
65+
projectionStateDecode: projectionStateDecode,
6166
handlers: wrapProjectionHandlers(eventHandlers),
6267
eventLoader: eventLoader,
6368
resolver: resolver,
@@ -114,21 +119,8 @@ func (s *NotificationProjector) project(
114119
}
115120
defer releaseLock()
116121

117-
// Unmarshal the projection state
118-
var projectionState interface{}
119-
if s.decodeProjectionState != nil {
120-
projectionState, err = s.decodeProjectionState(rawState.ProjectionState)
121-
if err != nil {
122-
return err
123-
}
124-
}
125-
state := driverSQL.ProjectionState{
126-
Position: rawState.Position,
127-
ProjectionState: projectionState,
128-
}
129-
130122
// Load the event stream
131-
eventStream, err := s.eventLoader(ctx, streamConn, notification, state)
123+
eventStream, err := s.eventLoader(ctx, streamConn, notification, rawState.Position)
132124
if err != nil {
133125
return err
134126
}
@@ -139,7 +131,7 @@ func (s *NotificationProjector) project(
139131
}()
140132

141133
// project event stream
142-
if err := s.projectStream(ctx, conn, notification, state, eventStream); err != nil {
134+
if err := s.projectStream(ctx, conn, notification, rawState, eventStream); err != nil {
143135
return err
144136
}
145137

@@ -149,11 +141,15 @@ func (s *NotificationProjector) project(
149141
// projectStream will project the events in the event stream and persist the state after the projection
150142
func (s *NotificationProjector) projectStream(
151143
ctx context.Context,
152-
conn *sql.Conn,
144+
conn driverSQL.Execer,
153145
notification *driverSQL.ProjectionNotification,
154-
state driverSQL.ProjectionState,
146+
rawState *driverSQL.ProjectionRawState,
155147
stream goengine.EventStream,
156148
) error {
149+
var (
150+
state driverSQL.ProjectionState
151+
stateAcquired bool
152+
)
157153
for stream.Next() {
158154
// Check if the context is expired
159155
select {
@@ -167,7 +163,6 @@ func (s *NotificationProjector) projectStream(
167163
if err != nil {
168164
return err
169165
}
170-
state.Position = msgNumber
171166

172167
// Resolve the payload event name
173168
eventName, err := s.resolver.ResolveName(msg.Payload())
@@ -184,7 +179,17 @@ func (s *NotificationProjector) projectStream(
184179
continue
185180
}
186181

182+
// Acquire the state if we have none
183+
if !stateAcquired {
184+
state, err = s.acquireProjectState(ctx, rawState)
185+
if err != nil {
186+
return err
187+
}
188+
stateAcquired = true
189+
}
190+
187191
// Execute the handler
192+
state.Position = msgNumber
188193
state.ProjectionState, err = handler(ctx, state.ProjectionState, msg)
189194
if err != nil {
190195
return err
@@ -199,6 +204,24 @@ func (s *NotificationProjector) projectStream(
199204
return stream.Err()
200205
}
201206

207+
func (s *NotificationProjector) acquireProjectState(ctx context.Context, rawState *driverSQL.ProjectionRawState) (driverSQL.ProjectionState, error) {
208+
state := driverSQL.ProjectionState{
209+
Position: rawState.Position,
210+
}
211+
212+
// Decode or initialize projection state
213+
var err error
214+
if rawState.Position == 0 {
215+
// This is the fist time the projection runs so initialize the state
216+
state.ProjectionState, err = s.projectionStateInit(ctx)
217+
} else if s.projectionStateDecode != nil {
218+
// Unmarshal the projection state
219+
state.ProjectionState, err = s.projectionStateDecode(rawState.ProjectionState)
220+
}
221+
222+
return state, err
223+
}
224+
202225
// wrapProjectionHandlers wraps the projection handlers so that any error or panic is caught and returned
203226
func wrapProjectionHandlers(handlers map[string]goengine.MessageHandler) map[string]goengine.MessageHandler {
204227
res := make(map[string]goengine.MessageHandler, len(handlers))

driver/sql/postgres/projector_aggregate.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ func NewAggregateProjector(
8484
executor, err := internal.NewNotificationProjector(
8585
db,
8686
storage,
87+
projection.Init,
8788
stateDecoder,
8889
projection.Handlers(),
8990
aggregateProjectionEventStreamLoader(eventStore, projection.FromStream(), aggregateTypeName),

driver/sql/postgres/projector_aggregate_storage.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ import (
1414
)
1515

1616
func aggregateProjectionEventStreamLoader(eventStore driverSQL.ReadOnlyEventStore, streamName goengine.StreamName, aggregateTypeName string) driverSQL.EventStreamLoader {
17-
return func(ctx context.Context, conn *sql.Conn, notification *driverSQL.ProjectionNotification, state driverSQL.ProjectionState) (goengine.EventStream, error) {
17+
return func(ctx context.Context, conn *sql.Conn, notification *driverSQL.ProjectionNotification, position int64) (goengine.EventStream, error) {
1818
matcher := metadata.NewMatcher()
1919
matcher = metadata.WithConstraint(matcher, aggregate.IDKey, metadata.Equals, notification.AggregateID)
2020
matcher = metadata.WithConstraint(matcher, aggregate.TypeKey, metadata.Equals, aggregateTypeName)
2121

22-
return eventStore.LoadWithConnection(ctx, conn, streamName, state.Position+1, nil, matcher)
22+
return eventStore.LoadWithConnection(ctx, conn, streamName, position+1, nil, matcher)
2323
}
2424
}
2525

@@ -92,7 +92,7 @@ func newAggregateProjectionStorage(
9292
queryAcquireLock: fmt.Sprintf(
9393
`WITH new_projection AS (
9494
INSERT INTO %[1]s (aggregate_id, state) SELECT $1, 'null' WHERE NOT EXISTS (
95-
SELECT * FROM %[1]s WHERE aggregate_id = $1
95+
SELECT * FROM %[1]s WHERE aggregate_id = $1 LIMIT 1
9696
) ON CONFLICT DO NOTHING
9797
RETURNING *
9898
)
@@ -118,7 +118,7 @@ func (a *aggregateProjectionStorage) LoadOutOfSync(ctx context.Context, conn dri
118118
return conn.QueryContext(ctx, a.queryOutOfSyncProjections)
119119
}
120120

121-
func (a *aggregateProjectionStorage) PersistState(conn *sql.Conn, notification *driverSQL.ProjectionNotification, state driverSQL.ProjectionState) error {
121+
func (a *aggregateProjectionStorage) PersistState(conn driverSQL.Execer, notification *driverSQL.ProjectionNotification, state driverSQL.ProjectionState) error {
122122
encodedState, err := a.projectionStateEncoder(state.ProjectionState)
123123
if err != nil {
124124
return err

driver/sql/postgres/projector_stream.go

Lines changed: 12 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package postgres
33
import (
44
"context"
55
"database/sql"
6-
"fmt"
76
"math"
87
"strings"
98
"sync"
@@ -17,15 +16,14 @@ import (
1716
// StreamProjector is a postgres projector used to execute a projection against an event stream.
1817
type StreamProjector struct {
1918
sync.Mutex
20-
executor *internalSQL.NotificationProjector
21-
22-
db *sql.DB
2319

24-
projectionName string
25-
projectionTable string
20+
db *sql.DB
21+
executor *internalSQL.NotificationProjector
22+
storage *streamProjectionStorage
2623

27-
logger goengine.Logger
2824
projectionErrorHandler driverSQL.ProjectionErrorCallback
25+
26+
logger goengine.Logger
2927
}
3028

3129
// NewStreamProjector creates a new projector for a projection
@@ -75,6 +73,7 @@ func NewStreamProjector(
7573
executor, err := internalSQL.NewNotificationProjector(
7674
db,
7775
storage,
76+
projection.Init,
7877
stateDecoder,
7978
projection.Handlers(),
8079
streamProjectionEventStreamLoader(eventStore, projection.FromStream()),
@@ -86,15 +85,11 @@ func NewStreamProjector(
8685
}
8786

8887
return &StreamProjector{
89-
executor: executor,
90-
91-
db: db,
92-
93-
projectionName: projection.Name(),
94-
projectionTable: projectionTable,
88+
db: db,
89+
executor: executor,
90+
storage: storage,
9591
projectionErrorHandler: projectionErrorHandler,
96-
97-
logger: logger,
92+
logger: logger,
9893
}, nil
9994
}
10095

@@ -110,7 +105,7 @@ func (s *StreamProjector) Run(ctx context.Context) error {
110105
return nil
111106
}
112107

113-
if err := s.setupProjection(ctx); err != nil {
108+
if err := s.storage.CreateProjection(ctx, s.db); err != nil {
114109
return err
115110
}
116111

@@ -129,7 +124,7 @@ func (s *StreamProjector) RunAndListen(ctx context.Context, listener driverSQL.L
129124
return nil
130125
}
131126

132-
if err := s.setupProjection(ctx); err != nil {
127+
if err := s.storage.CreateProjection(ctx, s.db); err != nil {
133128
return err
134129
}
135130

@@ -168,19 +163,3 @@ func (s *StreamProjector) processNotification(
168163
math.MaxInt16,
169164
)
170165
}
171-
172-
// setupProjection Creates the projection if none exists
173-
func (s *StreamProjector) setupProjection(ctx context.Context) error {
174-
// Ignore duplicate inserts
175-
_, err := s.db.ExecContext(
176-
ctx,
177-
/* #nosec */
178-
fmt.Sprintf(
179-
`INSERT INTO %s (name) VALUES ($1) ON CONFLICT DO NOTHING`,
180-
QuoteIdentifier(s.projectionTable),
181-
),
182-
s.projectionName,
183-
)
184-
185-
return err
186-
}

driver/sql/postgres/projector_stream_storage.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,12 @@ import (
88
"strings"
99

1010
"github.com/hellofresh/goengine"
11-
1211
driverSQL "github.com/hellofresh/goengine/driver/sql"
1312
)
1413

1514
func streamProjectionEventStreamLoader(eventStore driverSQL.ReadOnlyEventStore, streamName goengine.StreamName) driverSQL.EventStreamLoader {
16-
return func(ctx context.Context, conn *sql.Conn, notification *driverSQL.ProjectionNotification, state driverSQL.ProjectionState) (goengine.EventStream, error) {
17-
return eventStore.LoadWithConnection(ctx, conn, streamName, state.Position+1, nil, nil)
15+
return func(ctx context.Context, conn *sql.Conn, notification *driverSQL.ProjectionNotification, position int64) (goengine.EventStream, error) {
16+
return eventStore.LoadWithConnection(ctx, conn, streamName, position+1, nil, nil)
1817
}
1918
}
2019

@@ -26,6 +25,7 @@ type streamProjectionStorage struct {
2625

2726
logger goengine.Logger
2827

28+
queryCreateProjection string
2929
queryAcquireLock string
3030
queryAcquirePositionLock string
3131
queryReleaseLock string
@@ -62,6 +62,10 @@ func newStreamProjectionStorage(
6262
projectionStateEncoder: projectionStateEncoder,
6363
logger: logger,
6464

65+
queryCreateProjection: fmt.Sprintf(
66+
`INSERT INTO %s (name) VALUES ($1) ON CONFLICT DO NOTHING`,
67+
projectionTableQuoted,
68+
),
6569
queryAcquireLock: fmt.Sprintf(
6670
`SELECT pg_try_advisory_lock(%[2]s::regclass::oid::int, no), locked, position, state FROM %[1]s WHERE name = $1`,
6771
projectionTableQuoted,
@@ -88,7 +92,12 @@ func newStreamProjectionStorage(
8892
}, nil
8993
}
9094

91-
func (s *streamProjectionStorage) PersistState(conn *sql.Conn, notification *driverSQL.ProjectionNotification, state driverSQL.ProjectionState) error {
95+
func (s *streamProjectionStorage) CreateProjection(ctx context.Context, conn driverSQL.Execer) error {
96+
_, err := conn.ExecContext(ctx, s.queryCreateProjection, s.projectionName)
97+
return err
98+
}
99+
100+
func (s *streamProjectionStorage) PersistState(conn driverSQL.Execer, notification *driverSQL.ProjectionNotification, state driverSQL.ProjectionState) error {
92101
encodedState, err := s.projectionStateEncoder(state.ProjectionState)
93102
if err != nil {
94103
return err

driver/sql/projection.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ type (
2929
ProjectionState []byte
3030
}
3131

32+
// ProjectionStateInitializer is a func to initialize a ProjectionState.ProjectionState
33+
ProjectionStateInitializer func(ctx context.Context) (interface{}, error)
34+
3235
// ProjectionStateEncoder is a func to marshal the ProjectionState.ProjectionState
3336
ProjectionStateEncoder func(interface{}) ([]byte, error)
3437

@@ -38,7 +41,7 @@ type (
3841
// ProjectionStorage is an interface for handling the projection storage
3942
ProjectionStorage interface {
4043
// PersistState persists the state of the projection
41-
PersistState(conn *sql.Conn, notification *ProjectionNotification, state ProjectionState) error
44+
PersistState(conn Execer, notification *ProjectionNotification, state ProjectionState) error
4245

4346
// Acquire this function is used to acquire the projection and it's projectionState
4447
// A projection can only be acquired once and must be released using the returned func
@@ -52,5 +55,5 @@ type (
5255
ProjectionErrorAction int
5356

5457
// EventStreamLoader loads a event stream based on the provided notification and state
55-
EventStreamLoader func(ctx context.Context, conn *sql.Conn, notification *ProjectionNotification, state ProjectionState) (goengine.EventStream, error)
58+
EventStreamLoader func(ctx context.Context, conn *sql.Conn, notification *ProjectionNotification, position int64) (goengine.EventStream, error)
5659
)

0 commit comments

Comments
 (0)