Skip to content
This repository was archived by the owner on Dec 6, 2025. It is now read-only.

Commit e038085

Browse files
committed
update patient
1 parent bfe2c3f commit e038085

File tree

11 files changed

+462
-46
lines changed

11 files changed

+462
-46
lines changed

libs/hwes/aggregate_store.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ type AggregateStore interface {
99
// Load populates the aggregate to the recent version of the persisted events
1010
Load(ctx context.Context, aggregate Aggregate) error
1111

12+
// LoadN populates the aggregate by N-many persisted events
13+
LoadN(ctx context.Context, aggregate Aggregate, N uint64) error
14+
1215
// Save persists all uncommitted events of the aggregate, returns consistency
1316
Save(ctx context.Context, aggregate Aggregate) (uint64, error)
1417

libs/hwes/eventstoredb/aggregate_store.go

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -89,39 +89,62 @@ func (a *AggregateStore) doSave(ctx context.Context, aggregate hwes.Aggregate, g
8989

9090
// Implements AggregateStore interface
9191

92-
func (a *AggregateStore) Load(ctx context.Context, aggregate hwes.Aggregate) error {
93-
stream, err := a.es.ReadStream(ctx, aggregate.GetTypeID(), esdb.ReadStreamOptions{}, math.MaxUint64) // MaxUint64 for "all" events
92+
func (a *AggregateStore) LoadN(ctx context.Context, aggregate hwes.Aggregate, N uint64) error {
93+
if N == 0 {
94+
return nil
95+
}
96+
97+
// start from the beginning, unless we have already processed events
98+
var from esdb.StreamPosition = esdb.Start{}
99+
aE := aggregate.GetAppliedEvents()
100+
if len(aE) != 0 {
101+
// We cant just use the aggreagate's Version here, as we are unable to
102+
// discriminate aggregates where the first event was processed already (version = 0)
103+
// from those aggregates where no event was processed yet (version = 0 by default)
104+
from = esdb.Revision(aE[len(aE)-1].Version + 1)
105+
}
106+
107+
// open a read stream
108+
stream, err := a.es.ReadStream(ctx, aggregate.GetTypeID(), esdb.ReadStreamOptions{
109+
Direction: esdb.Forwards,
110+
From: from,
111+
}, N)
94112
if err != nil {
95-
return fmt.Errorf("AggregateStore.Load: could not open stream: %w", err)
113+
return fmt.Errorf("AggregateStore.LoadN: could not open stream: %w", err)
96114
}
97115
defer stream.Close()
98116

117+
// stream events in
99118
for {
100119
esdbEvent, err := stream.Recv()
101120
if errors.Is(err, io.EOF) {
102-
// exit condition for for-loop
121+
// all events were handled!
103122
break
104123
} else if err != nil {
105-
return fmt.Errorf("AggregateStore.Load: could not read from stream: %w", err)
124+
return fmt.Errorf("AggregateStore.LoadN: could not read from stream: %w", err)
106125
}
107126

108127
event, err := hwes.NewEventFromRecordedEvent(esdbEvent.Event)
109128
if err != nil {
110-
return fmt.Errorf("AggregateStore.Load: %w", err)
129+
return fmt.Errorf("AggregateStore.LoadN: %w", err)
111130
}
112131

113132
if err := aggregate.Progress(event); err != nil {
114-
return fmt.Errorf("AggregateStore.Load: Progress failed: %w", err)
133+
return fmt.Errorf("AggregateStore.LoadN: Progress failed: %w", err)
115134
}
116135
}
117136

118137
if aggregate.IsDeleted() {
119-
return fmt.Errorf("AggregateStore.Load: aggregate has been marked as deleted")
138+
return fmt.Errorf("AggregateStore.LoadN: aggregate has been marked as deleted")
120139
}
121140

122141
return nil
123142
}
124143

144+
func (a *AggregateStore) Load(ctx context.Context, aggregate hwes.Aggregate) error {
145+
return a.LoadN(ctx, aggregate, math.MaxUint64) // MaxUint64 -> all events
146+
}
147+
125148
func (a *AggregateStore) Save(ctx context.Context, aggregate hwes.Aggregate) (uint64, error) {
126149
// We can switch out the getExpectedRevision strategy for testing optimistic concurrency.
127150
// It is not intended to switch the strategy in production.
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package eventstoredb
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/google/uuid"
7+
zlog "github.com/rs/zerolog/log"
8+
"github.com/stretchr/testify/assert"
9+
"hwes"
10+
"hwtesting"
11+
"os"
12+
"testing"
13+
"time"
14+
)
15+
16+
var endpoint string
17+
18+
func TestMain(m *testing.M) {
19+
ctx, cancel := context.WithCancel(context.Background())
20+
21+
zlog.Info().Msg("starting containers")
22+
endpoints, teardownContainers := hwtesting.StartContainers(ctx, hwtesting.Eventstore)
23+
24+
endpoint = fmt.Sprintf("esdb://%s:%s@%s?tls=false",
25+
hwtesting.EsUser, hwtesting.EsPassword, endpoints.Get(hwtesting.Eventstore))
26+
27+
exit := m.Run()
28+
29+
teardownContainers()
30+
cancel()
31+
os.Exit(exit)
32+
}
33+
34+
func TestLoadN(t *testing.T) {
35+
zlog.Info().Str("endpoint", endpoint).Msg(t.Name())
36+
client := SetupEventStore(endpoint)
37+
as := NewAggregateStore(client)
38+
ctx := context.Background()
39+
40+
id := uuid.New()
41+
eventType := t.Name()
42+
43+
sendAggregate := hwes.NewAggregateBase(hwes.AggregateType(eventType), id)
44+
sendAggregate.RegisterEventListener(eventType, func(evt hwes.Event) error { return nil })
45+
46+
for i := 0; i < 4; i++ {
47+
event, _ := hwes.NewEvent(sendAggregate, eventType, hwes.WithContext(ctx))
48+
assert.NoError(t, sendAggregate.Apply(event))
49+
}
50+
51+
_, err := as.Save(ctx, sendAggregate)
52+
assert.NoError(t, err)
53+
54+
time.Sleep(time.Millisecond * 100)
55+
56+
rxAggregate := hwes.NewAggregateBase(hwes.AggregateType(eventType), id)
57+
rxAggregate.RegisterEventListener(eventType, func(evt hwes.Event) error { return nil })
58+
assert.Len(t, sendAggregate.GetAppliedEvents(), 0)
59+
60+
// LoadN 0 does nothing
61+
err = as.LoadN(ctx, rxAggregate, 0)
62+
assert.NoError(t, err)
63+
assert.Len(t, rxAggregate.GetAppliedEvents(), 0)
64+
65+
// LoadN 1 loads the first event
66+
err = as.LoadN(ctx, rxAggregate, 1)
67+
assert.NoError(t, err)
68+
assert.Len(t, rxAggregate.GetAppliedEvents(), 1)
69+
assert.Equal(t, rxAggregate.GetAppliedEvents()[0].Version, uint64(0))
70+
71+
// LoadN 1 loads another event
72+
err = as.LoadN(ctx, rxAggregate, 1)
73+
assert.NoError(t, err)
74+
assert.Len(t, rxAggregate.GetAppliedEvents(), 2)
75+
assert.Equal(t, rxAggregate.GetAppliedEvents()[1].Version, uint64(1))
76+
77+
// LoadN 2 loads the rest
78+
err = as.LoadN(ctx, rxAggregate, 2)
79+
assert.NoError(t, err)
80+
assert.Len(t, rxAggregate.GetAppliedEvents(), 4)
81+
assert.Equal(t, rxAggregate.GetAppliedEvents()[2].Version, uint64(2))
82+
assert.Equal(t, rxAggregate.GetAppliedEvents()[3].Version, uint64(3))
83+
84+
// LoadN 1 now does nothing anymore
85+
err = as.LoadN(ctx, rxAggregate, 1)
86+
assert.NoError(t, err)
87+
assert.Len(t, rxAggregate.GetAppliedEvents(), 4)
88+
89+
}

libs/hwes/go.mod

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.23
55
replace (
66
common => ../common
77
hwlocale => ../hwlocale
8+
hwtesting => ../hwtesting
89
hwutil => ../hwutil
910
telemetry => ../telemetry
1011
)
@@ -15,45 +16,87 @@ require (
1516
github.com/google/uuid v1.6.0
1617
github.com/rs/zerolog v1.33.0
1718
github.com/stretchr/testify v1.9.0
19+
hwtesting v0.0.0
1820
hwutil v0.0.0
1921
telemetry v0.0.0
2022
)
2123

2224
require (
25+
dario.cat/mergo v1.0.0 // indirect
26+
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
2327
github.com/BurntSushi/toml v1.4.0 // indirect
28+
github.com/Mariscal6/testcontainers-spicedb-go v0.1.0 // indirect
29+
github.com/Microsoft/go-winio v0.6.2 // indirect
2430
github.com/beorn7/perks v1.0.1 // indirect
2531
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
2632
github.com/cespare/xxhash/v2 v2.3.0 // indirect
33+
github.com/containerd/log v0.1.0 // indirect
34+
github.com/containerd/platforms v0.2.1 // indirect
2735
github.com/coreos/go-oidc v2.2.1+incompatible // indirect
36+
github.com/cpuguy83/dockercfg v0.3.1 // indirect
2837
github.com/dapr/dapr v1.14.2 // indirect
2938
github.com/dapr/go-sdk v1.11.0 // indirect
3039
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
40+
github.com/distribution/reference v0.6.0 // indirect
41+
github.com/docker/docker v27.2.0+incompatible // indirect
42+
github.com/docker/go-connections v0.5.0 // indirect
43+
github.com/docker/go-units v0.5.0 // indirect
3144
github.com/fatih/structs v1.1.0 // indirect
45+
github.com/felixge/httpsnoop v1.0.4 // indirect
3246
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
3347
github.com/go-logr/logr v1.4.2 // indirect
3448
github.com/go-logr/stdr v1.2.2 // indirect
49+
github.com/go-ole/go-ole v1.2.6 // indirect
3550
github.com/go-playground/locales v0.14.1 // indirect
3651
github.com/go-playground/universal-translator v0.18.1 // indirect
3752
github.com/go-playground/validator/v10 v10.22.1 // indirect
53+
github.com/gogo/protobuf v1.3.2 // indirect
54+
github.com/golang-migrate/migrate/v4 v4.18.1 // indirect
3855
github.com/golang/protobuf v1.5.4 // indirect
3956
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 // indirect
4057
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect
4158
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
59+
github.com/hashicorp/errwrap v1.1.0 // indirect
60+
github.com/hashicorp/go-multierror v1.1.1 // indirect
4261
github.com/joho/godotenv v1.5.1 // indirect
4362
github.com/klauspost/compress v1.17.9 // indirect
4463
github.com/leodido/go-urn v1.4.0 // indirect
64+
github.com/lib/pq v1.10.9 // indirect
65+
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
66+
github.com/magiconair/properties v1.8.7 // indirect
4567
github.com/mattn/go-colorable v0.1.13 // indirect
4668
github.com/mattn/go-isatty v0.0.20 // indirect
69+
github.com/moby/docker-image-spec v1.3.1 // indirect
70+
github.com/moby/patternmatcher v0.6.0 // indirect
71+
github.com/moby/sys/sequential v0.5.0 // indirect
72+
github.com/moby/sys/user v0.1.0 // indirect
73+
github.com/moby/sys/userns v0.1.0 // indirect
74+
github.com/moby/term v0.5.0 // indirect
75+
github.com/morikuni/aec v1.0.0 // indirect
4776
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
4877
github.com/nicksnyder/go-i18n/v2 v2.4.0 // indirect
78+
github.com/opencontainers/go-digest v1.0.0 // indirect
79+
github.com/opencontainers/image-spec v1.1.0 // indirect
4980
github.com/openzipkin/zipkin-go v0.4.3 // indirect
81+
github.com/pkg/errors v0.9.1 // indirect
5082
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
83+
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
5184
github.com/pquerna/cachecontrol v0.1.0 // indirect
5285
github.com/prometheus/client_golang v1.20.3 // indirect
5386
github.com/prometheus/client_model v0.6.1 // indirect
5487
github.com/prometheus/common v0.55.0 // indirect
5588
github.com/prometheus/procfs v0.15.1 // indirect
89+
github.com/shirou/gopsutil/v3 v3.23.12 // indirect
90+
github.com/shoenig/go-m1cpu v0.1.6 // indirect
91+
github.com/sirupsen/logrus v1.9.3 // indirect
92+
github.com/testcontainers/testcontainers-go v0.33.0 // indirect
93+
github.com/testcontainers/testcontainers-go/modules/postgres v0.33.0 // indirect
94+
github.com/testcontainers/testcontainers-go/modules/redis v0.33.0 // indirect
95+
github.com/tklauser/go-sysconf v0.3.12 // indirect
96+
github.com/tklauser/numcpus v0.6.1 // indirect
97+
github.com/yusufpapurcu/wmi v1.2.3 // indirect
5698
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect
99+
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect
57100
go.opentelemetry.io/otel v1.30.0 // indirect
58101
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.30.0 // indirect
59102
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.30.0 // indirect
@@ -63,6 +106,7 @@ require (
63106
go.opentelemetry.io/otel/sdk v1.30.0 // indirect
64107
go.opentelemetry.io/otel/trace v1.30.0 // indirect
65108
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
109+
go.uber.org/atomic v1.10.0 // indirect
66110
golang.org/x/crypto v0.27.0 // indirect
67111
golang.org/x/net v0.29.0 // indirect
68112
golang.org/x/oauth2 v0.23.0 // indirect

0 commit comments

Comments
 (0)