diff --git a/.ci/ubuntu/cluster/gha-setup.sh b/.ci/ubuntu/cluster/gha-setup.sh
index fefb6061..ab369a8e 100755
--- a/.ci/ubuntu/cluster/gha-setup.sh
+++ b/.ci/ubuntu/cluster/gha-setup.sh
@@ -19,7 +19,7 @@ function run_docker_compose
docker compose --file "$script_dir/docker-compose.yml" $@
}
-readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}"
+readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-beta.4-management-alpine}"
if [[ ! -v GITHUB_ACTIONS ]]
then
diff --git a/.ci/ubuntu/cluster/rmq/Dockerfile b/.ci/ubuntu/cluster/rmq/Dockerfile
index f9b8a9f2..6ddf9508 100644
--- a/.ci/ubuntu/cluster/rmq/Dockerfile
+++ b/.ci/ubuntu/cluster/rmq/Dockerfile
@@ -1,4 +1,4 @@
-ARG RABBITMQ_DOCKER_TAG=pivotalrabbitmq/rabbitmq:main
+ARG RABBITMQ_DOCKER_TAG=rabbitmq:4.1.0-beta.4-management-alpine
FROM ${RABBITMQ_DOCKER_TAG}
diff --git a/.ci/ubuntu/cluster/rmq/advanced.config b/.ci/ubuntu/cluster/rmq/advanced.config
index f7da6b6a..6a147313 100644
--- a/.ci/ubuntu/cluster/rmq/advanced.config
+++ b/.ci/ubuntu/cluster/rmq/advanced.config
@@ -1,3 +1,14 @@
[
- {kernel, [{net_ticktime, 15}]}
-].
+ {kernel, [{net_ticktime, 15}]},
+ {rabbitmq_auth_backend_oauth2, [{key_config,
+ [{signing_keys,
+ #{<<"token-key">> =>
+ {map,
+ #{<<"alg">> => <<"HS256">>,
+ <<"k">> => <<"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGH">>,
+ <<"kid">> => <<"token-key">>,
+ <<"kty">> => <<"oct">>,
+ <<"use">> => <<"sig">>,
+ <<"value">> => <<"token-key">>}}}}]},
+ {resource_server_id,<<"rabbitmq">>}]}
+].
\ No newline at end of file
diff --git a/.ci/ubuntu/cluster/rmq/enabled_plugins b/.ci/ubuntu/cluster/rmq/enabled_plugins
index b6b496a2..1f453bbd 100644
--- a/.ci/ubuntu/cluster/rmq/enabled_plugins
+++ b/.ci/ubuntu/cluster/rmq/enabled_plugins
@@ -1 +1 @@
-[rabbitmq_auth_mechanism_ssl,rabbitmq_management,rabbitmq_top].
+[rabbitmq_auth_mechanism_ssl,rabbitmq_management,rabbitmq_stream,rabbitmq_stream_management,rabbitmq_top,rabbitmq_auth_backend_oauth2].
diff --git a/.ci/ubuntu/cluster/rmq/rabbitmq.conf b/.ci/ubuntu/cluster/rmq/rabbitmq.conf
index 5e13dcbe..18518fa3 100644
--- a/.ci/ubuntu/cluster/rmq/rabbitmq.conf
+++ b/.ci/ubuntu/cluster/rmq/rabbitmq.conf
@@ -28,3 +28,7 @@ cluster_formation.peer_discovery_backend = classic_config
cluster_formation.classic_config.nodes.0 = rabbit@rmq0.local
cluster_formation.classic_config.nodes.1 = rabbit@rmq1.local
cluster_formation.classic_config.nodes.2 = rabbit@rmq2.local
+
+
+auth_backends.1 = internal
+auth_backends.2 = rabbit_auth_backend_oauth2
\ No newline at end of file
diff --git a/.ci/ubuntu/one-node/advanced.config b/.ci/ubuntu/one-node/advanced.config
new file mode 100644
index 00000000..fd7d0fd9
--- /dev/null
+++ b/.ci/ubuntu/one-node/advanced.config
@@ -0,0 +1,13 @@
+[
+ {rabbitmq_auth_backend_oauth2, [{key_config,
+ [{signing_keys,
+ #{<<"token-key">> =>
+ {map,
+ #{<<"alg">> => <<"HS256">>,
+ <<"k">> => <<"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGH">>,
+ <<"kid">> => <<"token-key">>,
+ <<"kty">> => <<"oct">>,
+ <<"use">> => <<"sig">>,
+ <<"value">> => <<"token-key">>}}}}]},
+ {resource_server_id,<<"rabbitmq">>}]}
+].
\ No newline at end of file
diff --git a/.ci/ubuntu/one-node/enabled_plugins b/.ci/ubuntu/one-node/enabled_plugins
index 2e81f164..1f453bbd 100644
--- a/.ci/ubuntu/one-node/enabled_plugins
+++ b/.ci/ubuntu/one-node/enabled_plugins
@@ -1 +1 @@
-[rabbitmq_auth_mechanism_ssl,rabbitmq_management,rabbitmq_stream,rabbitmq_stream_management,rabbitmq_top].
+[rabbitmq_auth_mechanism_ssl,rabbitmq_management,rabbitmq_stream,rabbitmq_stream_management,rabbitmq_top,rabbitmq_auth_backend_oauth2].
diff --git a/.ci/ubuntu/one-node/gha-setup.sh b/.ci/ubuntu/one-node/gha-setup.sh
index bbcbcaa2..a18b6dc1 100755
--- a/.ci/ubuntu/one-node/gha-setup.sh
+++ b/.ci/ubuntu/one-node/gha-setup.sh
@@ -8,12 +8,9 @@ script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
readonly script_dir
echo "[INFO] script_dir: '$script_dir'"
-if [[ $3 == 'arm' ]]
-then
- readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq-arm64:main}"
-else
- readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}"
-fi
+
+readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-beta.4-management-alpine}"
+
readonly docker_name_prefix='rabbitmq-amqp-dotnet-client'
@@ -92,6 +89,7 @@ function start_rabbitmq
--volume "$GITHUB_WORKSPACE/.ci/ubuntu/one-node/enabled_plugins:/etc/rabbitmq/enabled_plugins" \
--volume "$GITHUB_WORKSPACE/.ci/ubuntu/one-node/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro" \
--volume "$GITHUB_WORKSPACE/.ci/certs:/etc/rabbitmq/certs:ro" \
+ --volume "$GITHUB_WORKSPACE/.ci/ubuntu/one-node/advanced.config:/etc/rabbitmq/advanced.config:ro" \
--volume "$GITHUB_WORKSPACE/.ci/ubuntu/log:/var/log/rabbitmq" \
"$rabbitmq_image"
}
diff --git a/.ci/ubuntu/one-node/rabbitmq.conf b/.ci/ubuntu/one-node/rabbitmq.conf
index 829dcec9..6f2af2bc 100644
--- a/.ci/ubuntu/one-node/rabbitmq.conf
+++ b/.ci/ubuntu/one-node/rabbitmq.conf
@@ -24,3 +24,6 @@ ssl_options.fail_if_no_peer_cert = false
auth_mechanisms.1 = PLAIN
auth_mechanisms.2 = ANONYMOUS
auth_mechanisms.3 = EXTERNAL
+
+auth_backends.1 = internal
+auth_backends.2 = rabbit_auth_backend_oauth2
diff --git a/.ci/windows/advanced.config b/.ci/windows/advanced.config
new file mode 100644
index 00000000..fd7d0fd9
--- /dev/null
+++ b/.ci/windows/advanced.config
@@ -0,0 +1,13 @@
+[
+ {rabbitmq_auth_backend_oauth2, [{key_config,
+ [{signing_keys,
+ #{<<"token-key">> =>
+ {map,
+ #{<<"alg">> => <<"HS256">>,
+ <<"k">> => <<"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGH">>,
+ <<"kid">> => <<"token-key">>,
+ <<"kty">> => <<"oct">>,
+ <<"use">> => <<"sig">>,
+ <<"value">> => <<"token-key">>}}}}]},
+ {resource_server_id,<<"rabbitmq">>}]}
+].
\ No newline at end of file
diff --git a/.ci/windows/enabled_plugins b/.ci/windows/enabled_plugins
index 2e81f164..1f453bbd 100644
--- a/.ci/windows/enabled_plugins
+++ b/.ci/windows/enabled_plugins
@@ -1 +1 @@
-[rabbitmq_auth_mechanism_ssl,rabbitmq_management,rabbitmq_stream,rabbitmq_stream_management,rabbitmq_top].
+[rabbitmq_auth_mechanism_ssl,rabbitmq_management,rabbitmq_stream,rabbitmq_stream_management,rabbitmq_top,rabbitmq_auth_backend_oauth2].
diff --git a/.ci/windows/gha-setup.ps1 b/.ci/windows/gha-setup.ps1
index 7e6e44b0..3aa22418 100644
--- a/.ci/windows/gha-setup.ps1
+++ b/.ci/windows/gha-setup.ps1
@@ -20,6 +20,11 @@ New-Variable -Name ca_certificate_file -Option Constant -Value `
New-Variable -Name enabled_plugins_file -Option Constant -Value `
(Resolve-Path -LiteralPath (Join-Path -Path $ci_windows_dir -ChildPath 'enabled_plugins'))
+New-Variable -Name advanced_config_file -Option Constant -Value `
+ (Resolve-Path -LiteralPath (Join-Path -Path $ci_windows_dir -ChildPath 'advanced.config'))
+
+
+
Write-Host "[INFO] importing CA cert from '$ca_certificate_file'"
Import-Certificate -Verbose -CertStoreLocation Cert:\LocalMachine\Root -FilePath $ca_certificate_file
@@ -145,6 +150,7 @@ $rabbitmq_appdata_dir = Join-Path -Path $env:AppData -ChildPath 'RabbitMQ'
New-Item -Path $rabbitmq_appdata_dir -ItemType Directory
$rabbitmq_conf_file = Join-Path -Path $rabbitmq_appdata_dir -ChildPath 'rabbitmq.conf'
$rabbitmq_enabled_plugins_file = Join-Path -Path $rabbitmq_appdata_dir -ChildPath 'enabled_plugins'
+$rabbitmq_advanced_config_file = Join-Path -Path $rabbitmq_appdata_dir -ChildPath 'advanced.config'
Write-Host "[INFO] Creating RabbitMQ configuration file in '$rabbitmq_appdata_dir'"
Get-Content $rabbitmq_conf_in_file | %{ $_ -replace '@@CERTS_DIR@@', $certs_dir } | %{ $_ -replace '\\', '/' } | Set-Content -LiteralPath $rabbitmq_conf_file
@@ -153,6 +159,10 @@ Get-Content $rabbitmq_conf_file
Write-Host "[INFO] Copying '$enabled_plugins_file' to '$rabbitmq_enabled_plugins_file'"
Copy-Item -Verbose -Force -LiteralPath $enabled_plugins_file -Destination $rabbitmq_enabled_plugins_file
+Write-Host "[INFO] Copying '$advanced_config_file' to '$rabbitmq_advanced_config_file'"
+Copy-Item -Verbose -Force -LiteralPath $advanced_config_file -Destination $rabbitmq_advanced_config_file
+
+
Write-Host '[INFO] Creating Erlang cookie files...'
function Set-ErlangCookie
diff --git a/.ci/windows/versions.json b/.ci/windows/versions.json
index e2506775..088c055d 100644
--- a/.ci/windows/versions.json
+++ b/.ci/windows/versions.json
@@ -1,4 +1,4 @@
{
"erlang": "27.2",
- "rabbitmq": "4.0.5"
+ "rabbitmq": "4.1.0-beta.4"
}
diff --git a/Directory.Packages.props b/Directory.Packages.props
index ad65a670..e0551b25 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -13,6 +13,7 @@
+
@@ -39,4 +40,4 @@
-
+
\ No newline at end of file
diff --git a/RabbitMQ.AMQP.Client/ConnectionSettings.cs b/RabbitMQ.AMQP.Client/ConnectionSettings.cs
index df71ea6e..7abb4d14 100644
--- a/RabbitMQ.AMQP.Client/ConnectionSettings.cs
+++ b/RabbitMQ.AMQP.Client/ConnectionSettings.cs
@@ -41,6 +41,7 @@ public class ConnectionSettingsBuilder
private Uri? _uri;
private List? _uris;
private IUriSelector? _uriSelector;
+ private OAuth2Options? _oAuth2Options;
public static ConnectionSettingsBuilder Create()
{
@@ -104,6 +105,7 @@ public ConnectionSettingsBuilder MaxFrameSize(uint maxFrameSize)
throw new ArgumentOutOfRangeException(nameof(maxFrameSize),
"maxFrameSize must be 0 (no limit) or greater than or equal to 512");
}
+
return this;
}
@@ -152,6 +154,12 @@ public ConnectionSettingsBuilder UriSelector(IUriSelector uriSelector)
return this;
}
+ public ConnectionSettingsBuilder OAuth2Options(OAuth2Options? oAuth2Options)
+ {
+ _oAuth2Options = oAuth2Options;
+ return this;
+ }
+
public ConnectionSettings Build()
{
// TODO this should do something similar to consolidate in the Java code
@@ -162,26 +170,26 @@ public ConnectionSettings Build()
_containerId, _saslMechanism,
_recoveryConfiguration,
_maxFrameSize,
- _tlsSettings);
+ _tlsSettings,
+ _oAuth2Options);
}
- else if (_uris is not null)
+
+ if (_uris is not null)
{
return new ClusterConnectionSettings(_uris,
_uriSelector,
_containerId, _saslMechanism,
_recoveryConfiguration,
_maxFrameSize,
- _tlsSettings);
- }
- else
- {
- return new ConnectionSettings(_scheme, _host, _port, _user,
- _password, _virtualHost,
- _containerId, _saslMechanism,
- _recoveryConfiguration,
- _maxFrameSize,
- _tlsSettings);
+ _tlsSettings, _oAuth2Options);
}
+
+ return new ConnectionSettings(_scheme, _host, _port, _user,
+ _password, _virtualHost,
+ _containerId, _saslMechanism,
+ _recoveryConfiguration,
+ _maxFrameSize,
+ _tlsSettings, _oAuth2Options);
}
private void ValidateUris()
@@ -200,10 +208,11 @@ public class ConnectionSettings : IEquatable
{
protected Address _address = new("amqp://localhost:5672");
protected string _virtualHost = Consts.DefaultVirtualHost;
+ private readonly SaslMechanism _saslMechanism = SaslMechanism.Plain;
+ private readonly OAuth2Options? _oAuth2Options = null;
private readonly string _containerId = string.Empty;
private readonly uint _maxFrameSize = Consts.DefaultMaxFrameSize;
private readonly TlsSettings? _tlsSettings;
- private readonly SaslMechanism _saslMechanism = SaslMechanism.Plain;
private readonly IRecoveryConfiguration _recoveryConfiguration = new RecoveryConfiguration();
public ConnectionSettings(Uri uri,
@@ -211,8 +220,9 @@ public ConnectionSettings(Uri uri,
SaslMechanism? saslMechanism = null,
IRecoveryConfiguration? recoveryConfiguration = null,
uint? maxFrameSize = null,
- TlsSettings? tlsSettings = null)
- : this(containerId, saslMechanism, recoveryConfiguration, maxFrameSize, tlsSettings)
+ TlsSettings? tlsSettings = null,
+ OAuth2Options? oAuth2Options = null)
+ : this(containerId, saslMechanism, recoveryConfiguration, maxFrameSize, tlsSettings, oAuth2Options)
{
(string? user, string? password) = ProcessUserInfo(uri);
@@ -224,14 +234,28 @@ public ConnectionSettings(Uri uri,
throw new ArgumentOutOfRangeException("uri.Scheme", "Uri scheme must be 'amqp' or 'amqps'");
}
- _address = new Address(host: uri.Host,
- port: uri.Port,
+ _address = InitAddress(uri.Host, uri.Port, user, password, scheme);
+ _tlsSettings = InitTlsSettings();
+ }
+
+ protected Address InitAddress(string host, int port, string? user, string? password, string scheme)
+ {
+ if (_oAuth2Options is not null)
+ {
+ return new Address(host, port, "", _oAuth2Options.Token, "/", scheme);
+ }
+
+ return new Address(host,
+ port: port,
user: user,
password: password,
path: "/",
scheme: scheme);
+ }
- _tlsSettings = InitTlsSettings();
+ internal void UpdateOAuthPassword(string? password)
+ {
+ _address = new Address(_address.Host, _address.Port, _address.User, password, _address.Path, _address.Scheme);
}
public ConnectionSettings(string scheme,
@@ -244,21 +268,16 @@ public ConnectionSettings(string scheme,
SaslMechanism? saslMechanism = null,
IRecoveryConfiguration? recoveryConfiguration = null,
uint? maxFrameSize = null,
- TlsSettings? tlsSettings = null)
- : this(containerId, saslMechanism, recoveryConfiguration, maxFrameSize, tlsSettings)
+ TlsSettings? tlsSettings = null,
+ OAuth2Options? oAuth2Options = null)
+ : this(containerId, saslMechanism, recoveryConfiguration, maxFrameSize, tlsSettings, oAuth2Options)
{
if (false == Utils.IsValidScheme(scheme))
{
throw new ArgumentOutOfRangeException(nameof(scheme), "scheme must be 'amqp' or 'amqps'");
}
- _address = new Address(host: host,
- port: port,
- user: user,
- password: password,
- path: "/",
- scheme: scheme);
-
+ _address = InitAddress(host, port, user, password, scheme);
if (virtualHost is not null)
{
_virtualHost = virtualHost;
@@ -272,7 +291,8 @@ protected ConnectionSettings(
SaslMechanism? saslMechanism = null,
IRecoveryConfiguration? recoveryConfiguration = null,
uint? maxFrameSize = null,
- TlsSettings? tlsSettings = null)
+ TlsSettings? tlsSettings = null,
+ OAuth2Options? oAuth2Options = null)
{
if (containerId is not null)
{
@@ -284,6 +304,13 @@ protected ConnectionSettings(
_saslMechanism = saslMechanism;
}
+ if (oAuth2Options is not null)
+ {
+ // If OAuth2Options is set, then SaslMechanism must be Plain
+ _oAuth2Options = oAuth2Options;
+ _saslMechanism = SaslMechanism.Plain;
+ }
+
if (recoveryConfiguration is not null)
{
_recoveryConfiguration = recoveryConfiguration;
@@ -408,7 +435,8 @@ protected static string ProcessUriSegmentsForVirtualHost(Uri uri)
// that has at least the path segment "/"
if (uri.Segments.Length > 2)
{
- throw new ArgumentException($"Multiple segments in path of AMQP URI: {string.Join(", ", uri.Segments)}");
+ throw new ArgumentException(
+ $"Multiple segments in path of AMQP URI: {string.Join(", ", uri.Segments)}");
}
if (uri.Segments.Length == 2)
@@ -454,8 +482,9 @@ public ClusterConnectionSettings(IEnumerable uris,
SaslMechanism? saslMechanism = null,
IRecoveryConfiguration? recoveryConfiguration = null,
uint? maxFrameSize = null,
- TlsSettings? tlsSettings = null)
- : base(containerId, saslMechanism, recoveryConfiguration, maxFrameSize, tlsSettings)
+ TlsSettings? tlsSettings = null,
+ OAuth2Options? oAuth2Options = null)
+ : base(containerId, saslMechanism, recoveryConfiguration, maxFrameSize, tlsSettings, oAuth2Options)
{
_uris = uris.ToList();
if (_uris.Count == 0)
@@ -463,7 +492,7 @@ public ClusterConnectionSettings(IEnumerable uris,
throw new ArgumentOutOfRangeException(nameof(uris), "At least one Uri is required.");
}
- _uriToAddress = new(_uris.Count);
+ _uriToAddress = new Dictionary(_uris.Count);
if (uriSelector is not null)
{
@@ -492,16 +521,12 @@ public ClusterConnectionSettings(IEnumerable uris,
string thisVirtualHost = ProcessUriSegmentsForVirtualHost(uri);
if (false == thisVirtualHost.Equals(tmpVirtualHost, StringComparison.InvariantCultureIgnoreCase))
{
- throw new ArgumentException($"All AMQP URIs must use the same virtual host. Expected '{tmpVirtualHost}', got '{thisVirtualHost}'");
+ throw new ArgumentException(
+ $"All AMQP URIs must use the same virtual host. Expected '{tmpVirtualHost}', got '{thisVirtualHost}'");
}
}
- var address = new Address(host: uri.Host,
- port: uri.Port,
- user: user,
- password: password,
- path: "/",
- scheme: scheme);
+ var address = InitAddress(uri.Host, uri.Port, user, password, scheme);
_uriToAddress[uri] = address;
@@ -551,6 +576,7 @@ public override int GetHashCode()
{
hashCode ^= _uris[i].GetHashCode();
}
+
return hashCode;
}
@@ -578,7 +604,7 @@ public TlsSettings() : this(DefaultSslProtocols)
public TlsSettings(SslProtocols protocols)
{
Protocols = protocols;
- RemoteCertificateValidationCallback = trustEverythingCertValidationCallback;
+ RemoteCertificateValidationCallback = TrustEverythingCertValidationCallback;
LocalCertificateSelectionCallback = null;
}
@@ -594,10 +620,20 @@ public TlsSettings(SslProtocols protocols)
public LocalCertificateSelectionCallback? LocalCertificateSelectionCallback { get; set; }
- private bool trustEverythingCertValidationCallback(object sender, X509Certificate? certificate,
+ private bool TrustEverythingCertValidationCallback(object sender, X509Certificate? certificate,
X509Chain? chain, SslPolicyErrors sslPolicyErrors)
{
return (sslPolicyErrors & ~AcceptablePolicyErrors) == SslPolicyErrors.None;
}
}
+
+ public class OAuth2Options
+ {
+ public OAuth2Options(string token)
+ {
+ Token = token;
+ }
+
+ public string Token { get; set; }
+ }
}
diff --git a/RabbitMQ.AMQP.Client/IConnection.cs b/RabbitMQ.AMQP.Client/IConnection.cs
index 2c67b2b6..2837c60e 100644
--- a/RabbitMQ.AMQP.Client/IConnection.cs
+++ b/RabbitMQ.AMQP.Client/IConnection.cs
@@ -3,6 +3,7 @@
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
using System.Collections.Generic;
+using System.Threading.Tasks;
namespace RabbitMQ.AMQP.Client
{
@@ -60,5 +61,7 @@ public interface IConnection : ILifeCycle
/// Get or set the Connection ID. Used by
///
public long Id { get; set; }
+
+ public Task RefreshTokenAsync(string token);
}
}
diff --git a/RabbitMQ.AMQP.Client/IManagement.cs b/RabbitMQ.AMQP.Client/IManagement.cs
index 38cd6eca..37169903 100644
--- a/RabbitMQ.AMQP.Client/IManagement.cs
+++ b/RabbitMQ.AMQP.Client/IManagement.cs
@@ -2,6 +2,7 @@
// and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+using System.Data;
using System.Threading;
using System.Threading.Tasks;
@@ -61,6 +62,7 @@ Task GetQueueInfoAsync(string queueName,
///
/// A builder for
IBindingSpecification Binding();
+
}
internal interface IManagementTopology
diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
index 74ed9fdb..88990430 100644
--- a/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
+++ b/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
@@ -6,6 +6,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
+using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Amqp;
@@ -164,6 +165,22 @@ public IEnumerable Consumers
///
public long Id { get; set; }
+ ///
+ /// Refresh the OAuth token and update the connection settings.
+ ///
+ /// OAuth token
+ public async Task RefreshTokenAsync(string token)
+ {
+ // here we use the primitive RequestAsync method because we don't want to
+ // expose this method in the IManagement interface
+ // we need to update the connection settings with the new token
+ int[] expectedResponseCodes = { AmqpManagement.Code204 };
+ _ = await _management.RequestAsync(Encoding.ASCII.GetBytes(token),
+ AmqpManagement.AuthTokens, AmqpManagement.Put, expectedResponseCodes)
+ .ConfigureAwait(false);
+ _connectionSettings.UpdateOAuthPassword(token);
+ }
+
// TODO cancellation token
public override async Task OpenAsync()
{
@@ -249,7 +266,8 @@ internal void AddPublisher(Guid id, IPublisher consumer)
if (false == _publishersDict.TryAdd(id, consumer))
{
// TODO create "internal bug" exception type?
- throw new InvalidOperationException("could not add publisher, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
+ throw new InvalidOperationException(
+ "could not add publisher, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
}
}
@@ -258,7 +276,8 @@ internal void RemovePublisher(Guid id)
if (false == _publishersDict.TryRemove(id, out _))
{
// TODO create "internal bug" exception type?
- throw new InvalidOperationException("could not remove publisher, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
+ throw new InvalidOperationException(
+ "could not remove publisher, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
}
}
@@ -268,7 +287,8 @@ internal void AddConsumer(Guid id, IConsumer consumer)
if (false == _consumersDict.TryAdd(id, consumer))
{
// TODO create "internal bug" exception type?
- throw new InvalidOperationException("could not add consumer, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
+ throw new InvalidOperationException(
+ "could not add consumer, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
}
}
@@ -277,7 +297,8 @@ internal void RemoveConsumer(Guid id)
if (false == _consumersDict.TryRemove(id, out _))
{
// TODO create "internal bug" exception type?
- throw new InvalidOperationException("could not remove consumer, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
+ throw new InvalidOperationException(
+ "could not remove consumer, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
}
}
@@ -639,6 +660,7 @@ private void HandleProperties(Fields properties)
{
value = (string)kvp.Value;
}
+
_connectionProperties[key] = value;
}
@@ -647,6 +669,7 @@ private void HandleProperties(Fields properties)
{
// TODO Java client throws exception here
}
+
_areFilterExpressionsSupported = Utils.SupportsFilterExpressions(brokerVersion);
}
}
diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs b/RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs
index 43d5f841..3adc5edc 100644
--- a/RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs
+++ b/RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs
@@ -5,6 +5,7 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
+using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Amqp;
@@ -46,6 +47,7 @@ public class AmqpManagement : AbstractLifeCycle, IManagement, IManagementTopolog
internal const string Post = "POST";
internal const string Delete = "DELETE";
private const string ReplyTo = "$me";
+ internal const string AuthTokens = "/auth/tokens";
protected readonly TaskCompletionSource _managementSessionClosedTcs =
Utils.CreateTaskCompletionSource();
diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
index 7b38b663..fa7564a7 100644
--- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
+++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
@@ -59,16 +59,17 @@ RabbitMQ.AMQP.Client.ClassicQueueVersion
RabbitMQ.AMQP.Client.ClassicQueueVersion.V1 = 0 -> RabbitMQ.AMQP.Client.ClassicQueueVersion
RabbitMQ.AMQP.Client.ClassicQueueVersion.V2 = 1 -> RabbitMQ.AMQP.Client.ClassicQueueVersion
RabbitMQ.AMQP.Client.ClusterConnectionSettings
-RabbitMQ.AMQP.Client.ClusterConnectionSettings.ClusterConnectionSettings(System.Collections.Generic.IEnumerable! uris, RabbitMQ.AMQP.Client.IUriSelector? uriSelector = null, string? containerId = null, RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void
+RabbitMQ.AMQP.Client.ClusterConnectionSettings.ClusterConnectionSettings(System.Collections.Generic.IEnumerable! uris, RabbitMQ.AMQP.Client.IUriSelector? uriSelector = null, string? containerId = null, RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null, RabbitMQ.AMQP.Client.OAuth2Options? oAuth2Options = null) -> void
RabbitMQ.AMQP.Client.ConnectionException
RabbitMQ.AMQP.Client.ConnectionException.ConnectionException(string! message) -> void
RabbitMQ.AMQP.Client.ConnectionException.ConnectionException(string! message, System.Exception! innerException) -> void
RabbitMQ.AMQP.Client.ConnectionSettings
-RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(string! scheme, string! host, int port, string? user = null, string? password = null, string? virtualHost = null, string! containerId = "", RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void
-RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(string? containerId = null, RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void
-RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(System.Uri! uri, string? containerId = null, RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void
+RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(string! scheme, string! host, int port, string? user = null, string? password = null, string? virtualHost = null, string! containerId = "", RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null, RabbitMQ.AMQP.Client.OAuth2Options? oAuth2Options = null) -> void
+RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(string? containerId = null, RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null, RabbitMQ.AMQP.Client.OAuth2Options? oAuth2Options = null) -> void
+RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(System.Uri! uri, string? containerId = null, RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null, RabbitMQ.AMQP.Client.OAuth2Options? oAuth2Options = null) -> void
RabbitMQ.AMQP.Client.ConnectionSettings.ContainerId.get -> string!
RabbitMQ.AMQP.Client.ConnectionSettings.Host.get -> string!
+RabbitMQ.AMQP.Client.ConnectionSettings.InitAddress(string! host, int port, string? user, string? password, string! scheme) -> Amqp.Address!
RabbitMQ.AMQP.Client.ConnectionSettings.MaxFrameSize.get -> uint
RabbitMQ.AMQP.Client.ConnectionSettings.Password.get -> string?
RabbitMQ.AMQP.Client.ConnectionSettings.Path.get -> string!
@@ -88,6 +89,7 @@ RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.ConnectionSettingsBuilder() -> vo
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.ContainerId(string! containerId) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Host(string! host) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.MaxFrameSize(uint maxFrameSize) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
+RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.OAuth2Options(RabbitMQ.AMQP.Client.OAuth2Options? oAuth2Options) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Password(string! password) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Port(int port) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.RecoveryConfiguration(RabbitMQ.AMQP.Client.IRecoveryConfiguration! recoveryConfiguration) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
@@ -154,6 +156,7 @@ RabbitMQ.AMQP.Client.IConnection.Management() -> RabbitMQ.AMQP.Client.IManagemen
RabbitMQ.AMQP.Client.IConnection.Properties.get -> System.Collections.Generic.IReadOnlyDictionary!
RabbitMQ.AMQP.Client.IConnection.PublisherBuilder() -> RabbitMQ.AMQP.Client.IPublisherBuilder!
RabbitMQ.AMQP.Client.IConnection.Publishers.get -> System.Collections.Generic.IEnumerable!
+RabbitMQ.AMQP.Client.IConnection.RefreshTokenAsync(string! token) -> System.Threading.Tasks.Task!
RabbitMQ.AMQP.Client.IConnection.RpcClientBuilder() -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
RabbitMQ.AMQP.Client.IConnection.RpcServerBuilder() -> RabbitMQ.AMQP.Client.IRpcServerBuilder!
RabbitMQ.AMQP.Client.IConsumer
@@ -342,6 +345,7 @@ RabbitMQ.AMQP.Client.Impl.AmqpConnection.Management() -> RabbitMQ.AMQP.Client.IM
RabbitMQ.AMQP.Client.Impl.AmqpConnection.Properties.get -> System.Collections.Generic.IReadOnlyDictionary!
RabbitMQ.AMQP.Client.Impl.AmqpConnection.PublisherBuilder() -> RabbitMQ.AMQP.Client.IPublisherBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpConnection.Publishers.get -> System.Collections.Generic.IEnumerable!
+RabbitMQ.AMQP.Client.Impl.AmqpConnection.RefreshTokenAsync(string! token) -> System.Threading.Tasks.Task!
RabbitMQ.AMQP.Client.Impl.AmqpConnection.RpcClientBuilder() -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpConnection.RpcServerBuilder() -> RabbitMQ.AMQP.Client.IRpcServerBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpConsumer
@@ -698,6 +702,10 @@ RabbitMQ.AMQP.Client.MetricsReporter.PublisherClosed() -> void
RabbitMQ.AMQP.Client.MetricsReporter.PublisherOpened() -> void
RabbitMQ.AMQP.Client.ModelException
RabbitMQ.AMQP.Client.ModelException.ModelException(string! message) -> void
+RabbitMQ.AMQP.Client.OAuth2Options
+RabbitMQ.AMQP.Client.OAuth2Options.OAuth2Options(string! token) -> void
+RabbitMQ.AMQP.Client.OAuth2Options.Token.get -> string!
+RabbitMQ.AMQP.Client.OAuth2Options.Token.set -> void
RabbitMQ.AMQP.Client.OutcomeState
RabbitMQ.AMQP.Client.OutcomeState.Accepted = 0 -> RabbitMQ.AMQP.Client.OutcomeState
RabbitMQ.AMQP.Client.OutcomeState.Rejected = 1 -> RabbitMQ.AMQP.Client.OutcomeState
diff --git a/Tests/ConnectionRecoveryTests.cs b/Tests/ConnectionRecoveryTests.cs
index c3954a9c..29cab445 100644
--- a/Tests/ConnectionRecoveryTests.cs
+++ b/Tests/ConnectionRecoveryTests.cs
@@ -13,22 +13,6 @@
namespace Tests;
-internal class FakeBackOffDelayPolicyDisabled : IBackOffDelayPolicy
-{
- public int CurrentAttempt => 1;
- public int Delay() => 1;
- public bool IsActive() => false;
- public void Reset() { }
-}
-
-internal class FakeFastBackOffDelay : IBackOffDelayPolicy
-{
- public int CurrentAttempt => 1;
- public int Delay() => 200;
- public bool IsActive() => true;
- public void Reset() { }
-}
-
public class ConnectionRecoveryTests(ITestOutputHelper testOutputHelper)
: IntegrationTest(testOutputHelper, setupConnectionAndManagement: false)
{
diff --git a/Tests/HttpApiClient.cs b/Tests/HttpApiClient.cs
index 14ccc4b7..e3df44f6 100644
--- a/Tests/HttpApiClient.cs
+++ b/Tests/HttpApiClient.cs
@@ -31,12 +31,18 @@ public ushort GetClusterSize()
/// Creates a user, without password, with full permissions
///
/// The user name.
- public async Task CreateUserAsync(string userName)
+ /// The virtual host
+ public async Task CreateUserAsync(string userName, string virtualHost)
{
var userInfo = new UserInfo(null, null, []);
await _managementClient.CreateUserAsync(userName, userInfo);
var permissionInfo = new PermissionInfo();
- await _managementClient.CreatePermissionAsync("/", userName, permissionInfo);
+ await _managementClient.CreatePermissionAsync(virtualHost, userName, permissionInfo);
+ }
+
+ public async Task CreateVhostAsync(string vhostName)
+ {
+ await _managementClient.CreateVhostAsync(vhostName);
}
public async Task CheckConnectionAsync(string containerId, bool checkOpened = true)
diff --git a/Tests/IntegrationTest.Static.cs b/Tests/IntegrationTest.Static.cs
index 8dd6554c..697280c9 100644
--- a/Tests/IntegrationTest.Static.cs
+++ b/Tests/IntegrationTest.Static.cs
@@ -296,6 +296,11 @@ private static string InitRabbitMqHost()
}
}
+ protected static Task CreateVhostAsync(string vhost)
+ {
+ return s_httpApiClient.CreateVhostAsync(vhost);
+ }
+
private static bool InitIsRunningInCI()
{
if (bool.TryParse(Environment.GetEnvironmentVariable("CI"), out bool ci))
diff --git a/Tests/OAuth2Tests.cs b/Tests/OAuth2Tests.cs
new file mode 100644
index 00000000..63b12363
--- /dev/null
+++ b/Tests/OAuth2Tests.cs
@@ -0,0 +1,176 @@
+// This source code is dual-licensed under the Apache License, version 2.0,
+// and the Mozilla Public License, version 2.0.
+// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+
+using System;
+using System.Collections.Generic;
+using System.IdentityModel.Tokens.Jwt;
+using System.Linq;
+using System.Security.Claims;
+using System.Security.Cryptography;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Amqp;
+using Microsoft.IdentityModel.Tokens;
+using RabbitMQ.AMQP.Client;
+using RabbitMQ.AMQP.Client.Impl;
+using Xunit;
+using Xunit.Abstractions;
+using IConnection = RabbitMQ.AMQP.Client.IConnection;
+
+namespace Tests
+{
+ public class OAuth2Tests(ITestOutputHelper testOutputHelper)
+ : IntegrationTest(testOutputHelper, setupConnectionAndManagement: false)
+ {
+ private const string Base64Key = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGH";
+
+ private const string Audience = "rabbitmq";
+
+ //
+ [SkippableFact]
+ public async Task ConnectToRabbitMqWithOAuth2TokenShouldSuccess()
+ {
+ Skip.IfNot(IsCluster);
+ IConnection connection = await AmqpConnection.CreateAsync(
+ ConnectionSettingsBuilder.Create()
+ .Host("localhost")
+ .Port(5672)
+ .OAuth2Options(new OAuth2Options(GenerateToken(DateTime.UtcNow.AddMinutes(5))))
+ .Build());
+
+ Assert.NotNull(connection);
+ await connection.CloseAsync();
+ }
+
+ [SkippableFact]
+ public async Task RefreshTokenShouldNotDisconnectTheClient()
+ {
+ Skip.IfNot(IsCluster);
+ IConnection connection = await AmqpConnection.CreateAsync(
+ ConnectionSettingsBuilder.Create()
+ .Host("localhost")
+ .Port(5672)
+ .OAuth2Options(new OAuth2Options(GenerateToken(DateTime.UtcNow.AddMilliseconds(1_000))))
+ .Build());
+ await connection.RefreshTokenAsync(GenerateToken(DateTime.UtcNow.AddMinutes(5)));
+ Thread.Sleep(TimeSpan.FromSeconds(1));
+ Assert.NotNull(connection);
+ Assert.Equal(State.Open, connection.State);
+ await connection.CloseAsync();
+ }
+
+ [SkippableFact]
+ public async Task ConnectToRabbitMqWithOAuth2TokenShouldDisconnectAfterTimeout()
+ {
+ Skip.IfNot(IsCluster);
+ var l = new List
+ {
+ ConnectionSettingsBuilder.Create().Uris(new List { new("amqp://") }),
+ ConnectionSettingsBuilder.Create().Uri(new Uri("amqp://localhost:5672")),
+ ConnectionSettingsBuilder.Create().Host("localhost").Port(5672)
+ };
+
+ foreach (ConnectionSettingsBuilder builder in l)
+ {
+ IConnection connection = await AmqpConnection.CreateAsync(builder
+ .RecoveryConfiguration(new RecoveryConfiguration().Activated(false).Topology(false))
+ .OAuth2Options(new OAuth2Options(GenerateToken(DateTime.UtcNow.AddMilliseconds(1_000)))).Build());
+ Assert.NotNull(connection);
+ Assert.Equal(State.Open, connection.State);
+ State? stateFrom = null;
+ State? stateTo = null;
+ Error? stateError = null;
+ TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
+ connection.ChangeState += (_, from, to, error) =>
+ {
+ stateFrom = from;
+ stateTo = to;
+ stateError = error;
+ tcs.SetResult(true);
+ };
+
+ await tcs.Task;
+ Assert.NotNull(stateFrom);
+ Assert.NotNull(stateTo);
+ Assert.NotNull(stateError);
+ Assert.NotNull(stateError.ErrorCode);
+ Assert.Equal(State.Open, stateFrom);
+ Assert.Equal(State.Closed, stateTo);
+ Assert.Equal(State.Closed, connection.State);
+ Assert.Contains(stateError.ErrorCode, "amqp:unauthorized-access");
+ }
+ }
+
+ [SkippableFact]
+ public async Task ConnectionShouldReconnectWithTheNewToken()
+ {
+ var recoveryConfiguration = new RecoveryConfiguration();
+ recoveryConfiguration.Topology(false);
+ recoveryConfiguration.BackOffDelayPolicy(new FakeFastBackOffDelay());
+
+ Skip.IfNot(IsCluster);
+ IConnection connection = await AmqpConnection.CreateAsync(
+ ConnectionSettingsBuilder.Create()
+ .Host("localhost")
+ .Port(5672)
+ .RecoveryConfiguration(recoveryConfiguration)
+ .ContainerId(_containerId)
+ .OAuth2Options(new OAuth2Options(GenerateToken(DateTime.UtcNow.AddMilliseconds(1_500))))
+ .Build());
+ await connection.RefreshTokenAsync(GenerateToken(DateTime.UtcNow.AddMinutes(5)));
+ TaskCompletionSource twoRecoveryEventsSeenTcs = CreateTaskCompletionSource();
+ int recoveryEvents = 0;
+ connection.ChangeState += (sender, from, to, error) =>
+ {
+ if (Interlocked.Increment(ref recoveryEvents) == 2)
+ {
+ twoRecoveryEventsSeenTcs.SetResult(true);
+ }
+ };
+
+ Assert.Equal(State.Open, connection.State);
+ Thread.Sleep(TimeSpan.FromSeconds(1));
+ await WaitUntilConnectionIsKilledAndOpen(_containerId);
+ Assert.Equal(State.Open, connection.State);
+ await WhenTcsCompletes(twoRecoveryEventsSeenTcs);
+ await connection.CloseAsync();
+ }
+
+ private static string GenerateToken(DateTime duration)
+ {
+ byte[] decodedKey = Convert.FromBase64String(Base64Key);
+
+ var claims = new[]
+ {
+ new Claim(JwtRegisteredClaimNames.Iss, "unit_test"),
+ new Claim(JwtRegisteredClaimNames.Aud, Audience),
+ new Claim(JwtRegisteredClaimNames.Exp, new DateTimeOffset(duration).ToUnixTimeSeconds().ToString()),
+ new Claim("scope", "rabbitmq.configure:*/* rabbitmq.write:*/* rabbitmq.read:*/*"),
+ new Claim("random", GenerateRandomString(6))
+ };
+
+ var key = new SymmetricSecurityKey(decodedKey);
+ var creds = new SigningCredentials(key, SecurityAlgorithms.HmacSha256);
+
+ var token = new JwtSecurityToken(
+ claims: claims,
+ expires: duration,
+ signingCredentials: creds
+ )
+ { Header = { ["kid"] = "token-key" } };
+
+ var tokenHandler = new JwtSecurityTokenHandler();
+ return tokenHandler.WriteToken(token);
+ }
+
+ private static string GenerateRandomString(int length)
+ {
+ const string chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+ var random = new Random();
+ return new string(Enumerable.Repeat(chars, length)
+ .Select(s => s[random.Next(s.Length)]).ToArray());
+ }
+ }
+}
diff --git a/Tests/Tests.csproj b/Tests/Tests.csproj
index 27a9c928..78339832 100644
--- a/Tests/Tests.csproj
+++ b/Tests/Tests.csproj
@@ -29,6 +29,7 @@
+
diff --git a/Tests/TlsConnectionTests.cs b/Tests/TlsConnectionTests.cs
index be573568..2082dfae 100644
--- a/Tests/TlsConnectionTests.cs
+++ b/Tests/TlsConnectionTests.cs
@@ -6,6 +6,7 @@
using System.Net.Security;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
+using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.AMQP.Client;
using RabbitMQ.AMQP.Client.Impl;
@@ -46,12 +47,16 @@ public override Task DisposeAsync()
return base.DisposeAsync();
}
- [Fact]
- public async Task ConnectUsingTlsAndUserPassword()
+ [Theory]
+ [InlineData("/my_tls_host")]
+ [InlineData("/")]
+ public async Task ConnectUsingTlsAndUserPassword(string virtualHost)
{
+ await CreateVhostAsync(virtualHost);
ConnectionSettings connectionSettings = _connectionSettingBuilder
.Scheme("amqps")
.Port(_port)
+ .VirtualHost(virtualHost)
.Build();
Assert.True(connectionSettings.UseSsl);
Assert.NotNull(connectionSettings.TlsSettings);
@@ -71,14 +76,15 @@ public async Task ConnectUsingTlsAndUserPassword()
else
{
connectionSettings.TlsSettings.AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateChainErrors;
- connectionSettings.TlsSettings.RemoteCertificateValidationCallback = (sender, certificate, chain, errors) => true;
+ connectionSettings.TlsSettings.RemoteCertificateValidationCallback =
+ (sender, certificate, chain, errors) => true;
}
Assert.Equal("localhost", connectionSettings.Host);
Assert.Equal(_port, connectionSettings.Port);
Assert.Equal("guest", connectionSettings.User);
Assert.Equal("guest", connectionSettings.Password);
- Assert.Equal("/", connectionSettings.VirtualHost);
+ Assert.Equal(virtualHost, connectionSettings.VirtualHost);
Assert.Equal("amqps", connectionSettings.Scheme);
IConnection connection = await AmqpConnection.CreateAsync(connectionSettings);
@@ -87,18 +93,22 @@ public async Task ConnectUsingTlsAndUserPassword()
Assert.Equal(State.Closed, connection.State);
}
- [Fact]
- public async Task ConnectUsingTlsAndClientCertificate()
+ [Theory]
+ [InlineData("/my_tls_host_certificate")]
+ [InlineData("/")]
+ public async Task ConnectUsingTlsAndClientCertificate(string virtualHost)
{
+ await CreateVhostAsync(virtualHost);
string clientCertFile = GetClientCertFile();
var cert = new X509Certificate2(clientCertFile, "grapefruit");
- await CreateUserFromCertSubject(cert);
+ await CreateUserFromCertSubject(cert, virtualHost);
ConnectionSettings connectionSettings = _connectionSettingBuilder
.Scheme("amqps")
.SaslMechanism(SaslMechanism.External)
.Port(_port)
+ .VirtualHost(virtualHost)
.Build();
Assert.True(connectionSettings.UseSsl);
@@ -120,14 +130,15 @@ public async Task ConnectUsingTlsAndClientCertificate()
else
{
connectionSettings.TlsSettings.AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateChainErrors;
- connectionSettings.TlsSettings.RemoteCertificateValidationCallback = (sender, certificate, chain, errors) => true;
+ connectionSettings.TlsSettings.RemoteCertificateValidationCallback =
+ (sender, certificate, chain, errors) => true;
}
Assert.Equal("localhost", connectionSettings.Host);
Assert.Equal(_port, connectionSettings.Port);
Assert.Null(connectionSettings.User);
Assert.Null(connectionSettings.Password);
- Assert.Equal("/", connectionSettings.VirtualHost);
+ Assert.Equal(virtualHost, connectionSettings.VirtualHost);
Assert.Equal("amqps", connectionSettings.Scheme);
Assert.Equal(SaslMechanism.External, connectionSettings.SaslMechanism);
@@ -137,10 +148,10 @@ public async Task ConnectUsingTlsAndClientCertificate()
Assert.Equal(State.Closed, connection.State);
}
- private Task CreateUserFromCertSubject(X509Certificate cert)
+ private Task CreateUserFromCertSubject(X509Certificate cert, string virtualHost)
{
string userName = cert.Subject.Trim().Replace(" ", string.Empty);
- return _httpApiClient.CreateUserAsync(userName);
+ return _httpApiClient.CreateUserAsync(userName, virtualHost);
}
private static string GetClientCertFile()
@@ -151,6 +162,7 @@ private static string GetClientCertFile()
{
clientCertFile = Path.GetFullPath(Path.Combine(cwd, "../../../../../.ci/certs/client_localhost.p12"));
}
+
Assert.True(File.Exists(clientCertFile));
return clientCertFile;
}
diff --git a/Tests/UtilsTests.cs b/Tests/UtilsTests.cs
index e2730e07..29e2144a 100644
--- a/Tests/UtilsTests.cs
+++ b/Tests/UtilsTests.cs
@@ -9,8 +9,25 @@
namespace Tests;
+internal class FakeBackOffDelayPolicyDisabled : IBackOffDelayPolicy
+{
+ public int CurrentAttempt => 1;
+ public int Delay() => 1;
+ public bool IsActive() => false;
+ public void Reset() { }
+}
+
+internal class FakeFastBackOffDelay : IBackOffDelayPolicy
+{
+ public int CurrentAttempt => 1;
+ public int Delay() => 200;
+ public bool IsActive() => true;
+ public void Reset() { }
+}
+
public class UtilsTests
{
+
[Fact]
public void ValidateMessageAnnotationsTest()
{
diff --git a/docs/Examples/OAuth2/OAuth2.csproj b/docs/Examples/OAuth2/OAuth2.csproj
new file mode 100644
index 00000000..36887dae
--- /dev/null
+++ b/docs/Examples/OAuth2/OAuth2.csproj
@@ -0,0 +1,18 @@
+
+
+
+ Exe
+ net8.0
+ enable
+ enable
+
+
+
+
+
+
+
+
+
+
+
diff --git a/docs/Examples/OAuth2/Program.cs b/docs/Examples/OAuth2/Program.cs
new file mode 100644
index 00000000..a33541fe
--- /dev/null
+++ b/docs/Examples/OAuth2/Program.cs
@@ -0,0 +1,45 @@
+// See https://aka.ms/new-console-template for more information
+
+using System.Diagnostics;
+using OAuth2;
+using RabbitMQ.AMQP.Client;
+using RabbitMQ.AMQP.Client.Impl;
+using Trace = Amqp.Trace;
+using TraceLevel = Amqp.TraceLevel;
+
+Trace.TraceLevel = TraceLevel.Verbose;
+
+ConsoleTraceListener consoleListener = new();
+Trace.TraceListener = (l, f, a) =>
+ consoleListener.WriteLine($"[{DateTime.Now}] [{l}] - {f}");
+
+IConnection connection = await AmqpConnection.CreateAsync(
+ ConnectionSettingsBuilder.Create()
+ .Host("localhost")
+ .Port(5672)
+ .OAuth2Options(new OAuth2Options(Token.GenerateToken(DateTime.UtcNow.AddSeconds(5))))
+ .Build()).ConfigureAwait(false);
+
+Trace.WriteLine(TraceLevel.Information, $"Connected to the broker {connection} successfully");
+Trace.WriteLine(TraceLevel.Information, $"Connection status {connection.State}");
+
+CancellationTokenSource cts = new();
+
+_ = Task.Run(() =>
+{
+ while (!cts.IsCancellationRequested)
+ {
+ string token = Token.GenerateToken(DateTime.UtcNow.AddSeconds(5));
+ Trace.WriteLine(TraceLevel.Information, $"Token Refresh..{token}");
+ connection.RefreshTokenAsync(token).Wait();
+ Task.Delay(TimeSpan.FromSeconds(4), cts.Token).Wait();
+ }
+});
+
+Console.WriteLine("Connection state: " + connection.State);
+
+// press any key to exit
+Console.ReadKey();
+cts.Cancel();
+
+await connection.CloseAsync().ConfigureAwait(false);
diff --git a/docs/Examples/OAuth2/README.md b/docs/Examples/OAuth2/README.md
new file mode 100644
index 00000000..7e726ffc
--- /dev/null
+++ b/docs/Examples/OAuth2/README.md
@@ -0,0 +1,13 @@
+### OAuth2 Example
+
+This example demonstrates how to use the OAuth2 authentication mechanism.
+
+It is meant to be used with the RabbitMQ server configured for the CI.
+
+To run the example:
+ - make rabbitmq-server-start
+ - dotnet run
+
+
+The example will create a connection given the token and refresh the token every 4 seconds.
+
diff --git a/docs/Examples/OAuth2/Token.cs b/docs/Examples/OAuth2/Token.cs
new file mode 100644
index 00000000..b06d7820
--- /dev/null
+++ b/docs/Examples/OAuth2/Token.cs
@@ -0,0 +1,55 @@
+// This source code is dual-licensed under the Apache License, version 2.0,
+// and the Mozilla Public License, version 2.0.
+// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+
+using System.IdentityModel.Tokens.Jwt;
+using System.Security.Claims;
+using System.Security.Cryptography;
+using System.Text;
+using Microsoft.IdentityModel.Tokens;
+
+namespace OAuth2
+{
+ public class Token
+ {
+ private const string Base64Key = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGH";
+
+ private const string Audience = "rabbitmq";
+
+ public static string GenerateToken(DateTime duration)
+ {
+ byte[] decodedKey = Convert.FromBase64String(Base64Key);
+
+ var claims = new[]
+ {
+ new Claim(JwtRegisteredClaimNames.Iss, "unit_test"),
+ new Claim(JwtRegisteredClaimNames.Aud, Audience),
+ new Claim(JwtRegisteredClaimNames.Exp, new DateTimeOffset(duration).ToUnixTimeSeconds().ToString()),
+ new Claim("scope", "rabbitmq.configure:*/* rabbitmq.write:*/* rabbitmq.read:*/*"),
+ new Claim("random", GenerateRandomString(6))
+ };
+
+ var key = new SymmetricSecurityKey(decodedKey);
+ var creds = new SigningCredentials(key, SecurityAlgorithms.HmacSha256);
+
+ var token = new JwtSecurityToken(
+ claims: claims,
+ expires: duration,
+ signingCredentials: creds
+ );
+
+ token.Header["kid"] = "token-key";
+
+ var tokenHandler = new JwtSecurityTokenHandler();
+ return tokenHandler.WriteToken(token);
+ }
+
+ private static string GenerateRandomString(int length)
+ {
+ const string chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+ var random = new Random();
+ return new string(Enumerable.Repeat(chars, length)
+ .Select(s => s[random.Next(s.Length)]).ToArray());
+ }
+ }
+}
diff --git a/docs/Examples/README.md b/docs/Examples/README.md
index e212da89..8e3c9ba2 100644
--- a/docs/Examples/README.md
+++ b/docs/Examples/README.md
@@ -7,3 +7,4 @@ This directory contains examples of how to use the RabbitMQ AMQP 1.0 .NET client
- How to write a reliable client [here](./HAClient/)
- Performance Test [here](./PerformanceTest/). You can tune some parameters in the `Program.cs` file.
- OpenTelemetry Integration [here](./OpenTelemetryIntegration/)
+- OAuth2 Example [here](./OAuth2)
\ No newline at end of file
diff --git a/rabbitmq-amqp-dotnet-client.sln b/rabbitmq-amqp-dotnet-client.sln
index 94f7bf24..5098c1b1 100644
--- a/rabbitmq-amqp-dotnet-client.sln
+++ b/rabbitmq-amqp-dotnet-client.sln
@@ -33,6 +33,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenTelemetryIntegration",
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Examples", "Examples", "{9154A0FB-7B2B-413C-A7F5-11ED2E37E93C}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OAuth2", "docs\Examples\OAuth2\OAuth2.csproj", "{C1EA4B66-E60E-4945-A4C6-91B433F9BA65}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -67,6 +69,10 @@ Global
{D74F49FC-2C9A-4227-8988-30925C509388}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D74F49FC-2C9A-4227-8988-30925C509388}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D74F49FC-2C9A-4227-8988-30925C509388}.Release|Any CPU.Build.0 = Release|Any CPU
+ {C1EA4B66-E60E-4945-A4C6-91B433F9BA65}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {C1EA4B66-E60E-4945-A4C6-91B433F9BA65}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {C1EA4B66-E60E-4945-A4C6-91B433F9BA65}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {C1EA4B66-E60E-4945-A4C6-91B433F9BA65}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -77,5 +83,6 @@ Global
{E2FBE920-8DAB-4B52-87AF-B781BF715A5A} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C}
{59CB2F07-4A5A-4871-8C97-02EC21C68D6B} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C}
{D74F49FC-2C9A-4227-8988-30925C509388} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C}
+ {C1EA4B66-E60E-4945-A4C6-91B433F9BA65} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C}
EndGlobalSection
EndGlobal