Skip to content

Commit 488c2fb

Browse files
authored
Change consumer options to uniform all the AMQP 1.0 clients interfaces (#144)
This pull request refactors the consumer builder API to use a unified enum-based approach for configuring consumer features, replacing the previous boolean-based methods. The change consolidates DirectReplyTo(bool) and PreSettled(bool) methods into a single Feature(ConsumerFeature) method that accepts an enum value. Changes: * Introduced ConsumerSettleStrategy enum with three values: ExplicitSettle, DirectReplyTo, and PreSettled * Replaced IConsumerBuilder.DirectReplyTo(bool) and IConsumerBuilder.PreSettled(bool) with IConsumerBuilder.SettleStrategy(ConsumerSettleStrategy) * Updated all consumer creation code across tests, examples, and internal implementations to use the new API Reviewed changes --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 4a9f27b commit 488c2fb

File tree

8 files changed

+67
-76
lines changed

8 files changed

+67
-76
lines changed

RabbitMQ.AMQP.Client/IConsumerBuilder.cs

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,32 +15,43 @@ public enum StreamOffsetSpecification
1515
Next
1616
}
1717

18+
/// <summary>
19+
/// ConsumerSettleStrategy defines the Consumer Settle available for Classic and Quorum queue consumers.
20+
/// </summary>
21+
public enum ConsumerSettleStrategy
22+
{
23+
// ExplicitSettle: means that the consumer will be created with the default settings.
24+
// message settle mode will be the default one (explicit settle)
25+
// via Context. See IContext for more details on message settlement.
26+
// ExplicitSettle is the default behavior if no feature is specified.
27+
ExplicitSettle,
28+
29+
// DirectReplyTo: Enables Direct Reply-To consumer behavior.
30+
// Feature in RabbitMQ, allowing for simplified request-reply messaging patterns.
31+
// Docs: https://www.rabbitmq.com/docs/direct-reply-to#usage-amqp
32+
// with DirectReplyTo presettled is by default.
33+
DirectReplyTo,
34+
35+
// Presettled: Deliveries are pre-settled.
36+
// When enabled, messages are automatically settled when received,
37+
// meaning they cannot be redelivered if processing fails.
38+
PreSettled,
39+
}
40+
1841
// TODO IAddressBuilder<IConsumerBuilder>?
1942
public interface IConsumerBuilder
2043
{
2144
IConsumerBuilder Queue(IQueueSpecification queueSpecification);
2245
IConsumerBuilder Queue(string? queueName);
2346

24-
/// <summary>
25-
/// If direct reply-to is enabled, the client will use the direct reply-to feature of AMQP 1.0.
26-
/// The server must also support direct reply-to.
27-
/// This feature allows the server to send the reply directly to the client without going through a reply queue.
28-
/// This can improve performance and reduce latency.
29-
/// Default is false.
30-
/// https://www.rabbitmq.com/docs/direct-reply-to
31-
/// </summary>
32-
IConsumerBuilder DirectReplyTo(bool directReplyTo);
33-
3447
IConsumerBuilder MessageHandler(MessageHandler handler);
3548

3649
IConsumerBuilder InitialCredits(int initialCredits);
37-
50+
3851
/// <summary>
39-
/// If pre-settled is enabled, the receiver will use ReceiverSettleMode.Second,
40-
/// meaning messages are pre-settled and the receiver does not need to explicitly settle them.
41-
/// Default is false.
52+
/// SettleStrategy: ConsumerSettleStrategy for more details on the Consumer Settle that can be enabled for the consumer.
4253
/// </summary>
43-
IConsumerBuilder PreSettled(bool preSettled);
54+
IConsumerBuilder SettleStrategy(ConsumerSettleStrategy settleStrategy);
4455

4556
/// <summary>
4657
/// SubscriptionListener interface callback to add behavior before a subscription is created.

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,15 @@ public override async Task OpenAsync()
7575

7676
Attach attach;
7777

78-
if (_configuration.DirectReplyTo)
78+
if (_configuration.SettleStrategy == ConsumerSettleStrategy.DirectReplyTo)
7979
{
8080
attach = Utils.CreateDirectReplyToAttach(_id, _configuration.Filters);
8181
}
8282
else
8383
{
8484
string address = AddressBuilderHelper.AddressBuilder().Queue(_configuration.Queue).Address();
8585
attach = Utils.CreateAttach(address, DeliveryMode.AtLeastOnce, _id,
86-
_configuration.Filters, _configuration.PreSettled);
86+
_configuration.Filters, _configuration.SettleStrategy == ConsumerSettleStrategy.PreSettled);
8787
}
8888

