Description
We ran into a situation in which an IModel is no longer usable for publishing and waiting for publisher confirms with the WaitForConfirmsOrDie() method. For distributed tracing, we set CorrelationId on the IBasicProperties object when publishing a message. Sometimes our correlation ID was too long, so an error occurred when we attempted to publish the event. We then got stuck in WaitForConfirmsOrDie(): it blocked the thread and never returned. It turns out that in this situation the delivery tag tracked by the client diverges from the delivery tag tracked by the server. We have worked around the issue for now by moving the correlation ID to the headers; nevertheless, I think this is a bug on the RabbitMQ.Client side.
When the client generates a new delivery tag, it should roll it back if it fails to send the message.
With the example below, "I'm done..." is never printed to the console. The output is:
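The proposed behavior can be illustrated with a toy model. This is a minimal sketch, not RabbitMQ.Client's actual code: `PublishSequence`, `Next`, and `Publish` are hypothetical names, and the `send` delegate stands in for whatever serializes and writes the message.

```csharp
using System;

// Toy model of a publish sequence counter (hypothetical, not library code).
public sealed class PublishSequence
{
    private ulong _next = 1;

    // The tag that the next successful publish will receive.
    public ulong Next => _next;

    // Reserves a tag, runs the send step, and rolls the tag back if sending throws.
    public ulong Publish(Action send)
    {
        ulong tag = _next++;
        try
        {
            send();
            return tag;
        }
        catch
        {
            // Nothing reached the server, so the server never counted this
            // message; give the tag back to keep both sides in step.
            _next--;
            throw;
        }
    }
}
```

With this shape, a publish that fails during serialization leaves `Next` unchanged, so the following publish reuses the same tag and matches the server's count.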
Client delivery tag 1
Acked 1
Client delivery tag 2
Error when trying to publish with long string: Value exceeds the maximum allowed length of 255 bytes. (Parameter 'val')
Actual value was oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo.
Client delivery tag 3
Acked 2
Here is a reproducible example (tried with versions 5.1.2 and 6.2.1; the behavior is the same):
static void Main(string[] args)
{
    var connectionFactory = new ConnectionFactory(); // Configure your RabbitMQ connection here
    var connection = connectionFactory.CreateConnection();
    var channel = connection.CreateModel();
    channel.ExchangeDeclare("sample", "fanout", autoDelete: true);
    channel.BasicAcks += (s, e) => Console.WriteLine("Acked {0}", e.DeliveryTag);
    channel.ConfirmSelect();

    // First publish: succeeds and is confirmed.
    var properties = channel.CreateBasicProperties();
    Console.WriteLine("Client delivery tag {0}", channel.NextPublishSeqNo);
    channel.BasicPublish("sample", string.Empty, properties, new byte[1] { 42 });
    channel.WaitForConfirmsOrDie();

    // Second publish: fails on the client because CorrelationId exceeds 255 bytes.
    properties = channel.CreateBasicProperties();
    try
    {
        properties.CorrelationId = new string('o', 256);
        Console.WriteLine("Client delivery tag {0}", channel.NextPublishSeqNo);
        channel.BasicPublish("sample", string.Empty, properties, new byte[1] { 42 });
        channel.WaitForConfirmsOrDie();
    }
    catch (Exception e)
    {
        Console.WriteLine("Error when trying to publish with long string: {0}", e.Message);
    }

    // Third publish: the client reports tag 3, the server acks tag 2, and the wait never returns.
    properties = channel.CreateBasicProperties();
    Console.WriteLine("Client delivery tag {0}", channel.NextPublishSeqNo);
    channel.BasicPublish("sample", string.Empty, properties, new byte[1] { 42 });
    channel.WaitForConfirmsOrDie();
    Console.WriteLine("I'm done...");
}
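
The workaround from the description, moving an oversized correlation ID into the headers, might look roughly like the helper below. It is a sketch under assumptions: `CorrelationIdPlacement`, `MoveToHeadersIfTooLong`, and the header name `x-correlation-id` are hypothetical, the 255-byte limit is taken from the error message above, and measuring it in UTF-8 bytes is an assumption about how the client encodes the value.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper; none of these names come from RabbitMQ.Client.
public static class CorrelationIdPlacement
{
    // Limit reported by the client's error message.
    public const int MaxShortStringBytes = 255;

    // Illustrative header name; pick whatever your consumers expect.
    public const string HeaderName = "x-correlation-id";

    // True when the value fits the 255-byte limit (assumed to be UTF-8 bytes).
    public static bool FitsShortString(string value) =>
        Encoding.UTF8.GetByteCount(value) <= MaxShortStringBytes;

    // Stores the id in headers when it is too long for the CorrelationId
    // property. Returns true if the id was moved to headers.
    public static bool MoveToHeadersIfTooLong(
        string correlationId, IDictionary<string, object> headers)
    {
        if (FitsShortString(correlationId))
        {
            return false;
        }
        headers[HeaderName] = correlationId;
        return true;
    }
}
```

A caller would build a `Dictionary<string, object>`, call `MoveToHeadersIfTooLong`, assign the dictionary to `properties.Headers`, and set `properties.CorrelationId` only when the method returns false. Checking the length before `BasicPublish` avoids the failed publish that leaves the client and server delivery tags out of step.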