diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs b/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs
index 565fa2e..c2a101f 100644
--- a/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs
+++ b/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs
@@ -101,7 +101,7 @@ await base.OpenAsync()
///
///
///
- public async Task PublishAsync(IMessage message, CancellationToken cancellationToken = default)
+ public Task PublishAsync(IMessage message, CancellationToken cancellationToken = default)
{
ThrowIfClosed();
@@ -119,17 +119,17 @@ public async Task PublishAsync(IMessage message, CancellationToke
stopwatch.Start();
}
+ TaskCompletionSource publishResultTcs =
+ Utils.CreateTaskCompletionSource();
+
try
{
- TaskCompletionSource messagePublishedTcs =
- Utils.CreateTaskCompletionSource();
-
Message nativeMessage = ((AmqpMessage)message).NativeMessage;
void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object state)
{
// Note: sometimes `message` is null 🤔
- System.Diagnostics.Debug.Assert(Object.ReferenceEquals(this, state));
+ Debug.Assert(Object.ReferenceEquals(this, state));
if (false == Object.ReferenceEquals(_senderLink, sender))
{
@@ -167,7 +167,15 @@ void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object st
}
}
- messagePublishedTcs.SetResult(publishOutcome);
+ // TODO cancellation token
+ if (_metricsReporter is not null && stopwatch is not null)
+ {
+ stopwatch.Stop();
+ _metricsReporter.Published(stopwatch.Elapsed);
+ }
+
+ var publishResult = new PublishResult(message, publishOutcome);
+ publishResultTcs.SetResult(publishResult);
}
/*
@@ -176,25 +184,16 @@ void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object st
*/
_senderLink.Send(nativeMessage, OutcomeCallback, this);
- // TODO cancellation token
- // PublishOutcome publishOutcome = await messagePublishedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5), cancellationToken)
- PublishOutcome publishOutcome = await messagePublishedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5))
- .ConfigureAwait(false);
-
- if (_metricsReporter is not null && stopwatch is not null)
- {
- stopwatch.Stop();
- _metricsReporter.Published(stopwatch.Elapsed);
- }
-
- return new PublishResult(message, publishOutcome);
+ return publishResultTcs.Task;
}
catch (AmqpException ex)
{
stopwatch?.Stop();
_metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.REJECTED);
var publishOutcome = new PublishOutcome(OutcomeState.Rejected, Utils.ConvertError(ex.Error));
- return new PublishResult(message, publishOutcome);
+ var publishResult = new PublishResult(message, publishOutcome);
+ publishResultTcs.SetResult(publishResult);
+ return publishResultTcs.Task;
}
catch (Exception e)
{