Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions core/command/aggservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

eventuous "github.com/eventuous/eventuous-go/core"
"github.com/eventuous/eventuous-go/core/aggregate"
"github.com/eventuous/eventuous-go/core/codec"
"github.com/eventuous/eventuous-go/core/store"
)

Expand All @@ -17,6 +18,7 @@ import (
type AggregateService[S any] struct {
reader store.EventReader
writer store.EventWriter
typeMap *codec.TypeMap
fold func(S, any) S
zero S
handlers map[reflect.Type]untypedAggHandler[S]
Expand All @@ -33,12 +35,14 @@ type untypedAggHandler[S any] struct {
func NewAggregateService[S any](
reader store.EventReader,
writer store.EventWriter,
typeMap *codec.TypeMap,
fold func(S, any) S,
zero S,
) *AggregateService[S] {
return &AggregateService[S]{
reader: reader,
writer: writer,
typeMap: typeMap,
fold: fold,
zero: zero,
handlers: make(map[reflect.Type]untypedAggHandler[S]),
Expand Down Expand Up @@ -97,11 +101,11 @@ func (svc *AggregateService[S]) Handle(ctx context.Context, command any) (*Resul
}

// Step 7: If no changes, return current state (no-op).
changes := agg.Changes()
if len(changes) == 0 {
rawChanges := agg.Changes()
if len(rawChanges) == 0 {
return &Result[S]{
State: agg.State(),
NewEvents: nil,
Changes: nil,
StreamVersion: agg.OriginalVersion(),
}, nil
}
Expand All @@ -112,10 +116,16 @@ func (svc *AggregateService[S]) Handle(ctx context.Context, command any) (*Resul
return nil, err
}

// Step 9: Return result.
// Step 9: Build typed changes and return result.
changes := make([]Change, len(rawChanges))
for i, e := range rawChanges {
typeName, _ := svc.typeMap.TypeName(e)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Guard TypeMap lookup failures in AggregateService results

AggregateService.Handle has the same unchecked svc.typeMap.TypeName(e) call when materializing Result.Changes. A nil typeMap causes a runtime panic, and an unregistered event type is silently converted to eventType:"", so API clients can receive malformed change metadata even though the append succeeded. Returning a clear error here would avoid both crash and silent data-shape corruption.

Useful? React with 👍 / 👎.

changes[i] = Change{Event: e, EventType: typeName}
}

return &Result[S]{
State: agg.State(),
NewEvents: changes,
Changes: changes,
GlobalPosition: appendResult.GlobalPosition,
StreamVersion: appendResult.NextExpectedVersion,
}, nil
Expand Down
22 changes: 11 additions & 11 deletions core/command/aggservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

func newAggService(s *memstore.Store) *command.AggregateService[testdomain.BookingState] {
return command.NewAggregateService[testdomain.BookingState](s, s, testdomain.BookingFold, testdomain.BookingState{})
return command.NewAggregateService[testdomain.BookingState](s, s, testdomain.NewTypeMap(), testdomain.BookingFold, testdomain.BookingState{})
}

// aggStreamName returns the stream name as the AggregateService generates it.
Expand Down Expand Up @@ -79,8 +79,8 @@ func TestAggService_OnNew_Success(t *testing.T) {
if result == nil {
t.Fatal("expected non-nil result")
}
if len(result.NewEvents) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.NewEvents))
if len(result.Changes) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.Changes))
}
if result.State.RoomID != "room-42" {
t.Errorf("expected RoomID=room-42, got %s", result.State.RoomID)
Expand Down Expand Up @@ -127,8 +127,8 @@ func TestAggService_OnExisting_Success(t *testing.T) {
if result.State.AmountPaid != 100.0 {
t.Errorf("expected AmountPaid=100.0, got %f", result.State.AmountPaid)
}
if len(result.NewEvents) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.NewEvents))
if len(result.Changes) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.Changes))
}
}

Expand Down Expand Up @@ -164,8 +164,8 @@ func TestAggService_OnAny_Works(t *testing.T) {
if result.State.RoomID != "room-5" {
t.Errorf("expected RoomID=room-5, got %s", result.State.RoomID)
}
if len(result.NewEvents) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.NewEvents))
if len(result.Changes) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.Changes))
}
})

