Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public static IServiceCollection AddRabbitMQSubscriptionPublisher(
services.TryAddSingleton(options ?? new SubscriptionOptions());
services.TryAddSingleton<IMessageSerializer, DefaultJsonMessageSerializer>();
services.TryAddSingleton<RabbitMQPubSub>();
services.TryAddSingleton<RabbitMQTopologyHelper>();
services.TryAddSingleton<ITopicEventSender>(sp => sp.GetRequiredService<RabbitMQPubSub>());

return services;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ internal sealed class RabbitMQPubSub : DefaultPubSub
private readonly IRabbitMQConnection _connection;
private readonly IMessageSerializer _serializer;
private readonly RabbitMQSubscriptionOptions _rabbitMqSubscriptionOptions;
private readonly RabbitMQTopologyHelper _topologyHelper;
private readonly string _completed;
private readonly int _topicBufferCapacity;
private readonly TopicBufferFullMode _topicBufferFullMode;
Expand All @@ -20,12 +21,14 @@ public RabbitMQPubSub(
IMessageSerializer serializer,
SubscriptionOptions options,
RabbitMQSubscriptionOptions rabbitMqSubscriptionOptions,
ISubscriptionDiagnosticEvents diagnosticEvents)
ISubscriptionDiagnosticEvents diagnosticEvents,
RabbitMQTopologyHelper topologyHelper)
: base(options, diagnosticEvents)
{
_connection = connection ?? throw new ArgumentNullException(nameof(connection));
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
_rabbitMqSubscriptionOptions = rabbitMqSubscriptionOptions ?? throw new ArgumentNullException(nameof(rabbitMqSubscriptionOptions));
_topologyHelper = topologyHelper;
_topicBufferCapacity = options.TopicBufferCapacity;
_topicBufferFullMode = options.TopicBufferFullMode;
_completed = serializer.CompleteMessage;
Expand Down Expand Up @@ -55,12 +58,14 @@ protected override DefaultTopic<TMessage> OnCreateTopic<TMessage>(
bufferCapacity ?? _topicBufferCapacity,
bufferFullMode ?? _topicBufferFullMode,
_rabbitMqSubscriptionOptions,
DiagnosticEvents);
DiagnosticEvents,
_topologyHelper);

private async Task PublishAsync(string formattedTopic, string message, CancellationToken cancellationToken)
{
var body = Encoding.UTF8.GetBytes(message);
var channel = await _connection.GetChannelAsync(cancellationToken).ConfigureAwait(false);
await _topologyHelper.ConfigurePublishingAsync(channel, formattedTopic, cancellationToken).ConfigureAwait(false);

await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
Expand Down
24 changes: 5 additions & 19 deletions src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQTopic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ internal sealed class RabbitMQTopic<TMessage> : DefaultTopic<TMessage>
private readonly IRabbitMQConnection _connection;
private readonly IMessageSerializer _serializer;
private readonly RabbitMQSubscriptionOptions _rabbitMqSubscriptionOptions;
private readonly RabbitMQTopologyHelper _topologyHelper;

public RabbitMQTopic(
string name,
Expand All @@ -20,12 +21,14 @@ public RabbitMQTopic(
int capacity,
TopicBufferFullMode fullMode,
RabbitMQSubscriptionOptions rabbitMqSubscriptionOptions,
ISubscriptionDiagnosticEvents diagnosticEvents)
ISubscriptionDiagnosticEvents diagnosticEvents,
RabbitMQTopologyHelper topologyHelper)
: base(name, capacity, fullMode, diagnosticEvents)
{
_connection = connection ?? throw new ArgumentNullException(nameof(connection));
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
_rabbitMqSubscriptionOptions = rabbitMqSubscriptionOptions ?? throw new ArgumentNullException(nameof(rabbitMqSubscriptionOptions));
_topologyHelper = topologyHelper;
}

protected override async ValueTask<IAsyncDisposable> OnConnectAsync(CancellationToken cancellationToken)
Expand All @@ -39,24 +42,7 @@ protected override async ValueTask<IAsyncDisposable> OnConnectAsync(Cancellation
var queueName = string.IsNullOrEmpty(_rabbitMqSubscriptionOptions.QueuePrefix)
? string.Empty // use server-generated name
: _rabbitMqSubscriptionOptions.QueuePrefix + Guid.NewGuid();

await channel.ExchangeDeclareAsync(
exchange: Name,
type: ExchangeType.Fanout,
durable: true,
autoDelete: false,
cancellationToken: cancellationToken);
await channel.QueueDeclareAsync(
queue: queueName,
durable: true,
exclusive: true,
autoDelete: true,
cancellationToken: cancellationToken);
await channel.QueueBindAsync(
queue: queueName,
exchange: Name,
routingKey: string.Empty,
cancellationToken: cancellationToken);
await _topologyHelper.ConfigureConsumingAsync(channel, Name, queueName, cancellationToken).ConfigureAwait(false);

var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (_, args) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using System.Collections.Concurrent;
using RabbitMQ.Client;

namespace HotChocolate.Subscriptions.RabbitMQ;

internal class RabbitMQTopologyHelper
{
private readonly ConcurrentDictionary<string, bool> _declaredExchanges = new();

public async ValueTask ConfigurePublishingAsync(IChannel channel, string formattedTopic, CancellationToken cancellationToken)
{
// Create an exchange if it wasn't created.
// This extra check isn't required, but it's faster to do it in memory than go to RabbitMQ every time before publishing a message
if (!_declaredExchanges.ContainsKey(formattedTopic))
{
await channel.ExchangeDeclareAsync(
exchange: formattedTopic,
type: ExchangeType.Fanout,
durable: true,
autoDelete: false,
cancellationToken: cancellationToken);

_declaredExchanges.TryAdd(formattedTopic, true);
}
}

public async Task ConfigureConsumingAsync(IChannel channel, string formattedTopic, string queueName, CancellationToken cancellationToken)
{
// need to declare an exchange so that we can bind a queue to it
await ConfigurePublishingAsync(channel, formattedTopic, cancellationToken);

await channel.QueueDeclareAsync(
queue: queueName,
durable: true,
exclusive: true,
autoDelete: true,
cancellationToken: cancellationToken);

await channel.QueueBindAsync(
queue: queueName,
exchange: formattedTopic,
routingKey: string.Empty,
cancellationToken: cancellationToken);
}
}
Loading