Skip to content

Commit 9d040f9

Browse files
committed
Implement batch disposition
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent a6e068c commit 9d040f9

File tree

4 files changed

+317
-24
lines changed

4 files changed

+317
-24
lines changed

RabbitMQ.AMQP.Client/IConsumer.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,17 +115,15 @@ public interface IContext
115115
///</summary>
116116
void Requeue(Dictionary<string, object> annotations);
117117

118-
119118
/// <summary>
120119
/// Create a batch context to accumulate message contexts and settle them at once.
121120
/// The message context the batch context is created from is <b>not</b> added to the batch
122121
/// context.
123122
/// @return the created batch context
124123
/// </summary>
125-
IBatchContext Batch(int batchSizeHint);
124+
IBatchContext Batch();
126125
}
127126

128-
129127
public interface IBatchContext : IContext
130128
{
131129
/// <summary>

RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs

Lines changed: 95 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
// and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

5+
using System;
56
using System.Collections.Generic;
7+
using System.Threading;
68
using Amqp;
79
using Amqp.Types;
810

@@ -146,40 +148,123 @@ public void Requeue(Dictionary<string, object> annotations)
146148
}
147149
}
148150

149-
public IBatchContext Batch(int batchSizeHint) => throw new System.NotImplementedException();
151+
public IBatchContext Batch()
152+
{
153+
if (_link.IsClosed)
154+
{
155+
throw new ConsumerException("Link is closed");
156+
}
157+
158+
return new BatchDeliveryContext();
159+
}
150160
}
151161

152-
public class BatchContext : IBatchContext
162+
internal class BatchDeliveryContext : IBatchContext
153163
{
164+
private readonly List<IContext> _contexts = new();
165+
private readonly SemaphoreSlim _semaphore = new(1, 1);
166+
154167
public void Accept()
155168
{
156-
throw new System.NotImplementedException();
169+
_semaphore.Wait();
170+
try
171+
{
172+
foreach (var context in _contexts)
173+
{
174+
context.Accept();
175+
}
176+
177+
_contexts.Clear();
178+
}
179+
finally
180+
{
181+
_semaphore.Release();
182+
}
157183
}
158184

159185
public void Discard()
160186
{
161-
throw new System.NotImplementedException();
187+
_semaphore.Wait();
188+
try
189+
{
190+
foreach (var context in _contexts)
191+
{
192+
context.Discard();
193+
}
194+
195+
_contexts.Clear();
196+
}
197+
finally
198+
{
199+
_semaphore.Release();
200+
}
162201
}
163202

164203
public void Discard(Dictionary<string, object> annotations)
165204
{
166-
throw new System.NotImplementedException();
205+
_semaphore.Wait();
206+
try
207+
{
208+
Utils.ValidateMessageAnnotations(annotations);
209+
210+
foreach (var context in _contexts)
211+
{
212+
context.Discard(annotations);
213+
}
214+
215+
_contexts.Clear();
216+
}
217+
finally
218+
{
219+
_semaphore.Release();
220+
}
167221
}
168222

169223
public void Requeue()
170224
{
171-
throw new System.NotImplementedException();
225+
_semaphore.Wait();
226+
try
227+
{
228+
foreach (var context in _contexts)
229+
{
230+
context.Requeue();
231+
}
232+
233+
_contexts.Clear();
234+
}
235+
finally
236+
{
237+
_semaphore.Release();
238+
}
172239
}
173240

174241
public void Requeue(Dictionary<string, object> annotations)
175242
{
176-
throw new System.NotImplementedException();
243+
_semaphore.Wait();
244+
try
245+
{
246+
Utils.ValidateMessageAnnotations(annotations);
247+
248+
foreach (var context in _contexts)
249+
{
250+
context.Requeue(annotations);
251+
}
252+
253+
_contexts.Clear();
254+
}
255+
finally
256+
{
257+
_semaphore.Release();
258+
}
177259
}
178260

179-
public IBatchContext Batch(int batchSizeHint) => throw new System.NotImplementedException();
261+
public IBatchContext Batch() => this;
180262

181-
public void Add(IContext context) => throw new System.NotImplementedException();
263+
public void Add(IContext context)
264+
{
265+
_contexts.Add(context);
266+
}
182267

183-
public int Size() => throw new System.NotImplementedException();
268+
public int Size() => _contexts.Count;
184269
}
185270
}

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ RabbitMQ.AMQP.Client.IConsumerBuilder.Stream() -> RabbitMQ.AMQP.Client.IConsumer
205205
RabbitMQ.AMQP.Client.IConsumerBuilder.SubscriptionListener(System.Action<RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext!>! listenerContext) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
206206
RabbitMQ.AMQP.Client.IContext
207207
RabbitMQ.AMQP.Client.IContext.Accept() -> void
208-
RabbitMQ.AMQP.Client.IContext.Batch(int batchSizeHint) -> RabbitMQ.AMQP.Client.IBatchContext!
208+
RabbitMQ.AMQP.Client.IContext.Batch() -> RabbitMQ.AMQP.Client.IBatchContext!
209209
RabbitMQ.AMQP.Client.IContext.Discard() -> void
210210
RabbitMQ.AMQP.Client.IContext.Discard(System.Collections.Generic.Dictionary<string!, object!>! annotations) -> void
211211
RabbitMQ.AMQP.Client.IContext.Requeue() -> void
@@ -511,16 +511,6 @@ RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.InitialClusterSize(int initial
511511
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.MaxAge(System.TimeSpan maxAge) -> RabbitMQ.AMQP.Client.IStreamSpecification!
512512
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.MaxSegmentSizeBytes(RabbitMQ.AMQP.Client.ByteCapacity! maxSegmentSize) -> RabbitMQ.AMQP.Client.IStreamSpecification!
513513
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification!
514-
RabbitMQ.AMQP.Client.Impl.BatchContext
515-
RabbitMQ.AMQP.Client.Impl.BatchContext.Accept() -> void
516-
RabbitMQ.AMQP.Client.Impl.BatchContext.Add(RabbitMQ.AMQP.Client.IContext! context) -> void
517-
RabbitMQ.AMQP.Client.Impl.BatchContext.Batch(int batchSizeHint) -> RabbitMQ.AMQP.Client.IBatchContext!
518-
RabbitMQ.AMQP.Client.Impl.BatchContext.BatchContext() -> void
519-
RabbitMQ.AMQP.Client.Impl.BatchContext.Discard() -> void
520-
RabbitMQ.AMQP.Client.Impl.BatchContext.Discard(System.Collections.Generic.Dictionary<string!, object!>! annotations) -> void
521-
RabbitMQ.AMQP.Client.Impl.BatchContext.Requeue() -> void
522-
RabbitMQ.AMQP.Client.Impl.BatchContext.Requeue(System.Collections.Generic.Dictionary<string!, object!>! annotations) -> void
523-
RabbitMQ.AMQP.Client.Impl.BatchContext.Size() -> int
524514
RabbitMQ.AMQP.Client.Impl.BindingSpecification
525515
RabbitMQ.AMQP.Client.Impl.BindingSpecification.ArgsToMap() -> Amqp.Types.Map!
526516
RabbitMQ.AMQP.Client.Impl.BindingSpecification.BindingSpecification() -> void

0 commit comments

Comments
 (0)