Skip to content
This repository was archived by the owner on Sep 16, 2024. It is now read-only.

Commit 15a4591

Browse files
Merge pull request #7 from vimeda/allow-matcher-lookup-for-event-name
allow matcher to lookup in column event_name
2 parents 1c96054 + ce32840 commit 15a4591

File tree

5 files changed

+21
-16
lines changed

5 files changed

+21
-16
lines changed

example/postgres/singlethread/main.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"github.com/hellofresh/goengine"
88
"github.com/hellofresh/goengine/strategy/json"
99
"github.com/hellofresh/goengine/strategy/json/sql/postgres"
10-
"github.com/hellofresh/goengine/strategy/json/sql/postgres/strategy"
1110
)
1211

1312
func main() {
@@ -19,7 +18,7 @@ func main() {
1918
defer db.Close()
2019

2120
transformer := json.NewPayloadTransformer()
22-
strategy, err := strategy.NewSingleStreamStrategy(transformer)
21+
strategy, err := postgres.NewSingleStreamStrategy(transformer)
2322
failOnErr(err)
2423

2524
manager, err := postgres.NewStreamManager(db, transformer, strategy, nil, nil)

strategy/json/sql/postgres/multi_thread_stream_strategy.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ func (s *MultiThreadStreamStrategy) PrepareSearch(matcher metadata.Matcher) ([]b
106106

107107
query = append(query, " AND "...)
108108
switch c.Field() {
109+
case "event_name":
110+
query = append(query, "event_name"...)
109111
case "_aggregate_type":
110112
query = append(query, "aggregate_type"...)
111113
case "_aggregate_id":

strategy/json/sql/postgres/multi_thread_stream_strategy_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/hellofresh/goengine/metadata"
1212
"github.com/hellofresh/goengine/mocks"
1313
"github.com/hellofresh/goengine/strategy/json/internal"
14+
"github.com/hellofresh/goengine/strategy/json/sql/postgres"
1415
"github.com/stretchr/testify/assert"
1516
"github.com/stretchr/testify/require"
1617
)
@@ -110,7 +111,7 @@ func TestMultiThreadGenerateTableName(t *testing.T) {
110111

111112
for _, testCase := range testCases {
112113
t.Run(testCase.title, func(t *testing.T) {
113-
tableName, err := postgres.GenerateTableName(testCase.input)
114+
tableName, err := strategy.GenerateTableName(testCase.input)
114115

115116
assert.NoError(t, err)
116117
assert.Equal(t, testCase.output, tableName)
@@ -119,7 +120,7 @@ func TestMultiThreadGenerateTableName(t *testing.T) {
119120
})
120121

121122
t.Run("Empty stream names are not supported", func(t *testing.T) {
122-
tableName, err := postgres.GenerateTableName("")
123+
tableName, err := strategy.GenerateTableName("")
123124

124125
asserts := assert.New(t)
125126
asserts.Empty(tableName)
@@ -137,13 +138,13 @@ func TestMultiThreadColumnNames(t *testing.T) {
137138
require.NoError(t, err)
138139

139140
t.Run("get expected columns", func(t *testing.T) {
140-
cols := postgres.InsertColumnNames()
141+
cols := strategy.InsertColumnNames()
141142

142143
assert.Equal(t, cols, expectedColumns)
143144
})
144145

145146
t.Run("cannot modify data", func(t *testing.T) {
146-
assert.Equal(t, postgres.InsertColumnNames(), expectedColumns)
147+
assert.Equal(t, strategy.InsertColumnNames(), expectedColumns)
147148
})
148149
}
149150

@@ -152,7 +153,7 @@ func TestMultiThreadCreateSchema(t *testing.T) {
152153
require.NoError(t, err)
153154

154155
t.Run("output statement elements count", func(t *testing.T) {
155-
cs := postgres.CreateSchema("abc")
156+
cs := strategy.CreateSchema("abc")
156157

157158
assert.Equal(t, 3, len(cs))
158159
assert.Contains(t, cs[0], `CREATE TABLE "abc"`)
@@ -196,7 +197,7 @@ func TestMultiThreadPrepareData(t *testing.T) {
196197
strategy, err := postgres.NewMultiTenancyStreamStrategy(pc)
197198
require.NoError(t, err)
198199

199-
data, err := postgres.PrepareData(messages)
200+
data, err := strategy.PrepareData(messages)
200201

201202
assert.NoError(t, err)
202203
assert.Equal(t, expectedColumns, data)
@@ -225,7 +226,7 @@ func TestMultiThreadPrepareData(t *testing.T) {
225226
strategy, err := postgres.NewMultiTenancyStreamStrategy(pc)
226227
require.NoError(t, err)
227228

228-
data, err := postgres.PrepareData(messages)
229+
data, err := strategy.PrepareData(messages)
229230

230231
assert.Error(t, expectedErr, err)
231232
assert.Nil(t, data)

strategy/json/sql/postgres/single_stream_strategy.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ func (s *SingleStreamStrategy) PrepareSearch(matcher metadata.Matcher) ([]byte,
112112

113113
query = append(query, " AND "...)
114114
switch c.Field() {
115+
case "event_name":
116+
query = append(query, "event_name"...)
115117
case "_aggregate_type":
116118
query = append(query, "aggregate_type"...)
117119
case "_aggregate_id":

strategy/json/sql/postgres/single_stream_strategy_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/hellofresh/goengine/metadata"
1212
"github.com/hellofresh/goengine/mocks"
1313
"github.com/hellofresh/goengine/strategy/json/internal"
14+
"github.com/hellofresh/goengine/strategy/json/sql/postgres"
1415
"github.com/stretchr/testify/assert"
1516
"github.com/stretchr/testify/require"
1617
)
@@ -110,7 +111,7 @@ func TestGenerateTableName(t *testing.T) {
110111

111112
for _, testCase := range testCases {
112113
t.Run(testCase.title, func(t *testing.T) {
113-
tableName, err := postgres.GenerateTableName(testCase.input)
114+
tableName, err := strategy.GenerateTableName(testCase.input)
114115

115116
assert.NoError(t, err)
116117
assert.Equal(t, testCase.output, tableName)
@@ -119,7 +120,7 @@ func TestGenerateTableName(t *testing.T) {
119120
})
120121

121122
t.Run("Empty stream names are not supported", func(t *testing.T) {
122-
tableName, err := postgres.GenerateTableName("")
123+
tableName, err := strategy.GenerateTableName("")
123124

124125
asserts := assert.New(t)
125126
asserts.Empty(tableName)
@@ -137,13 +138,13 @@ func TestColumnNames(t *testing.T) {
137138
require.NoError(t, err)
138139

139140
t.Run("get expected columns", func(t *testing.T) {
140-
cols := postgres.InsertColumnNames()
141+
cols := strategy.InsertColumnNames()
141142

142143
assert.Equal(t, cols, expectedColumns)
143144
})
144145

145146
t.Run("cannot modify data", func(t *testing.T) {
146-
assert.Equal(t, postgres.InsertColumnNames(), expectedColumns)
147+
assert.Equal(t, strategy.InsertColumnNames(), expectedColumns)
147148
})
148149
}
149150

@@ -152,7 +153,7 @@ func TestCreateSchema(t *testing.T) {
152153
require.NoError(t, err)
153154

154155
t.Run("output statement elements count", func(t *testing.T) {
155-
cs := postgres.CreateSchema("abc")
156+
cs := strategy.CreateSchema("abc")
156157

157158
assert.Equal(t, 3, len(cs))
158159
assert.Contains(t, cs[0], `CREATE TABLE "abc"`)
@@ -197,7 +198,7 @@ func TestPrepareData(t *testing.T) {
197198
strategy, err := postgres.NewSingleStreamStrategy(pc)
198199
require.NoError(t, err)
199200

200-
data, err := postgres.PrepareData(messages)
201+
data, err := strategy.PrepareData(messages)
201202

202203
assert.NoError(t, err)
203204
assert.Equal(t, expectedColumns, data)
@@ -226,7 +227,7 @@ func TestPrepareData(t *testing.T) {
226227
strategy, err := postgres.NewSingleStreamStrategy(pc)
227228
require.NoError(t, err)
228229

229-
data, err := postgres.PrepareData(messages)
230+
data, err := strategy.PrepareData(messages)
230231

231232
assert.Error(t, expectedErr, err)
232233
assert.Nil(t, data)

0 commit comments

Comments
 (0)