diff --git a/projects/Test/Common/TestConnectionRecoveryBase.cs b/projects/Test/Common/TestConnectionRecoveryBase.cs index 59ad8f11a..de581f0c4 100644 --- a/projects/Test/Common/TestConnectionRecoveryBase.cs +++ b/projects/Test/Common/TestConnectionRecoveryBase.cs @@ -80,7 +80,7 @@ await WithTemporaryNonExclusiveQueueAsync(m, async (_, q) => await m.QueueBindAsync(q, x, rk); await m.BasicPublishAsync(x, rk, _messageBody); - Assert.True(await TestConnectionRecoveryBase.WaitForConfirmsWithCancellationAsync(m)); + Assert.True(await WaitForConfirmsWithCancellationAsync(m)); await m.ExchangeDeclarePassiveAsync(x); }); } @@ -234,11 +234,11 @@ protected static TaskCompletionSource PrepareForShutdown(IConnection conn) return tcs; } - protected static Task WaitForConfirmsWithCancellationAsync(IChannel m) + protected static Task WaitForConfirmsWithCancellationAsync(IChannel channel) { using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(4))) { - return m.WaitForConfirmsAsync(cts.Token); + return channel.WaitForConfirmsAsync(cts.Token); } } diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs index 726bf5979..5cf9ab2c3 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs @@ -128,8 +128,6 @@ await TestConcurrentOperationsAsync(async () => Assert.True(await tcs.Task); }, Iterations); - - _output.WriteLine("@@@@@@@@ PUBLISH ACK COUNT: {0}", publishAckCount); } } } diff --git a/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs index e6e7e786f..c13d66ee4 100644 --- a/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs +++ b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs @@ -270,6 +270,12 @@ public async Task TestCreateChannelOnClosedAutorecoveringConnectionDoesNotHang() [Fact] public async Task TestTopologyRecoveryConsumerFilter() { + const string exchange = "topology.recovery.exchange"; + const string queueWithRecoveredConsumer = "topology.recovery.queue.1"; + const string queueWithIgnoredConsumer = "topology.recovery.queue.2"; + const string binding1 = "recovered.binding.1"; + const string binding2 = "recovered.binding.2"; + var filter = new TopologyRecoveryFilter { ConsumerFilter = consumer => !consumer.ConsumerTag.Contains("filtered") @@ -280,17 +286,13 @@ public async Task TestTopologyRecoveryConsumerFilter() using (AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryFilterAsync(filter)) { conn.RecoverySucceeded += (source, ea) => connectionRecoveryTcs.SetResult(true); + conn.ConnectionRecoveryError += (source, ea) => connectionRecoveryTcs.SetException(ea.Exception); + conn.CallbackException += (source, ea) => connectionRecoveryTcs.SetException(ea.Exception); using (IChannel ch = await conn.CreateChannelAsync()) { await ch.ConfirmSelectAsync(); - string exchange = "topology.recovery.exchange"; - string queueWithRecoveredConsumer = "topology.recovery.queue.1"; - string queueWithIgnoredConsumer = "topology.recovery.queue.2"; - string binding1 = "recovered.binding.1"; - string binding2 = "recovered.binding.2"; - await ch.ExchangeDeclareAsync(exchange, "direct"); await ch.QueueDeclareAsync(queueWithRecoveredConsumer, false, false, false); await ch.QueueDeclareAsync(queueWithIgnoredConsumer, false, false, false); @@ -325,6 +327,7 @@ public async Task TestTopologyRecoveryConsumerFilter() Assert.True(ch.IsOpen); await ch.BasicPublishAsync(exchange, binding1, _encoding.GetBytes("test message")); await ch.BasicPublishAsync(exchange, binding2, _encoding.GetBytes("test message")); + await WaitForConfirmsWithCancellationAsync(ch); await consumerRecoveryTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); Assert.True(await consumerRecoveryTcs.Task); diff --git a/projects/Test/Integration/TestConnectionTopologyRecovery.cs b/projects/Test/Integration/TestConnectionTopologyRecovery.cs index 8fdfb1d8c..791e52e2d 100644 --- a/projects/Test/Integration/TestConnectionTopologyRecovery.cs +++ b/projects/Test/Integration/TestConnectionTopologyRecovery.cs @@ -228,21 +228,24 @@ public async Task TestTopologyRecoveryBindingFilter() [Fact] public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities() { + const string exchange = "topology.recovery.exchange"; + const string queue1 = "topology.recovery.queue.1"; + const string queue2 = "topology.recovery.queue.2"; + const string binding1 = "recovered.binding"; + const string binding2 = "filtered.binding"; + var connectionRecoveryTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var filter = new TopologyRecoveryFilter(); AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryFilterAsync(filter); conn.RecoverySucceeded += (source, ea) => connectionRecoveryTcs.SetResult(true); + conn.ConnectionRecoveryError += (source, ea) => connectionRecoveryTcs.SetException(ea.Exception); + conn.CallbackException += (source, ea) => connectionRecoveryTcs.SetException(ea.Exception); + IChannel ch = await conn.CreateChannelAsync(); try { await ch.ConfirmSelectAsync(); - string exchange = "topology.recovery.exchange"; - string queue1 = "topology.recovery.queue.1"; - string queue2 = "topology.recovery.queue.2"; - string binding1 = "recovered.binding"; - string binding2 = "filtered.binding"; - await ch.ExchangeDeclareAsync(exchange, "direct"); await ch.QueueDeclareAsync(queue1, false, false, false); await ch.QueueDeclareAsync(queue2, false, false, false); @@ -281,16 +284,14 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities() await ch.QueueDeclarePassiveAsync(queue1); await ch.QueueDeclarePassiveAsync(queue2); - await ch.BasicPublishAsync(exchange, binding1, true, _encoding.GetBytes("test message")); - // await ch.WaitForConfirmsOrDieAsync(); - - await ch.BasicPublishAsync(exchange, binding2, true, _encoding.GetBytes("test message")); - // await ch.WaitForConfirmsOrDieAsync(); + var pt1 = ch.BasicPublishAsync(exchange, binding1, true, _encoding.GetBytes("test message")); + var pt2 = ch.BasicPublishAsync(exchange, binding2, true, _encoding.GetBytes("test message")); + await WaitForConfirmsWithCancellationAsync(ch); + await Task.WhenAll(pt1.AsTask(), pt2.AsTask()).WaitAsync(WaitSpan); - await consumerReceivedTcs1.Task.WaitAsync(TimeSpan.FromSeconds(5)); - await consumerReceivedTcs2.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.True(consumerReceivedTcs1.Task.IsCompletedSuccessfully()); - Assert.True(consumerReceivedTcs2.Task.IsCompletedSuccessfully()); + await Task.WhenAll(consumerReceivedTcs1.Task, consumerReceivedTcs2.Task).WaitAsync(WaitSpan); + Assert.True(await consumerReceivedTcs1.Task); + Assert.True(await consumerReceivedTcs2.Task); } finally {