Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions core/command/aggservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ package command

import (
"context"
"fmt"
"reflect"

eventuous "github.com/eventuous/eventuous-go/core"
"github.com/eventuous/eventuous-go/core/aggregate"
"github.com/eventuous/eventuous-go/core/codec"
"github.com/eventuous/eventuous-go/core/store"
)

Expand All @@ -17,6 +19,7 @@ import (
type AggregateService[S any] struct {
reader store.EventReader
writer store.EventWriter
typeMap *codec.TypeMap
fold func(S, any) S
zero S
handlers map[reflect.Type]untypedAggHandler[S]
Expand All @@ -30,15 +33,21 @@ type untypedAggHandler[S any] struct {
}

// NewAggregateService creates an aggregate-based command service.
// Panics if reader, writer, or typeMap is nil.
func NewAggregateService[S any](
reader store.EventReader,
writer store.EventWriter,
typeMap *codec.TypeMap,
fold func(S, any) S,
zero S,
) *AggregateService[S] {
if typeMap == nil {
panic("command: typeMap must not be nil")
}
return &AggregateService[S]{
reader: reader,
writer: writer,
typeMap: typeMap,
fold: fold,
zero: zero,
handlers: make(map[reflect.Type]untypedAggHandler[S]),
Expand Down Expand Up @@ -97,11 +106,11 @@ func (svc *AggregateService[S]) Handle(ctx context.Context, command any) (*Resul
}

// Step 7: If no changes, return current state (no-op).
changes := agg.Changes()
if len(changes) == 0 {
rawChanges := agg.Changes()
if len(rawChanges) == 0 {
return &Result[S]{
State: agg.State(),
NewEvents: nil,
Changes: nil,
StreamVersion: agg.OriginalVersion(),
}, nil
}
Expand All @@ -112,10 +121,19 @@ func (svc *AggregateService[S]) Handle(ctx context.Context, command any) (*Resul
return nil, err
}

// Step 9: Return result.
// Step 9: Build typed changes and return result.
changes := make([]Change, len(rawChanges))
for i, e := range rawChanges {
typeName, err := svc.typeMap.TypeName(e)
if err != nil {
return nil, fmt.Errorf("command: resolving event type: %w", err)
}
changes[i] = Change{Event: e, EventType: typeName}
}

return &Result[S]{
State: agg.State(),
NewEvents: changes,
Changes: changes,
GlobalPosition: appendResult.GlobalPosition,
StreamVersion: appendResult.NextExpectedVersion,
}, nil
Expand Down
22 changes: 11 additions & 11 deletions core/command/aggservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

func newAggService(s *memstore.Store) *command.AggregateService[testdomain.BookingState] {
return command.NewAggregateService[testdomain.BookingState](s, s, testdomain.BookingFold, testdomain.BookingState{})
return command.NewAggregateService[testdomain.BookingState](s, s, testdomain.NewTypeMap(), testdomain.BookingFold, testdomain.BookingState{})
}

// aggStreamName returns the stream name as the AggregateService generates it.
Expand Down Expand Up @@ -79,8 +79,8 @@ func TestAggService_OnNew_Success(t *testing.T) {
if result == nil {
t.Fatal("expected non-nil result")
}
if len(result.NewEvents) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.NewEvents))
if len(result.Changes) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.Changes))
}
if result.State.RoomID != "room-42" {
t.Errorf("expected RoomID=room-42, got %s", result.State.RoomID)
Expand Down Expand Up @@ -127,8 +127,8 @@ func TestAggService_OnExisting_Success(t *testing.T) {
if result.State.AmountPaid != 100.0 {
t.Errorf("expected AmountPaid=100.0, got %f", result.State.AmountPaid)
}
if len(result.NewEvents) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.NewEvents))
if len(result.Changes) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.Changes))
}
}