Expand All @@ -183,8 +183,8 @@ func TestAggService_OnAny_Works(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(result.NewEvents) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.NewEvents))
if len(result.Changes) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.Changes))
}
})
}
Expand Down Expand Up @@ -225,8 +225,8 @@ func TestAggService_NoOp(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(result.NewEvents) != 0 {
t.Errorf("expected 0 new events, got %d", len(result.NewEvents))
if len(result.Changes) != 0 {
t.Errorf("expected 0 new events, got %d", len(result.Changes))
}
// Verify nothing was appended.
exists, _ := s.StreamExists(context.Background(), aggStreamName("noop-agg-booking"))
Expand Down
8 changes: 7 additions & 1 deletion core/command/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@

package command

// Change pairs a domain event with its registered type name.
type Change struct {
Event any
EventType string
}

// Result of a handled command.
type Result[S any] struct {
State S
NewEvents []any
Changes []Change
GlobalPosition uint64
StreamVersion int64
}
15 changes: 11 additions & 4 deletions core/command/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"reflect"

eventuous "github.com/eventuous/eventuous-go/core"
"github.com/eventuous/eventuous-go/core/codec"
"github.com/eventuous/eventuous-go/core/store"
"github.com/google/uuid"
)
Expand All @@ -21,6 +22,7 @@ type CommandHandler[S any] interface {
type Service[S any] struct {
reader store.EventReader
writer store.EventWriter
typeMap *codec.TypeMap
fold func(S, any) S
zero S
handlers map[reflect.Type]untypedHandler[S]
Expand All @@ -30,12 +32,14 @@ type Service[S any] struct {
func New[S any](
reader store.EventReader,
writer store.EventWriter,
typeMap *codec.TypeMap,
fold func(S, any) S,
zero S,
) *Service[S] {
return &Service[S]{
reader: reader,
writer: writer,
typeMap: typeMap,
fold: fold,
zero: zero,
handlers: make(map[reflect.Type]untypedHandler[S]),
Expand Down Expand Up @@ -79,7 +83,7 @@ func (svc *Service[S]) Handle(ctx context.Context, command any) (*Result[S], err
if len(newEvents) == 0 {
return &Result[S]{
State: state,
NewEvents: nil,
Changes: nil,
StreamVersion: int64(version),
}, nil
}
Expand All @@ -98,15 +102,18 @@ func (svc *Service[S]) Handle(ctx context.Context, command any) (*Result[S], err
return nil, err
}

// Step 7: Fold new events into state for the result.
for _, e := range newEvents {
// Step 7: Build changes and fold new events into state.
changes := make([]Change, len(newEvents))
for i, e := range newEvents {
typeName, _ := svc.typeMap.TypeName(e)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Guard TypeMap lookup failures when building Service changes

Handle now calls svc.typeMap.TypeName(e) without validating svc.typeMap or the lookup error. If a caller constructs command.New(..., nil, ...), this line panics on the first command that emits events; if an event type is missing from the map, the ignored error produces an empty eventType and violates the new response contract. This should fail fast with an explicit error instead of panicking or silently returning blank type names.

Useful? React with 👍 / 👎.

changes[i] = Change{Event: e, EventType: typeName}
state = svc.fold(state, e)
}

// Step 8: Return Result[S].
return &Result[S]{
State: state,
NewEvents: newEvents,
Changes: changes,
GlobalPosition: appendResult.GlobalPosition,
StreamVersion: appendResult.NextExpectedVersion,
}, nil
Expand Down
18 changes: 9 additions & 9 deletions core/command/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

func newService(s *memstore.Store) *command.Service[testdomain.BookingState] {
return command.New[testdomain.BookingState](s, s, testdomain.BookingFold, testdomain.BookingState{})
return command.New[testdomain.BookingState](s, s, testdomain.NewTypeMap(), testdomain.BookingFold, testdomain.BookingState{})
}

// seedEvents directly appends raw events to the memstore for test setup.
Expand Down Expand Up @@ -82,8 +82,8 @@ func TestService_OnNew_Success(t *testing.T) {
if result == nil {
t.Fatal("expected non-nil result")
}
if len(result.NewEvents) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.NewEvents))
if len(result.Changes) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.Changes))
}
if result.State.RoomID != "room-42" {
t.Errorf("expected RoomID=room-42, got %s", result.State.RoomID)
Expand Down Expand Up @@ -126,8 +126,8 @@ func TestService_OnExisting_Success(t *testing.T) {
if result.State.AmountPaid != 100.0 {
t.Errorf("expected AmountPaid=100.0, got %f", result.State.AmountPaid)
}
if len(result.NewEvents) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.NewEvents))
if len(result.Changes) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.Changes))
}
}

Expand Down Expand Up @@ -176,8 +176,8 @@ func TestService_OnAny_ExistingStream(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(result.NewEvents) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.NewEvents))
if len(result.Changes) != 1 {
t.Fatalf("expected 1 new event, got %d", len(result.Changes))
}
}

