Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit 30bdd77

Browse files
Re-wrote Message consumption doc
1 parent 241ef9c commit 30bdd77

File tree

1 file changed

+107
-102
lines changed

1 file changed

+107
-102
lines changed

docs/message-consumption.md

Lines changed: 107 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -104,82 +104,11 @@ public class Worker : BackgroundService
104104
}
105105
```
106106

107-
### Synchronous message handlers
108-
109-
The second step without which receiving messages does not make sense - configuration of message handling services. If there are no message handlers then received messages will not be processed.
110-
111-
Message handlers are classes that implement the `IMessageHandler` interface (or a few others) and contain functionality (including error handling) for processing messages.
112-
You can register `IMessageHandler` in your `Startup` like this.
113-
114-
```c#
115-
public class Startup
116-
{
117-
public static IConfiguration Configuration;
118-
119-
public Startup(IConfiguration configuration)
120-
{
121-
Configuration = configuration;
122-
}
123-
124-
public void ConfigureServices(IServiceCollection services)
125-
{
126-
var clientConfiguration = Configuration.GetSection("RabbitMq");
127-
var exchangeConfiguration = Configuration.GetSection("RabbitMqExchange");
128-
services.AddRabbitMqClient(clientConfiguration)
129-
.AddExchange("ExchangeName", isConsuming: true, exchangeConfiguration)
130-
.AddMessageHandlerSingleton<CustomMessageHandler>("routing.key");
131-
}
132-
}
133-
```
134-
135-
RabbitMQ client and exchange configuration sections are not specified in this example, but covered [here](rabbit-configuration.md) and [here](exchange-configuration.md).
136-
137-
`IMessageHandler` implementation will "listen" for messages by the specified routing key, or a collection of routing keys. If it is necessary, you can also register multiple message handler at once.
138-
139-
```c#
140-
services.AddRabbitMqClient(clientConfiguration)
141-
.AddExchange("ExchangeName", isConsuming: true, exchangeConfiguration)
142-
.AddMessageHandlerSingleton<CustomMessageHandler>("first.routing.key")
143-
.AddMessageHandlerSingleton<AnotherCustomMessageHandler>(new[] { "second.routing.key", "third.routing.key" });
144-
```
145-
146-
You can also use **pattern matching** in routes where `*` (star) can substitute for exactly one word and `#` (hash) can substitute for zero or more words.
147-
148-
```c#
149-
services.AddRabbitMqClient(clientConfiguration)
150-
.AddExchange("ExchangeName", isConsuming: true, exchangeConfiguration)
151-
.AddMessageHandlerSingleton<CustomMessageHandler>("*.routing.*")
152-
.AddMessageHandlerSingleton<AnotherCustomMessageHandler>(new[] { "#.key", "third.*" });
153-
```
154-
155-
You are also allowed to specify the exact exchange which will be "listened" by a message handler with the given routing key (or a pattern).
156-
157-
```c#
158-
services.AddRabbitMqClient(clientConfiguration)
159-
.AddExchange("ExchangeName", isConsuming: true, exchangeConfiguration)
160-
.AddMessageHandlerSingleton<CustomMessageHandler>("*.*.*", "ExchangeName")
161-
.AddMessageHandlerSingleton<AnotherCustomMessageHandler>("routing.key", "ExchangeName");
162-
```
163-
164-
You can register it in two modes, **singleton** or **transient**, using `AddMessageHandlerSingleton` or `AddMessageHandlerTransient` methods respectively.
165-
166-
```c#
167-
services.AddRabbitMqClient(clientConfiguration)
168-
.AddExchange("ExchangeName", isConsuming: true, exchangeConfiguration)
169-
.AddMessageHandlerTransient<CustomMessageHandler>(new[] { "#.key", "third.*" });
170-
```
171-
172-
You can also set multiple message handlers for managing messages received by one routing key. This case can happen when you want to divide responsibilities between services (e.g. one contains business logic, and the other writes messages in the database).
107+
The second step is to define classes that will take responsibility of handling received messages. There are synchronous and asynchronous message handlers.
173108

