diff --git a/Rebus.RabbitMq/Config/RabbitMqConfigurationExtensions.cs b/Rebus.RabbitMq/Config/RabbitMqConfigurationExtensions.cs index 73faeab..40ecf7d 100644 --- a/Rebus.RabbitMq/Config/RabbitMqConfigurationExtensions.cs +++ b/Rebus.RabbitMq/Config/RabbitMqConfigurationExtensions.cs @@ -1,5 +1,4 @@ -using System; -using System.Collections.Generic; +using RabbitMQ.Client; using Rebus.Injection; using Rebus.Internals; using Rebus.Logging; @@ -7,6 +6,8 @@ using Rebus.Retry; using Rebus.Subscriptions; using Rebus.Transport; +using System; +using System.Collections.Generic; // ReSharper disable ExpressionIsAlwaysNull // ReSharper disable ArgumentsStyleNamedExpression @@ -39,6 +40,16 @@ public static RabbitMqOptionsBuilder UseRabbitMqAsOneWayClient(this StandardConf return BuildInternal(configurer, true, (context, options) => new RabbitMqTransport(endpoints, null, context.Get(), customizer: options.ConnectionFactoryCustomizer)); } + /// + /// Configures Rebus to use RabbitMQ to move messages around + /// + public static RabbitMqOptionsBuilder UseRabbitMq(this StandardConfigurer configurer, List endpoints, string inputQueueName, IConnectionFactory? factory, IConnection? connection) + { + if (inputQueueName == null) throw new ArgumentNullException(nameof(inputQueueName)); + return BuildInternal(configurer, false, (context, options) => new RabbitMqTransport(endpoints, inputQueueName, factory, connection, context.Get())); + + } + /// /// Configures Rebus to use RabbitMQ to move messages around /// diff --git a/Rebus.RabbitMq/Internals/ConnectionManager.cs b/Rebus.RabbitMq/Internals/ConnectionManager.cs index a7a7fbf..81dea79 100644 --- a/Rebus.RabbitMq/Internals/ConnectionManager.cs +++ b/Rebus.RabbitMq/Internals/ConnectionManager.cs @@ -21,6 +21,56 @@ class ConnectionManager : IAsyncDisposable IConnection _activeConnection; bool _disposed; + public ConnectionManager(IRebusLoggerFactory rebusLoggerFactory, IList endpoints, string inputQueueAddress, IConnectionFactory? factory, IConnection? connection) + { + if (endpoints == null) throw new ArgumentNullException(nameof(endpoints)); + if (factory == null) throw new ArgumentNullException(nameof(factory)); + if (connection == null) throw new ArgumentNullException(nameof(connection)); + if (rebusLoggerFactory == null) throw new ArgumentNullException(nameof(rebusLoggerFactory)); + + _log = rebusLoggerFactory.GetLogger(); + + if (inputQueueAddress != null) + { + _log.Info("Initializing RabbitMQ connection manager for transport with input queue {queueName}", inputQueueAddress); + } + else + { + _log.Info("Initializing RabbitMQ connection manager for one-way transport"); + } + + if (endpoints.Count == 0) + { + throw new ArgumentException("Please remember to specify at least one endpoints for a RabbitMQ server. You can also add multiple connection strings separated by ; or , which RabbitMq will use in failover scenarios"); + } + + if (endpoints.Count > 1) + { + _log.Info("RabbitMQ transport has {count} endpoints available", endpoints.Count); + } + + foreach (var endpoint in endpoints) + { + if (endpoint == null) + { + throw new ArgumentException("Provided endpoint collection should not contain null values"); + } + + if (string.IsNullOrEmpty(endpoint.ToString())) + { + throw new ArgumentException("null or empty value is not valid for ConnectionString"); + } + } + + var uri = new Uri(endpoints.First().ToString()); + + _connectionFactory = ModifyConnectionFactory((ConnectionFactory)factory, uri, inputQueueAddress); + + _amqpTcpEndpoints = endpoints; + + _activeConnection = connection ?? throw new ArgumentNullException(nameof(connection), "Connection cannot be null. Please provide a valid connection."); + } + public ConnectionManager(IList endpoints, string inputQueueAddress, IRebusLoggerFactory rebusLoggerFactory, Func customizer) { if (endpoints == null) throw new ArgumentNullException(nameof(endpoints)); @@ -140,6 +190,22 @@ public ConnectionManager(string connectionString, string inputQueueAddress, IReb .ToList(); } + ConnectionFactory ModifyConnectionFactory(ConnectionFactory factory, Uri uri, string inputQueueAddress) + { + factory.AutomaticRecoveryEnabled = true; + factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(30); + factory.ClientProperties = CreateClientProperties(inputQueueAddress); + factory.VirtualHost = GetVirtualHostPath(); + return factory; + + string GetVirtualHostPath() + { + return uri.LocalPath == "/" + ? uri.LocalPath + : uri.LocalPath.Substring(1); + } + } + ConnectionFactory CreateConnectionFactory(Uri uri, string inputQueueAddress) { var connectionFactory = new ConnectionFactory diff --git a/Rebus.RabbitMq/RabbitMq/RabbitMqTransport.cs b/Rebus.RabbitMq/RabbitMq/RabbitMqTransport.cs index f0f322a..67aa0b1 100644 --- a/Rebus.RabbitMq/RabbitMq/RabbitMqTransport.cs +++ b/Rebus.RabbitMq/RabbitMq/RabbitMqTransport.cs @@ -99,6 +99,21 @@ public class RabbitMqTransport : AbstractRebusTransport, IAsyncDisposable, IDisp _log = rebusLoggerFactory.GetLogger(); } + /// + /// Constructs the RabbitMQ transport with multiple amqp tcp endpoints. They should be also used in the connection factory. + /// A Connection factory and an existing connection must also be provided + /// + public RabbitMqTransport(List endpoints, string inputQueueAddress, IConnectionFactory? factory, IConnection? connection, + IRebusLoggerFactory rebusLoggerFactory, int maxMessagesToPrefetch = 50) + : this(rebusLoggerFactory, maxMessagesToPrefetch, inputQueueAddress) + { + if (endpoints == null) throw new ArgumentNullException(nameof(endpoints)); + if (factory == null) throw new ArgumentNullException(nameof(factory)); + if (connection == null) throw new ArgumentNullException(nameof(connection)); + + _connectionManager = new ConnectionManager(rebusLoggerFactory, endpoints, inputQueueAddress, factory, connection); + } + /// /// Constructs the RabbitMQ transport with multiple connection endpoints. They will be tried in random order until working one is found /// Credentials will be extracted from the connectionString of the first provided endpoint