Skip to content
This repository was archived by the owner on Sep 16, 2024. It is now read-only.

Commit 1c96054

Browse files
Merge pull request #6 from vimeda/remove-aggregate-version
remove aggregate version when inserting
2 parents f42e7d5 + 6c99c54 commit 1c96054

File tree

6 files changed

+486
-31
lines changed

6 files changed

+486
-31
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
7+
"github.com/hellofresh/goengine"
8+
"github.com/hellofresh/goengine/strategy/json"
9+
"github.com/hellofresh/goengine/strategy/json/sql/postgres"
10+
)
11+
12+
func main() {
13+
ctx := context.Background()
14+
15+
db, err := sql.Open("postgres", "string-connection")
16+
failOnErr(err)
17+
18+
defer db.Close()
19+
20+
transformer := json.NewPayloadTransformer()
21+
strategy, err := postgres.NewMultiTenancyStreamStrategy(transformer)
22+
failOnErr(err)
23+
24+
manager, err := postgres.NewStreamManager(db, transformer, strategy, nil, nil)
25+
failOnErr(err)
26+
27+
failOnErr(manager.RegisterPayloads(nil))
28+
29+
es, err := manager.NewEventStore()
30+
failOnErr(err)
31+
32+
eventStream := goengine.StreamName("event_stream")
33+
failOnErr(es.Create(ctx, eventStream))
34+
35+
// connect to repository...
36+
}
37+
38+
func failOnErr(err error) {
39+
if err != nil {
40+
panic(err)
41+
}
42+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
7+
"github.com/hellofresh/goengine"
8+
"github.com/hellofresh/goengine/strategy/json"
9+
"github.com/hellofresh/goengine/strategy/json/sql/postgres"
10+
"github.com/hellofresh/goengine/strategy/json/sql/postgres/strategy"
11+
)
12+
13+
func main() {
14+
ctx := context.Background()
15+
16+
db, err := sql.Open("postgres", "string-connection")
17+
failOnErr(err)
18+
19+
defer db.Close()
20+
21+
transformer := json.NewPayloadTransformer()
22+
strategy, err := strategy.NewSingleStreamStrategy(transformer)
23+
failOnErr(err)
24+
25+
manager, err := postgres.NewStreamManager(db, transformer, strategy, nil, nil)
26+
failOnErr(err)
27+
28+
failOnErr(manager.RegisterPayloads(nil))
29+
30+
es, err := manager.NewEventStore()
31+
failOnErr(err)
32+
33+
eventStream := goengine.StreamName("event_stream")
34+
failOnErr(es.Create(ctx, eventStream))
35+
36+
// connect to repository...
37+
}
38+
39+
func failOnErr(err error) {
40+
if err != nil {
41+
panic(err)
42+
}
43+
}

strategy/json/sql/postgres/manager.go

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ import (
1111
strategySQL "github.com/hellofresh/goengine/strategy/json/sql"
1212
)
1313

