Skip to content

Commit c554ac0

Browse files
committed
Aggregate layout/naming consistency
1 parent eec85d9 commit c554ac0

File tree

34 files changed

+357
-513
lines changed

34 files changed

+357
-513
lines changed

equinox-testbed/Program.fs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,11 @@ module CmdParser =
6969
member __.Options = a.GetResults Cached @ a.GetResults Unfolds
7070
member __.Cache = __.Options |> List.contains Cached
7171
member __.Unfolds = __.Options |> List.contains Unfolds
72-
member __.BatchSize = a.GetResult(BatchSize,500)
73-
member __.Test = a.GetResult(Name,Tests.Favorite)
74-
member __.ErrorCutoff = a.GetResult(ErrorCutoff,10000L)
75-
member __.TestsPerSecond = a.GetResult(TestsPerSecond,100)
76-
member __.Duration = a.GetResult(DurationM,30.) |> TimeSpan.FromMinutes
72+
member __.BatchSize = a.GetResult(BatchSize, 500)
73+
member __.Test = a.GetResult(Name, Tests.Favorite)
74+
member __.ErrorCutoff = a.GetResult(ErrorCutoff, 10000L)
75+
member __.TestsPerSecond = a.GetResult(TestsPerSecond, 100)
76+
member __.Duration = a.GetResult(DurationM, 30.) |> TimeSpan.FromMinutes
7777
member __.ReportingIntervals =
7878
match a.GetResults(ReportIntervalS) with
7979
| [] -> TimeSpan.FromSeconds 10.|> Seq.singleton
@@ -90,13 +90,13 @@ module CmdParser =
9090
| Some (Es sargs) ->
9191
let storeLog = createStoreLog <| sargs.Contains Storage.EventStore.Parameters.VerboseStore
9292
log.Information("Running transactions in-process against EventStore with storage options: {options:l}", __.Options)
93-
storeLog, Storage.EventStore.config (log,storeLog) (__.Cache, __.Unfolds, __.BatchSize) sargs
93+
storeLog, Storage.EventStore.config (log, storeLog) (__.Cache, __.Unfolds, __.BatchSize) sargs
9494
//#endif
9595
//#if cosmos
9696
| Some (Cosmos sargs) ->
9797
let storeLog = createStoreLog <| sargs.Contains Storage.Cosmos.Parameters.VerboseStore
9898
log.Information("Running transactions in-process against CosmosDb with storage options: {options:l}", __.Options)
99-
storeLog, Storage.Cosmos.config (log,storeLog) (__.Cache, __.Unfolds, __.BatchSize) (Storage.Cosmos.Arguments sargs)
99+
storeLog, Storage.Cosmos.config (log, storeLog) (__.Cache, __.Unfolds, __.BatchSize) (Storage.Cosmos.Arguments sargs)
100100
//#endif
101101
#if ((!cosmos && !eventStore) || (cosmos && eventStore))
102102
| _ -> raise <| Storage.MissingArg (sprintf "Please identify a valid store: memory, es, cosmos")
@@ -140,7 +140,7 @@ module LoadTest =
140140
with e -> domainLog.Warning(e, "Test threw an exception"); e.Reraise () }
141141
execute
142142
let private createResultLog fileName = LoggerConfiguration().WriteTo.File(fileName).CreateLogger()
143-
let run (log: ILogger) (verbose,verboseConsole,maybeSeq) reportFilename (a : CmdParser.TestArguments) =
143+
let run (log: ILogger) (verbose, verboseConsole, maybeSeq) reportFilename (a : CmdParser.TestArguments) =
144144
let createStoreLog verboseStore = createStoreLog verboseStore verboseConsole maybeSeq
145145
let storeLog, storeConfig: ILogger * Storage.StorageConfig = a.ConfigureStore(log, createStoreLog)
146146
let runSingleTest : ClientId -> Async<unit> =
@@ -200,8 +200,8 @@ let main argv =
200200
let maybeSeq = if args.Contains LocalSeq then Some "http://localhost:5341" else None
201201
let verbose = args.Contains Verbose
202202
let log = createDomainLog verbose verboseConsole maybeSeq
203-
let reportFilename = args.GetResult(LogFile,programName+".log") |> fun n -> System.IO.FileInfo(n).FullName
204-
LoadTest.run log (verbose,verboseConsole,maybeSeq) reportFilename (TestArguments rargs)
203+
let reportFilename = args.GetResult(LogFile, programName+".log") |> fun n -> System.IO.FileInfo(n).FullName
204+
LoadTest.run log (verbose, verboseConsole, maybeSeq) reportFilename (TestArguments rargs)
205205
| _ -> failwith "Please specify a valid subcommand :- run"
206206
0
207207
with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1

