Skip to content

Commit 3545e63

Browse files
docs: update cqrs chapter
1 parent c769232 commit 3545e63

File tree

1 file changed

+124
-29
lines changed

1 file changed

+124
-29
lines changed

content/recipes/cqrs.md

Lines changed: 124 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
11
### CQRS
22

3-
The flow of simple [CRUD](https://en.wikipedia.org/wiki/Create,_read,_update_and_delete) (Create, Read, Update and Delete) applications can be described using the following steps:
3+
The flow of simple [CRUD](https://en.wikipedia.org/wiki/Create,_read,_update_and_delete) (Create, Read, Update and Delete) applications can be described as follows:
44

5-
1. The **controllers** layer handles HTTP requests and delegates tasks to the services layer.
6-
2. The **services layer** is where most of the business logic lives.
7-
3. Services use **repositories / DAOs** to change / persist entities.
5+
1. The controllers layer handles HTTP requests and delegates tasks to the services layer.
6+
2. The services layer is where most of the business logic lives.
7+
3. Services use repositories / DAOs to change / persist entities.
88
4. Entities act as containers for the values, with setters and getters.
99

10-
In most cases, for small and medium-sized applications, this pattern is sufficient. However, when our requirements become more complex, the **CQRS** model may be more appropriate and scalable. To facilitate that model, Nest provides a lightweight [CQRS module](https://github.com/nestjs/cqrs). This chapter describes how to use it.
10+
While this pattern is usually sufficient for small and medium-sized applications, it may not be the best choice for larger, more complex applications. In such cases, the **CQRS** (Command and Query Responsibility Segregation) model may be more appropriate and scalable (depending on the application's requirements). Benefits of this model include:
11+
12+
- **Separation of concerns**. The model separates the read and write operations into separate models.
13+
- **Scalability**. The read and write operations can be scaled independently.
14+
- **Flexibility**. The model allows for the use of different data stores for read and write operations.
15+
- **Performance**. The model allows for the use of different data stores optimized for read and write operations.
16+
17+
To facilitate that model, Nest provides a lightweight [CQRS module](https://github.com/nestjs/cqrs). This chapter describes how to use it.
1118

1219
#### Installation
1320

@@ -19,7 +26,7 @@ $ npm install --save @nestjs/cqrs
1926

2027
#### Commands
2128

22-
In this model, each action is called a **Command**. When a command is dispatched, the application reacts to it. Commands can be dispatched from the services layer, or directly from controllers/gateways. Commands are consumed by **Command Handlers**.
29+
Commands are used to change the application state. They should be task-based, rather than data centric. When a command is dispatched, it is handled by a corresponding **Command Handler**. The handler is responsible for updating the application state.
2330

2431
```typescript
2532
@@filename(heroes-game.service)
@@ -49,7 +56,7 @@ export class HeroesGameService {
4956
}
5057
```
5158

52-
Here's a sample service that dispatches `KillDragonCommand`. Let's see how the command looks:
59+
In the code snippet above, we instantiate the `KillDragonCommand` class and pass it to the `CommandBus`'s `execute()` method. This is the demonstrated command class:
5360

5461
```typescript
5562
@@filename(kill-dragon.command)
@@ -68,7 +75,9 @@ export class KillDragonCommand {
6875
}
6976
```
7077

71-
The `CommandBus` is a **stream** of commands. It delegates commands to the equivalent handlers. Each command must have a corresponding **Command Handler**:
78+
The `CommandBus` represents a **stream** of commands. It is responsible for dispatching commands to the appropriate handlers. The `execute()` method returns a promise, which resolves to the value returned by the handler.
79+
80+
Let's create a handler for the `KillDragonCommand` command.
7281

7382
```typescript
7483
@@filename(kill-dragon.handler)
@@ -102,11 +111,19 @@ export class KillDragonHandler {
102111
}
103112
```
104113

105-
With this approach, every application state change is driven by the occurrence of a **Command**. The logic is encapsulated in handlers. With this approach, we can simply add behavior like logging or persisting commands in the database (e.g., for diagnostics purposes).
114+
This handler retrieves the `Hero` entity from the repository, calls the `killEnemy()` method, and then persists the changes. The `KillDragonHandler` class implements the `ICommandHandler` interface, which requires the implementation of the `execute()` method. The `execute()` method receives the command object as an argument.
115+
116+
#### Queries
117+
118+
Queries are used to retrieve data from the application state. They should be data centric, rather than task-based. When a query is dispatched, it is handled by a corresponding **Query Handler**. The handler is responsible for retrieving the data.
119+
120+
The `QueryBus` follows the same pattern as the `CommandBus`. Query handlers should implement the `IQueryHandler` interface and be annotated with the `@QueryHandler()` decorator.
106121

107122
#### Events
108123

109-
Command handlers neatly encapsulate logic. While beneficial, the application structure is still not flexible enough, not **reactive**. To remedy this, we also introduce **events**.
124+
Events are used to notify other parts of the application about changes in the application state. They are dispatched by **models** or directly using the `EventBus`. When an event is dispatched, it is handled by corresponding **Event Handlers**. Handlers can then, for example, update the read model.
125+
126+
For demonstration purposes, let's create an event class:
110127

111128
```typescript
112129
@@filename(hero-killed-dragon.event)
@@ -125,7 +142,7 @@ export class HeroKilledDragonEvent {
125142
}
126143
```
127144

128-
Events are asynchronous. They are dispatched either by **models** or directly using `EventBus`. In order to dispatch events, models have to extend the `AggregateRoot` class.
145+
Now while events can be dispatched directly using the `EventBus.publish()` method, we can also dispatch them from the model. Let's update the `Hero` model to dispatch the `HeroKilledDragonEvent` event when the `killEnemy()` method is called.
129146

130147
```typescript
131148
@@filename(hero.model)
@@ -135,7 +152,7 @@ export class Hero extends AggregateRoot {
135152
}
136153

137154
killEnemy(enemyId: string) {
138-
// logic
155+
// Business logic
139156
this.apply(new HeroKilledDragonEvent(this.id, enemyId));
140157
}
141158
}
@@ -147,13 +164,13 @@ export class Hero extends AggregateRoot {
147164
}
148165

149166
killEnemy(enemyId) {
150-
// logic
167+
// Business logic
151168
this.apply(new HeroKilledDragonEvent(this.id, enemyId));
152169
}
153170
}
154171
```
155172

156-
The `apply()` method does not dispatch events yet because there's no relationship between the model and the `EventPublisher` class. How do we associate the model and the publisher? By using a publisher `mergeObjectContext()` method inside our command handler.
173+
The `apply()` method is used to dispatch events. It accepts an event object as an argument. However, since our model is not aware of the `EventBus`, we need to associate it with the model. We can do that by using the `EventPublisher` class.
157174

158175
```typescript
159176
@@filename(kill-dragon.handler)
@@ -193,14 +210,29 @@ export class KillDragonHandler {
193210
}
194211
```
195212

196-
Now everything works as expected. Notice that we need to `commit()` events since they're not being dispatched immediately. Obviously, an object doesn't have to exist up front. We can easily merge type context as well:
213+
The `EventPublisher#mergeObjectContext` method merges the event publisher into the provided object, which means that the object will now be able to publish events to the events stream.
214+
215+
Notice that in this example we also call the `commit()` method on the model. This method is used to dispatch any outstanding events. To automatically dispatch events, we can set the `autoCommit` property to `true`:
216+
217+
```typescript
218+
export class Hero extends AggregateRoot {
219+
constructor(private id: string) {
220+
super();
221+
this.autoCommit = true;
222+
}
223+
}
224+
```
225+
226+
In case we want to merge the event publisher into a non-existing object, but rather into a class, we can use the `EventPublisher#mergeClassContext` method:
197227

198228
```typescript
199229
const HeroModel = this.publisher.mergeClassContext(Hero);
200-
new HeroModel('id');
230+
const hero = new HeroModel('id'); // <-- HeroModel is a class
201231
```
202232

203-
Now the model has the ability to publish events. Additionally, we can emit events manually using `EventBus`:
233+
Now every instance of the `HeroModel` class will be able to publish events without using `mergeObjectContext()` method.
234+
235+
Additionally, we can emit events manually using `EventBus`:
204236

205237
```typescript
206238
this.eventBus.publish(new HeroKilledDragonEvent());
@@ -217,24 +249,25 @@ export class HeroKilledDragonHandler implements IEventHandler<HeroKilledDragonEv
217249
constructor(private repository: HeroRepository) {}
218250

219251
handle(event: HeroKilledDragonEvent) {
220-
// logic
252+
// Business logic
221253
}
222254
}
223255
```
224256

225-
Now we can move the **write logic** into the event handlers.
226-
227257
> info **Hint** Be aware that when you start using event handlers you get out of the traditional HTTP web context.
258+
>
228259
> - Errors in `CommandHandlers` can still be caught by built-in [Exception filters](/exception-filters).
229260
> - Errors in `EventHandlers` can't be caught by Exception filters: you will have to handle them manually. Either by a simple `try/catch`, using [Sagas](/recipes/cqrs#sagas) by triggering a compensating event, or whatever other solution you choose.
230261
> - HTTP Responses in `CommandHandlers` can still be sent back to the client.
231262
> - HTTP Responses in `EventHandlers` cannot. If you want to send information to the client you could use [WebSocket](/websockets/gateways), [SSE](/techniques/server-sent-events), or whatever other solution you choose.
232263
233264
#### Sagas
234265

235-
This type of **Event-Driven Architecture** improves application **reactiveness and scalability**. Now, when we have events, we can simply react to them in various ways. **Sagas** are the final building block from an architectural point of view.
266+
Saga is a long-running process that listens to events and may trigger new commands. It is usually used to manage complex workflows in the application. For example, when a user signs up, a saga may listen to the `UserRegisteredEvent` and send a welcome email to the user.
267+
268+
Sagas are an extremely powerful feature. A single saga may listen for 1..\* events. Using the [RxJS](https://github.com/ReactiveX/rxjs) library, we can filter, map, fork, and merge event streams to create sophisticated workflows. Each saga returns an Observable which produces a command instance. This command is then dispatched **asynchronously** by the `CommandBus`.
236269

237-
Sagas are an extremely powerful feature. A single saga may listen for 1..\* events. Using the [RxJS](https://github.com/ReactiveX/rxjs) library, it can combine, merge, filter or apply other `RxJS` operators on the event stream. Each saga returns an Observable which contains a command. This command is dispatched **asynchronously**.
270+
Let's create a saga that listens to the `HeroKilledDragonEvent` and dispatches the `DropAncientItemCommand` command.
238271

239272
```typescript
240273
@@filename(heroes-game.saga)
@@ -263,15 +296,13 @@ export class HeroesGameSagas {
263296

264297
> info **Hint** The `ofType` operator and the `@Saga()` decorator are exported from the `@nestjs/cqrs` package.
265298
266-
We declared a rule - when any hero kills the dragon, the ancient item should be dropped. With this in place, `DropAncientItemCommand` will be dispatched and processed by the appropriate handler.
299+
The `@Saga()` decorator marks the method as a saga. The `events$` argument is an Observable stream of all events. The `ofType` operator filters the stream by the specified event type. The `map` operator maps the event to a new command instance.
267300

268-
#### Queries
269-
270-
The `CqrsModule` can also be used for handling queries. The `QueryBus` follows the same pattern as the `CommandsBus`. Query handlers should implement the `IQueryHandler` interface and be marked with the `@QueryHandler()` decorator.
301+
In this example, we map the `HeroKilledDragonEvent` to the `DropAncientItemCommand` command. The `DropAncientItemCommand` command is then auto-dispatched by the `CommandBus`.
271302

272303
#### Setup
273304

274-
Finally, let's look at how to set up the whole CQRS mechanism.
305+
To wrap up, we need to register all command handlers, event handlers, and sagas in the `HeroesGameModule`:
275306

276307
```typescript
277308
@@filename(heroes-game.module)
@@ -292,9 +323,73 @@ export const EventHandlers = [HeroKilledDragonHandler, HeroFoundItemHandler];
292323
export class HeroesGameModule {}
293324
```
294325

295-
#### Summary
326+
#### Unhandled exceptions
327+
328+
Event handlers are executed in the asynchronous manner. This means they should always handle all exceptions to prevent application from entering the inconsistent state. However, if an exception is not handled, the `EventBus` will create the `UnhandledExceptionInfo` object and push it to the `UnhandledExceptionBus` stream. This stream is an `Observable` which can be used to process unhandled exceptions.
296329

297-
`CommandBus`, `QueryBus` and `EventBus` are **Observables**. This means that you can easily subscribe to the whole stream and enrich your application with **Event Sourcing**.
330+
```typescript
331+
private destroy$ = new Subject<void>();
332+
333+
constructor(private unhandledExceptionsBus: UnhandledExceptionBus) {
334+
this.unhandledExceptionsBus
335+
.pipe(takeUntil(this.destroy$))
336+
.subscribe((exceptionInfo) => {
337+
// Handle exception here
338+
// e.g. send it to external service, terminate process, or publish a new event
339+
});
340+
}
341+
342+
onModuleDestroy() {
343+
this.destroy$.next();
344+
this.destroy$.complete();
345+
}
346+
```
347+
348+
To filter out exceptions, we can use the `ofType` operator, as follows:
349+
350+
```typescript
351+
this.unhandledExceptionsBus.pipe(takeUntil(this.destroy$), UnhandledExceptionBus.ofType(TransactionNotAllowedException)).subscribe((exceptionInfo) => {
352+
// Handle exception here
353+
});
354+
```
355+
356+
Where `TransactionNotAllowedException` is the exception we want to filter out.
357+
358+
The `UnhandledExceptionInfo` object contains the following properties:
359+
360+
```typescript
361+
export interface UnhandledExceptionInfo<Cause = IEvent | ICommand, Exception = any> {
362+
/**
363+
* The exception that was thrown.
364+
*/
365+
exception: Exception;
366+
/**
367+
* The cause of the exception (event or command reference).
368+
*/
369+
cause: Cause;
370+
}
371+
```
372+
373+
#### Subscribing to all events
374+
375+
`CommandBus`, `QueryBus` and `EventBus` are all **Observables**. This means that we can subscribe to the entire stream and, for example, process all events. For example, we can log all events to the console, or save them to the event store.
376+
377+
```typescript
378+
private destroy$ = new Subject<void>();
379+
380+
constructor(private eventBus: EventBus) {
381+
this.eventBus
382+
.pipe(takeUntil(this.destroy$))
383+
.subscribe((event) => {
384+
// Save events to database
385+
});
386+
}
387+
388+
onModuleDestroy() {
389+
this.destroy$.next();
390+
this.destroy$.complete();
391+
}
392+
```
298393

299394
#### Example
300395

0 commit comments

Comments
 (0)