Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit f0109f0

Browse files
Did some code style and small func fixes
1 parent fae1365 commit f0109f0

File tree

7 files changed

+53
-23
lines changed

7 files changed

+53
-23
lines changed

README.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
[![Codacy Badge](https://api.codacy.com/project/badge/Grade/f688764d2ba340099ec50b74726e25fd)](https://app.codacy.com/app/AntonyVorontsov/RabbitMQ.Client.Core.DependencyInjection?utm_source=github.com&utm_medium=referral&utm_content=AntonyVorontsov/RabbitMQ.Client.Core.DependencyInjection&utm_campaign=Badge_Grade_Dashboard)<br/>
55
Wrapper-library of RabbitMQ.Client with Dependency Injection infrastructure under the .Net Core 2.2 platform.
66

7-
### Producer
7+
## Producer
88

99
First of all you have to add all service dependencies in the `ConfigureServices` method. `AddRabbitMqClient` adds `IQueueService` that can send messages and `AddExchange` configures and adds an exchange. You can add multiple exchanges but the queue service will be single (and it will be added as singleton obviously).
1010

@@ -75,7 +75,7 @@ queueService.Send(
7575
In order to make this possible, a default dead-letter-exchange with `"default.dlx.exchange"` name will be created. You can change it via main exchange configuration (example is down below).
7676
And also you have a default functionality of resending failed messages (if you get an error while processing recieved message).
7777

78-
### Consumer
78+
## Consumer
7979

8080
Lets imagine that you wanna make a consumer as a console application. Then code will look like this:
8181

@@ -205,12 +205,12 @@ services.AddRabbitMqClient(rabbitMqSection)
205205

206206
You can find example projects in the repository too.
207207

208-
### appsettings.json configuration
208+
## appsettings.json configuration
209209

210210
You have to add a couple configuration sections: (1) settings to connect to the RabbitMQ server and (2) a section that configures an exchange (one section per exchange frankly speaking).
211211
Exchange sections define how to bind queues and exchanges with each ohter and which routing keys to use for that.
212212
You can bind a queue to an exchange with more than one routing key, but if there are no routing keys in the queue section, then that queue will be bound to the exchange with its name.
213-
```
213+
```json
214214
{
215215
"RabbitMq": {
216216
"HostName": "127.0.0.1",
@@ -235,7 +235,7 @@ You can find example projects in the repository too.
235235
```
236236

237237
`Type`, `Durable`, `AutoDelete`, `DeadLetterExchange`, `RequeueFailedMessages` are set with default values in this example. So you can change it or leave it like this:
238-
```
238+
```json
239239
{
240240
"RabbitMq": {
241241
"HostName": "127.0.0.1",

src/Examples.ConsumerConsole/CustomAsyncMessageHandler.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ public class CustomAsyncMessageHandler : IAsyncMessageHandler
88
{
99
readonly ILogger<CustomAsyncMessageHandler> _logger;
1010

11-
public CustomAsyncMessageHandler(ILogger<CustomAsyncMessageHandler> logger) =>
11+
public CustomAsyncMessageHandler(ILogger<CustomAsyncMessageHandler> logger)
12+
{
1213
_logger = logger;
14+
}
1315

1416
public async Task Handle(string message, string routingKey)
1517
{
16-
await Task.Run(() => _logger.LogInformation("A weird example of runnig something async."));
18+
await Task.Run(() => _logger.LogInformation("A weird example of runnig something async.")).ConfigureAwait(false);
1719
}
1820
}
1921
}

src/Examples.ConsumerConsole/CustomAsyncNonCyclicMessageHandler.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ public class CustomAsyncNonCyclicMessageHandler : IAsyncNonCyclicMessageHandler
88
{
99
readonly ILogger<CustomAsyncNonCyclicMessageHandler> _logger;
1010

11-
public CustomAsyncNonCyclicMessageHandler(ILogger<CustomAsyncNonCyclicMessageHandler> logger) =>
11+
public CustomAsyncNonCyclicMessageHandler(ILogger<CustomAsyncNonCyclicMessageHandler> logger)
12+
{
1213
_logger = logger;
14+
}
1315

1416
public async Task Handle(string message, string routingKey, IQueueService queueService)
1517
{

src/Examples.ConsumerHost/CustomMessageHandler.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ namespace Examples.ConsumerHost
66
public class CustomMessageHandler : IMessageHandler
77
{
88
readonly ILogger<CustomMessageHandler> _logger;
9-
public CustomMessageHandler(ILogger<CustomMessageHandler> logger) =>
9+
public CustomMessageHandler(ILogger<CustomMessageHandler> logger)
10+
{
1011
_logger = logger;
12+
}
1113

1214
public void Handle(string message, string routingKey)
1315
{

src/Examples.Producer/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace Examples.Producer
88
{
9-
public class Program
9+
public static class Program
1010
{
1111
public static IConfiguration Configuration { get; set; }
1212

src/RabbitMQ.Client.Core.DependencyInjection/Configuration/RabbitMqExchangeOptions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public class RabbitMqExchangeOptions
2020
/// <summary>
2121
/// AutoDelete option.
2222
/// </summary>
23-
public bool AutoDelete { get; set; } = false;
23+
public bool AutoDelete { get; set; }
2424

2525
/// <summary>
2626
/// Default dead-letter-exchange.

src/RabbitMQ.Client.Core.DependencyInjection/QueueService.cs

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ internal class QueueService : IQueueService
2828
public IModel Channel => _channel;
2929

3030
EventHandler<BasicDeliverEventArgs> _receivedMessage;
31-
bool _consumingStarted = false;
31+
bool _consumingStarted;
3232

3333
readonly IDictionary<string, IList<IMessageHandler>> _messageHandlers;
3434
readonly IDictionary<string, IList<IAsyncMessageHandler>> _asyncMessageHandlers;
@@ -128,8 +128,12 @@ public void StartConsuming()
128128
_consumingStarted = true;
129129

130130
foreach (var exchange in _exchanges)
131+
{
131132
foreach (var queue in exchange.Options.Queues)
133+
{
132134
_channel.BasicConsume(queue: queue.Name, autoAck: false, consumer: _consumer);
135+
}
136+
}
133137
}
134138

135139
public void Send<T>(T @object, string exchangeName, string routingKey) where T : class
@@ -201,28 +205,28 @@ public void Send(byte[] bytes, IBasicProperties properties, string exchangeName,
201205
}
202206

203207
public async Task SendAsync<T>(T @object, string exchangeName, string routingKey) where T : class =>
204-
await Task.Run(() => Send(@object, exchangeName, routingKey));
208+
await Task.Run(() => Send(@object, exchangeName, routingKey)).ConfigureAwait(false);
205209

206210
public async Task SendAsync<T>(T @object, string exchangeName, string routingKey, int secondsDelay) where T : class =>
207-
await Task.Run(() => Send(@object, exchangeName, routingKey, secondsDelay));
211+
await Task.Run(() => Send(@object, exchangeName, routingKey, secondsDelay)).ConfigureAwait(false);
208212

209213
public async Task SendJsonAsync(string json, string exchangeName, string routingKey) =>
210-
await Task.Run(() => SendJson(json, exchangeName, routingKey));
214+
await Task.Run(() => SendJson(json, exchangeName, routingKey)).ConfigureAwait(false);
211215

212216
public async Task SendJsonAsync(string json, string exchangeName, string routingKey, int secondsDelay) =>
213-
await Task.Run(() => SendJson(json, exchangeName, routingKey, secondsDelay));
217+
await Task.Run(() => SendJson(json, exchangeName, routingKey, secondsDelay)).ConfigureAwait(false);
214218

215219
public async Task SendStringAsync(string message, string exchangeName, string routingKey) =>
216-
await Task.Run(() => SendString(message, exchangeName, routingKey));
220+
await Task.Run(() => SendString(message, exchangeName, routingKey)).ConfigureAwait(false);
217221

218222
public async Task SendStringAsync(string message, string exchangeName, string routingKey, int secondsDelay) =>
219-
await Task.Run(() => SendString(message, exchangeName, routingKey, secondsDelay));
223+
await Task.Run(() => SendString(message, exchangeName, routingKey, secondsDelay)).ConfigureAwait(false);
220224

221225
public async Task SendAsync(byte[] bytes, IBasicProperties properties, string exchangeName, string routingKey) =>
222-
await Task.Run(() => Send(bytes, properties, exchangeName, routingKey));
226+
await Task.Run(() => Send(bytes, properties, exchangeName, routingKey)).ConfigureAwait(false);
223227

224228
public async Task SendAsync(byte[] bytes, IBasicProperties properties, string exchangeName, string routingKey, int secondsDelay) =>
225-
await Task.Run(() => Send(bytes, properties, exchangeName, routingKey, secondsDelay));
229+
await Task.Run(() => Send(bytes, properties, exchangeName, routingKey, secondsDelay)).ConfigureAwait(false);
226230

227231
IBasicProperties CreateProperties()
228232
{
@@ -261,13 +265,15 @@ void HandleChannelBasicRecoverOk(object sender, EventArgs @event)
261265
{
262266
if (@event is null)
263267
return;
268+
264269
_logger.LogInformation("Connection has been reestablished.");
265270
}
266271

267272
void HandleChannelCallbackException(object sender, CallbackExceptionEventArgs @event)
268273
{
269274
if (@event is null)
270275
return;
276+
271277
_logger.LogError(new EventId(), @event.Exception, @event.Exception.Message, @event);
272278
}
273279

@@ -277,9 +283,13 @@ IDictionary<Type, List<string>> TransformMessageHandlerRouters(IEnumerable<Messa
277283
foreach (var router in routers)
278284
{
279285
if (dictionary.ContainsKey(router.Type))
286+
{
280287
dictionary[router.Type] = dictionary[router.Type].Union(router.RoutingKeys).ToList();
288+
}
281289
else
290+
{
282291
dictionary.Add(router.Type, router.RoutingKeys);
292+
}
283293
}
284294
return dictionary;
285295
}
@@ -295,10 +305,14 @@ IDictionary<string, IList<T>> TransformMessageHandlersCollection<T>(IEnumerable<
295305
if (dictionary.ContainsKey(routingKey))
296306
{
297307
if (!dictionary[routingKey].Any(x => x.GetType() == handler.GetType()))
308+
{
298309
dictionary[routingKey].Add(handler);
310+
}
299311
}
300312
else
313+
{
301314
dictionary.Add(routingKey, new List<T>() { handler });
315+
}
302316
}
303317
}
304318
return dictionary;
@@ -319,7 +333,9 @@ void StartClient()
319333
{
320334
var tasks = new List<Task>();
321335
foreach (var handler in _asyncMessageHandlers[@event.RoutingKey])
336+
{
322337
tasks.Add(RunAsyncHandler(handler, message, @event.RoutingKey));
338+
}
323339
Task.WaitAll(tasks.ToArray());
324340
}
325341
if (_messageHandlers.ContainsKey(@event.RoutingKey))
@@ -335,7 +351,9 @@ void StartClient()
335351
{
336352
var tasks = new List<Task>();
337353
foreach (var handler in _asyncNonCyclicHandlers[@event.RoutingKey])
354+
{
338355
tasks.Add(RunAsyncNonCyclicHandler(handler, message, @event.RoutingKey));
356+
}
339357
Task.WaitAll(tasks.ToArray());
340358
}
341359
if (_nonCyclicHandlers.ContainsKey(@event.RoutingKey))
@@ -385,10 +403,14 @@ void StartClient()
385403
.Distinct();
386404

387405
foreach (var exchangeName in deadLetterExchanges)
406+
{
388407
StartDeadLetterExchange(exchangeName);
408+
}
389409

390410
foreach (var exchange in _exchanges)
411+
{
391412
StartExchange(exchange);
413+
}
392414
}
393415

394416
async Task RunAsyncHandler(IAsyncMessageHandler handler, string message, string routingKey)
@@ -423,7 +445,9 @@ void StartExchange(RabbitMqExchange exchange)
423445
arguments: exchange.Options.Arguments);
424446

425447
foreach (var queue in exchange.Options.Queues)
448+
{
426449
StartQueue(queue, exchange.Name);
450+
}
427451
}
428452

429453
void StartQueue(RabbitMqQueueOptions queue, string exchangeName)
@@ -460,8 +484,7 @@ void ValidateArguments(string exchangeName, string routingKey)
460484
throw new ArgumentException($"Argument {nameof(routingKey)} is null or empty.", nameof(routingKey));
461485

462486
var deadLetterExchanges = _exchanges.Select(x => x.Options.DeadLetterExchange).Distinct();
463-
if (!_exchanges.Any(x => x.Name == exchangeName)
464-
&& !deadLetterExchanges.Any(x => x == exchangeName))
487+
if (!_exchanges.Any(x => x.Name == exchangeName) && !deadLetterExchanges.Any(x => x == exchangeName))
465488
throw new ArgumentException($"Exchange {nameof(exchangeName)} has not been deaclared yet.", nameof(exchangeName));
466489
}
467490

@@ -470,6 +493,7 @@ string GetDeadLetterExchange(string exchangeName)
470493
var exchange = _exchanges.FirstOrDefault(x => x.Name == exchangeName);
471494
if (string.IsNullOrEmpty(exchange.Options.DeadLetterExchange))
472495
throw new ArgumentException($"Exchange {nameof(exchangeName)} has not been configured with a dead letter exchange.", nameof(exchangeName));
496+
473497
return exchange.Options.DeadLetterExchange;
474498
}
475499

@@ -492,7 +516,7 @@ string DeclareDelayedQueue(string exchange, string deadLetterExchange, string ro
492516
return delayedQueueName;
493517
}
494518

495-
Dictionary<string, object> CreateArguments(string exchangeName, string routingKey, int secondsDelay) =>
519+
static Dictionary<string, object> CreateArguments(string exchangeName, string routingKey, int secondsDelay) =>
496520
new Dictionary<string, object>
497521
{
498522
{ "x-dead-letter-exchange", exchangeName },

0 commit comments

Comments
 (0)