equinox-testbed/Services.fs

Lines changed: 33 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ module Domain =
1919
| Unfavorited of Unfavorited
2020
interface TypeShape.UnionContract.IUnionContract
2121
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
22+
let (|For|) (id : ClientId) = Equinox.AggregateId("Favorites", ClientId.toString id)
2223

23-
module Folds =
24+
module Fold =
2425

2526
type State = Events.Favorited []
2627

@@ -50,44 +51,42 @@ module Domain =
5051
| Favorite of date : System.DateTimeOffset * skuIds : SkuId list
5152
| Unfavorite of skuId : SkuId
5253

53-
module Commands =
54-
let interpret command (state : Folds.State) =
55-
let doesntHave skuId = state |> Array.exists (fun x -> x.skuId = skuId) |> not
56-
match command with
57-
| Favorite (date = date; skuIds = skuIds) ->
58-
[ for skuId in Seq.distinct skuIds do
59-
if doesntHave skuId then
60-
yield Events.Favorited { date = date; skuId = skuId } ]
61-
| Unfavorite skuId ->
62-
if doesntHave skuId then [] else
63-
[ Events.Unfavorited { skuId = skuId } ]
64-
65-
type Service(log, resolveStream, ?maxAttempts) =
66-
67-
let (|AggregateId|) (id: ClientId) = Equinox.AggregateId("Favorites", ClientId.toString id)
68-
let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolveStream id, defaultArg maxAttempts 2)
69-
let execute (Stream stream) command : Async<unit> =
70-
stream.Transact(Commands.interpret command)
71-
let read (Stream stream) : Async<Events.Favorited []> =
72-
stream.Query id
54+
let interpret command (state : Fold.State) =
55+
let doesntHave skuId = state |> Array.exists (fun x -> x.skuId = skuId) |> not
56+
match command with
57+
| Favorite (date = date; skuIds = skuIds) ->
58+
[ for skuId in Seq.distinct skuIds do
59+
if doesntHave skuId then
60+
yield Events.Favorited { date = date; skuId = skuId } ]
61+
| Unfavorite skuId ->
62+
if doesntHave skuId then [] else
63+
[ Events.Unfavorited { skuId = skuId } ]
64+
65+
type Service internal (log, resolve, maxAttempts) =
66+
67+
let resolve (Events.For id) = Equinox.Stream<Events.Event, Fold.State>(log, resolve id, maxAttempts)
7368

7469
member __.Execute(clientId, command) =
75-
execute clientId command
70+
let stream = resolve clientId
71+
stream.Transact(interpret command)
7672

77-
member __.Favorite(clientId, skus) =
78-
execute clientId (Command.Favorite(DateTimeOffset.Now, skus))
73+
member x.Favorite(clientId, skus) =
74+
x.Execute(clientId, Command.Favorite (DateTimeOffset.Now, skus))
7975

80-
member __.Unfavorite(clientId, skus) =
81-
execute clientId (Command.Unfavorite skus)
76+
member x.Unfavorite(clientId, skus) =
77+
x.Execute(clientId, Command.Unfavorite skus)
8278