174-
```c#
175-
services.AddRabbitMqClient(clientConfiguration)
176-
.AddExchange("ExchangeName", isConsuming: true, exchangeConfiguration)
177-
.AddMessageHandlerSingleton<CustomMessageHandler>("first.routing.key")
178-
.AddMessageHandlerSingleton<AnotherCustomMessageHandler>("first.routing.key")
179-
.AddMessageHandlerSingleton<OneMoreCustomMessageHandler>("first.routing.key");
180-
```
109+
### Synchronous message handlers
181110

182-
`IMessageHandler` consists of one method `Handle` that gets a message in a string format. You can deserialize it (if it is a json message) or handle a raw value.
111+
`IMessageHandler` consists of one method `Handle` that gets a message in a string format. You can deserialize that message (if it is a json message) or handle its raw value.
183112
Thus, a message handler will look like this.
184113

185114
```c#
@@ -193,7 +122,7 @@ public class CustomMessageHandler : IMessageHandler
193122
}
194123
```
195124

196-
You can also inject services inside the `IMessageHandler` constructor.
125+
You can also inject almost any services inside the `IMessageHandler` constructor.
197126

198127
```c#
199128
public class CustomMessageHandler : IMessageHandler
@@ -212,16 +141,7 @@ public class CustomMessageHandler : IMessageHandler
212141
```
213142

214143
The only exception is the `IQueueService`. You can't inject it inside a message handler because of the appearance of cyclic dependencies. If you want to use an instance of `IQueueService` (e.g. handle one message and send another) use `INonCyclicMessageHandler`.
215-
`INonCyclicMessageHandler` can be registered the same way as `IMessageHandler`. There are similar semantic methods for adding it in **singleton** or **transient** modes.
216-
217-
```c#
218-
services.AddRabbitMqClient(clientConfiguration)
219-
.AddExchange("ExchangeName", isConsuming: true, exchangeConfiguration)
220-
.AddNonCyclicMessageHandlerTransient<CustomNonCyclicMessageHandler>("first.routing.key")
221-
.AddNonCyclicMessageHandlerSingleton<AnotherNonCyclicCustomMessageHandler>(new [] { "second.routing.key", "third.routing.key" });
222-
```
223-
224-
And the code of `INonCyclicMessageHandler` will look like this.
144+
An example of `INonCyclicMessageHandler` will look like this.
225145

