Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit 4c1e1fe

Browse files
author
Anton Vorontsov
committed
Changed delay from seconds to milliseconds.
1 parent 7280360 commit 4c1e1fe

File tree

3 files changed

+40
-39
lines changed

3 files changed

+40
-39
lines changed

src/RabbitMQ.Client.Core.DependencyInjection/Services/IProducingService.cs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ public interface IProducingService
3434
/// <param name="object">Object message.</param>
3535
/// <param name="exchangeName">Exchange name.</param>
3636
/// <param name="routingKey">Routing key.</param>
37-
/// <param name="secondsDelay">Delay time.</param>
38-
void Send<T>(T @object, string exchangeName, string routingKey, int secondsDelay) where T : class;
37+
/// <param name="millisecondsDelay">Delay time in milliseconds.</param>
38+
void Send<T>(T @object, string exchangeName, string routingKey, int millisecondsDelay) where T : class;
3939

4040
/// <summary>
4141
/// Send a message.
@@ -51,8 +51,8 @@ public interface IProducingService
5151
/// <param name="json"></param>
5252
/// <param name="exchangeName"></param>
5353
/// <param name="routingKey"></param>
54-
/// <param name="secondsDelay">Delay time.</param>
55-
void SendJson(string json, string exchangeName, string routingKey, int secondsDelay);
54+
/// <param name="millisecondsDelay">Delay time in milliseconds.</param>
55+
void SendJson(string json, string exchangeName, string routingKey, int millisecondsDelay);
5656

5757
/// <summary>
5858
/// Send a message.
@@ -68,8 +68,8 @@ public interface IProducingService
6868
/// <param name="message">Message.</param>
6969
/// <param name="exchangeName">Exchange name.</param>
7070
/// <param name="routingKey">Routing key.</param>
71-
/// <param name="secondsDelay">Delay time.</param>
72-
void SendString(string message, string exchangeName, string routingKey, int secondsDelay);
71+
/// <param name="millisecondsDelay">Delay time in milliseconds.</param>
72+
void SendString(string message, string exchangeName, string routingKey, int millisecondsDelay);
7373

7474
/// <summary>
7575
/// Send a message.
@@ -87,8 +87,8 @@ public interface IProducingService
8787
/// <param name="properties">Message properties.</param>
8888
/// <param name="exchangeName">Exchange name.</param>
8989
/// <param name="routingKey">Routing key.</param>
90-
/// <param name="secondsDelay">Delay time.</param>
91-
void Send(ReadOnlyMemory<byte> bytes, IBasicProperties properties, string exchangeName, string routingKey, int secondsDelay);
90+
/// <param name="millisecondsDelay">Delay time in milliseconds.</param>
91+
void Send(ReadOnlyMemory<byte> bytes, IBasicProperties properties, string exchangeName, string routingKey, int millisecondsDelay);
9292

9393
/// <summary>
9494
/// Send a message asynchronously.
@@ -106,8 +106,8 @@ public interface IProducingService
106106
/// <param name="object">Object message.</param>
107107
/// <param name="exchangeName">Exchange name.</param>
108108
/// <param name="routingKey">Routing key.</param>
109-
/// <param name="secondsDelay">Delay time.</param>
110-
Task SendAsync<T>(T @object, string exchangeName, string routingKey, int secondsDelay) where T : class;
109+
/// <param name="millisecondsDelay">Delay time in milliseconds.</param>
110+
Task SendAsync<T>(T @object, string exchangeName, string routingKey, int millisecondsDelay) where T : class;
111111

112112
/// <summary>
113113
/// Send a message asynchronously.
@@ -123,8 +123,8 @@ public interface IProducingService
123123
/// <param name="json">Json message.</param>
124124
/// <param name="exchangeName">Exchange name.</param>
125125
/// <param name="routingKey">Routing key.</param>
126-
/// <param name="secondsDelay">Delay time.</param>
127-
Task SendJsonAsync(string json, string exchangeName, string routingKey, int secondsDelay);
126+
/// <param name="millisecondsDelay">Delay time in milliseconds.</param>
127+
Task SendJsonAsync(string json, string exchangeName, string routingKey, int millisecondsDelay);
128128

