22:mod: `~eventsourcing.projection ` --- Projections
33================================================
44
5- This module shows how :doc: ` event-sourced applications
6- </topics/application>` can be projected into materialised
5+ This module may help you develop event-processing components that project
6+ the state of an :doc: ` event-sourced applications </topics/application >` into materialised
77views that support arbitrary queries.
88
9- The central idea here follows the notion from `CQRS <https://en.wikipedia.org/wiki/Command_Query_Responsibility_Segregation >`_
9+ The central idea of this module follows the notion from `CQRS <https://en.wikipedia.org/wiki/Command_Query_Responsibility_Segregation >`_
1010of having separate command and query interfaces. This idea is often implemented in event-sourced systems
11- with distinct and separate "write" and "read" models. The "write model" is an event-sourced application,
11+ by developing distinct and separate "write" and "read" models. The "write model" is an event-sourced application,
1212and the "read model" is one or many "materialised views" of the event-sourced application. The event-sourced
1313application is projected into a materialised view, by processing the application's events,
1414usually with an asynchronous event-processing component, so that the materialised view is
@@ -21,117 +21,162 @@ the tracking records to be unique, and by resuming to process the application fr
2121position indicated by the last tracking record, the materialised
2222view will be a "reliable" deterministic function of the state of the application.
2323
24+ .. _Subscriptions :
2425
25- Tracking recorders
26- ==================
26+ Application subscriptions
27+ =========================
2728
28- The library's :ref: `tracking recorder <Tracking recorder >` classes
29- (:class: `~eventsourcing.popo.POPOTrackingRecorder `, :class: `~eventsourcing.sqlite.SQLiteTrackingRecorder `,
30- and :class: `~eventsourcing.postgres.PostgresTrackingRecorder `) can be extended arbitrarily to define command
31- and query methods that update and present a materialised view of the state of an event-sourced application
32- atomically with tracking information. The tracking information indicates the position in an application
33- sequence of an event that was processed when the materialised view was updated.
29+ This module provides an :class: `~eventsourcing.projection.ApplicationSubscription ` class, which can
30+ be used to "subscribe" to the domain events of an application.
3431
35- For example, the :class: `CountRecorder ` class, included below, extends the library's abstract base class
36- :class: `~eventsourcing.persistence.TrackingRecorder ` by defining abstract methods
37- :func: `incr_created_events_counter `, :func: `incr_subsequent_events_counter `, :func: `get_created_events_counter `,
38- :func: `get_subsequent_events_counter `, and :func: `get_all_events_counter `. These methods
39- will be implemented by concrete tracking recorder classes to update a materialised view
40- that counts aggregate events from an event-sourced application.
32+ Application subscriptions are useful when running an event processing component
33+ that projects the state of an event-sourced application into a materialised view
34+ of the state of the application.
4135
42- .. literalinclude :: ../../tests/projection_tests/test_projection.py
43- :pyobject: CountRecorder
36+ Application subscription objects are iterators that yield all domain events recorded
37+ in an application sequence. Iterating over an application subscription will block when
38+ all recorded domain events have been yielded, and then continue when new events are recorded.
4439
45- The :class: `POPOCountRecorder ` class, included below, implements this interface using plain old Python objects.
40+ Application subscription objects can be constructed with an application object, and an integer
41+ position in its application sequence (a notification ID). The application subscription will yield
42+ domain events that have notification IDs greater than the given position.
43+
44+ Application subscription objects use the :func: `~eventsourcing.persistence.ApplicationRecorder.subscribe `
45+ method of the application's recorder to listen to the application's database, selecting notification objects
46+ and converting them into domain events using the application's mapper.
47+
48+ Each yielded domain event is accompanied by a tracking object that identifies the position of the
49+ domain event in its application sequence. The tracking objects yielded by the application subscription
50+ can be recorded atomically along with the new state that results from processing the domain event.
51+
52+ .. code-block :: python
53+
54+ from eventsourcing.application import Application
55+ from eventsourcing.domain import Aggregate
56+ from eventsourcing.projection import ApplicationSubscription
57+
58+ # Construct an application object.
59+ app = Application()
60+
61+ # Record an event.
62+ aggregate = Aggregate()
63+ app.save(aggregate)
64+
65+ # Position in application sequence from which to subscribe.
66+ max_tracking_id = 0
67+
68+ with ApplicationSubscription(app, gt = max_tracking_id) as subscription:
69+ for domain_event, tracking in subscription:
70+ # Process the event and record new state with tracking information.
71+ break # ...so we can continue with the examples
72+
73+ If an event-processing component is using a :ref: `tracking recorder <Tracking recorder >` to record new state atomically
74+ with tracking objects, subscriptions can be started from the notification ID returned from the tracking recorder's
75+ :func: `~eventsourcing.persistence.TrackingRecorder.max_tracking_id ` method.
4676
47- .. literalinclude :: ../../tests/projection_tests/test_projection.py
48- :pyobject: POPOCountRecorder
4977
5078.. _Projection :
5179
5280Projection
5381==========
5482
55- The library's abstract base class :class: `~eventsourcing.projection.Projection ` can be used to define how
56- domain events will be processed. Subclasses are expected to use a particular kind of tracking recorder. It
57- defines an abstract method :func: `~eventsourcing.projection.Projection.process_event ` that must be implemented by subclasses.
83+ This module provides a generic abstract base class, :class: `~eventsourcing.projection.Projection `,
84+ which can be used to define how the domain events of an application will be processed.
5885
59- For example, the :class: `CountProjection ` class, included below, inherits :class: `~eventsourcing.projection.Projection `,
60- specifies that instances will use a :class: `CountRecorder `, and implements :func: `~eventsourcing.projection.Projection.process_event `
61- by calling :func: `incr_created_event_count ` for each :class: `Aggregate.Created <eventsourcing.domain.Aggregate.Created> ` event,
62- and by calling :func: `incr_subsequent_event_count ` for each subsequent :class: `Aggregate.Event <eventsourcing.domain.Aggregate.Event> `.
86+ The :class: `~eventsourcing.projection.Projection ` class is a generic class because it has one type variable,
87+ which is expected to be a type of tracking recorder.
6388
64- .. literalinclude :: ../../tests/projection_tests/test_projection.py
65- :pyobject: CountProjection
89+ The :class: `~eventsourcing.projection.Projection ` class has one required constructor argument, :data: `tracking_recorder `,
90+ which is expected to be a tracking recorder object of the type specified by the type variable. The constructor argument
91+ is used to initialise an object attribute called :data: `tracking_recorder `.
6692
67- .. _Projection runner :
93+ The :class: `~eventsourcing.projection.Projection ` class is an abstract class because it defines an abstract method,
94+ :func: `~eventsourcing.projection.Projection.process_event `, that must be implemented by subclasses.
6895
69- Projection Runner
70- =================
96+ The intention of this class is that it will be subclassed, and that domain events of an application will be processed by
97+ calling an implementation of the :func: `~eventsourcing.projection.Projection.process_event `, which will call command
98+ methods on a tracking recorder object given as the constructor argument when the subclass is constructed.
99+
100+ .. code-block :: python
101+
102+ from abc import ABC , abstractmethod
103+ from eventsourcing.domain import DomainEventProtocol
104+ from eventsourcing.dispatch import singledispatchmethod
105+ from eventsourcing.persistence import Tracking, TrackingRecorder
106+ from eventsourcing.popo import POPOTrackingRecorder
107+ from eventsourcing.postgres import PostgresTrackingRecorder
108+ from eventsourcing.projection import Projection
109+ from eventsourcing.sqlite import SQLiteTrackingRecorder
71110
72- The library's :class: `~eventsourcing.projection.ProjectionRunner ` class is provided for the purpose
73- or running projections.
111+ class MyTrackingRecorderInterface (TrackingRecorder , ABC ):
112+ @abstractmethod
113+ def my_command (self , tracking : Tracking):
114+ """ Updates materialised view"""
74115
75- A projection runner object can be constructed with an application class, a projection class, a tracking
76- recorder class, and an environment that specifies the persistence modules to be used by the application
77- and the tracking recorder.
116+ class MyPOPOTrackingRecorder (MyTrackingRecorderInterface , POPOTrackingRecorder ):
117+ def my_command (self , tracking : Tracking):
118+ with self ._datastore:
119+ # Insert tracking record...
120+ self ._insert_tracking(tracking)
121+ # ...and then update materialised view.
78122
79- The projection runner will construct an instance of the given application class, and an instance of
80- the given projection class, and an instance of the given tracking recorder class. It will
81- :ref: `subscribe to its application <Subscriptions >`, from the position indicated by its tracking recorder's
82- :func: `~eventsourcing.persistence.TrackingRecorder.max_tracking_id ` method. And then it will call
83- the :func: `~eventsourcing.projection.Projection.process_event ` method of the projection for each event
84- in the application sequence.
123+ class MySQLiteTrackingRecorder (MyTrackingRecorderInterface , SQLiteTrackingRecorder ):
124+ def my_command (self , tracking : Tracking):
125+ ...
85126
86- Because it starts a :ref: ` subscription < Subscriptions >` to the application, it will first catch up by
87- processing already recorded events that have not yet been processed. And then it will continue indefinitely
88- to process events that are recorded after the runner has been started .
127+ class MyPostgresTrackingRecorder ( MyTrackingRecorderInterface , PostgresTrackingRecorder ):
128+ def my_command ( self , tracking : Tracking):
129+ .. .
89130
90- The :class: `~eventsourcing.projection.ProjectionRunner ` class has a :func: `~eventsourcing.projection.ProjectionRunner.run_forever `
91- method, which blocks indefinitely, or until an optional timeout, or until an exception is raised by the projection or
92- by the subscription. This allows an event processing component to be started and run independently as a
93- separate operating system process, and then to terminate when there is an error. Operators of the system can
94- examine the error and resume processing by reconstructing the runner. Some errors may be transient operational
95- issues, such as database connectivity, in which case the processing could be resumed automatically. Some errors
96- may be programming errors, and will require manual intervention before the event processing can continue.
131+ class MyProjection (Projection[MyTrackingRecorderInterface]):
132+ @singledispatchmethod
133+ def process_event (self , domain_event : DomainEventProtocol, tracking : Tracking) -> None :
134+ pass
97135
98- The :class: ` TestCountProjection ` class shown below constructs a :class: ` ~eventsourcing.projection.ProjectionRunner `
99- with the library's :class: ` ~eventsourcing.application.Application ` class, the :class: ` CountProjection ` class,
100- and the :class: ` POPOCountRecorder `.
136+ @process_event.register
137+ def _ ( self , domain_event : Aggregate.Event, tracking : Tracking) -> None :
138+ self .tracking_recorder.my_command(tracking)
101139
102- Two aggregates are saved in the "write model". They have two subsequent events each.
103- The total counts for the application events are obtained from the "read model".
104140
105- .. literalinclude :: ../../tests/projection_tests/test_projection.py
106- :pyobject: TestCountProjection
141+ .. _Projection runner :
142+
143+ Projection runner
144+ =================
145+
146+ This module provides a :class: `~eventsourcing.projection.ProjectionRunner ` class, which can be used to run projections.
107147
108- If the application "write model" and the tracking recorder "read model" use a durable database, such as
109- PostgreSQL, any instance of the application can be used to write events, and any instance of the tracking
110- recorder can be used to query the materialised view. However, in this case, using the :ref: ` POPO module < popo-module >`
111- means that we need to use the same instance of the application and of the recorder.
148+ Projection runner objects can be constructed by calling the class with an application class, a projection class,
149+ and a tracking recorder class, and an optional environment. An application object will be constructed using the
150+ application class and the environment. An infrastructure factory will be constructed for the tracking recorder,
151+ also using the environment. A projection will also be constructed using the tracking recorder.
112152
153+ The projection runner will then start a subscription to the application, from the position indicated by the tracking
154+ recorder's :func: `~eventsourcing.persistence.TrackingRecorder.max_tracking_id ` method.
113155
114- With PostgreSQL
115- ===============
156+ The projection runner will iterate over the application subscription, calling the projection's
157+ :func: `~eventsourcing.projection.Projection.process_event ` method for each domain event
158+ and tracking object yielded by the application subscription.
116159
117- We can also implement the tracking recorder to work with PostgreSQL. The :func: `_incr_counter ` method
118- of :class: ` PostgresCountRecorder `, included below, updates the materialised view and records
119- a tracking object atomically in the same database transaction .
160+ The projection runner has a method :func: `~eventsourcing.projection.Projection.run_forever ` which will block
161+ until either the :func: ` ~eventsourcing.projection.Projection.process_event ` method raises an error, or until
162+ the application subscription raises an error, or until the optional timeout is reached .
120163
121- .. literalinclude :: ../../tests/projection_tests/test_projection.py
122- :pyobject: PostgresCountRecorder
164+ Projection runner objects can be used as context managers.
123165
124- Because this example uses a durable database, separate instances of the application and the recorder
125- can be used as interfaces to the "write model" and the "read model".
166+ .. code-block :: python
126167
127- The application and the projection could use separate databases, but in this example they simply
128- use different tables in the same database.
168+ from eventsourcing.projection import ProjectionRunner
129169
170+ with ProjectionRunner(
171+ application_class = Application,
172+ projection_class = MyProjection,
173+ tracking_recorder_class = MyPOPOTrackingRecorder,
174+ env = {},
175+ ) as runner:
176+ runner.run_forever(timeout = 1 )
130177
131- .. literalinclude :: ../../tests/projection_tests/test_projection.py
132- :pyobject: TestCountProjectionWithPostgres
133178
134- See example :doc: `/topics/examples/fts-projection ` for a more substantial example .
179+ See :doc: `Tutorial - Part 4 < /topics/tutorial/part4 > ` for more guidance on using this module .
135180
136181Code reference
137182==============
0 commit comments