diff --git a/RabbitMQ.AMQP.Client/ConnectionSettings.cs b/RabbitMQ.AMQP.Client/ConnectionSettings.cs index b280972..0d0f372 100644 --- a/RabbitMQ.AMQP.Client/ConnectionSettings.cs +++ b/RabbitMQ.AMQP.Client/ConnectionSettings.cs @@ -12,6 +12,19 @@ namespace RabbitMQ.AMQP.Client { + public interface IUriSelector + { + Uri Select(ICollection uris); + } + + public class RandomUriSelector : IUriSelector + { + public Uri Select(ICollection uris) + { + return uris.Skip(Utils.RandomNext(0, uris.Count)).First(); + } + } + public class ConnectionSettingsBuilder { private string _host = "localhost"; @@ -24,7 +37,10 @@ public class ConnectionSettingsBuilder private uint _maxFrameSize = Consts.DefaultMaxFrameSize; private SaslMechanism _saslMechanism = Client.SaslMechanism.Anonymous; private IRecoveryConfiguration _recoveryConfiguration = new RecoveryConfiguration(); - private IList? _uris; + private TlsSettings? _tlsSettings = null; + private Uri? _uri; + private List? _uris; + private IUriSelector? _uriSelector; public static ConnectionSettingsBuilder Create() { @@ -57,8 +73,15 @@ public ConnectionSettingsBuilder Password(string password) public ConnectionSettingsBuilder Scheme(string scheme) { - _scheme = scheme; - return this; + if (Utils.IsValidScheme(scheme)) + { + _scheme = scheme; + return this; + } + else + { + throw new ArgumentOutOfRangeException(nameof(scheme), "scheme must be 'amqp' or 'amqps'"); + } } public ConnectionSettingsBuilder ContainerId(string containerId) @@ -76,10 +99,10 @@ public ConnectionSettingsBuilder VirtualHost(string virtualHost) public ConnectionSettingsBuilder MaxFrameSize(uint maxFrameSize) { _maxFrameSize = maxFrameSize; - if (_maxFrameSize != uint.MinValue && _maxFrameSize < 512) + if (_maxFrameSize != Consts.DefaultMaxFrameSize && _maxFrameSize < 512) { throw new ArgumentOutOfRangeException(nameof(maxFrameSize), - "maxFrameSize must be greater or equal to 512"); + "maxFrameSize must be 0 (no limit) or greater than or equal to 512"); } return this; } @@ -103,21 +126,70 @@ public ConnectionSettingsBuilder RecoveryConfiguration(IRecoveryConfiguration re return this; } + public ConnectionSettingsBuilder TlsSettings(TlsSettings tlsSettings) + { + _tlsSettings = tlsSettings; + return this; + } + + public ConnectionSettingsBuilder Uri(Uri uri) + { + _uri = uri; + ValidateUris(); + return this; + } + public ConnectionSettingsBuilder Uris(IEnumerable uris) { _uris = uris.ToList(); + ValidateUris(); + return this; + } + + public ConnectionSettingsBuilder UriSelector(IUriSelector uriSelector) + { + _uriSelector = uriSelector; return this; } public ConnectionSettings Build() { // TODO this should do something similar to consolidate in the Java code - var c = new ConnectionSettings(_scheme, _host, _port, _user, - _password, _virtualHost, - _containerId, _saslMechanism, - _recoveryConfiguration, - _maxFrameSize); - return c; + ValidateUris(); + if (_uri is not null) + { + return new ConnectionSettings(_uri, + _containerId, _saslMechanism, + _recoveryConfiguration, + _maxFrameSize, + _tlsSettings); + } + else if (_uris is not null) + { + return new ClusterConnectionSettings(_uris, + _uriSelector, + _containerId, _saslMechanism, + _recoveryConfiguration, + _maxFrameSize, + _tlsSettings); + } + else + { + return new ConnectionSettings(_scheme, _host, _port, _user, + _password, _virtualHost, + _containerId, _saslMechanism, + _recoveryConfiguration, + _maxFrameSize, + _tlsSettings); + } + } + + private void ValidateUris() + { + if (_uri is not null && _uris is not null) + { + throw new ArgumentOutOfRangeException("uris", "Do not set both Uri and Uris"); + } } } @@ -126,44 +198,30 @@ public ConnectionSettings Build() // public class ConnectionSettings : IEquatable { - private readonly Address _address; - private readonly string _virtualHost = "/"; - private readonly string _containerId = ""; + protected Address _address = new("amqp://localhost:5672"); + protected string _virtualHost = Consts.DefaultVirtualHost; + private readonly string _containerId = string.Empty; private readonly uint _maxFrameSize = Consts.DefaultMaxFrameSize; private readonly TlsSettings? _tlsSettings; private readonly SaslMechanism _saslMechanism = SaslMechanism.Plain; private readonly IRecoveryConfiguration _recoveryConfiguration = new RecoveryConfiguration(); - public ConnectionSettings(Uri uri) + public ConnectionSettings(Uri uri, + string? containerId = null, + SaslMechanism? saslMechanism = null, + IRecoveryConfiguration? recoveryConfiguration = null, + uint? maxFrameSize = null, + TlsSettings? tlsSettings = null) + : this(containerId, saslMechanism, recoveryConfiguration, maxFrameSize, tlsSettings) { - string? user = null; - string? password = null; - string userInfo = uri.UserInfo; - if (!string.IsNullOrEmpty(userInfo)) - { - string[] userPass = userInfo.Split(':'); - if (userPass.Length > 2) - { - throw new ArgumentException($"Bad user info in AMQP URI: {userInfo}"); - } + (string? user, string? password) = ProcessUserInfo(uri); - user = UriDecode(userPass[0]); - if (userPass.Length == 2) - { - password = UriDecode(userPass[1]); - } - } - - // C# automatically changes URIs into a canonical form - // that has at least the path segment "/" - if (uri.Segments.Length > 2) - { - throw new ArgumentException($"Multiple segments in path of AMQP URI: {string.Join(", ", uri.Segments)}"); - } + _virtualHost = ProcessUriSegmentsForVirtualHost(uri); - if (uri.Segments.Length == 2) + string scheme = uri.Scheme; + if (false == Utils.IsValidScheme(scheme)) { - _virtualHost = UriDecode(uri.Segments[1]); + throw new ArgumentOutOfRangeException("uri.Scheme", "Uri scheme must be 'amqp' or 'amqps'"); } _address = new Address(host: uri.Host, @@ -171,7 +229,7 @@ public ConnectionSettings(Uri uri) user: user, password: password, path: "/", - scheme: uri.Scheme); + scheme: scheme); if (_address.UseSsl && _tlsSettings == null) { @@ -179,36 +237,72 @@ public ConnectionSettings(Uri uri) } } - public ConnectionSettings(string scheme, string host, int port, - string? user, string? password, - string virtualHost, string containerId, - SaslMechanism saslMechanism, - IRecoveryConfiguration recoveryConfiguration, - uint maxFrameSize = Consts.DefaultMaxFrameSize, + public ConnectionSettings(string scheme, + string host, + int port, + string? user = null, + string? password = null, + string? virtualHost = null, + string containerId = "", + SaslMechanism? saslMechanism = null, + IRecoveryConfiguration? recoveryConfiguration = null, + uint? maxFrameSize = null, TlsSettings? tlsSettings = null) + : this(containerId, saslMechanism, recoveryConfiguration, maxFrameSize, tlsSettings) { + if (false == Utils.IsValidScheme(scheme)) + { + throw new ArgumentOutOfRangeException(nameof(scheme), "scheme must be 'amqp' or 'amqps'"); + } + _address = new Address(host: host, port: port, user: user, password: password, path: "/", scheme: scheme); - _containerId = containerId; - _virtualHost = virtualHost; - _saslMechanism = saslMechanism; - _maxFrameSize = maxFrameSize; - if (_maxFrameSize != uint.MinValue && _maxFrameSize < 512) + if (virtualHost is not null) { - throw new ArgumentOutOfRangeException(nameof(maxFrameSize), - "maxFrameSize must be greater or equal to 512"); + _virtualHost = virtualHost; } - _tlsSettings = tlsSettings; - if (_address.UseSsl && _tlsSettings == null) { _tlsSettings = new TlsSettings(); } + } - _recoveryConfiguration = recoveryConfiguration; + protected ConnectionSettings( + string? containerId = null, + SaslMechanism? saslMechanism = null, + IRecoveryConfiguration? recoveryConfiguration = null, + uint? maxFrameSize = null, + TlsSettings? tlsSettings = null) + { + if (containerId is not null) + { + _containerId = containerId; + } + + if (saslMechanism is not null) + { + _saslMechanism = saslMechanism; + } + + if (recoveryConfiguration is not null) + { + _recoveryConfiguration = recoveryConfiguration; + } + + if (maxFrameSize is not null) + { + _maxFrameSize = (uint)maxFrameSize; + if (_maxFrameSize != Consts.DefaultMaxFrameSize && _maxFrameSize < 512) + { + throw new ArgumentOutOfRangeException(nameof(maxFrameSize), + "maxFrameSize must be 0 (no limit) or greater than or equal to 512"); + } + } + + _tlsSettings = tlsSettings; } public string Host => _address.Host; @@ -224,9 +318,10 @@ public ConnectionSettings(string scheme, string host, int port, public SaslMechanism SaslMechanism => _saslMechanism; public TlsSettings? TlsSettings => _tlsSettings; public IRecoveryConfiguration Recovery => _recoveryConfiguration; - public IEnumerable? Uris => throw new NotImplementedException(); - internal Address Address => _address; + internal virtual Address Address => _address; + + internal virtual IList
Addresses => new[] { _address }; public override string ToString() { @@ -287,6 +382,48 @@ public override int GetHashCode() _address.Scheme, _containerId, _address.Path); } + protected static (string? user, string? password) ProcessUserInfo(Uri uri) + { + string? user = null; + string? password = null; + string userInfo = uri.UserInfo; + if (!string.IsNullOrEmpty(userInfo)) + { + string[] userPass = userInfo.Split(':'); + if (userPass.Length > 2) + { + throw new ArgumentException($"Bad user info in AMQP URI: {userInfo}"); + } + + user = UriDecode(userPass[0]); + if (userPass.Length == 2) + { + password = UriDecode(userPass[1]); + } + } + + return (user, password); + } + + protected static string ProcessUriSegmentsForVirtualHost(Uri uri) + { + // C# automatically changes URIs into a canonical form + // that has at least the path segment "/" + if (uri.Segments.Length > 2) + { + throw new ArgumentException($"Multiple segments in path of AMQP URI: {string.Join(", ", uri.Segments)}"); + } + + if (uri.Segments.Length == 2) + { + return UriDecode(uri.Segments[1]); + } + else + { + return Consts.DefaultVirtualHost; + } + } + /// /// Unescape a string, protecting '+'. /// @@ -296,6 +433,130 @@ private static string UriDecode(string str) } } + public class ClusterConnectionSettings : ConnectionSettings + { + private readonly List _uris; + private readonly Dictionary _uriToAddress; + private readonly IUriSelector _uriSelector = new RandomUriSelector(); + + public ClusterConnectionSettings(IEnumerable uris, + IUriSelector? uriSelector = null, + string? containerId = null, + SaslMechanism? saslMechanism = null, + IRecoveryConfiguration? recoveryConfiguration = null, + uint? maxFrameSize = null, + TlsSettings? tlsSettings = null) + : base(containerId, saslMechanism, recoveryConfiguration, maxFrameSize, tlsSettings) + { + _uris = uris.ToList(); + if (_uris.Count == 0) + { + throw new ArgumentOutOfRangeException(nameof(uris), "At least one Uri is required."); + } + + _uriToAddress = new(_uris.Count); + + if (uriSelector is not null) + { + _uriSelector = uriSelector; + } + + string? tmpVirtualHost = null; + + bool first = true; + foreach (Uri uri in _uris) + { + string scheme = uri.Scheme; + if (false == Utils.IsValidScheme(scheme)) + { + throw new ArgumentOutOfRangeException("uri.Scheme", "Uri scheme must be 'amqp' or 'amqps'"); + } + + (string? user, string? password) = ProcessUserInfo(uri); + + if (tmpVirtualHost is null) + { + tmpVirtualHost = ProcessUriSegmentsForVirtualHost(uri); + } + else + { + string thisVirtualHost = ProcessUriSegmentsForVirtualHost(uri); + if (false == thisVirtualHost.Equals(tmpVirtualHost, StringComparison.InvariantCultureIgnoreCase)) + { + throw new ArgumentException($"All AMQP URIs must use the same virtual host. Expected '{tmpVirtualHost}', got '{thisVirtualHost}'"); + } + } + + var address = new Address(host: uri.Host, + port: uri.Port, + user: user, + password: password, + path: "/", + scheme: scheme); + + _uriToAddress[uri] = address; + + if (first) + { + _address = address; + first = false; + } + } + + if (tmpVirtualHost is not null) + { + _virtualHost = tmpVirtualHost; + } + } + + public override bool Equals(object? obj) + { + if (obj is null) + { + return false; + } + + if (base.Equals(obj) && (obj is ClusterConnectionSettings other)) + { + for (int i = 0; i < _uris.Count; i++) + { + Uri thisUri = _uris[i]; + Uri otherUri = other._uris[i]; + if (false == thisUri.Equals(otherUri)) + { + return false; + } + } + + return true; + } + + return false; + } + + public override int GetHashCode() + { + int baseHashCode = base.GetHashCode(); + int hashCode = baseHashCode; + for (int i = 0; i < _uris.Count; i++) + { + hashCode ^= _uris[i].GetHashCode(); + } + return hashCode; + } + + internal override Address Address + { + get + { + Uri uri = _uriSelector.Select(_uris); + return _uriToAddress[uri]; + } + } + + internal override IList
Addresses => _uriToAddress.Values.ToList(); + } + public class TlsSettings { internal const SslProtocols DefaultSslProtocols = SslProtocols.None; diff --git a/RabbitMQ.AMQP.Client/Consts.cs b/RabbitMQ.AMQP.Client/Consts.cs index f4bae84..25b4a8e 100644 --- a/RabbitMQ.AMQP.Client/Consts.cs +++ b/RabbitMQ.AMQP.Client/Consts.cs @@ -16,5 +16,10 @@ public static class Consts /// uint.MinValue means "no limit" /// public const uint DefaultMaxFrameSize = uint.MinValue; // NOTE: Azure/amqpnetlite uses 256 * 1024 + + /// + /// The default virtual host, / + /// + public const string DefaultVirtualHost = "/"; } } diff --git a/RabbitMQ.AMQP.Client/IEnvironment.cs b/RabbitMQ.AMQP.Client/IEnvironment.cs index 9ba35a8..6993868 100644 --- a/RabbitMQ.AMQP.Client/IEnvironment.cs +++ b/RabbitMQ.AMQP.Client/IEnvironment.cs @@ -34,6 +34,7 @@ public interface IEnvironment /// Close all connections. /// /// + // TODO cancellation token Task CloseAsync(); } } diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs index da17e7d..cc9d168 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs @@ -34,8 +34,10 @@ public class AmqpConnection : AbstractLifeCycle, IConnection private readonly AmqpManagement _management; private readonly RecordingTopologyListener _recordingTopologyListener = new(); - internal readonly ConnectionSettings _connectionSettings; + private readonly ConnectionSettings _connectionSettings; private readonly IMetricsReporter? _metricsReporter; + + // TODO this is coupled with publishers and consumers internal readonly AmqpSessionManagement _nativePubSubSessions; private readonly Dictionary _connectionProperties = new(); @@ -350,23 +352,9 @@ void OnOpened(Amqp.IConnection connection, Open openOnOpened) try { - ConnectionSettings connectionSettings; - if (_connectionSettings is null) - { - // TODO create "internal bug" exception type? - throw new InvalidOperationException( - "_connectionSettings is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues"); - } - else - { - // TODO - // There is absolutely NO POINT in having an interface if this - // is what will be done! - connectionSettings = (ConnectionSettings)_connectionSettings; - Address address = connectionSettings.Address; - _nativeConnection = await cf.CreateAsync(address: address, open: open, onOpened: OnOpened) - .ConfigureAwait(false); - } + Address address = _connectionSettings.Address; + _nativeConnection = await cf.CreateAsync(address: address, open: open, onOpened: OnOpened) + .ConfigureAwait(false); } catch (Exception ex) { diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpEnvironment.cs b/RabbitMQ.AMQP.Client/Impl/AmqpEnvironment.cs index b4ae647..c3c3e72 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpEnvironment.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpEnvironment.cs @@ -62,6 +62,10 @@ public Task CreateConnectionAsync() public ReadOnlyCollection GetConnections() => new(_connections.Values.ToList()); - public Task CloseAsync() => Task.WhenAll(_connections.Values.Select(c => c.CloseAsync())); + // TODO cancellation token + public Task CloseAsync() + { + return Task.WhenAll(_connections.Values.Select(c => c.CloseAsync())); + } } } diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt index 1a7fcf7..a7b3c1e 100644 --- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt @@ -3,6 +3,7 @@ abstract RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.CloseAsync() -> System.Thre abstract RabbitMQ.AMQP.Client.Impl.StreamOptions.Builder() -> RabbitMQ.AMQP.Client.IConsumerBuilder! const RabbitMQ.AMQP.Client.Consts.Bindings = "bindings" -> string! const RabbitMQ.AMQP.Client.Consts.DefaultMaxFrameSize = 0 -> uint +const RabbitMQ.AMQP.Client.Consts.DefaultVirtualHost = "/" -> string! const RabbitMQ.AMQP.Client.Consts.Exchanges = "exchanges" -> string! const RabbitMQ.AMQP.Client.Consts.Key = "key" -> string! const RabbitMQ.AMQP.Client.Consts.Messages = "messages" -> string! @@ -10,6 +11,8 @@ const RabbitMQ.AMQP.Client.Consts.Queues = "queues" -> string! const RabbitMQ.AMQP.Client.MetricsReporter.MeterName = "RabbitMQ.Amqp" -> string! const RabbitMQ.AMQP.Client.MetricsReporter.MetricPrefix = "rabbitmq.amqp" -> string! override RabbitMQ.AMQP.Client.BackOffDelayPolicy.ToString() -> string! +override RabbitMQ.AMQP.Client.ClusterConnectionSettings.Equals(object? obj) -> bool +override RabbitMQ.AMQP.Client.ClusterConnectionSettings.GetHashCode() -> int override RabbitMQ.AMQP.Client.ConnectionSettings.Equals(object? obj) -> bool override RabbitMQ.AMQP.Client.ConnectionSettings.GetHashCode() -> int override RabbitMQ.AMQP.Client.ConnectionSettings.ToString() -> string! @@ -53,12 +56,15 @@ RabbitMQ.AMQP.Client.ClassicQueueMode.Lazy = 1 -> RabbitMQ.AMQP.Client.ClassicQu RabbitMQ.AMQP.Client.ClassicQueueVersion RabbitMQ.AMQP.Client.ClassicQueueVersion.V1 = 0 -> RabbitMQ.AMQP.Client.ClassicQueueVersion RabbitMQ.AMQP.Client.ClassicQueueVersion.V2 = 1 -> RabbitMQ.AMQP.Client.ClassicQueueVersion +RabbitMQ.AMQP.Client.ClusterConnectionSettings +RabbitMQ.AMQP.Client.ClusterConnectionSettings.ClusterConnectionSettings(System.Collections.Generic.IEnumerable! uris, RabbitMQ.AMQP.Client.IUriSelector? uriSelector = null, string? containerId = null, RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void RabbitMQ.AMQP.Client.ConnectionException RabbitMQ.AMQP.Client.ConnectionException.ConnectionException(string! message) -> void RabbitMQ.AMQP.Client.ConnectionException.ConnectionException(string! message, System.Exception! innerException) -> void RabbitMQ.AMQP.Client.ConnectionSettings -RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(string! scheme, string! host, int port, string? user, string? password, string! virtualHost, string! containerId, RabbitMQ.AMQP.Client.SaslMechanism! saslMechanism, RabbitMQ.AMQP.Client.IRecoveryConfiguration! recoveryConfiguration, uint maxFrameSize = 0, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void -RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(System.Uri! uri) -> void +RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(string! scheme, string! host, int port, string? user = null, string? password = null, string? virtualHost = null, string! containerId = "", RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void +RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(string? containerId = null, RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void +RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(System.Uri! uri, string? containerId = null, RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void RabbitMQ.AMQP.Client.ConnectionSettings.ContainerId.get -> string! RabbitMQ.AMQP.Client.ConnectionSettings.Host.get -> string! RabbitMQ.AMQP.Client.ConnectionSettings.MaxFrameSize.get -> uint @@ -69,10 +75,11 @@ RabbitMQ.AMQP.Client.ConnectionSettings.Recovery.get -> RabbitMQ.AMQP.Client.IRe RabbitMQ.AMQP.Client.ConnectionSettings.SaslMechanism.get -> RabbitMQ.AMQP.Client.SaslMechanism! RabbitMQ.AMQP.Client.ConnectionSettings.Scheme.get -> string! RabbitMQ.AMQP.Client.ConnectionSettings.TlsSettings.get -> RabbitMQ.AMQP.Client.TlsSettings? -RabbitMQ.AMQP.Client.ConnectionSettings.Uris.get -> System.Collections.Generic.IEnumerable? RabbitMQ.AMQP.Client.ConnectionSettings.User.get -> string? RabbitMQ.AMQP.Client.ConnectionSettings.UseSsl.get -> bool RabbitMQ.AMQP.Client.ConnectionSettings.VirtualHost.get -> string! +RabbitMQ.AMQP.Client.ConnectionSettings._address -> Amqp.Address! +RabbitMQ.AMQP.Client.ConnectionSettings._virtualHost -> string! RabbitMQ.AMQP.Client.ConnectionSettingsBuilder RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Build() -> RabbitMQ.AMQP.Client.ConnectionSettings! RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.ConnectionSettingsBuilder() -> void @@ -84,7 +91,10 @@ RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Port(int port) -> RabbitMQ.AMQP.C RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.RecoveryConfiguration(RabbitMQ.AMQP.Client.IRecoveryConfiguration! recoveryConfiguration) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder! RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.SaslMechanism(RabbitMQ.AMQP.Client.SaslMechanism! saslMechanism) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder! RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Scheme(string! scheme) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder! +RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.TlsSettings(RabbitMQ.AMQP.Client.TlsSettings! tlsSettings) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder! +RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Uri(System.Uri! uri) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder! RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Uris(System.Collections.Generic.IEnumerable! uris) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder! +RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.UriSelector(RabbitMQ.AMQP.Client.IUriSelector! uriSelector) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder! RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.User(string! user) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder! RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.VirtualHost(string! virtualHost) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder! RabbitMQ.AMQP.Client.Consts @@ -668,6 +678,8 @@ RabbitMQ.AMQP.Client.IStreamSpecification.InitialClusterSize(int initialClusterS RabbitMQ.AMQP.Client.IStreamSpecification.MaxAge(System.TimeSpan maxAge) -> RabbitMQ.AMQP.Client.IStreamSpecification! RabbitMQ.AMQP.Client.IStreamSpecification.MaxSegmentSizeBytes(RabbitMQ.AMQP.Client.ByteCapacity! maxSegmentSize) -> RabbitMQ.AMQP.Client.IStreamSpecification! RabbitMQ.AMQP.Client.IStreamSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification! +RabbitMQ.AMQP.Client.IUriSelector +RabbitMQ.AMQP.Client.IUriSelector.Select(System.Collections.Generic.ICollection! uris) -> System.Uri! RabbitMQ.AMQP.Client.LifeCycleCallBack RabbitMQ.AMQP.Client.MessageHandler RabbitMQ.AMQP.Client.MetricsReporter @@ -711,6 +723,9 @@ RabbitMQ.AMQP.Client.QueueType.STREAM = 2 -> RabbitMQ.AMQP.Client.QueueType RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy.AtLeastOnce = 1 -> RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy.AtMostOnce = 0 -> RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy +RabbitMQ.AMQP.Client.RandomUriSelector +RabbitMQ.AMQP.Client.RandomUriSelector.RandomUriSelector() -> void +RabbitMQ.AMQP.Client.RandomUriSelector.Select(System.Collections.Generic.ICollection! uris) -> System.Uri! RabbitMQ.AMQP.Client.RecoveryConfiguration RabbitMQ.AMQP.Client.RecoveryConfiguration.Activated(bool activated) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration! RabbitMQ.AMQP.Client.RecoveryConfiguration.BackOffDelayPolicy(RabbitMQ.AMQP.Client.IBackOffDelayPolicy! backOffDelayPolicy) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration! @@ -752,6 +767,8 @@ static RabbitMQ.AMQP.Client.ByteCapacity.Gb(long gigabytes) -> RabbitMQ.AMQP.Cli static RabbitMQ.AMQP.Client.ByteCapacity.Kb(long megabytes) -> RabbitMQ.AMQP.Client.ByteCapacity! static RabbitMQ.AMQP.Client.ByteCapacity.Mb(long megabytes) -> RabbitMQ.AMQP.Client.ByteCapacity! static RabbitMQ.AMQP.Client.ByteCapacity.Tb(long terabytes) -> RabbitMQ.AMQP.Client.ByteCapacity! +static RabbitMQ.AMQP.Client.ConnectionSettings.ProcessUriSegmentsForVirtualHost(System.Uri! uri) -> string! +static RabbitMQ.AMQP.Client.ConnectionSettings.ProcessUserInfo(System.Uri! uri) -> (string? user, string? password) static RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Create() -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder! static RabbitMQ.AMQP.Client.Impl.AddressBuilderHelper.AddressBuilder() -> RabbitMQ.AMQP.Client.Impl.AddressBuilder! static RabbitMQ.AMQP.Client.Impl.AmqpConnection.CreateAsync(RabbitMQ.AMQP.Client.ConnectionSettings! connectionSettings, RabbitMQ.AMQP.Client.IMetricsReporter? metricsReporter = null) -> System.Threading.Tasks.Task! diff --git a/RabbitMQ.AMQP.Client/Utils.cs b/RabbitMQ.AMQP.Client/Utils.cs index 27cad4e..a09676f 100644 --- a/RabbitMQ.AMQP.Client/Utils.cs +++ b/RabbitMQ.AMQP.Client/Utils.cs @@ -179,6 +179,19 @@ internal static bool CompareMap(Map map1, Map map2) return true; } + internal static bool IsValidScheme(string scheme) + { + if (scheme.Equals("amqp", StringComparison.InvariantCultureIgnoreCase) || + scheme.Equals("amqps", StringComparison.InvariantCultureIgnoreCase)) + { + return true; + } + else + { + return false; + } + } + internal static void ValidateMessageAnnotations(Dictionary annotations) { foreach (KeyValuePair kvp in annotations) diff --git a/Tests/ClusterTests.cs b/Tests/ClusterTests.cs index 0a8b3cc..1a073a3 100644 --- a/Tests/ClusterTests.cs +++ b/Tests/ClusterTests.cs @@ -6,6 +6,7 @@ using System.Collections.Generic; using System.Threading.Tasks; using RabbitMQ.AMQP.Client; +using RabbitMQ.AMQP.Client.Impl; using Xunit; using Xunit.Abstractions; @@ -15,7 +16,7 @@ public class ClusterTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper, setupConnectionAndManagement: false) { [SkippableFact] - public Task CreateConnectionWithEnvironmentAndMultipleUris() + public async Task CreateConnectionWithEnvironmentAndMultipleUris() { Skip.IfNot(IsCluster); @@ -31,16 +32,16 @@ public Task CreateConnectionWithEnvironmentAndMultipleUris() connectionSettingBuilder.Uris(uris); ConnectionSettings connectionSettings = connectionSettingBuilder.Build(); - /* - IEnvironment env = AmqpEnvironment.Create(ConnectionSettingBuilder.Create().Build()); - IConnection connection = await env.CreateConnectionAsync(); - Assert.NotNull(connection); + IEnvironment env = AmqpEnvironment.Create(connectionSettings); + + // Note: by using _connection, the test will dispose the object on teardown + _connection = await env.CreateConnectionAsync(); + Assert.NotNull(_connection); Assert.NotEmpty(env.GetConnections()); + await env.CloseAsync(); - Assert.Equal(State.Closed, connection.State); - Assert.Empty(env.GetConnections()); - */ - return Task.CompletedTask; + Assert.Equal(State.Closed, _connection.State); + Assert.Empty(env.GetConnections()); } } diff --git a/Tests/ConnectionTests/ConnectionSettingsTests.cs b/Tests/ConnectionTests/ConnectionSettingsTests.cs index 24673aa..db23ae4 100644 --- a/Tests/ConnectionTests/ConnectionSettingsTests.cs +++ b/Tests/ConnectionTests/ConnectionSettingsTests.cs @@ -3,9 +3,9 @@ // Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. using System; -using System.Threading.Tasks; +using System.Collections.Generic; +using System.Security.Authentication; using RabbitMQ.AMQP.Client; -using RabbitMQ.AMQP.Client.Impl; using Xunit; using Xunit.Abstractions; @@ -15,11 +15,12 @@ public class ConnectionSettingsTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper, setupConnectionAndManagement: false) { [Fact] - public void ValidateAddress() + public void ConnectionSettingsValidation() { - RecoveryConfiguration recoveryConfiguration = new RecoveryConfiguration(); + IBackOffDelayPolicy backOffDelayPolicy = new TestBackoffDelayPolicy(); + IRecoveryConfiguration recoveryConfiguration = new TestRecoveryConfiguration(backOffDelayPolicy); - ConnectionSettings connectionSettings = new("amqp1", "localhost", 5672, "guest-user", + ConnectionSettings connectionSettings = new("amqp", "localhost", 5672, "guest-user", "guest-password", "vhost_1", "connection_name", SaslMechanism.External, recoveryConfiguration); Assert.Equal("localhost", connectionSettings.Host); @@ -27,24 +28,27 @@ public void ValidateAddress() Assert.Equal("guest-user", connectionSettings.User); Assert.Equal("guest-password", connectionSettings.Password); Assert.Equal("vhost_1", connectionSettings.VirtualHost); - Assert.Equal("amqp1", connectionSettings.Scheme); + Assert.Equal("amqp", connectionSettings.Scheme); Assert.Equal(SaslMechanism.External, connectionSettings.SaslMechanism); - ConnectionSettings second = new("amqp1", "localhost", 5672, "guest-user", + ConnectionSettings second = new("amqp", "localhost", 5672, "guest-user", "guest-password", "vhost_1", "connection_name", SaslMechanism.External, recoveryConfiguration); Assert.Equal(connectionSettings, second); - ConnectionSettings third = new("amqp2", "localhost", 5672, "guest-user", + ConnectionSettings third = new("amqp", "localhost", 5672, "guest-user", "guest-password", "path/", "connection_name", SaslMechanism.Plain, recoveryConfiguration); Assert.NotEqual(connectionSettings, third); + + Assert.Same(recoveryConfiguration, connectionSettings.Recovery); + Assert.Same(backOffDelayPolicy, connectionSettings.Recovery.GetBackOffDelayPolicy()); } [Fact] - public void ValidateAddressBuilder() + public void ConnectionSettingsViaBuilder() { ConnectionSettings connectionSettings = ConnectionSettingsBuilder.Create() .Host("localhost") @@ -63,7 +67,21 @@ public void ValidateAddressBuilder() } [Fact] - public void ValidateBuilderWithSslOptions() + public void ConnectionSettingsWithBadSchemeThrows() + { + Assert.ThrowsAny(() => + { + ConnectionSettingsBuilder.Create().Scheme("amqpX").Build(); + }); + + Assert.ThrowsAny(() => + { + new ConnectionSettings("amqpY", "foobar", 5672); + }); + } + + [Fact] + public void ConnectionSettingsViaBuilderWithSslOptions() { ConnectionSettings connectionSettings = ConnectionSettingsBuilder.Create() .Host("localhost") @@ -104,23 +122,238 @@ public void ConnectionSettingsViaUri() } [Fact] - public async Task RaiseErrorsIfTheParametersAreNotValid() + public void ConnectionSettingsViaUris() + { + const string scheme = "amqps"; + const string host = "rabbitmq-host.foo.baz.com"; + const string vhost = "/frazzle"; + string user = RandomString(10); + string pass = RandomString(10); + + var uri0 = new Uri($"{scheme}://{user}:{pass}@{host}:5671/%2Ffrazzle"); + var uri1 = new Uri($"{scheme}://{user}:{pass}@{host}:5681/%2Ffrazzle"); + var uri2 = new Uri($"{scheme}://{user}:{pass}@{host}:5691/%2Ffrazzle"); + + List uris = [uri0, uri1, uri2]; + var connectionSettings = new ClusterConnectionSettings(uris); + + Assert.True(connectionSettings.UseSsl); + Assert.Equal(host, connectionSettings.Host); + Assert.Equal(5671, connectionSettings.Port); + Assert.Equal(user, connectionSettings.User); + Assert.Equal(pass, connectionSettings.Password); + Assert.Equal(vhost, connectionSettings.VirtualHost); + Assert.Equal(scheme, connectionSettings.Scheme); + + Assert.NotNull(connectionSettings.Addresses); + Amqp.Address a0 = connectionSettings.Addresses[0]; + Assert.Equal(host, a0.Host); + Assert.Equal(5671, a0.Port); + Assert.Equal(user, a0.User); + Assert.Equal(pass, a0.Password); + Assert.Equal(scheme, a0.Scheme); + + Amqp.Address a1 = connectionSettings.Addresses[1]; + Assert.Equal(host, a1.Host); + Assert.Equal(5681, a1.Port); + Assert.Equal(user, a1.User); + Assert.Equal(pass, a1.Password); + Assert.Equal(scheme, a1.Scheme); + + Amqp.Address a2 = connectionSettings.Addresses[2]; + Assert.Equal(host, a2.Host); + Assert.Equal(5691, a2.Port); + Assert.Equal(user, a2.User); + Assert.Equal(pass, a2.Password); + Assert.Equal(scheme, a2.Scheme); + } + + [Fact] + public void ConnectionSettingsViaBuilderWithUris() { - await Assert.ThrowsAsync(async () => - await AmqpConnection.CreateAsync(ConnectionSettingsBuilder.Create().VirtualHost("wrong_vhost").Build())); + const string scheme = "amqps"; + const string host = "rabbitmq-host.foo.baz.com"; + const string vhost = "/frazzle"; + const uint maxFrameSize = 1234; + string user = RandomString(10); + string pass = RandomString(10); + string containerId = RandomString(10); + + var uri0 = new Uri($"{scheme}://{user}:{pass}@{host}:5671/%2Ffrazzle"); + var uri1 = new Uri($"{scheme}://{user}:{pass}@{host}:5681/%2Ffrazzle"); + var uri2 = new Uri($"{scheme}://{user}:{pass}@{host}:5691/%2Ffrazzle"); + + IBackOffDelayPolicy backOffDelayPolicy = new TestBackoffDelayPolicy(); + IRecoveryConfiguration recoveryConfiguration = new TestRecoveryConfiguration(backOffDelayPolicy); + TlsSettings tlsSettings = new(SslProtocols.Tls12); + + List uris = [uri0, uri1, uri2]; + ConnectionSettingsBuilder connectionSettingsBuilder = ConnectionSettingsBuilder.Create() + .Uris(uris) + .ContainerId(containerId) + .SaslMechanism(SaslMechanism.Anonymous) + .RecoveryConfiguration(recoveryConfiguration) + .MaxFrameSize(maxFrameSize) + .TlsSettings(tlsSettings); + ConnectionSettings connectionSettings = connectionSettingsBuilder.Build(); - // TODO check inner exception is a SocketException - await Assert.ThrowsAnyAsync(async () => - await AmqpConnection.CreateAsync(ConnectionSettingsBuilder.Create().Host("wrong_host").Build())); + Assert.Same(recoveryConfiguration, connectionSettings.Recovery); + Assert.Same(backOffDelayPolicy, connectionSettings.Recovery.GetBackOffDelayPolicy()); + Assert.Same(tlsSettings, connectionSettings.TlsSettings); + Assert.True(connectionSettings.UseSsl); + Assert.Equal(SaslMechanism.Anonymous, connectionSettings.SaslMechanism); + Assert.Equal(containerId, connectionSettings.ContainerId); + Assert.Equal(maxFrameSize, connectionSettings.MaxFrameSize); + Assert.Equal(host, connectionSettings.Host); + Assert.Equal(5671, connectionSettings.Port); + Assert.Equal(user, connectionSettings.User); + Assert.Equal(pass, connectionSettings.Password); + Assert.Equal(vhost, connectionSettings.VirtualHost); + Assert.Equal(scheme, connectionSettings.Scheme); + + Assert.NotNull(connectionSettings.Addresses); + Amqp.Address a0 = connectionSettings.Addresses[0]; + Assert.Equal(host, a0.Host); + Assert.Equal(5671, a0.Port); + Assert.Equal(user, a0.User); + Assert.Equal(pass, a0.Password); + Assert.Equal(scheme, a0.Scheme); + + Amqp.Address a1 = connectionSettings.Addresses[1]; + Assert.Equal(host, a1.Host); + Assert.Equal(5681, a1.Port); + Assert.Equal(user, a1.User); + Assert.Equal(pass, a1.Password); + Assert.Equal(scheme, a1.Scheme); - await Assert.ThrowsAsync(async () => - await AmqpConnection.CreateAsync(ConnectionSettingsBuilder.Create().Password("wrong_password").Build())); + Amqp.Address a2 = connectionSettings.Addresses[2]; + Assert.Equal(host, a2.Host); + Assert.Equal(5691, a2.Port); + Assert.Equal(user, a2.User); + Assert.Equal(pass, a2.Password); + Assert.Equal(scheme, a2.Scheme); + } + + [Fact] + public void ConnectionSettingsWithEqualUris() + { + const string scheme = "amqps"; + const string host = "rabbitmq-host.foo.baz.com"; + string user = RandomString(10); + string pass = RandomString(10); + string containerId = RandomString(10); + + var uri0_0 = new Uri($"{scheme}://{user}:{pass}@{host}:5681/%2Ffrazzle"); + var uri0_1 = new Uri($"{scheme}://{user}:{pass}@{host}:5681/%2Ffrazzle"); + + var uri1_0 = new Uri($"{scheme}://{user}:{pass}@{host}:5691/%2Ffrazzle"); + var uri1_1 = new Uri($"{scheme}://{user}:{pass}@{host}:5691/%2Ffrazzle"); + + var uri2_0 = new Uri($"{scheme}://{user}:{pass}@{host}:5677/%2Ffrazzle"); + var uri2_1 = new Uri($"{scheme}://{user}:{pass}@{host}:5677/%2Ffrazzle"); + + List uris0 = [uri0_0, uri1_0, uri2_0]; + ConnectionSettingsBuilder connectionSettingsBuilder0 = ConnectionSettingsBuilder.Create() + .Uris(uris0) + .ContainerId(containerId); + ConnectionSettings connectionSettings0 = connectionSettingsBuilder0.Build(); + + List uris1 = [uri0_1, uri1_1, uri2_1]; + ConnectionSettingsBuilder connectionSettingsBuilder1 = ConnectionSettingsBuilder.Create() + .Uris(uris1) + .ContainerId(containerId); + ConnectionSettings connectionSettings1 = connectionSettingsBuilder1.Build(); + Assert.Equal(connectionSettings0, connectionSettings1); + Assert.Equal(connectionSettings0.GetHashCode(), connectionSettings1.GetHashCode()); + } + + [Fact] + public void ConnectionSettingsWithUrisNotEqual() + { + const string scheme = "amqps"; + const string host = "rabbitmq-host.foo.baz.com"; + string user = RandomString(10); + string pass = RandomString(10); + string containerId = RandomString(10); + + var uri0 = new Uri($"{scheme}://{user}:{pass}@{host}:5681/%2Ffrazzle"); + var uri1 = new Uri($"{scheme}://{user}:{pass}@{host}:5691/%2Ffrazzle"); + var uri2_0 = new Uri($"{scheme}://{user}:{pass}@{host}:5671/%2Ffrazzle"); + var uri2_1 = new Uri($"{scheme}://{user}:{pass}@{host}:5677/%2Ffrazzle"); + + List uris0 = [uri0, uri1, uri2_0]; + ConnectionSettingsBuilder connectionSettingsBuilder0 = ConnectionSettingsBuilder.Create() + .Uris(uris0) + .ContainerId(containerId); + ConnectionSettings connectionSettings0 = connectionSettingsBuilder0.Build(); + + List uris1 = [uri0, uri1, uri2_1]; + ConnectionSettingsBuilder connectionSettingsBuilder1 = ConnectionSettingsBuilder.Create() + .Uris(uris1) + .ContainerId(containerId); + ConnectionSettings connectionSettings1 = connectionSettingsBuilder1.Build(); + Assert.NotEqual(connectionSettings0, connectionSettings1); + Assert.NotEqual(connectionSettings0.GetHashCode(), connectionSettings1.GetHashCode()); + } + + [Fact] + public void ConnectionSettingsViaUrisThrowsWithDifferentVirtualHosts() + { + const string scheme = "amqps"; + const string host = "rabbitmq-host.foo.baz.com"; + string user = RandomString(10); + string pass = RandomString(10); + + var uri0 = new Uri($"{scheme}://{user}:{pass}@{host}:5671/foo"); + var uri1 = new Uri($"{scheme}://{user}:{pass}@{host}:5681/bar"); + var uri2 = new Uri($"{scheme}://{user}:{pass}@{host}:5691/foo"); + + List uris = [uri0, uri1, uri2]; + Assert.ThrowsAny(() => new ClusterConnectionSettings(uris)); + } + + [Fact] + public void BuilderThrowsWhenUriAndUrisBothSet() + { + const string scheme = "amqps"; + const string host = "rabbitmq-host.foo.baz.com"; + string user = RandomString(10); + string pass = RandomString(10); + string containerId = RandomString(10); + + var uri0 = new Uri($"{scheme}://{user}:{pass}@{host}:5671/%2Ffrazzle"); + var uri1 = new Uri($"{scheme}://{user}:{pass}@{host}:5681/%2Ffrazzle"); + var uri2 = new Uri($"{scheme}://{user}:{pass}@{host}:5691/%2Ffrazzle"); + + List uris = [uri0, uri1, uri2]; + Assert.ThrowsAny(() => + { + ConnectionSettingsBuilder.Create().Uri(uri0).Uris(uris); + }); + } + + private class TestBackoffDelayPolicy : IBackOffDelayPolicy + { + public int CurrentAttempt => 1; + public int Delay() => 1; + public bool IsActive() => true; + public void Reset() { } + } + + private class TestRecoveryConfiguration : IRecoveryConfiguration + { + private readonly IBackOffDelayPolicy _backOffDelayPolicy; - await Assert.ThrowsAsync(async () => - await AmqpConnection.CreateAsync(ConnectionSettingsBuilder.Create().User("wrong_user").Build())); + public TestRecoveryConfiguration(IBackOffDelayPolicy backOffDelayPolicy) + { + _backOffDelayPolicy = backOffDelayPolicy; + } - // TODO check inner exception is a SocketException - await Assert.ThrowsAnyAsync(async () => - await AmqpConnection.CreateAsync(ConnectionSettingsBuilder.Create().Port(1234).Build())); + public IRecoveryConfiguration Activated(bool activated) => this; + public IRecoveryConfiguration BackOffDelayPolicy(IBackOffDelayPolicy backOffDelayPolicy) => this; + public IBackOffDelayPolicy GetBackOffDelayPolicy() => _backOffDelayPolicy; + public bool IsActivated() => true; + public bool IsTopologyActive() => true; + public IRecoveryConfiguration Topology(bool activated) => this; } } diff --git a/Tests/ConnectionTests/ConnectionTests.cs b/Tests/ConnectionTests/ConnectionTests.cs index d81215e..0f6404e 100644 --- a/Tests/ConnectionTests/ConnectionTests.cs +++ b/Tests/ConnectionTests/ConnectionTests.cs @@ -12,6 +12,27 @@ namespace Tests.ConnectionTests; public class ConnectionTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper) { + [Fact] + public async Task RaiseErrorsIfTheParametersAreNotValid() + { + await Assert.ThrowsAsync(async () => + await AmqpConnection.CreateAsync(ConnectionSettingsBuilder.Create().VirtualHost("wrong_vhost").Build())); + + // TODO check inner exception is a SocketException + await Assert.ThrowsAnyAsync(async () => + await AmqpConnection.CreateAsync(ConnectionSettingsBuilder.Create().Host("wrong_host").Build())); + + await Assert.ThrowsAsync(async () => + await AmqpConnection.CreateAsync(ConnectionSettingsBuilder.Create().Password("wrong_password").Build())); + + await Assert.ThrowsAsync(async () => + await AmqpConnection.CreateAsync(ConnectionSettingsBuilder.Create().User("wrong_user").Build())); + + // TODO check inner exception is a SocketException + await Assert.ThrowsAnyAsync(async () => + await AmqpConnection.CreateAsync(ConnectionSettingsBuilder.Create().Port(1234).Build())); + } + [Fact] public async Task ThrowAmqpClosedExceptionWhenItemIsClosed() {