129129
/// <summary>
130130
/// Send a message asynchronously.
@@ -140,8 +140,8 @@ public interface IProducingService
140140
/// <param name="message">Message.</param>
141141
/// <param name="exchangeName">Exchange name.</param>
142142
/// <param name="routingKey">Routing key.</param>
143-
/// <param name="secondsDelay">Delay time.</param>
144-
Task SendStringAsync(string message, string exchangeName, string routingKey, int secondsDelay);
143+
/// <param name="millisecondsDelay">Delay time in milliseconds.</param>
144+
Task SendStringAsync(string message, string exchangeName, string routingKey, int millisecondsDelay);
145145

146146
/// <summary>
147147
/// Send a message asynchronously.
@@ -159,7 +159,7 @@ public interface IProducingService
159159
/// <param name="properties">Message properties.</param>
160160
/// <param name="exchangeName">Exchange name.</param>
161161
/// <param name="routingKey">Routing key.</param>
162-
/// <param name="secondsDelay">Delay time.</param>
163-
Task SendAsync(ReadOnlyMemory<byte> bytes, IBasicProperties properties, string exchangeName, string routingKey, int secondsDelay);
162+
/// <param name="millisecondsDelay">Delay time in milliseconds.</param>
163+
Task SendAsync(ReadOnlyMemory<byte> bytes, IBasicProperties properties, string exchangeName, string routingKey, int millisecondsDelay);
164164
}
165165
}

src/RabbitMQ.Client.Core.DependencyInjection/Services/MessageHandlingService.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ namespace RabbitMQ.Client.Core.DependencyInjection.Services
1616
/// </summary>
1717
public class MessageHandlingService : IMessageHandlingService
1818
{
19+
// TODO: Убрать это.
1920
const int ResendTimeout = 60;
2021

2122
readonly IEnumerable<RabbitMqExchange> _exchanges;
@@ -52,7 +53,7 @@ public async Task HandleMessageReceivingEvent(BasicDeliverEventArgs eventArgs, I
5253
catch (Exception exception)
5354
{
5455
_logger.LogError(new EventId(), exception, $"An error occurred while processing received message with the delivery tag {eventArgs.DeliveryTag}.");
55-
await HandleFailedMessageProcessing(eventArgs, queueService);
56+
await HandleFailedMessageProcessing(eventArgs, queueService).ConfigureAwait(false);
5657
}
5758
finally
5859
{

src/RabbitMQ.Client.Core.DependencyInjection/Services/QueueService.cs

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -173,12 +173,12 @@ public void Send<T>(T @object, string exchangeName, string routingKey) where T :
173173
Send(bytes, properties, exchangeName, routingKey);
174174
}
175175

176-
public void Send<T>(T @object, string exchangeName, string routingKey, int secondsDelay) where T : class
176+
public void Send<T>(T @object, string exchangeName, string routingKey, int millisecondsDelay) where T : class
177177
{
178178
EnsureProducingChannelIsNotNull();
179179
ValidateArguments(exchangeName, routingKey);
180180
var deadLetterExchange = GetDeadLetterExchange(exchangeName);
181-
var delayedQueueName = DeclareDelayedQueue(exchangeName, deadLetterExchange, routingKey, secondsDelay);
181+
var delayedQueueName = DeclareDelayedQueue(exchangeName, deadLetterExchange, routingKey, millisecondsDelay);
182182
Send(@object, deadLetterExchange, delayedQueueName);
183183
}
184184

@@ -191,12 +191,12 @@ public void SendJson(string json, string exchangeName, string routingKey)
191191
Send(bytes, properties, exchangeName, routingKey);
192192
}
193193

194-
public void SendJson(string json, string exchangeName, string routingKey, int secondsDelay)
194+
public void SendJson(string json, string exchangeName, string routingKey, int millisecondsDelay)
195195
{
196196
EnsureProducingChannelIsNotNull();
197197
ValidateArguments(exchangeName, routingKey);
198198
var deadLetterExchange = GetDeadLetterExchange(exchangeName);
199-
var delayedQueueName = DeclareDelayedQueue(exchangeName, deadLetterExchange, routingKey, secondsDelay);
199+
var delayedQueueName = DeclareDelayedQueue(exchangeName, deadLetterExchange, routingKey, millisecondsDelay);
200200
SendJson(json, deadLetterExchange, delayedQueueName);
201201
}
202202

@@ -208,12 +208,12 @@ public void SendString(string message, string exchangeName, string routingKey)
208208
Send(bytes, CreateProperties(), exchangeName, routingKey);
209209
}
210210

