From c3d3e201a5ad5e573b59fb809588cc21880155b1 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 17 Mar 2025 15:34:01 -0700 Subject: [PATCH] Make `PublishAsync` actually async. Fixes #111 This required a small modification to when the `TaskCompletionSource` representing the operation was populated. --- RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs | 37 +++++++++++----------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs b/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs index 565fa2e..c2a101f 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs @@ -101,7 +101,7 @@ await base.OpenAsync() /// /// /// - public async Task PublishAsync(IMessage message, CancellationToken cancellationToken = default) + public Task PublishAsync(IMessage message, CancellationToken cancellationToken = default) { ThrowIfClosed(); @@ -119,17 +119,17 @@ public async Task PublishAsync(IMessage message, CancellationToke stopwatch.Start(); } + TaskCompletionSource publishResultTcs = + Utils.CreateTaskCompletionSource(); + try { - TaskCompletionSource messagePublishedTcs = - Utils.CreateTaskCompletionSource(); - Message nativeMessage = ((AmqpMessage)message).NativeMessage; void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object state) { // Note: sometimes `message` is null 🤔 - System.Diagnostics.Debug.Assert(Object.ReferenceEquals(this, state)); + Debug.Assert(Object.ReferenceEquals(this, state)); if (false == Object.ReferenceEquals(_senderLink, sender)) { @@ -167,7 +167,15 @@ void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object st } } - messagePublishedTcs.SetResult(publishOutcome); + // TODO cancellation token + if (_metricsReporter is not null && stopwatch is not null) + { + stopwatch.Stop(); + _metricsReporter.Published(stopwatch.Elapsed); + } + + var publishResult = new PublishResult(message, publishOutcome); + publishResultTcs.SetResult(publishResult); } /* @@ -176,25 +184,16 @@ void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object st */ _senderLink.Send(nativeMessage, OutcomeCallback, this); - // TODO cancellation token - // PublishOutcome publishOutcome = await messagePublishedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5), cancellationToken) - PublishOutcome publishOutcome = await messagePublishedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)) - .ConfigureAwait(false); - - if (_metricsReporter is not null && stopwatch is not null) - { - stopwatch.Stop(); - _metricsReporter.Published(stopwatch.Elapsed); - } - - return new PublishResult(message, publishOutcome); + return publishResultTcs.Task; } catch (AmqpException ex) { stopwatch?.Stop(); _metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.REJECTED); var publishOutcome = new PublishOutcome(OutcomeState.Rejected, Utils.ConvertError(ex.Error)); - return new PublishResult(message, publishOutcome); + var publishResult = new PublishResult(message, publishOutcome); + publishResultTcs.SetResult(publishResult); + return publishResultTcs.Task; } catch (Exception e) {