Skip to content

Commit e7fdee8

Browse files
committed
cqrs: add integration tests with testcontainers and goroutine leak checks
- Add integration tests (build tag: integration) for CommandBus outbox, EventBus tx-aware outbox, and forwarder close using PostgreSQL testcontainers - Add unit leak tests (CommandBus/EventBus with gochannel) and TestMain with goleak.VerifyTestMain for package-wide leak detection - Require Docker for integration tests; fail with clear message if postgres container cannot start - Add testcontainers-go, postgres module, and goleak to cqrs go.mod - Ignore known third-party goroutines (testcontainers reaper, pgx pool, database/sql) in goleak for integration tests
1 parent 4f25da7 commit e7fdee8

File tree

4 files changed

+568
-0
lines changed

4 files changed

+568
-0
lines changed

cqrs/bus/integration_test.go

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
//go:build integration
2+
3+
package bus_test
4+
5+
import (
6+
"context"
7+
"io"
8+
"testing"
9+
"time"
10+
11+
"github.com/ThreeDotsLabs/watermill"
12+
wmsql "github.com/ThreeDotsLabs/watermill-sql/v4/pkg/sql"
13+
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
14+
"github.com/jackc/pgx/v5/pgxpool"
15+
"github.com/jackc/pgx/v5/stdlib"
16+
"github.com/stretchr/testify/require"
17+
"github.com/testcontainers/testcontainers-go"
18+
"github.com/testcontainers/testcontainers-go/modules/postgres"
19+
"github.com/testcontainers/testcontainers-go/wait"
20+
"go.opentelemetry.io/otel/metric/noop"
21+
"go.uber.org/goleak"
22+
23+
"github.com/shortlink-org/go-sdk/cqrs/bus"
24+
"github.com/shortlink-org/go-sdk/cqrs/message"
25+
"github.com/shortlink-org/go-sdk/logger"
26+
"github.com/shortlink-org/go-sdk/uow"
27+
)
28+
29+
var integrationGoleakOpts = []goleak.Option{
30+
goleak.IgnoreTopFunction("github.com/testcontainers/testcontainers-go.(*Reaper).connect.func1"),
31+
goleak.IgnoreTopFunction("github.com/jackc/pgx/v5/pgxpool.(*Pool).backgroundHealthCheck"),
32+
goleak.IgnoreTopFunction("database/sql.(*DB).connectionOpener"),
33+
}
34+
35+
const (
36+
forwarderTopic = "shortlink_cqrs_outbox_test"
37+
serviceName = "cqrs-integration-test"
38+
)
39+
40+
func setupPostgres(t *testing.T) *pgxpool.Pool {
41+
t.Helper()
42+
ctx := context.Background()
43+
44+
container, err := postgres.Run(ctx,
45+
"postgres:18-alpine",
46+
postgres.WithDatabase("testdb"),
47+
postgres.WithUsername("testuser"),
48+
postgres.WithPassword("testpass"),
49+
testcontainers.WithWaitStrategy(
50+
wait.ForLog("database system is ready to accept connections").
51+
WithOccurrence(2).
52+
WithStartupTimeout(30*time.Second),
53+
),
54+
)
55+
require.NoError(t, err, "postgres container: ensure Docker is running")
56+
57+
connStr, err := container.ConnectionString(ctx, "sslmode=disable")
58+
require.NoError(t, err)
59+
60+
pool, err := pgxpool.New(ctx, connStr)
61+
require.NoError(t, err)
62+
63+
t.Cleanup(func() {
64+
pool.Close()
65+
_ = container.Terminate(context.Background())
66+
})
67+
68+
return pool
69+
}
70+
71+
func TestIntegration_CommandBus_Outbox_NoLeaks(t *testing.T) {
72+
defer goleak.VerifyNone(t, integrationGoleakOpts...)
73+
74+
pool := setupPostgres(t)
75+
sqlDB := stdlib.OpenDBFromPool(pool)
76+
t.Cleanup(func() { _ = sqlDB.Close() })
77+
78+
ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second)
79+
defer cancel()
80+
81+
wmLogger := watermill.NewStdLogger(false, false)
82+
schema := wmsql.DefaultPostgreSQLSchema{}
83+
pgxBeginner := wmsql.PgxBeginner{Conn: pool}
84+
85+
sqlPub, err := wmsql.NewPublisher(pgxBeginner, wmsql.PublisherConfig{
86+
SchemaAdapter: schema,
87+
AutoInitializeSchema: true,
88+
}, wmLogger)
89+
require.NoError(t, err)
90+
t.Cleanup(func() { _ = sqlPub.Close() })
91+
92+
sqlSub, err := wmsql.NewSubscriber(
93+
wmsql.BeginnerFromPgx(pool),
94+
wmsql.SubscriberConfig{
95+
SchemaAdapter: schema,
96+
OffsetsAdapter: wmsql.DefaultPostgreSQLOffsetsAdapter{},
97+
InitializeSchema: true,
98+
ConsumerGroup: "test-consumer",
99+
PollInterval: 100 * time.Millisecond,
100+
AckDeadline: ptrDuration(5 * time.Second),
101+
},
102+
wmLogger,
103+
)
104+
require.NoError(t, err)
105+
t.Cleanup(func() { _ = sqlSub.Close() })
106+
107+
realPub := gochannel.NewGoChannel(gochannel.Config{}, wmLogger)
108+
namer := message.NewShortlinkNamer(serviceName)
109+
marshaler := message.NewJSONMarshaler(namer)
110+
111+
cfg := logger.Default()
112+
cfg.Writer = io.Discard
113+
cfg.Level = logger.WARN_LEVEL
114+
log, err := logger.New(cfg)
115+
require.NoError(t, err)
116+
117+
cmdBus, err := bus.NewCommandBusWithOptions(sqlPub, marshaler, namer,
118+
bus.WithOutbox(bus.OutboxConfig{
119+
DB: sqlDB,
120+
Subscriber: sqlSub,
121+
RealPublisher: realPub,
122+
ForwarderName: forwarderTopic,
123+
Logger: log,
124+
MeterProvider: noop.NewMeterProvider(),
125+
}),
126+
)
127+
require.NoError(t, err)
128+
129+
cmdTopic := namer.TopicForCommand(namer.CommandName(&testCommand{}))
130+
sub, err := realPub.Subscribe(ctx, cmdTopic)
131+
require.NoError(t, err)
132+
133+
forwarderCtx, stopForwarder := context.WithCancel(ctx)
134+
var forwarderErr error
135+
done := make(chan struct{})
136+
go func() {
137+
defer close(done)
138+
forwarderErr = cmdBus.RunForwarder(forwarderCtx)
139+
}()
140+
141+
err = cmdBus.Send(ctx, &testCommand{ID: "cmd-1"})
142+
require.NoError(t, err)
143+
144+
select {
145+
case msg := <-sub:
146+
require.NotNil(t, msg)
147+
msg.Ack()
148+
case <-time.After(10 * time.Second):
149+
t.Fatal("timeout waiting for forwarded message")
150+
}
151+
152+
stopForwarder()
153+
<-done
154+
closeCtx, closeCancel := context.WithTimeout(context.Background(), 3*time.Second)
155+
err = cmdBus.CloseForwarder(closeCtx)
156+
closeCancel()
157+
if err != nil && err != context.DeadlineExceeded {
158+
require.NoError(t, err)
159+
}
160+
_ = forwarderErr
161+
}
162+
163+
func TestIntegration_EventBus_TxAwareOutbox_NoLeaks(t *testing.T) {
164+
defer goleak.VerifyNone(t, integrationGoleakOpts...)
165+
166+
pool := setupPostgres(t)
167+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
168+
defer cancel()
169+
170+
wmLogger := watermill.NewStdLogger(false, false)
171+
schema := wmsql.DefaultPostgreSQLSchema{}
172+
pgxBeginner := wmsql.PgxBeginner{Conn: pool}
173+
174+
sqlPub, err := wmsql.NewPublisher(pgxBeginner, wmsql.PublisherConfig{
175+
SchemaAdapter: schema,
176+
AutoInitializeSchema: true,
177+
}, wmLogger)
178+
require.NoError(t, err)
179+
t.Cleanup(func() { _ = sqlPub.Close() })
180+
181+
sqlSub, err := wmsql.NewSubscriber(
182+
wmsql.BeginnerFromPgx(pool),
183+
wmsql.SubscriberConfig{
184+
SchemaAdapter: schema,
185+
OffsetsAdapter: wmsql.DefaultPostgreSQLOffsetsAdapter{},
186+
InitializeSchema: true,
187+
ConsumerGroup: "tx-outbox-init",
188+
PollInterval: 10 * time.Millisecond,
189+
AckDeadline: ptrDuration(2 * time.Second),
190+
},
191+
wmLogger,
192+
)
193+
require.NoError(t, err)
194+
t.Cleanup(func() { _ = sqlSub.Close() })
195+
require.NoError(t, sqlSub.SubscribeInitialize(forwarderTopic))
196+
197+
namer := message.NewShortlinkNamer(serviceName)
198+
marshaler := message.NewJSONMarshaler(namer)
199+
200+
evtBus, err := bus.NewEventBusWithOptions(nil, marshaler, namer,
201+
bus.WithTxAwareOutbox(forwarderTopic, wmLogger),
202+
)
203+
require.NoError(t, err)
204+
205+
tx, err := pool.Begin(ctx)
206+
require.NoError(t, err)
207+
t.Cleanup(func() { _ = tx.Rollback(ctx) })
208+
209+
txCtx := uow.WithTx(ctx, tx)
210+
err = evtBus.Publish(txCtx, &testEvent{ID: "evt-1"})
211+
require.NoError(t, err)
212+
err = tx.Commit(ctx)
213+
require.NoError(t, err)
214+
}
215+
216+
func TestIntegration_ForwarderClose_NoGoroutineLeak(t *testing.T) {
217+
defer goleak.VerifyNone(t, integrationGoleakOpts...)
218+
219+
pool := setupPostgres(t)
220+
sqlDB := stdlib.OpenDBFromPool(pool)
221+
t.Cleanup(func() { _ = sqlDB.Close() })
222+
223+
ctx := context.Background()
224+
225+
wmLogger := watermill.NewStdLogger(false, false)
226+
schema := wmsql.DefaultPostgreSQLSchema{}
227+
pgxBeginner := wmsql.PgxBeginner{Conn: pool}
228+
229+
sqlPub, err := wmsql.NewPublisher(pgxBeginner, wmsql.PublisherConfig{
230+
SchemaAdapter: schema,
231+
AutoInitializeSchema: true,
232+
}, wmLogger)
233+
require.NoError(t, err)
234+
t.Cleanup(func() { _ = sqlPub.Close() })
235+
236+
sqlSub, err := wmsql.NewSubscriber(
237+
wmsql.BeginnerFromPgx(pool),
238+
wmsql.SubscriberConfig{
239+
SchemaAdapter: schema,
240+
OffsetsAdapter: wmsql.DefaultPostgreSQLOffsetsAdapter{},
241+
InitializeSchema: true,
242+
ConsumerGroup: "leak-test",
243+
PollInterval: 50 * time.Millisecond,
244+
AckDeadline: ptrDuration(2 * time.Second),
245+
},
246+
wmLogger,
247+
)
248+
require.NoError(t, err)
249+
t.Cleanup(func() { _ = sqlSub.Close() })
250+
251+
realPub := gochannel.NewGoChannel(gochannel.Config{}, wmLogger)
252+
namer := message.NewShortlinkNamer(serviceName)
253+
marshaler := message.NewJSONMarshaler(namer)
254+
255+
cfg := logger.Default()
256+
cfg.Writer = io.Discard
257+
cfg.Level = logger.WARN_LEVEL
258+
log, err := logger.New(cfg)
259+
require.NoError(t, err)
260+
261+
cmdBus, err := bus.NewCommandBusWithOptions(sqlPub, marshaler, namer,
262+
bus.WithOutbox(bus.OutboxConfig{
263+
DB: sqlDB,
264+
Subscriber: sqlSub,
265+
RealPublisher: realPub,
266+
ForwarderName: forwarderTopic + "_leak",
267+
Logger: log,
268+
MeterProvider: noop.NewMeterProvider(),
269+
}),
270+
)
271+
require.NoError(t, err)
272+
273+
runCtx, cancel := context.WithCancel(ctx)
274+
go func() { _ = cmdBus.RunForwarder(runCtx) }()
275+
276+
time.Sleep(200 * time.Millisecond)
277+
cancel()
278+
closeCtx, closeCancel := context.WithTimeout(context.Background(), 5*time.Second)
279+
_ = cmdBus.CloseForwarder(closeCtx)
280+
closeCancel()
281+
}
282+
283+
type testCommand struct {
284+
ID string `json:"id"`
285+
}
286+
287+
type testEvent struct {
288+
ID string `json:"id"`
289+
}
290+
291+
func ptrDuration(d time.Duration) *time.Duration { return &d }