8989
void OnAttached(ILink argLink, Attach argAttach)
@@ -176,14 +176,14 @@ private async Task ProcessMessages()
176176
continue;
177177
}
178178

179-
IContext context = _configuration.PreSettled switch
179+
IContext context = _configuration.SettleStrategy switch
180180
{
181-
true => new PreSettledDeliveryContext(),
182-
false => new DeliveryContext(_receiverLink, nativeMessage, _unsettledMessageCounter,
181+
ConsumerSettleStrategy.PreSettled => new PreSettledDeliveryContext(),
182+
_ => new DeliveryContext(_receiverLink, nativeMessage, _unsettledMessageCounter,
183183
_metricsReporter)
184184
};
185185

186-
if (!_configuration.PreSettled)
186+
if (_configuration.SettleStrategy != ConsumerSettleStrategy.PreSettled)
187187
{
188188
_unsettledMessageCounter.Increment();
189189
}

RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ namespace RabbitMQ.AMQP.Client.Impl
1616
/// </summary>
1717
internal sealed class ConsumerConfiguration
1818
{
19-
2019
public string? Queue { get; set; } = null;
2120
public int InitialCredits { get; set; } = 100; // TODO use constant, check with Java lib
2221

@@ -25,22 +24,7 @@ internal sealed class ConsumerConfiguration
2524
// TODO is a MessageHandler *really* optional???
2625
public MessageHandler? Handler { get; set; }
2726

28-
/// <summary>
29-
/// If direct reply-to is enabled, the client will use the direct reply-to feature of AMQP 1.0.
30-
/// The server must also support direct reply-to.
31-
/// This feature allows the server to send the reply directly to the client without going through a reply queue.
32-
/// This can improve performance and reduce latency.
33-
/// Default is false.
34-
/// https://www.rabbitmq.com/docs/direct-reply-to
35-
/// </summary>
36-
public bool DirectReplyTo { get; set; }
37-
38-
/// <summary>
39-
/// If pre-settled is enabled, the receiver will use ReceiverSettleMode.Second,
40-
/// meaning messages are pre-settled and the receiver does not need to explicitly settle them.
41-
/// Default is false.
42-
/// </summary>
43-
public bool PreSettled { get; set; }
27+
public ConsumerSettleStrategy SettleStrategy { get; set; } = ConsumerSettleStrategy.ExplicitSettle;
4428

4529
// TODO re-name to ListenerContextAction? Callback?
4630
public Action<IConsumerBuilder.ListenerContext>? ListenerContext = null;
@@ -79,21 +63,15 @@ public IConsumerBuilder MessageHandler(MessageHandler handler)
7963
return this;
8064
}
8165

82-
public IConsumerBuilder DirectReplyTo(bool directReplyTo)
83-
{
84-
_configuration.DirectReplyTo = directReplyTo;
85-
return this;
86-
}
87-
8866
public IConsumerBuilder InitialCredits(int initialCredits)
8967
{
9068
_configuration.InitialCredits = initialCredits;
9169
return this;
9270
}
9371

94-
public IConsumerBuilder PreSettled(bool preSettled)
72+
public IConsumerBuilder SettleStrategy(ConsumerSettleStrategy settleStrategy)
9573
{
96-
_configuration.PreSettled = preSettled;
74+
_configuration.SettleStrategy = settleStrategy;
9775
return this;
9876
}
9977

@@ -340,7 +318,6 @@ public IConsumerBuilder.IStreamFilterOptions PropertySymbol(string key, string v
340318

341319
private StreamFilterOptions PropertyFilter(string propertyKey, object propertyValue)
342320
{
343-
344321
DescribedValue propertiesFilterValue = Filter(Consts.AmqpPropertiesFilter);
345322
Map propertiesFilter = (Map)propertiesFilterValue.Value;
346323
// Note: you MUST use a symbol as the key
@@ -350,7 +327,6 @@ private StreamFilterOptions PropertyFilter(string propertyKey, object propertyVa
350327

351328
private StreamFilterOptions ApplicationPropertyFilter(string propertyKey, object propertyValue)
352329
{
353-
354330
DescribedValue applicationPropertiesFilterValue = Filter(Consts.AmqpApplicationPropertiesFilter);
355331
Map applicationPropertiesFilter = (Map)applicationPropertiesFilterValue.Value;
356332
// Note: do NOT put a symbol as the key

RabbitMQ.AMQP.Client/Impl/AmqpRequester.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,11 +177,14 @@ public override async Task OpenAsync()
177177
// The user is always free to create custom Requester and Responder
178178
bool canApplyDirectReplyTo = isDirectReplyToSupported &&
179179
string.IsNullOrEmpty(_configuration.ReplyToQueue);
180+
ConsumerSettleStrategy consumerSettleStrategy = canApplyDirectReplyTo
181+
? ConsumerSettleStrategy.DirectReplyTo
182+
: ConsumerSettleStrategy.ExplicitSettle;
180183

181184
_publisher = await _configuration.Connection.PublisherBuilder().BuildAsync().ConfigureAwait(false);
182185
_consumer = await _configuration.Connection.ConsumerBuilder()
183186
.Queue(queueReplyTo)
184-
.DirectReplyTo(canApplyDirectReplyTo)
187+
.SettleStrategy(consumerSettleStrategy)
185188
.MessageHandler((context, message) =>
186189
{
187190
// TODO MessageHandler funcs should catch all exceptions

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.VirtualHost(string! virtualHost)
104104
RabbitMQ.AMQP.Client.Consts
105105
RabbitMQ.AMQP.Client.ConsumerException
106106
RabbitMQ.AMQP.Client.ConsumerException.ConsumerException(string! message) -> void
107+
RabbitMQ.AMQP.Client.ConsumerSettleStrategy
108+
RabbitMQ.AMQP.Client.ConsumerSettleStrategy.DirectReplyTo = 1 -> RabbitMQ.AMQP.Client.ConsumerSettleStrategy
109+
RabbitMQ.AMQP.Client.ConsumerSettleStrategy.ExplicitSettle = 0 -> RabbitMQ.AMQP.Client.ConsumerSettleStrategy
110+
RabbitMQ.AMQP.Client.ConsumerSettleStrategy.PreSettled = 2 -> RabbitMQ.AMQP.Client.ConsumerSettleStrategy
107111
RabbitMQ.AMQP.Client.Error
108112
RabbitMQ.AMQP.Client.Error.Description.get -> string?
109113
RabbitMQ.AMQP.Client.Error.Error(string? errorCode, string? description) -> void
@@ -176,7 +180,6 @@ RabbitMQ.AMQP.Client.IConsumer.Unpause() -> void
176180
RabbitMQ.AMQP.Client.IConsumer.UnsettledMessageCount.get -> long
177181
RabbitMQ.AMQP.Client.IConsumerBuilder
178182
RabbitMQ.AMQP.Client.IConsumerBuilder.BuildAndStartAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IConsumer!>!
179-
RabbitMQ.AMQP.Client.IConsumerBuilder.DirectReplyTo(bool directReplyTo) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
180183
RabbitMQ.AMQP.Client.IConsumerBuilder.InitialCredits(int initialCredits) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
181184
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions
182185
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.AbsoluteExpiryTime(System.DateTime absoluteExpiryTime) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
@@ -209,9 +212,9 @@ RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext
209212
RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext.ListenerContext(RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions! streamOptions) -> void
210213
RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext.StreamOptions.get -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
211214
RabbitMQ.AMQP.Client.IConsumerBuilder.MessageHandler(RabbitMQ.AMQP.Client.MessageHandler! handler) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
212-
RabbitMQ.AMQP.Client.IConsumerBuilder.PreSettled(bool preSettled) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
213215
RabbitMQ.AMQP.Client.IConsumerBuilder.Queue(RabbitMQ.AMQP.Client.IQueueSpecification! queueSpecification) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
214216
RabbitMQ.AMQP.Client.IConsumerBuilder.Queue(string? queueName) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
217+
RabbitMQ.AMQP.Client.IConsumerBuilder.SettleStrategy(RabbitMQ.AMQP.Client.ConsumerSettleStrategy settleStrategy) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
215218
RabbitMQ.AMQP.Client.IConsumerBuilder.Stream() -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
216219
RabbitMQ.AMQP.Client.IConsumerBuilder.SubscriptionListener(System.Action<RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext!>! listenerContext) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
217220
RabbitMQ.AMQP.Client.IContext
@@ -373,12 +376,11 @@ RabbitMQ.AMQP.Client.Impl.AmqpConsumer.UnsettledMessageCount.get -> long
373376
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder
374377
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.AmqpConsumerBuilder(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection, RabbitMQ.AMQP.Client.IMetricsReporter? metricsReporter) -> void
375378
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.BuildAndStartAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IConsumer!>!
376-
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.DirectReplyTo(bool directReplyTo) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
377379
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.InitialCredits(int initialCredits) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
378380
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.MessageHandler(RabbitMQ.AMQP.Client.MessageHandler! handler) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
379-
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.PreSettled(bool preSettled) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
380381
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.Queue(RabbitMQ.AMQP.Client.IQueueSpecification! queueSpec) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
381382
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.Queue(string? queueName) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
383+
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.SettleStrategy(RabbitMQ.AMQP.Client.ConsumerSettleStrategy settleStrategy) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
382384
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.Stream() -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
383385
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.SubscriptionListener(System.Action<RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext!>! context) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
384386
RabbitMQ.AMQP.Client.Impl.AmqpEnvironment

Tests/Consumer/PreSettledConsumerTests.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public async Task PreSettledConsumerCanReceiveMessages()
2727
TaskCompletionSource<IMessage> tcs = new();
2828
IConsumer consumer = await _connection.ConsumerBuilder()
2929
.Queue(queueSpec)
30-
.PreSettled(true)
30+
.SettleStrategy(ConsumerSettleStrategy.PreSettled)
3131
.MessageHandler((context, message) =>
3232
{
3333
// With PreSettled, messages are already settled, so we don't need to call Accept()
@@ -64,7 +64,7 @@ public async Task PreSettledConsumerAcceptThrowsException()
6464
TaskCompletionSource<InvalidOperationException> tcs = new();
6565
IConsumer consumer = await _connection.ConsumerBuilder()
6666
.Queue(queueSpec)
67-
.PreSettled(true)
67+
.SettleStrategy(ConsumerSettleStrategy.PreSettled)
6868
.MessageHandler((context, message) =>
6969
{
7070
try
@@ -104,7 +104,7 @@ public async Task PreSettledConsumerDiscardThrowsException()
104104
TaskCompletionSource<InvalidOperationException> tcs = new();
105105
IConsumer consumer = await _connection.ConsumerBuilder()
106106
.Queue(queueSpec)
107-
.PreSettled(true)
107+
.SettleStrategy(ConsumerSettleStrategy.PreSettled)
108108
.MessageHandler((context, message) =>
109109
{
110110
try
@@ -144,7 +144,7 @@ public async Task PreSettledConsumerDiscardWithAnnotationsThrowsException()
144144
TaskCompletionSource<InvalidOperationException> tcs = new();
145145
IConsumer consumer = await _connection.ConsumerBuilder()
146146
.Queue(queueSpec)
147-
.PreSettled(true)
147+
.SettleStrategy(ConsumerSettleStrategy.PreSettled)
148148
.MessageHandler((context, message) =>
149149
{
150150
try
@@ -185,7 +185,7 @@ public async Task PreSettledConsumerRequeueThrowsException()
185185
TaskCompletionSource<InvalidOperationException> tcs = new();
186186
IConsumer consumer = await _connection.ConsumerBuilder()
187187
.Queue(queueSpec)
188-
.PreSettled(true)
188+
.SettleStrategy(ConsumerSettleStrategy.PreSettled)
189189
.MessageHandler((context, message) =>
190190
{
191191
try
@@ -225,7 +225,7 @@ public async Task PreSettledConsumerRequeueWithAnnotationsThrowsException()
225225
TaskCompletionSource<InvalidOperationException> tcs = new();
226226
IConsumer consumer = await _connection.ConsumerBuilder()
227227
.Queue(queueSpec)
228-
.PreSettled(true)
228+
.SettleStrategy(ConsumerSettleStrategy.PreSettled)
229229
.MessageHandler((context, message) =>
230230
{
231231
try
@@ -266,7 +266,7 @@ public async Task PreSettledConsumerBatchThrowsException()
266266
TaskCompletionSource<InvalidOperationException> tcs = new();
267267
IConsumer consumer = await _connection.ConsumerBuilder()
268268
.Queue(queueSpec)
269-
.PreSettled(true)
269+
.SettleStrategy(ConsumerSettleStrategy.PreSettled)
270270
.MessageHandler((context, message) =>
271271
{
272272
try
@@ -308,7 +308,7 @@ public async Task PreSettledConsumerUnsettledMessageCountIsZero()
308308
int receivedCount = 0;
309309
IConsumer consumer = await _connection.ConsumerBuilder()
310310
.Queue(queueSpec)
311-
.PreSettled(true)
311+
.SettleStrategy(ConsumerSettleStrategy.PreSettled)
312312
.MessageHandler((context, message) =>
313313
{
314314
receivedCount++;
@@ -347,7 +347,7 @@ public async Task NonPreSettledConsumerUnsettledMessageCountIncrements()
347347
int receivedCount = 0;
348348
IConsumer consumer = await _connection.ConsumerBuilder()
349349
.Queue(queueSpec)
350-
.PreSettled(false) // Explicitly set to false
350+
.SettleStrategy(ConsumerSettleStrategy.ExplicitSettle) // Explicitly avoid pre-settled feature
351351
.MessageHandler((context, message) =>
352352
{
353353
receivedCount++;
@@ -387,7 +387,7 @@ public async Task PreSettledConsumerCanReceiveMultipleMessages()
387387
int receivedCount = 0;
388388
IConsumer consumer = await _connection.ConsumerBuilder()
389389
.Queue(queueSpec)
390-
.PreSettled(true)
390+
.SettleStrategy(ConsumerSettleStrategy.PreSettled)
391391
.MessageHandler((context, message) =>
392392
{
393393
receivedCount++;

Tests/DirectReply/DirectReplyTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public async Task ValidateDirectReplyQName()
2323
"DirectReply is not supported by the connection.");
2424

2525
IConsumer consumer = await _connection.ConsumerBuilder()
26-
.DirectReplyTo(true)
26+
.SettleStrategy(ConsumerSettleStrategy.DirectReplyTo)
2727
.MessageHandler((IContext _, IMessage _) => Task.CompletedTask)
2828
.BuildAndStartAsync();
2929

@@ -42,7 +42,7 @@ public async Task UseDirectReplyToReceiveAMessage()
4242
"DirectReply is not supported by the connection.");
4343

4444
IConsumer consumer = await _connection.ConsumerBuilder()
45-
.DirectReplyTo(true)
45+
.SettleStrategy(ConsumerSettleStrategy.DirectReplyTo)
4646
.MessageHandler((IContext _, IMessage msg) =>
4747
{
4848
tcs.SetResult(msg);

0 commit comments

Comments
 (0)