8379
member __.List clientId : Async<Events.Favorited []> =
84-
read clientId
85-
80+
let stream = resolve clientId
81+
stream.Query id
82+
83+
let create log resolve = Service(log, resolve, maxAttempts = 3)
84+
8685
open Microsoft.Extensions.DependencyInjection
8786

8887
type StreamResolver(storage) =
8988
member __.Resolve
90-
( codec : FsCodec.IUnionEncoder<'event,byte[],_>,
89+
( codec : FsCodec.IUnionEncoder<'event, byte[], _>,
9190
fold: ('state -> 'event seq -> 'state),
9291
initial: 'state,
9392
snapshot: (('event -> bool) * ('state -> 'event))) =
@@ -99,22 +98,22 @@ type StreamResolver(storage) =
9998
//#if eventStore
10099
| Storage.StorageConfig.Es (gateway, caching, unfolds) ->
101100
let accessStrategy = if unfolds then Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot |> Some else None
102-
Equinox.EventStore.Resolver<'event,'state,_>(gateway, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
101+
Equinox.EventStore.Resolver<'event, 'state, _>(gateway, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
103102
//#endif
104103
//#if cosmos
105104
| Storage.StorageConfig.Cosmos (gateway, caching, unfolds, databaseId, containerId) ->
106105
let store = Equinox.Cosmos.Context(gateway, databaseId, containerId)
107106
let accessStrategy = if unfolds then Equinox.Cosmos.AccessStrategy.Snapshot snapshot else Equinox.Cosmos.AccessStrategy.Unoptimized
108-
Equinox.Cosmos.Resolver<'event,'state,_>(store, codec, fold, initial, caching, accessStrategy).Resolve
107+
Equinox.Cosmos.Resolver<'event, 'state, _>(store, codec, fold, initial, caching, accessStrategy).Resolve
109108
//#endif
110109

111110
type ServiceBuilder(storageConfig, handlerLog) =
112111
let resolver = StreamResolver(storageConfig)
113112

114113
member __.CreateFavoritesService() =
115-
let fold, initial = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial
116-
let snapshot = Domain.Favorites.Folds.isOrigin,Domain.Favorites.Folds.compact
117-
Domain.Favorites.Service(handlerLog, resolver.Resolve(Domain.Favorites.Events.codec,fold,initial,snapshot))
114+
let fold, initial = Domain.Favorites.Fold.fold, Domain.Favorites.Fold.initial
115+
let snapshot = Domain.Favorites.Fold.isOrigin, Domain.Favorites.Fold.compact
116+
Domain.Favorites.create handlerLog (resolver.Resolve(Domain.Favorites.Events.codec, fold, initial, snapshot))
118117

119118
let register (services : IServiceCollection, storageConfig, handlerLog) =
120119
let regF (factory : IServiceProvider -> 'T) = services.AddSingleton<'T>(fun (sp: IServiceProvider) -> factory sp) |> ignore

equinox-testbed/Storage.fs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,13 @@ module Cosmos =
5959
| Database _ -> "specify a database name for store. (optional if environment variable EQUINOX_COSMOS_DATABASE specified)"
6060
| Container _ -> "specify a container name for store. (optional if environment variable EQUINOX_COSMOS_CONTAINER specified)"
6161
type Arguments(a : ParseResults<Parameters>) =
62-
member __.Mode = a.GetResult(ConnectionMode,Equinox.Cosmos.ConnectionMode.Direct)
62+
member __.Mode = a.GetResult(ConnectionMode, Equinox.Cosmos.ConnectionMode.Direct)
6363
member __.Connection = a.TryGetResult Connection |> defaultWithEnvVar "EQUINOX_COSMOS_CONNECTION" "Connection"
6464
member __.Database = a.TryGetResult Database |> defaultWithEnvVar "EQUINOX_COSMOS_DATABASE" "Database"
6565
member __.Container = a.TryGetResult Container |> defaultWithEnvVar "EQUINOX_COSMOS_CONTAINER" "Container"
6666

67-
member __.Timeout = a.GetResult(Timeout,5.) |> TimeSpan.FromSeconds
68-
member __.Retries = a.GetResult(Retries,1)
67+
member __.Timeout = a.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds
68+
member __.Retries = a.GetResult(Retries, 1)
6969
member __.MaxRetryWaitTime = a.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds
7070

7171
/// Standing up an Equinox instance is necessary to run for test purposes; You'll need to either:
@@ -77,7 +77,7 @@ module Cosmos =
7777

7878
let private createGateway connection maxItems = Gateway(connection, BatchingPolicy(defaultMaxItems=maxItems))
7979
let private context (log: ILogger, storeLog: ILogger) (a : Arguments) =
80-
let (Discovery.UriAndKey (endpointUri,_)) as discovery = a.Connection|> Discovery.FromConnectionString
80+
let (Discovery.UriAndKey (endpointUri, _)) as discovery = a.Connection|> Discovery.FromConnectionString
8181
log.Information("CosmosDb {mode} {connection} Database {database} Container {container}",
8282
a.Mode, endpointUri, a.Database, a.Container)
8383
Log.Information("CosmosDb timeout {timeout}s; Throttling retries {retries}, max wait {maxRetryWaitTime}s",
@@ -123,13 +123,13 @@ module EventStore =
123123
open Equinox.EventStore
124124

125125
type Arguments(a : ParseResults<Parameters>) =
126-
member __.Host = a.GetResult(Host,"localhost")
127-
member __.Credentials = a.GetResult(Username,"admin"), a.GetResult(Password,"changeit")
126+
member __.Host = a.GetResult(Host, "localhost")
127+
member __.Credentials = a.GetResult(Username, "admin"), a.GetResult(Password, "changeit")
128128

129129
member __.Retries = a.GetResult(Retries, 1)
130-
member __.Timeout = a.GetResult(Timeout,5.) |> TimeSpan.FromSeconds
131-
member __.HeartbeatTimeout = a.GetResult(HeartbeatTimeout,1.5) |> float |> TimeSpan.FromSeconds
132-
member __.ConcurrentOperationsLimit = a.GetResult(ConcurrentOperationsLimit,5000)
130+
member __.Timeout = a.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds
131+
member __.HeartbeatTimeout = a.GetResult(HeartbeatTimeout, 1.5) |> float |> TimeSpan.FromSeconds
132+
member __.ConcurrentOperationsLimit = a.GetResult(ConcurrentOperationsLimit, 5000)
133133

134134
let private connect (log: Serilog.ILogger) (dnsQuery, heartbeatTimeout, col) (username, password) (operationTimeout, operationRetries) =
135135
Connector(username, password, reqTimeout=operationTimeout, reqRetries=operationRetries,

equinox-testbed/Tests.fs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,6 @@ let executeLocal (container: ServiceProvider) test: ClientId -> Async<unit> =
1414
let service = container.GetRequiredService<Services.Domain.Favorites.Service>()
1515
fun clientId -> async {
1616
let sku = mkSkuId ()
17-
do! service.Favorite(clientId,[sku])
17+
do! service.Favorite(clientId, [sku])
1818
let! items = service.List clientId
1919
if items |> Array.exists (fun x -> x.skuId = sku) |> not then invalidOp "Added item not found" }

equinox-web-csharp/Domain/Aggregate.cs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public static Event TryDecode(string et, byte[] json)
3636
}
3737

3838
public static Tuple<string, byte[]> Encode(Event e) => Tuple.Create(e.GetType().Name, Codec.Encode(e));
39+
public static Target For(ClientId id) => Target.NewAggregateId("Aggregate", id.ToString());
3940
}
4041
public class State
4142
{
@@ -94,18 +95,18 @@ public static IEnumerable<Event> Interpret(State s, Command x)
9495

9596
class Handler
9697
{
97-
readonly EquinoxStream<Event, State> _inner;
98+
readonly EquinoxStream<Event, State> _stream;
9899

99100
public Handler(ILogger log, IStream<Event, State> stream) =>
100-
_inner = new EquinoxStream<Event, State>(State.Fold, log, stream);
101+
_stream = new EquinoxStream<Event, State>(State.Fold, log, stream);
101102

102103
/// Execute `command`, syncing any events decided upon
103104
public Task<Unit> Execute(Command c) =>
104-
_inner.Execute(s => Command.Interpret(s, c));
105+
_stream.Execute(s => Command.Interpret(s, c));
105106

106107
/// Establish the present state of the Stream, project from that as specified by `projection`
107108
public Task<T> Query<T>(Func<State, T> projection) =>
108-
_inner.Query(projection);
109+
_stream.Query(projection);
109110
}
110111

111112
public class View
@@ -116,20 +117,18 @@ public class View
116117
public class Service
117118
{
118119
/// Maps a ClientId to Handler for the relevant stream
119-
readonly Func<string, Handler> _stream;
120-
121-
static Target AggregateId(string id) => Target.NewAggregateId("Aggregate", id);
120+
readonly Func<ClientId, Handler> _stream;
122121

123122
public Service(ILogger handlerLog, Func<Target, IStream<Event, State>> resolve) =>
124-
_stream = id => new Handler(handlerLog, resolve(AggregateId(id)));
123+
_stream = id => new Handler(handlerLog, resolve(Event.For(id)));
125124

126125
/// Execute the specified command
127-
public Task<Unit> Execute(string id, Command command) =>
126+
public Task<Unit> Execute(ClientId id, Command command) =>
128127
_stream(id).Execute(command);
129128

130129
/// Read the present state
131130
// TOCONSIDER: you should probably be separating this out per CQRS and reading from a denormalized/cached set of projections
132-
public Task<View> Read(string id) => _stream(id).Query(Render);
131+
public Task<View> Read(ClientId id) => _stream(id).Query(Render);
133132

134133
static View Render(State s) => new View() {Sorted = s.Happened};
135134
}

equinox-web-csharp/Domain/Infrastructure.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,15 @@ public EquinoxStream(
3131
}
3232

3333
/// Run the decision method, letting it decide whether or not the Command's intent should manifest as Events
34-
public async Task<Unit> Execute(Func<TState,IEnumerable<TEvent>> interpret)
34+
public async Task<Unit> Execute(Func<TState, IEnumerable<TEvent>> interpret)
3535
{
3636
FSharpList<TEvent> decide_(TState state)
3737
{
3838
var a = new Accumulator<TEvent, TState>(FuncConvert.FromFunc(_fold), state);
3939
a.Execute(interpret);
4040
return a.Accumulated;
4141
}
42-
return await FSharpAsync.StartAsTask(Transact(FuncConvert.FromFunc<TState,FSharpList<TEvent>>(decide_)), null, null);
42+
return await FSharpAsync.StartAsTask(Transact(FuncConvert.FromFunc<TState, FSharpList<TEvent>>(decide_)), null, null);
4343
}
4444

4545
/// Execute a command, as Decide(Action) does, but also yield an outcome from the decision
@@ -51,7 +51,7 @@ Tuple<T, FSharpList<TEvent>> decide_(TState state)
5151
var r = decide(a);
5252
return Tuple.Create(r, a.Accumulated);
5353
}
54-
return await FSharpAsync.StartAsTask<T>(Transact(FuncConvert.FromFunc<TState,Tuple<T,FSharpList<TEvent>>>(decide_)), null, null);
54+
return await FSharpAsync.StartAsTask<T>(Transact(FuncConvert.FromFunc<TState, Tuple<T, FSharpList<TEvent>>>(decide_)), null, null);
5555
}
5656

5757
// Project from the synchronized state, without the possibility of adding events that Decide(Func) admits

0 commit comments

Comments
 (0)