Skip to content

Implement disposition frame with ranges #122

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jun 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions .github/workflows/wf_build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ jobs:
env:
NUGET_CERT_REVOCATION_MODE: offline
steps:
- uses: actions/checkout@v4
- name: Clone repository
uses: actions/checkout@v4
- name: Setup .NET SDK
uses: actions/setup-dotnet@v4
with:
global-json-file: global.json
- uses: actions/cache@v4
with:
# Note: the cache path is relative to the workspace directory
Expand All @@ -26,6 +31,8 @@ jobs:
key: ${{ runner.os }}-v0-nuget-${{ hashFiles('**/*.csproj','Directory.Packages.props') }}
restore-keys: |
${{ runner.os }}-v0-nuget-
- name: Dotnet Version
run: dotnet --version
- name: Build (Debug)
run: dotnet build ${{ github.workspace }}\Build.csproj
- name: Verify
Expand Down Expand Up @@ -54,7 +61,12 @@ jobs:
build-ubuntu:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Clone repository
uses: actions/checkout@v4
- name: Setup .NET SDK
uses: actions/setup-dotnet@v4
with:
global-json-file: global.json
- uses: actions/cache@v4
with:
path: |
Expand All @@ -63,6 +75,8 @@ jobs:
key: ${{ runner.os }}-v0-nuget-${{ hashFiles('**/*.csproj','Directory.Packages.props') }}
restore-keys: |
${{ runner.os }}-v0-nuget-
- name: Dotnet Version
run: dotnet --version
- name: Build (Debug)
run: dotnet build ${{ github.workspace }}/Build.csproj
- name: Verify
Expand Down
23 changes: 23 additions & 0 deletions RabbitMQ.AMQP.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,5 +114,28 @@ public interface IContext
/// <param name="annotations">Message annotations to combine with existing ones.</param>
///</summary>
void Requeue(Dictionary<string, object> annotations);

/// <summary>
/// Create a batch context to accumulate message contexts and settle them at once.
/// The message context the batch context is created from is <b>not</b> added to the batch
/// context.
/// @return the created batch context
/// </summary>
IBatchContext Batch();
}

public interface IBatchContext : IContext
{
/// <summary>
/// Add a message context to the batch context.
/// @param context the message context to add
/// </summary>
void Add(IContext context);

/// <summary>
/// Get the current number of message contexts in the batch context.
/// @return current number of message contexts in the batch
/// </summary>
int Count();
}
}
145 changes: 145 additions & 0 deletions RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
// and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;
using System.Collections.Generic;
using System.Threading;
using Amqp;
using Amqp.Types;

Expand Down Expand Up @@ -145,5 +147,148 @@ public void Requeue(Dictionary<string, object> annotations)
_message.Dispose();
}
}

public IBatchContext Batch()
{
if (_link.IsClosed)
{
throw new ConsumerException("Link is closed");
}

return new BatchDeliveryContext();
}
}

