Skip to content

Commit 3253d45

Browse files
Ensure Projection Init is called
During one of the refactors we lost the Projection Init call. This init call has been added again to ensure projections can setup there initial state.
1 parent 044eaf1 commit 3253d45

File tree

6 files changed

+53
-49
lines changed

6 files changed

+53
-49
lines changed

driver/sql/internal/notification_projector.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ type NotificationProjector struct {
1818

1919
storage driverSQL.ProjectionStorage
2020

21-
decodeProjectionState driverSQL.ProjectionStateDecoder
21+
projectionStateInit driverSQL.ProjectionStateInitializer
22+
projectionStateDecode driverSQL.ProjectionStateDecoder
2223
handlers map[string]goengine.MessageHandler
2324

2425
eventLoader driverSQL.EventStreamLoader
@@ -31,23 +32,26 @@ type NotificationProjector struct {
3132
func NewNotificationProjector(
3233
db *sql.DB,
3334
storage driverSQL.ProjectionStorage,
34-
acquireUnmarshalState driverSQL.ProjectionStateDecoder,
35+
projectionStateInit driverSQL.ProjectionStateInitializer,
36+
projectionStateDecode driverSQL.ProjectionStateDecoder,
3537
eventHandlers map[string]goengine.MessageHandler,
3638
eventLoader driverSQL.EventStreamLoader,
3739
resolver goengine.MessagePayloadResolver,
3840
logger goengine.Logger,
3941
) (*NotificationProjector, error) {
4042
switch {
4143
case db == nil:
42-
return nil, errors.New("db cannot be nil")
44+
return nil, goengine.InvalidArgumentError("db")
4345
case storage == nil:
44-
return nil, errors.New("storage cannot be nil")
46+
return nil, goengine.InvalidArgumentError("storage")
47+
case projectionStateInit == nil:
48+
return nil, goengine.InvalidArgumentError("projectionStateInit")
4549
case len(eventHandlers) == 0:
46-
return nil, errors.New("eventHandlers cannot be empty")
50+
return nil, goengine.InvalidArgumentError("eventHandlers")
4751
case eventLoader == nil:
48-
return nil, errors.New("eventLoader cannot be nil")
52+
return nil, goengine.InvalidArgumentError("eventLoader")
4953
case resolver == nil:
50-
return nil, errors.New("resolver cannot be nil")
54+
return nil, goengine.InvalidArgumentError("resolver")
5155
}
5256

5357
if logger == nil {
@@ -57,7 +61,8 @@ func NewNotificationProjector(
5761
return &NotificationProjector{
5862
db: db,
5963
storage: storage,
60-
decodeProjectionState: acquireUnmarshalState,
64+
projectionStateInit: projectionStateInit,
65+
projectionStateDecode: projectionStateDecode,
6166
handlers: wrapProjectionHandlers(eventHandlers),
6267
eventLoader: eventLoader,
6368
resolver: resolver,
@@ -114,10 +119,17 @@ func (s *NotificationProjector) project(
114119
}
115120
defer releaseLock()
116121

117-
// Unmarshal the projection state
122+
// Decode or initialize projection state
118123
var projectionState interface{}
119-
if s.decodeProjectionState != nil {
120-
projectionState, err = s.decodeProjectionState(rawState.ProjectionState)
124+
if rawState.Position == 0 {
125+
// This is the fist time the projection runs so initialize the state
126+
projectionState, err = s.projectionStateInit(ctx)
127+
if err != nil {
128+
return err
129+
}
130+
} else if s.projectionStateDecode != nil {
131+
// Unmarshal the projection state
132+
projectionState, err = s.projectionStateDecode(rawState.ProjectionState)
121133
if err != nil {
122134
return err
123135
}

driver/sql/postgres/projector_aggregate.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ func NewAggregateProjector(
8484
executor, err := internal.NewNotificationProjector(
8585
db,
8686
storage,
87+
projection.Init,
8788
stateDecoder,
8889
projection.Handlers(),
8990
aggregateProjectionEventStreamLoader(eventStore, projection.FromStream(), aggregateTypeName),

driver/sql/postgres/projector_aggregate_storage.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func newAggregateProjectionStorage(
9292
queryAcquireLock: fmt.Sprintf(
9393
`WITH new_projection AS (
9494
INSERT INTO %[1]s (aggregate_id, state) SELECT $1, 'null' WHERE NOT EXISTS (
95-
SELECT * FROM %[1]s WHERE aggregate_id = $1
95+
SELECT * FROM %[1]s WHERE aggregate_id = $1 LIMIT 1
9696
) ON CONFLICT DO NOTHING
9797
RETURNING *
9898
)
@@ -118,7 +118,7 @@ func (a *aggregateProjectionStorage) LoadOutOfSync(ctx context.Context, conn dri
118118
return conn.QueryContext(ctx, a.queryOutOfSyncProjections)
119119
}
120120

121-
func (a *aggregateProjectionStorage) PersistState(conn *sql.Conn, notification *driverSQL.ProjectionNotification, state driverSQL.ProjectionState) error {
121+
func (a *aggregateProjectionStorage) PersistState(conn driverSQL.Execer, notification *driverSQL.ProjectionNotification, state driverSQL.ProjectionState) error {
122122
encodedState, err := a.projectionStateEncoder(state.ProjectionState)
123123
if err != nil {
124124
return err

driver/sql/postgres/projector_stream.go

Lines changed: 12 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package postgres
33
import (
44
"context"
55
"database/sql"
6-
"fmt"
76
"math"
87
"strings"
98
"sync"
@@ -17,15 +16,14 @@ import (
1716
// StreamProjector is a postgres projector used to execute a projection against an event stream.
1817
type StreamProjector struct {
1918
sync.Mutex
20-
executor *internalSQL.NotificationProjector
21-
22-
db *sql.DB
2319

24-
projectionName string
25-
projectionTable string
20+
db *sql.DB
21+
executor *internalSQL.NotificationProjector
22+
storage *streamProjectionStorage
2623

27-
logger goengine.Logger
2824
projectionErrorHandler driverSQL.ProjectionErrorCallback
25+
26+
logger goengine.Logger
2927
}
3028

3129
// NewStreamProjector creates a new projector for a projection
@@ -75,6 +73,7 @@ func NewStreamProjector(
7573
executor, err := internalSQL.NewNotificationProjector(
7674
db,
7775
storage,
76+
projection.Init,
7877
stateDecoder,
7978
projection.Handlers(),
8079
streamProjectionEventStreamLoader(eventStore, projection.FromStream()),
@@ -86,15 +85,11 @@ func NewStreamProjector(
8685
}
8786

8887
return &StreamProjector{
89-
executor: executor,
90-
91-
db: db,
92-
93-
projectionName: projection.Name(),
94-
projectionTable: projectionTable,
88+
db: db,
89+
executor: executor,
90+
storage: storage,
9591
projectionErrorHandler: projectionErrorHandler,
96-
97-
logger: logger,
92+
logger: logger,
9893
}, nil
9994
}
10095

@@ -110,7 +105,7 @@ func (s *StreamProjector) Run(ctx context.Context) error {
110105
return nil
111106
}
112107

113-
if err := s.setupProjection(ctx); err != nil {
108+
if err := s.storage.CreateProjection(ctx, s.db); err != nil {
114109
return err
115110
}
116111

@@ -129,7 +124,7 @@ func (s *StreamProjector) RunAndListen(ctx context.Context, listener driverSQL.L
129124
return nil
130125
}
131126

132-
if err := s.setupProjection(ctx); err != nil {
127+
if err := s.storage.CreateProjection(ctx, s.db); err != nil {
133128
return err
134129
}
135130

@@ -168,19 +163,3 @@ func (s *StreamProjector) processNotification(
168163
math.MaxInt16,
169164
)
170165
}
171-
172-
// setupProjection Creates the projection if none exists
173-
func (s *StreamProjector) setupProjection(ctx context.Context) error {
174-
// Ignore duplicate inserts
175-
_, err := s.db.ExecContext(
176-
ctx,
177-
/* #nosec */
178-
fmt.Sprintf(
179-
`INSERT INTO %s (name) VALUES ($1) ON CONFLICT DO NOTHING`,
180-
QuoteIdentifier(s.projectionTable),
181-
),
182-
s.projectionName,
183-
)
184-
185-
return err
186-
}

driver/sql/postgres/projector_stream_storage.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"strings"
99

1010
"github.com/hellofresh/goengine"
11-
1211
driverSQL "github.com/hellofresh/goengine/driver/sql"
1312
)
1413

@@ -26,6 +25,7 @@ type streamProjectionStorage struct {
2625

2726
logger goengine.Logger
2827

28+
queryCreateProjection string
2929
queryAcquireLock string
3030
queryAcquirePositionLock string
3131
queryReleaseLock string
@@ -62,6 +62,10 @@ func newStreamProjectionStorage(
6262
projectionStateEncoder: projectionStateEncoder,
6363
logger: logger,
6464

65+
queryCreateProjection: fmt.Sprintf(
66+
`INSERT INTO %s (name) VALUES ($1) ON CONFLICT DO NOTHING`,
67+
projectionTableQuoted,
68+
),
6569
queryAcquireLock: fmt.Sprintf(
6670
`SELECT pg_try_advisory_lock(%[2]s::regclass::oid::int, no), locked, position, state FROM %[1]s WHERE name = $1`,
6771
projectionTableQuoted,
@@ -88,7 +92,12 @@ func newStreamProjectionStorage(
8892
}, nil
8993
}
9094

91-
func (s *streamProjectionStorage) PersistState(conn *sql.Conn, notification *driverSQL.ProjectionNotification, state driverSQL.ProjectionState) error {
95+
func (s *streamProjectionStorage) CreateProjection(ctx context.Context, conn driverSQL.Execer) error {
96+
_, err := conn.ExecContext(ctx, s.queryCreateProjection, s.projectionName)
97+
return err
98+
}
99+
100+
func (s *streamProjectionStorage) PersistState(conn driverSQL.Execer, notification *driverSQL.ProjectionNotification, state driverSQL.ProjectionState) error {
92101
encodedState, err := s.projectionStateEncoder(state.ProjectionState)
93102
if err != nil {
94103
return err

driver/sql/projection.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ type (
2929
ProjectionState []byte
3030
}
3131

32+
// ProjectionStateInitializer is a func to initialize a ProjectionState.ProjectionState
33+
ProjectionStateInitializer func(ctx context.Context) (interface{}, error)
34+
3235
// ProjectionStateEncoder is a func to marshal the ProjectionState.ProjectionState
3336
ProjectionStateEncoder func(interface{}) ([]byte, error)
3437

@@ -38,7 +41,7 @@ type (
3841
// ProjectionStorage is an interface for handling the projection storage
3942
ProjectionStorage interface {
4043
// PersistState persists the state of the projection
41-
PersistState(conn *sql.Conn, notification *ProjectionNotification, state ProjectionState) error
44+
PersistState(conn Execer, notification *ProjectionNotification, state ProjectionState) error
4245

4346
// Acquire this function is used to acquire the projection and it's projectionState
4447
// A projection can only be acquired once and must be released using the returned func

0 commit comments

Comments
 (0)