diff --git a/projects/Applications/PublisherConfirms/PublisherConfirms.cs b/projects/Applications/PublisherConfirms/PublisherConfirms.cs index e683e283c..b23af91a0 100644 --- a/projects/Applications/PublisherConfirms/PublisherConfirms.cs +++ b/projects/Applications/PublisherConfirms/PublisherConfirms.cs @@ -30,13 +30,13 @@ //--------------------------------------------------------------------------- using System; -using System.Buffers.Binary; using System.Collections.Generic; using System.Diagnostics; using System.Text; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; +using RabbitMQ.Client.Exceptions; const ushort MAX_OUTSTANDING_CONFIRMS = 256; @@ -124,11 +124,33 @@ async Task PublishMessagesInBatchAsync() var sw = new Stopwatch(); sw.Start(); + channel.BasicReturnAsync += (sender, ea) => + { + ulong sequenceNumber = 0; + + IReadOnlyBasicProperties props = ea.BasicProperties; + if (props.Headers is not null) + { + object? maybeSeqNum = props.Headers[Constants.PublishSequenceNumberHeader]; + if (maybeSeqNum is long longSequenceNumber) + { + sequenceNumber = (ulong)longSequenceNumber; + } + } + + return Console.Out.WriteLineAsync($"{DateTime.Now} [INFO] message sequence number '{sequenceNumber}' has been basic.return-ed"); + }; + var publishTasks = new List(); for (int i = 0; i < MESSAGE_COUNT; i++) { + string rk = queueName; + if (i % 1000 == 0) + { + rk = Guid.NewGuid().ToString(); + } byte[] body = Encoding.UTF8.GetBytes(i.ToString()); - publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, mandatory: true, basicProperties: props)); + publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: rk, body: body, mandatory: true, basicProperties: props)); outstandingMessageCount++; if (outstandingMessageCount == batchSize) @@ -139,9 +161,13 @@ async Task PublishMessagesInBatchAsync() { await pt; } + catch (PublishException pex) + { + Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, pex.IsReturn: '{pex.IsReturn}', seq no: '{pex.PublishSequenceNumber}'"); + } catch (Exception ex) { - Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'"); + Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw exception, ex: '{ex}'"); } } publishTasks.Clear(); @@ -157,9 +183,13 @@ async Task PublishMessagesInBatchAsync() { await pt; } + catch (PublishException pex) + { + Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, pex.IsReturn: '{pex.IsReturn}', seq no: '{pex.PublishSequenceNumber}'"); + } catch (Exception ex) { - Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'"); + Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw exception, ex: '{ex}'"); } } publishTasks.Clear(); @@ -236,7 +266,7 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple) } } - channel.BasicReturnAsync += (sender, ea) => + channel.BasicReturnAsync += async (sender, ea) => { ulong sequenceNumber = 0; @@ -244,14 +274,15 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple) if (props.Headers is not null) { object? maybeSeqNum = props.Headers[Constants.PublishSequenceNumberHeader]; - if (maybeSeqNum is not null) + if (maybeSeqNum is long longSequenceNumber) { - sequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte[])maybeSeqNum); + sequenceNumber = (ulong)longSequenceNumber; } } - Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number {sequenceNumber} has been basic.return-ed"); - return CleanOutstandingConfirms(sequenceNumber, false); + await Console.Out.WriteLineAsync($"{DateTime.Now} [INFO] message sequence number '{sequenceNumber}' has been basic.return-ed"); + + await CleanOutstandingConfirms(sequenceNumber, false); }; channel.BasicAcksAsync += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple); @@ -290,13 +321,21 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple) // This will cause a basic.return, for fun rk = Guid.NewGuid().ToString(); } + + var msgProps = new BasicProperties + { + Persistent = true, + Headers = new Dictionary() + }; + + msgProps.Headers.Add(Constants.PublishSequenceNumberHeader, (long)nextPublishSeqNo); + (ulong, ValueTask) data = - (nextPublishSeqNo, channel.BasicPublishAsync(exchange: string.Empty, routingKey: rk, body: body, mandatory: true, basicProperties: props)); + (nextPublishSeqNo, channel.BasicPublishAsync(exchange: string.Empty, routingKey: rk, body: body, mandatory: true, basicProperties: msgProps)); publishTasks.Add(data); } using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - // await Task.WhenAll(publishTasks).WaitAsync(cts.Token); foreach ((ulong SeqNo, ValueTask PublishTask) datum in publishTasks) { try diff --git a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs index 47c87a177..9ed4e0de1 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs @@ -36,7 +36,6 @@ using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client.Framing; -using RabbitMQ.Client.Util; namespace RabbitMQ.Client.Impl { @@ -217,9 +216,14 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary headers { if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled) { - byte[] publishSequenceNumberBytes = new byte[8]; - NetworkOrderSerializer.WriteUInt64(ref publishSequenceNumberBytes.GetStart(), publishSequenceNumber); - headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes; + if (publishSequenceNumber > long.MaxValue) + { + headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumber.ToString(); + } + else + { + headers[Constants.PublishSequenceNumberHeader] = (long)publishSequenceNumber; + } } } } diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index 4ad9475df..d2ca45247 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -34,13 +34,13 @@ using System.Collections.Generic; using System.Diagnostics; using System.Runtime.CompilerServices; +using System.Text; using System.Threading; using System.Threading.RateLimiting; using System.Threading.Tasks; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing; -using RabbitMQ.Client.Util; namespace RabbitMQ.Client.Impl { @@ -233,10 +233,20 @@ private void HandleReturn(BasicReturnEventArgs basicReturnEvent) ulong publishSequenceNumber = 0; IReadOnlyBasicProperties props = basicReturnEvent.BasicProperties; object? maybeSeqNum = props.Headers?[Constants.PublishSequenceNumberHeader]; - if (maybeSeqNum != null && - maybeSeqNum is byte[] seqNumBytes) + if (maybeSeqNum != null) { - publishSequenceNumber = NetworkOrderDeserializer.ReadUInt64(seqNumBytes); + switch (maybeSeqNum) + { + case long seqNumLong: + publishSequenceNumber = (ulong)seqNumLong; + break; + case string seqNumString: + publishSequenceNumber = ulong.Parse(seqNumString); + break; + case byte[] seqNumBytes: + publishSequenceNumber = ulong.Parse(Encoding.ASCII.GetString(seqNumBytes)); + break; + } } HandleNack(publishSequenceNumber, multiple: false, isReturn: true);