Skip to content

Commit a2a0967

Browse files
committed
* Add basic test to see what dispose after a channel exception does.
* Modify `CreateChannel` app to try and trigger GH1751
1 parent 30a244d commit a2a0967

File tree

3 files changed

+79
-18
lines changed

3 files changed

+79
-18
lines changed

projects/Applications/CreateChannel/Program.cs

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.Collections.Generic;
3334
using System.Diagnostics;
34-
using System.Threading;
3535
using System.Threading.Tasks;
3636

3737
using RabbitMQ.Client;
38+
using RabbitMQ.Client.Exceptions;
3839

3940
namespace CreateChannel
4041
{
@@ -44,49 +45,89 @@ public static class Program
4445
private const int ChannelsToOpen = 50;
4546

4647
private static int channelsOpened;
47-
private static AutoResetEvent doneEvent;
4848

4949
public static async Task Main()
5050
{
51-
doneEvent = new AutoResetEvent(false);
51+
var doneTcs = new TaskCompletionSource<bool>();
5252

5353
var connectionFactory = new ConnectionFactory { };
5454
await using IConnection connection = await connectionFactory.CreateConnectionAsync();
5555

5656
var watch = Stopwatch.StartNew();
57-
_ = Task.Run(async () =>
57+
var workTask = Task.Run(async () =>
5858
{
59-
var channels = new IChannel[ChannelsToOpen];
60-
for (int i = 0; i < Repeats; i++)
59+
try
6160
{
62-
for (int j = 0; j < channels.Length; j++)
61+
var channelOpenTasks = new List<Task<IChannel>>();
62+
var channelDisposeTasks = new List<ValueTask>();
63+
var channels = new List<IChannel>();
64+
for (int i = 0; i < Repeats; i++)
6365
{
64-
channels[j] = await connection.CreateChannelAsync();
65-
channelsOpened++;
66-
}
66+
for (int j = 0; j < ChannelsToOpen; j++)
67+
{
68+
channelOpenTasks.Add(connection.CreateChannelAsync());
69+
}
6770

68-
for (int j = 0; j < channels.Length; j++)
69-
{
70-
await channels[j].DisposeAsync();
71+
for (int j = 0; j < channelOpenTasks.Count; j++)
72+
{
73+
IChannel ch = await channelOpenTasks[j];
74+
if (j % 8 == 0)
75+
{
76+
try
77+
{
78+
await ch.QueueDeclarePassiveAsync(Guid.NewGuid().ToString());
79+
}
80+
catch (OperationInterruptedException)
81+
{
82+
await ch.DisposeAsync();
83+
}
84+
catch (Exception ex)
85+
{
86+
_ = Console.Error.WriteLineAsync($"{DateTime.Now:s} [ERROR] {ex}");
87+
}
88+
}
89+
else
90+
{
91+
channels.Add(ch);
92+
channelsOpened++;
93+
}
94+
}
95+
channelOpenTasks.Clear();
96+
97+
for (int j = 0; j < channels.Count; j++)
98+
{
99+
channelDisposeTasks.Add(channels[j].DisposeAsync());
100+
}
101+
102+
for (int j = 0; j < channels.Count; j++)
103+
{
104+
await channelDisposeTasks[j];
105+
}
106+
channelDisposeTasks.Clear();
71107
}
72-
}
73108

74-
doneEvent.Set();
109+
doneTcs.SetResult(true);
110+
}
111+
catch (Exception ex)
112+
{
113+
doneTcs.SetException(ex);
114+
}
75115
});
76116

77117
Console.WriteLine($"{Repeats} times opening {ChannelsToOpen} channels on a connection. => Total channel open/close: {Repeats * ChannelsToOpen}");
78118
Console.WriteLine();
79119
Console.WriteLine("Opened");
80-
while (!doneEvent.WaitOne(500))
120+
while (false == doneTcs.Task.IsCompleted)
81121
{
82122
Console.WriteLine($"{channelsOpened,5}");
123+
await Task.Delay(150);
83124
}
84125
watch.Stop();
85126
Console.WriteLine($"{channelsOpened,5}");
86127
Console.WriteLine();
87-
Console.WriteLine($"Took {watch.Elapsed.TotalMilliseconds} ms");
128+
Console.WriteLine($"Took {watch.Elapsed}");
88129

89-
Console.ReadLine();
130+
await workTask;
90131
}
91132
}
92133
}

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,7 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag)
714714

715715
protected async Task<bool> HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken)
716716
{
717+
// TODO add check for Disposing / Disposed
717718
var channelClose = new ChannelClose(cmd.MethodSpan);
718719
SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer,
719720
channelClose._replyCode,
@@ -730,11 +731,14 @@ await ModelSendAsync(in method, cancellationToken)
730731

731732
await Session.NotifyAsync(cancellationToken)
732733
.ConfigureAwait(false);
734+
733735
return true;
734736
}
735737

736738
protected async Task<bool> HandleChannelCloseOkAsync(IncomingCommand cmd, CancellationToken cancellationToken)
737739
{
740+
// TODO add check for Disposing / Disposed
741+
738742
/*
739743
* Note:
740744
* This call _must_ come before completing the async continuation

projects/Test/Integration/TestQueueDeclare.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,22 @@ public async Task TestQueueDeclareAsync()
5757
Assert.Equal(q, passiveDeclareResult.QueueName);
5858
}
5959

60+
[Fact]
61+
public async Task TestPassiveQueueDeclareException_GH1749()
62+
{
63+
string q = GenerateQueueName();
64+
try
65+
{
66+
await _channel.QueueDeclarePassiveAsync(q);
67+
}
68+
catch (Exception ex)
69+
{
70+
_output.WriteLine("{0} ex: {1}", _testDisplayName, ex);
71+
await _channel.DisposeAsync();
72+
_channel = null;
73+
}
74+
}
75+
6076
[Fact]
6177
public async Task TestConcurrentQueueDeclareAndBindAsync()
6278
{

0 commit comments

Comments
 (0)