diff --git a/.github/workflows/wf_build-and-test.yaml b/.github/workflows/wf_build-and-test.yaml index 780cd9a..66e24a7 100644 --- a/.github/workflows/wf_build-and-test.yaml +++ b/.github/workflows/wf_build-and-test.yaml @@ -11,7 +11,12 @@ jobs: env: NUGET_CERT_REVOCATION_MODE: offline steps: - - uses: actions/checkout@v4 + - name: Clone repository + uses: actions/checkout@v4 + - name: Setup .NET SDK + uses: actions/setup-dotnet@v4 + with: + global-json-file: global.json - uses: actions/cache@v4 with: # Note: the cache path is relative to the workspace directory @@ -26,6 +31,8 @@ jobs: key: ${{ runner.os }}-v0-nuget-${{ hashFiles('**/*.csproj','Directory.Packages.props') }} restore-keys: | ${{ runner.os }}-v0-nuget- + - name: Dotnet Version + run: dotnet --version - name: Build (Debug) run: dotnet build ${{ github.workspace }}\Build.csproj - name: Verify @@ -54,7 +61,12 @@ jobs: build-ubuntu: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - name: Clone repository + uses: actions/checkout@v4 + - name: Setup .NET SDK + uses: actions/setup-dotnet@v4 + with: + global-json-file: global.json - uses: actions/cache@v4 with: path: | @@ -63,6 +75,8 @@ jobs: key: ${{ runner.os }}-v0-nuget-${{ hashFiles('**/*.csproj','Directory.Packages.props') }} restore-keys: | ${{ runner.os }}-v0-nuget- + - name: Dotnet Version + run: dotnet --version - name: Build (Debug) run: dotnet build ${{ github.workspace }}/Build.csproj - name: Verify diff --git a/RabbitMQ.AMQP.Client/IConsumer.cs b/RabbitMQ.AMQP.Client/IConsumer.cs index df90db8..88290d5 100644 --- a/RabbitMQ.AMQP.Client/IConsumer.cs +++ b/RabbitMQ.AMQP.Client/IConsumer.cs @@ -114,5 +114,28 @@ public interface IContext /// Message annotations to combine with existing ones. /// void Requeue(Dictionary annotations); + + /// + /// Create a batch context to accumulate message contexts and settle them at once. + /// The message context the batch context is created from is not added to the batch + /// context. + /// @return the created batch context + /// + IBatchContext Batch(); + } + + public interface IBatchContext : IContext + { + /// + /// Add a message context to the batch context. + /// @param context the message context to add + /// + void Add(IContext context); + + /// + /// Get the current number of message contexts in the batch context. + /// @return current number of message contexts in the batch + /// + int Count(); } } diff --git a/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs b/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs index 543b0fc..52efcc4 100644 --- a/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs +++ b/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs @@ -2,7 +2,9 @@ // and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +using System; using System.Collections.Generic; +using System.Threading; using Amqp; using Amqp.Types; @@ -145,5 +147,148 @@ public void Requeue(Dictionary annotations) _message.Dispose(); } } + + public IBatchContext Batch() + { + if (_link.IsClosed) + { + throw new ConsumerException("Link is closed"); + } + + return new BatchDeliveryContext(); + } + } + + /// + /// BatchDeliveryContext is a client side helper class that allows + /// accumulating multiple message contexts and settling them at once. + /// It is thread-safe and can be used from multiple threads. + /// + public class BatchDeliveryContext : IBatchContext + { + private readonly List _contexts = new(); + private readonly SemaphoreSlim _semaphore = new(1, 1); + + /// + /// Accept all messages in the batch context (AMQP 1.0 accepted outcome). + /// + public void Accept() + { + _semaphore.Wait(); + try + { + foreach (var context in _contexts) + { + context.Accept(); + } + + _contexts.Clear(); + } + finally + { + _semaphore.Release(); + } + } + + /// + /// Discard all messages in the batch context (AMQP 1.0 rejected outcome). + /// + public void Discard() + { + _semaphore.Wait(); + try + { + foreach (var context in _contexts) + { + context.Discard(); + } + + _contexts.Clear(); + } + finally + { + _semaphore.Release(); + } + } + + /// + /// Discard all messages in the batch context with annotations + /// + public void Discard(Dictionary annotations) + { + _semaphore.Wait(); + try + { + Utils.ValidateMessageAnnotations(annotations); + + foreach (var context in _contexts) + { + context.Discard(annotations); + } + + _contexts.Clear(); + } + finally + { + _semaphore.Release(); + } + } + + /// + /// Requeue all messages in the batch context (AMQP 1.0 released outcome). + /// + public void Requeue() + { + _semaphore.Wait(); + try + { + foreach (var context in _contexts) + { + context.Requeue(); + } + + _contexts.Clear(); + } + finally + { + _semaphore.Release(); + } + } + + /// + /// Requeue all messages in the batch context with annotations + /// + public void Requeue(Dictionary annotations) + { + _semaphore.Wait(); + try + { + foreach (var context in _contexts) + { + context.Requeue(annotations); + } + + _contexts.Clear(); + } + finally + { + _semaphore.Release(); + } + } + + public IBatchContext Batch() => this; + + /// + /// Add a message context to the batch context. + /// + public void Add(IContext context) + { + _contexts.Add(context); + } + + /// + /// Returns the number of message contexts in the batch context. + /// + public int Count() => _contexts.Count; } } diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt index 79912fa..4850878 100644 --- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt @@ -124,6 +124,9 @@ RabbitMQ.AMQP.Client.IBackOffDelayPolicy.CurrentAttempt.get -> int RabbitMQ.AMQP.Client.IBackOffDelayPolicy.Delay() -> int RabbitMQ.AMQP.Client.IBackOffDelayPolicy.IsActive() -> bool RabbitMQ.AMQP.Client.IBackOffDelayPolicy.Reset() -> void +RabbitMQ.AMQP.Client.IBatchContext +RabbitMQ.AMQP.Client.IBatchContext.Add(RabbitMQ.AMQP.Client.IContext! context) -> void +RabbitMQ.AMQP.Client.IBatchContext.Count() -> int RabbitMQ.AMQP.Client.IBindingSpecification RabbitMQ.AMQP.Client.IBindingSpecification.Argument(string! key, object! value) -> RabbitMQ.AMQP.Client.IBindingSpecification! RabbitMQ.AMQP.Client.IBindingSpecification.Arguments(System.Collections.Generic.Dictionary! arguments) -> RabbitMQ.AMQP.Client.IBindingSpecification! @@ -202,6 +205,7 @@ RabbitMQ.AMQP.Client.IConsumerBuilder.Stream() -> RabbitMQ.AMQP.Client.IConsumer RabbitMQ.AMQP.Client.IConsumerBuilder.SubscriptionListener(System.Action! listenerContext) -> RabbitMQ.AMQP.Client.IConsumerBuilder! RabbitMQ.AMQP.Client.IContext RabbitMQ.AMQP.Client.IContext.Accept() -> void +RabbitMQ.AMQP.Client.IContext.Batch() -> RabbitMQ.AMQP.Client.IBatchContext! RabbitMQ.AMQP.Client.IContext.Discard() -> void RabbitMQ.AMQP.Client.IContext.Discard(System.Collections.Generic.Dictionary! annotations) -> void RabbitMQ.AMQP.Client.IContext.Requeue() -> void @@ -510,6 +514,16 @@ RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.InitialClusterSize(int initial RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.MaxAge(System.TimeSpan maxAge) -> RabbitMQ.AMQP.Client.IStreamSpecification! RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.MaxSegmentSizeBytes(RabbitMQ.AMQP.Client.ByteCapacity! maxSegmentSize) -> RabbitMQ.AMQP.Client.IStreamSpecification! RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification! +RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext +RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Accept() -> void +RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Add(RabbitMQ.AMQP.Client.IContext! context) -> void +RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Batch() -> RabbitMQ.AMQP.Client.IBatchContext! +RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.BatchDeliveryContext() -> void +RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Count() -> int +RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Discard() -> void +RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Discard(System.Collections.Generic.Dictionary! annotations) -> void +RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Requeue() -> void +RabbitMQ.AMQP.Client.Impl.BatchDeliveryContext.Requeue(System.Collections.Generic.Dictionary! annotations) -> void RabbitMQ.AMQP.Client.Impl.BindingSpecification RabbitMQ.AMQP.Client.Impl.BindingSpecification.ArgsToMap() -> Amqp.Types.Map! RabbitMQ.AMQP.Client.Impl.BindingSpecification.BindingSpecification() -> void diff --git a/Tests/Consumer/ConsumerDispositionTests.cs b/Tests/Consumer/ConsumerDispositionTests.cs new file mode 100644 index 0000000..82592f5 --- /dev/null +++ b/Tests/Consumer/ConsumerDispositionTests.cs @@ -0,0 +1,266 @@ +// This source code is dual-licensed under the Apache License, version 2.0, +// and the Mozilla Public License, version 2.0. +// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using RabbitMQ.AMQP.Client; +using RabbitMQ.AMQP.Client.Impl; +using Xunit; +using Xunit.Abstractions; + +namespace Tests.Consumer +{ + public class ConsumerDispositionTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper) + { + [Fact] + public async Task BatchAcceptDisposition() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + + IQueueSpecification queueSpec = _management.Queue().Name(_queueName); + await queueSpec.DeclareAsync(); + const int batchSize = 18; + await PublishAsync(queueSpec, batchSize); + BatchDeliveryContext batch = new(); + TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + IConsumer consumer = await _connection.ConsumerBuilder() + .Queue(queueSpec) + .MessageHandler((context, _) => + { + Assert.NotNull(batch); + batch.Add(context); + if (batch.Count() != batchSize) + { + return Task.CompletedTask; + } + + batch.Accept(); + tcs.SetResult(true); + + return Task.CompletedTask; + }) + .BuildAndStartAsync(); + + Assert.NotNull(consumer); + await tcs.Task; + + Assert.Equal(0, consumer.UnsettledMessageCount); + await WaitUntilQueueMessageCount(queueSpec, 0); + await queueSpec.DeleteAsync(); + await consumer.CloseAsync(); + } + + [Fact] + public async Task BatchDiscardDisposition() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + + IQueueSpecification queueSpec = _management.Queue().Name(_queueName); + await queueSpec.DeclareAsync(); + const int batchSize = 18; + await PublishAsync(queueSpec, batchSize); + BatchDeliveryContext batch = new(); + TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + IConsumer consumer = await _connection.ConsumerBuilder() + .Queue(queueSpec) + .MessageHandler((context, _) => + { + Assert.NotNull(batch); + batch.Add(context); + if (batch.Count() != batchSize) + { + return Task.CompletedTask; + } + + batch.Discard(); + tcs.SetResult(true); + + return Task.CompletedTask; + }) + .BuildAndStartAsync(); + + Assert.NotNull(consumer); + await tcs.Task; + + Assert.Equal(0, consumer.UnsettledMessageCount); + await WaitUntilQueueMessageCount(queueSpec, 0); + await queueSpec.DeleteAsync(); + await consumer.CloseAsync(); + } + + [Fact] + public async Task BatchDiscardAnnotationDisposition() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + + IQueueSpecification queueSpec = _management.Queue().Name(_queueName); + await queueSpec.DeclareAsync(); + const int batchSize = 18; + await PublishAsync(queueSpec, batchSize); + BatchDeliveryContext batch = new(); + TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + IConsumer consumer = await _connection.ConsumerBuilder() + .Queue(queueSpec) + .MessageHandler((context, _) => + { + Assert.NotNull(batch); + batch.Add(context); + if (batch.Count() != batchSize) + { + return Task.CompletedTask; + } + + batch.Discard(new Dictionary() + { + { "x-opt-annotation-key", "annotation-value" }, + { "x-opt-annotation1-key", "annotation1-value" } + }); + tcs.SetResult(true); + + return Task.CompletedTask; + }) + .BuildAndStartAsync(); + + Assert.NotNull(consumer); + await tcs.Task; + + Assert.Equal(0, consumer.UnsettledMessageCount); + await WaitUntilQueueMessageCount(queueSpec, 0); + await queueSpec.DeleteAsync(); + await consumer.CloseAsync(); + } + + [Fact] + public async Task BatchRequeueDisposition() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + IQueueSpecification queueSpec = _management.Queue().Name(_queueName); + await queueSpec.DeclareAsync(); + const int batchSize = 18; + await PublishAsync(queueSpec, batchSize); + BatchDeliveryContext batch = new(); + TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + IConsumer consumer = await _connection.ConsumerBuilder() + .Queue(queueSpec) + .MessageHandler((context, _) => + { + Assert.NotNull(batch); + batch.Add(context); + if (batch.Count() != batchSize) + { + return Task.CompletedTask; + } + + batch.Requeue(); + tcs.SetResult(true); + + return Task.CompletedTask; + }) + .BuildAndStartAsync(); + + Assert.NotNull(consumer); + await tcs.Task; + await consumer.CloseAsync(); + Assert.Equal(0, consumer.UnsettledMessageCount); + await WaitUntilQueueMessageCount(queueSpec, 18); + await queueSpec.DeleteAsync(); + } + + [Fact] + public async Task BatchRequeueAnnotationsDisposition() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + IQueueSpecification queueSpec = _management.Queue().Name(_queueName); + await queueSpec.DeclareAsync(); + const int batchSize = 18; + await PublishAsync(queueSpec, batchSize); + BatchDeliveryContext batch = new(); + TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + IConsumer consumer = await _connection.ConsumerBuilder() + .Queue(queueSpec) + .MessageHandler((context, _) => + { + Assert.NotNull(batch); + batch.Add(context); + if (batch.Count() != batchSize) + { + return Task.CompletedTask; + } + + const string annotationKey = "x-opt-annotation-key"; + const string annotationValue = "annotation-value"; + + const string annotationKey1 = "x-opt-annotation1-key"; + const string annotationValue1 = "annotation1-value"; + Assert.Equal(batchSize, batch.Count()); + batch.Requeue(new Dictionary() + { + { annotationKey, annotationValue }, { annotationKey1, annotationValue1 } + }); + Assert.Equal(0, batch.Count()); + + tcs.SetResult(true); + + return Task.CompletedTask; + }) + .BuildAndStartAsync(); + + Assert.NotNull(consumer); + await tcs.Task.WaitAsync(TimeSpan.FromSeconds(20)); + await consumer.CloseAsync(); + await WaitUntilQueueMessageCount(queueSpec, 18); + await queueSpec.DeleteAsync(); + } + + [Fact] + public async Task MixBatchAcceptAndDiscardDisposition() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + + IQueueSpecification queueSpec = _management.Queue().Name(_queueName); + await queueSpec.DeclareAsync(); + const int batchSize = 18; + await PublishAsync(queueSpec, batchSize * 2); + BatchDeliveryContext batch = new(); + TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + bool acceptNext = true; // Flag to alternate between accept and discard + IConsumer consumer = await _connection.ConsumerBuilder() + .Queue(queueSpec) + .MessageHandler((context, _) => + { + Assert.NotNull(batch); + batch.Add(context); + if (batch.Count() == batchSize && acceptNext) + { + Assert.Equal(batchSize, batch.Count()); + batch.Accept(); + acceptNext = false; // Switch to discard next + } + else if (batch.Count() == batchSize && !acceptNext) + { + Assert.Equal(batchSize, batch.Count()); + batch.Discard(); + tcs.SetResult(true); + } + return Task.CompletedTask; + }) + .BuildAndStartAsync(); + + Assert.NotNull(consumer); + await tcs.Task; + + Assert.Equal(0, consumer.UnsettledMessageCount); + await WaitUntilQueueMessageCount(queueSpec, 0); + await queueSpec.DeleteAsync(); + await consumer.CloseAsync(); + } + } +} diff --git a/global.json b/global.json new file mode 100644 index 0000000..4b2bd02 --- /dev/null +++ b/global.json @@ -0,0 +1,7 @@ +{ + "sdk": { + "version": "8.0.0", + "allowPrerelease": false, + "rollForward": "latestFeature" + } +} \ No newline at end of file