Skip to content

Feature: provide a way to use existing rabbitmq connection factories … #130

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions Rebus.RabbitMq/Config/RabbitMqConfigurationExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
using System;
using System.Collections.Generic;
using RabbitMQ.Client;
using Rebus.Injection;
using Rebus.Internals;
using Rebus.Logging;
using Rebus.RabbitMq;
using Rebus.Retry;
using Rebus.Subscriptions;
using Rebus.Transport;
using System;
using System.Collections.Generic;
// ReSharper disable ExpressionIsAlwaysNull
// ReSharper disable ArgumentsStyleNamedExpression

Expand Down Expand Up @@ -39,6 +40,16 @@ public static RabbitMqOptionsBuilder UseRabbitMqAsOneWayClient(this StandardConf
return BuildInternal(configurer, true, (context, options) => new RabbitMqTransport(endpoints, null, context.Get<IRebusLoggerFactory>(), customizer: options.ConnectionFactoryCustomizer));
}

/// <summary>
/// Configures Rebus to use RabbitMQ to move messages around
/// </summary>
public static RabbitMqOptionsBuilder UseRabbitMq(this StandardConfigurer<ITransport> configurer, List<AmqpTcpEndpoint> endpoints, string inputQueueName, IConnectionFactory? factory, IConnection? connection)
{
if (inputQueueName == null) throw new ArgumentNullException(nameof(inputQueueName));
return BuildInternal(configurer, false, (context, options) => new RabbitMqTransport(endpoints, inputQueueName, factory, connection, context.Get<IRebusLoggerFactory>()));

}

/// <summary>
/// Configures Rebus to use RabbitMQ to move messages around
/// </summary>
Expand Down
66 changes: 66 additions & 0 deletions Rebus.RabbitMq/Internals/ConnectionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,56 @@ class ConnectionManager : IAsyncDisposable
IConnection _activeConnection;
bool _disposed;

public ConnectionManager(IRebusLoggerFactory rebusLoggerFactory, IList<AmqpTcpEndpoint> endpoints, string inputQueueAddress, IConnectionFactory? factory, IConnection? connection)
{
if (endpoints == null) throw new ArgumentNullException(nameof(endpoints));
if (factory == null) throw new ArgumentNullException(nameof(factory));
if (connection == null) throw new ArgumentNullException(nameof(connection));
if (rebusLoggerFactory == null) throw new ArgumentNullException(nameof(rebusLoggerFactory));

_log = rebusLoggerFactory.GetLogger<ConnectionManager>();

if (inputQueueAddress != null)
{
_log.Info("Initializing RabbitMQ connection manager for transport with input queue {queueName}", inputQueueAddress);
}
else
{
_log.Info("Initializing RabbitMQ connection manager for one-way transport");
}

if (endpoints.Count == 0)
{
throw new ArgumentException("Please remember to specify at least one endpoints for a RabbitMQ server. You can also add multiple connection strings separated by ; or , which RabbitMq will use in failover scenarios");
}

if (endpoints.Count > 1)
{
_log.Info("RabbitMQ transport has {count} endpoints available", endpoints.Count);
}

foreach (var endpoint in endpoints)
{
if (endpoint == null)
{
throw new ArgumentException("Provided endpoint collection should not contain null values");
}

if (string.IsNullOrEmpty(endpoint.ToString()))
{
throw new ArgumentException("null or empty value is not valid for ConnectionString");
}
}

var uri = new Uri(endpoints.First().ToString());

_connectionFactory = ModifyConnectionFactory((ConnectionFactory)factory, uri, inputQueueAddress);

_amqpTcpEndpoints = endpoints;

_activeConnection = connection ?? throw new ArgumentNullException(nameof(connection), "Connection cannot be null. Please provide a valid connection.");
}

public ConnectionManager(IList<ConnectionEndpoint> endpoints, string inputQueueAddress, IRebusLoggerFactory rebusLoggerFactory, Func<IConnectionFactory, IConnectionFactory> customizer)
{
if (endpoints == null) throw new ArgumentNullException(nameof(endpoints));
Expand Down Expand Up @@ -140,6 +190,22 @@ public ConnectionManager(string connectionString, string inputQueueAddress, IReb
.ToList();
}

ConnectionFactory ModifyConnectionFactory(ConnectionFactory factory, Uri uri, string inputQueueAddress)
{
factory.AutomaticRecoveryEnabled = true;
factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(30);
factory.ClientProperties = CreateClientProperties(inputQueueAddress);
factory.VirtualHost = GetVirtualHostPath();
return factory;

string GetVirtualHostPath()
{
return uri.LocalPath == "/"
? uri.LocalPath
: uri.LocalPath.Substring(1);
}
}

ConnectionFactory CreateConnectionFactory(Uri uri, string inputQueueAddress)
{
var connectionFactory = new ConnectionFactory
Expand Down
15 changes: 15 additions & 0 deletions Rebus.RabbitMq/RabbitMq/RabbitMqTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,21 @@ public class RabbitMqTransport : AbstractRebusTransport, IAsyncDisposable, IDisp
_log = rebusLoggerFactory.GetLogger<RabbitMqTransport>();
}

/// <summary>
/// Constructs the RabbitMQ transport with multiple amqp tcp endpoints. They should be also used in the connection factory.
/// A Connection factory and an existing connection must also be provided
/// </summary>
public RabbitMqTransport(List<AmqpTcpEndpoint> endpoints, string inputQueueAddress, IConnectionFactory? factory, IConnection? connection,
IRebusLoggerFactory rebusLoggerFactory, int maxMessagesToPrefetch = 50)
: this(rebusLoggerFactory, maxMessagesToPrefetch, inputQueueAddress)
{
if (endpoints == null) throw new ArgumentNullException(nameof(endpoints));
if (factory == null) throw new ArgumentNullException(nameof(factory));
if (connection == null) throw new ArgumentNullException(nameof(connection));

_connectionManager = new ConnectionManager(rebusLoggerFactory, endpoints, inputQueueAddress, factory, connection);
}

/// <summary>
/// Constructs the RabbitMQ transport with multiple connection endpoints. They will be tried in random order until working one is found
/// Credentials will be extracted from the connectionString of the first provided endpoint
Expand Down