Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
379 changes: 320 additions & 59 deletions RabbitMQ.AMQP.Client/ConnectionSettings.cs

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions RabbitMQ.AMQP.Client/Consts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,10 @@ public static class Consts
/// <code>uint.MinValue</code> means "no limit"
/// </summary>
public const uint DefaultMaxFrameSize = uint.MinValue; // NOTE: Azure/amqpnetlite uses 256 * 1024

/// <summary>
/// The default virtual host, <c>/</c>
/// </summary>
public const string DefaultVirtualHost = "/";
}
}
1 change: 1 addition & 0 deletions RabbitMQ.AMQP.Client/IEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public interface IEnvironment
/// Close all connections.
/// </summary>
/// <returns></returns>
// TODO cancellation token
Task CloseAsync();
}
}
24 changes: 6 additions & 18 deletions RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
private readonly AmqpManagement _management;
private readonly RecordingTopologyListener _recordingTopologyListener = new();

internal readonly ConnectionSettings _connectionSettings;
private readonly ConnectionSettings _connectionSettings;
private readonly IMetricsReporter? _metricsReporter;

// TODO this is coupled with publishers and consumers
internal readonly AmqpSessionManagement _nativePubSubSessions;

private readonly Dictionary<string, object> _connectionProperties = new();
Expand Down Expand Up @@ -350,23 +352,9 @@ void OnOpened(Amqp.IConnection connection, Open openOnOpened)

