Skip to content

Commit 87fc02c

Browse files
authored
Merge pull request #50 from hellofresh/patch/aggregate_id
Move event stream aggregate_type and aggregate_id into their own columns with proper data types
2 parents 048954f + e239b4c commit 87fc02c

File tree

9 files changed

+168
-75
lines changed

9 files changed

+168
-75
lines changed

driver/sql/persistence_strategy.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
package sql
22

3-
import "github.com/hellofresh/goengine"
3+
import (
4+
"github.com/hellofresh/goengine"
5+
"github.com/hellofresh/goengine/metadata"
6+
)
47

58
// PersistenceStrategy interface describes strategy of persisting messages in the database
69
type PersistenceStrategy interface {
710
CreateSchema(tableName string) []string
8-
ColumnNames() []string
11+
// EventColumnsNames represent the event store columns selected from the event stream table. Used by PrepareSearch
12+
EventColumnNames() []string
13+
// InsertColumnNames represent the ordered event store columns that are used to insert data into the event stream.
14+
InsertColumnNames() []string
915
PrepareData([]goengine.Message) ([]interface{}, error)
16+
PrepareSearch(metadata.Matcher) ([]byte, []interface{})
1017
GenerateTableName(streamName goengine.StreamName) (string, error)
1118
}

driver/sql/postgres/advisory_lock_aggregate_projection_storage.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,12 @@ func NewAdvisoryLockAggregateProjectionStorage(
6060

6161
queryOutOfSyncProjections: fmt.Sprintf(
6262
`WITH aggregate_position AS (
63-
SELECT e.metadata ->> '_aggregate_id' AS aggregate_id, MAX(e.no) AS no
63+
SELECT e.aggregate_id, MAX(e.no) AS no
6464
FROM %[1]s AS e
6565
GROUP BY aggregate_id
6666
)
6767
SELECT a.aggregate_id, a.no FROM aggregate_position AS a
68-
LEFT JOIN %[2]s AS p ON p.aggregate_id::text = a.aggregate_id
68+
LEFT JOIN %[2]s AS p ON p.aggregate_id = a.aggregate_id
6969
WHERE p.aggregate_id IS NULL OR (a.no > p.position)`,
7070
eventStoreTableQuoted,
7171
projectionTableQuoted,

driver/sql/postgres/eventstore.go

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@ type EventStore struct {
3131
persistenceStrategy driverSQL.PersistenceStrategy
3232
db *sql.DB
3333
messageFactory driverSQL.MessageFactory
34-
columns string
34+
insertColumns string
3535
columnCount int
36+
eventColumns string
3637
logger goengine.Logger
3738
}
3839

@@ -55,17 +56,25 @@ func NewEventStore(
5556
logger = goengine.NopLogger
5657
}
5758

58-
columns := persistenceStrategy.ColumnNames()
59-
for i, c := range persistenceStrategy.ColumnNames() {
60-
columns[i] = QuoteIdentifier(c)
59+
columns := persistenceStrategy.InsertColumnNames()
60+
insertColumns := make([]string, len(columns))
61+
for i, c := range columns {
62+
insertColumns[i] = QuoteIdentifier(c)
63+
}
64+
65+
columns = persistenceStrategy.EventColumnNames()
66+
selectColumns := make([]string, len(columns))
67+
for i, c := range columns {
68+
selectColumns[i] = QuoteIdentifier(c)
6169
}
6270

6371
return &EventStore{
6472
persistenceStrategy: persistenceStrategy,
6573
db: db,
6674
messageFactory: messageFactory,
67-
columns: strings.Join(columns, ", "),
68-
columnCount: len(columns),
75+
insertColumns: strings.Join(insertColumns, ", "),
76+
columnCount: len(insertColumns),
77+
eventColumns: strings.Join(selectColumns, ", "),
6978
logger: logger,
7079
}, nil
7180
}
@@ -166,27 +175,19 @@ func (e *EventStore) loadQuery(
166175
selectQuery := make([]byte, 0, 196)
167176
params := make([]interface{}, 0, 4)
168177

169-
selectQuery = append(selectQuery, "SELECT * FROM "...)
178+
selectQuery = append(selectQuery, "SELECT "...)
179+
selectQuery = append(selectQuery, e.eventColumns...)
180+
selectQuery = append(selectQuery, " FROM "...)
170181
selectQuery = append(selectQuery, tableName...)
171182

172183
// Add conditions to the select query
173184
selectQuery = append(selectQuery, " WHERE no >= $1"...)
174185
params = append(params, fromNumber)
175186

176187
if matcher != nil {
177-
paramCount := 1
178-
matcher.Iterate(func(c metadata.Constraint) {
179-
paramCount++
180-
params = append(params, c.Value())
181-
182-
// We are doing "metadata ->> %s %s $%d" with a possible AND
183-
selectQuery = append(selectQuery, " AND metadata ->> "...)
184-
selectQuery = append(selectQuery, QuoteString(c.Field())...)
185-
selectQuery = append(selectQuery, ' ')
186-
selectQuery = append(selectQuery, c.Operator()...)
187-
selectQuery = append(selectQuery, " $"...)
188-
selectQuery = append(selectQuery, strconv.Itoa(paramCount)...)
189-
})
188+
searchPart, searchParams := e.persistenceStrategy.PrepareSearch(matcher)
189+
selectQuery = append(selectQuery, searchPart...)
190+
params = append(params, searchParams...)
190191
}
191192
selectQuery = append(selectQuery, " ORDER BY no "...)
192193
if count != nil {
@@ -224,11 +225,11 @@ func (e *EventStore) AppendToWithExecer(ctx context.Context, conn driverSQL.Exec
224225
return err
225226
}
226227

227-
insertQuery := make([]byte, 0, 35+len(e.columns)+(e.columnCount*2)+(eventCount*3))
228+
insertQuery := make([]byte, 0, 35+len(e.insertColumns)+(e.columnCount*2)+(eventCount*3))
228229
insertQuery = append(insertQuery, "INSERT INTO "...)
229230
insertQuery = append(insertQuery, tableName...)
230231
insertQuery = append(insertQuery, " ("...)
231-
insertQuery = append(insertQuery, e.columns...)
232+
insertQuery = append(insertQuery, e.insertColumns...)
232233
insertQuery = append(insertQuery, ") VALUES "...)
233234
for i := 0; i < eventCount; i++ {
234235
if i != 0 {

driver/sql/postgres/eventstore_test.go

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ func TestEventStore_Create(t *testing.T) {
126126
defer ctrl.Finish()
127127

128128
strategy := mockSQL.NewPersistenceStrategy(ctrl)
129-
strategy.EXPECT().ColumnNames().Return([]string{}).AnyTimes()
129+
strategy.EXPECT().InsertColumnNames().Return([]string{}).AnyTimes()
130+
strategy.EXPECT().EventColumnNames().Return([]string{}).AnyTimes()
130131
strategy.EXPECT().GenerateTableName(goengine.StreamName("orders")).Return("events_orders", nil).AnyTimes()
131132
strategy.EXPECT().CreateSchema("events_orders").Return([]string{}).AnyTimes()
132133

@@ -184,7 +185,7 @@ func TestEventStore_AppendTo(t *testing.T) {
184185

185186
payloadConverter, messages := mockMessages(ctrl)
186187

187-
dbMock.ExpectExec(`INSERT(.+)VALUES \(\$1,\$2,\$3,\$4,\$5\),\(\$6,\$7,\$8,\$9,\$10\),\(\$11(.+)`).
188+
dbMock.ExpectExec(`INSERT(.+)VALUES \(\$1,\$2,\$3,\$4,\$5\,\$6,\$7,\$8\),\(\$9,\$10,\$11,\$12,\$13,\$14,\$15,\$16\),\(\$17(.+)`).
188189
WillReturnResult(sqlmock.NewResult(111, 3))
189190

190191
eventStore := createEventStore(t, db, payloadConverter)
@@ -230,7 +231,8 @@ func TestEventStore_AppendTo(t *testing.T) {
230231
persistenceStrategy := mockSQL.NewPersistenceStrategy(ctrl)
231232
persistenceStrategy.EXPECT().PrepareData(messages).Return(nil, expectedError).AnyTimes()
232233
persistenceStrategy.EXPECT().GenerateTableName(goengine.StreamName("orders")).Return("events_orders", nil).AnyTimes()
233-
persistenceStrategy.EXPECT().ColumnNames().Return([]string{"event_id", "event_name"}).AnyTimes()
234+
persistenceStrategy.EXPECT().InsertColumnNames().Return([]string{"event_id", "event_name"}).AnyTimes()
235+
persistenceStrategy.EXPECT().EventColumnNames().Return([]string{"event_id", "event_name"}).AnyTimes()
234236

235237
store, err := postgres.NewEventStore(persistenceStrategy, db, &mockSQL.MessageFactory{}, nil)
236238
require.NoError(t, err)
@@ -262,7 +264,7 @@ func TestEventStore_Load(t *testing.T) {
262264
m = metadata.WithConstraint(m, "version", metadata.LowerThan, 100)
263265
return m
264266
},
265-
`SELECT \* FROM event_stream WHERE no >= \$1 AND metadata ->> 'version' > \$2 AND metadata ->> 'version' < \$3 ORDER BY no`,
267+
`SELECT "no", "payload", "metadata" FROM event_stream WHERE no >= \$1 AND version > \$2 AND version < \$3 ORDER BY no`,
266268
},
267269
{
268270
"Without matcher",
@@ -271,7 +273,7 @@ func TestEventStore_Load(t *testing.T) {
271273
func() metadata.Matcher {
272274
return nil
273275
},
274-
`SELECT \* FROM event_stream WHERE no >= \$1 ORDER BY no`,
276+
`SELECT "no", "payload", "metadata" FROM event_stream WHERE no >= \$1 ORDER BY no`,
275277
},
276278
{
277279
"With limit",
@@ -280,7 +282,7 @@ func TestEventStore_Load(t *testing.T) {
280282
func() metadata.Matcher {
281283
return nil
282284
},
283-
`SELECT \* FROM event_stream WHERE no >= \$1 ORDER BY no LIMIT 50`,
285+
`SELECT "no", "payload", "metadata" FROM event_stream WHERE no >= \$1 ORDER BY no LIMIT 50`,
284286
},
285287
}
286288

@@ -289,6 +291,8 @@ func TestEventStore_Load(t *testing.T) {
289291
ctrl := gomock.NewController(t)
290292
defer ctrl.Finish()
291293

294+
matcher := testCase.matcher()
295+
292296
expectedStream := &mocks.EventStream{}
293297

294298
dbMock.ExpectQuery(testCase.expectedQuery).WillReturnRows(sqlmock.NewRows(columns))
@@ -297,7 +301,15 @@ func TestEventStore_Load(t *testing.T) {
297301
factory.EXPECT().CreateEventStream(gomock.AssignableToTypeOf(&sql.Rows{})).Return(expectedStream, nil).Times(1)
298302

299303
strategy := mockSQL.NewPersistenceStrategy(ctrl)
300-
strategy.EXPECT().ColumnNames().Return(columns).AnyTimes()
304+
305+
if matcher != nil {
306+
strategy.EXPECT().PrepareSearch(matcher).Return([]byte(" AND version > $2 AND version < $3"), []interface{}{1, 100}).Times(1)
307+
} else {
308+
strategy.EXPECT().PrepareSearch(matcher).Return([]byte{}, []interface{}{}).AnyTimes()
309+
}
310+
311+
strategy.EXPECT().InsertColumnNames().Return([]string{}).AnyTimes()
312+
strategy.EXPECT().EventColumnNames().Return(columns).AnyTimes()
301313
strategy.EXPECT().GenerateTableName(goengine.StreamName("event_stream")).Return("event_stream", nil).AnyTimes()
302314

303315
store, err := postgres.NewEventStore(strategy, db, factory, nil)
@@ -308,7 +320,7 @@ func TestEventStore_Load(t *testing.T) {
308320
"event_stream",
309321
testCase.fromNumber,
310322
testCase.count,
311-
testCase.matcher(),
323+
matcher,
312324
)
313325

314326
assert.NoError(t, err)
@@ -318,7 +330,8 @@ func TestEventStore_Load(t *testing.T) {
318330
})
319331

320332
t.Run("persistent strategy failures", func(t *testing.T) {
321-
columns := []string{"no", "payload"}
333+
insertColumns := []string{"no", "payload", "aggregate_id"}
334+
eventColumns := []string{"no", "payload"}
322335

323336
testCases := []struct {
324337
title string
@@ -329,7 +342,8 @@ func TestEventStore_Load(t *testing.T) {
329342
"Empty table name returned",
330343
func(ctrl *gomock.Controller) *mockSQL.PersistenceStrategy {
331344
strategy := mockSQL.NewPersistenceStrategy(ctrl)
332-
strategy.EXPECT().ColumnNames().Return(columns).AnyTimes()
345+
strategy.EXPECT().InsertColumnNames().Return(insertColumns).AnyTimes()
346+
strategy.EXPECT().EventColumnNames().Return(eventColumns).AnyTimes()
333347
strategy.EXPECT().GenerateTableName(goengine.StreamName("event_stream")).
334348
Return("", nil).AnyTimes()
335349
return strategy
@@ -340,7 +354,8 @@ func TestEventStore_Load(t *testing.T) {
340354
"Empty table name returned",
341355
func(ctrl *gomock.Controller) *mockSQL.PersistenceStrategy {
342356
strategy := mockSQL.NewPersistenceStrategy(ctrl)
343-
strategy.EXPECT().ColumnNames().Return(columns).AnyTimes()
357+
strategy.EXPECT().InsertColumnNames().Return(insertColumns).AnyTimes()
358+
strategy.EXPECT().EventColumnNames().Return(eventColumns).AnyTimes()
344359
strategy.EXPECT().GenerateTableName(goengine.StreamName("event_stream")).
345360
Return("", errors.New("failed gen")).AnyTimes()
346361
return strategy
@@ -419,7 +434,7 @@ func BenchmarkEventStore_AppendToWithExecer(b *testing.B) {
419434
dbExecer.EXPECT().ExecContext(ctx, gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
420435

421436
persistenceStrategy := mockSQL.NewPersistenceStrategy(ctrl)
422-
persistenceStrategy.EXPECT().ColumnNames().Return([]string{"event_id", "event_name", "payload", "metadata", "created_at"}).AnyTimes()
437+
persistenceStrategy.EXPECT().InsertColumnNames().Return([]string{"event_id", "event_name", "payload", "metadata", "created_at"}).AnyTimes()
423438
persistenceStrategy.EXPECT().GenerateTableName(goengine.StreamName("hello")).Return("hello", nil).AnyTimes()
424439
persistenceStrategy.EXPECT().PrepareData(gomock.Any()).Return([]interface{}{
425440
"event_id", "event_name", "payload", "metadata", "created_at",
@@ -461,7 +476,7 @@ func BenchmarkEventStore_LoadWithConnection(b *testing.B) {
461476
dbQueryer.EXPECT().QueryContext(ctx, gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
462477

463478
persistenceStrategy := mockSQL.NewPersistenceStrategy(ctrl)
464-
persistenceStrategy.EXPECT().ColumnNames().Return([]string{"event_id", "event_name", "payload", "metadata", "created_at"}).AnyTimes()
479+
persistenceStrategy.EXPECT().InsertColumnNames().Return([]string{"event_id", "event_name", "payload", "metadata", "created_at"}).AnyTimes()
465480
persistenceStrategy.EXPECT().GenerateTableName(goengine.StreamName("hello")).Return("hello", nil).AnyTimes()
466481
require.NoError(b, err)
467482

driver/sql/projection.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ import (
55
"database/sql"
66
"time"
77

8-
"github.com/hellofresh/goengine"
98
"github.com/mailru/easyjson/jlexer"
109
"github.com/pkg/errors"
10+
11+
"github.com/hellofresh/goengine"
1112
)
1213

1314
type (

mocks/driver/sql/persistence_strategy.go

Lines changed: 42 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)