226146
```c#
227147
public class CustomNonCyclicMessageHandler : INonCyclicMessageHandler
@@ -244,27 +164,12 @@ public class CustomNonCyclicMessageHandler : INonCyclicMessageHandler
244164
### Asynchronous message handlers
245165

246166
`IMessageHandler` and `INonCyclicMessageHandler` work synchronously, but if you want to use an async technology then use `IAsyncMessageHandler` and `IAsyncNonCyclicMessageHandler`.
247-
There are extension methods that allow you to register it the same way as synchronous ones in **singleton** or **transient** modes.
248-
249-
```c#
250-
services.AddRabbitMqClient(clientConfiguration)
251-
.AddExchange("ExchangeName", isConsuming: true, exchangeConfiguration)
252-
.AddAsyncMessageHandlerTransient<CustomAsyncMessageHandler>("first.routing.key")
253-
.AddAsyncNonCyclicMessageHandlerSingleton<CustomAsyncNonCyclicMessageHandler>(new [] { "second.routing.key", "third.routing.key" });
254-
```
255167

256168
`IAsyncMessageHandler` will look like this.
257169

258170
```c#
259171
public class CustomAsyncMessageHandler : IAsyncMessageHandler
260172
{
261-
readonly ILogger<CustomAsyncMessageHandler> _logger;
262-
263-
public CustomAsyncMessageHandler(ILogger<CustomAsyncMessageHandler> logger)
264-
{
265-
_logger = logger;
266-
}
267-
268173
public async Task Handle(string message, string routingKey)
269174
{
270175
// Do whatever you want asynchronously!
@@ -279,6 +184,7 @@ public class CustomAsyncNonCyclicMessageHandler : IAsyncNonCyclicMessageHandler
279184
{
280185
readonly ILogger<CustomAsyncNonCyclicMessageHandler> _logger;
281186

187+
// Injecting services is a privilege, you can leave it clean.
282188
public CustomAsyncNonCyclicMessageHandler(ILogger<CustomAsyncNonCyclicMessageHandler> logger)
283189
{
284190
_logger = logger;
@@ -295,13 +201,112 @@ public class CustomAsyncNonCyclicMessageHandler : IAsyncNonCyclicMessageHandler
295201

296202
So you can use async/await power inside your message handler.
297203

204+
### Message handlers registering
205+
206+
The third and final step is to register defined message handlers and let them "listen" for messages relying on specified rules. If there are no message handlers registered then received messages will not be processed.
207+
You can register `IMessageHandler` in your `Startup` calling one of `AddMessageHandler`-ish methods. You are allowed to add message handlers in two modes, **singleton** or **transient**, and there are extension methods for each mode and each message handler type:
208+
209+
- `AddMessageHandlerTransient`
210+
- `AddMessageHandlerSingleton`
211+
- `AddNonCyclicMessageHandlerTransient`
212+
- `AddNonCyclicMessageHandlerSingleton`
213+
- `AddAsyncMessageHandlerTransient`
214+
- `AddAsyncMessageHandlerSingleton`
215+
- `AddAsyncNonCyclicMessageHandlerTransient`
216+
- `AddAsyncNonCyclicMessageHandlerSingleton`
217+
218+
And this will look like this in your `Startup` code.
219+
220+
```c#
221+
public class Startup
222+
{
223+
public static IConfiguration Configuration;
224+
225+
public Startup(IConfiguration configuration)
226+
{
227+
Configuration = configuration;
228+
}
229+
230+
public void ConfigureServices(IServiceCollection services)
231+
{
232+
var clientConfiguration = Configuration.GetSection("RabbitMq");
233+
var exchangeConfiguration = Configuration.GetSection("RabbitMqExchange");
234+
services.AddRabbitMqClient(clientConfiguration)
235+
.AddExchange("ExchangeName", isConsuming: true, exchangeConfiguration)
236+
.AddMessageHandlerSingleton<CustomMessageHandler>("routing.key");
237+
}
238+
}
239+
```
240+
241+
RabbitMQ client and exchange configuration sections are not specified in this example, but covered [here](rabbit-configuration.md) and [here](exchange-configuration.md).
242+
243+
Message handlers can "listen" for messages by the **specified routing key**, or a **collection of routing keys**. If it is necessary, you can also register multiple message handler at once.
244+
245+
```c#
246+
services.AddRabbitMqClient(clientConfiguration)
247+
.AddExchange("ExchangeName", isConsuming: true, exchangeConfiguration)
248+
.AddMessageHandlerSingleton<CustomMessageHandler>("first.routing.key")
249+
.AddMessageHandlerSingleton<AnotherCustomMessageHandler>(new[] { "second.routing.key", "third.routing.key" });
250+
```
251+
252+
You can also use **pattern matching** in routes where `*` (star) can substitute for exactly one word and `#` (hash) can substitute for zero or more words.
253+
254+
```c#
255+
services.AddRabbitMqClient(clientConfiguration)
256+
.AddExchange("ExchangeName", isConsuming: true, exchangeConfiguration)
257+
.AddMessageHandlerSingleton<CustomMessageHandler>("*.routing.*")
258+
.AddMessageHandlerSingleton<AnotherCustomMessageHandler>(new[] { "#.key", "third.*" });
259+
```
260+
261+
You are also allowed to specify the exact exchange which will be "listened" by a message handler with the given routing key (or a pattern).
262+
263+
```c#
264+
services.AddRabbitMqClient(clientConfiguration)
265+
.AddExchange("ExchangeName", isConsuming: true, exchangeConfiguration)
266+
.AddMessageHandlerSingleton<CustomMessageHandler>("*.*.*", "ExchangeName")
267+
.AddMessageHandlerSingleton<AnotherCustomMessageHandler>("routing.key", "ExchangeName");
268+
```
269+
270+
You can also set multiple message handlers for managing messages received by one routing key. This case can happen when you want to divide responsibilities between services (e.g. one contains business logic, and the other writes messages in the database).
271+
272+
```c#
273+
services.AddRabbitMqClient(clientConfiguration)
274+
.AddExchange("ExchangeName", isConsuming: true, exchangeConfiguration)
275+
.AddMessageHandlerSingleton<CustomMessageHandler>("first.routing.key")
276+
.AddMessageHandlerSingleton<AnotherCustomMessageHandler>("first.routing.key")
277+
.AddMessageHandlerSingleton<OneMoreCustomMessageHandler>("first.routing.key");
278+
```
279+
280+
Since you are allowed to register multiple message handlers for one routing key (or one route pattern) you might want to make it run in a special order. You are allowed to do that too.
281+
282+
```c#
283+
services.AddRabbitMqClient(clientConfiguration)
284+
.AddExchange("ExchangeName", isConsuming: true, exchangeConfiguration)
285+
.AddMessageHandlerSingleton<CustomMessageHandler>("first.routing.key", order: 1)
286+
.AddMessageHandlerSingleton<AnotherCustomMessageHandler>("first.routing.key", order: 20)
287+
.AddMessageHandlerSingleton<OneMoreCustomMessageHandler>("first.routing.key", order: 300);
288+
```
289+
290+
The higher order value - the more important message handler is. So it the previous code snippet the `OneMoreCustomMessageHandler` will process the received message first, `AnotherCustomMessageHandler` will be the second and `CustomMessageHandler` will be the third one.
291+
292+
You can also combine exchange and order configurations together!
293+
```c#
294+
services.AddRabbitMqClient(clientConfiguration)
295+
.AddExchange("ExchangeName", isConsuming: true, exchangeConfiguration)
296+
.AddMessageHandlerSingleton<CustomMessageHandler>("first.routing.key", "an.exchange", order: 1)
297+
.AddMessageHandlerSingleton<AnotherCustomMessageHandler>("first.routing.key", "an.exchange", order: 20)
298+
.AddMessageHandlerSingleton<OneMoreCustomMessageHandler>("first.routing.key", "an.exchange", order: 300)
299+
.AddMessageHandlerSingleton<SecondMessageHandler>("second.routing.key", "other.exchange", order: 10)
300+
.AddMessageHandlerSingleton<ThirdMessageHandler>("second.routing.key", "other.exchange", order: 20);
301+
```
302+
298303
### Workflow of message handling
299304

300305
The message handling process is organized as follows:
301306

302307
- `IQueueMessage` receives a message and delegates it to `IMessageHandlingService`.
303-
- `IMessageHandlingService` gets a message (as a byte array) and decodes it to the UTF8 string. It also checks if there are any message handlers in collections of `IMessageHandler`, `IAsyncMessageHandler`, `INonCyclicMessageHandler` and `IAsyncNonCyclicMessageHandler` instances and forwards a message to them.
304-
- All subscribed message handlers (`IMessageHandler`, `IAsyncMessageHandler`, `INonCyclicMessageHandler`, `IAsyncNonCyclicMessageHandler`) process the given message.
308+
- `IMessageHandlingService` gets a message (as a byte array) and decodes it to the UTF8 string. It also checks if there are any message handlers in a combined collection of `IMessageHandler`, `IAsyncMessageHandler`, `INonCyclicMessageHandler` and `IAsyncNonCyclicMessageHandler` instances and forwards a message to them.
309+
- All subscribed message handlers (`IMessageHandler`, `IAsyncMessageHandler`, `INonCyclicMessageHandler`, `IAsyncNonCyclicMessageHandler`) process the given message in a given or a default order.
305310
- `IMessageHandlingService` acknowledges the message by its `DeliveryTag`.
306311
- If any exception occurs `IMessageHandlingService` acknowledges the message anyway and checks if the message has to be re-send. If exchange option `RequeueFailedMessages` is set `true` then `IMessageHandlingService` adds a header `"requeued"` to the message and sends it again with delay in 60 seconds. Mechanism of sending delayed messages covered in the message production [documentation](message-production.md).
307312
- If any exception occurs within handling the message that has been already re-sent that message will not be re-send again (re-send happens only once).

0 commit comments

Comments
 (0)