Skip to content

Commit 407b476

Browse files
committed
add event dispatcher and module event handling
1 parent 981e672 commit 407b476

File tree

13 files changed

+363
-143
lines changed

13 files changed

+363
-143
lines changed

src/api/tests/test_catalog.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,17 @@ def test_catalog_list_with_two_items(api, api_client):
7676
assert len(response_data) == 2
7777

7878

79+
# @pytest.mark.integration
80+
# def test_catalog_create_draft(api, api_client):
81+
# response = api_client.post("/catalog")
82+
# assert False
83+
84+
85+
def test_catalog_create_draft_fails_due_to_incomplete_data(api, api_client):
86+
response = api_client.post("/catalog")
87+
assert response.status_code == 422
88+
89+
7990
@pytest.mark.integration
8091
def test_catalog_delete_draft(api, api_client):
8192
catalog_module = api.container.catalog_module()

src/config/container.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from modules.catalog import CatalogModule
66
from modules.iam import IamModule
7+
from seedwork.application.event_dispatcher import InMemoryEventDispatcher
78

89

910
def _default(val):
@@ -56,13 +57,16 @@ class Container(containers.DeclarativeContainer):
5657

5758
config = providers.Configuration()
5859
engine = providers.Singleton(create_engine_once, config)
60+
domain_event_dispatcher = InMemoryEventDispatcher()
5961

6062
catalog_module = providers.Factory(
6163
CatalogModule,
6264
engine=engine,
65+
domain_event_dispatcher=domain_event_dispatcher,
6366
)
6467

6568
iam_module = providers.Factory(
6669
IamModule,
6770
engine=engine,
71+
domain_event_dispatcher=domain_event_dispatcher,
6872
)

src/modules/catalog/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ class CatalogModule(BusinessModule):
2424
PublishListingCommand,
2525
)
2626
supported_queries = (GetAllListings, GetListingDetails, GetListingsOfSeller)
27-
supported_events = ()
2827

2928
def configure_unit_of_work(self, uow):
3029
"""Here we have a chance to add extra UOW attributes to be injected into command/query handers"""

src/modules/catalog/application/__init__.py

Whitespace-only changes.

src/modules/catalog/application/event/__init__.py

Whitespace-only changes.
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from modules.catalog import CatalogModule
2+
from modules.catalog.domain.events import ListingPublishedEvent
3+
4+
5+
def do_nothing_when_listing_published(
6+
event: ListingPublishedEvent, module: CatalogModule
7+
):
8+
print("Listing has been published")
Lines changed: 5 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -1,119 +1,8 @@
1-
import functools
2-
from collections.abc import Callable
3-
from inspect import signature
1+
from seedwork.application.registry import registry
42

5-
from pydantic.error_wrappers import ValidationError
3+
# decorators
4+
command_handler = registry.command_handler
65

7-
from seedwork.application.commands import Command
8-
from seedwork.application.queries import Query
9-
from seedwork.domain.exceptions import BusinessRuleValidationException
10-
from seedwork.infrastructure.logging import logger
6+
query_handler = registry.query_handler
117

