Skip to content

Commit 45eb039

Browse files
committed
Added support in the projections module for DCB applications.
Also some adjustments made whilst tracking down weird segmentation violation from projections with Python 3.13 and MsgStruct instances with zero attribute. Also increased version to 9.5.0a3.
1 parent a094aa1 commit 45eb039

File tree

19 files changed

+538
-145
lines changed

19 files changed

+538
-145
lines changed

docs/topics/projection.rst

Lines changed: 113 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ Application subscriptions
2727

2828
This module provides an :class:`~eventsourcing.projection.ApplicationSubscription` class, which can
2929
be used to "subscribe" to the domain events of an application.
30+
Please note, the :class:`~eventsourcing.popo.POPOApplicationRecorder` and
31+
:class:`~eventsourcing.postgres.PostgresApplicationRecorder` classes implement the
32+
required :func:`~eventsourcing.persistence.ApplicationRecorder.subscribe`
33+
method, but the :class:`~eventsourcing.sqlite.SQLiteApplicationRecorder` class does not.
3034

3135
Application subscription objects are iterators that return domain events from an application sequence.
3236
Each domain event is accompanied by a tracking object that identifies the position of the
@@ -99,10 +103,48 @@ method which can be used to stop the subscription to the application recorder in
99103
subscription.stop() # ...so we can continue with the examples
100104
101105
102-
Please note, the :class:`~eventsourcing.popo.POPOApplicationRecorder` and
103-
:class:`~eventsourcing.postgres.PostgresApplicationRecorder` classes implement the
104-
required :func:`~eventsourcing.persistence.ApplicationRecorder.subscribe`
105-
method, but the :class:`~eventsourcing.sqlite.SQLiteApplicationRecorder` class does not.
106+
The :class:`~eventsourcing.projection.DCBApplicationSubscription` class is the equivalent for
107+
:ref:`DCB applications <DCB application>`. It returns :ref:`tagged decisions <DCB Tagged>`.
108+
Please note, the :ref:`DCB recorders <DCB recorders>` :class:`~eventsourcing.dcb.popo.InMemoryDCBRecorder`,
109+
:class:`~eventsourcing.dcb.postgres_tt.PostgresDCBRecorderTT` and the ``eventsourcing_umadb`` extension
110+
all implement the required :func:`~eventsourcing.dcb.api.DCBRecorder.subscribe` method.
111+
112+
.. code-block:: python
113+
114+
from uuid import UUID
115+
116+
from eventsourcing.dcb.application import DCBApplication
117+
from eventsourcing.dcb.domain import Perspective, Tagged
118+
from eventsourcing.dcb.msgpack import Decision, InitialDecision, MessagePackMapper
119+
from eventsourcing.projection import DCBApplicationSubscription
120+
from eventsourcing.utils import get_topic
121+
122+
123+
# Define a perspective.
124+
class MyPerspective(Perspective[Decision]):
125+
@property
126+
def cb(self) -> Selector | Sequence[Selector]:
127+
return []
128+
129+
130+
# Construct an application object.
131+
app = DCBApplication(env={"MAPPER_TOPIC": get_topic(MessagePackMapper)})
132+
133+
# Record an event.
134+
perspective = MyPerspective()
135+
perspective.append_new_decision(
136+
Tagged([], InitialDecision(originator_topic=""))
137+
)
138+
app.repository.save(perspective)
139+
140+
# Position in application sequence from which to subscribe.
141+
max_tracking_id = 0
142+
143+
with DCBApplicationSubscription(app, gt=max_tracking_id, topics=()) as subscription:
144+
for tagged_event, tracking in subscription:
145+
# Process the event and record new state with tracking information.
146+
subscription.stop() # ...so we can continue with the examples
147+
106148
107149
.. _Projection:
108150

