Skip to content

Commit 225442e

Browse files
committed
feat: add state module
1 parent 82a9c24 commit 225442e

File tree

13 files changed

+345
-15
lines changed

13 files changed

+345
-15
lines changed

guides/explanations/fork-differences.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,3 +243,46 @@ end
243243
- Visualize event handler execution in your tracing backend
244244
- Correlate event processing with command dispatch using span links
245245
- Configurable span relationships (`:link`, `:child`, `:none`)
246+
247+
### **Separate State Module for Aggregates**
248+
[PR #49](https://github.com/straw-hat-team/commanded/pull/49)
249+
250+
**Changes:**
251+
- Added `state` option to command router's `dispatch` macro
252+
- Allows specifying a separate module for aggregate state struct
253+
- Useful for protobuf-generated modules or external struct definitions
254+
255+
**Usage:**
256+
```elixir
257+
defmodule MyRouter do
258+
use Commanded.Commands.Router
259+
260+
# BankAccount has execute/2 and apply/2 functions
261+
# BankAccountState defines the state struct (e.g., protobuf-generated)
262+
dispatch [OpenAccount, DepositMoney],
263+
to: BankAccount,
264+
state: BankAccountState,
265+
identity: :account_number
266+
end
267+
268+
# State module (could be protobuf-generated)
269+
defmodule BankAccountState do
270+
defstruct [:account_number, :balance, :status]
271+
end
272+
273+
# Aggregate module (behavior only, no struct)
274+
defmodule BankAccount do
275+
def execute(%BankAccountState{status: nil}, %OpenAccount{} = cmd) do
276+
%AccountOpened{account_number: cmd.account_number}
277+
end
278+
279+
def apply(%BankAccountState{} = state, %AccountOpened{} = event) do
280+
%BankAccountState{state | account_number: event.account_number, status: :open}
281+
end
282+
end
283+
```
284+
285+
**Benefits:**
286+
- Decouples aggregate behavior from state representation
287+
- Enables use of protobuf-generated structs as aggregate state
288+
- Backwards compatible - if `state` option is omitted, aggregate module is used for both behavior and state (existing behavior)

lib/application.ex

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,18 +214,26 @@ defmodule Commanded.Application do
214214
Retrieving aggregate state is done by calling to the opened aggregate,
215215
or querying the event store for an optional state snapshot
216216
and then replaying the aggregate's event stream.
217+
218+
## Options
219+
220+
- `:timeout` - timeout in milliseconds (default: 5000)
221+
- `:state_module` - module to use for initial state struct when rebuilding
222+
from events. Defaults to `aggregate_module`. Use this when the aggregate
223+
uses a separate state module.
224+
217225
"""
218226
@spec aggregate_state(
219227
aggregate_module :: module(),
220228
aggregate_uuid :: Aggregate.uuid(),
221-
timeout :: integer
229+
timeout_or_opts :: integer | Keyword.t()
222230
) :: Aggregate.state()
223-
def aggregate_state(aggregate_module, aggregate_uuid, timeout \\ 5000) do
231+
def aggregate_state(aggregate_module, aggregate_uuid, timeout_or_opts \\ 5000) do
224232
Aggregate.aggregate_state(
225233
__MODULE__,
226234
aggregate_module,
227235
aggregate_uuid,
228-
timeout
236+
timeout_or_opts
229237
)
230238
end
231239

lib/commanded.ex

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,27 @@ defmodule Commanded do
3838
Retrieving aggregate state is done by calling to the opened aggregate,
3939
or querying the event store for an optional state snapshot
4040
and then replaying the aggregate's event stream.
41+
42+
## Options
43+
44+
- `:timeout` - timeout in milliseconds (default: 5000)
45+
- `:state_module` - module to use for initial state struct when rebuilding
46+
from events. Defaults to `aggregate_module`. Use this when the aggregate
47+
uses a separate state module.
48+
4149
"""
4250
@spec aggregate_state(
4351
application :: Commanded.Application.t(),
4452
aggregate_module :: module(),
4553
aggregate_uuid :: Aggregate.uuid(),
46-
timeout :: integer
54+
timeout_or_opts :: integer | Keyword.t()
4755
) :: Aggregate.state()
48-
def aggregate_state(application, aggregate_module, aggregate_uuid, timeout \\ 5_000) do
56+
def aggregate_state(application, aggregate_module, aggregate_uuid, timeout_or_opts \\ 5_000) do
4957
Aggregate.aggregate_state(
5058
application,
5159
aggregate_module,
5260
aggregate_uuid,
53-
timeout
61+
timeout_or_opts
5462
)
5563
end
5664
end

lib/commanded/aggregates/aggregate.ex

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ defmodule Commanded.Aggregates.Aggregate do
147147
defstruct [
148148
:application,
149149
:aggregate_module,
150+
:state_module,
150151
:aggregate_uuid,
151152
:aggregate_state,
152153
:snapshotting,
@@ -160,6 +161,7 @@ defmodule Commanded.Aggregates.Aggregate do
160161

161162
aggregate_module = Keyword.fetch!(aggregate_opts, :aggregate_module)
162163
aggregate_uuid = Keyword.fetch!(aggregate_opts, :aggregate_uuid)
164+
state_module = Keyword.get(aggregate_opts, :state_module) || aggregate_module
163165

164166
unless is_atom(aggregate_module),
165167
do: raise(ArgumentError, message: "aggregate module must be an atom")
@@ -174,6 +176,7 @@ defmodule Commanded.Aggregates.Aggregate do
174176
state = %Aggregate{
175177
application: application,
176178
aggregate_module: aggregate_module,
179+
state_module: state_module,
177180
aggregate_uuid: aggregate_uuid,
178181
snapshotting: Snapshotting.new(application, aggregate_uuid, snapshot_options)
179182
}
@@ -230,7 +233,16 @@ defmodule Commanded.Aggregates.Aggregate do
230233
end
231234

232235
@doc false
233-
def aggregate_state(application, aggregate_module, aggregate_uuid, timeout \\ 5_000) do
236+
def aggregate_state(application, aggregate_module, aggregate_uuid, timeout_or_opts \\ 5_000)
237+
238+
def aggregate_state(application, aggregate_module, aggregate_uuid, timeout)
239+
when is_integer(timeout) or timeout == :infinity do
240+
aggregate_state(application, aggregate_module, aggregate_uuid, timeout: timeout)
241+
end
242+
243+
def aggregate_state(application, aggregate_module, aggregate_uuid, opts) when is_list(opts) do
244+
timeout = Keyword.get(opts, :timeout, 5_000)
245+
state_module = Keyword.get(opts, :state_module, aggregate_module)
234246
name = via_name(application, aggregate_module, aggregate_uuid)
235247

236248
try do
@@ -249,6 +261,7 @@ defmodule Commanded.Aggregates.Aggregate do
249261
%Aggregate{
250262
application: application,
251263
aggregate_module: aggregate_module,
264+
state_module: state_module,
252265
aggregate_uuid: aggregate_uuid,
253266
snapshotting: Snapshotting.new(application, aggregate_uuid, snapshot_options)
254267
}

lib/commanded/aggregates/aggregate_state_builder.ex

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,12 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do
4040
If the snapshot exists, fetch any subsequent events to rebuild its state.
4141
Otherwise start with the aggregate struct and stream all existing events for
4242
the aggregate from the event store to rebuild its state from those events.
43+
44+
The `state_module` field determines which module is used to create the initial
45+
empty state struct.
4346
"""
4447
def populate(%Aggregate{} = state) do
45-
%Aggregate{aggregate_module: aggregate_module, snapshotting: snapshotting} = state
48+
%Aggregate{state_module: state_module, snapshotting: snapshotting} = state
4649

4750
aggregate =
4851
case Snapshotting.read_snapshot(snapshotting) do
@@ -55,7 +58,7 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do
5558

5659
{:error, _error} ->
5760
# No snapshot present, or exists but for outdated state, so use initial empty state
58-
%Aggregate{state | aggregate_version: 0, aggregate_state: struct(aggregate_module)}
61+
%Aggregate{state | aggregate_version: 0, aggregate_state: struct(state_module)}
5962
end
6063

6164
rebuild_from_events(aggregate)

lib/commanded/aggregates/supervisor.ex

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,16 @@ defmodule Commanded.Aggregates.Supervisor do
2323
2424
Returns `{:ok, aggregate_uuid}` when a process is successfully started, or is
2525
already running.
26+
27+
## Options
28+
29+
- `:state_module` - optional module to use for the aggregate's state struct.
30+
If not provided, defaults to the aggregate module itself.
31+
2632
"""
27-
def open_aggregate(application, aggregate_module, aggregate_uuid)
33+
def open_aggregate(application, aggregate_module, aggregate_uuid, opts \\ [])
34+
35+
def open_aggregate(application, aggregate_module, aggregate_uuid, opts)
2836
when is_atom(application) and is_atom(aggregate_module) and is_binary(aggregate_uuid) do
2937
Logger.debug(fn ->
3038
"Locating aggregate process for `#{inspect(aggregate_module)}` with UUID " <>
@@ -34,10 +42,13 @@ defmodule Commanded.Aggregates.Supervisor do
3442
supervisor_name = Module.concat([application, __MODULE__])
3543
aggregate_name = Aggregate.name(application, aggregate_module, aggregate_uuid)
3644

45+
state_module = Keyword.get(opts, :state_module)
46+
3747
args = [
3848
application: application,
3949
aggregate_module: aggregate_module,
40-
aggregate_uuid: aggregate_uuid
50+
aggregate_uuid: aggregate_uuid,
51+
state_module: state_module
4152
]
4253

4354
case Registration.start_child(application, aggregate_name, supervisor_name, {Aggregate, args}) do
@@ -55,7 +66,7 @@ defmodule Commanded.Aggregates.Supervisor do
5566
end
5667
end
5768

58-
def open_aggregate(_application, _aggregate_module, aggregate_uuid),
69+
def open_aggregate(_application, _aggregate_module, aggregate_uuid, _opts),
5970
do: {:error, {:unsupported_aggregate_identity_type, aggregate_uuid}}
6071

6172
def init(args) do

lib/commanded/commands/dispatcher.ex

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ defmodule Commanded.Commands.Dispatcher do
2424
:handler_function,
2525
:handler_before_execute,
2626
:aggregate_module,
27+
:state_module,
2728
:identity,
2829
:identity_prefix,
2930
:timeout,
@@ -83,13 +84,16 @@ defmodule Commanded.Commands.Dispatcher do
8384
@dialyzer {:nowarn_function, execute: 3}
8485
defp execute(%Pipeline{} = pipeline, %Payload{} = payload, %ExecutionContext{} = context) do
8586
%Pipeline{application: application, assigns: %{aggregate_uuid: aggregate_uuid}} = pipeline
86-
%Payload{aggregate_module: aggregate_module, timeout: timeout} = payload
87+
88+
%Payload{aggregate_module: aggregate_module, state_module: state_module, timeout: timeout} =
89+
payload
8790

8891
{:ok, ^aggregate_uuid} =
8992
Commanded.Aggregates.Supervisor.open_aggregate(
9093
application,
9194
aggregate_module,
92-
aggregate_uuid
95+
aggregate_uuid,
96+
state_module: state_module
9397
)
9498

9599
task_dispatcher_name = Module.concat([application, Commanded.Commands.TaskDispatcher])

lib/commanded/commands/router.ex

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,34 @@ defmodule Commanded.Commands.Router do
7373
dispatch OpenAccount, to: BankAccount, function: :open_account, identity: :account_number
7474
end
7575
76+
## Separate state module
77+
78+
By default, the aggregate module is expected to define a struct that represents
79+
the aggregate's state. However, you can specify a separate module to use for the
80+
state struct using the `state` option. This is useful when you want to use a
81+
different struct (such as a protobuf-generated module) as the aggregate's state.
82+
83+
### Example
84+
85+
defmodule BankRouter do
86+
use Commanded.Commands.Router
87+
88+
# BankAccount module has execute/2 and apply/2 functions
89+
# BankAccountState module defines the state struct
90+
dispatch OpenAccount,
91+
to: BankAccount,
92+
state: BankAccountState,
93+
identity: :account_number
94+
end
95+
96+
When using a separate state module:
97+
98+
- The `state` module must define a struct (i.e., use `defstruct`)
99+
- The aggregate module's `execute/2` and `apply/2` functions receive and
100+
return the state module's struct
101+
- If `state` is not specified, the aggregate module is used for both
102+
behavior and state (the default behavior)
103+
76104
## Define aggregate identity
77105
78106
You can define the identity field for an aggregate once using the `identify` macro.
@@ -506,6 +534,7 @@ defmodule Commanded.Commands.Router do
506534

507535
for {command_module, command_opts} <- @registered_commands do
508536
@aggregate Keyword.fetch!(command_opts, :aggregate)
537+
@state Keyword.get(command_opts, :state, @aggregate)
509538
@handler Keyword.fetch!(command_opts, :to)
510539
@function Keyword.fetch!(command_opts, :function)
511540
@before_execute Keyword.get(command_opts, :before_execute)
@@ -576,6 +605,7 @@ defmodule Commanded.Commands.Router do
576605
handler_function: @function,
577606
handler_before_execute: @before_execute,
578607
aggregate_module: @aggregate,
608+
state_module: @state,
579609
identity: identity,
580610
identity_prefix: identity_prefix,
581611
returning: returning,
@@ -659,6 +689,7 @@ defmodule Commanded.Commands.Router do
659689
:function,
660690
:before_execute,
661691
:aggregate,
692+
:state,
662693
:identity,
663694
:identity_prefix,
664695
:timeout,

test/commands/routing_commands_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ defmodule Commanded.Commands.RoutingCommandsTest do
226226
assert_raise RuntimeError,
227227
"""
228228
unexpected dispatch parameter "id"
229-
available params are: to, function, before_execute, aggregate, identity, identity_prefix, timeout, lifespan, consistency
229+
available params are: to, function, before_execute, aggregate, state, identity, identity_prefix, timeout, lifespan, consistency
230230
""",
231231
fn ->
232232
Code.eval_string("""

0 commit comments

Comments
 (0)