Skip to content

Commit 37b9a69

Browse files
committed
feat: add initial state module
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 82a9c24 commit 37b9a69

File tree

13 files changed

+438
-15
lines changed

13 files changed

+438
-15
lines changed

guides/explanations/fork-differences.md

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,3 +243,82 @@ end
243243
- Visualize event handler execution in your tracing backend
244244
- Correlate event processing with command dispatch using span links
245245
- Configurable span relationships (`:link`, `:child`, `:none`)
246+
247+
### **Custom Initial State for Aggregates**
248+
[PR #49](https://github.com/straw-hat-team/commanded/pull/49)
249+
250+
**Changes:**
251+
- Added `initial_state` option to command router's `dispatch` macro
252+
- Allows specifying a module that implements `initial_state/0` callback
253+
- Useful for protobuf-generated modules or custom default values
254+
255+
**Usage:**
256+
257+
```elixir
258+
# State module with initial_state/0 callback
259+
defmodule BankAccountState do
260+
defstruct [:account_number, :balance, status: :uninitialized]
261+
262+
def initial_state, do: %__MODULE__{}
263+
end
264+
265+
# Aggregate module (behavior only, no struct)
266+
defmodule BankAccount do
267+
def execute(%BankAccountState{status: :uninitialized}, %OpenAccount{} = cmd) do
268+
%AccountOpened{account_number: cmd.account_number}
269+
end
270+
271+
def apply(%BankAccountState{} = state, %AccountOpened{} = event) do
272+
%BankAccountState{state | account_number: event.account_number, status: :open}
273+
end
274+
end
275+
276+
defmodule MyRouter do
277+
use Commanded.Commands.Router
278+
279+
# Calls BankAccountState.initial_state/0 to create initial state
280+
dispatch [OpenAccount, DepositMoney],
281+
to: BankAccount,
282+
initial_state: BankAccountState,
283+
identity: :account_number
284+
end
285+
```
286+
287+
**Benefits:**
288+
- Decouples aggregate behavior from state representation
289+
- State module controls its own initialization (like `to:` pattern)
290+
- Enables use of protobuf-generated structs with custom default values
291+
- Handles protobuf zero-value defaults naturally (strings default to `""`, not `nil`)
292+
- Backwards compatible - if `initial_state` is omitted, `struct(AggregateModule)` is used
293+
294+
**Rationale:**
295+
296+
In the upstream Commanded, the aggregate module serves dual purposes: it defines both the state struct and the behavior (`execute/2` and `apply/2` functions). This coupling becomes problematic when you want to use protobuf-generated modules for state.
297+
298+
Protobuf modules are code-generated and shouldn't be manually modified—any changes would be overwritten on regeneration. To add aggregate behavior to a protobuf struct, you'd need to write custom protobuf extensions or use workarounds, adding complexity to your build pipeline.
299+
300+
By separating the aggregate (behavior) from the state (data structure), you can:
301+
302+
```elixir
303+
# Generated by protobuf - don't modify
304+
defmodule MyApp.Proto.BankAccountState do
305+
use Protobuf, syntax: :proto3
306+
# ... generated fields ...
307+
end
308+
309+
# Your code - aggregate behavior with initial_state returning the protobuf struct
310+
defmodule MyApp.BankAccount do
311+
alias MyApp.Proto.BankAccountState
312+
313+
def initial_state, do: %BankAccountState{status: :STATUS_UNINITIALIZED}
314+
315+
def execute(%BankAccountState{} = state, %OpenAccount{} = cmd), do: ...
316+
def apply(%BankAccountState{} = state, %AccountOpened{} = event), do: ...
317+
end
318+
```
319+
320+
**Why a callback instead of `struct/1`?**
321+
322+
Elixir's `struct(Module)` initializes all fields to `nil`, but protobuf has different default semantics—strings default to `""`, integers to `0`, booleans to `false`, etc. Using `%BankAccountState{}` (the struct literal syntax) invokes protobuf's generated `new/0` which applies the correct proto3 default values. The `initial_state/0` callback gives you control over this initialization rather than relying on `struct/1`.
323+
324+
This separation follows the principle that data representation and business logic are distinct concerns that benefit from being in separate modules. It also aligns more closely with the [Functional Decider Pattern](https://thinkbeforecoding.com/post/2021/12/17/functional-event-sourcing-decider), where the aggregate is a set of pure functions (`execute`, `apply`, `initial_state`) operating on state, rather than a stateful object that owns its data structure.

lib/application.ex

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,18 +214,25 @@ defmodule Commanded.Application do
214214
Retrieving aggregate state is done by calling to the opened aggregate,
215215
or querying the event store for an optional state snapshot
216216
and then replaying the aggregate's event stream.
217+
218+
## Options
219+
220+
- `:timeout` - timeout in milliseconds (default: 5000)
221+
- `:initial_state` - module that implements `initial_state/0` callback.
222+
Used when rebuilding state from events. Defaults to `aggregate_module`.
223+
217224
"""
218225
@spec aggregate_state(
219226
aggregate_module :: module(),
220227
aggregate_uuid :: Aggregate.uuid(),
221-
timeout :: integer
228+
timeout_or_opts :: integer | Keyword.t()
222229
) :: Aggregate.state()
223-
def aggregate_state(aggregate_module, aggregate_uuid, timeout \\ 5000) do
230+
def aggregate_state(aggregate_module, aggregate_uuid, timeout_or_opts \\ 5000) do
224231
Aggregate.aggregate_state(
225232
__MODULE__,
226233
aggregate_module,
227234
aggregate_uuid,
228-
timeout
235+
timeout_or_opts
229236
)
230237
end
231238

lib/commanded.ex

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,26 @@ defmodule Commanded do
3838
Retrieving aggregate state is done by calling to the opened aggregate,
3939
or querying the event store for an optional state snapshot
4040
and then replaying the aggregate's event stream.
41+
42+
## Options
43+
44+
- `:timeout` - timeout in milliseconds (default: 5000)
45+
- `:initial_state` - module that implements `initial_state/0` callback.
46+
Used when rebuilding state from events. Defaults to `aggregate_module`.
47+
4148
"""
4249
@spec aggregate_state(
4350
application :: Commanded.Application.t(),
4451
aggregate_module :: module(),
4552
aggregate_uuid :: Aggregate.uuid(),
46-
timeout :: integer
53+
timeout_or_opts :: integer | Keyword.t()
4754
) :: Aggregate.state()
48-
def aggregate_state(application, aggregate_module, aggregate_uuid, timeout \\ 5_000) do
55+
def aggregate_state(application, aggregate_module, aggregate_uuid, timeout_or_opts \\ 5_000) do
4956
Aggregate.aggregate_state(
5057
application,
5158
aggregate_module,
5259
aggregate_uuid,
53-
timeout
60+
timeout_or_opts
5461
)
5562
end
5663
end

lib/commanded/aggregates/aggregate.ex

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ defmodule Commanded.Aggregates.Aggregate do
147147
defstruct [
148148
:application,
149149
:aggregate_module,
150+
:initial_state,
150151
:aggregate_uuid,
151152
:aggregate_state,
152153
:snapshotting,
@@ -160,6 +161,7 @@ defmodule Commanded.Aggregates.Aggregate do
160161

161162
aggregate_module = Keyword.fetch!(aggregate_opts, :aggregate_module)
162163
aggregate_uuid = Keyword.fetch!(aggregate_opts, :aggregate_uuid)
164+
initial_state = Keyword.get(aggregate_opts, :initial_state)
163165

164166
unless is_atom(aggregate_module),
165167
do: raise(ArgumentError, message: "aggregate module must be an atom")
@@ -174,6 +176,7 @@ defmodule Commanded.Aggregates.Aggregate do
174176
state = %Aggregate{
175177
application: application,
176178
aggregate_module: aggregate_module,
179+
initial_state: initial_state,
177180
aggregate_uuid: aggregate_uuid,
178181
snapshotting: Snapshotting.new(application, aggregate_uuid, snapshot_options)
179182
}
@@ -230,7 +233,16 @@ defmodule Commanded.Aggregates.Aggregate do
230233
end
231234

232235
@doc false
233-
def aggregate_state(application, aggregate_module, aggregate_uuid, timeout \\ 5_000) do
236+
def aggregate_state(application, aggregate_module, aggregate_uuid, timeout_or_opts \\ 5_000)
237+
238+
def aggregate_state(application, aggregate_module, aggregate_uuid, timeout)
239+
when is_integer(timeout) or timeout == :infinity do
240+
aggregate_state(application, aggregate_module, aggregate_uuid, timeout: timeout)
241+
end
242+
243+
def aggregate_state(application, aggregate_module, aggregate_uuid, opts) when is_list(opts) do
244+
timeout = Keyword.get(opts, :timeout, 5_000)
245+
initial_state = Keyword.get(opts, :initial_state)
234246
name = via_name(application, aggregate_module, aggregate_uuid)
235247

236248
try do
@@ -249,6 +261,7 @@ defmodule Commanded.Aggregates.Aggregate do
249261
%Aggregate{
250262
application: application,
251263
aggregate_module: aggregate_module,
264+
initial_state: initial_state,
252265
aggregate_uuid: aggregate_uuid,
253266
snapshotting: Snapshotting.new(application, aggregate_uuid, snapshot_options)
254267
}
@@ -260,6 +273,9 @@ defmodule Commanded.Aggregates.Aggregate do
260273
{:ok, result} ->
261274
result
262275

276+
{:exit, reason} ->
277+
exit(reason)
278+
263279
nil ->
264280
exit({:timeout, {GenServer, :call, [name, :aggregate_state, timeout]}})
265281
end

lib/commanded/aggregates/aggregate_state_builder.ex

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,13 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do
4040
If the snapshot exists, fetch any subsequent events to rebuild its state.
4141
Otherwise start with the aggregate struct and stream all existing events for
4242
the aggregate from the event store to rebuild its state from those events.
43+
44+
The initial state is determined by the `initial_state` field:
45+
- If a module is provided, `initial_state.initial_state()` is called
46+
- Otherwise, falls back to `struct(aggregate_module)`
4347
"""
4448
def populate(%Aggregate{} = state) do
45-
%Aggregate{aggregate_module: aggregate_module, snapshotting: snapshotting} = state
49+
%Aggregate{snapshotting: snapshotting} = state
4650

4751
aggregate =
4852
case Snapshotting.read_snapshot(snapshotting) do
@@ -55,12 +59,21 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do
5559

5660
{:error, _error} ->
5761
# No snapshot present, or exists but for outdated state, so use initial empty state
58-
%Aggregate{state | aggregate_version: 0, aggregate_state: struct(aggregate_module)}
62+
%Aggregate{state | aggregate_version: 0, aggregate_state: create_initial_state(state)}
5963
end
6064

6165
rebuild_from_events(aggregate)
6266
end
6367

68+
defp create_initial_state(%Aggregate{initial_state: nil, aggregate_module: aggregate_module}) do
69+
struct(aggregate_module)
70+
end
71+
72+
defp create_initial_state(%Aggregate{initial_state: initial_state})
73+
when is_atom(initial_state) do
74+
initial_state.initial_state()
75+
end
76+
6477
@doc """
6578
Load events from the event store, in batches, to rebuild the aggregate state
6679
"""

lib/commanded/aggregates/supervisor.ex

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,24 @@ defmodule Commanded.Aggregates.Supervisor do
2323
2424
Returns `{:ok, aggregate_uuid}` when a process is successfully started, or is
2525
already running.
26+
27+
## Options
28+
29+
- `:initial_state` - optional module that implements `initial_state/0` callback.
30+
If not provided, defaults to `aggregate_module`.
31+
2632
"""
27-
def open_aggregate(application, aggregate_module, aggregate_uuid)
33+
@type open_aggregate_opt :: {:initial_state, module()}
34+
35+
@spec open_aggregate(
36+
application :: module(),
37+
aggregate_module :: module(),
38+
aggregate_uuid :: String.t(),
39+
opts :: [open_aggregate_opt()]
40+
) :: {:ok, String.t()} | {:error, term()}
41+
def open_aggregate(application, aggregate_module, aggregate_uuid, opts \\ [])
42+
43+
def open_aggregate(application, aggregate_module, aggregate_uuid, opts)
2844
when is_atom(application) and is_atom(aggregate_module) and is_binary(aggregate_uuid) do
2945
Logger.debug(fn ->
3046
"Locating aggregate process for `#{inspect(aggregate_module)}` with UUID " <>
@@ -34,10 +50,13 @@ defmodule Commanded.Aggregates.Supervisor do
3450
supervisor_name = Module.concat([application, __MODULE__])
3551
aggregate_name = Aggregate.name(application, aggregate_module, aggregate_uuid)
3652

53+
initial_state = Keyword.get(opts, :initial_state)
54+
3755
args = [
3856
application: application,
3957
aggregate_module: aggregate_module,
40-
aggregate_uuid: aggregate_uuid
58+
aggregate_uuid: aggregate_uuid,
59+
initial_state: initial_state
4160
]
4261

4362
case Registration.start_child(application, aggregate_name, supervisor_name, {Aggregate, args}) do
@@ -55,7 +74,7 @@ defmodule Commanded.Aggregates.Supervisor do
5574
end
5675
end
5776

58-
def open_aggregate(_application, _aggregate_module, aggregate_uuid),
77+
def open_aggregate(_application, _aggregate_module, aggregate_uuid, _opts),
5978
do: {:error, {:unsupported_aggregate_identity_type, aggregate_uuid}}
6079

6180
def init(args) do

lib/commanded/commands/dispatcher.ex

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ defmodule Commanded.Commands.Dispatcher do
2424
:handler_function,
2525
:handler_before_execute,
2626
:aggregate_module,
27+
:initial_state,
2728
:identity,
2829
:identity_prefix,
2930
:timeout,
@@ -83,13 +84,16 @@ defmodule Commanded.Commands.Dispatcher do
8384
@dialyzer {:nowarn_function, execute: 3}
8485
defp execute(%Pipeline{} = pipeline, %Payload{} = payload, %ExecutionContext{} = context) do
8586
%Pipeline{application: application, assigns: %{aggregate_uuid: aggregate_uuid}} = pipeline
86-
%Payload{aggregate_module: aggregate_module, timeout: timeout} = payload
87+
88+
%Payload{aggregate_module: aggregate_module, initial_state: initial_state, timeout: timeout} =
89+
payload
8790

8891
{:ok, ^aggregate_uuid} =
8992
Commanded.Aggregates.Supervisor.open_aggregate(
9093
application,
9194
aggregate_module,
92-
aggregate_uuid
95+
aggregate_uuid,
96+
initial_state: initial_state
9397
)
9498

9599
task_dispatcher_name = Module.concat([application, Commanded.Commands.TaskDispatcher])

0 commit comments

Comments
 (0)