try
{
ConnectionSettings connectionSettings;
if (_connectionSettings is null)
{
// TODO create "internal bug" exception type?
throw new InvalidOperationException(
"_connectionSettings is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
}
else
{
// TODO
// There is absolutely NO POINT in having an interface if this
// is what will be done!
connectionSettings = (ConnectionSettings)_connectionSettings;
Address address = connectionSettings.Address;
_nativeConnection = await cf.CreateAsync(address: address, open: open, onOpened: OnOpened)
.ConfigureAwait(false);
}
Address address = _connectionSettings.Address;
_nativeConnection = await cf.CreateAsync(address: address, open: open, onOpened: OnOpened)
.ConfigureAwait(false);
}
catch (Exception ex)
{
Expand Down
6 changes: 5 additions & 1 deletion RabbitMQ.AMQP.Client/Impl/AmqpEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ public Task<IConnection> CreateConnectionAsync()
public ReadOnlyCollection<IConnection> GetConnections() =>
new(_connections.Values.ToList());

public Task CloseAsync() => Task.WhenAll(_connections.Values.Select(c => c.CloseAsync()));
// TODO cancellation token
public Task CloseAsync()
{
return Task.WhenAll(_connections.Values.Select(c => c.CloseAsync()));
}
}
}
23 changes: 20 additions & 3 deletions RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ abstract RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.CloseAsync() -> System.Thre
abstract RabbitMQ.AMQP.Client.Impl.StreamOptions.Builder() -> RabbitMQ.AMQP.Client.IConsumerBuilder!
const RabbitMQ.AMQP.Client.Consts.Bindings = "bindings" -> string!
const RabbitMQ.AMQP.Client.Consts.DefaultMaxFrameSize = 0 -> uint
const RabbitMQ.AMQP.Client.Consts.DefaultVirtualHost = "/" -> string!
const RabbitMQ.AMQP.Client.Consts.Exchanges = "exchanges" -> string!
const RabbitMQ.AMQP.Client.Consts.Key = "key" -> string!
const RabbitMQ.AMQP.Client.Consts.Messages = "messages" -> string!
const RabbitMQ.AMQP.Client.Consts.Queues = "queues" -> string!
const RabbitMQ.AMQP.Client.MetricsReporter.MeterName = "RabbitMQ.Amqp" -> string!
const RabbitMQ.AMQP.Client.MetricsReporter.MetricPrefix = "rabbitmq.amqp" -> string!
override RabbitMQ.AMQP.Client.BackOffDelayPolicy.ToString() -> string!
override RabbitMQ.AMQP.Client.ClusterConnectionSettings.Equals(object? obj) -> bool
override RabbitMQ.AMQP.Client.ClusterConnectionSettings.GetHashCode() -> int
override RabbitMQ.AMQP.Client.ConnectionSettings.Equals(object? obj) -> bool
override RabbitMQ.AMQP.Client.ConnectionSettings.GetHashCode() -> int
override RabbitMQ.AMQP.Client.ConnectionSettings.ToString() -> string!
Expand Down Expand Up @@ -53,12 +56,15 @@ RabbitMQ.AMQP.Client.ClassicQueueMode.Lazy = 1 -> RabbitMQ.AMQP.Client.ClassicQu
RabbitMQ.AMQP.Client.ClassicQueueVersion
RabbitMQ.AMQP.Client.ClassicQueueVersion.V1 = 0 -> RabbitMQ.AMQP.Client.ClassicQueueVersion
RabbitMQ.AMQP.Client.ClassicQueueVersion.V2 = 1 -> RabbitMQ.AMQP.Client.ClassicQueueVersion
RabbitMQ.AMQP.Client.ClusterConnectionSettings
RabbitMQ.AMQP.Client.ClusterConnectionSettings.ClusterConnectionSettings(System.Collections.Generic.IEnumerable<System.Uri!>! uris, RabbitMQ.AMQP.Client.IUriSelector? uriSelector = null, string? containerId = null, RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void
RabbitMQ.AMQP.Client.ConnectionException
RabbitMQ.AMQP.Client.ConnectionException.ConnectionException(string! message) -> void
RabbitMQ.AMQP.Client.ConnectionException.ConnectionException(string! message, System.Exception! innerException) -> void
RabbitMQ.AMQP.Client.ConnectionSettings
RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(string! scheme, string! host, int port, string? user, string? password, string! virtualHost, string! containerId, RabbitMQ.AMQP.Client.SaslMechanism! saslMechanism, RabbitMQ.AMQP.Client.IRecoveryConfiguration! recoveryConfiguration, uint maxFrameSize = 0, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void
RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(System.Uri! uri) -> void
RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(string! scheme, string! host, int port, string? user = null, string? password = null, string? virtualHost = null, string! containerId = "", RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void
RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(string? containerId = null, RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void
RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(System.Uri! uri, string? containerId = null, RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void
RabbitMQ.AMQP.Client.ConnectionSettings.ContainerId.get -> string!
RabbitMQ.AMQP.Client.ConnectionSettings.Host.get -> string!
RabbitMQ.AMQP.Client.ConnectionSettings.MaxFrameSize.get -> uint
Expand All @@ -69,10 +75,11 @@ RabbitMQ.AMQP.Client.ConnectionSettings.Recovery.get -> RabbitMQ.AMQP.Client.IRe
RabbitMQ.AMQP.Client.ConnectionSettings.SaslMechanism.get -> RabbitMQ.AMQP.Client.SaslMechanism!
RabbitMQ.AMQP.Client.ConnectionSettings.Scheme.get -> string!
RabbitMQ.AMQP.Client.ConnectionSettings.TlsSettings.get -> RabbitMQ.AMQP.Client.TlsSettings?
RabbitMQ.AMQP.Client.ConnectionSettings.Uris.get -> System.Collections.Generic.IEnumerable<System.Uri!>?
RabbitMQ.AMQP.Client.ConnectionSettings.User.get -> string?
RabbitMQ.AMQP.Client.ConnectionSettings.UseSsl.get -> bool
RabbitMQ.AMQP.Client.ConnectionSettings.VirtualHost.get -> string!
RabbitMQ.AMQP.Client.ConnectionSettings._address -> Amqp.Address!
RabbitMQ.AMQP.Client.ConnectionSettings._virtualHost -> string!
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Build() -> RabbitMQ.AMQP.Client.ConnectionSettings!
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.ConnectionSettingsBuilder() -> void
Expand All @@ -84,7 +91,10 @@ RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Port(int port) -> RabbitMQ.AMQP.C
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.RecoveryConfiguration(RabbitMQ.AMQP.Client.IRecoveryConfiguration! recoveryConfiguration) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.SaslMechanism(RabbitMQ.AMQP.Client.SaslMechanism! saslMechanism) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Scheme(string! scheme) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.TlsSettings(RabbitMQ.AMQP.Client.TlsSettings! tlsSettings) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Uri(System.Uri! uri) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Uris(System.Collections.Generic.IEnumerable<System.Uri!>! uris) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.UriSelector(RabbitMQ.AMQP.Client.IUriSelector! uriSelector) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.User(string! user) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.VirtualHost(string! virtualHost) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
RabbitMQ.AMQP.Client.Consts
Expand Down Expand Up @@ -668,6 +678,8 @@ RabbitMQ.AMQP.Client.IStreamSpecification.InitialClusterSize(int initialClusterS
RabbitMQ.AMQP.Client.IStreamSpecification.MaxAge(System.TimeSpan maxAge) -> RabbitMQ.AMQP.Client.IStreamSpecification!
RabbitMQ.AMQP.Client.IStreamSpecification.MaxSegmentSizeBytes(RabbitMQ.AMQP.Client.ByteCapacity! maxSegmentSize) -> RabbitMQ.AMQP.Client.IStreamSpecification!
RabbitMQ.AMQP.Client.IStreamSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification!
RabbitMQ.AMQP.Client.IUriSelector
RabbitMQ.AMQP.Client.IUriSelector.Select(System.Collections.Generic.ICollection<System.Uri!>! uris) -> System.Uri!
RabbitMQ.AMQP.Client.LifeCycleCallBack
RabbitMQ.AMQP.Client.MessageHandler
RabbitMQ.AMQP.Client.MetricsReporter
Expand Down Expand Up @@ -711,6 +723,9 @@ RabbitMQ.AMQP.Client.QueueType.STREAM = 2 -> RabbitMQ.AMQP.Client.QueueType
RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy
RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy.AtLeastOnce = 1 -> RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy
RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy.AtMostOnce = 0 -> RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy
RabbitMQ.AMQP.Client.RandomUriSelector
RabbitMQ.AMQP.Client.RandomUriSelector.RandomUriSelector() -> void
RabbitMQ.AMQP.Client.RandomUriSelector.Select(System.Collections.Generic.ICollection<System.Uri!>! uris) -> System.Uri!
RabbitMQ.AMQP.Client.RecoveryConfiguration
RabbitMQ.AMQP.Client.RecoveryConfiguration.Activated(bool activated) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration!
RabbitMQ.AMQP.Client.RecoveryConfiguration.BackOffDelayPolicy(RabbitMQ.AMQP.Client.IBackOffDelayPolicy! backOffDelayPolicy) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration!
Expand Down Expand Up @@ -752,6 +767,8 @@ static RabbitMQ.AMQP.Client.ByteCapacity.Gb(long gigabytes) -> RabbitMQ.AMQP.Cli
static RabbitMQ.AMQP.Client.ByteCapacity.Kb(long megabytes) -> RabbitMQ.AMQP.Client.ByteCapacity!
static RabbitMQ.AMQP.Client.ByteCapacity.Mb(long megabytes) -> RabbitMQ.AMQP.Client.ByteCapacity!
static RabbitMQ.AMQP.Client.ByteCapacity.Tb(long terabytes) -> RabbitMQ.AMQP.Client.ByteCapacity!
static RabbitMQ.AMQP.Client.ConnectionSettings.ProcessUriSegmentsForVirtualHost(System.Uri! uri) -> string!
static RabbitMQ.AMQP.Client.ConnectionSettings.ProcessUserInfo(System.Uri! uri) -> (string? user, string? password)
static RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Create() -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
static RabbitMQ.AMQP.Client.Impl.AddressBuilderHelper.AddressBuilder() -> RabbitMQ.AMQP.Client.Impl.AddressBuilder!
static RabbitMQ.AMQP.Client.Impl.AmqpConnection.CreateAsync(RabbitMQ.AMQP.Client.ConnectionSettings! connectionSettings, RabbitMQ.AMQP.Client.IMetricsReporter? metricsReporter = null) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IConnection!>!
Expand Down
13 changes: 13 additions & 0 deletions RabbitMQ.AMQP.Client/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,19 @@ internal static bool CompareMap(Map map1, Map map2)
return true;
}

internal static bool IsValidScheme(string scheme)
{
if (scheme.Equals("amqp", StringComparison.InvariantCultureIgnoreCase) ||
scheme.Equals("amqps", StringComparison.InvariantCultureIgnoreCase))
{
return true;
}
else
{
return false;
}
}

internal static void ValidateMessageAnnotations(Dictionary<string, object> annotations)
{
foreach (KeyValuePair<string, object> kvp in annotations)
Expand Down
19 changes: 10 additions & 9 deletions Tests/ClusterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using RabbitMQ.AMQP.Client;
using RabbitMQ.AMQP.Client.Impl;
using Xunit;
using Xunit.Abstractions;

Expand All @@ -15,7 +16,7 @@ public class ClusterTests(ITestOutputHelper testOutputHelper)
: IntegrationTest(testOutputHelper, setupConnectionAndManagement: false)
{
[SkippableFact]
public Task CreateConnectionWithEnvironmentAndMultipleUris()
public async Task CreateConnectionWithEnvironmentAndMultipleUris()
{
Skip.IfNot(IsCluster);

Expand All @@ -31,16 +32,16 @@ public Task CreateConnectionWithEnvironmentAndMultipleUris()
connectionSettingBuilder.Uris(uris);
ConnectionSettings connectionSettings = connectionSettingBuilder.Build();

/*
IEnvironment env = AmqpEnvironment.Create(ConnectionSettingBuilder.Create().Build());
IConnection connection = await env.CreateConnectionAsync();
Assert.NotNull(connection);
IEnvironment env = AmqpEnvironment.Create(connectionSettings);

// Note: by using _connection, the test will dispose the object on teardown
_connection = await env.CreateConnectionAsync();
Assert.NotNull(_connection);
Assert.NotEmpty(env.GetConnections());

await env.CloseAsync();
Assert.Equal(State.Closed, connection.State);
Assert.Empty(env.GetConnections());
*/

return Task.CompletedTask;
Assert.Equal(State.Closed, _connection.State);
Assert.Empty(env.GetConnections());
}
}
Loading
Loading