Expand Down Expand Up @@ -216,8 +216,8 @@ func TestService_NoOp(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(result.NewEvents) != 0 {
t.Errorf("expected 0 new events, got %d", len(result.NewEvents))
if len(result.Changes) != 0 {
t.Errorf("expected 0 new events, got %d", len(result.Changes))
}
// Verify nothing was appended.
exists, _ := s.StreamExists(context.Background(), testdomain.BookingStream("noop-booking"))
Expand Down
6 changes: 3 additions & 3 deletions kurrentdb/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestEndToEnd(t *testing.T) {
bookingID := "e2e-booking-" + uuid.New().String()[:8]

// 2. Create functional command service with the booking fold.
svc := command.New[testdomain.BookingState](store, store, testdomain.BookingFold, testdomain.BookingState{})
svc := command.New[testdomain.BookingState](store, store, testdomain.NewTypeMap(), testdomain.BookingFold, testdomain.BookingState{})

// Register BookRoom handler (stream must be new).
command.On(svc, command.Handler[testdomain.BookRoom, testdomain.BookingState]{
Expand Down Expand Up @@ -84,8 +84,8 @@ func TestEndToEnd(t *testing.T) {
if !result.State.Active {
t.Error("expected Active=true after BookRoom")
}
if len(result.NewEvents) != 1 {
t.Errorf("expected 1 new event from BookRoom, got %d", len(result.NewEvents))
if len(result.Changes) != 1 {
t.Errorf("expected 1 new event from BookRoom, got %d", len(result.Changes))
}

// 4. Handle RecordPayment command.
Expand Down
5 changes: 5 additions & 0 deletions samples/booking/domain/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ func NewCodec() codec.Codec {
return codec.NewJSON(NewTypeMap())
}

// NewCodecFromTypeMap creates a JSON codec from an existing TypeMap.
func NewCodecFromTypeMap(tm *codec.TypeMap) codec.Codec {
return codec.NewJSON(tm)
}

func mustRegister[E any](tm *codec.TypeMap, name string) {
if err := codec.Register[E](tm, name); err != nil {
panic(err)
Expand Down
20 changes: 10 additions & 10 deletions samples/booking/domain/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ package domain

// BookingState is the write-side state reconstructed by folding events.
type BookingState struct {
ID string
GuestID string
RoomID string
CheckIn string
CheckOut string
Price float64
Outstanding float64
Currency string
Paid bool
Cancelled bool
ID string `json:"id"`
GuestID string `json:"guestId"`
RoomID string `json:"roomId"`
CheckIn string `json:"checkIn"`
CheckOut string `json:"checkOut"`
Price float64 `json:"price"`
Outstanding float64 `json:"outstanding"`
Currency string `json:"currency"`
Paid bool `json:"paid"`
Cancelled bool `json:"cancelled"`
}

// BookingFold is the fold function used by the command service to reconstruct state.
Expand Down
42 changes: 31 additions & 11 deletions samples/booking/httpapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,7 @@ func handleBookRoom(svc command.CommandHandler[domain.BookingState]) http.Handle
return
}

writeJSON(w, http.StatusCreated, map[string]any{
"bookingId": bookingID,
"streamVersion": result.StreamVersion,
})
writeJSON(w, http.StatusCreated, newCommandResponse(result))
}
}

Expand Down Expand Up @@ -93,11 +90,7 @@ func handleRecordPayment(svc command.CommandHandler[domain.BookingState]) http.H
return
}

writeJSON(w, http.StatusOK, map[string]any{
"bookingId": bookingID,
"outstanding": result.State.Outstanding,
"paid": result.State.Paid,
})
writeJSON(w, http.StatusOK, newCommandResponse(result))
}
}

Expand All @@ -114,7 +107,7 @@ func handleCancelBooking(svc command.CommandHandler[domain.BookingState]) http.H
return
}

_, err := svc.Handle(r.Context(), domain.CancelBooking{
result, err := svc.Handle(r.Context(), domain.CancelBooking{
BookingID: bookingID,
Reason: req.Reason,
})
Expand All @@ -123,7 +116,7 @@ func handleCancelBooking(svc command.CommandHandler[domain.BookingState]) http.H
return
}

w.WriteHeader(http.StatusOK)
writeJSON(w, http.StatusOK, newCommandResponse(result))
}
}

Expand All @@ -150,6 +143,33 @@ func handleGetGuestBookings(rm *readmodel.BookingReadModel) http.HandlerFunc {
}
}

// commandResponse is the standard JSON envelope for command results,
// matching the Eventuous .NET Result<TState>.Ok shape.
type commandResponse struct {
State any `json:"state"`
Changes []changeResponse `json:"changes"`
GlobalPosition uint64 `json:"globalPosition"`
StreamVersion int64 `json:"streamVersion"`
}

type changeResponse struct {
Event any `json:"event"`
EventType string `json:"eventType"`
}

func newCommandResponse[S any](result *command.Result[S]) commandResponse {
changes := make([]changeResponse, len(result.Changes))
for i, c := range result.Changes {
changes[i] = changeResponse{Event: c.Event, EventType: c.EventType}
}
return commandResponse{
State: result.State,
Changes: changes,
GlobalPosition: result.GlobalPosition,
StreamVersion: result.StreamVersion,
}
}

// writeJSON encodes v as JSON and writes it with the given status code.
func writeJSON(w http.ResponseWriter, status int, v any) {
w.Header().Set("Content-Type", "application/json")
Expand Down
Loading
Loading