Skip to content

Commit 0538354

Browse files
committed
IPublisher XML doc
Also do some small refactoring of `PublishAsync`.
1 parent 670aac4 commit 0538354

File tree

4 files changed

+96
-73
lines changed

4 files changed

+96
-73
lines changed

RabbitMQ.AMQP.Client/IPublisher.cs

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,37 +2,40 @@
22
// and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

5-
using System;
65
using System.Threading;
76
using System.Threading.Tasks;
87

98
namespace RabbitMQ.AMQP.Client
109
{
11-
public class PublisherException : Exception
12-
{
13-
public PublisherException(string message) : base(message)
14-
{
15-
}
16-
}
17-
1810
/// <summary>
1911
/// Represents the status of a publish operation.
20-
/// Accepted: The message was accepted for publication.
21-
/// Rejected: The message was rejected by the broker.
22-
/// Released: The message was released by the broker.
12+
/// See <see href="https://www.rabbitmq.com/docs/amqp#outcomes">AMQP Outcomes</see>.
2313
/// </summary>
2414
public enum OutcomeState
2515
{
16+
/// <summary>
17+
/// The message has been accepted by the broker.
18+
/// </summary>
2619
Accepted,
20+
21+
/// <summary>
22+
/// At least one queue the message was routed to rejected the message. This happens when the
23+
/// queue length is exceeded and the queue's overflow behaviour is set to reject-publish or when
24+
/// a target classic queue is unavailable.
25+
/// </summary>
2726
Rejected,
28-
Released,
27+
28+
/// <summary>
29+
/// The broker could not route the message to any queue.
30+
/// This is likely to be due to a topology misconfiguration.
31+
/// </summary>
32+
Released
2933
}
3034

3135
/// <summary>
3236
/// PublishOutcome represents the outcome of a publish operation.
3337
/// It contains the state of the outcome and an error if the outcome is not successful.
3438
/// </summary>
35-
3639
public class PublishOutcome
3740
{
3841
public PublishOutcome(OutcomeState state, Error? error)

RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs

Lines changed: 60 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -122,83 +122,83 @@ public Task<PublishResult> PublishAsync(IMessage message, CancellationToken canc
122122
TaskCompletionSource<PublishResult> publishResultTcs =
123123
Utils.CreateTaskCompletionSource<PublishResult>();
124124

125-
try
125+
Message nativeMessage = ((AmqpMessage)message).NativeMessage;
126+
127+
void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object state)
126128
{
127-
Message nativeMessage = ((AmqpMessage)message).NativeMessage;
129+
// Note: sometimes `inMessage` is null 🤔
130+
Debug.Assert(Object.ReferenceEquals(this, state));
128131

129-
void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object state)
132+
if (false == Object.ReferenceEquals(_senderLink, sender))
130133
{
131-
// Note: sometimes `message` is null 🤔
132-
Debug.Assert(Object.ReferenceEquals(this, state));
133-
134-
if (false == Object.ReferenceEquals(_senderLink, sender))
135-
{
136-
// TODO log this case?
137-
}
138-
139-
PublishOutcome publishOutcome;
140-
switch (outcome)
141-
{
142-
case Rejected rejectedOutcome:
143-
{
144-
const OutcomeState publishState = OutcomeState.Rejected;
145-
publishOutcome = new PublishOutcome(publishState,
146-
Utils.ConvertError(rejectedOutcome.Error));
147-
_metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.REJECTED);
148-
break;
149-
}
150-
case Released:
151-
{
152-
const OutcomeState publishState = OutcomeState.Released;
153-
publishOutcome = new PublishOutcome(publishState, null);
154-
_metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.RELEASED);
155-
break;
156-
}
157-
case Accepted:
158-
{
159-
const OutcomeState publishState = OutcomeState.Accepted;
160-
publishOutcome = new PublishOutcome(publishState, null);
161-
_metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.ACCEPTED);
162-
break;
163-
}
164-
default:
165-
{
166-
throw new NotSupportedException();
167-
}
168-
}
134+
// TODO log this case?
135+
}
169136