Expand Down Expand Up @@ -164,8 +164,8 @@ func TestAggService_OnAny_Works(t *testing.T) {
if result.State.RoomID != "room-5" {
t.Errorf("expected RoomID=room-5, got %s", result.State.RoomID)
}
if len(result.NewEvents) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.NewEvents))
if len(result.Changes) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.Changes))
}
})

Expand All @@ -183,8 +183,8 @@ func TestAggService_OnAny_Works(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(result.NewEvents) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.NewEvents))
if len(result.Changes) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.Changes))
}
})
}
Expand Down Expand Up @@ -225,8 +225,8 @@ func TestAggService_NoOp(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(result.NewEvents) != 0 {
t.Errorf("expected 0 new events, got %d", len(result.NewEvents))
if len(result.Changes) != 0 {
t.Errorf("expected 0 new events, got %d", len(result.Changes))
}
// Verify nothing was appended.
exists, _ := s.StreamExists(context.Background(), aggStreamName("noop-agg-booking"))
Expand Down
8 changes: 7 additions & 1 deletion core/command/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@

package command

// Change pairs a domain event with its registered type name.
type Change struct {
Event any
EventType string
}

// Result of a handled command.
type Result[S any] struct {
State S
NewEvents []any
Changes []Change
GlobalPosition uint64
StreamVersion int64
}
23 changes: 19 additions & 4 deletions core/command/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ package command

import (
"context"
"fmt"
"reflect"

eventuous "github.com/eventuous/eventuous-go/core"
"github.com/eventuous/eventuous-go/core/codec"
"github.com/eventuous/eventuous-go/core/store"
"github.com/google/uuid"
)
Expand All @@ -21,21 +23,28 @@ type CommandHandler[S any] interface {
type Service[S any] struct {
reader store.EventReader
writer store.EventWriter
typeMap *codec.TypeMap
fold func(S, any) S
zero S
handlers map[reflect.Type]untypedHandler[S]
}

// New creates a functional command service.
// Panics if reader, writer, or typeMap is nil.
func New[S any](
reader store.EventReader,
writer store.EventWriter,
typeMap *codec.TypeMap,
fold func(S, any) S,
zero S,
) *Service[S] {
if typeMap == nil {
panic("command: typeMap must not be nil")
}
return &Service[S]{
reader: reader,
writer: writer,
typeMap: typeMap,
fold: fold,
zero: zero,
handlers: make(map[reflect.Type]untypedHandler[S]),
Expand Down Expand Up @@ -79,7 +88,7 @@ func (svc *Service[S]) Handle(ctx context.Context, command any) (*Result[S], err
if len(newEvents) == 0 {
return &Result[S]{
State: state,
NewEvents: nil,
Changes: nil,
StreamVersion: int64(version),
}, nil
}
Expand All @@ -98,15 +107,21 @@ func (svc *Service[S]) Handle(ctx context.Context, command any) (*Result[S], err
return nil, err
}

// Step 7: Fold new events into state for the result.
for _, e := range newEvents {
// Step 7: Build changes and fold new events into state.
changes := make([]Change, len(newEvents))
for i, e := range newEvents {
typeName, err := svc.typeMap.TypeName(e)
if err != nil {
return nil, fmt.Errorf("command: resolving event type: %w", err)
}
changes[i] = Change{Event: e, EventType: typeName}
state = svc.fold(state, e)
}

// Step 8: Return Result[S].
return &Result[S]{
State: state,
NewEvents: newEvents,
Changes: changes,
GlobalPosition: appendResult.GlobalPosition,
StreamVersion: appendResult.NextExpectedVersion,
}, nil
Expand Down
18 changes: 9 additions & 9 deletions core/command/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

func newService(s *memstore.Store) *command.Service[testdomain.BookingState] {
return command.New[testdomain.BookingState](s, s, testdomain.BookingFold, testdomain.BookingState{})
return command.New[testdomain.BookingState](s, s, testdomain.NewTypeMap(), testdomain.BookingFold, testdomain.BookingState{})
}

// seedEvents directly appends raw events to the memstore for test setup.
Expand Down Expand Up @@ -82,8 +82,8 @@ func TestService_OnNew_Success(t *testing.T) {
if result == nil {
t.Fatal("expected non-nil result")
}
if len(result.NewEvents) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.NewEvents))
if len(result.Changes) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.Changes))
}
if result.State.RoomID != "room-42" {
t.Errorf("expected RoomID=room-42, got %s", result.State.RoomID)
Expand Down Expand Up @@ -126,8 +126,8 @@ func TestService_OnExisting_Success(t *testing.T) {
if result.State.AmountPaid != 100.0 {
t.Errorf("expected AmountPaid=100.0, got %f", result.State.AmountPaid)
}
if len(result.NewEvents) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.NewEvents))
if len(result.Changes) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.Changes))
}
}

