Skip to content

Commit 29dfbbf

Browse files
committed
feat(cqrs): add outbox pattern support with pgxpool
- Add OutboxConfig with support for both *sql.DB and *pgxpool.Pool - Implement forwarder state management for Watermill outbox/forwarder - Add RunForwarder methods to CommandBus and EventBus - Support automatic conversion from pgxpool.Pool to *sql.DB using stdlib - Add logger and meter provider support for observability
1 parent ffa44b3 commit 29dfbbf

File tree

6 files changed

+653
-14
lines changed

6 files changed

+653
-14
lines changed

cqrs/bus/command_bus.go

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,43 @@ type CommandBus struct {
2121
publisher wmmessage.Publisher
2222
marshaler cqrsmessage.Marshaler
2323
namer cqrsmessage.Namer
24+
forwarder *forwarderState
2425
}
2526

2627
// NewCommandBus builds a bus backed by Watermill publisher.
27-
func NewCommandBus(pub wmmessage.Publisher, marshaler cqrsmessage.Marshaler, namer cqrsmessage.Namer) *CommandBus {
28-
return &CommandBus{
28+
func NewCommandBus(pub wmmessage.Publisher, marshaler cqrsmessage.Marshaler, namer cqrsmessage.Namer, opts ...Option) *CommandBus {
29+
bus, err := NewCommandBusWithOptions(pub, marshaler, namer, opts...)
30+
if err != nil {
31+
panic(err)
32+
}
33+
34+
return bus
35+
}
36+
37+
// NewCommandBusWithOptions builds a bus and returns configuration errors instead of panicking.
38+
func NewCommandBusWithOptions(
39+
pub wmmessage.Publisher,
40+
marshaler cqrsmessage.Marshaler,
41+
namer cqrsmessage.Namer,
42+
opts ...Option,
43+
) (*CommandBus, error) {
44+
cfg, err := applyOptions(opts)
45+
if err != nil {
46+
return nil, err
47+
}
48+
49+
bus := &CommandBus{
2950
publisher: pub,
3051
marshaler: marshaler,
3152
namer: namer,
3253
}
54+
55+
if cfg.outbox != nil {
56+
bus.forwarder = newForwarderState(cfg.outbox)
57+
bus.publisher = bus.forwarder.wrapPublisher(pub)
58+
}
59+
60+
return bus, nil
3361
}
3462

3563
// validate checks that the command bus and its dependencies are properly initialized.
@@ -90,3 +118,21 @@ func (b *CommandBus) Send(ctx context.Context, cmd any) error {
90118

91119
return b.publisher.Publish(topic, msg)
92120
}
121+
122+
// RunForwarder starts the optional outbox forwarder when configured.
123+
func (b *CommandBus) RunForwarder(ctx context.Context) error {
124+
if b == nil || b.forwarder == nil {
125+
return nil
126+
}
127+
128+
return b.forwarder.Run(ctx)
129+
}
130+
131+
// CloseForwarder attempts to stop previously started forwarder gracefully.
132+
func (b *CommandBus) CloseForwarder(ctx context.Context) error {
133+
if b == nil || b.forwarder == nil {
134+
return nil
135+
}
136+
137+
return b.forwarder.Close(ctx)
138+
}

cqrs/bus/event_bus.go

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,43 @@ type EventBus struct {
2121
publisher wmmessage.Publisher
2222
marshaler cqrsmessage.Marshaler
2323
namer cqrsmessage.Namer
24+
forwarder *forwarderState
2425
}
2526

2627
// NewEventBus builds EventBus with required dependencies.
27-
func NewEventBus(pub wmmessage.Publisher, marshaler cqrsmessage.Marshaler, namer cqrsmessage.Namer) *EventBus {
28-
return &EventBus{
28+
func NewEventBus(pub wmmessage.Publisher, marshaler cqrsmessage.Marshaler, namer cqrsmessage.Namer, opts ...Option) *EventBus {
29+
bus, err := NewEventBusWithOptions(pub, marshaler, namer, opts...)
30+
if err != nil {
31+
panic(err)
32+
}
33+
34+
return bus
35+
}
36+
37+
// NewEventBusWithOptions builds EventBus and exposes configuration errors.
38+
func NewEventBusWithOptions(
39+
pub wmmessage.Publisher,
40+
marshaler cqrsmessage.Marshaler,
41+
namer cqrsmessage.Namer,
42+
opts ...Option,
43+
) (*EventBus, error) {
44+
cfg, err := applyOptions(opts)
45+
if err != nil {
46+
return nil, err
47+
}
48+
49+
bus := &EventBus{
2950
publisher: pub,
3051
marshaler: marshaler,
3152
namer: namer,
3253
}
54+
55+
if cfg.outbox != nil {
56+
bus.forwarder = newForwarderState(cfg.outbox)
57+
bus.publisher = bus.forwarder.wrapPublisher(pub)
58+
}
59+
60+
return bus, nil
3361
}
3462

3563
// validate checks that the event bus and its dependencies are properly initialized.
@@ -90,3 +118,21 @@ func (b *EventBus) Publish(ctx context.Context, evt any) error {
90118

91119
return b.publisher.Publish(topic, msg)
92120
}
121+
122+
// RunForwarder starts the optional outbox forwarder when configured.
123+
func (b *EventBus) RunForwarder(ctx context.Context) error {
124+
if b == nil || b.forwarder == nil {
125+
return nil
126+
}
127+
128+
return b.forwarder.Run(ctx)
129+
}
130+
131+
// CloseForwarder stops the optional forwarder if it was started.
132+
func (b *EventBus) CloseForwarder(ctx context.Context) error {
133+
if b == nil || b.forwarder == nil {
134+
return nil
135+
}
136+
137+
return b.forwarder.Close(ctx)
138+
}

cqrs/bus/options.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package bus
2+
3+
import (
4+
"database/sql"
5+
"errors"
6+
"fmt"
7+
"strings"
8+
9+
wmmessage "github.com/ThreeDotsLabs/watermill/message"
10+
"github.com/jackc/pgx/v5/pgxpool"
11+
"github.com/jackc/pgx/v5/stdlib"
12+
"github.com/shortlink-org/go-sdk/logger"
13+
"go.opentelemetry.io/otel/metric"
14+
)
15+
16+
const defaultForwarderTopic = "shortlink_cqrs_outbox"
17+
18+
// Option configures Bus behaviours without breaking the constructor API.
19+
type Option func(*cqrsConfig)
20+
21+
type cqrsConfig struct {
22+
outbox *OutboxConfig
23+
err error
24+
}
25+
26+
// OutboxConfig wires transactional outbox pieces required by the Watermill forwarder.
27+
type OutboxConfig struct {
28+
DB *sql.DB
29+
Pool *pgxpool.Pool
30+
Subscriber wmmessage.Subscriber
31+
RealPublisher wmmessage.Publisher
32+
ForwarderName string
33+
Logger logger.Logger
34+
MeterProvider metric.MeterProvider
35+
}
36+
37+
// WithOutbox enables Watermill's Outbox/Forwarder transport.
38+
//
39+
// Example with *sql.DB:
40+
//
41+
// package main
42+
//
43+
// import (
44+
// "database/sql"
45+
//
46+
// "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
47+
// "github.com/shortlink-org/go-sdk/cqrs/bus"
48+
// )
49+
//
50+
// func wireBuses(db *sql.DB, kafkaPub wmmessage.Publisher) *bus.CommandBus {
51+
// sqlPublisher, _ := sql.NewPublisher(db, sqlPublisherConfig, sqlLogger)
52+
// sqlSubscriber, _ := sql.NewSubscriber(db, sqlSubscriberConfig, sqlLogger)
53+
//
54+
// return bus.NewCommandBus(
55+
// sqlPublisher,
56+
// marshaler,
57+
// namer,
58+
// bus.WithOutbox(bus.OutboxConfig{
59+
// DB: db,
60+
// Subscriber: sqlSubscriber,
61+
// RealPublisher: kafkaPub,
62+
// ForwarderName: "orders_outbox_forwarder",
63+
// Logger: log,
64+
// MeterProvider: meterProvider,
65+
// }),
66+
// )
67+
// }
68+
//
69+
// Example with *pgxpool.Pool (converted automatically):
70+
//
71+
// package main
72+
//
73+
// import (
74+
// "database/sql"
75+
//
76+
// "github.com/jackc/pgx/v5/pgxpool"
77+
// "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
78+
// "github.com/shortlink-org/go-sdk/cqrs/bus"
79+
// )
80+
//
81+
// func wireBusesWithPgx(pool *pgxpool.Pool, kafkaPub wmmessage.Publisher) *bus.CommandBus {
82+
// // Convert pgxpool to sql.DB for watermill-sql
83+
// sqlDB, _ := sql.Open("pgx", pool.Config().ConnString())
84+
// sqlPublisher, _ := sql.NewPublisher(sqlDB, sqlPublisherConfig, sqlLogger)
85+
// sqlSubscriber, _ := sql.NewSubscriber(sqlDB, sqlSubscriberConfig, sqlLogger)
86+
//
87+
// return bus.NewCommandBus(
88+
// sqlPublisher,
89+
// marshaler,
90+
// namer,
91+
// bus.WithOutbox(bus.OutboxConfig{
92+
// Pool: pool,
93+
// Subscriber: sqlSubscriber,
94+
// RealPublisher: kafkaPub,
95+
// ForwarderName: "orders_outbox_forwarder",
96+
// Logger: log,
97+
// MeterProvider: meterProvider,
98+
// }),
99+
// )
100+
// }
101+
func WithOutbox(cfg OutboxConfig) Option {
102+
return func(c *cqrsConfig) {
103+
conf := cfg
104+
if c.err != nil {
105+
return
106+
}
107+
if err := conf.prepare(); err != nil {
108+
c.err = err
109+
return
110+
}
111+
c.outbox = &conf
112+
}
113+
}
114+
115+
func applyOptions(opts []Option) (cqrsConfig, error) {
116+
var cfg cqrsConfig
117+
for _, opt := range opts {
118+
if opt == nil {
119+
continue
120+
}
121+
opt(&cfg)
122+
if cfg.err != nil {
123+
break
124+
}
125+
}
126+
return cfg, cfg.err
127+
}
128+
129+
func (c *OutboxConfig) prepare() error {
130+
if c.DB == nil && c.Pool == nil {
131+
return errors.New("cqrs/bus: sql.DB or pgxpool.Pool must be provided")
132+
}
133+
if c.Subscriber == nil {
134+
return errors.New("cqrs/bus: outbox subscriber is required")
135+
}
136+
if c.RealPublisher == nil {
137+
return errors.New("cqrs/bus: real publisher is required")
138+
}
139+
if c.Logger == nil {
140+
return errors.New("cqrs/bus: logger is required")
141+
}
142+
if c.MeterProvider == nil {
143+
return errors.New("cqrs/bus: meter provider is required")
144+
}
145+
if c.DB == nil && c.Pool != nil {
146+
c.DB = stdlib.OpenDBFromPool(c.Pool)
147+
}
148+
c.ForwarderName = sanitizeForwarderTopic(c.ForwarderName, c.DB, c.Pool)
149+
return nil
150+
}
151+
152+
func sanitizeForwarderTopic(name string, db *sql.DB, pool *pgxpool.Pool) string {
153+
name = strings.TrimSpace(name)
154+
if name != "" {
155+
return name
156+
}
157+
158+
driverName := "sql"
159+
if pool != nil {
160+
driverName = "pgxpool"
161+
} else if db != nil && db.Driver() != nil {
162+
driverName = fmt.Sprintf("%T", db.Driver())
163+
}
164+
165+
driverName = strings.ToLower(driverName)
166+
driverName = strings.ReplaceAll(driverName, ".", "_")
167+
driverName = strings.ReplaceAll(driverName, "*", "_")
168+
driverName = strings.Trim(driverName, "_")
169+
if driverName == "" {
170+
driverName = "sql"
171+
}
172+
173+
return fmt.Sprintf("%s_%s", defaultForwarderTopic, driverName)
174+
}

0 commit comments

Comments
 (0)