170-
// TODO cancellation token
171-
if (_metricsReporter is not null && stopwatch is not null)
172-
{
173-
stopwatch.Stop();
174-
_metricsReporter.Published(stopwatch.Elapsed);
175-
}
137+
PublishOutcome publishOutcome;
138+
switch (outcome)
139+
{
140+
case Rejected rejectedOutcome:
141+
{
142+
const OutcomeState publishState = OutcomeState.Rejected;
143+
publishOutcome = new PublishOutcome(publishState,
144+
Utils.ConvertError(rejectedOutcome.Error));
145+
_metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.REJECTED);
146+
break;
147+
}
148+
case Released:
149+
{
150+
const OutcomeState publishState = OutcomeState.Released;
151+
publishOutcome = new PublishOutcome(publishState, null);
152+
_metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.RELEASED);
153+
break;
154+
}
155+
case Accepted:
156+
{
157+
const OutcomeState publishState = OutcomeState.Accepted;
158+
publishOutcome = new PublishOutcome(publishState, null);
159+
_metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.ACCEPTED);
160+
break;
161+
}
162+
default:
163+
{
164+
throw new NotSupportedException();
165+
}
166+
}
176167

177-
var publishResult = new PublishResult(message, publishOutcome);
178-
publishResultTcs.SetResult(publishResult);
168+
// TODO cancellation token
169+
if (_metricsReporter is not null && stopwatch is not null)
170+
{
171+
stopwatch.Stop();
172+
_metricsReporter.Published(stopwatch.Elapsed);
179173
}
180174

181-
/*
182-
* Note: do NOT use SendAsync here as it prevents the Closed event from
183-
* firing on the native connection. Bizarre, I know!
184-
*/
185-
_senderLink.Send(nativeMessage, OutcomeCallback, this);
175+
var publishResult = new PublishResult(message, publishOutcome);
176+
publishResultTcs.SetResult(publishResult);
177+
}
186178

187-
return publishResultTcs.Task;
179+
/*
180+
* Note: do NOT use SendAsync here as it prevents the Closed event from
181+
* firing on the native connection. Bizarre, I know!
182+
*/
183+
try
184+
{
185+
_senderLink.Send(nativeMessage, OutcomeCallback, this);
188186
}
189-
catch (AmqpException ex)
187+
catch (AmqpException amqpException)
190188
{
191189
stopwatch?.Stop();
192190
_metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.REJECTED);
193-
var publishOutcome = new PublishOutcome(OutcomeState.Rejected, Utils.ConvertError(ex.Error));
191+
var publishOutcome = new PublishOutcome(OutcomeState.Rejected, Utils.ConvertError(amqpException.Error));
194192
var publishResult = new PublishResult(message, publishOutcome);
195193
publishResultTcs.SetResult(publishResult);
196-
return publishResultTcs.Task;
197194
}
198-
catch (Exception e)
195+
catch (Exception ex)
199196
{
200-
throw new PublisherException($"{ToString()} Failed to publish message, {e}");
197+
var publisherException = new PublisherException($"{ToString()} Failed to publish message", ex);
198+
publishResultTcs.SetException(publisherException);
201199
}
200+
201+
return publishResultTcs.Task;
202202
}
203203

204204
public override async Task CloseAsync()

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -725,6 +725,7 @@ RabbitMQ.AMQP.Client.PreconditionFailedException
725725
RabbitMQ.AMQP.Client.PreconditionFailedException.PreconditionFailedException(string! message) -> void
726726
RabbitMQ.AMQP.Client.PublisherException
727727
RabbitMQ.AMQP.Client.PublisherException.PublisherException(string! message) -> void
728+
RabbitMQ.AMQP.Client.PublisherException.PublisherException(string! message, System.Exception! innerException) -> void
728729
RabbitMQ.AMQP.Client.PublishOutcome
729730
RabbitMQ.AMQP.Client.PublishOutcome.Error.get -> RabbitMQ.AMQP.Client.Error?
730731
RabbitMQ.AMQP.Client.PublishOutcome.PublishOutcome(RabbitMQ.AMQP.Client.OutcomeState state, RabbitMQ.AMQP.Client.Error? error) -> void
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// This source code is dual-licensed under the Apache License, version 2.0,
2+
// and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
4+
5+
using System;
6+
7+
namespace RabbitMQ.AMQP.Client
8+
{
9+
public class PublisherException : Exception
10+
{
11+
public PublisherException(string message) : base(message)
12+
{
13+
}
14+
15+
public PublisherException(string message, Exception innerException) : base(message, innerException)
16+
{
17+
}
18+
}
19+
}

0 commit comments

Comments
 (0)