Skip to content

Commit ffa44b3

Browse files
committed
feat(cqrs): add JSON marshaler and examples
- Add JSONMarshaler for JSON payloads (alternative to ProtoMarshaler) - Add comprehensive tests for JSONMarshaler - Add basic example in cqrs/examples with JSON messages - Remove unused gobreaker/v2 dependency - Update registry to return errors on nil registration
1 parent 9dd5195 commit ffa44b3

File tree

14 files changed

+603
-20
lines changed

14 files changed

+603
-20
lines changed

cqrs/bus/command_bus.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ func NewCommandBus(pub wmmessage.Publisher, marshaler cqrsmessage.Marshaler, nam
3232
}
3333
}
3434

35-
// Send encodes and publishes command with Shortlink metadata and tracing context.
36-
func (b *CommandBus) Send(ctx context.Context, cmd any) error {
35+
// validate checks that the command bus and its dependencies are properly initialized.
36+
func (b *CommandBus) validate(cmd any) error {
3737
if b == nil {
3838
return errCommandBusUninitialized
3939
}
@@ -46,6 +46,14 @@ func (b *CommandBus) Send(ctx context.Context, cmd any) error {
4646
if cmd == nil {
4747
return errCommandPayloadNil
4848
}
49+
return nil
50+
}
51+
52+
// Send encodes and publishes command with Shortlink metadata and tracing context.
53+
func (b *CommandBus) Send(ctx context.Context, cmd any) error {
54+
if err := b.validate(cmd); err != nil {
55+
return err
56+
}
4957
if ctx == nil {
5058
ctx = context.Background()
5159
}

cqrs/bus/event_bus.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ func NewEventBus(pub wmmessage.Publisher, marshaler cqrsmessage.Marshaler, namer
3232
}
3333
}
3434

35-
// Publish sends event using canonical topic name.
36-
func (b *EventBus) Publish(ctx context.Context, evt any) error {
35+
// validate checks that the event bus and its dependencies are properly initialized.
36+
func (b *EventBus) validate(evt any) error {
3737
if b == nil {
3838
return errEventBusUninitialized
3939
}
@@ -46,6 +46,14 @@ func (b *EventBus) Publish(ctx context.Context, evt any) error {
4646
if evt == nil {
4747
return errEventPayloadNil
4848
}
49+
return nil
50+
}
51+
52+
// Publish sends event using canonical topic name.
53+
func (b *EventBus) Publish(ctx context.Context, evt any) error {
54+
if err := b.validate(evt); err != nil {
55+
return err
56+
}
4957
if ctx == nil {
5058
ctx = context.Background()
5159
}

cqrs/bus/registry.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,19 @@ func NewTypeRegistry() *TypeRegistry {
3030
}
3131
}
3232

33-
// RegisterCommand registers command type.
34-
func (r *TypeRegistry) RegisterCommand(cmd any) error {
33+
// validateCommand checks that command is not nil.
34+
func (r *TypeRegistry) validateCommand(cmd any) error {
3535
if cmd == nil {
3636
return ErrNilCommandType
3737
}
38+
return nil
39+
}
40+
41+
// RegisterCommand registers command type.
42+
func (r *TypeRegistry) RegisterCommand(cmd any) error {
43+
if err := r.validateCommand(cmd); err != nil {
44+
return err
45+
}
3846
r.mu.Lock()
3947
defer r.mu.Unlock()
4048

@@ -43,11 +51,19 @@ func (r *TypeRegistry) RegisterCommand(cmd any) error {
4351
return nil
4452
}
4553

46-
// RegisterEvent registers event type.
47-
func (r *TypeRegistry) RegisterEvent(evt any) error {
54+
// validateEvent checks that event is not nil.
55+
func (r *TypeRegistry) validateEvent(evt any) error {
4856
if evt == nil {
4957
return ErrNilEventType
5058
}
59+
return nil
60+
}
61+
62+
// RegisterEvent registers event type.
63+
func (r *TypeRegistry) RegisterEvent(evt any) error {
64+
if err := r.validateEvent(evt); err != nil {
65+
return err
66+
}
5167
r.mu.Lock()
5268
defer r.mu.Unlock()
5369

cqrs/bus/registry_test.go

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"sync"
66
"testing"
77

8+
"github.com/stretchr/testify/require"
9+
810
cqrsmessage "github.com/shortlink-org/go-sdk/cqrs/message"
911
)
1012

@@ -25,9 +27,8 @@ func TestTypeRegistryConcurrentAccess(t *testing.T) {
2527
cqrsmessage.MetadataTypeVersion: "v1",
2628
},
2729
}
28-
if err := reg.RegisterCommand(cmd); err != nil {
29-
t.Errorf("register command %d: %v", i, err)
30-
}
30+
err := reg.RegisterCommand(cmd)
31+
require.NoError(t, err, "register command %d", i)
3132
}(i)
3233

3334
go func(i int) {
@@ -38,20 +39,18 @@ func TestTypeRegistryConcurrentAccess(t *testing.T) {
3839
cqrsmessage.MetadataTypeVersion: "v1",
3940
},
4041
}
41-
if err := reg.RegisterEvent(evt); err != nil {
42-
t.Errorf("register event %d: %v", i, err)
43-
}
42+
err := reg.RegisterEvent(evt)
43+
require.NoError(t, err, "register event %d", i)
4444
}(i)
4545
}
4646

4747
wg.Wait()
4848

4949
for i := 0; i < workers; i++ {
50-
if _, ok := reg.ResolveCommand(fmt.Sprintf("billing.command.create_%d.v1", i)); !ok {
51-
t.Fatalf("command %d not found", i)
52-
}
53-
if _, ok := reg.ResolveEvent(fmt.Sprintf("billing.aggregate.event_%d.v1", i)); !ok {
54-
t.Fatalf("event %d not found", i)
55-
}
50+
_, ok := reg.ResolveCommand(fmt.Sprintf("billing.command.create_%d.v1", i))
51+
require.True(t, ok, "command %d not found", i)
52+
53+
_, ok = reg.ResolveEvent(fmt.Sprintf("billing.aggregate.event_%d.v1", i))
54+
require.True(t, ok, "event %d not found", i)
5655
}
5756
}

cqrs/examples/README.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# CQRS Basic Example
2+
3+
This example demonstrates the basic usage of the `cqrs` package with Watermill.
4+
5+
## What it does
6+
7+
- Registers command and event types in a type registry
8+
- Creates command and event buses
9+
- Sets up typed handlers for commands and events
10+
- Publishes a command and an event
11+
- Processes them through the router
12+
13+
## Running the example
14+
15+
```bash
16+
cd cqrs/examples
17+
go mod tidy
18+
go run main.go
19+
```
20+
21+
## Expected output
22+
23+
```
24+
🚀 Publishing CreateOrderCommand...
25+
📦 Processing command: CreateOrderCommand{OrderId: order-123, UserId: user-456, Amount: 99.99}
26+
✅ Order created successfully: order-123
27+
28+
📢 Publishing OrderCreatedEvent...
29+
📊 Projecting event: OrderCreatedEvent{OrderId: order-123, UserId: user-456, Amount: 99.99}
30+
✅ Projection updated for order: order-123
31+
32+
✨ Example completed successfully!
33+
```
34+
35+
## Key concepts
36+
37+
- **Type Registry**: Central registry for all commands and events
38+
- **Namer**: Generates canonical names like `orders.command.create_order.v1`
39+
- **Marshaler**: Serializes protobuf messages with metadata
40+
- **CommandBus/EventBus**: Publish commands and events with automatic metadata enrichment
41+
- **Handlers**: Typed handlers that receive unmarshaled protobuf messages
42+
- **Router**: Watermill router with CQRS middleware (retry, timeout, etc.)
43+

cqrs/examples/go.mod

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
module github.com/shortlink-org/go-sdk/cqrs/examples
2+
3+
go 1.25.4
4+
5+
replace github.com/shortlink-org/go-sdk/cqrs => ../
6+
7+
replace github.com/shortlink-org/go-sdk/watermill => ../../watermill
8+
9+
require (
10+
github.com/ThreeDotsLabs/watermill v1.5.1
11+
github.com/google/uuid v1.6.0
12+
github.com/shortlink-org/go-sdk/cqrs v0.0.0-00010101000000-000000000000
13+
)
14+
15+
require (
16+
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
17+
github.com/go-logr/logr v1.4.3 // indirect
18+
github.com/go-logr/stdr v1.2.2 // indirect
19+
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
20+
github.com/oklog/ulid v1.3.1 // indirect
21+
github.com/pkg/errors v0.9.1 // indirect
22+
github.com/sony/gobreaker v1.0.0 // indirect
23+
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
24+
go.opentelemetry.io/otel v1.38.0 // indirect
25+
go.opentelemetry.io/otel/metric v1.38.0 // indirect
26+
go.opentelemetry.io/otel/trace v1.38.0 // indirect
27+
google.golang.org/protobuf v1.36.10 // indirect
28+
)

cqrs/examples/go.sum

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
github.com/ThreeDotsLabs/watermill v1.5.1 h1:t5xMivyf9tpmU3iozPqyrCZXHvoV1XQDfihas4sV0fY=
2+
github.com/ThreeDotsLabs/watermill v1.5.1/go.mod h1:Uop10dA3VeJWsSvis9qO3vbVY892LARrKAdki6WtXS4=
3+
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
4+
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
5+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
6+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
7+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
8+
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
9+
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
10+
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
11+
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
12+
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
13+
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
14+
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
15+
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
16+
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
17+
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
18+
github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8=
19+
github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
20+
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
21+
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
22+
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
23+
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
24+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
25+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
26+
github.com/sony/gobreaker v1.0.0 h1:feX5fGGXSl3dYd4aHZItw+FpHLvvoaqkawKjVNiFMNQ=
27+
github.com/sony/gobreaker v1.0.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
28+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
29+
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
30+
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
31+
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
32+
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
33+
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
34+
go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=
35+
go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM=
36+
go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA=
37+
go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI=
38+
go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE=
39+
go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs=
40+
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
41+
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
42+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
43+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

cqrs/examples/handlers.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
)
8+
9+
// CreateOrderHandler processes CreateOrderCommand
10+
type CreateOrderHandler struct{}
11+
12+
func (h *CreateOrderHandler) Handle(ctx context.Context, cmd *CreateOrderCommand) error {
13+
fmt.Printf("📦 Processing command: CreateOrderCommand{OrderId: %s, UserId: %s, Amount: %.2f}\n",
14+
cmd.OrderId, cmd.UserId, cmd.Amount)
15+
16+
// Simulate business logic
17+
time.Sleep(100 * time.Millisecond)
18+
19+
// In real app, you would publish events here
20+
fmt.Printf("✅ Order created successfully: %s\n", cmd.OrderId)
21+
return nil
22+
}
23+
24+
// OrderCreatedProjector processes OrderCreatedEvent
25+
type OrderCreatedProjector struct{}
26+
27+
func (p *OrderCreatedProjector) Handle(ctx context.Context, evt *OrderCreatedEvent) error {
28+
fmt.Printf("📊 Projecting event: OrderCreatedEvent{OrderId: %s, UserId: %s, Amount: %.2f}\n",
29+
evt.OrderId, evt.UserId, evt.Amount)
30+
31+
// Simulate projection logic (e.g., update read model)
32+
time.Sleep(50 * time.Millisecond)
33+
34+
fmt.Printf("✅ Projection updated for order: %s\n", evt.OrderId)
35+
return nil
36+
}

0 commit comments

Comments
 (0)