Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 18 additions & 19 deletions RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ await base.OpenAsync()
/// <exception cref="InvalidOperationException"></exception>
/// <exception cref="NotSupportedException"></exception>
/// <exception cref="PublisherException"></exception>
public async Task<PublishResult> PublishAsync(IMessage message, CancellationToken cancellationToken = default)
public Task<PublishResult> PublishAsync(IMessage message, CancellationToken cancellationToken = default)
{
ThrowIfClosed();

Expand All @@ -119,17 +119,17 @@ public async Task<PublishResult> PublishAsync(IMessage message, CancellationToke
stopwatch.Start();
}

TaskCompletionSource<PublishResult> publishResultTcs =
Utils.CreateTaskCompletionSource<PublishResult>();

try
{
TaskCompletionSource<PublishOutcome> messagePublishedTcs =
Utils.CreateTaskCompletionSource<PublishOutcome>();

Message nativeMessage = ((AmqpMessage)message).NativeMessage;

void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object state)
{
// Note: sometimes `message` is null 🤔
System.Diagnostics.Debug.Assert(Object.ReferenceEquals(this, state));
Debug.Assert(Object.ReferenceEquals(this, state));

if (false == Object.ReferenceEquals(_senderLink, sender))
{
Expand Down Expand Up @@ -167,7 +167,15 @@ void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object st
}
}

messagePublishedTcs.SetResult(publishOutcome);
// TODO cancellation token
if (_metricsReporter is not null && stopwatch is not null)
{
stopwatch.Stop();
_metricsReporter.Published(stopwatch.Elapsed);
}

var publishResult = new PublishResult(message, publishOutcome);
publishResultTcs.SetResult(publishResult);
}

/*
Expand All @@ -176,25 +184,16 @@ void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object st
*/
_senderLink.Send(nativeMessage, OutcomeCallback, this);

// TODO cancellation token
// PublishOutcome publishOutcome = await messagePublishedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5), cancellationToken)
PublishOutcome publishOutcome = await messagePublishedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5))
.ConfigureAwait(false);

if (_metricsReporter is not null && stopwatch is not null)
{
stopwatch.Stop();
_metricsReporter.Published(stopwatch.Elapsed);
}

return new PublishResult(message, publishOutcome);
return publishResultTcs.Task;
}
catch (AmqpException ex)
{
stopwatch?.Stop();
_metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.REJECTED);
var publishOutcome = new PublishOutcome(OutcomeState.Rejected, Utils.ConvertError(ex.Error));
return new PublishResult(message, publishOutcome);
var publishResult = new PublishResult(message, publishOutcome);
publishResultTcs.SetResult(publishResult);
return publishResultTcs.Task;
}
catch (Exception e)
{
Expand Down