@@ -150,7 +192,7 @@ The example below shows how a projection can be defined.
150192
from eventsourcing.projection import Projection
151193
from eventsourcing.utils import get_topic
152194
153-
class MyProjection(Projection["MyMaterialisedViewInterface"]):
195+
class AggregateEventProjection(Projection["MyMaterialisedViewInterface"]):
154196
name = "myprojection"
155197
topics = (get_topic(Aggregate.Event), )
156198
@@ -163,6 +205,35 @@ The example below shows how a projection can be defined.
163205
self.view.my_command(tracking)
164206
165207
208+
For projections that work with :ref:`DCB applications <DCB application>`, you will need to define the dispatching
209+
to work with :ref:`tagged decisions <DCB tagged>`. That is, because the ``process_event()`` method will receive
210+
:class:`~eventsourcing.dcb.domain.Tagged` objects, and because `singledispatchmethod` dispatches on the type of
211+
the first argument, you will need to forward ``tagged.decision`` and define handlers for different
212+
types of :class:`~eventsourcing.dcb.domain.Decision`.
213+
214+
.. code-block:: python
215+
216+
from eventsourcing.dcb.domain import Tagged
217+
from eventsourcing.dcb.msgpack import Decision, InitialDecision
218+
219+
220+
class TaggedDecisionProjection(Projection["MyMaterialisedViewInterface"]):
221+
name = "myprojection"
222+
topics = (get_topic(Decision), )
223+
224+
@singledispatchmethod
225+
def process_event(self, tagged: Tagged[Decision], tracking: Tracking) -> None:
226+
self.process_decision(tagged.decision, tracking)
227+
228+
@singledispatchmethod
229+
def process_decision(self, _: Decision, tracking: Tracking) -> None:
230+
pass
231+
232+
@process_decision.register
233+
def process_initial_decision(self, _: InitialDecision, tracking: Tracking) -> None:
234+
self.view.my_command(tracking)
235+
236+
166237
The example below indicates how the projection's materialised view can be defined.
167238

168239
.. code-block:: python
@@ -194,6 +265,7 @@ The example below indicates how the projection's materialised view can be define
194265
def my_command(self, tracking: Tracking) -> None:
195266
...
196267
268+
197269
.. _Projection runner:
198270

199271
Projection runner
@@ -249,7 +321,7 @@ signal after 1s.
249321
with ProjectionRunner(
250322
application_class=Application,
251323
view_class=MyPOPOMaterialisedView,
252-
projection_class=MyProjection,
324+
projection_class=AggregateEventProjection,
253325
env={},
254326
) as projection_runner:
255327
@@ -271,6 +343,37 @@ returned from calls to the event-sourced application's :func:`~eventsourcing.app
271343
method can be used by the user interface to :func:`~eventsourcing.persistence.TrackingRecorder.wait` until
272344
the "read model" has been updated.
273345

346+
The projection runner supports :ref:`DCB application classes <DCB application>`.
347+
348+
.. code-block:: python
349+
350+
import os, signal, threading, time
351+
352+
from eventsourcing.projection import ProjectionRunner
353+
354+
# For demonstration purposes, interrupt process with SIGINT after 1s.
355+
def sleep_then_kill() -> None:
356+
time.sleep(1)
357+
os.kill(os.getpid(), signal.SIGINT)
358+
359+
threading.Thread(target=sleep_then_kill).start()
360+
361+
# Run projection as a context manager.
362+
with ProjectionRunner(
363+
application_class=DCBApplication,
364+
view_class=MyPOPOMaterialisedView,
365+
projection_class=TaggedDecisionProjection,
366+
env={"MAPPER_TOPIC": get_topic(MessagePackMapper)},
367+
) as projection_runner:
368+
369+
# Register signal handler.
370+
signal.signal(signal.SIGINT, lambda *args: projection_runner.stop())
371+
372+
# Run until interrupted.
373+
projection_runner.run_forever()
374+
375+
376+
274377
See :doc:`Tutorial - Part 4 </topics/tutorial/part4>` for more guidance and examples.
275378

276379

@@ -302,6 +405,8 @@ to increment a ``Counter`` aggregate.
302405
.. literalinclude:: ../../tests/projection_tests/test_event_sourced_projection.py
303406
:pyobject: Counter
304407

408+
This library does does not yet support projections event-soured with :ref:`DCB applications <DCB application>`.
409+
305410

306411
.. _Event-sourced projection runner:
307412

@@ -363,6 +468,8 @@ currently support application subscriptions.
363468
assert runner.projection.get_count(Aggregate.Event) == 1
364469
365470
471+
The event-sourced projection runner does not yet support projections event-soured with :ref:`DCB applications <DCB application>`.
472+
366473
Code reference
367474
==============
368475

docs/topics/tutorial/part4.rst

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -205,13 +205,13 @@ is not capable of reconstructing.
205205
Example projection
206206
==================
207207

