Skip to content

Commit c3262ca

Browse files
authored
Implement disposition frame with ranges (#122)
* Implement batch disposition * Pin dotnet version on the ci --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 643d998 commit c3262ca

File tree

6 files changed

+471
-2
lines changed

6 files changed

+471
-2
lines changed

.github/workflows/wf_build-and-test.yaml

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,12 @@ jobs:
1111
env:
1212
NUGET_CERT_REVOCATION_MODE: offline
1313
steps:
14-
- uses: actions/checkout@v4
14+
- name: Clone repository
15+
uses: actions/checkout@v4
16+
- name: Setup .NET SDK
17+
uses: actions/setup-dotnet@v4
18+
with:
19+
global-json-file: global.json
1520
- uses: actions/cache@v4
1621
with:
1722
# Note: the cache path is relative to the workspace directory
@@ -26,6 +31,8 @@ jobs:
2631
key: ${{ runner.os }}-v0-nuget-${{ hashFiles('**/*.csproj','Directory.Packages.props') }}
2732
restore-keys: |
2833
${{ runner.os }}-v0-nuget-
34+
- name: Dotnet Version
35+
run: dotnet --version
2936
- name: Build (Debug)
3037
run: dotnet build ${{ github.workspace }}\Build.csproj
3138
- name: Verify
@@ -54,7 +61,12 @@ jobs:
5461
build-ubuntu:
5562
runs-on: ubuntu-latest
5663
steps:
57-
- uses: actions/checkout@v4
64+
- name: Clone repository
65+
uses: actions/checkout@v4
66+
- name: Setup .NET SDK
67+
uses: actions/setup-dotnet@v4
68+
with:
69+
global-json-file: global.json
5870
- uses: actions/cache@v4
5971
with:
6072
path: |
@@ -63,6 +75,8 @@ jobs:
6375
key: ${{ runner.os }}-v0-nuget-${{ hashFiles('**/*.csproj','Directory.Packages.props') }}
6476
restore-keys: |
6577
${{ runner.os }}-v0-nuget-
78+
- name: Dotnet Version
79+
run: dotnet --version
6680
- name: Build (Debug)
6781
run: dotnet build ${{ github.workspace }}/Build.csproj
6882
- name: Verify

RabbitMQ.AMQP.Client/IConsumer.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,5 +114,28 @@ public interface IContext
114114
/// <param name="annotations">Message annotations to combine with existing ones.</param>
115115
///</summary>
116116
void Requeue(Dictionary<string, object> annotations);
117+
118+
/// <summary>
119+
/// Create a batch context to accumulate message contexts and settle them at once.
120+
/// The message context the batch context is created from is <b>not</b> added to the batch
121+
/// context.
122+
/// @return the created batch context
123+
/// </summary>
124+
IBatchContext Batch();
125+
}
126+
127+
public interface IBatchContext : IContext
128+
{
129+
/// <summary>
130+
/// Add a message context to the batch context.
131+
/// @param context the message context to add
132+
/// </summary>
133+
void Add(IContext context);
134+
135+
/// <summary>
136+
/// Get the current number of message contexts in the batch context.
137+
/// @return current number of message contexts in the batch
138+
/// </summary>
139+
int Count();
117140
}
118141
}

RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
// and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

5+
using System;
56
using System.Collections.Generic;
7+
using System.Threading;
68
using Amqp;
79
using Amqp.Types;
810