Expand Down Expand Up @@ -176,8 +176,8 @@ func TestService_OnAny_ExistingStream(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(result.NewEvents) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.NewEvents))
if len(result.Changes) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.Changes))
}
}

Expand Down Expand Up @@ -216,8 +216,8 @@ func TestService_NoOp(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(result.NewEvents) != 0 {
t.Errorf("expected 0 new events, got %d", len(result.NewEvents))
if len(result.Changes) != 0 {
t.Errorf("expected 0 new events, got %d", len(result.Changes))
}
// Verify nothing was appended.
exists, _ := s.StreamExists(context.Background(), testdomain.BookingStream("noop-booking"))
Expand Down
6 changes: 3 additions & 3 deletions kurrentdb/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestEndToEnd(t *testing.T) {
bookingID := "e2e-booking-" + uuid.New().String()[:8]

// 2. Create functional command service with the booking fold.
svc := command.New[testdomain.BookingState](store, store, testdomain.BookingFold, testdomain.BookingState{})
svc := command.New[testdomain.BookingState](store, store, testdomain.NewTypeMap(), testdomain.BookingFold, testdomain.BookingState{})

// Register BookRoom handler (stream must be new).
command.On(svc, command.Handler[testdomain.BookRoom, testdomain.BookingState]{
Expand Down Expand Up @@ -84,8 +84,8 @@ func TestEndToEnd(t *testing.T) {
if !result.State.Active {
t.Error("expected Active=true after BookRoom")
}
if len(result.NewEvents) != 1 {
t.Errorf("expected 1 new event from BookRoom, got %d", len(result.NewEvents))
if len(result.Changes) != 1 {
t.Errorf("expected 1 new event from BookRoom, got %d", len(result.Changes))
}

// 4. Handle RecordPayment command.
Expand Down
5 changes: 5 additions & 0 deletions samples/booking/domain/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ func NewCodec() codec.Codec {
return codec.NewJSON(NewTypeMap())
}

// NewCodecFromTypeMap creates a JSON codec from an existing TypeMap.
func NewCodecFromTypeMap(tm *codec.TypeMap) codec.Codec {
return codec.NewJSON(tm)
}

func mustRegister[E any](tm *codec.TypeMap, name string) {
if err := codec.Register[E](tm, name); err != nil {
panic(err)
Expand Down
20 changes: 10 additions & 10 deletions samples/booking/domain/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ package domain

// BookingState is the write-side state reconstructed by folding events.
type BookingState struct {
ID string
GuestID string
RoomID string
CheckIn string
CheckOut string
Price float64
Outstanding float64
Currency string
Paid bool
Cancelled bool
ID string `json:"id"`
GuestID string `json:"guestId"`
RoomID string `json:"roomId"`
CheckIn string `json:"checkIn"`
CheckOut string `json:"checkOut"`
Price float64 `json:"price"`
Outstanding float64 `json:"outstanding"`
Currency string `json:"currency"`
Paid bool `json:"paid"`
Cancelled bool `json:"cancelled"`
}

// BookingFold is the fold function used by the command service to reconstruct state.
Expand Down
Loading
Loading