Skip to content

Commit b1a93ba

Browse files
committed
Implement batch disposition
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent c64a6b7 commit b1a93ba

File tree

4 files changed

+72
-11
lines changed

4 files changed

+72
-11
lines changed

RabbitMQ.AMQP.Client/IConsumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,6 @@ public interface IBatchContext : IContext
136136
/// Get the current number of message contexts in the batch context.
137137
/// @return current number of message contexts in the batch
138138
/// </summary>
139-
int Size();
139+
int Count();
140140
}
141141
}

RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,12 @@ public IBatchContext Batch()
159159
}
160160
}
161161

162-
internal class BatchDeliveryContext : IBatchContext
162+
///<summary>
163+
/// BatchDeliveryContext is a client side helper class that allows
164+
/// accumulating multiple message contexts and settling them at once.
165+
/// It is thread-safe and can be used from multiple threads.
166+
/// </summary>
167+
public class BatchDeliveryContext : IBatchContext
163168
{
164169
private readonly List<IContext> _contexts = new();
165170
private readonly SemaphoreSlim _semaphore = new(1, 1);
@@ -265,6 +270,6 @@ public void Add(IContext context)
265270
_contexts.Add(context);
266271
}
267272

268-
public int Size() => _contexts.Count;
273+
public int Count() => _contexts.Count;
269274
}
270275
}

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ RabbitMQ.AMQP.Client.IBackOffDelayPolicy.IsActive() -> bool
126126
RabbitMQ.AMQP.Client.IBackOffDelayPolicy.Reset() -> void
127127
RabbitMQ.AMQP.Client.IBatchContext
128128
RabbitMQ.AMQP.Client.IBatchContext.Add(RabbitMQ.AMQP.Client.IContext! context) -> void
129-
RabbitMQ.AMQP.Client.IBatchContext.Size() -> int
129+
RabbitMQ.AMQP.Client.IBatchContext.Count() -> int
130130
RabbitMQ.AMQP.Client.IBindingSpecification
131131
RabbitMQ.AMQP.Client.IBindingSpecification.Argument(string! key, object! value) -> RabbitMQ.AMQP.Client.IBindingSpecification!
132132
RabbitMQ.AMQP.Client.IBindingSpecification.Arguments(System.Collections.Generic.Dictionary<string!, object!>! arguments) -> RabbitMQ.AMQP.Client.IBindingSpecification!
@@ -514,6 +514,16 @@ RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.InitialClusterSize(int initial
514514
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.MaxAge(System.TimeSpan maxAge) -> RabbitMQ.AMQP.Client.IStreamSpecification!
515515
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.MaxSegmentSizeBytes(RabbitMQ.AMQP.Client.ByteCapacity! maxSegmentSize) -> RabbitMQ.AMQP.Client.IStreamSpecification!
516516
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification!
517+
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext
518+
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Accept() -> void
519+
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Add(RabbitMQ.AMQP.Client.IContext! context) -> void
520+
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Batch() -> RabbitMQ.AMQP.Client.IBatchContext!
521+
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.BatchDeliveryContext() -> void
522+
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Count() -> int
523+
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Discard() -> void
524+
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Discard(System.Collections.Generic.Dictionary<string!, object!>! annotations) -> void
525+
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Requeue() -> void
526+
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Requeue(System.Collections.Generic.Dictionary<string!, object!>! annotations) -> void
517527
RabbitMQ.AMQP.Client.Impl.BindingSpecification
518528
RabbitMQ.AMQP.Client.Impl.BindingSpecification.ArgsToMap() -> Amqp.Types.Map!
519529
RabbitMQ.AMQP.Client.Impl.BindingSpecification.BindingSpecification() -> void

Tests/Consumer/ConsumerDispositionTests.cs

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

5+
using System;
56
using System.Collections.Generic;
67
using System.Threading.Tasks;
78
using RabbitMQ.AMQP.Client;
@@ -31,7 +32,7 @@ public async Task BatchAcceptDisposition()
3132
{
3233
Assert.NotNull(batch);
3334
batch.Add(context);
34-
if (batch.Size() != batchSize)
35+
if (batch.Count() != batchSize)
3536
{
3637
return Task.CompletedTask;
3738
}
@@ -70,7 +71,7 @@ public async Task BatchDiscardDisposition()
7071
{
7172
Assert.NotNull(batch);
7273
batch.Add(context);
73-
if (batch.Size() != batchSize)
74+
if (batch.Count() != batchSize)
7475
{
7576
return Task.CompletedTask;
7677
}
@@ -109,7 +110,7 @@ public async Task BatchDiscardAnnotationDisposition()
109110
{
110111
Assert.NotNull(batch);
111112
batch.Add(context);
112-
if (batch.Size() != batchSize)
113+
if (batch.Count() != batchSize)
113114
{
114115
return Task.CompletedTask;
115116
}
@@ -151,7 +152,7 @@ public async Task BatchRequeueDisposition()
151152
{
152153
Assert.NotNull(batch);
153154
batch.Add(context);
154-
if (batch.Size() != batchSize)
155+
if (batch.Count() != batchSize)
155156
{
156157
return Task.CompletedTask;
157158
}
@@ -188,7 +189,7 @@ public async Task BatchRequeueAnnotationsDisposition()
188189
{
189190
Assert.NotNull(batch);
190191
batch.Add(context);
191-
if (batch.Size() != batchSize)
192+
if (batch.Count() != batchSize)
192193
{
193194
return Task.CompletedTask;
194195
}
@@ -198,23 +199,68 @@ public async Task BatchRequeueAnnotationsDisposition()
198199

199200
const string annotationKey1 = "x-opt-annotation1-key";
200201
const string annotationValue1 = "annotation1-value";
201-
202+
Assert.Equal(batchSize, batch.Count());
202203
batch.Requeue(new Dictionary<string, object>()
203204
{
204205
{ annotationKey, annotationValue }, { annotationKey1, annotationValue1 }
205206
});
207+
Assert.Equal(0, batch.Count());
208+
206209
tcs.SetResult(true);
207210

208211
return Task.CompletedTask;
209212
})
210213
.BuildAndStartAsync();
211214

212215
Assert.NotNull(consumer);
213-
await tcs.Task;
216+
await tcs.Task.WaitAsync(TimeSpan.FromSeconds(20));
214217
await consumer.CloseAsync();
215218
await WaitUntilQueueMessageCount(queueSpec, 18);
219+
await queueSpec.DeleteAsync();
220+
}
221+
222+
[Fact]
223+
public async Task MixBatchAcceptAndDiscardDisposition()
224+
{
225+
Assert.NotNull(_connection);
226+
Assert.NotNull(_management);
227+
228+
IQueueSpecification queueSpec = _management.Queue().Name(_queueName);
229+
await queueSpec.DeclareAsync();
230+
const int batchSize = 18;
231+
await PublishAsync(queueSpec, batchSize * 2);
232+
BatchDeliveryContext batch = new();
233+
TaskCompletionSource<bool> tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
234+
bool acceptNext = true; // Flag to alternate between accept and discard
235+
IConsumer consumer = await _connection.ConsumerBuilder()
236+
.Queue(queueSpec)
237+
.MessageHandler((context, _) =>
238+
{
239+
Assert.NotNull(batch);
240+
batch.Add(context);
241+
if (batch.Count() == batchSize && acceptNext)
242+
{
243+
Assert.Equal(batchSize, batch.Count());
244+
batch.Accept();
245+
acceptNext = false; // Switch to discard next
246+
}
247+
else if (batch.Count() == batchSize && !acceptNext)
248+
{
249+
Assert.Equal(batchSize, batch.Count());
250+
batch.Discard();
251+
tcs.SetResult(true);
252+
}
253+
return Task.CompletedTask;
254+
})
255+
.BuildAndStartAsync();
256+
257+
Assert.NotNull(consumer);
258+
await tcs.Task;
259+
216260
Assert.Equal(0, consumer.UnsettledMessageCount);
261+
await WaitUntilQueueMessageCount(queueSpec, 0);
217262
await queueSpec.DeleteAsync();
263+
await consumer.CloseAsync();
218264
}
219265
}
220266
}

0 commit comments

Comments
 (0)