12-
from .command_handlers import CommandResult
13-
from .query_handlers import QueryResult
14-
15-
16-
class Registry:
17-
def __init__(self):
18-
self.command_handlers = {}
19-
self.query_handlers = {}
20-
21-
def register_command_handler(
22-
self, command_class: type[Command], handler: Callable, handler_parameters
23-
):
24-
logger.info(f"registering command handler for {command_class} as {handler}")
25-
self.command_handlers[command_class] = (handler, handler_parameters)
26-
27-
def get_command_handler_for(self, command_class) -> Callable:
28-
assert (
29-
command_class in self.command_handlers
30-
), f"handler for {command_class} not registered"
31-
return self.command_handlers[command_class][0]
32-
33-
def get_command_handler_parameters_for(self, command_class) -> dict:
34-
return self.command_handlers[command_class][1].copy()
35-
36-
def register_query_handler(
37-
self, query_class: type[Query], handler: Callable, handler_parameters
38-
):
39-
logger.info(f"registering query handler for {query_class} as {handler}")
40-
self.query_handlers[query_class] = (handler, handler_parameters)
41-
42-
def get_query_handler_for(self, query_class) -> dict:
43-
assert (
44-
query_class in self.query_handlers
45-
), f"handler for {query_class} not registered"
46-
return self.query_handlers[query_class][0]
47-
48-
def get_query_handler_parameters_for(self, query_class) -> Callable:
49-
return self.query_handlers[query_class][1].copy()
50-
51-
def clear(self):
52-
self.command_handlers.clear()
53-
54-
55-
def find_object_of_class(iterable, cls):
56-
for item in iterable:
57-
if isinstance(item, cls):
58-
return item
59-
return None
60-
61-
62-
registry = Registry()
63-
64-
65-
def command_handler(fn):
66-
@functools.wraps(fn)
67-
def decorator(*args, **kwargs):
68-
try:
69-
command = find_object_of_class(args, Command) or find_object_of_class(
70-
kwargs.items(), Command
71-
)
72-
print(
73-
"handling command",
74-
f"{type(command).__module__}.{type(command).__name__}",
75-
)
76-
return fn(*args, **kwargs)
77-
except ValidationError as e:
78-
return CommandResult.failure("Validation error", exception=e)
79-
except BusinessRuleValidationException as e:
80-
return CommandResult.failure("Business rule validation error", exception=e)
81-
82-
handler_signature = signature(fn)
83-
kwargs_iterator = iter(handler_signature.parameters.items())
84-
_, first_param = next(kwargs_iterator)
85-
command_class = first_param.annotation
86-
assert issubclass(
87-
command_class, Command
88-
), "The first parameter must be of type Command"
89-
handler_parameters = {}
90-
for name, param in kwargs_iterator:
91-
handler_parameters[name] = param.annotation
92-
registry.register_command_handler(command_class, decorator, handler_parameters)
93-
return decorator
94-
95-
96-
def query_handler(fn):
97-
@functools.wraps(fn)
98-
def decorator(*args, **kwargs):
99-
try:
100-
query = find_object_of_class(args, Query) or find_object_of_class(
101-
kwargs.items(), Query
102-
)
103-
print("handling query", f"{type(query).__module__}.{type(query).__name__}")
104-
return fn(*args, **kwargs)
105-
except ValidationError as e:
106-
return QueryResult.failed("Validation error", exception=e)
107-
except BusinessRuleValidationException as e:
108-
return QueryResult.failed("Business rule validation error", exception=e)
109-
110-
handler_signature = signature(fn)
111-
kwargs_iterator = iter(handler_signature.parameters.items())
112-
_, first_param = next(kwargs_iterator)
113-
query_class = first_param.annotation
114-
assert issubclass(query_class, Query), "The first parameter must be of type Command"
115-
handler_parameters = {}
116-
for name, param in kwargs_iterator:
117-
handler_parameters[name] = param.annotation
118-
registry.register_query_handler(query_class, decorator, handler_parameters)
119-
return decorator
8+
domain_event_handler = registry.domain_event_handler
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import abc
2+
from collections import defaultdict
3+
4+
from seedwork.domain.events import Event
5+
6+
7+
class EventDispatcher(metaclass=abc.ABCMeta):
8+
"""An interface for a generic event dispatcher"""
9+
10+
@abc.abstractmethod
11+
def add_event_handler(self, event_class: type[Event], event_handler: callable):
12+
raise NotImplementedError()
13+
14+
@abc.abstractmethod
15+
def dispatch(self):
16+
raise NotImplementedError()
17+
18+
19+
class InMemoryEventDispatcher(EventDispatcher):
20+
def __init__(self):
21+
self._handlers = defaultdict(set)
22+
23+
def add_event_handler(self, event_class: type[Event], event_handler: callable):
24+
self._handlers[event_class].add(event_handler)
25+
26+
def dispatch(self, event: type[Event]):
27+
event_class = type(event)
28+
for event_handler in self._handlers[event_class]:
29+
event_handler(event)

src/seedwork/application/modules.py

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from seedwork.application.event_dispatcher import EventDispatcher
12
from seedwork.infrastructure.logging import logger
23

34

@@ -26,7 +27,8 @@ def handle(module, query_or_command, *args, **kwargs):
2627

2728
from sqlalchemy.orm import Session
2829

29-
from seedwork.application.decorators import registry
30+
from seedwork.application.decorators import registry as default_registry
31+
from seedwork.domain.events import DomainEvent
3032
from seedwork.domain.repositories import GenericRepository
3133
from seedwork.infrastructure.request_context import request_context
3234

@@ -61,11 +63,34 @@ class BusinessModule:
6163
unit_of_work_class = UnitOfWork
6264
supported_commands = ()
6365
supported_queries = ()
64-
supported_events = ()
66+
event_handlers = ()
67+
registry = default_registry
6568