@@ -145,5 +147,148 @@ public void Requeue(Dictionary<string, object> annotations)
145147
_message.Dispose();
146148
}
147149
}
150+
151+
public IBatchContext Batch()
152+
{
153+
if (_link.IsClosed)
154+
{
155+
throw new ConsumerException("Link is closed");
156+
}
157+
158+
return new BatchDeliveryContext();
159+
}
160+
}
161+
162+
///<summary>
163+
/// BatchDeliveryContext is a client side helper class that allows
164+
/// accumulating multiple message contexts and settling them at once.
165+
/// It is thread-safe and can be used from multiple threads.
166+
/// </summary>
167+
public class BatchDeliveryContext : IBatchContext
168+
{
169+
private readonly List<IContext> _contexts = new();
170+
private readonly SemaphoreSlim _semaphore = new(1, 1);
171+
172+
///<summary>
173+
/// Accept all messages in the batch context (AMQP 1.0 <c>accepted</c> outcome).
174+
/// </summary>
175+
public void Accept()
176+
{
177+
_semaphore.Wait();
178+
try
179+
{
180+
foreach (var context in _contexts)
181+
{
182+
context.Accept();
183+
}
184+
185+
_contexts.Clear();
186+
}
187+
finally
188+
{
189+
_semaphore.Release();
190+
}
191+
}
192+
193+
///<summary>
194+
/// Discard all messages in the batch context (AMQP 1.0 <c>rejected</c> outcome).
195+
/// </summary>
196+
public void Discard()
197+
{
198+
_semaphore.Wait();
199+
try
200+
{
201+
foreach (var context in _contexts)
202+
{
203+
context.Discard();
204+
}
205+
206+
_contexts.Clear();
207+
}
208+
finally
209+
{
210+
_semaphore.Release();
211+
}
212+
}
213+
214+
///<summary>
215+
/// Discard all messages in the batch context with annotations
216+
/// </summary>
217+
public void Discard(Dictionary<string, object> annotations)
218+
{
219+
_semaphore.Wait();
220+
try
221+
{
222+
Utils.ValidateMessageAnnotations(annotations);
223+
224+
foreach (var context in _contexts)
225+
{
226+
context.Discard(annotations);
227+
}
228+
229+
_contexts.Clear();
230+
}
231+
finally
232+
{
233+
_semaphore.Release();
234+
}
235+
}
236+
237+
///<summary>
238+
/// Requeue all messages in the batch context (AMQP 1.0 <c>released</c> outcome).
239+
/// </summary>
240+
public void Requeue()
241+
{
242+
_semaphore.Wait();
243+
try
244+
{
245+
foreach (var context in _contexts)
246+
{
247+
context.Requeue();
248+
}
249+
250+
_contexts.Clear();
251+
}
252+
finally
253+
{
254+
_semaphore.Release();
255+
}
256+
}
257+
258+
///<summary>
259+
/// Requeue all messages in the batch context with annotations
260+
/// </summary>
261+
public void Requeue(Dictionary<string, object> annotations)
262+
{
263+
_semaphore.Wait();
264+
try
265+
{
266+
foreach (var context in _contexts)
267+
{
268+
context.Requeue(annotations);
269+
}
270+
271+
_contexts.Clear();
272+
}
273+
finally
274+
{
275+
_semaphore.Release();
276+
}
277+
}
278+
279+
public IBatchContext Batch() => this;
280+
281+
///<summary>
282+
/// Add a message context to the batch context.
283+
/// </summary>
284+
public void Add(IContext context)
285+
{
286+
_contexts.Add(context);
287+
}
288+
289+
///<summary>
290+
/// Returns the number of message contexts in the batch context.
291+
/// </summary>
292+
public int Count() => _contexts.Count;
148293
}
149294
}

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,9 @@ RabbitMQ.AMQP.Client.IBackOffDelayPolicy.CurrentAttempt.get -> int
124124
RabbitMQ.AMQP.Client.IBackOffDelayPolicy.Delay() -> int
125125
RabbitMQ.AMQP.Client.IBackOffDelayPolicy.IsActive() -> bool
126126
RabbitMQ.AMQP.Client.IBackOffDelayPolicy.Reset() -> void
127+
RabbitMQ.AMQP.Client.IBatchContext
128+
RabbitMQ.AMQP.Client.IBatchContext.Add(RabbitMQ.AMQP.Client.IContext! context) -> void
129+
RabbitMQ.AMQP.Client.IBatchContext.Count() -> int
127130
RabbitMQ.AMQP.Client.IBindingSpecification
128131
RabbitMQ.AMQP.Client.IBindingSpecification.Argument(string! key, object! value) -> RabbitMQ.AMQP.Client.IBindingSpecification!
129132
RabbitMQ.AMQP.Client.IBindingSpecification.Arguments(System.Collections.Generic.Dictionary<string!, object!>! arguments) -> RabbitMQ.AMQP.Client.IBindingSpecification!
@@ -202,6 +205,7 @@ RabbitMQ.AMQP.Client.IConsumerBuilder.Stream() -> RabbitMQ.AMQP.Client.IConsumer
202205
RabbitMQ.AMQP.Client.IConsumerBuilder.SubscriptionListener(System.Action<RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext!>! listenerContext) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
203206
RabbitMQ.AMQP.Client.IContext
204207
RabbitMQ.AMQP.Client.IContext.Accept() -> void
208+
RabbitMQ.AMQP.Client.IContext.Batch() -> RabbitMQ.AMQP.Client.IBatchContext!
205209
RabbitMQ.AMQP.Client.IContext.Discard() -> void
206210
RabbitMQ.AMQP.Client.IContext.Discard(System.Collections.Generic.Dictionary<string!, object!>! annotations) -> void
207211
RabbitMQ.AMQP.Client.IContext.Requeue() -> void
@@ -510,6 +514,16 @@ RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.InitialClusterSize(int initial
510514
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.MaxAge(System.TimeSpan maxAge) -> RabbitMQ.AMQP.Client.IStreamSpecification!
511515
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.MaxSegmentSizeBytes(RabbitMQ.AMQP.Client.ByteCapacity! maxSegmentSize) -> RabbitMQ.AMQP.Client.IStreamSpecification!
512516
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification!
517+
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext
518+
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Accept() -> void
519+
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Add(RabbitMQ.AMQP.Client.IContext! context) -> void
520+
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Batch() -> RabbitMQ.AMQP.Client.IBatchContext!
521+
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.BatchDeliveryContext() -> void
522+
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Count() -> int
523+
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Discard() -> void
524+
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Discard(System.Collections.Generic.Dictionary<string!, object!>! annotations) -> void
525+
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Requeue() -> void
526+
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Requeue(System.Collections.Generic.Dictionary<string!, object!>! annotations) -> void
513527
RabbitMQ.AMQP.Client.Impl.BindingSpecification
514528
RabbitMQ.AMQP.Client.Impl.BindingSpecification.ArgsToMap() -> Amqp.Types.Map!
515529
RabbitMQ.AMQP.Client.Impl.BindingSpecification.BindingSpecification() -> void

0 commit comments

Comments
 (0)