208-
For example, the ``EventCountersProjection`` class, shown below, processes events of an event-sourced application by
208+
For example, the ``AggregateEventCountersProjection`` class, shown below, processes events of an event-sourced application by
209209
calling methods of a concrete instance of ``EventCountersInterface``. It inherits the :class:`~eventsourcing.projection.Projection`
210210
class, setting the type argument as ``EventCountersInterface`` class. It sets the :py:attr:`~eventsourcing.projection.Projection.name`
211211
attribute as ``'eventcounters'``. It sets the :py:attr:`~eventsourcing.projection.Projection.topics` attribute
212212
to mention the topics of the domain events processed by this projection.
213213

214-
The ``EventCountersProjection`` class implements the :func:`~eventsourcing.projection.Projection.process_event`
214+
The ``AggregateEventCountersProjection`` class implements the :func:`~eventsourcing.projection.Projection.process_event`
215215
by calling the ``incr_created_event_counter()`` and ``incr_subsequent_event_counter()`` methods of ``EventCountersInterface``
216216
available on :data:`~eventsourcing.projection.Projection.view`.
217217

@@ -224,7 +224,7 @@ progress along an application sequence can be recorded, for example if large gap
224224
subscription returning checkpoints.
225225

226226
.. literalinclude:: ../../../tests/projection_tests/test_projection.py
227-
:pyobject: EventCountersProjection
227+
:pyobject: AggregateEventCountersProjection
228228

229229

230230
Runners
@@ -272,11 +272,11 @@ to wait until an event has been processed by the projection before calling a que
272272
Counting events in memory
273273
=========================
274274

275-
For example, the ``TestEventCountersProjection`` class, shown below, tests the ``EventCountersProjection``
275+
For example, the ``TestAggregateEventCountersProjection`` class, shown below, tests the ``AggregateEventCountersProjection``
276276
projection class with the ``POPOEventCounters`` class.
277277

278278
.. literalinclude:: ../../../tests/projection_tests/test_projection.py
279-
:pyobject: TestEventCountersProjection
279+
:pyobject: TestAggregateEventCountersProjection
280280

281281
The method ``test_event_counters_projection()`` constructs a runner, and from the runner gets references
282282
to an event-sourced application "write model" and a materialised view "read model".
@@ -286,7 +286,7 @@ instance of the application object and of the materialised view object as the pr
286286
for generating events and for getting the counted numbers of events.
287287

288288
Events are generated in the event-sourced application "write model". The "created" and subsequent events are
289-
processed, by updating the materialised view "read model", according to the logic of the ``EventCountersProjection``
289+
processed, by updating the materialised view "read model", according to the logic of the ``AggregateEventCountersProjection``
290290
projection. The counted numbers of each kind of event are obtained from the "read model". The materialised view is
291291
"eventually consistent" because the event processing is asynchronous, and so the
292292
:func:`~eventsourcing.persistence.TrackingRecorder.wait` method is used to wait for the events to be
@@ -302,8 +302,8 @@ method will time out.
302302
Counting events in PostgreSQL
303303
=============================
304304

305-
The ``TestEventCountersProjectionWithPostgres`` class extends ``TestEventCountersProjection`` and runs
306-
``EventCountersProjection`` with the ``PostgresEventCounters`` class.
305+
The ``TestAggregateEventCountersProjectionWithPostgres`` class extends ``TestAggregateEventCountersProjection`` and runs
306+
``AggregateEventCountersProjection`` with the ``PostgresEventCounters`` class.
307307
Because this test case uses a durable database for both the event-sourced application and the materialised view,
308308
any instance of the application can be used to write events, and any instance of the materialised view can be used to
309309
obtain the counted numbers of events. If a durable database is used in production, the event-sourced
@@ -318,7 +318,7 @@ caused by the ``SpannerThrown`` event. The event-sourced application and the mat
318318
databases. But in this example they are configured more simply to use different tables in the same database.
319319

320320
.. literalinclude:: ../../../tests/projection_tests/test_projection.py
321-
:pyobject: TestEventCountersProjectionWithPostgres
321+
:pyobject: TestAggregateEventCountersProjectionWithPostgres
322322

323323

324324
Exercises

eventsourcing/dcb/api.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from abc import ABC, abstractmethod
44
from collections.abc import Iterator
55
from dataclasses import dataclass, field
6-
from typing import TYPE_CHECKING, Any
6+
from typing import TYPE_CHECKING, Any, Generic, TypeVar
77

88
from eventsourcing.persistence import ProgrammingError
99

