Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit e941538

Browse files
Merge pull request #47 from AntonyVorontsov/feature/retries-for-resending
WIP: Started making retries for failed messages.
2 parents cb052f8 + 5a424f9 commit e941538

File tree

13 files changed

+240
-119
lines changed

13 files changed

+240
-119
lines changed

docs/changelog.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,16 @@
22

33
All notable changes to this library will be documented in this file.
44

5+
## [4.2.0] - will be drafted
6+
7+
### Added
8+
9+
- Options that can allow configuring behaviour of re-queueing messages. New properties `RequeueTimeoutMilliseconds` and `RequeueAttempts` added to `RabbitMqExchangeOptions`.
10+
11+
### Changed
12+
13+
- **Breaking!** Now all `Send` or `SendAsync` methods of `IProducingService` (and `IQueueService`) with delay parameter use milliseconds instead of seconds.
14+
515
## [4.1.1] - 2020-05-30
616

717
### Fixed

docs/exchange-configuration.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ And the `appsettings.json` file will be like this.
4242
"AutoDelete": false,
4343
"DeadLetterExchange": "default.dlx.exchange",
4444
"RequeueFailedMessages": true,
45+
"RequeueTimeoutMilliseconds": 200,
46+
"RequeueAttempts": 2,
4547
"Arguments": { "key": "value" },
4648
"Queues": [
4749
{
@@ -65,6 +67,8 @@ Exchanges can be configured with properties:
6567
- `AutoDelete` - an option for exchange auto deleting. The default value is `false`.
6668
- `Arguments` - a dictionary of additional arguments. The default value is `null`.
6769
- `RequeueFailedMessages` - an option that specifies behaviour of re-queueing failed messages with certain delay through the dead-letter-exchange. The default value is `true`. The mechanism of sending delayed messages is covered in the [documentation](message-production.md).
70+
- `RequeueTimeoutMilliseconds` - timeout in milliseconds after which the message will be re-queued. The default value is 200.
71+
- `RequeueAttempts` - a number of attempts which queueing service will try to re-queue a message. The default value is 2.
6872
- `DeadLetterExchange` - a value for dead-letter-exchange. The default value for the dead-letter-exchange name is `"default.dlx.exchange"`.
6973
- `Queues` - a collection of queues bound to the exchange.
7074

docs/message-consumption.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,13 +303,13 @@ services.AddRabbitMqClient(clientConfiguration)
303303

304304
### Workflow of message handling
305305

306-
The message handling process is organized as follows:
306+
The message handling process organized as follows:
307307

308308
- `IQueueMessage` receives a message and delegates it to `IMessageHandlingService`.
309309
- `IMessageHandlingService` gets a message (as a byte array) and decodes it to the UTF8 string. It also checks if there are any message handlers in a combined collection of `IMessageHandler`, `IAsyncMessageHandler`, `INonCyclicMessageHandler` and `IAsyncNonCyclicMessageHandler` instances and forwards a message to them.
310310
- All subscribed message handlers (`IMessageHandler`, `IAsyncMessageHandler`, `INonCyclicMessageHandler`, `IAsyncNonCyclicMessageHandler`) process the given message in a given or a default order.
311311
- `IMessageHandlingService` acknowledges the message by its `DeliveryTag`.
312-
- If any exception occurs `IMessageHandlingService` acknowledges the message anyway and checks if the message has to be re-send. If exchange option `RequeueFailedMessages` is set `true` then `IMessageHandlingService` adds a header `"requeued"` to the message and sends it again with delay in 60 seconds. Mechanism of sending delayed messages covered in the message production [documentation](message-production.md).
312+
- If any exception occurs `IMessageHandlingService` acknowledges the message anyway and checks if the message has to be re-send. If exchange option `RequeueFailedMessages` is set `true` then `IMessageHandlingService` adds a header `"re-queue-attempts"` to the message and sends it again with delay in value of `RequeueTimeoutMilliseconds` (default is 200 milliseconds). The number of attempts is configurable and re-delivery will be made that many times as the value of `RequeueAttempts` property. Mechanism of sending delayed messages covered in the message production [documentation](message-production.md).
313313
- If any exception occurs within handling the message that has been already re-sent that message will not be re-send again (re-send happens only once).
314314

315315
### Batch message handlers

docs/message-production.md

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -90,24 +90,24 @@ queueService.Send(bytes, properties, exchangeName: "exchange.name", routingKey:
9090
await queueService.SendAsync(bytes, properties, exchangeName: "exchange.name", routingKey: "routing.key");
9191
```
9292

93-
You are also allowed to send messages with delay (in seconds). All of previously listed methods have an overload that takes a `delay` parameter.
93+
You are also allowed to send messages with delay (in milliseconds). All of previously listed methods have an overload that takes a `delay` parameter.
9494

9595
```c#
9696
// Objects
97-
queueService.Send(message, exchangeName: "exchange.name", routingKey: "routing.key", secondsDelay: 10);
98-
await queueService.SendAsync(message, exchangeName: "exchange.name", routingKey: "routing.key", secondsDelay: 10);
97+
queueService.Send(message, exchangeName: "exchange.name", routingKey: "routing.key", millisecondsDelay: 10);
98+
await queueService.SendAsync(message, exchangeName: "exchange.name", routingKey: "routing.key", millisecondsDelay: 10);
9999

100100
// Json
101-
queueService.SendJson(message, exchangeName: "exchange.name", routingKey: "routing.key", secondsDelay: 10);
102-
await queueService.SendJsonAsync(message, exchangeName: "exchange.name", routingKey: "routing.key", secondsDelay: 10);
101+
queueService.SendJson(message, exchangeName: "exchange.name", routingKey: "routing.key", millisecondsDelay: 10);
102+
await queueService.SendJsonAsync(message, exchangeName: "exchange.name", routingKey: "routing.key", millisecondsDelay: 10);
103103

104104
// Strings
105-
queueService.SendString(message, exchangeName: "exchange.name", routingKey: "routing.key", secondsDelay: 10);
106-
await queueService.SendStringAsync(message, exchangeName: "exchange.name", routingKey: "routing.key", secondsDelay: 10);
105+
queueService.SendString(message, exchangeName: "exchange.name", routingKey: "routing.key", millisecondsDelay: 10);
106+
await queueService.SendStringAsync(message, exchangeName: "exchange.name", routingKey: "routing.key", millisecondsDelay: 10);
107107

108108
// Bytes
109-
queueService.Send(bytes, properties, exchangeName: "exchange.name", routingKey: "routing.key", secondsDelay: 10);
110-
await queueService.SendAsync(bytes, properties, exchangeName: "exchange.name", routingKey: "routing.key", secondsDelay: 10);
109+
queueService.Send(bytes, properties, exchangeName: "exchange.name", routingKey: "routing.key", millisecondsDelay: 10);
110+
await queueService.SendAsync(bytes, properties, exchangeName: "exchange.name", routingKey: "routing.key", millisecondsDelay: 10);
111111
```
112112

113113
### Mechanism of sending delayed messages
@@ -116,15 +116,15 @@ The implementation of sending deferred (delayed) messages in this project is qui
116116
The image below shows a model of the whole process of passing the message from the producer to the consumer.
117117
![Model of sending delayed messages](./images/delayed-message-model.png)
118118

119-
**Prerequisites.** Let's say that producer want to send a message to the exchange **"Exchange B"** with a routing key **"routing.key"**, and a delay in **30 seconds**.
119+
**Prerequisites.** Let's say that producer want to send a message to the exchange **"Exchange B"** with a routing key **"routing.key"**, and a delay in **30 milliseconds**.
120120
- Message goes to the exchange **"Exchange A"** whose responsibility is to manage delaying (storing) the message.
121-
- After that a queue with a compound name and special arguments is being created. Name consists of three parts: the routing key of the sent message, a word "delayed", and a number of delay seconds .
121+
- After that a queue with a compound name and special arguments is being created. Name consists of three parts: the routing key of the sent message, a word "delayed", and a number of delay milliseconds .
122122
Queue arguments are as follows.
123123
```
124124
x-dead-letter-exchange : Exchange В
125125
x-dead-letter-routing-key : routing.key
126-
x-message-ttl : secondsDelay * 1000
127-
x-expires : secondsDelay * 1000 + 60000
126+
x-message-ttl : millisecondsDelay
127+
x-expires : millisecondsDelay + 60000
128128
```
129129
- A message, which gets in that queue, will have a specified ttl (time to live), and an exchange to which the message will be sent after expiration.
130130
- That new queue bounds to the **Exchange A**. That queue will be automatically deleted if there are no more messages in it within a minute.

readme.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ _queueService.Send(
6565
@object: messageObject,
6666
exchangeName: "exchange.name",
6767
routingKey: "routing.key",
68-
secondsDelay: 10);
68+
millisecondsDelay: 10);
6969
```
7070

7171
The mechanism of sending delayed messages described in the [documentation](./docs/message-production.md). Dive into it for more detailed information.
@@ -283,14 +283,14 @@ When the message collection is full to the size of `PrefetchCount` they are pass
283283

284284
## Advanced usage and nuances
285285

286-
RabbitMQ client implemented in this library (class which implements `IQueueService`) opens two connections to the RabbitMQ server. One connection is used for message production and the other one is for message consumption.
286+
RabbitMQ client implemented in this library (class which implements `IQueueService`) opens two connections to the RabbitMQ server. One connection is used for message production, and the other one is for message consumption.
287287
This behavior covered in the [advanced usage documentation file](./docs/advanced-usage.md), dive into it deeply if you want to control the client behavior tighter.
288288

289-
There is also an [example project](./examples/Examples.AdvancedConfiguration) that demonstrates an advances usage of the RabbitMQ client.
289+
There is also an [example project](./examples/Examples.AdvancedConfiguration) that demonstrates an advanced usage of the RabbitMQ client.
290290

291291
## Changelog
292292

293-
All notable changes are being tracked in the [changelog](./docs/changelog.md) file.
293+
All notable changes covered in the [changelog](./docs/changelog.md) file.
294294

295295
## License
296296

src/RabbitMQ.Client.Core.DependencyInjection/Configuration/RabbitMqClientOptions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public class RabbitMqClientOptions
2222
/// Collection of RabbitMQ server host names.
2323
/// It can be used when RabbitMQ HA cluster is running, and you want to connect multiple hosts.
2424
/// If HostNames collection is null or empty then HostName will be used to create connection.
25-
/// Otherwise HostNames collection will be used and HostName property value will be ignored.
25+
/// Otherwise, HostNames collection will be used and HostName property value will be ignored.
2626
/// </summary>
2727
/// <remarks>
2828
/// Has the second priority between properties TcpEndpoints, HostNames and HostName.

src/RabbitMQ.Client.Core.DependencyInjection/Configuration/RabbitMqExchangeOptions.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,20 @@ public class RabbitMqExchangeOptions
2828
public string DeadLetterExchange { get; set; } = "default.dlx.exchange";
2929

3030
/// <summary>
31-
/// Option to re-queue failed messages (once).
31+
/// Option to re-queue failed messages.
3232
/// </summary>
3333
public bool RequeueFailedMessages { get; set; } = true;
3434

35+
/// <summary>
36+
/// Re-queue message attempts.
37+
/// </summary>
38+
public int RequeueAttempts { get; set; } = 2;
39+
40+
/// <summary>
41+
/// Re-queue timeout in milliseconds.
42+
/// </summary>
43+
public int RequeueTimeoutMilliseconds { get; set; } = 200;
44+
3545
/// <summary>
3646
/// Additional arguments.
3747
/// </summary>

src/RabbitMQ.Client.Core.DependencyInjection/Services/IProducingService.cs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ public interface IProducingService
3434
/// <param name="object">Object message.</param>
3535
/// <param name="exchangeName">Exchange name.</param>
3636
/// <param name="routingKey">Routing key.</param>
37-
/// <param name="secondsDelay">Delay time.</param>
38-
void Send<T>(T @object, string exchangeName, string routingKey, int secondsDelay) where T : class;
37+
/// <param name="millisecondsDelay">Delay time in milliseconds.</param>
38+
void Send<T>(T @object, string exchangeName, string routingKey, int millisecondsDelay) where T : class;
3939

4040
/// <summary>
4141
/// Send a message.
@@ -51,8 +51,8 @@ public interface IProducingService
5151
/// <param name="json"></param>
5252
/// <param name="exchangeName"></param>
5353
/// <param name="routingKey"></param>
54-
/// <param name="secondsDelay">Delay time.</param>
55-
void SendJson(string json, string exchangeName, string routingKey, int secondsDelay);
54+
/// <param name="millisecondsDelay">Delay time in milliseconds.</param>
55+
void SendJson(string json, string exchangeName, string routingKey, int millisecondsDelay);
5656

5757
/// <summary>
5858
/// Send a message.
@@ -68,8 +68,8 @@ public interface IProducingService
6868
/// <param name="message">Message.</param>
6969
/// <param name="exchangeName">Exchange name.</param>
7070
/// <param name="routingKey">Routing key.</param>
71-
/// <param name="secondsDelay">Delay time.</param>
72-
void SendString(string message, string exchangeName, string routingKey, int secondsDelay);
71+
/// <param name="millisecondsDelay">Delay time in milliseconds.</param>
72+
void SendString(string message, string exchangeName, string routingKey, int millisecondsDelay);
7373

7474
/// <summary>
7575
/// Send a message.
@@ -87,8 +87,8 @@ public interface IProducingService
8787
/// <param name="properties">Message properties.</param>
8888
/// <param name="exchangeName">Exchange name.</param>
8989
/// <param name="routingKey">Routing key.</param>
90-
/// <param name="secondsDelay">Delay time.</param>
91-
void Send(ReadOnlyMemory<byte> bytes, IBasicProperties properties, string exchangeName, string routingKey, int secondsDelay);
90+
/// <param name="millisecondsDelay">Delay time in milliseconds.</param>
91+
void Send(ReadOnlyMemory<byte> bytes, IBasicProperties properties, string exchangeName, string routingKey, int millisecondsDelay);
9292

9393
/// <summary>
9494
/// Send a message asynchronously.
@@ -106,8 +106,8 @@ public interface IProducingService
106106
/// <param name="object">Object message.</param>
107107
/// <param name="exchangeName">Exchange name.</param>
108108
/// <param name="routingKey">Routing key.</param>
109-
/// <param name="secondsDelay">Delay time.</param>
110-
Task SendAsync<T>(T @object, string exchangeName, string routingKey, int secondsDelay) where T : class;
109+
/// <param name="millisecondsDelay">Delay time in milliseconds.</param>
110+
Task SendAsync<T>(T @object, string exchangeName, string routingKey, int millisecondsDelay) where T : class;
111111

112112
/// <summary>
113113
/// Send a message asynchronously.
@@ -123,8 +123,8 @@ public interface IProducingService
123123
/// <param name="json">Json message.</param>
124124
/// <param name="exchangeName">Exchange name.</param>
125125
/// <param name="routingKey">Routing key.</param>
126-
/// <param name="secondsDelay">Delay time.</param>
127-
Task SendJsonAsync(string json, string exchangeName, string routingKey, int secondsDelay);
126+
/// <param name="millisecondsDelay">Delay time in milliseconds.</param>
127+
Task SendJsonAsync(string json, string exchangeName, string routingKey, int millisecondsDelay);
128128

129129
/// <summary>
130130
/// Send a message asynchronously.
@@ -140,8 +140,8 @@ public interface IProducingService
140140
/// <param name="message">Message.</param>
141141
/// <param name="exchangeName">Exchange name.</param>
142142
/// <param name="routingKey">Routing key.</param>
143-
/// <param name="secondsDelay">Delay time.</param>
144-
Task SendStringAsync(string message, string exchangeName, string routingKey, int secondsDelay);
143+
/// <param name="millisecondsDelay">Delay time in milliseconds.</param>
144+
Task SendStringAsync(string message, string exchangeName, string routingKey, int millisecondsDelay);
145145

146146
/// <summary>
147147
/// Send a message asynchronously.
@@ -159,7 +159,7 @@ public interface IProducingService
159159
/// <param name="properties">Message properties.</param>
160160
/// <param name="exchangeName">Exchange name.</param>
161161
/// <param name="routingKey">Routing key.</param>
162-
/// <param name="secondsDelay">Delay time.</param>
163-
Task SendAsync(ReadOnlyMemory<byte> bytes, IBasicProperties properties, string exchangeName, string routingKey, int secondsDelay);
162+
/// <param name="millisecondsDelay">Delay time in milliseconds.</param>
163+
Task SendAsync(ReadOnlyMemory<byte> bytes, IBasicProperties properties, string exchangeName, string routingKey, int millisecondsDelay);
164164
}
165165
}

0 commit comments

Comments
 (0)