Skip to content

Commit 3aafb34

Browse files
committed
Refresh token
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent decdd5c commit 3aafb34

File tree

8 files changed

+69
-12
lines changed

8 files changed

+69
-12
lines changed

RabbitMQ.AMQP.Client/ConnectionSettings.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,11 @@ protected Address InitAddress(string host, int port, string? user, string? passw
253253
scheme: scheme);
254254
}
255255

256+
internal void UpdateOAuthPassword(string? password)
257+
{
258+
_address = new Address(_address.Host, _address.Port, _address.User, password, _address.Path, _address.Scheme);
259+
}
260+
256261
public ConnectionSettings(string scheme,
257262
string host,
258263
int port,

RabbitMQ.AMQP.Client/IConnection.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

55
using System.Collections.Generic;
6+
using System.Threading.Tasks;
67

78
namespace RabbitMQ.AMQP.Client
89
{
@@ -60,5 +61,7 @@ public interface IConnection : ILifeCycle
6061
/// Get or set the Connection ID. Used by <see cref="IEnvironment"/>
6162
/// </summary>
6263
public long Id { get; set; }
64+
65+
public Task RefreshTokenAsync(string token);
6366
}
6467
}

RabbitMQ.AMQP.Client/IManagement.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

5+
using System.Data;
56
using System.Threading;
67
using System.Threading.Tasks;
78

@@ -61,6 +62,8 @@ Task<IQueueInfo> GetQueueInfoAsync(string queueName,
6162
/// </summary>
6263
/// <returns>A builder for <see cref="IBindingSpecification"/></returns>
6364
IBindingSpecification Binding();
65+
66+
public Task RefreshTokenAsync(string token);
6467
}
6568

6669
internal interface IManagementTopology

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,13 @@ public IEnumerable<IConsumer> Consumers
164164
/// </summary>
165165
public long Id { get; set; }
166166

