Skip to content

Commit fbc4761

Browse files
committed
domain event handling across modules
1 parent 407b476 commit fbc4761

File tree

7 files changed

+426
-16
lines changed

7 files changed

+426
-16
lines changed

src/seedwork/application/command_handlers.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,14 @@ def failure(cls, message="Failure", exception=None) -> "CommandResult":
2828
return result
2929

3030
@classmethod
31-
def success(cls, entity_id=None, payload=None, events=[]) -> "CommandResult":
31+
def success(
32+
cls, entity_id=None, payload=None, event=None, events=None
33+
) -> "CommandResult":
3234
"""Creates a successful result"""
35+
if events is None:
36+
events = []
37+
if event:
38+
events.append(event)
3339
return cls(entity_id=entity_id, payload=payload, events=events)
3440

3541

src/seedwork/application/event_dispatcher.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def __init__(self):
2323
def add_event_handler(self, event_class: type[Event], event_handler: callable):
2424
self._handlers[event_class].add(event_handler)
2525

26-
def dispatch(self, event: type[Event]):
26+
def dispatch(self, event: type[Event], sender: any = None):
2727
event_class = type(event)
2828
for event_handler in self._handlers[event_class]:
29-
event_handler(event)
29+
event_handler(event, sender)

src/seedwork/application/modules.py

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ def unit_of_work(self, **kwargs):
9898
engine = get_arg("engine", kwargs, self.init_kwargs)
9999
correlation_id = uuid.uuid4()
100100
with Session(engine) as db_session:
101-
uow = self.create_unit_of_work(correlation_id, db_session)
101+
uow = self.create_unit_of_work(correlation_id, db_session, kwargs)
102102
self.configure_unit_of_work(uow)
103103
request_context.correlation_id.set(correlation_id)
104104
self._uow.set(uow)
@@ -108,29 +108,27 @@ def unit_of_work(self, **kwargs):
108108
self._uow.set(None)
109109
request_context.correlation_id.set(None)
110110

111-
def create_unit_of_work(self, correlation_id, db_session):
111+
def create_unit_of_work(self, correlation_id, db_session, kwargs):
112112
"""Unit of Work factory, creates new unit of work"""
113113
uow = self.unit_of_work_class(
114114
module=self,
115115
correlation_id=correlation_id,
116116
db_session=db_session,
117117
**self.get_unit_of_work_init_kwargs(),
118+
**kwargs,
118119
)
119120
return uow
120121

121122
def get_unit_of_work_init_kwargs(self):
122123
"""Returns additional kwargs used for initialization of new Unit of Work"""
123-
return dict()
124+
return self.init_kwargs
124125

125126
def configure_unit_of_work(self, uow):
126127
"""Allows to alter Unit of Work (i.e. add extra attributes) after it is instantiated"""
127128

128129
def end_unit_of_work(self, uow):
129130
uow.db_session.commit()
130131

131-
def configure(self, **kwargs):
132-
self.init_kwargs = kwargs
133-
134132
def execute_command(self, command):
135133
"""Module entrypoint. Use it to change the state of the module by passing a command object"""
136134
command_class = type(command)
@@ -160,26 +158,43 @@ def execute_query(self, query):
160158
def uow(self) -> UnitOfWork:
161159
"""Get current unit of work. Use self.unit_of_work() to create a new instance of UoW"""
162160
uow = self._uow.get()
163-
assert uow, "Unit of work not set, use context manager"
161+
assert uow, f"Unit of work not set in {self}, use context manager"
164162
return uow
165163

166164
def resolve_handler_kwargs(self, kwarg_params) -> dict:
167165
"""Match kwargs required by a function to attributes available in a unit of work"""
168166
kwargs = {}
169167
for param_name, param_type in kwarg_params.items():
170-
for attr in self.uow.__dict__.values():
171-
if isinstance(attr, param_type):
172-
kwargs[param_name] = attr
168+
for attr_name, attr_value in self.uow.__dict__.items():
169+
if attr_name == param_name or isinstance(attr_value, param_type):
170+
kwargs[param_name] = attr_value
173171
return kwargs
174172

175173
def publish_domain_events(self, events):
176-
...
174+
for event in events:
175+
self._domain_event_dispatcher.dispatch(event=event, sender=self)
177176

178-
def handle_domain_event(self, event: type[DomainEvent]):
177+
def handle_domain_event(
178+
self, event: type[DomainEvent], sender: type["BusinessModule"]
179+
):
179180
"""Execute all registered handlers within this module for this event type"""
181+
event_was_sent_by_other_module = self is not sender
182+
183+
if event_was_sent_by_other_module:
184+
# The sender executed the command that resulted in an event being published.
185+
# as a rule of thumb we want to handle the domain event within same transaction,
186+
# thus within same Unit of Work.
187+
# Therefore, the receiver must use UoW of sender
188+
original_uow = self._uow.get()
189+
self._uow.set(sender.uow)
190+
180191
for handler in self.event_handlers:
181192
event_class, handler_parameters = self.registry.inspect_handler_parameters(
182193
handler
183194
)
184195
if event_class is type(event):
185196
handler(event, self)
197+
198+
if event_was_sent_by_other_module:
199+
# restore the original UoW
200+
self._uow.set(original_uow)

src/seedwork/application/registry.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,11 @@ def domain_event_handler(self, fn: Callable):
131131

132132
@functools.wraps(fn)
133133
def decorator(*args, **kwargs):
134-
raise not NotImplementedError()
134+
event = find_object_of_class(args, DomainEvent) or find_object_of_class(
135+
kwargs.items(), DomainEvent
136+
)
137+
print("handling event", f"{type(event).__module__}.{type(event).__name__}")
138+
return fn(*args, **kwargs)
135139