211-
public void SendString(string message, string exchangeName, string routingKey, int secondsDelay)
211+
public void SendString(string message, string exchangeName, string routingKey, int millisecondsDelay)
212212
{
213213
EnsureProducingChannelIsNotNull();
214214
ValidateArguments(exchangeName, routingKey);
215215
var deadLetterExchange = GetDeadLetterExchange(exchangeName);
216-
var delayedQueueName = DeclareDelayedQueue(exchangeName, deadLetterExchange, routingKey, secondsDelay);
216+
var delayedQueueName = DeclareDelayedQueue(exchangeName, deadLetterExchange, routingKey, millisecondsDelay);
217217
SendString(message, deadLetterExchange, delayedQueueName);
218218
}
219219

@@ -230,38 +230,38 @@ public void Send(ReadOnlyMemory<byte> bytes, IBasicProperties properties, string
230230
}
231231
}
232232

233-
public void Send(ReadOnlyMemory<byte> bytes, IBasicProperties properties, string exchangeName, string routingKey, int secondsDelay)
233+
public void Send(ReadOnlyMemory<byte> bytes, IBasicProperties properties, string exchangeName, string routingKey, int millisecondsDelay)
234234
{
235235
EnsureProducingChannelIsNotNull();
236236
ValidateArguments(exchangeName, routingKey);
237237
var deadLetterExchange = GetDeadLetterExchange(exchangeName);
238-
var delayedQueueName = DeclareDelayedQueue(exchangeName, deadLetterExchange, routingKey, secondsDelay);
238+
var delayedQueueName = DeclareDelayedQueue(exchangeName, deadLetterExchange, routingKey, millisecondsDelay);
239239
Send(bytes, properties, deadLetterExchange, delayedQueueName);
240240
}
241241

242242
public async Task SendAsync<T>(T @object, string exchangeName, string routingKey) where T : class =>
243243
await Task.Run(() => Send(@object, exchangeName, routingKey)).ConfigureAwait(false);
244244

245-
public async Task SendAsync<T>(T @object, string exchangeName, string routingKey, int secondsDelay) where T : class =>
246-
await Task.Run(() => Send(@object, exchangeName, routingKey, secondsDelay)).ConfigureAwait(false);
245+
public async Task SendAsync<T>(T @object, string exchangeName, string routingKey, int millisecondsDelay) where T : class =>
246+
await Task.Run(() => Send(@object, exchangeName, routingKey, millisecondsDelay)).ConfigureAwait(false);
247247

248248
public async Task SendJsonAsync(string json, string exchangeName, string routingKey) =>
249249
await Task.Run(() => SendJson(json, exchangeName, routingKey)).ConfigureAwait(false);
250250

