Skip to content

Commit 3e9eddb

Browse files
Merge pull request #14 from hellofresh/patch/projection-without-state
Return an error when an unexpected state is provided
2 parents 3cf3ece + 56c13df commit 3e9eddb

File tree

7 files changed

+265
-25
lines changed

7 files changed

+265
-25
lines changed

driver/sql/postgres/projector.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package postgres
33
import (
44
"context"
55
"database/sql/driver"
6+
"errors"
67

78
"github.com/hellofresh/goengine/driver/sql"
89
)
@@ -45,3 +46,12 @@ func resolveErrorAction(
4546

4647
return errorFallthrough
4748
}
49+
50+
// defaultProjectionStateEncoder this `ProjectionStateEncoder` is used for a goeninge.Projection
51+
func defaultProjectionStateEncoder(state interface{}) ([]byte, error) {
52+
if state == nil {
53+
return []byte{'{', '}'}, nil
54+
}
55+
56+
return nil, errors.New("unexpected state provided (Did you forget to implement goengine.ProjectionSaga?)")
57+
}

driver/sql/postgres/projector_aggregate_storage.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,15 @@ func newAggregateProjectionStorage(
5353
if logger == nil {
5454
logger = goengine.NopLogger
5555
}
56+
if projectionStateEncoder == nil {
57+
projectionStateEncoder = defaultProjectionStateEncoder
58+
}
5659

5760
projectionTableQuoted := QuoteIdentifier(projectionTable)
5861
projectionTableStr := QuoteString(projectionTable)
5962
eventStoreTableQuoted := QuoteIdentifier(eventStoreTable)
6063

64+
/* #nosec */
6165
return &aggregateProjectionStorage{
6266
projectionStateEncoder: projectionStateEncoder,
6367
logger: logger,
@@ -115,15 +119,9 @@ func (a *aggregateProjectionStorage) LoadOutOfSync(ctx context.Context, conn *sq
115119
}
116120

117121
func (a *aggregateProjectionStorage) PersistState(conn *sql.Conn, notification *driverSQL.ProjectionNotification, state driverSQL.ProjectionState) error {
118-
var (
119-
err error
120-
encodedState = []byte{'{', '}'}
121-
)
122-
if a.projectionStateEncoder != nil {
123-
encodedState, err = a.projectionStateEncoder(state.ProjectionState)
124-
if err != nil {
125-
return err
126-
}
122+
encodedState, err := a.projectionStateEncoder(state.ProjectionState)
123+
if err != nil {
124+
return err
127125
}
128126

129127
_, err = conn.ExecContext(context.Background(), a.queryPersistState, notification.AggregateID, state.Position, encodedState)
@@ -191,8 +189,8 @@ func (a *aggregateProjectionStorage) Acquire(
191189
// Set the projection as row locked
192190
_, err := conn.ExecContext(ctx, a.querySetRowLocked, aggregateID, true)
193191
if err != nil {
194-
if err := a.releaseProjection(conn, aggregateID); err != nil {
195-
logger.WithError(err).Error("failed to release lock while setting projection rows as locked")
192+
if releaseErr := a.releaseProjection(conn, aggregateID); releaseErr != nil {
193+
logger.WithError(releaseErr).Error("failed to release lock while setting projection rows as locked")
196194
} else {
197195
logger.Debug("failed to set projection as locked")
198196
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// +build unit
2+
3+
package postgres
4+
5+
import (
6+
"context"
7+
"database/sql"
8+
"testing"
9+
10+
sqlmock "github.com/DATA-DOG/go-sqlmock"
11+
"github.com/hellofresh/goengine"
12+
driverSQL "github.com/hellofresh/goengine/driver/sql"
13+
"github.com/hellofresh/goengine/driver/sql/internal/test"
14+
"github.com/pkg/errors"
15+
"github.com/stretchr/testify/assert"
16+
"github.com/stretchr/testify/require"
17+
)
18+
19+
func TestAggregateProjectionStorage_PersistState(t *testing.T) {
20+
mockedProjectionState := "I'm a projection state"
21+
notification := &driverSQL.ProjectionNotification{
22+
No: 1,
23+
AggregateID: "d8490e85-dd22-4c32-9cb0-a29e57949951",
24+
}
25+
projectionState := driverSQL.ProjectionState{
26+
Position: 5,
27+
ProjectionState: mockedProjectionState,
28+
}
29+
30+
test.RunWithMockDB(t, "Persist projection state", func(t *testing.T, db *sql.DB, dbMock sqlmock.Sqlmock) {
31+
stateEncoderCalls := 0
32+
stateEncoder := func(i interface{}) (bytes []byte, e error) {
33+
assert.Equal(t, mockedProjectionState, i)
34+
35+
stateEncoderCalls++
36+
return []byte(mockedProjectionState), nil
37+
}
38+
39+
conn, err := db.Conn(context.Background())
40+
require.NoError(t, err)
41+
42+
storage, err := newAggregateProjectionStorage("store_table", "projection_table", stateEncoder, goengine.NopLogger)
43+
require.NoError(t, err)
44+
45+
dbMock.ExpectExec("^UPDATE \"projection_table\" SET").
46+
WithArgs(notification.AggregateID, 5, []byte(mockedProjectionState)).
47+
WillReturnResult(sqlmock.NewResult(18, 1))
48+
49+
err = storage.PersistState(conn, notification, projectionState)
50+
assert.NoError(t, err)
51+
assert.Equal(t, 1, stateEncoderCalls)
52+
})
53+
54+
test.RunWithMockDB(t, "Persist projection state (using default encoder)", func(t *testing.T, db *sql.DB, dbMock sqlmock.Sqlmock) {
55+
projectionState := driverSQL.ProjectionState{
56+
Position: 6,
57+
ProjectionState: nil,
58+
}
59+
60+
conn, err := db.Conn(context.Background())
61+
require.NoError(t, err)
62+
63+
storage, err := newAggregateProjectionStorage("store_table", "projection_table", nil, goengine.NopLogger)
64+
require.NoError(t, err)
65+
66+
dbMock.ExpectExec("^UPDATE \"projection_table\" SET").
67+
WithArgs(notification.AggregateID, 6, []byte("{}")).
68+
WillReturnResult(sqlmock.NewResult(18, 1))
69+
70+
err = storage.PersistState(conn, notification, projectionState)
71+
assert.NoError(t, err)
72+
})
73+
74+
test.RunWithMockDB(t, "Fail when state encoding fails", func(t *testing.T, db *sql.DB, dbMock sqlmock.Sqlmock) {
75+
stateEncoderErr := errors.New("Failed to encode")
76+
stateEncoder := func(i interface{}) (bytes []byte, e error) {
77+
return nil, stateEncoderErr
78+
}
79+
80+
conn, err := db.Conn(context.Background())
81+
require.NoError(t, err)
82+
83+
storage, err := newAggregateProjectionStorage("store_table", "projection_table", stateEncoder, goengine.NopLogger)
84+
require.NoError(t, err)
85+
86+
err = storage.PersistState(conn, notification, projectionState)
87+
assert.Equal(t, stateEncoderErr, err)
88+
})
89+
}

driver/sql/postgres/projector_stream.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,14 @@ func NewStreamProjector(
6868
stateEncoder = saga.EncodeState
6969
}
7070

71+
storage, err := newStreamProjectionStorage(projection.Name(), projectionTable, stateEncoder, logger)
72+
if err != nil {
73+
return nil, err
74+
}
75+
7176
executor, err := internalSQL.NewNotificationProjector(
7277
db,
73-
newStreamProjectionStorage(projection.Name(), projectionTable, stateEncoder, logger),
78+
storage,
7479
stateDecoder,
7580
projection.Handlers(),
7681
streamProjectionEventStreamLoader(eventStore, projection.FromStream()),

driver/sql/postgres/projector_stream_storage.go

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"database/sql"
66
"errors"
77
"fmt"
8+
"strings"
89

910
"github.com/hellofresh/goengine"
1011

@@ -37,10 +38,25 @@ func newStreamProjectionStorage(
3738
projectionTable string,
3839
projectionStateEncoder driverSQL.ProjectionStateEncoder,
3940
logger goengine.Logger,
40-
) *streamProjectionStorage {
41+
) (*streamProjectionStorage, error) {
42+
switch {
43+
case strings.TrimSpace(projectionName) == "":
44+
return nil, goengine.InvalidArgumentError("projectionName")
45+
case strings.TrimSpace(projectionTable) == "":
46+
return nil, goengine.InvalidArgumentError("projectionTable")
47+
}
48+
49+
if logger == nil {
50+
logger = goengine.NopLogger
51+
}
52+
if projectionStateEncoder == nil {
53+
projectionStateEncoder = defaultProjectionStateEncoder
54+
}
55+
4156
projectionTableQuoted := QuoteIdentifier(projectionTable)
4257
projectionTableStr := QuoteString(projectionTable)
4358

59+
/* #nosec */
4460
return &streamProjectionStorage{
4561
projectionName: projectionName,
4662
projectionStateEncoder: projectionStateEncoder,
@@ -69,19 +85,13 @@ func newStreamProjectionStorage(
6985
`UPDATE ONLY %[1]s SET locked = $2 WHERE name = $1`,
7086
projectionTableQuoted,
7187
),
72-
}
88+
}, nil
7389
}
7490

7591
func (s *streamProjectionStorage) PersistState(conn *sql.Conn, notification *driverSQL.ProjectionNotification, state driverSQL.ProjectionState) error {
76-
var (
77-
err error
78-
encodedState = []byte{'{', '}'}
79-
)
80-
if s.projectionStateEncoder != nil {
81-
encodedState, err = s.projectionStateEncoder(state.ProjectionState)
82-
if err != nil {
83-
return err
84-
}
92+
encodedState, err := s.projectionStateEncoder(state.ProjectionState)
93+
if err != nil {
94+
return err
8595
}
8696

8797
_, err = conn.ExecContext(context.Background(), s.queryPersistState, state.Position, encodedState, s.projectionName)
@@ -146,8 +156,8 @@ func (s *streamProjectionStorage) Acquire(
146156
// Set the projection as row locked
147157
_, err := conn.ExecContext(ctx, s.querySetRowLocked, s.projectionName, true)
148158
if err != nil {
149-
if err := s.releaseProjectionLock(conn); err != nil {
150-
logger.WithError(err).Error("failed to release lock while setting projection row as locked")
159+
if releaseErr := s.releaseProjectionLock(conn); releaseErr != nil {
160+
logger.WithError(releaseErr).Error("failed to release lock while setting projection row as locked")
151161
} else {
152162
logger.Debug("failed to set projection as locked")
153163
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// +build unit
2+
3+
package postgres
4+
5+
import (
6+
"context"
7+
"database/sql"
8+
"errors"
9+
"testing"
10+
11+
sqlmock "github.com/DATA-DOG/go-sqlmock"
12+
"github.com/hellofresh/goengine"
13+
driverSQL "github.com/hellofresh/goengine/driver/sql"
14+
"github.com/hellofresh/goengine/driver/sql/internal/test"
15+
"github.com/stretchr/testify/assert"
16+
"github.com/stretchr/testify/require"
17+
)
18+
19+
func TestStreamProjectionStorage_PersistState(t *testing.T) {
20+
mockedProjectionState := "I'm a projection state"
21+
notification := &driverSQL.ProjectionNotification{
22+
No: 1,
23+
AggregateID: "d8490e85-dd22-4c32-9cb0-a29e57949951",
24+
}
25+
projectionState := driverSQL.ProjectionState{
26+
Position: 5,
27+
ProjectionState: mockedProjectionState,
28+
}
29+
30+
test.RunWithMockDB(t, "Persist projection state", func(t *testing.T, db *sql.DB, dbMock sqlmock.Sqlmock) {
31+
stateEncoderCalls := 0
32+
stateEncoder := func(i interface{}) (bytes []byte, e error) {
33+
assert.Equal(t, mockedProjectionState, i)
34+
35+
stateEncoderCalls++
36+
return []byte(mockedProjectionState), nil
37+
}
38+
39+
conn, err := db.Conn(context.Background())
40+
require.NoError(t, err)
41+
42+
storage, err := newStreamProjectionStorage("my_projection", "projection_table", stateEncoder, goengine.NopLogger)
43+
require.NoError(t, err)
44+
45+
dbMock.ExpectExec("^UPDATE \"projection_table\" SET").
46+
WithArgs(5, []byte(mockedProjectionState), "my_projection").
47+
WillReturnResult(sqlmock.NewResult(18, 1))
48+
49+
err = storage.PersistState(conn, notification, projectionState)
50+
assert.NoError(t, err)
51+
assert.Equal(t, 1, stateEncoderCalls)
52+
})
53+
54+
test.RunWithMockDB(t, "Persist projection state (using default encoder)", func(t *testing.T, db *sql.DB, dbMock sqlmock.Sqlmock) {
55+
projectionState := driverSQL.ProjectionState{
56+
Position: 6,
57+
ProjectionState: nil,
58+
}
59+
60+
conn, err := db.Conn(context.Background())
61+
require.NoError(t, err)
62+
63+
storage, err := newStreamProjectionStorage("my_projection", "projection_table", nil, goengine.NopLogger)
64+
require.NoError(t, err)
65+
66+
dbMock.ExpectExec("^UPDATE \"projection_table\" SET").
67+
WithArgs(6, []byte("{}"), "my_projection").
68+
WillReturnResult(sqlmock.NewResult(18, 1))
69+
70+
err = storage.PersistState(conn, notification, projectionState)
71+
assert.NoError(t, err)
72+
})
73+
74+
test.RunWithMockDB(t, "Fail when state encoding fails", func(t *testing.T, db *sql.DB, dbMock sqlmock.Sqlmock) {
75+
stateEncoderErr := errors.New("Failed to encode")
76+
stateEncoder := func(i interface{}) (bytes []byte, e error) {
77+
return nil, stateEncoderErr
78+
}
79+
80+
conn, err := db.Conn(context.Background())
81+
require.NoError(t, err)
82+
83+
storage, err := newStreamProjectionStorage("my_projection", "projection_table", stateEncoder, goengine.NopLogger)
84+
require.NoError(t, err)
85+
86+
err = storage.PersistState(conn, notification, projectionState)
87+
assert.Equal(t, stateEncoderErr, err)
88+
})
89+
90+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// +build unit
2+
3+
package postgres
4+
5+
import (
6+
"fmt"
7+
"github.com/hellofresh/goengine"
8+
"github.com/stretchr/testify/assert"
9+
"testing"
10+
)
11+
12+
func TestDefaultProjectionStateEncoder(t *testing.T) {
13+
t.Run("Only accept nil values as valid", func(t *testing.T) {
14+
res, err := defaultProjectionStateEncoder(nil)
15+
16+
assert.Equal(t, []byte{'{', '}'}, res)
17+
assert.NoError(t, err)
18+
})
19+
20+
t.Run("Reject any state this not nil", func(t *testing.T) {
21+
var pointer *goengine.Projection
22+
testCases := []interface{}{
23+
struct{}{},
24+
pointer,
25+
"",
26+
0,
27+
}
28+
29+
for i, v := range testCases {
30+
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
31+
res, err := defaultProjectionStateEncoder(v)
32+
33+
assert.Error(t, err)
34+
assert.Nil(t, res)
35+
})
36+
}
37+
})
38+
}

0 commit comments

Comments
 (0)