Skip to content

Commit b042d15

Browse files
feat: Phase 1 — Core framework + KurrentDB integration (#1)
* chore: scaffold multi-module Go project structure Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(core): add shared primitives — StreamName, Metadata, errors Implements StreamName (typed string with Category/ID split on first dash), ExpectedVersion and ExpectedState constants, Metadata map with immutable WithCorrelationID/WithCausationID helpers, and sentinel error values. All behaviour is covered by six unit tests (TDD green). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(core): add TypeMap for bidirectional event type registration Implements TypeMap in core/codec with Register[E], TypeName, and NewInstance. Uses reflect.Type as the key for the forward map so both pointer and value forms of a type resolve to the same entry. Concurrent-safe via sync.RWMutex. Idempotent registration; errors on conflicts. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(core): add Codec interface and JSON implementation Introduce the Codec interface (Encode/Decode) and JSONCodec backed by TypeMap for event type resolution. Codec.Decode dereferences the *T returned by TypeMap.NewInstance so callers always receive a value, not a pointer. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(core): add Aggregate[S] domain type with fold, apply, and guards Implements the generic Aggregate[S] type that tracks state via a fold function, accumulates pending changes for optimistic concurrency, and provides EnsureNew/EnsureExists guards for creation and mutation commands. Includes all 10 mirrored tests from the .NET Eventuous aggregate test suite. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(core): add EventStore/Reader/Writer interfaces and stream types Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(core): add in-memory EventStore for testing Implements a thread-safe in-memory EventStore (memstore.Store) under core/test/memstore/ that satisfies the store.EventStore interface. Provides optimistic-concurrency checks, backwards reads, stream deletion, and truncation — no infrastructure required. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(core): add store conformance test suite and validate with in-memory store Implements storetest.RunAll with sub-suites for Append, Read, ReadBackwards, OptimisticConcurrency, StreamExists, DeleteStream, and TruncateStream. Fixes a uint64 overflow bug in memstore.ReadEventsBackwards where passing ^uint64(0) as the start sentinel was cast to int(-1), bypassing the bounds clamp. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(core): add LoadState, LoadAggregate, StoreAggregate functions Implements store helper functions for the functional command service and aggregate command service. LoadState reads all events from a stream, folds them into state, and handles IsNew/IsExisting/IsAny semantics. LoadAggregate wraps LoadState to reconstruct an Aggregate[S]. StoreAggregate appends pending changes with optimistic concurrency control. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * test(core): add shared Booking test domain mirroring .NET Eventuous.Sut.Domain Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(core): add functional command service with handler registration Implements Service[S] with On[C,S]() registration, Handle() pipeline (load state → act → append → fold), and 8 tests covering IsNew/IsExisting/IsAny, no-op, and handler-not-found scenarios. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(core): add aggregate command service Implement AggregateService[S] and OnAggregate registration in core/command/aggservice.go. The service loads an aggregate via store.LoadAggregate, enforces IsNew/IsExisting/IsAny guards, invokes the handler's Act function (which records events via agg.Apply), and persists changes with store.StoreAggregate. Covers the full test matrix: OnNew success/conflict, OnExisting success/missing, OnAny new and existing streams, handler not found, and no-op (no Apply called). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(core): add subscription framework with middleware chain Implements the subscription package with Subscription interface, EventHandler/HandlerFunc, ConsumeContext, and Middleware chain including WithLogging, WithConcurrency, and WithPartitioning. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(core): add checkpoint committer with gap detection Implements CheckpointStore interface and CheckpointCommitter that safely batches checkpoint writes while ensuring no position is committed until all prior sequence numbers have been processed, preventing data loss under concurrent/out-of-order event processing. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(core): add subscription conformance test suite Adds subtest package with TestConsumeProducedEvents and TestResumeFromCheckpoint exported test functions any subscription implementation can call to verify conformance, along with collectingHandler and memCheckpointStore helpers. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(kurrentdb): implement EventStore with conformance tests passing Add KurrentDB event store implementation that satisfies the store.EventStore interface using the KurrentDB Go client. All 21 conformance tests pass against a live KurrentDB instance. Also update storetest to use per-process unique stream names and export a NewCodec helper so external store implementations can reuse the conformance test codec. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * refactor(kurrentdb): use testcontainers for integration tests Replace hardcoded localhost:2113 connection with testcontainers-go to start an ephemeral EventStoreDB container per test run, removing the docker-compose dependency for tests. Also fix isStreamNotFound to add a string-based fallback for errors from rs.Recv() that don't unwrap into *kurrentdb.Error. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(kurrentdb): add catch-up subscription for stream and $all Implements CatchUp subscription that consumes events from KurrentDB via either a single stream or the $all stream. Supports checkpoint- based resumption, middleware chaining, server-side filters, and link resolution. Includes integration tests for stream consumption, $all consumption, and checkpoint resume. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(kurrentdb): add persistent subscription with ack/nack Implement server-managed persistent subscriptions for KurrentDB with auto-creation of subscription groups, ack on handler success, and nack with retry on handler failure. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(otel): add command tracing and subscription tracing middleware Implements TracedCommandHandler wrapping any CommandHandler[S] with OTel span creation and duration/error metrics, and TracingMiddleware providing a subscription.Middleware that creates a span per consumed event. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(kurrentdb): add subscription conformance tests and enhance subtest package Wire the subscription conformance suite from core/test/subtest/ to the KurrentDB catch-up subscription. Export TestEvent type and add NewCodec() to the subtest package so external modules can build a matching codec. Add per-process runID to stream names for test isolation against persistent stores. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(otel): add command tracing and subscription tracing middleware Implements TracedCommandHandler wrapping command.CommandHandler with OTel spans, duration histogram, and error counter; TracingMiddleware for subscriptions adds per-event spans with OK/error status. Adds missing error-path test and success span.SetStatus(Ok) to subscription middleware. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * test(kurrentdb): add end-to-end integration test Exercises the full stack: codec → KurrentDB store → functional command service (BookRoom + RecordPayment) → catch-up subscription, using the shared testdomain types and a real KurrentDB container. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * ci: add GitHub Actions CI pipeline - Lint: go vet + gofmt check across all modules - Build: parallel build of core, kurrentdb, otel - Test core: unit tests with race detector (no infra needed) - Test otel: unit tests with race detector (no infra needed) - Test kurrentdb: integration tests via testcontainers (Docker) Runs on PRs and pushes to main/dev. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * style: fix gofmt formatting in 6 files Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * chore: add Apache 2.0 license headers to all Go files Adds copyright and license headers to all 55 .go files matching the .NET Eventuous convention. Also adds the full LICENSE file. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * docs: add README with quick start, architecture, and examples Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: address code review feedback - WithConcurrency: block until handler completes and return its error, preventing checkpoint/ack before processing finishes - TruncateStream: validate expected version against data stream before writing metadata, honoring optimistic concurrency - StoreAggregate: ClearChanges now advances originalVersion so the aggregate can be reused without stale version conflicts - CI: bump Go version to 1.25 to match kurrentdb/otel module requirements Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * test: add regression tests for code review fixes - TestWithConcurrency_ReturnsHandlerError: verify handler errors are propagated, not swallowed (catches fire-and-forget bug) - TestWithConcurrency_BlocksUntilHandlerCompletes: verify HandleEvent blocks until the inner handler finishes (catches premature ack bug) - TestClearChanges_AdvancesVersion: verify ClearChanges bumps OriginalVersion so the aggregate is reusable - TestClearChanges_ThenApply: verify Apply after ClearChanges produces correct versions for a second command cycle - TestStoreAggregate_AggregateReusableAfterStore: verify the aggregate can be stored twice without ErrOptimisticConcurrency Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * style: fix gofmt formatting Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * docs: flesh out CLAUDE.md with architecture, conventions, and test patterns Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * ci: skip pipeline for docs-only and non-code changes Add paths-ignore to the push trigger (was only on pull_request). Also exclude LICENSE and .gitignore from triggering builds. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 5b6700a commit b042d15

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+7572
-0
lines changed

.github/workflows/ci.yml

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
name: CI
2+
3+
on:
4+
pull_request:
5+
paths-ignore:
6+
- "docs/**"
7+
- "**.md"
8+
- "LICENSE"
9+
- ".gitignore"
10+
push:
11+
branches:
12+
- main
13+
- dev
14+
paths-ignore:
15+
- "docs/**"
16+
- "**.md"
17+
- "LICENSE"
18+
- ".gitignore"
19+
20+
permissions:
21+
contents: read
22+
23+
env:
24+
GO_VERSION: "1.25"
25+
26+
jobs:
27+
lint:
28+
name: Lint
29+
runs-on: ubuntu-latest
30+
steps:
31+
- uses: actions/checkout@v4
32+
- uses: actions/setup-go@v5
33+
with:
34+
go-version: ${{ env.GO_VERSION }}
35+
36+
- name: Vet core
37+
run: cd core && go vet ./...
38+
39+
- name: Vet kurrentdb
40+
run: cd kurrentdb && go vet ./...
41+
42+
- name: Vet otel
43+
run: cd otel && go vet ./...
44+
45+
- name: Check formatting
46+
run: |
47+
unformatted=$(gofmt -l core/ kurrentdb/ otel/)
48+
if [ -n "$unformatted" ]; then
49+
echo "Files not formatted:"
50+
echo "$unformatted"
51+
exit 1
52+
fi
53+
54+
build:
55+
name: Build
56+
runs-on: ubuntu-latest
57+
strategy:
58+
fail-fast: false
59+
matrix:
60+
module: [core, kurrentdb, otel]
61+
steps:
62+
- uses: actions/checkout@v4
63+
- uses: actions/setup-go@v5
64+
with:
65+
go-version: ${{ env.GO_VERSION }}
66+
67+
- name: Build ${{ matrix.module }}
68+
run: cd ${{ matrix.module }} && go build ./...
69+
70+
test-core:
71+
name: Test core
72+
needs: build
73+
runs-on: ubuntu-latest
74+
steps:
75+
- uses: actions/checkout@v4
76+
- uses: actions/setup-go@v5
77+
with:
78+
go-version: ${{ env.GO_VERSION }}
79+
80+
- name: Run core tests
81+
run: cd core && go test -race -count=1 ./...
82+
83+
test-otel:
84+
name: Test otel
85+
needs: build
86+
runs-on: ubuntu-latest
87+
steps:
88+
- uses: actions/checkout@v4
89+
- uses: actions/setup-go@v5
90+
with:
91+
go-version: ${{ env.GO_VERSION }}
92+
93+
- name: Run otel tests
94+
run: cd otel && go test -race -count=1 ./...
95+
96+
test-kurrentdb:
97+
name: Test kurrentdb
98+
needs: build
99+
runs-on: ubuntu-latest
100+
timeout-minutes: 15
101+
steps:
102+
- uses: actions/checkout@v4
103+
- uses: actions/setup-go@v5
104+
with:
105+
go-version: ${{ env.GO_VERSION }}
106+
107+
- name: Run kurrentdb integration tests
108+
run: cd kurrentdb && go test -race -count=1 -timeout 300s ./...

.gitignore

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Binaries
2+
*.exe
3+
*.dll
4+
*.so
5+
*.dylib
6+
7+
# Test artifacts
8+
*.test
9+
*.out
10+
coverage.txt
11+
12+
# IDE
13+
.idea/
14+
.vscode/
15+
*.swp
16+
17+
# OS
18+
.DS_Store

CLAUDE.md

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
# CLAUDE.md
2+
3+
This file provides guidance to Claude Code when working with code in this repository.
4+
5+
## What Is Eventuous Go
6+
7+
Go port of [Eventuous](https://github.com/Eventuous/eventuous), a production-grade Event Sourcing library. Implements DDD tactical patterns for Go: aggregates, command services, event stores, subscriptions, and projections. Functional-first design — the functional command service is the primary path; aggregate-based service is an optional layer.
8+
9+
## Build & Test Commands
10+
11+
```bash
12+
# Build all modules
13+
(cd core && go build ./...)
14+
(cd kurrentdb && go build ./...)
15+
(cd otel && go build ./...)
16+
17+
# Run all core tests (no infrastructure needed)
18+
(cd core && go test ./...)
19+
20+
# Run all core tests with race detector
21+
(cd core && go test -race ./...)
22+
23+
# Run kurrentdb integration tests (uses testcontainers — Docker required)
24+
(cd kurrentdb && go test -race -timeout 300s ./...)
25+
26+
# Run otel tests
27+
(cd otel && go test -race ./...)
28+
29+
# Run a single test by name
30+
(cd core && go test ./aggregate/... -run TestClearChanges_AdvancesVersion -v)
31+
32+
# Check formatting
33+
gofmt -l core/ kurrentdb/ otel/
34+
35+
# Vet all modules
36+
(cd core && go vet ./...) && (cd kurrentdb && go vet ./...) && (cd otel && go vet ./...)
37+
```
38+
39+
Integration tests use testcontainers-go to start KurrentDB automatically — no manual `docker compose up` needed. Docker must be running.
40+
41+
## Module Structure
42+
43+
This is a multi-module Go project for dependency efficiency. Each module has its own `go.mod`.
44+
45+
```
46+
core/ # github.com/eventuous/eventuous-go/core
47+
├── eventuous.go # StreamName, ExpectedVersion, Metadata, sentinel errors
48+
├── aggregate/ # Domain: Aggregate[S] with fold, apply, guards
49+
├── codec/ # TypeMap (bidirectional type registry), Codec interface, JSON impl
50+
├── store/ # EventReader/Writer/Store interfaces, LoadState, LoadAggregate, StoreAggregate
51+
├── command/ # Functional Service[S], AggregateService[S], handler registration
52+
├── subscription/ # EventHandler, middleware chain, CheckpointCommitter with gap detection
53+
└── test/ # Exported test infrastructure
54+
├── memstore/ # In-memory EventStore for unit testing
55+
├── storetest/ # Store conformance test suite (run by any store impl)
56+
├── subtest/ # Subscription conformance test suite
57+
└── testdomain/ # Shared Booking domain (events, state, commands, codec setup)
58+
59+
kurrentdb/ # github.com/eventuous/eventuous-go/kurrentdb
60+
├── store.go # EventStore implementation for KurrentDB
61+
├── catchup.go # Catch-up subscription (stream + $all)
62+
├── persistent.go # Persistent subscription (stream + $all, ack/nack)
63+
└── options.go # Functional options for subscriptions
64+
65+
otel/ # github.com/eventuous/eventuous-go/otel
66+
├── command.go # TracedCommandHandler decorator (tracing + metrics)
67+
└── subscription.go # TracingMiddleware for subscriptions
68+
```
69+
70+
Dependencies flow: `kurrentdb → core`, `otel → core`. Core depends on nothing heavy.
71+
72+
## Architecture
73+
74+
### Domain Model
75+
76+
`Aggregate[S]` tracks state, pending changes, and version. State is any struct — no interface needed. State reconstruction uses a **fold function** (`func(S, any) S`) with a type switch, not handler registration.
77+
78+
### Command Services (Two Approaches)
79+
80+
**Functional** (primary): `command.Service[S]` — loads state via fold, handler is `func(ctx, state, cmd) ([]any, error)`, no aggregate involved. Registered via `command.On(svc, handler)`.
81+
82+
**Aggregate** (optional): `command.AggregateService[S]` — loads aggregate, handler calls `agg.Apply()`, framework reads `agg.Changes()`. Registered via `command.OnAggregate(svc, handler)`.
83+
84+
### Persistence
85+
86+
`store.EventReader`, `store.EventWriter`, `store.EventStore` interfaces. Package-level generic functions `LoadState`, `LoadAggregate`, `StoreAggregate` handle the load/store cycle.
87+
88+
### Subscriptions
89+
90+
`subscription.EventHandler` interface with `HandlerFunc` adaptor. Middleware chain pattern (like `net/http`): `WithConcurrency`, `WithPartitioning`, `WithLogging`. `CheckpointCommitter` handles batched commits with gap detection.
91+
92+
### Serialization
93+
94+
`codec.TypeMap` for bidirectional type name mapping (explicit registration, no reflection magic). `codec.Codec` interface with `JSONCodec` implementation.
95+
96+
## Key Conventions
97+
98+
- **Package name**: the root package is `eventuous` (import as `eventuous "github.com/eventuous/eventuous-go/core"`)
99+
- **Stream naming**: `{Category}-{ID}` via `eventuous.NewStreamName(category, id)`
100+
- **Type mapping**: events must be explicitly registered in `codec.TypeMap``codec.Register[MyEvent](tm, "MyEvent")`
101+
- **Errors**: sentinel errors with `errors.Is()``ErrStreamNotFound`, `ErrOptimisticConcurrency`, `ErrHandlerNotFound`
102+
- **Context**: all I/O functions take `context.Context` as first parameter
103+
- **No DI container**: explicit wiring, functional options for configuration
104+
105+
## Code Style
106+
107+
- Go 1.25+ (matches module requirements in go.mod)
108+
- Use generics where they reduce boilerplate, not for everything
109+
- Errors over panics. Sentinel errors with `errors.Is()`
110+
- `context.Context` as first parameter on all I/O functions
111+
- `slog` for structured logging
112+
- Functional options for configuration (subscription options, etc.)
113+
- Table-driven tests
114+
- All `.go` files must have the license header:
115+
```go
116+
// Copyright (C) Eventuous HQ OÜ. All rights reserved
117+
// Licensed under the Apache License, Version 2.0.
118+
```
119+
- Run `gofmt` before committing — CI enforces formatting
120+
- Run `go vet` — CI enforces vet
121+
122+
## Test Patterns
123+
124+
### Conformance test suites
125+
126+
Store implementations run the shared conformance suite:
127+
```go
128+
func TestMyStore(t *testing.T) {
129+
s := mystore.New()
130+
storetest.RunAll(t, s)
131+
}
132+
```
133+
134+
### Shared test domain
135+
136+
All command service and subscription tests use the Booking domain from `core/test/testdomain/`:
137+
- Events: `RoomBooked`, `BookingImported`, `PaymentRecorded`, `BookingCancelled`
138+
- State: `BookingState` with `BookingFold`
139+
- Commands: `BookRoom`, `ImportBooking`, `RecordPayment`, `CancelBooking`
140+
- Helpers: `testdomain.NewCodec()`, `testdomain.BookingStream(id)`
141+
142+
### Integration tests
143+
144+
KurrentDB tests use testcontainers-go — `setupClient(t)` in `kurrentdb/testutil_test.go` starts a container automatically.
145+
146+
## Design Specs
147+
148+
- Design spec: `docs/specs/2026-03-20-phase1-design.md`
149+
- Implementation plan: `docs/specs/2026-03-20-phase1-plan.md`

0 commit comments

Comments
 (0)