251-
public async Task SendJsonAsync(string json, string exchangeName, string routingKey, int secondsDelay) =>
252-
await Task.Run(() => SendJson(json, exchangeName, routingKey, secondsDelay)).ConfigureAwait(false);
251+
public async Task SendJsonAsync(string json, string exchangeName, string routingKey, int millisecondsDelay) =>
252+
await Task.Run(() => SendJson(json, exchangeName, routingKey, millisecondsDelay)).ConfigureAwait(false);
253253

254254
public async Task SendStringAsync(string message, string exchangeName, string routingKey) =>
255255
await Task.Run(() => SendString(message, exchangeName, routingKey)).ConfigureAwait(false);
256256

257-
public async Task SendStringAsync(string message, string exchangeName, string routingKey, int secondsDelay) =>
258-
await Task.Run(() => SendString(message, exchangeName, routingKey, secondsDelay)).ConfigureAwait(false);
257+
public async Task SendStringAsync(string message, string exchangeName, string routingKey, int millisecondsDelay) =>
258+
await Task.Run(() => SendString(message, exchangeName, routingKey, millisecondsDelay)).ConfigureAwait(false);
259259

260260
public async Task SendAsync(ReadOnlyMemory<byte> bytes, IBasicProperties properties, string exchangeName, string routingKey) =>
261261
await Task.Run(() => Send(bytes, properties, exchangeName, routingKey)).ConfigureAwait(false);
262262

263-
public async Task SendAsync(ReadOnlyMemory<byte> bytes, IBasicProperties properties, string exchangeName, string routingKey, int secondsDelay) =>
264-
await Task.Run(() => Send(bytes, properties, exchangeName, routingKey, secondsDelay)).ConfigureAwait(false);
263+
public async Task SendAsync(ReadOnlyMemory<byte> bytes, IBasicProperties properties, string exchangeName, string routingKey, int millisecondsDelay) =>
264+
await Task.Run(() => Send(bytes, properties, exchangeName, routingKey, millisecondsDelay)).ConfigureAwait(false);
265265

266266
IBasicProperties CreateProperties()
267267
{
@@ -472,10 +472,10 @@ string GetDeadLetterExchange(string exchangeName)
472472
return exchange.Options.DeadLetterExchange;
473473
}
474474

475-
string DeclareDelayedQueue(string exchange, string deadLetterExchange, string routingKey, int secondsDelay)
475+
string DeclareDelayedQueue(string exchange, string deadLetterExchange, string routingKey, int millisecondsDelay)
476476
{
477-
var delayedQueueName = $"{routingKey}.delayed.{secondsDelay}";
478-
var arguments = CreateArguments(exchange, routingKey, secondsDelay);
477+
var delayedQueueName = $"{routingKey}.delayed.{millisecondsDelay}";
478+
var arguments = CreateArguments(exchange, routingKey, millisecondsDelay);
479479

480480
Channel.QueueDeclare(
481481
queue: delayedQueueName,
@@ -491,13 +491,13 @@ string DeclareDelayedQueue(string exchange, string deadLetterExchange, string ro
491491
return delayedQueueName;
492492
}
493493

494-
static Dictionary<string, object> CreateArguments(string exchangeName, string routingKey, int secondsDelay) =>
494+
static Dictionary<string, object> CreateArguments(string exchangeName, string routingKey, int millisecondsDelay) =>
495495
new Dictionary<string, object>
496496
{
497497
{ "x-dead-letter-exchange", exchangeName },
498498
{ "x-dead-letter-routing-key", routingKey },
499-
{ "x-message-ttl", secondsDelay * 1000 },
500-
{ "x-expires", secondsDelay * 1000 + QueueExpirationTime }
499+
{ "x-message-ttl", millisecondsDelay },
500+
{ "x-expires", millisecondsDelay + QueueExpirationTime }
501501
};
502502

503503
Task ConsumerOnReceived(object sender, BasicDeliverEventArgs eventArgs) => _messageHandlingService.HandleMessageReceivingEvent(eventArgs, this);

0 commit comments

Comments
 (0)