Skip to content

Commit 56c13df

Browse files
Added tests for PersistState
By adding tests to the `PersistState` we can avoid regressions.
1 parent 9d82db3 commit 56c13df

File tree

5 files changed

+201
-7
lines changed

5 files changed

+201
-7
lines changed

driver/sql/postgres/projector_aggregate_storage.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ func newAggregateProjectionStorage(
6161
projectionTableStr := QuoteString(projectionTable)
6262
eventStoreTableQuoted := QuoteIdentifier(eventStoreTable)
6363

64+
/* #nosec */
6465
return &aggregateProjectionStorage{
6566
projectionStateEncoder: projectionStateEncoder,
6667
logger: logger,
@@ -188,8 +189,8 @@ func (a *aggregateProjectionStorage) Acquire(
188189
// Set the projection as row locked
189190
_, err := conn.ExecContext(ctx, a.querySetRowLocked, aggregateID, true)
190191
if err != nil {
191-
if err := a.releaseProjection(conn, aggregateID); err != nil {
192-
logger.WithError(err).Error("failed to release lock while setting projection rows as locked")
192+
if releaseErr := a.releaseProjection(conn, aggregateID); releaseErr != nil {
193+
logger.WithError(releaseErr).Error("failed to release lock while setting projection rows as locked")
193194
} else {
194195
logger.Debug("failed to set projection as locked")
195196
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// +build unit
2+
3+
package postgres
4+
5+
import (
6+
"context"
7+
"database/sql"
8+
"testing"
9+
10+
sqlmock "github.com/DATA-DOG/go-sqlmock"
11+
"github.com/hellofresh/goengine"
12+
driverSQL "github.com/hellofresh/goengine/driver/sql"
13+
"github.com/hellofresh/goengine/driver/sql/internal/test"
14+
"github.com/pkg/errors"
15+
"github.com/stretchr/testify/assert"
16+
"github.com/stretchr/testify/require"
17+
)
18+
19+
func TestAggregateProjectionStorage_PersistState(t *testing.T) {
20+
mockedProjectionState := "I'm a projection state"
21+
notification := &driverSQL.ProjectionNotification{
22+
No: 1,
23+
AggregateID: "d8490e85-dd22-4c32-9cb0-a29e57949951",
24+
}
25+
projectionState := driverSQL.ProjectionState{
26+
Position: 5,
27+
ProjectionState: mockedProjectionState,
28+
}
29+
30+
test.RunWithMockDB(t, "Persist projection state", func(t *testing.T, db *sql.DB, dbMock sqlmock.Sqlmock) {
31+
stateEncoderCalls := 0
32+
stateEncoder := func(i interface{}) (bytes []byte, e error) {
33+
assert.Equal(t, mockedProjectionState, i)
34+
35+
stateEncoderCalls++
36+
return []byte(mockedProjectionState), nil
37+
}
38+
39+
conn, err := db.Conn(context.Background())
40+
require.NoError(t, err)
41+
42+
storage, err := newAggregateProjectionStorage("store_table", "projection_table", stateEncoder, goengine.NopLogger)
43+
require.NoError(t, err)
44+
45+
dbMock.ExpectExec("^UPDATE \"projection_table\" SET").
46+
WithArgs(notification.AggregateID, 5, []byte(mockedProjectionState)).
47+
WillReturnResult(sqlmock.NewResult(18, 1))
48+
49+
err = storage.PersistState(conn, notification, projectionState)
50+
assert.NoError(t, err)
51+
assert.Equal(t, 1, stateEncoderCalls)
52+
})
53+
54+
test.RunWithMockDB(t, "Persist projection state (using default encoder)", func(t *testing.T, db *sql.DB, dbMock sqlmock.Sqlmock) {
55+
projectionState := driverSQL.ProjectionState{
56+
Position: 6,
57+
ProjectionState: nil,
58+
}
59+
60+
conn, err := db.Conn(context.Background())
61+
require.NoError(t, err)
62+
63+
storage, err := newAggregateProjectionStorage("store_table", "projection_table", nil, goengine.NopLogger)
64+
require.NoError(t, err)
65+
66+
dbMock.ExpectExec("^UPDATE \"projection_table\" SET").
67+
WithArgs(notification.AggregateID, 6, []byte("{}")).
68+
WillReturnResult(sqlmock.NewResult(18, 1))
69+
70+
err = storage.PersistState(conn, notification, projectionState)
71+
assert.NoError(t, err)
72+
})
73+
74+
test.RunWithMockDB(t, "Fail when state encoding fails", func(t *testing.T, db *sql.DB, dbMock sqlmock.Sqlmock) {
75+
stateEncoderErr := errors.New("Failed to encode")
76+
stateEncoder := func(i interface{}) (bytes []byte, e error) {
77+
return nil, stateEncoderErr
78+
}
79+
80+
conn, err := db.Conn(context.Background())
81+
require.NoError(t, err)
82+
83+
storage, err := newAggregateProjectionStorage("store_table", "projection_table", stateEncoder, goengine.NopLogger)
84+
require.NoError(t, err)
85+
86+
err = storage.PersistState(conn, notification, projectionState)
87+
assert.Equal(t, stateEncoderErr, err)
88+
})
89+
}

driver/sql/postgres/projector_stream.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,14 @@ func NewStreamProjector(
6868
stateEncoder = saga.EncodeState
6969
}
7070

71+
storage, err := newStreamProjectionStorage(projection.Name(), projectionTable, stateEncoder, logger)
72+
if err != nil {
73+
return nil, err
74+
}
75+
7176
executor, err := internalSQL.NewNotificationProjector(
7277
db,
73-
newStreamProjectionStorage(projection.Name(), projectionTable, stateEncoder, logger),
78+
storage,
7479
stateDecoder,
7580
projection.Handlers(),
7681
streamProjectionEventStreamLoader(eventStore, projection.FromStream()),

driver/sql/postgres/projector_stream_storage.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"database/sql"
66
"errors"
77
"fmt"
8+
"strings"
89

910
"github.com/hellofresh/goengine"
1011

@@ -37,7 +38,14 @@ func newStreamProjectionStorage(
3738
projectionTable string,
3839
projectionStateEncoder driverSQL.ProjectionStateEncoder,
3940
logger goengine.Logger,
40-
) *streamProjectionStorage {
41+
) (*streamProjectionStorage, error) {
42+
switch {
43+
case strings.TrimSpace(projectionName) == "":
44+
return nil, goengine.InvalidArgumentError("projectionName")
45+
case strings.TrimSpace(projectionTable) == "":
46+
return nil, goengine.InvalidArgumentError("projectionTable")
47+
}
48+
4149
if logger == nil {
4250
logger = goengine.NopLogger
4351
}
@@ -48,6 +56,7 @@ func newStreamProjectionStorage(
4856
projectionTableQuoted := QuoteIdentifier(projectionTable)
4957
projectionTableStr := QuoteString(projectionTable)
5058

59+
/* #nosec */
5160
return &streamProjectionStorage{
5261
projectionName: projectionName,
5362
projectionStateEncoder: projectionStateEncoder,
@@ -76,7 +85,7 @@ func newStreamProjectionStorage(
7685
`UPDATE ONLY %[1]s SET locked = $2 WHERE name = $1`,
7786
projectionTableQuoted,
7887
),
79-
}
88+
}, nil
8089
}
8190

8291
func (s *streamProjectionStorage) PersistState(conn *sql.Conn, notification *driverSQL.ProjectionNotification, state driverSQL.ProjectionState) error {
@@ -147,8 +156,8 @@ func (s *streamProjectionStorage) Acquire(
147156
// Set the projection as row locked
148157
_, err := conn.ExecContext(ctx, s.querySetRowLocked, s.projectionName, true)
149158
if err != nil {
150-
if err := s.releaseProjectionLock(conn); err != nil {
151-
logger.WithError(err).Error("failed to release lock while setting projection row as locked")
159+
if releaseErr := s.releaseProjectionLock(conn); releaseErr != nil {
160+
logger.WithError(releaseErr).Error("failed to release lock while setting projection row as locked")
152161
} else {
153162
logger.Debug("failed to set projection as locked")
154163
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// +build unit
2+
3+
package postgres
4+
5+
import (
6+
"context"
7+
"database/sql"
8+
"errors"
9+
"testing"
10+
11+
sqlmock "github.com/DATA-DOG/go-sqlmock"
12+
"github.com/hellofresh/goengine"
13+
driverSQL "github.com/hellofresh/goengine/driver/sql"
14+
"github.com/hellofresh/goengine/driver/sql/internal/test"
15+
"github.com/stretchr/testify/assert"
16+
"github.com/stretchr/testify/require"
17+
)
18+
19+
func TestStreamProjectionStorage_PersistState(t *testing.T) {
20+
mockedProjectionState := "I'm a projection state"
21+
notification := &driverSQL.ProjectionNotification{
22+
No: 1,
23+
AggregateID: "d8490e85-dd22-4c32-9cb0-a29e57949951",
24+
}
25+
projectionState := driverSQL.ProjectionState{
26+
Position: 5,
27+
ProjectionState: mockedProjectionState,
28+
}
29+
30+
test.RunWithMockDB(t, "Persist projection state", func(t *testing.T, db *sql.DB, dbMock sqlmock.Sqlmock) {
31+
stateEncoderCalls := 0
32+
stateEncoder := func(i interface{}) (bytes []byte, e error) {
33+
assert.Equal(t, mockedProjectionState, i)
34+
35+
stateEncoderCalls++
36+
return []byte(mockedProjectionState), nil
37+
}
38+
39+
conn, err := db.Conn(context.Background())
40+
require.NoError(t, err)
41+
42+
storage, err := newStreamProjectionStorage("my_projection", "projection_table", stateEncoder, goengine.NopLogger)
43+
require.NoError(t, err)
44+
45+
dbMock.ExpectExec("^UPDATE \"projection_table\" SET").
46+
WithArgs(5, []byte(mockedProjectionState), "my_projection").
47+
WillReturnResult(sqlmock.NewResult(18, 1))
48+
49+
err = storage.PersistState(conn, notification, projectionState)
50+
assert.NoError(t, err)
51+
assert.Equal(t, 1, stateEncoderCalls)
52+
})
53+
54+
test.RunWithMockDB(t, "Persist projection state (using default encoder)", func(t *testing.T, db *sql.DB, dbMock sqlmock.Sqlmock) {
55+
projectionState := driverSQL.ProjectionState{
56+
Position: 6,
57+
ProjectionState: nil,
58+
}
59+
60+
conn, err := db.Conn(context.Background())
61+
require.NoError(t, err)
62+
63+
storage, err := newStreamProjectionStorage("my_projection", "projection_table", nil, goengine.NopLogger)
64+
require.NoError(t, err)
65+
66+
dbMock.ExpectExec("^UPDATE \"projection_table\" SET").
67+
WithArgs(6, []byte("{}"), "my_projection").
68+
WillReturnResult(sqlmock.NewResult(18, 1))
69+
70+
err = storage.PersistState(conn, notification, projectionState)
71+
assert.NoError(t, err)
72+
})
73+
74+
test.RunWithMockDB(t, "Fail when state encoding fails", func(t *testing.T, db *sql.DB, dbMock sqlmock.Sqlmock) {
75+
stateEncoderErr := errors.New("Failed to encode")
76+
stateEncoder := func(i interface{}) (bytes []byte, e error) {
77+
return nil, stateEncoderErr
78+
}
79+
80+
conn, err := db.Conn(context.Background())
81+
require.NoError(t, err)
82+
83+
storage, err := newStreamProjectionStorage("my_projection", "projection_table", stateEncoder, goengine.NopLogger)
84+
require.NoError(t, err)
85+
86+
err = storage.PersistState(conn, notification, projectionState)
87+
assert.Equal(t, stateEncoderErr, err)
88+
})
89+
90+
}

0 commit comments

Comments
 (0)