cqrs/bus/leak_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package bus
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/ThreeDotsLabs/watermill"
8+
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
9+
"github.com/stretchr/testify/require"
10+
"go.uber.org/goleak"
11+
12+
"github.com/shortlink-org/go-sdk/cqrs/message"
13+
)
14+
15+
// goleakIgnoreOpts ignores known third-party goroutines that may still be
16+
// running after integration tests (testcontainers reaper, pgx pool, sql.DB).
17+
// Used by TestMain and by integration tests.
18+
var goleakIgnoreOpts = []goleak.Option{
19+
goleak.IgnoreTopFunction("github.com/testcontainers/testcontainers-go.(*Reaper).connect.func1"),
20+
goleak.IgnoreTopFunction("github.com/jackc/pgx/v5/pgxpool.(*Pool).backgroundHealthCheck"),
21+
goleak.IgnoreTopFunction("database/sql.(*DB).connectionOpener"),
22+
}
23+
24+
// TestMain runs goleak.VerifyTestMain after all tests in the package,
25+
// so any goroutine leak from any test is reported once at exit.
26+
// See https://github.com/uber-go/goleak
27+
func TestMain(m *testing.M) {
28+
goleak.VerifyTestMain(m, goleakIgnoreOpts...)
29+
}
30+
31+
// TestCommandBus_Send_NoGoroutineLeak ensures CommandBus with in-memory publisher
32+
// does not leak goroutines after Send and normal usage.
33+
func TestCommandBus_Send_NoGoroutineLeak(t *testing.T) {
34+
defer goleak.VerifyNone(t)
35+
36+
pc := gochannel.NewGoChannel(gochannel.Config{}, watermill.NewStdLogger(false, false))
37+
namer := message.NewShortlinkNamer("leak-test")
38+
marshaler := message.NewJSONMarshaler(namer)
39+
40+
cmdBus := NewCommandBus(pc, marshaler, namer)
41+
ctx := context.Background()
42+
43+
err := cmdBus.Send(ctx, &struct {
44+
ID string `json:"id"`
45+
}{ID: "x"})
46+
require.NoError(t, err)
47+
_ = cmdBus
48+
}
49+
50+
// TestEventBus_Publish_NoGoroutineLeak ensures EventBus with in-memory publisher
51+
// does not leak goroutines after Publish.
52+
func TestEventBus_Publish_NoGoroutineLeak(t *testing.T) {
53+
defer goleak.VerifyNone(t)
54+
55+
pc := gochannel.NewGoChannel(gochannel.Config{}, watermill.NewStdLogger(false, false))
56+
namer := message.NewShortlinkNamer("leak-test")
57+
marshaler := message.NewJSONMarshaler(namer)
58+
59+
evtBus := NewEventBus(pc, marshaler, namer)
60+
ctx := context.Background()
61+
62+
err := evtBus.Publish(ctx, &struct {
63+
ID string `json:"id"`
64+
}{ID: "y"})
65+
require.NoError(t, err)
66+
_ = evtBus
67+
}

0 commit comments

Comments
 (0)