From 7e8a78e564ade9f10ac2b2bf6091a3dd969f49a0 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 25 Sep 2024 09:49:12 +0200 Subject: [PATCH 1/6] Annotations Signed-off-by: Gabriele Santomaggio --- RabbitMQ.AMQP.Client/IConsumer.cs | 4 ++ RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs | 49 ++++++++++++++++++++ RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt | 2 + RabbitMQ.AMQP.Client/Utils.cs | 12 +++++ 4 files changed, 67 insertions(+) diff --git a/RabbitMQ.AMQP.Client/IConsumer.cs b/RabbitMQ.AMQP.Client/IConsumer.cs index a3c0b3a3..aac8507f 100644 --- a/RabbitMQ.AMQP.Client/IConsumer.cs +++ b/RabbitMQ.AMQP.Client/IConsumer.cs @@ -3,6 +3,7 @@ // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. using System; +using System.Collections.Generic; using System.Threading.Tasks; namespace RabbitMQ.AMQP.Client @@ -27,6 +28,9 @@ public interface IContext { Task AcceptAsync(); Task DiscardAsync(); + Task DiscardAsync(Dictionary annotations); + Task RequeueAsync(); + Task RequeueAsync(Dictionary annotations); } } diff --git a/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs b/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs index 427554a0..a724d2e9 100644 --- a/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs +++ b/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs @@ -2,8 +2,10 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +using System.Collections.Generic; using System.Threading.Tasks; using Amqp; +using Amqp.Types; namespace RabbitMQ.AMQP.Client.Impl { @@ -54,6 +56,29 @@ public Task DiscardAsync() return rejectTask; } + public Task DiscardAsync(Dictionary annotations) + { + + if (_link.IsClosed) + { + throw new ConsumerException("Link is closed"); + } + + Task rejectTask = Task.Run(() => + { + Fields messageAnnotations = new(); + foreach (var kvp in annotations) + { + messageAnnotations.Add(new Symbol(kvp.Key), kvp.Value); + } + _link.Modify(_message, true, true, messageAnnotations); + _unsettledMessageCounter.Decrement(); + _message.Dispose(); + }); + + return rejectTask; + + } public Task RequeueAsync() { if (_link.IsClosed) @@ -70,5 +95,29 @@ public Task RequeueAsync() return requeueTask; } + + public Task RequeueAsync(Dictionary annotations) + { + + if (_link.IsClosed) + { + throw new ConsumerException("Link is closed"); + } + + Task requeueTask = Task.Run(() => + { + Fields messageAnnotations = new(); + foreach (var kvp in annotations) + { + messageAnnotations.Add(new Symbol(kvp.Key), kvp.Value); + } + _link.Modify(_message, false, false, messageAnnotations); + _unsettledMessageCounter.Decrement(); + _message.Dispose(); + }); + + return requeueTask; + + } } } diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt index a361a8d3..519edd8c 100644 --- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt @@ -125,7 +125,9 @@ RabbitMQ.AMQP.Client.IConsumerBuilder.Stream() -> RabbitMQ.AMQP.Client.IConsumer RabbitMQ.AMQP.Client.IContext RabbitMQ.AMQP.Client.IContext.AcceptAsync() -> System.Threading.Tasks.Task! RabbitMQ.AMQP.Client.IContext.DiscardAsync() -> System.Threading.Tasks.Task! +RabbitMQ.AMQP.Client.IContext.DiscardAsync(System.Collections.Generic.Dictionary! annotations) -> System.Threading.Tasks.Task! RabbitMQ.AMQP.Client.IContext.RequeueAsync() -> System.Threading.Tasks.Task! +RabbitMQ.AMQP.Client.IContext.RequeueAsync(System.Collections.Generic.Dictionary! annotations) -> System.Threading.Tasks.Task! RabbitMQ.AMQP.Client.IEntityInfo RabbitMQ.AMQP.Client.IEntityInfoSpecification RabbitMQ.AMQP.Client.IEntityInfoSpecification.DeclareAsync() -> System.Threading.Tasks.Task! diff --git a/RabbitMQ.AMQP.Client/Utils.cs b/RabbitMQ.AMQP.Client/Utils.cs index f0d39a22..5c47290b 100644 --- a/RabbitMQ.AMQP.Client/Utils.cs +++ b/RabbitMQ.AMQP.Client/Utils.cs @@ -3,6 +3,8 @@ // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. using System; +using System.Collections.Generic; +using System.Linq; using System.Security.Cryptography; using System.Text; using System.Web; @@ -177,8 +179,18 @@ internal static bool CompareMap(Map map1, Map map2) return false; } } + return true; } + + internal static void CheckMessageAnnotations(Dictionary annotations) + { + foreach (var kvp in annotations.Where(kvp => !kvp.Key.StartsWith("x-"))) + { + throw new ArgumentException( + $"Message annotation keys must start with 'x-': {kvp.Key}"); + } + } } // TODO why can't we use normal HTTP encoding? From 726e28322482a54fcfa9e57a5b96facbde721742 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 26 Sep 2024 09:19:54 +0200 Subject: [PATCH 2/6] add tests Signed-off-by: Gabriele Santomaggio --- RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs | 2 +- RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs | 11 +-- RabbitMQ.AMQP.Client/Utils.cs | 2 +- Tests/Consumer/ConsumerOutcomeTests.cs | 86 ++++++++++++++++++++ 4 files changed, 94 insertions(+), 7 deletions(-) create mode 100644 Tests/Consumer/ConsumerOutcomeTests.cs diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs b/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs index c5e5d394..020d4646 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs @@ -130,7 +130,7 @@ public IMessage Annotation(string key, object value) public object Annotation(string key) { ThrowIfAnnotationsNotSet(); - return NativeMessage.MessageAnnotations[key]; + return NativeMessage.MessageAnnotations[new Symbol(key)]; } } } diff --git a/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs b/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs index a724d2e9..e8f694d5 100644 --- a/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs +++ b/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs @@ -58,12 +58,13 @@ public Task DiscardAsync() public Task DiscardAsync(Dictionary annotations) { - if (_link.IsClosed) { throw new ConsumerException("Link is closed"); } + Utils.ValidateMessageAnnotations(annotations); + Task rejectTask = Task.Run(() => { Fields messageAnnotations = new(); @@ -71,14 +72,15 @@ public Task DiscardAsync(Dictionary annotations) { messageAnnotations.Add(new Symbol(kvp.Key), kvp.Value); } + _link.Modify(_message, true, true, messageAnnotations); _unsettledMessageCounter.Decrement(); _message.Dispose(); }); return rejectTask; - } + public Task RequeueAsync() { if (_link.IsClosed) @@ -98,12 +100,11 @@ public Task RequeueAsync() public Task RequeueAsync(Dictionary annotations) { - if (_link.IsClosed) { throw new ConsumerException("Link is closed"); } - + Utils.ValidateMessageAnnotations(annotations); Task requeueTask = Task.Run(() => { Fields messageAnnotations = new(); @@ -111,13 +112,13 @@ public Task RequeueAsync(Dictionary annotations) { messageAnnotations.Add(new Symbol(kvp.Key), kvp.Value); } + _link.Modify(_message, false, false, messageAnnotations); _unsettledMessageCounter.Decrement(); _message.Dispose(); }); return requeueTask; - } } } diff --git a/RabbitMQ.AMQP.Client/Utils.cs b/RabbitMQ.AMQP.Client/Utils.cs index 5c47290b..1f5444fe 100644 --- a/RabbitMQ.AMQP.Client/Utils.cs +++ b/RabbitMQ.AMQP.Client/Utils.cs @@ -183,7 +183,7 @@ internal static bool CompareMap(Map map1, Map map2) return true; } - internal static void CheckMessageAnnotations(Dictionary annotations) + internal static void ValidateMessageAnnotations(Dictionary annotations) { foreach (var kvp in annotations.Where(kvp => !kvp.Key.StartsWith("x-"))) { diff --git a/Tests/Consumer/ConsumerOutcomeTests.cs b/Tests/Consumer/ConsumerOutcomeTests.cs new file mode 100644 index 00000000..7ce1137b --- /dev/null +++ b/Tests/Consumer/ConsumerOutcomeTests.cs @@ -0,0 +1,86 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using RabbitMQ.AMQP.Client; +using RabbitMQ.AMQP.Client.Impl; +using Xunit; +using Xunit.Abstractions; + +namespace Tests.Consumer; + +public class ConsumerOutcomeTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper) +{ + [Fact] + public void ValidateAnnotations() + { + const string wrongAnnotationKey = "missing-the-start-x-annotation-key"; + const string annotationValue = "annotation-value"; + // This should throw an exception because the annotation key does not start with "x-" + Assert.Throws(() => + Utils.ValidateMessageAnnotations(new Dictionary + { + { wrongAnnotationKey, annotationValue } + })); + + const string correctAnnotationKey = "x-otp-annotation-key"; + // This should not throw an exception because the annotation key starts with "x-" + Utils.ValidateMessageAnnotations(new Dictionary + { + { correctAnnotationKey, annotationValue } + }); + } + + [Fact] + public async Task DiscardedMessageWithAnnotationsShouldBeDeadLeadLetteredAndContainAnnotationsWhenConfigured() + { + string dlqQueueName = $"dlq_{_queueName}"; + await DeclareDeadLetterTopology(_queueName, dlqQueueName); + + + Assert.NotNull(_connection); + Assert.NotNull(_management); + + const string annotationKey = "x-opt-annotation-key"; + const string annotationValue = "annotation-value"; + TaskCompletionSource tcs = + new(TaskCreationOptions.RunContinuationsAsynchronously); + IPublisher publisher = await _connection.PublisherBuilder().Queue(_queueName).BuildAsync(); + IConsumer consumer = await _connection.ConsumerBuilder().MessageHandler( + async (context, _) => + { + await context.DiscardAsync(new Dictionary { { annotationKey, annotationValue } }); + tcs.SetResult(true); + } + ).Queue(_queueName).BuildAndStartAsync(); + + IMessage message = new AmqpMessage($"message"); + PublishResult pr = await publisher.PublishAsync(message); + Assert.Equal(OutcomeState.Accepted, pr.Outcome.State); + await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + await consumer.CloseAsync(); + TaskCompletionSource tcsDl = + new(TaskCreationOptions.RunContinuationsAsynchronously); + IConsumer dlConsumer = await _connection.ConsumerBuilder().MessageHandler(async (context, message1) => + { + await context.AcceptAsync(); + tcsDl.SetResult(message1); + }).Queue(dlqQueueName).BuildAndStartAsync(); + + IMessage mResult = await tcsDl.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + Assert.NotNull(mResult); + Assert.Equal(mResult.Annotation(annotationKey), annotationValue); + await dlConsumer.CloseAsync(); + } + + + private async Task DeclareDeadLetterTopology(string queueName, string dlxQueueName) + { + string dlx = $"{queueName}.dlx"; + Assert.NotNull(_management); + await _management.Queue().Name(queueName).Type(QueueType.QUORUM).DeadLetterExchange(dlx).DeclareAsync(); + await _management.Exchange(dlx).Type(ExchangeType.FANOUT).AutoDelete(true).DeclareAsync(); + await _management.Queue(dlxQueueName).Exclusive(true).DeclareAsync(); + await _management.Binding().SourceExchange(dlx).DestinationQueue(dlxQueueName).BindAsync(); + } +} From 06262eaba2de333f067abf75e8f5058edcf6fbf2 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 26 Sep 2024 09:43:00 +0200 Subject: [PATCH 3/6] add tests Signed-off-by: Gabriele Santomaggio --- .github/workflows/wf_build-and-test.yaml | 2 +- Tests/Consumer/ConsumerOutcomeTests.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/wf_build-and-test.yaml b/.github/workflows/wf_build-and-test.yaml index 539f9c29..f73acd80 100644 --- a/.github/workflows/wf_build-and-test.yaml +++ b/.github/workflows/wf_build-and-test.yaml @@ -64,7 +64,7 @@ jobs: id: start-rabbitmq run: ${{ github.workspace }}/.ci/ubuntu/cluster/gha-setup.sh - name: Test - timeout-minutes: 15 + timeout-minutes: 20 run: dotnet test ${{ github.workspace }}/Build.csproj --no-restore --no-build --logger "console;verbosity=detailed" - name: Check for errors in RabbitMQ logs run: ${{ github.workspace}}/.ci/ubuntu/cluster/gha-logs.sh check diff --git a/Tests/Consumer/ConsumerOutcomeTests.cs b/Tests/Consumer/ConsumerOutcomeTests.cs index 7ce1137b..2aa96b62 100644 --- a/Tests/Consumer/ConsumerOutcomeTests.cs +++ b/Tests/Consumer/ConsumerOutcomeTests.cs @@ -21,7 +21,7 @@ public void ValidateAnnotations() { { wrongAnnotationKey, annotationValue } })); - + const string correctAnnotationKey = "x-otp-annotation-key"; // This should not throw an exception because the annotation key starts with "x-" Utils.ValidateMessageAnnotations(new Dictionary From 10246a5c098a89112e40ebe48e93f873d912aeaf Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 26 Sep 2024 15:53:40 +0200 Subject: [PATCH 4/6] Update RabbitMQ version from our dockerhub repo to the official repo Signed-off-by: Gabriele Santomaggio --- .ci/publish-documentation-to-github-pages.sh | 49 --------------- .ci/ubuntu/cluster/gha-setup.sh | 4 +- .ci/ubuntu/one-node/gha-setup.sh | 7 +-- .ci/windows/versions.json | 2 +- .github/workflows/wf_build-and-test.yaml | 4 +- Makefile | 31 +-------- RabbitMQ.AMQP.Client/IConsumer.cs | 52 +++++++++++++++ RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs | 1 - Tests/Consumer/ConsumerOutcomeTests.cs | 66 ++++++++++++++++++-- 9 files changed, 121 insertions(+), 95 deletions(-) delete mode 100755 .ci/publish-documentation-to-github-pages.sh diff --git a/.ci/publish-documentation-to-github-pages.sh b/.ci/publish-documentation-to-github-pages.sh deleted file mode 100755 index 76a54c28..00000000 --- a/.ci/publish-documentation-to-github-pages.sh +++ /dev/null @@ -1,49 +0,0 @@ -#!/bin/bash - -. $(pwd)/release-versions.txt - -MESSAGE=$(git log -1 --pretty=%B) -./mvnw buildnumber:create pre-site --no-transfer-progress - -./mvnw javadoc:javadoc -Dmaven.javadoc.skip=false --no-transfer-progress - -if [ -e target/site/apidocs/element-list ] - then cp target/site/apidocs/element-list target/site/apidocs/package-list -fi - -RELEASE_VERSION=$(cat pom.xml | grep -oPm1 "(?<=)[^<]+") - -# GHA does shallow clones, so need the next 2 commands to have the gh-pages branch -git remote set-branches origin 'gh-pages' -git fetch -v - -git checkout gh-pages -mkdir -p $RELEASE_VERSION/htmlsingle -cp target/generated-docs/index.html $RELEASE_VERSION/htmlsingle -mkdir -p $RELEASE_VERSION/api -cp -r target/site/apidocs/* $RELEASE_VERSION/api/ -git add $RELEASE_VERSION/ - -if [[ $LATEST == "true" ]] - then - if [[ $RELEASE_VERSION == *[RCM]* ]] - then - DOC_DIR="milestone" - elif [[ $RELEASE_VERSION == *SNAPSHOT* ]] - then - DOC_DIR="snapshot" - else - DOC_DIR="stable" - fi - - mkdir -p $DOC_DIR/htmlsingle - cp target/generated-docs/index.html $DOC_DIR/htmlsingle - mkdir -p $DOC_DIR/api - cp -r target/site/apidocs/* $DOC_DIR/api/ - git add $DOC_DIR/ - -fi - -git commit -m "$MESSAGE" -git push origin gh-pages -git checkout main diff --git a/.ci/ubuntu/cluster/gha-setup.sh b/.ci/ubuntu/cluster/gha-setup.sh index 3c37d04f..d2d8cc1a 100755 --- a/.ci/ubuntu/cluster/gha-setup.sh +++ b/.ci/ubuntu/cluster/gha-setup.sh @@ -21,9 +21,9 @@ function run_docker_compose if [[ $2 == 'arm' ]] then - readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq-arm64:main}" + readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq1:4-management}" else - readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}" + readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq1:4-management}" fi if [[ ! -v GITHUB_ACTIONS ]] diff --git a/.ci/ubuntu/one-node/gha-setup.sh b/.ci/ubuntu/one-node/gha-setup.sh index bbcbcaa2..c170a28b 100755 --- a/.ci/ubuntu/one-node/gha-setup.sh +++ b/.ci/ubuntu/one-node/gha-setup.sh @@ -8,12 +8,7 @@ script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" readonly script_dir echo "[INFO] script_dir: '$script_dir'" -if [[ $3 == 'arm' ]] -then - readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq-arm64:main}" -else - readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}" -fi +readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4-management}" readonly docker_name_prefix='rabbitmq-amqp-dotnet-client' diff --git a/.ci/windows/versions.json b/.ci/windows/versions.json index 0b6ecdbe..91caa981 100644 --- a/.ci/windows/versions.json +++ b/.ci/windows/versions.json @@ -1,4 +1,4 @@ { "erlang": "27.0.1", - "rabbitmq": "4.0.0-beta.6" + "rabbitmq": "4.0.2" } diff --git a/.github/workflows/wf_build-and-test.yaml b/.github/workflows/wf_build-and-test.yaml index f73acd80..dd7ed263 100644 --- a/.github/workflows/wf_build-and-test.yaml +++ b/.github/workflows/wf_build-and-test.yaml @@ -34,7 +34,7 @@ jobs: id: install-start-rabbitmq run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1 - name: Test - timeout-minutes: 20 + timeout-minutes: 25 run: dotnet test ${{ github.workspace }}\Build.csproj --no-restore --no-build --logger 'console;verbosity=detailed' - name: Check for errors in RabbitMQ logs run: ${{ github.workspace }}\.ci\windows\gha-log-check.ps1 @@ -64,7 +64,7 @@ jobs: id: start-rabbitmq run: ${{ github.workspace }}/.ci/ubuntu/cluster/gha-setup.sh - name: Test - timeout-minutes: 20 + timeout-minutes: 25 run: dotnet test ${{ github.workspace }}/Build.csproj --no-restore --no-build --logger "console;verbosity=detailed" - name: Check for errors in RabbitMQ logs run: ${{ github.workspace}}/.ci/ubuntu/cluster/gha-logs.sh check diff --git a/Makefile b/Makefile index fa1adbb3..03c88bf9 100644 --- a/Makefile +++ b/Makefile @@ -9,35 +9,8 @@ build: test: build dotnet test -c Debug $(CURDIR)/Tests/Tests.csproj --no-build --logger:"console;verbosity=detailed" -rabbitmq-server-start-arm: - ./.ci/ubuntu/one-node/gha-setup.sh start pull arm +rabbitmq-server-start: + ./.ci/ubuntu/one-node/gha-setup.sh start pull rabbitmq-server-stop: ./.ci/ubuntu/one-node/gha-setup.sh stop - - -# TODO: -## publish the documentation on github pages -## you should execute this command only on the `main` branch -# publish-github-pages: -# ## Create the PDF -# docker run -it -v $(shell pwd)/docs/:/client_doc/ asciidoctor/docker-asciidoctor /bin/bash -c "cd /client_doc/asciidoc && asciidoctor-pdf index.adoc" -# ## Create the HTML -# docker run -it -v $(shell pwd)/docs/:/client_doc/ asciidoctor/docker-asciidoctor /bin/bash -c "cd /client_doc/asciidoc && asciidoctor index.adoc" -# ## copy the PDF and HTML to temp folder -# rm -rf docs/temp -# mkdir -p docs/temp -# cp docs/asciidoc/index.pdf docs/temp/dotnet-stream-client.pdf -# cp docs/asciidoc/index.html docs/temp/index.html -# ## check out the gh-pages branch -# git checkout gh-pages -# ## copy the PDF and HTML to the root folder -# mv docs/temp/dotnet-stream-client.pdf stable/dotnet-stream-client.pdf -# mv docs/temp/index.html stable/htmlsingle/index.html -# ## commit and push -# git add stable/dotnet-stream-client.pdf -# git add stable/htmlsingle/index.html -# git commit -m "Update the documentation" -# git push origin gh-pages -# ## go back to the main branch -# git checkout main diff --git a/RabbitMQ.AMQP.Client/IConsumer.cs b/RabbitMQ.AMQP.Client/IConsumer.cs index aac8507f..fa26d450 100644 --- a/RabbitMQ.AMQP.Client/IConsumer.cs +++ b/RabbitMQ.AMQP.Client/IConsumer.cs @@ -26,11 +26,63 @@ public interface IConsumer : ILifeCycle public interface IContext { + /// + /// Accept the message (AMQP 1.0 accepted outcome). + /// + ///This means the message has been processed and the broker can delete it. + /// + /// Task AcceptAsync(); + + /// + /// Discard the message (AMQP 1.0 rejected outcome). + ///This means the message cannot be processed because it is invalid, the broker can drop it + /// or dead-letter it if it is configured. + /// Task DiscardAsync(); + + /// + ///Discard the message with annotations to combine with the existing message annotations. + ///This means the message cannot be processed because it is invalid, the broker can drop it + ///or dead-letter it if it is configured. + ///Application-specific annotation keys must start with the x-opt- prefix. + ///Annotation keys the broker understands starts with x-, but not with x-opt- + ///. + ///This maps to the AMQP 1.0 + ///modified{delivery-failed = true, undeliverable-here = true} outcome. + ///@param annotations message annotations to combine with existing ones + ///@see AMQP + /// 1.0 modified outcome + /// Task DiscardAsync(Dictionary annotations); + /// + ///Requeue the message (AMQP 1.0 released outcome). + /// + ///This means the message has not been processed and the broker can requeue it and deliver it + /// to the same or a different consumer. + /// + /// Task RequeueAsync(); + + /// + ///Requeue the message with annotations to combine with the existing message annotations. + /// + ///This means the message has not been processed and the broker can requeue it and deliver it + /// to the same or a different consumer. + /// Application-specific annotation keys must start with the x-opt- prefix. + /// Annotation keys the broker understands starts with x-, but not with x-opt- + /// . + /// + /// This maps to the AMQP 1.0 + /// modified{delivery-failed = false, undeliverable-here = false} outcome. + /// + /// @param annotations message annotations to combine with existing ones + ///@see AMQP + /// 1.0 modified outcome + /// Task RequeueAsync(Dictionary annotations); } } diff --git a/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs b/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs index e8f694d5..d3746b2a 100644 --- a/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs +++ b/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs @@ -62,7 +62,6 @@ public Task DiscardAsync(Dictionary annotations) { throw new ConsumerException("Link is closed"); } - Utils.ValidateMessageAnnotations(annotations); Task rejectTask = Task.Run(() => diff --git a/Tests/Consumer/ConsumerOutcomeTests.cs b/Tests/Consumer/ConsumerOutcomeTests.cs index 2aa96b62..a20a6f2d 100644 --- a/Tests/Consumer/ConsumerOutcomeTests.cs +++ b/Tests/Consumer/ConsumerOutcomeTests.cs @@ -24,10 +24,67 @@ public void ValidateAnnotations() const string correctAnnotationKey = "x-otp-annotation-key"; // This should not throw an exception because the annotation key starts with "x-" - Utils.ValidateMessageAnnotations(new Dictionary - { - { correctAnnotationKey, annotationValue } - }); + Utils.ValidateMessageAnnotations(new Dictionary { { correctAnnotationKey, annotationValue } }); + } + + + [Fact] + public async Task RequeuedMessageWithAnnotationShouldContainAnnotationsOnRedelivery() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + TaskCompletionSource tcsRequeue = + new(TaskCreationOptions.RunContinuationsAsynchronously); + + const string annotationKey = "x-opt-annotation-key"; + const string annotationValue = "annotation-value"; + + const string annotationKey1 = "x-opt-annotation1-key"; + const string annotationValue1 = "annotation1-value"; + + int requeueCount = 0; + + + await _management.Queue().Type(QueueType.QUORUM).Name(_queueName).DeclareAsync(); + List messages = []; + IPublisher publisher = await _connection.PublisherBuilder().Queue(_queueName).BuildAsync(); + IConsumer consumer = await _connection.ConsumerBuilder().MessageHandler( + async (context, message) => + { + messages.Add(message); + if (requeueCount == 0) + { + requeueCount++; + await context.RequeueAsync(new Dictionary + { + { annotationKey, annotationValue }, { annotationKey1, annotationValue1 } + }); + } + else + { + await context.AcceptAsync(); + tcsRequeue.SetResult(true); + } + } + ).Queue(_queueName).BuildAndStartAsync(); + + IMessage message = new AmqpMessage($"message"); + PublishResult pr = await publisher.PublishAsync(message); + + Assert.Equal(OutcomeState.Accepted, pr.Outcome.State); + + await tcsRequeue.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + Assert.Equal(2, messages.Count); + Assert.Null(messages[0].Annotation(annotationKey)); + Assert.Null(messages[0].Annotation(annotationKey1)); + Assert.Null(messages[0].Annotation("x-delivery-count")); + + Assert.Equal(messages[1].Annotation(annotationKey), annotationValue); + Assert.Equal(messages[1].Annotation(annotationKey1), annotationValue1); + Assert.NotNull(messages[1].Annotation("x-delivery-count")); + + await consumer.CloseAsync(); } [Fact] @@ -67,7 +124,6 @@ public async Task DiscardedMessageWithAnnotationsShouldBeDeadLeadLetteredAndCont }).Queue(dlqQueueName).BuildAndStartAsync(); IMessage mResult = await tcsDl.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.NotNull(mResult); Assert.Equal(mResult.Annotation(annotationKey), annotationValue); await dlConsumer.CloseAsync(); From c3dacee8d8e6a84978f4ac3a3fd56d9c4b23dadc Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 26 Sep 2024 15:56:29 +0200 Subject: [PATCH 5/6] Update RabbitMQ version from our dockerhub repo to the official repo Signed-off-by: Gabriele Santomaggio --- .ci/ubuntu/cluster/gha-setup.sh | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/.ci/ubuntu/cluster/gha-setup.sh b/.ci/ubuntu/cluster/gha-setup.sh index d2d8cc1a..babb75eb 100755 --- a/.ci/ubuntu/cluster/gha-setup.sh +++ b/.ci/ubuntu/cluster/gha-setup.sh @@ -19,12 +19,7 @@ function run_docker_compose docker compose --file "$script_dir/docker-compose.yml" $@ } -if [[ $2 == 'arm' ]] -then - readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq1:4-management}" -else - readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq1:4-management}" -fi +readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4-management}" if [[ ! -v GITHUB_ACTIONS ]] then From 0ae3d4368992717797848e27aed73861c29e7826 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 26 Sep 2024 16:23:54 +0200 Subject: [PATCH 6/6] Add more checks during the tests Signed-off-by: Gabriele Santomaggio --- RabbitMQ.AMQP.Client/IConsumer.cs | 15 +++++++++------ Tests/Consumer/ConsumerOutcomeTests.cs | 22 ++++++++++++++++++++-- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/RabbitMQ.AMQP.Client/IConsumer.cs b/RabbitMQ.AMQP.Client/IConsumer.cs index fa26d450..bd54d827 100644 --- a/RabbitMQ.AMQP.Client/IConsumer.cs +++ b/RabbitMQ.AMQP.Client/IConsumer.cs @@ -29,7 +29,7 @@ public interface IContext /// /// Accept the message (AMQP 1.0 accepted outcome). /// - ///This means the message has been processed and the broker can delete it. + /// This means the message has been processed and the broker can delete it. /// /// Task AcceptAsync(); @@ -50,13 +50,14 @@ public interface IContext ///. ///This maps to the AMQP 1.0 ///modified{delivery-failed = true, undeliverable-here = true} outcome. - ///@param annotations message annotations to combine with existing ones - ///@see annotations message annotations to combine with existing ones + ///AMQP /// 1.0 modified outcome + /// + /// The annotations can be used only with Quorum queues, see https://www.rabbitmq.com/docs/amqp#modified-outcome /// Task DiscardAsync(Dictionary annotations); - /// ///Requeue the message (AMQP 1.0 released outcome). /// @@ -78,10 +79,12 @@ public interface IContext /// This maps to the AMQP 1.0 /// modified{delivery-failed = false, undeliverable-here = false} outcome. /// - /// @param annotations message annotations to combine with existing ones - ///@see annotations message annotations to combine with existing ones + ///AMQP /// 1.0 modified outcome + /// + ///The annotations can be used only with Quorum queues, see https://www.rabbitmq.com/docs/amqp#modified-outcome /// Task RequeueAsync(Dictionary annotations); } diff --git a/Tests/Consumer/ConsumerOutcomeTests.cs b/Tests/Consumer/ConsumerOutcomeTests.cs index a20a6f2d..06df52be 100644 --- a/Tests/Consumer/ConsumerOutcomeTests.cs +++ b/Tests/Consumer/ConsumerOutcomeTests.cs @@ -1,10 +1,13 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; +using EasyNetQ.Management.Client.Model; using RabbitMQ.AMQP.Client; using RabbitMQ.AMQP.Client.Impl; using Xunit; using Xunit.Abstractions; +using PublishResult = RabbitMQ.AMQP.Client.PublishResult; +using QueueType = RabbitMQ.AMQP.Client.QueueType; namespace Tests.Consumer; @@ -28,6 +31,10 @@ public void ValidateAnnotations() } + /// + /// The test verifies that a requeued message with annotations will contain the annotations on redelivery. + /// The delivered message should contain the custom annotations and x-delivery-count + /// [Fact] public async Task RequeuedMessageWithAnnotationShouldContainAnnotationsOnRedelivery() { @@ -83,6 +90,9 @@ await context.RequeueAsync(new Dictionary Assert.Equal(messages[1].Annotation(annotationKey), annotationValue); Assert.Equal(messages[1].Annotation(annotationKey1), annotationValue1); Assert.NotNull(messages[1].Annotation("x-delivery-count")); + HttpApiClient client = new(); + Queue q = await client.GetQueueAsync(_queueName); + Assert.Equal(0, q.Messages); await consumer.CloseAsync(); } @@ -92,8 +102,6 @@ public async Task DiscardedMessageWithAnnotationsShouldBeDeadLeadLetteredAndCont { string dlqQueueName = $"dlq_{_queueName}"; await DeclareDeadLetterTopology(_queueName, dlqQueueName); - - Assert.NotNull(_connection); Assert.NotNull(_management); @@ -126,6 +134,16 @@ public async Task DiscardedMessageWithAnnotationsShouldBeDeadLeadLetteredAndCont IMessage mResult = await tcsDl.Task.WaitAsync(TimeSpan.FromSeconds(5)); Assert.NotNull(mResult); Assert.Equal(mResult.Annotation(annotationKey), annotationValue); + + var client = new HttpApiClient(); + Queue q = await client.GetQueueAsync(_queueName); + Assert.Equal(0, q.Messages); + + Queue q1 = await client.GetQueueAsync(dlqQueueName); + Assert.Equal(0, q1.Messages); + + + await dlConsumer.CloseAsync(); }