diff --git a/projects/RabbitMQ.Client/client/impl/ModelBase.cs b/projects/RabbitMQ.Client/client/impl/ModelBase.cs index 3bc28aacae..3510ac070e 100644 --- a/projects/RabbitMQ.Client/client/impl/ModelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ModelBase.cs @@ -1194,11 +1194,28 @@ public void BasicPublish(string exchange, } } - _Private_BasicPublish(exchange, - routingKey, - mandatory, - basicProperties, - body); + try + { + _Private_BasicPublish(exchange, + routingKey, + mandatory, + basicProperties, + body); + } + catch + { + if (NextPublishSeqNo > 0) + { + lock (_confirmLock) + { + NextPublishSeqNo--; + _pendingDeliveryTags.RemoveLast(); + _deliveryTagsCountdown.Reset(_pendingDeliveryTags.Count); + } + } + + throw; + } } public void UpdateSecret(string newSecret, string reason) diff --git a/projects/Unit/TestConfirmSelect.cs b/projects/Unit/TestConfirmSelect.cs index 3886addad8..06b10169d2 100644 --- a/projects/Unit/TestConfirmSelect.cs +++ b/projects/Unit/TestConfirmSelect.cs @@ -29,6 +29,7 @@ // Copyright (c) 2007-2020 VMware, Inc. All rights reserved. //--------------------------------------------------------------------------- +using System; using NUnit.Framework; namespace RabbitMQ.Client.Unit @@ -63,5 +64,61 @@ protected void Publish() { Model.BasicPublish("", "amq.fanout", null, encoding.GetBytes("message")); } + + [Test] + [TestCase(255)] + [TestCase(256)] + public void TestDeliveryTagDiverged_GH1043(int correlationIdLength) + { + bool.TryParse(Environment.GetEnvironmentVariable("RABBITMQ_VERBOSE"), out bool verbose); + + byte[] body = RandomMessageBody(); + + Model.ExchangeDeclare("sample", "fanout", autoDelete: true); + if (verbose) + { + Model.BasicAcks += (s, e) => Console.WriteLine("Acked {0}", e.DeliveryTag); + } + Model.ConfirmSelect(); + + IBasicProperties properties = Model.CreateBasicProperties(); + if (verbose) + { + Console.WriteLine("Client delivery tag {0}", Model.NextPublishSeqNo); + } + Model.BasicPublish(exchange: "sample", routingKey: string.Empty, properties, body); + Model.WaitForConfirmsOrDie(); + + try + { + properties = Model.CreateBasicProperties(); + properties.CorrelationId = new string('o', correlationIdLength); + if (verbose) + { + Console.WriteLine("Client delivery tag {0}", Model.NextPublishSeqNo); + } + Model.BasicPublish("sample", string.Empty, properties, body); + Model.WaitForConfirmsOrDie(); + } + catch (Exception e) + { + if (verbose) + { + Console.WriteLine("Error when trying to publish with long string: {0}", e.Message); + } + } + + properties = Model.CreateBasicProperties(); + if (verbose) + { + Console.WriteLine("Client delivery tag {0}", Model.NextPublishSeqNo); + } + Model.BasicPublish("sample", string.Empty, properties, body); + Model.WaitForConfirmsOrDie(); + if (verbose) + { + Console.WriteLine("I'm done..."); + } + } } }