14-
// SingleStreamManager is a helper for creating JSON Postgres event stores and projectors
15-
type SingleStreamManager struct {
14+
// StreamManager is a helper for creating JSON Postgres event stores and projectors
15+
type StreamManager struct {
1616
db *sql.DB
1717
payloadTransformer *json.PayloadTransformer
1818
persistenceStrategy driverSQL.PersistenceStrategy
@@ -22,8 +22,14 @@ type SingleStreamManager struct {
2222
metrics driverSQL.Metrics
2323
}
2424

25-
// NewSingleStreamManager return a new instance of the SingleStreamManager
26-
func NewSingleStreamManager(db *sql.DB, logger goengine.Logger, metrics driverSQL.Metrics) (*SingleStreamManager, error) {
25+
// NewStreamManager return a new instance of the StreamManager
26+
func NewStreamManager(
27+
db *sql.DB,
28+
transformer *json.PayloadTransformer,
29+
strategy driverSQL.PersistenceStrategy,
30+
logger goengine.Logger,
31+
metrics driverSQL.Metrics,
32+
) (*StreamManager, error) {
2733
if db == nil {
2834
return nil, goengine.InvalidArgumentError("db")
2935
}
@@ -34,32 +40,24 @@ func NewSingleStreamManager(db *sql.DB, logger goengine.Logger, metrics driverSQ
3440
metrics = driverSQL.NopMetrics
3541
}
3642

37-
payloadTransformer := json.NewPayloadTransformer()
38-
39-
// Setting up the postgres strategy
40-
persistenceStrategy, err := NewSingleStreamStrategy(payloadTransformer)
41-
if err != nil {
42-
return nil, err
43-
}
44-
4543
// Setting up the message factory
46-
messageFactory, err := strategySQL.NewAggregateChangedFactory(payloadTransformer)
44+
messageFactory, err := strategySQL.NewAggregateChangedFactory(transformer)
4745
if err != nil {
4846
return nil, err
4947
}
5048

51-
return &SingleStreamManager{
49+
return &StreamManager{
5250
db: db,
53-
payloadTransformer: payloadTransformer,
54-
persistenceStrategy: persistenceStrategy,
51+
payloadTransformer: transformer,
52+
persistenceStrategy: strategy,
5553
messageFactory: messageFactory,
5654
logger: logger,
5755
metrics: metrics,
5856
}, nil
5957
}
6058

6159
// NewEventStore returns a new event store instance
62-
func (m *SingleStreamManager) NewEventStore() (*postgres.EventStore, error) {
60+
func (m *StreamManager) NewEventStore() (*postgres.EventStore, error) {
6361
// Setting up the event store
6462
return postgres.NewEventStore(
6563
m.persistenceStrategy,
@@ -70,17 +68,17 @@ func (m *SingleStreamManager) NewEventStore() (*postgres.EventStore, error) {
7068
}
7169

7270
// RegisterPayloads registers a set of payload type initiators
73-
func (m *SingleStreamManager) RegisterPayloads(initiators map[string]json.PayloadInitiator) error {
71+
func (m *StreamManager) RegisterPayloads(initiators map[string]json.PayloadInitiator) error {
7472
return m.payloadTransformer.RegisterPayloads(initiators)
7573
}
7674

7775
// PersistenceStrategy returns the sql persistence strategy
78-
func (m *SingleStreamManager) PersistenceStrategy() driverSQL.PersistenceStrategy {
76+
func (m *StreamManager) PersistenceStrategy() driverSQL.PersistenceStrategy {
7977
return m.persistenceStrategy
8078
}
8179

8280
// NewStreamProjector returns a new stream projector instance
83-
func (m *SingleStreamManager) NewStreamProjector(
81+
func (m *StreamManager) NewStreamProjector(
8482
projectionTable string,
8583
projection goengine.Projection,
8684
projectionErrorHandler driverSQL.ProjectionErrorCallback,
@@ -114,7 +112,7 @@ func (m *SingleStreamManager) NewStreamProjector(
114112
}
115113

116114
// NewAggregateProjector returns a new aggregate projector instance
117-
func (m *SingleStreamManager) NewAggregateProjector(
115+
func (m *StreamManager) NewAggregateProjector(
118116
eventStream goengine.StreamName,
119117
aggregateTypeName string,
120118
projectionTable string,
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package postgres
2+
3+
import (
4+
"fmt"
5+
"strconv"
6+
"strings"
7+
8+
"github.com/hellofresh/goengine"
9+
"github.com/hellofresh/goengine/driver/sql"
10+
"github.com/hellofresh/goengine/driver/sql/postgres"
11+
"github.com/hellofresh/goengine/metadata"
12+
"github.com/hellofresh/goengine/strategy/json/internal"
13+
)
14+
15+
// Ensure MultiThreadStreamStrategy implements strategy.PersistenceStrategy
16+
var _ sql.PersistenceStrategy = &MultiThreadStreamStrategy{}
17+
18+
// MultiThreadStreamStrategy struct represents eventstore with multitenancy stream
19+
type MultiThreadStreamStrategy struct {
20+
converter goengine.MessagePayloadConverter
21+
}
22+
23+
// NewMultiTenancyStreamStrategy is the constructor postgres for PersistenceStrategy interface
24+
func NewMultiTenancyStreamStrategy(converter goengine.MessagePayloadConverter) (sql.PersistenceStrategy, error) {
25+
if converter == nil {
26+
return nil, goengine.InvalidArgumentError("converter")
27+
}
28+
29+
return &MultiThreadStreamStrategy{converter: converter}, nil
30+
}
31+
32+
// CreateSchema returns a valid set of SQL statements to create the event store tables and indexes
33+
func (s *MultiThreadStreamStrategy) CreateSchema(tableName string) []string {
34+
tableName = postgres.QuoteIdentifier(tableName)
35+
36+
statements := make([]string, 3)
37+
statements[0] = fmt.Sprintf(
38+
`CREATE TABLE %s (
39+
no BIGSERIAL,
40+
event_id UUID NOT NULL,
41+
event_name VARCHAR(100) NOT NULL,
42+
payload JSON NOT NULL,
43+
metadata JSONB NOT NULL,
44+
aggregate_type VARCHAR(50) NOT NULL,
45+
aggregate_id UUID NOT NULL,
46+
aggregate_version SERIAL,
47+
created_at TIMESTAMP(6) NOT NULL,
48+
PRIMARY KEY (no),
49+
UNIQUE (event_id)
50+
);`,
51+
tableName,
52+
)
53+
statements[1] = fmt.Sprintf(`CREATE UNIQUE INDEX ON %s (aggregate_type, aggregate_id, aggregate_version);`, tableName)
54+
statements[2] = fmt.Sprintf(`CREATE INDEX ON %s (aggregate_type, aggregate_id, no);`, tableName)
55+
56+
return statements
57+
}
58+
59+
// EventColumnNames returns the columns that need to be select an event from the table
60+
func (s *MultiThreadStreamStrategy) EventColumnNames() []string {
61+
return []string{"no", "event_id", "event_name", "payload", "metadata", "created_at"}
62+
}
63+
64+
// InsertColumnNames returns the columns that need to be inserted into the table in the correct order
65+
func (s *MultiThreadStreamStrategy) InsertColumnNames() []string {
66+
return []string{"event_id", "event_name", "payload", "metadata", "aggregate_type", "aggregate_id", "created_at"}
67+
}
68+
69+
// PrepareData transforms a slice of messaging into a flat interface slice with the correct column order
70+
func (s *MultiThreadStreamStrategy) PrepareData(messages []goengine.Message) ([]interface{}, error) {
71+
var out = make([]interface{}, 0, len(messages)*5) // optimization for the number of columns
72+
for _, msg := range messages {
73+
payloadType, payloadData, err := s.converter.ConvertPayload(msg.Payload())
74+
if err != nil {
75+
return nil, err
76+
}
77+
78+
msgMetadata := msg.Metadata()
79+
meta, err := internal.MarshalJSON(msgMetadata)
80+
if err != nil {
81+
return nil, err
82+
}
83+
84+
out = append(out,
85+
msg.UUID(),
86+
payloadType,
87+
payloadData,
88+
meta,
89+
msgMetadata.Value("_aggregate_type"),
90+
msgMetadata.Value("_aggregate_id"),
91+
msg.CreatedAt(),
92+
)
93+
}
94+
return out, nil
95+
}
96+
97+
// PrepareSearch returns the where part for searching the event store
98+
func (s *MultiThreadStreamStrategy) PrepareSearch(matcher metadata.Matcher) ([]byte, []interface{}) {
99+
query := make([]byte, 0, 196)
100+
params := make([]interface{}, 0, 2)
101+
102+
paramCount := 1
103+
matcher.Iterate(func(c metadata.Constraint) {
104+
paramCount++
105+
params = append(params, c.Value())
106+
107+
query = append(query, " AND "...)
108+
switch c.Field() {
109+
case "_aggregate_type":
110+
query = append(query, "aggregate_type"...)
111+
case "_aggregate_id":
112+
query = append(query, "aggregate_id"...)
113+
case "_aggregate_version":
114+
query = append(query, "aggregate_version"...)
115+
default:
116+
query = append(query, "metadata ->> "...)
117+
query = append(query, postgres.QuoteString(c.Field())...)
118+
}
119+
120+
query = append(query, ' ')
121+
query = append(query, c.Operator()...)
122+
query = append(query, " $"...)
123+
query = append(query, strconv.Itoa(paramCount)...)
124+
})
125+
126+
return query, params
127+
}
128+
129+
// GenerateTableName returns a valid table name for postgres
130+
func (s *MultiThreadStreamStrategy) GenerateTableName(streamName goengine.StreamName) (string, error) {
131+
if len(streamName) == 0 {
132+
return "", goengine.InvalidArgumentError("streamName")
133+
}
134+
135+
name := strings.ToLower(string(streamName))
136+
// remove not allowed symbols
137+
name = tableNameInvalidCharRegex.ReplaceAllString(name, "")
138+
// remove underscore at the end
139+
name = strings.TrimRight(name, "_")
140+
// prefix with events_
141+
return fmt.Sprintf("events_%s", name), nil
142+
}

0 commit comments

Comments
 (0)