From 00e227aca7d688e7b8758e9a3979dfc334a174df Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 21 Feb 2025 14:57:45 -0800 Subject: [PATCH] Demonstrate handling of `Channel.Close` Related to https://github.com/rabbitmq/rabbitmq-server/discussions/13387 During the delivery callback for a message, immediately publish with an invalid value for message expiration. This causes an immediate channel closure. * Add logging, use `Try...` methods for `tcs`. --- .../Impl/Connection.Receive.cs | 2 +- .../RabbitMQ.Client/Impl/SessionManager.cs | 5 + .../Test/Integration/GH/TestGitHubIssues.cs | 164 ++++++++++++++++++ 3 files changed, 170 insertions(+), 1 deletion(-) diff --git a/projects/RabbitMQ.Client/Impl/Connection.Receive.cs b/projects/RabbitMQ.Client/Impl/Connection.Receive.cs index a5d90ff7b..ae67544e3 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.Receive.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.Receive.cs @@ -181,7 +181,7 @@ await _session0.HandleFrameAsync(frame, cancellationToken) // frames for non-zero channels (and any inbound // commands on channel zero that aren't // Connection.CloseOk) must be discarded. - if (_closeReason is null) + if (CloseReason is null) { // No close reason, not quiescing the // connection. Handle the frame. (Of course, the diff --git a/projects/RabbitMQ.Client/Impl/SessionManager.cs b/projects/RabbitMQ.Client/Impl/SessionManager.cs index 7c6e35635..1ee0566a5 100644 --- a/projects/RabbitMQ.Client/Impl/SessionManager.cs +++ b/projects/RabbitMQ.Client/Impl/SessionManager.cs @@ -100,6 +100,11 @@ public ISession Lookup(int number) { lock (_sessionMap) { + /* + * Note: rabbitmq/rabbitmq-server#13337 + * When investigating the above issue, a couple KeyNotFoundExceptions + * were thrown here during test shutdown. No reliable reproducer. + */ return _sessionMap[number]; } } diff --git a/projects/Test/Integration/GH/TestGitHubIssues.cs b/projects/Test/Integration/GH/TestGitHubIssues.cs index 45ca4e8f0..dfee6e720 100644 --- a/projects/Test/Integration/GH/TestGitHubIssues.cs +++ b/projects/Test/Integration/GH/TestGitHubIssues.cs @@ -30,6 +30,8 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Generic; +using System.Text; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; @@ -165,5 +167,167 @@ await Assert.ThrowsAnyAsync( async () => await _connFactory.CreateConnectionAsync()); Assert.IsAssignableFrom(ex.InnerException); } + + [Fact] + public async Task SendInvalidPublishMaybeClosesConnection_GH13387() + { + const int messageCount = 200; + + _connFactory = new ConnectionFactory(); + _conn = await _connFactory.CreateConnectionAsync(); + + var opts = new CreateChannelOptions( + publisherConfirmationsEnabled: true, + publisherConfirmationTrackingEnabled: true); + _channel = await _conn.CreateChannelAsync(opts); + + await _channel.BasicQosAsync(0, 10, false); + + string queueName = GenerateQueueName(); + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + Assert.Equal(queueName, q.QueueName); + + byte[] body = Encoding.ASCII.GetBytes("incoming message"); + var publishTasks = new List(); + for (int i = 0; i < messageCount; i++) + { + ValueTask pt = _channel.BasicPublishAsync( + exchange: string.Empty, + routingKey: queueName, + body: body); + publishTasks.Add(pt); + if (i % 20 == 0) + { + foreach (ValueTask t in publishTasks) + { + await t; + } + publishTasks.Clear(); + } + } + + foreach (ValueTask t in publishTasks) + { + await t; + } + publishTasks.Clear(); + + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + _conn.CallbackExceptionAsync += (object sender, CallbackExceptionEventArgs args) => + { + if (IsVerbose) + { + _output.WriteLine("_conn.CallbackExceptionAsync: {0}", args.Exception); + } + tcs.TrySetException(args.Exception); + return Task.CompletedTask; + }; + + _conn.ConnectionShutdownAsync += (object sender, ShutdownEventArgs args) => + { + if (args.Exception is not null) + { + if (IsVerbose) + { + _output.WriteLine("_conn.ConnectionShutdownAsync: {0}", args.Exception); + } + tcs.TrySetException(args.Exception); + } + else + { + if (IsVerbose) + { + _output.WriteLine("_conn.ConnectionShutdownAsync"); + } + tcs.TrySetResult(false); + } + return Task.CompletedTask; + }; + + _channel.CallbackExceptionAsync += (object sender, CallbackExceptionEventArgs args) => + { + if (IsVerbose) + { + _output.WriteLine("_channel.CallbackExceptionAsync: {0}", args.Exception); + } + tcs.TrySetException(args.Exception); + return Task.CompletedTask; + }; + + _channel.ChannelShutdownAsync += (object sender, ShutdownEventArgs args) => + { + if (args.Exception is not null) + { + if (IsVerbose) + { + _output.WriteLine("_channel.ChannelShutdownAsync: {0}", args.Exception); + } + tcs.TrySetException(args.Exception); + } + else + { + if (IsVerbose) + { + _output.WriteLine("_channel.ChannelShutdownAsync"); + } + tcs.TrySetResult(false); + } + return Task.CompletedTask; + }; + + var ackExceptions = new List(); + var publishExceptions = new List(); + var props = new BasicProperties { Expiration = "-1" }; + int receivedCounter = 0; + var consumer = new AsyncEventingBasicConsumer(_channel); + consumer.ReceivedAsync += async (object sender, BasicDeliverEventArgs args) => + { + var c = (AsyncEventingBasicConsumer)sender; + IChannel ch = c.Channel; + try + { + await ch.BasicAckAsync(args.DeliveryTag, false); + } + catch (Exception ex) + { + ackExceptions.Add(ex); + } + + try + { + await ch.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, + mandatory: true, basicProperties: props, body: body); + } + catch (Exception ex) + { + publishExceptions.Add(ex); + } + + if (Interlocked.Increment(ref receivedCounter) >= messageCount) + { + tcs.SetResult(true); + } + }; + + consumer.ShutdownAsync += (object sender, ShutdownEventArgs args) => + { + if (IsVerbose) + { + _output.WriteLine("consumer.ShutdownAsync"); + } + return Task.CompletedTask; + }; + + await _channel.BasicConsumeAsync(queueName, false, consumer); + + await tcs.Task; + + if (IsVerbose) + { + _output.WriteLine("saw {0} ackExceptions", ackExceptions.Count); + _output.WriteLine("saw {0} publishExceptions", publishExceptions.Count); + } + } } }