Skip to content

Commit cf6a74c

Browse files
committed
* Add TCS for server-originated channel closure.
1 parent e6cfbf1 commit cf6a74c

File tree

2 files changed

+101
-53
lines changed

2 files changed

+101
-53
lines changed

projects/Applications/GH-1749/Program.cs

Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
using System.Runtime.ExceptionServices;
3535
using RabbitMQ.Client;
3636
using RabbitMQ.Client.Events;
37+
using RabbitMQ.Client.Exceptions;
3738

3839
namespace GH_1749
3940
{
@@ -74,7 +75,10 @@ static async Task Main(string[] args)
7475
ConnectionFactory connectionFactory = new()
7576
{
7677
HostName = hostname,
77-
AutomaticRecoveryEnabled = true,
78+
AutomaticRecoveryEnabled = false,
79+
TopologyRecoveryEnabled = false,
80+
RequestedConnectionTimeout = TimeSpan.FromSeconds(600),
81+
RequestedHeartbeat = TimeSpan.FromSeconds(600),
7882
UserName = "guest",
7983
Password = "guest",
8084
ClientProvidedName = ConnectionClientProvidedName
@@ -86,7 +90,6 @@ static async Task Main(string[] args)
8690
connection.RecoverySucceededAsync += (object sender, AsyncEventArgs ea) =>
8791
{
8892
Console.WriteLine("{0} [INFO] saw RecoverySucceededAsync, event: {1}", Now, ea);
89-
_ = CloseConnectionAsync();
9093
return Task.CompletedTask;
9194
};
9295

@@ -113,33 +116,15 @@ static async Task Main(string[] args)
113116
channel.CallbackExceptionAsync += Channel_CallbackExceptionAsync;
114117
channel.ChannelShutdownAsync += Channel_ChannelShutdownAsync;
115118

116-
QueueDeclareOk queue = await channel.QueueDeclareAsync();
117-
118-
var consumer = new GH1749Consumer(channel);
119-
await channel.BasicConsumeAsync(queue.QueueName, true, consumer);
120-
121-
_ = CloseConnectionAsync();
122-
123-
Console.WriteLine("{0} [INFO] consumer is running", Util.Now);
124-
Console.ReadLine();
125-
}
126-
127-
static async Task CloseConnectionAsync()
128-
{
129-
if (s_util is null)
130-
{
131-
throw new NullReferenceException("s_util");
132-
}
133-
134119
try
135120
{
136-
Console.WriteLine("{0} [INFO] start closing connection: {1}", Now, ConnectionClientProvidedName);
137-
await s_util.CloseConnectionAsync(ConnectionClientProvidedName);
138-
Console.WriteLine("{0} [INFO] done closing connection: {1}", Now, ConnectionClientProvidedName);
121+
await channel.QueueDeclarePassiveAsync(Guid.NewGuid().ToString());
139122
}
140-
catch (Exception ex)
123+
catch (OperationInterruptedException)
141124
{
142-
Console.Error.WriteLine("{0} [ERROR] error while closing connection: {1}", Now, ex);
125+
await channel.DisposeAsync();
126+
// rabbitmq-dotnet-client-1749
127+
// await Task.Delay(2000);
143128
}
144129
}
145130

@@ -156,6 +141,8 @@ private static Task Channel_ChannelShutdownAsync(object sender, ShutdownEventArg
156141
{
157142
Console.WriteLine("{0} [INFO] saw ChannelShutdownAsync, event: {1}", Now, ea);
158143
return Task.CompletedTask;
144+
// rabbitmq-dotnet-client-1749
145+
// return Task.Delay(1000);
159146
}
160147

161148
private static void CurrentDomain_FirstChanceException(object? sender, FirstChanceExceptionEventArgs e)

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 89 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,14 @@ internal partial class Channel : IChannel, IRecoverable
5757
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
5858

5959
private ShutdownEventArgs? _closeReason;
60-
public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason);
60+
private TaskCompletionSource<bool>? _serverOriginatedChannelCloseTcs;
6161

6262
internal readonly IConsumerDispatcher ConsumerDispatcher;
6363

6464
private bool _disposedValue;
6565
private bool _isDisposing;
66-
private readonly object _isDisposingLock = new();
66+
67+
private readonly object _locker = new();
6768

6869
public Channel(ISession session, CreateChannelOptions createChannelOptions)
6970
{
@@ -86,6 +87,8 @@ public Channel(ISession session, CreateChannelOptions createChannelOptions)
8687

8788
internal TimeSpan HandshakeContinuationTimeout { get; set; } = TimeSpan.FromSeconds(10);
8889

90+
public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason);
91+
8992
public TimeSpan ContinuationTimeout { get; set; }
9093

9194
public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
@@ -548,7 +551,7 @@ protected virtual void Dispose(bool disposing)
548551

549552
if (disposing)
550553
{
551-
lock (_isDisposingLock)
554+
lock (_locker)
552555
{
553556
if (_isDisposing)
554557
{
@@ -564,13 +567,20 @@ protected virtual void Dispose(bool disposing)
564567
this.AbortAsync().GetAwaiter().GetResult();
565568
}
566569

567-
ConsumerDispatcher.Dispose();
568-
_rpcSemaphore.Dispose();
569-
_confirmSemaphore.Dispose();
570-
_outstandingPublisherConfirmationsRateLimiter?.Dispose();
570+
MaybeWaitForServerOriginatedClose();
571571
}
572572
finally
573573
{
574+
try
575+
{
576+
ConsumerDispatcher.Dispose();
577+
_rpcSemaphore.Dispose();
578+
_confirmSemaphore.Dispose();
579+
_outstandingPublisherConfirmationsRateLimiter?.Dispose();
580+
}
581+
catch
582+
{
583+
}
574584
_disposedValue = true;
575585
_isDisposing = false;
576586
}
@@ -586,7 +596,7 @@ protected virtual async ValueTask DisposeAsyncCore(bool disposing)
586596

587597
if (disposing)
588598
{
589-
lock (_isDisposingLock)
599+
lock (_locker)
590600
{
591601
if (_isDisposing)
592602
{
@@ -603,18 +613,26 @@ await this.AbortAsync()
603613
.ConfigureAwait(false);
604614
}
605615

606-
ConsumerDispatcher.Dispose();
607-
_rpcSemaphore.Dispose();
608-
_confirmSemaphore.Dispose();
609-
610616
if (_outstandingPublisherConfirmationsRateLimiter is not null)
611617
{
612618
await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
613619
.ConfigureAwait(false);
614620
}
621+
622+
await MaybeWaitForServerOriginatedCloseAsync()
623+
.ConfigureAwait(false);
615624
}
616625
finally
617626
{
627+
try
628+
{
629+
ConsumerDispatcher.Dispose();
630+
_rpcSemaphore.Dispose();
631+
_confirmSemaphore.Dispose();
632+
}
633+
catch
634+
{
635+
}
618636
_disposedValue = true;
619637
_isDisposing = false;
620638
}
@@ -714,25 +732,39 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag)
714732

715733
protected async Task<bool> HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken)
716734
{
717-
// TODO add check for Disposing / Disposed
718-
var channelClose = new ChannelClose(cmd.MethodSpan);
719-
SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer,
720-
channelClose._replyCode,
721-
channelClose._replyText,
722-
channelClose._classId,
723-
channelClose._methodId));
724-
725-
await Session.CloseAsync(_closeReason, notify: false)
726-
.ConfigureAwait(false);
735+
lock (_locker)
736+
{
737+
_serverOriginatedChannelCloseTcs ??= new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
738+
}
727739

