Skip to content

Commit cd0df7a

Browse files
steel97zarusz
authored andcommitted
feat(nats): add queues support
Signed-off-by: Ivan Yv <steel-97@mail.ru>
1 parent 41343b8 commit cd0df7a

16 files changed

+845
-262
lines changed

docs/provider_nats.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,18 @@ builder.Services.AddSlimMessageBus(mbb =>
2424
cfg.AuthOpts = NatsAuthOpts.Default;
2525
});
2626

27+
// pub/sub
2728
mbb
2829
.Produce<PingMessage>(x => x.DefaultTopic(topic))
2930
.Consume<PingMessage>(x => x.Topic(topic).Instances(1));
3031

32+
// queue
33+
mbb
34+
.Produce<QueueMessage>(x => x.DefaultQueue(topic))
35+
.Consume<QueueMessage>(x => x.Queue(topic).Instances(1));
36+
3137
mbb.AddServicesFromAssemblyContaining<PingConsumer>();
38+
mbb.AddServicesFromAssemblyContaining<QueueMessage>();
3239
mbb.AddJsonSerializer();
3340
});
3441
```

src/Samples/Sample.Nats.WebApi/PingConsumer.cs renamed to src/Samples/Sample.Nats.WebApi/Consumers.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,15 @@ public Task OnHandle(PingMessage message, CancellationToken cancellationToken)
1111
logger.LogInformation("Got message {Counter} on topic {Path}", message.Counter, Context?.Path);
1212
return Task.CompletedTask;
1313
}
14+
}
15+
16+
public class QueueConsumer(ILogger<QueueConsumer> logger) : IConsumer<QueueMessage>, IConsumerWithContext
17+
{
18+
public IConsumerContext? Context { get; set; }
19+
20+
public Task OnHandle(QueueMessage message, CancellationToken cancellationToken)
21+
{
22+
logger.LogInformation("Got message {Counter} on queue {Path}", message.Counter, Context?.Path);
23+
return Task.CompletedTask;
24+
}
1425
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
namespace Sample.Nats.WebApi;
22

33
public record PingMessage(int Counter, Guid Value);
4+
public record QueueMessage(int Counter, Guid Value);
45

Lines changed: 69 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,71 @@
1-
using NATS.Client.Core;
2-
3-
using Sample.Nats.WebApi;
4-
5-
using SecretStore;
6-
7-
using SlimMessageBus;
8-
using SlimMessageBus.Host;
1+
using NATS.Client.Core;
2+
3+
using Sample.Nats.WebApi;
4+
5+
using SecretStore;
6+
7+
using SlimMessageBus;
8+
using SlimMessageBus.Host;
9+
using SlimMessageBus.Host.Nats;
910
using SlimMessageBus.Host.Nats.Config;
10-
using SlimMessageBus.Host.Serialization.SystemTextJson;
11-
12-
var builder = WebApplication.CreateBuilder(args);
13-
14-
builder.Services.AddEndpointsApiExplorer();
15-
builder.Services.AddSwaggerGen();
16-
17-
Secrets.Load(@"..\..\..\..\..\secrets.txt");
18-
19-
var endpoint = Secrets.Service.PopulateSecrets(builder.Configuration["Nats:Endpoint"]);
20-
var topic = Secrets.Service.PopulateSecrets(builder.Configuration["Nats:Topic"]);
21-
22-
// doc:fragment:ExampleConfiguringMessageBus
23-
builder.Services.AddSlimMessageBus(mbb =>
24-
{
25-
mbb.WithProviderNats(cfg =>
26-
{
27-
cfg.Endpoint = endpoint;
28-
cfg.ClientName = $"MyService_{Environment.MachineName}";
29-
cfg.AuthOpts = NatsAuthOpts.Default;
30-
});
31-
32-
mbb
33-
.Produce<PingMessage>(x => x.DefaultTopic(topic))
34-
.Consume<PingMessage>(x => x.Topic(topic).Instances(1));
35-
36-
mbb.AddServicesFromAssemblyContaining<PingConsumer>();
37-
mbb.AddJsonSerializer();
38-
});
39-
// doc:fragment:ExampleConfiguringMessageBus
40-
41-
var app = builder.Build();
42-
43-
if (app.Environment.IsDevelopment())
44-
{
45-
app.UseSwagger();
46-
app.UseSwaggerUI();
47-
}
48-
49-
app.UseHttpsRedirection();
50-
51-
app.MapGet("/publish-message", (IMessageBus bus, CancellationToken cancellationToken) =>
52-
{
53-
PingMessage pingMessage = new(0, Guid.NewGuid());
54-
bus.Publish(pingMessage, cancellationToken: cancellationToken);
55-
});
56-
11+
using SlimMessageBus.Host.Serialization.SystemTextJson;
12+
13+
var builder = WebApplication.CreateBuilder(args);
14+
15+
builder.Services.AddEndpointsApiExplorer();
16+
builder.Services.AddSwaggerGen();
17+
18+
Secrets.Load(@"..\..\..\..\..\secrets.txt");
19+
20+
var endpoint = Secrets.Service.PopulateSecrets(builder.Configuration["Nats:Endpoint"]);
21+
var topic = Secrets.Service.PopulateSecrets(builder.Configuration["Nats:Topic"]);
22+
23+
// doc:fragment:ExampleConfiguringMessageBus
24+
builder.Services.AddSlimMessageBus(mbb =>
25+
{
26+
mbb.WithProviderNats(cfg =>
27+
{
28+
cfg.Endpoint = endpoint;
29+
cfg.ClientName = $"MyService_{Environment.MachineName}";
30+
cfg.AuthOpts = NatsAuthOpts.Default;
31+
});
32+
33+
// pub/sub
34+
mbb
35+
.Produce<PingMessage>(x => x.DefaultTopic(topic))
36+
.Consume<PingMessage>(x => x.Topic(topic).Instances(1));
37+
38+
// queue
39+
mbb
40+
.Produce<QueueMessage>(x => x.DefaultQueue(topic))
41+
.Consume<QueueMessage>(x => x.Queue(topic).Instances(1));
42+
43+
mbb.AddServicesFromAssemblyContaining<PingConsumer>();
44+
mbb.AddServicesFromAssemblyContaining<QueueMessage>();
45+
mbb.AddJsonSerializer();
46+
});
47+
// doc:fragment:ExampleConfiguringMessageBus
48+
49+
var app = builder.Build();
50+
51+
if (app.Environment.IsDevelopment())
52+
{
53+
app.UseSwagger();
54+
app.UseSwaggerUI();
55+
}
56+
57+
app.UseHttpsRedirection();
58+
59+
app.MapGet("/publish-message", (IMessageBus bus, CancellationToken cancellationToken) =>
60+
{
61+
PingMessage pingMessage = new(0, Guid.NewGuid());
62+
bus.Publish(pingMessage, cancellationToken: cancellationToken);
63+
});
64+
65+
app.MapGet("/publish-message-queue", (IMessageBus bus, CancellationToken cancellationToken) =>
66+
{
67+
QueueMessage queueMessage = new(0, Guid.NewGuid());
68+
bus.Publish(queueMessage, cancellationToken: cancellationToken);
69+
});
70+
5771
app.Run();
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
namespace SlimMessageBus.Host.Nats;
2+
3+
public static class NatsConsumerBuilderExtensions
4+
{
5+
public static ConsumerBuilder<T> Queue<T>(this ConsumerBuilder<T> natsBuilder, string natsQueue)
6+
{
7+
if (natsBuilder is null) throw new ArgumentNullException(nameof(natsBuilder));
8+
if (natsQueue is null) throw new ArgumentNullException(nameof(natsQueue));
9+
10+
natsBuilder.Path(natsQueue);
11+
natsBuilder.ConsumerSettings.PathKind = PathKind.Queue;
12+
return natsBuilder;
13+
}
14+
15+
public static ConsumerBuilder<T> Queue<T>(this ConsumerBuilder<T> natsBuilder, string natsQueue, Action<ConsumerBuilder<T>> natsTopicConfig)
16+
{
17+
if (natsBuilder is null) throw new ArgumentNullException(nameof(natsBuilder));
18+
if (natsTopicConfig is null) throw new ArgumentNullException(nameof(natsTopicConfig));
19+
20+
var b = natsBuilder.Queue(natsQueue);
21+
natsTopicConfig(b);
22+
return b;
23+
}
24+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
namespace SlimMessageBus.Host.Nats;
2+
3+
public static class NatsHandlerBuilderExtensions
4+
{
5+
/// <summary>
6+
/// Configure queue name that incoming requests (<see cref="TRequest"/>) are expected on.
7+
/// </summary>
8+
/// <param name="natsBuilder"></param>
9+
/// <param name="natsQueue">Queue name</param>
10+
/// <returns></returns>
11+
public static HandlerBuilder<TRequest, TResponse> Queue<TRequest, TResponse>(this HandlerBuilder<TRequest, TResponse> natsBuilder, string natsQueue)
12+
{
13+
if (natsBuilder is null) throw new ArgumentNullException(nameof(natsBuilder));
14+
if (natsQueue is null) throw new ArgumentNullException(nameof(natsQueue));
15+
16+
natsBuilder.Path(natsQueue);
17+
natsBuilder.ConsumerSettings.PathKind = PathKind.Queue;
18+
return natsBuilder;
19+
}
20+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
namespace SlimMessageBus.Host.Nats;
2+
3+
public static class NatsProducerBuilderExtensions
4+
{
5+
public static ProducerBuilder<T> DefaultQueue<T>(this ProducerBuilder<T> natsProducerBuilder, string natsQueue)
6+
{
7+
if (natsProducerBuilder is null) throw new ArgumentNullException(nameof(natsProducerBuilder));
8+
if (natsQueue is null) throw new ArgumentNullException(nameof(natsQueue));
9+
10+
natsProducerBuilder.DefaultTopic(natsQueue);
11+
natsProducerBuilder.ToQueue();
12+
return natsProducerBuilder;
13+
}
14+
15+
/// <summary>
16+
/// The topic parameter name in <see cref="IPublishBus.Publish{TMessage}"/> should be treated as a topic name
17+
/// </summary>
18+
/// <typeparam name="T"></typeparam>
19+
/// <param name="natsProducerBuilder"></param>
20+
/// <returns></returns>
21+
public static ProducerBuilder<T> ToTopic<T>(this ProducerBuilder<T> natsProducerBuilder)
22+
{
23+
if (natsProducerBuilder is null) throw new ArgumentNullException(nameof(natsProducerBuilder));
24+
25+
natsProducerBuilder.Settings.PathKind = PathKind.Topic;
26+
return natsProducerBuilder;
27+
}
28+
29+
/// <summary>
30+
/// The topic parameter name in <see cref="IPublishBus.Publish{TMessage}"/> should be treated as a queue group name
31+
/// </summary>
32+
/// <typeparam name="T"></typeparam>
33+
/// <param name="natsProducerBuilder"></param>
34+
/// <returns></returns>
35+
public static ProducerBuilder<T> ToQueue<T>(this ProducerBuilder<T> natsProducerBuilder)
36+
{
37+
if (natsProducerBuilder is null) throw new ArgumentNullException(nameof(natsProducerBuilder));
38+
39+
natsProducerBuilder.Settings.PathKind = PathKind.Queue;
40+
return natsProducerBuilder;
41+
}
42+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
namespace SlimMessageBus.Host.Nats;
2+
3+
public static class NatsRequestResponseBuilderExtensions
4+
{
5+
public static RequestResponseBuilder ReplyToQueue(this RequestResponseBuilder natsBuilder, string natsQueue)
6+
{
7+
if (natsBuilder is null) throw new ArgumentNullException(nameof(natsBuilder));
8+
if (natsQueue is null) throw new ArgumentNullException(nameof(natsQueue));
9+
10+
natsBuilder.Settings.Path = natsQueue;
11+
natsBuilder.Settings.PathKind = PathKind.Queue;
12+
return natsBuilder;
13+
}
14+
15+
public static RequestResponseBuilder ReplyToQueue(this RequestResponseBuilder natsBuilder, string natsQueue, Action<RequestResponseBuilder> natsBuilderConfig)
16+
{
17+
if (natsBuilder is null) throw new ArgumentNullException(nameof(natsBuilder));
18+
if (natsQueue is null) throw new ArgumentNullException(nameof(natsQueue));
19+
if (natsBuilderConfig is null) throw new ArgumentNullException(nameof(natsBuilderConfig));
20+
21+
var b = natsBuilder.ReplyToQueue(natsQueue);
22+
natsBuilderConfig(b);
23+
return b;
24+
}
25+
}

src/SlimMessageBus.Host.Nats/NatsMessageBus.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,9 @@ protected override async Task CreateConsumers()
5757
MessageProvider<NatsMsg<byte[]>> GetMessageProvider(string path)
5858
=> SerializerProvider.GetSerializer(path).GetMessageProvider<byte[], NatsMsg<byte[]>>(t => t.Data);
5959

60-
foreach (var (subject, consumerSettings) in Settings.Consumers.GroupBy(x => x.Path).ToDictionary(x => x.Key, x => x.ToList()))
60+
foreach (var ((subject, pathKind), consumerSettings) in Settings.Consumers.GroupBy(x => (x.Path, x.PathKind)).ToDictionary(x => x.Key, x => x.ToList()))
6161
{
62+
var queueGroup = pathKind == PathKind.Queue ? subject : null;
6263
var processor = new MessageProcessor<NatsMsg<byte[]>>(
6364
consumerSettings,
6465
messageBus: this,
@@ -67,22 +68,23 @@ MessageProvider<NatsMsg<byte[]>> GetMessageProvider(string path)
6768
this,
6869
consumerErrorHandlerOpenGenericType: typeof(INatsConsumerErrorHandler<>));
6970

70-
AddSubjectConsumer(consumerSettings, subject, processor);
71+
AddSubjectConsumer(consumerSettings, subject, queueGroup, processor);
7172
}
7273

7374
if (Settings.RequestResponse != null)
7475
{
7576
var subject = Settings.RequestResponse.Path;
77+
var queueGroup = Settings.RequestResponse.PathKind == PathKind.Queue ? subject : null;
7678

7779
var processor = new ResponseMessageProcessor<NatsMsg<byte[]>>(LoggerFactory, Settings.RequestResponse, GetMessageProvider(subject), PendingRequestStore, TimeProvider);
78-
AddSubjectConsumer([], subject, processor);
80+
AddSubjectConsumer([], subject, queueGroup, processor);
7981
}
8082
}
8183

82-
private void AddSubjectConsumer(IEnumerable<AbstractConsumerSettings> consumerSettings, string subject, IMessageProcessor<NatsMsg<byte[]>> processor)
84+
private void AddSubjectConsumer(IEnumerable<AbstractConsumerSettings> consumerSettings, string subject, string queueGroup, IMessageProcessor<NatsMsg<byte[]>> processor)
8385
{
8486
_logger.LogInformation("Creating consumer for {Subject}", subject);
85-
var consumer = new NatsSubjectConsumer<byte[]>(LoggerFactory.CreateLogger<NatsSubjectConsumer<byte[]>>(), consumerSettings, interceptors: Settings.ServiceProvider.GetServices<IAbstractConsumerInterceptor>(), subject, _connection, processor);
87+
var consumer = new NatsSubjectConsumer<byte[]>(LoggerFactory.CreateLogger<NatsSubjectConsumer<byte[]>>(), consumerSettings, interceptors: Settings.ServiceProvider.GetServices<IAbstractConsumerInterceptor>(), subject, queueGroup, _connection, processor);
8688
AddConsumer(consumer);
8789
}
8890

src/SlimMessageBus.Host.Nats/NatsSubjectConsumer.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ public class NatsSubjectConsumer<TType> : AbstractConsumer
55
{
66
private readonly INatsConnection _connection;
77
private readonly IMessageProcessor<NatsMsg<TType>> _messageProcessor;
8+
private readonly string _queueGroup;
89
private INatsSub<TType>? _subscription;
910
private Task? _messageConsumerTask;
1011

1112
public NatsSubjectConsumer(ILogger logger,
1213
IEnumerable<AbstractConsumerSettings> consumerSettings,
1314
IEnumerable<IAbstractConsumerInterceptor> interceptors,
1415
string subject,
16+
string queueGroup,
1517
INatsConnection connection,
1618
IMessageProcessor<NatsMsg<TType>> messageProcessor)
1719
: base(logger,
@@ -21,11 +23,12 @@ public NatsSubjectConsumer(ILogger logger,
2123
{
2224
_connection = connection;
2325
_messageProcessor = messageProcessor;
26+
_queueGroup = queueGroup;
2427
}
2528

2629
protected override async Task OnStart()
2730
{
28-
_subscription ??= await _connection.SubscribeCoreAsync<TType>(Path, cancellationToken: CancellationToken);
31+
_subscription ??= await _connection.SubscribeCoreAsync<TType>(Path, queueGroup: _queueGroup, cancellationToken: CancellationToken);
2932

3033
_messageConsumerTask = Task.Factory.StartNew(OnLoop, CancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap();
3134
}
@@ -56,9 +59,9 @@ private async Task OnLoop()
5659
}
5760
}
5861
}
59-
catch (OperationCanceledException ex)
62+
catch (OperationCanceledException)
6063
{
61-
Logger.LogInformation(ex, "Consumer task was cancelled");
64+
Logger.LogInformation("Consumer task was cancelled");
6265
}
6366
}
6467
}

0 commit comments

Comments
 (0)