167+
public async Task RefreshTokenAsync(string token)
168+
{
169+
await _management.RefreshTokenAsync(token)
170+
.ConfigureAwait(false);
171+
_connectionSettings.UpdateOAuthPassword(token);
172+
}
173+
167174
// TODO cancellation token
168175
public override async Task OpenAsync()
169176
{
@@ -249,7 +256,8 @@ internal void AddPublisher(Guid id, IPublisher consumer)
249256
if (false == _publishersDict.TryAdd(id, consumer))
250257
{
251258
// TODO create "internal bug" exception type?
252-
throw new InvalidOperationException("could not add publisher, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
259+
throw new InvalidOperationException(
260+
"could not add publisher, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
253261
}
254262
}
255263

@@ -258,7 +266,8 @@ internal void RemovePublisher(Guid id)
258266
if (false == _publishersDict.TryRemove(id, out _))
259267
{
260268
// TODO create "internal bug" exception type?
261-
throw new InvalidOperationException("could not remove publisher, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
269+
throw new InvalidOperationException(
270+
"could not remove publisher, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
262271
}
263272
}
264273

@@ -268,7 +277,8 @@ internal void AddConsumer(Guid id, IConsumer consumer)
268277
if (false == _consumersDict.TryAdd(id, consumer))
269278
{
270279
// TODO create "internal bug" exception type?
271-
throw new InvalidOperationException("could not add consumer, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
280+
throw new InvalidOperationException(
281+
"could not add consumer, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
272282
}
273283
}
274284

@@ -277,7 +287,8 @@ internal void RemoveConsumer(Guid id)
277287
if (false == _consumersDict.TryRemove(id, out _))
278288
{
279289
// TODO create "internal bug" exception type?
280-
throw new InvalidOperationException("could not remove consumer, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
290+
throw new InvalidOperationException(
291+
"could not remove consumer, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
281292
}
282293
}
283294

@@ -639,6 +650,7 @@ private void HandleProperties(Fields properties)
639650
{
640651
value = (string)kvp.Value;
641652
}
653+
642654
_connectionProperties[key] = value;
643655
}
644656

@@ -647,6 +659,7 @@ private void HandleProperties(Fields properties)
647659
{
648660
// TODO Java client throws exception here
649661
}
662+
650663
_areFilterExpressionsSupported = Utils.SupportsFilterExpressions(brokerVersion);
651664
}
652665
}

RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System;
66
using System.Collections.Concurrent;
77
using System.Linq;
8+
using System.Text;
89
using System.Threading;
910
using System.Threading.Tasks;
1011
using Amqp;
@@ -46,6 +47,7 @@ public class AmqpManagement : AbstractLifeCycle, IManagement, IManagementTopolog
4647
internal const string Post = "POST";
4748
internal const string Delete = "DELETE";
4849
private const string ReplyTo = "$me";
50+
private const string AuthTokens = "/auth/tokens";
4951

5052
protected readonly TaskCompletionSource<bool> _managementSessionClosedTcs =
5153
Utils.CreateTaskCompletionSource<bool>();
@@ -134,6 +136,14 @@ public IBindingSpecification Binding()
134136
return new AmqpBindingSpecification(this);
135137
}
136138

139+
public async Task RefreshTokenAsync(string token)
140+
{
141+
int[] expectedResponseCodes = { Code204 };
142+
_ = await RequestAsync( Encoding.ASCII.GetBytes(token),
143+
AuthTokens, Put, expectedResponseCodes)
144+
.ConfigureAwait(false);
145+
}
146+
137147
/// <summary>
138148
/// Open the management session.
139149
/// </summary>

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ RabbitMQ.AMQP.Client.IConnection.Management() -> RabbitMQ.AMQP.Client.IManagemen
156156
RabbitMQ.AMQP.Client.IConnection.Properties.get -> System.Collections.Generic.IReadOnlyDictionary<string!, object!>!
157157
RabbitMQ.AMQP.Client.IConnection.PublisherBuilder() -> RabbitMQ.AMQP.Client.IPublisherBuilder!
158158
RabbitMQ.AMQP.Client.IConnection.Publishers.get -> System.Collections.Generic.IEnumerable<RabbitMQ.AMQP.Client.IPublisher!>!
159+
RabbitMQ.AMQP.Client.IConnection.RefreshTokenAsync(string! token) -> System.Threading.Tasks.Task!
159160
RabbitMQ.AMQP.Client.IConnection.RpcClientBuilder() -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
160161
RabbitMQ.AMQP.Client.IConnection.RpcServerBuilder() -> RabbitMQ.AMQP.Client.IRpcServerBuilder!
161162
RabbitMQ.AMQP.Client.IConsumer
@@ -240,6 +241,7 @@ RabbitMQ.AMQP.Client.IManagement.GetQueueInfoAsync(RabbitMQ.AMQP.Client.IQueueSp
240241
RabbitMQ.AMQP.Client.IManagement.GetQueueInfoAsync(string! queueName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IQueueInfo!>!
241242
RabbitMQ.AMQP.Client.IManagement.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification!
242243
RabbitMQ.AMQP.Client.IManagement.Queue(string! name) -> RabbitMQ.AMQP.Client.IQueueSpecification!
244+
RabbitMQ.AMQP.Client.IManagement.RefreshTokenAsync(string! token) -> System.Threading.Tasks.Task!
243245
RabbitMQ.AMQP.Client.IMessage
244246
RabbitMQ.AMQP.Client.IMessage.AbsoluteExpiryTime() -> System.DateTime
245247
RabbitMQ.AMQP.Client.IMessage.AbsoluteExpiryTime(System.DateTime absoluteExpiryTime) -> RabbitMQ.AMQP.Client.IMessage!
@@ -344,6 +346,7 @@ RabbitMQ.AMQP.Client.Impl.AmqpConnection.Management() -> RabbitMQ.AMQP.Client.IM
344346
RabbitMQ.AMQP.Client.Impl.AmqpConnection.Properties.get -> System.Collections.Generic.IReadOnlyDictionary<string!, object!>!
345347
RabbitMQ.AMQP.Client.Impl.AmqpConnection.PublisherBuilder() -> RabbitMQ.AMQP.Client.IPublisherBuilder!
346348
RabbitMQ.AMQP.Client.Impl.AmqpConnection.Publishers.get -> System.Collections.Generic.IEnumerable<RabbitMQ.AMQP.Client.IPublisher!>!
349+
RabbitMQ.AMQP.Client.Impl.AmqpConnection.RefreshTokenAsync(string! token) -> System.Threading.Tasks.Task!
347350
RabbitMQ.AMQP.Client.Impl.AmqpConnection.RpcClientBuilder() -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
348351
RabbitMQ.AMQP.Client.Impl.AmqpConnection.RpcServerBuilder() -> RabbitMQ.AMQP.Client.IRpcServerBuilder!
349352
RabbitMQ.AMQP.Client.Impl.AmqpConsumer
@@ -386,6 +389,7 @@ RabbitMQ.AMQP.Client.Impl.AmqpManagement.GetQueueInfoAsync(RabbitMQ.AMQP.Client.
386389
RabbitMQ.AMQP.Client.Impl.AmqpManagement.GetQueueInfoAsync(string! queueName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IQueueInfo!>!
387390
RabbitMQ.AMQP.Client.Impl.AmqpManagement.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification!
388391
RabbitMQ.AMQP.Client.Impl.AmqpManagement.Queue(string! name) -> RabbitMQ.AMQP.Client.IQueueSpecification!
392+
RabbitMQ.AMQP.Client.Impl.AmqpManagement.RefreshTokenAsync(string! token) -> System.Threading.Tasks.Task!
389393
RabbitMQ.AMQP.Client.Impl.AmqpMessage
390394
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AbsoluteExpiryTime() -> System.DateTime
391395
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AbsoluteExpiryTime(System.DateTime absoluteExpiryTime) -> RabbitMQ.AMQP.Client.IMessage!

Tests/OAuth2Tests.cs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,24 @@ public async Task ConnectToRabbitMqWithOAuth2TokenShouldSuccess()
4444
await connection.CloseAsync();
4545
}
4646

47+
48+
[SkippableFact]
49+
public async Task RefreshTokenShouldNotDisconnectTheClient()
50+
{
51+
Skip.IfNot(IsCluster);
52+
IConnection connection = await AmqpConnection.CreateAsync(
53+
ConnectionSettingsBuilder.Create()
54+
.Host("localhost")
55+
.Port(5672)
56+
.OAuth2Options(new OAuth2Options(GenerateToken(DateTime.UtcNow.AddMilliseconds(1_000))))
57+
.Build());
58+
await connection.RefreshTokenAsync(GenerateToken(DateTime.UtcNow.AddMinutes(5)));
59+
Thread.Sleep(TimeSpan.FromSeconds(1));
60+
Assert.NotNull(connection);
61+
Assert.Equal(State.Open, connection.State);
62+
await connection.CloseAsync();
63+
}
64+
4765
[SkippableFact]
4866
public async Task ConnectToRabbitMqWithOAuth2TokenShouldDisconnectAfterTimeout()
4967
{
@@ -65,8 +83,8 @@ public async Task ConnectToRabbitMqWithOAuth2TokenShouldDisconnectAfterTimeout()
6583
State? stateFrom = null;
6684
State? stateTo = null;
6785
Error? stateError = null;
68-
TaskCompletionSource<bool> tcs = new();
69-
connection.ChangeState += (sender, from, to, error) =>
86+
TaskCompletionSource<bool> tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
87+
connection.ChangeState += (_, from, to, error) =>
7088
{
7189
stateFrom = from;
7290
stateTo = to;
@@ -106,9 +124,7 @@ private static string GenerateToken(DateTime duration)
106124
claims: claims,
107125
expires: duration,
108126
signingCredentials: creds
109-
);
110-
111-
token.Header["kid"] = "token-key";
127+
) { Header = { ["kid"] = "token-key" } };
112128

113129
var tokenHandler = new JwtSecurityTokenHandler();
114130
return tokenHandler.WriteToken(token);

Tests/TlsConnectionTests.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,15 @@ public override Task DisposeAsync()
4646
return base.DisposeAsync();
4747
}
4848

49-
[Fact]
50-
public async Task ConnectUsingTlsAndUserPassword()
49+
[Theory]
50+
[InlineData("/")]
51+
[InlineData("tls")]
52+
public async Task ConnectUsingTlsAndUserPassword(string virtualHost)
5153
{
5254
ConnectionSettings connectionSettings = _connectionSettingBuilder
5355
.Scheme("amqps")
5456
.Port(_port)
57+
.VirtualHost(virtualHost)
5558
.Build();
5659
Assert.True(connectionSettings.UseSsl);
5760
Assert.NotNull(connectionSettings.TlsSettings);
@@ -78,7 +81,7 @@ public async Task ConnectUsingTlsAndUserPassword()
7881
Assert.Equal(_port, connectionSettings.Port);
7982
Assert.Equal("guest", connectionSettings.User);
8083
Assert.Equal("guest", connectionSettings.Password);
81-
Assert.Equal("/", connectionSettings.VirtualHost);
84+
Assert.Equal(virtualHost, connectionSettings.VirtualHost);
8285
Assert.Equal("amqps", connectionSettings.Scheme);
8386

8487
IConnection connection = await AmqpConnection.CreateAsync(connectionSettings);

0 commit comments

Comments
 (0)