728-
var method = new ChannelCloseOk();
729-
await ModelSendAsync(in method, cancellationToken)
730-
.ConfigureAwait(false);
740+
try
741+
{
742+
// TODO add check for Disposing / Disposed
743+
var channelClose = new ChannelClose(cmd.MethodSpan);
744+
SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer,
745+
channelClose._replyCode,
746+
channelClose._replyText,
747+
channelClose._classId,
748+
channelClose._methodId));
731749

732-
await Session.NotifyAsync(cancellationToken)
733-
.ConfigureAwait(false);
750+
await Session.CloseAsync(_closeReason, notify: false)
751+
.ConfigureAwait(false);
734752

735-
return true;
753+
var method = new ChannelCloseOk();
754+
await ModelSendAsync(in method, cancellationToken)
755+
.ConfigureAwait(false);
756+
757+
await Session.NotifyAsync(cancellationToken)
758+
.ConfigureAwait(false);
759+
760+
_serverOriginatedChannelCloseTcs.TrySetResult(true);
761+
return true;
762+
}
763+
catch (Exception ex)
764+
{
765+
_serverOriginatedChannelCloseTcs.TrySetException(ex);
766+
throw;
767+
}
736768
}
737769

738770
protected async Task<bool> HandleChannelCloseOkAsync(IncomingCommand cmd, CancellationToken cancellationToken)
@@ -1654,5 +1686,34 @@ private Task<bool> DispatchCommandAsync(IncomingCommand cmd, CancellationToken c
16541686
}
16551687
}
16561688
}
1689+
1690+
private void MaybeWaitForServerOriginatedClose()
1691+
{
1692+
if (_serverOriginatedChannelCloseTcs is not null)
1693+
{
1694+
try
1695+
{
1696+
_serverOriginatedChannelCloseTcs.Task.Wait(TimeSpan.FromSeconds(5));
1697+
}
1698+
catch
1699+
{
1700+
}
1701+
}
1702+
}
1703+
1704+
private async Task MaybeWaitForServerOriginatedCloseAsync()
1705+
{
1706+
if (_serverOriginatedChannelCloseTcs is not null)
1707+
{
1708+
try
1709+
{
1710+
await _serverOriginatedChannelCloseTcs.Task.WaitAsync(TimeSpan.FromSeconds(5))
1711+
.ConfigureAwait(false);
1712+
}
1713+
catch
1714+
{
1715+
}
1716+
}
1717+
}
16571718
}
16581719
}

0 commit comments

Comments
 (0)