66-
def __init__(self, **kwargs):
69+
def __init__(self, domain_event_dispatcher: type[EventDispatcher], **kwargs):
6770
self._uow: ContextVar[UnitOfWork] = ContextVar("_uow", default=None)
6871
self.init_kwargs = kwargs
72+
self._domain_event_dispatcher = domain_event_dispatcher
73+
self.register_event_handlers()
74+
75+
def register_event_handlers(self):
76+
"""Registers all event handlers declared in this module"""
77+
if self._domain_event_dispatcher is None:
78+
return
79+
80+
for event_class in self.get_handleable_domain_events():
81+
self._domain_event_dispatcher.add_event_handler(
82+
event_class=event_class, event_handler=self.handle_domain_event
83+
)
84+
85+
def get_handleable_domain_events(self) -> list[type[DomainEvent]]:
86+
"""Returns a list of domain event classes that this module is capable of handling"""
87+
handled_event_types = set()
88+
for handler in self.event_handlers:
89+
event_class, handler_parameters = self.registry.inspect_handler_parameters(
90+
handler
91+
)
92+
handled_event_types.add(event_class)
93+
return handled_event_types
6994

7095
@contextmanager
7196
def unit_of_work(self, **kwargs):
@@ -84,7 +109,7 @@ def unit_of_work(self, **kwargs):
84109
request_context.correlation_id.set(None)
85110

86111
def create_unit_of_work(self, correlation_id, db_session):
87-
"""Unit of Work factory"""
112+
"""Unit of Work factory, creates new unit of work"""
88113
uow = self.unit_of_work_class(
89114
module=self,
90115
correlation_id=correlation_id,
@@ -94,11 +119,11 @@ def create_unit_of_work(self, correlation_id, db_session):
94119
return uow
95120

96121
def get_unit_of_work_init_kwargs(self):
97-
"""Provide additional kwargs for Unit of Work if you are using a custom one"""
122+
"""Returns additional kwargs used for initialization of new Unit of Work"""
98123
return dict()
99124

100125
def configure_unit_of_work(self, uow):
101-
"""Allows to alter Unit of Work (i.e. add extra attributes)"""
126+
"""Allows to alter Unit of Work (i.e. add extra attributes) after it is instantiated"""
102127

103128
def end_unit_of_work(self, uow):
104129
uow.db_session.commit()
@@ -107,35 +132,54 @@ def configure(self, **kwargs):
107132
self.init_kwargs = kwargs
108133

109134
def execute_command(self, command):
135+
"""Module entrypoint. Use it to change the state of the module by passing a command object"""
110136
command_class = type(command)
111137
assert (
112138
command_class in self.supported_commands
113139
), f"{command_class} is not included in {type(self).__name__}.supported_commands"
114-
handler = registry.get_command_handler_for(command_class)
115-
kwarg_params = registry.get_command_handler_parameters_for(command_class)
140+
handler = self.registry.get_command_handler_for(command_class)
141+
kwarg_params = self.registry.get_command_handler_parameters_for(command_class)
116142
kwargs = self.resolve_handler_kwargs(kwarg_params)
117-
return handler(command, **kwargs)
143+
command_result = handler(command, **kwargs)
144+
if command_result.is_success():
145+
self.publish_domain_events(command_result.events)
146+
return command_result
118147

119148
def execute_query(self, query):
149+
"""Module entrypoint. Use it to read the state of the module by passing a query object"""
120150
query_class = type(query)
121151
assert (
122152
query_class in self.supported_queries
123153
), f"{query_class} is not included in {type(self).__name__}.supported_queries"
124-
handler = registry.get_query_handler_for(query_class)
125-
kwarg_params = registry.get_query_handler_parameters_for(query_class)
154+
handler = self.registry.get_query_handler_for(query_class)
155+
kwarg_params = self.registry.get_query_handler_parameters_for(query_class)
126156
kwargs = self.resolve_handler_kwargs(kwarg_params)
127157
return handler(query, **kwargs)
128158

129159
@property
130160
def uow(self) -> UnitOfWork:
161+
"""Get current unit of work. Use self.unit_of_work() to create a new instance of UoW"""
131162
uow = self._uow.get()
132163
assert uow, "Unit of work not set, use context manager"
133164
return uow
134165

135166
def resolve_handler_kwargs(self, kwarg_params) -> dict:
167+
"""Match kwargs required by a function to attributes available in a unit of work"""
136168
kwargs = {}
137169
for param_name, param_type in kwarg_params.items():
138170
for attr in self.uow.__dict__.values():
139171
if isinstance(attr, param_type):
140172
kwargs[param_name] = attr
141173
return kwargs
174+
175+
def publish_domain_events(self, events):
176+
...
177+
178+
def handle_domain_event(self, event: type[DomainEvent]):
179+
"""Execute all registered handlers within this module for this event type"""
180+
for handler in self.event_handlers:
181+
event_class, handler_parameters = self.registry.inspect_handler_parameters(
182+
handler
183+
)
184+
if event_class is type(event):
185+
handler(event, self)

0 commit comments

Comments
 (0)