@@ -92,7 +92,7 @@ def subscribe(
9292
query: DCBQuery | None = None,
9393
*,
9494
after: int | None = None,
95-
) -> DCBSubscription:
95+
) -> DCBSubscription[Self]:
9696
"""
9797
Returns all events, unless 'after' is given then only those with position
9898
greater than 'after', and unless any query items are given, then only those
@@ -103,10 +103,13 @@ def subscribe(
103103
"""
104104

105105

106-
class DCBSubscription(Iterator[DCBSequencedEvent]):
106+
TDCBRecorder_co = TypeVar("TDCBRecorder_co", bound=DCBRecorder, covariant=True)
107+
108+
109+
class DCBSubscription(Iterator[DCBSequencedEvent], Generic[TDCBRecorder_co]):
107110
def __init__(
108111
self,
109-
recorder: DCBRecorder,
112+
recorder: TDCBRecorder_co,
110113
query: DCBQuery | None = None,
111114
after: int | None = None,
112115
) -> None:

eventsourcing/dcb/application.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import annotations
22

33
import os
4-
from typing import TYPE_CHECKING, Any, Generic
4+
from typing import TYPE_CHECKING, Any, Generic, cast
55

66
from eventsourcing.dcb.domain import (
77
EnduringObject,
@@ -15,6 +15,7 @@
1515
from eventsourcing.dcb.persistence import (
1616
DCBEventStore,
1717
DCBInfrastructureFactory,
18+
DCBMapper,
1819
NotFoundError,
1920
)
2021
from eventsourcing.utils import Environment, EnvType, resolve_topic
@@ -37,10 +38,15 @@ def __init_subclass__(cls, **kwargs: Any) -> None:
3738
def __init__(self, env: EnvType | None = None):
3839
self.env = self.construct_env(self.name, env)
3940
self.factory = DCBInfrastructureFactory.construct(self.env)
40-
self.recorder = self.factory.dcb_event_store()
41+
self.recorder = self.factory.dcb_recorder()
4142
if "MAPPER_TOPIC" in self.env:
42-
mapper = resolve_topic(self.env["MAPPER_TOPIC"])()
43-
self.events = DCBEventStore(mapper, self.recorder)
43+
# Only need a mapper, event store, and repository
44+
# if we are using the higher-level abstractions.
45+
self.mapper = cast(
46+
DCBMapper[Any], resolve_topic(self.env["MAPPER_TOPIC"])()
47+
)
48+
assert isinstance(self.mapper, DCBMapper)
49+
self.events = DCBEventStore(self.mapper, self.recorder)
4450
self.repository = DCBRepository(self.events)
4551

4652
def construct_env(self, name: str, env: EnvType | None = None) -> Environment:
@@ -51,6 +57,9 @@ def construct_env(self, name: str, env: EnvType | None = None) -> Environment:
5157
_env.update(env)
5258
return Environment(name, _env)
5359

60+
def close(self) -> None:
61+
self.factory.close()
62+
5463
def __enter__(self) -> Self:
5564
self.factory.__enter__()
5665
return self
@@ -61,6 +70,7 @@ def __exit__(
6170
exc_val: BaseException | None,
6271
exc_tb: TracebackType | None,
6372
) -> None:
73+
self.close()
6474
self.factory.__exit__(exc_type, exc_val, exc_tb)
6575

6676

eventsourcing/dcb/persistence.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
DCBRecorder,
1616
DCBSequencedEvent,
1717
DCBSubscription,
18+
TDCBRecorder_co,
1819
)
1920
from eventsourcing.dcb.domain import (
2021
Selector,
@@ -49,6 +50,8 @@ def append(
4950
cb: Selector | Sequence[Selector] | None = None,
5051
after: int | None = None,
5152
) -> int:
53+
if len(events) == 0:
54+
return 0
5255
if not cb and not after:
5356
condition = None
5457
else:
@@ -113,14 +116,14 @@ class NotFoundError(Exception):
113116

114117
class DCBInfrastructureFactory(BaseInfrastructureFactory[TTrackingRecorder], ABC):
115118
@abstractmethod
116-
def dcb_event_store(self) -> DCBRecorder:
119+
def dcb_recorder(self) -> DCBRecorder:
117120
pass # pragma: no cover
118121

119122

120-
class DCBListenNotifySubscription(DCBSubscription):
123+
class DCBListenNotifySubscription(DCBSubscription[TDCBRecorder_co]):
121124
def __init__(
122125
self,
123-
recorder: DCBRecorder,
126+
recorder: TDCBRecorder_co,
124127
query: DCBQuery | None = None,
125128
after: int | None = None,
126129
) -> None:

0 commit comments

Comments
 (0)