///<summary>
/// BatchDeliveryContext is a client side helper class that allows
/// accumulating multiple message contexts and settling them at once.
/// It is thread-safe and can be used from multiple threads.
/// </summary>
public class BatchDeliveryContext : IBatchContext
{
private readonly List<IContext> _contexts = new();
private readonly SemaphoreSlim _semaphore = new(1, 1);

///<summary>
/// Accept all messages in the batch context (AMQP 1.0 <c>accepted</c> outcome).
/// </summary>
public void Accept()
{
_semaphore.Wait();
try
{
foreach (var context in _contexts)
{
context.Accept();
}

_contexts.Clear();
}
finally
{
_semaphore.Release();
}
}

///<summary>
/// Discard all messages in the batch context (AMQP 1.0 <c>rejected</c> outcome).
/// </summary>
public void Discard()
{
_semaphore.Wait();
try
{
foreach (var context in _contexts)
{
context.Discard();
}

_contexts.Clear();
}
finally
{
_semaphore.Release();
}
}

///<summary>
/// Discard all messages in the batch context with annotations
/// </summary>
public void Discard(Dictionary<string, object> annotations)
{
_semaphore.Wait();
try
{
Utils.ValidateMessageAnnotations(annotations);

foreach (var context in _contexts)
{
context.Discard(annotations);
}

_contexts.Clear();
}
finally
{
_semaphore.Release();
}
}

///<summary>
/// Requeue all messages in the batch context (AMQP 1.0 <c>released</c> outcome).
/// </summary>
public void Requeue()
{
_semaphore.Wait();
try
{
foreach (var context in _contexts)
{
context.Requeue();
}

_contexts.Clear();
}
finally
{
_semaphore.Release();
}
}

///<summary>
/// Requeue all messages in the batch context with annotations
/// </summary>
public void Requeue(Dictionary<string, object> annotations)
{
_semaphore.Wait();
try
{
foreach (var context in _contexts)
{
context.Requeue(annotations);
}

_contexts.Clear();
}
finally
{
_semaphore.Release();
}
}

public IBatchContext Batch() => this;

///<summary>
/// Add a message context to the batch context.
/// </summary>
public void Add(IContext context)
{
_contexts.Add(context);
}

///<summary>
/// Returns the number of message contexts in the batch context.
/// </summary>
public int Count() => _contexts.Count;
}
}
14 changes: 14 additions & 0 deletions RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ RabbitMQ.AMQP.Client.IBackOffDelayPolicy.CurrentAttempt.get -> int
RabbitMQ.AMQP.Client.IBackOffDelayPolicy.Delay() -> int
RabbitMQ.AMQP.Client.IBackOffDelayPolicy.IsActive() -> bool
RabbitMQ.AMQP.Client.IBackOffDelayPolicy.Reset() -> void
RabbitMQ.AMQP.Client.IBatchContext
RabbitMQ.AMQP.Client.IBatchContext.Add(RabbitMQ.AMQP.Client.IContext! context) -> void
RabbitMQ.AMQP.Client.IBatchContext.Count() -> int
RabbitMQ.AMQP.Client.IBindingSpecification
RabbitMQ.AMQP.Client.IBindingSpecification.Argument(string! key, object! value) -> RabbitMQ.AMQP.Client.IBindingSpecification!
RabbitMQ.AMQP.Client.IBindingSpecification.Arguments(System.Collections.Generic.Dictionary<string!, object!>! arguments) -> RabbitMQ.AMQP.Client.IBindingSpecification!
Expand Down Expand Up @@ -202,6 +205,7 @@ RabbitMQ.AMQP.Client.IConsumerBuilder.Stream() -> RabbitMQ.AMQP.Client.IConsumer
RabbitMQ.AMQP.Client.IConsumerBuilder.SubscriptionListener(System.Action<RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext!>! listenerContext) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
RabbitMQ.AMQP.Client.IContext
RabbitMQ.AMQP.Client.IContext.Accept() -> void
RabbitMQ.AMQP.Client.IContext.Batch() -> RabbitMQ.AMQP.Client.IBatchContext!
RabbitMQ.AMQP.Client.IContext.Discard() -> void
RabbitMQ.AMQP.Client.IContext.Discard(System.Collections.Generic.Dictionary<string!, object!>! annotations) -> void
RabbitMQ.AMQP.Client.IContext.Requeue() -> void
Expand Down Expand Up @@ -510,6 +514,16 @@ RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.InitialClusterSize(int initial
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.MaxAge(System.TimeSpan maxAge) -> RabbitMQ.AMQP.Client.IStreamSpecification!
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.MaxSegmentSizeBytes(RabbitMQ.AMQP.Client.ByteCapacity! maxSegmentSize) -> RabbitMQ.AMQP.Client.IStreamSpecification!
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification!
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Accept() -> void
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Add(RabbitMQ.AMQP.Client.IContext! context) -> void
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Batch() -> RabbitMQ.AMQP.Client.IBatchContext!
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.BatchDeliveryContext() -> void
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Count() -> int
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Discard() -> void
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Discard(System.Collections.Generic.Dictionary<string!, object!>! annotations) -> void
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Requeue() -> void
RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Requeue(System.Collections.Generic.Dictionary<string!, object!>! annotations) -> void
RabbitMQ.AMQP.Client.Impl.BindingSpecification
RabbitMQ.AMQP.Client.Impl.BindingSpecification.ArgsToMap() -> Amqp.Types.Map!
RabbitMQ.AMQP.Client.Impl.BindingSpecification.BindingSpecification() -> void
Expand Down
Loading