136140
domain_event_class, handler_parameters = self.inspect_handler_parameters(fn)
137141
assert issubclass(
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
from dataclasses import dataclass
2+
3+
import pytest
4+
5+
from seedwork.application.command_handlers import CommandResult
6+
from seedwork.application.commands import Command
7+
from seedwork.application.event_dispatcher import InMemoryEventDispatcher
8+
from seedwork.application.modules import BusinessModule, UnitOfWork
9+
from seedwork.application.registry import Registry
10+
from seedwork.domain.events import DomainEvent
11+
12+
registry = Registry()
13+
14+
15+
@dataclass
16+
class CompleteOrderCommand(Command):
17+
order_id: str
18+
19+
20+
@dataclass
21+
class ProcessPaymentCommand(Command):
22+
order_id: str
23+
24+
25+
@dataclass
26+
class ShipOrderCommand(Command):
27+
order_id: str
28+
29+
30+
class OrderCompletedEvent(DomainEvent):
31+
order_id: str
32+
33+
34+
class PaymentProcessedEvent(DomainEvent):
35+
order_id: str
36+
37+
38+
class OrderShippedEvent(DomainEvent):
39+
order_id: str
40+
41+
42+
@registry.command_handler
43+
def complete_order(command: CompleteOrderCommand, history):
44+
history.append(f"completing {command.order_id}")
45+
return CommandResult.success(
46+
payload=None, event=OrderCompletedEvent(order_id=command.order_id)
47+
)
48+
49+
50+
@registry.command_handler
51+
def process_payment(command: ProcessPaymentCommand, history):
52+
history.append(f"processing payment for {command.order_id}")
53+
return CommandResult.success(
54+
payload=None, event=PaymentProcessedEvent(order_id=command.order_id)
55+
)
56+
57+
58+
@registry.command_handler
59+
def ship_order(command: ShipOrderCommand, history):
60+
history.append(f"shipping {command.order_id}")
61+
return CommandResult.success(
62+
payload=None, event=OrderShippedEvent(order_id=command.order_id)
63+
)
64+
65+
66+
@registry.domain_event_handler
67+
def when_order_is_completed_process_payment_policy(
68+
event: OrderCompletedEvent, module: type[BusinessModule]
69+
):
70+
module.uow.history.append(
71+
f"starting when_order_is_completed_process_payment_policy for {event.order_id}"
72+
)
73+
module.execute_command(ProcessPaymentCommand(order_id=event.order_id))
74+
75+
76+
@registry.domain_event_handler
77+
def when_order_is_completed_ship_order_policy(
78+
event: OrderCompletedEvent, module: type[BusinessModule]
79+
):
80+
module.uow.history.append(
81+
f"starting when_order_is_completed_ship_order_policy for {event.order_id}"
82+
)
83+
module.execute_command(ShipOrderCommand(order_id=event.order_id))
84+
85+
86+
@registry.domain_event_handler
87+
def when_payment_is_processed_open_champagne_policy(
88+
event: PaymentProcessedEvent, module: type[BusinessModule]
89+
):
90+
module.uow.history.append(
91+
f"starting when_payment_is_processed_open_champagne_policy for {event.order_id}"
92+
)
93+
94+
95+
@registry.domain_event_handler
96+
def when_order_is_shipped_sit_and_relax_policy(
97+
event: OrderShippedEvent, module: type[BusinessModule]
98+
):
99+
module.uow.history.append(
100+
f"starting when_order_is_shipped_sit_and_relax_policy for {event.order_id}"
101+
)
102+
103+
104+
@dataclass
105+
class MonoUnitOfWork(UnitOfWork):
106+
history: list
107+
108+
109+
class MonoModule(BusinessModule):
110+
registry = registry
111+
unit_of_work_class = MonoUnitOfWork
112+
supported_commands = (CompleteOrderCommand, ProcessPaymentCommand, ShipOrderCommand)
113+
supported_queries = ()
114+
event_handlers = (
115+
when_order_is_completed_process_payment_policy,
116+
when_order_is_completed_ship_order_policy,
117+
when_payment_is_processed_open_champagne_policy,
118+
when_order_is_shipped_sit_and_relax_policy,
119+
)
120+
121+
122+
@pytest.mark.integration
123+
def test_mono_module_command_branching_flow():
124+
"""This tests the branching code flow:
125+
CompleteOrderCommand
126+
127+
OrderCompletedEvent
128+
↓ ↓
129+
when_order_is_completed_process_payment_policy when_order_is_completed_ship_order_policy
130+
↓ ↓
131+
ProcessPaymentCommand ShipOrderCommand
132+
↓ ↓
133+
PaymentProcessedEvent OrderShippedEvent
134+
↓ ↓
135+
when_payment_is_processed_ship_order_policy when_order_is_shipped_sit_and_relax_policy
136+
"""
137+
history = []
138+
dispatcher = InMemoryEventDispatcher()
139+
mono_module = MonoModule(domain_event_dispatcher=dispatcher, history=history)
140+
141+
with mono_module.unit_of_work():
142+
mono_module.execute_command(CompleteOrderCommand(order_id="order1"))
143+
144+
assert history == [
145+
"completing order1",
146+
"starting when_order_is_completed_process_payment_policy for order1",
147+
"processing payment for order1",
148+
"starting when_payment_is_processed_open_champagne_policy for order1",
149+
"starting when_order_is_completed_ship_order_policy for order1",
150+
"shipping order1",
151+
"starting when_order_is_shipped_sit_and_relax_policy for order1",
152+
]

0 commit comments

Comments
 (0)