diff --git a/.ci/publish-documentation-to-github-pages.sh b/.ci/publish-documentation-to-github-pages.sh deleted file mode 100755 index 76a54c28..00000000 --- a/.ci/publish-documentation-to-github-pages.sh +++ /dev/null @@ -1,49 +0,0 @@ -#!/bin/bash - -. $(pwd)/release-versions.txt - -MESSAGE=$(git log -1 --pretty=%B) -./mvnw buildnumber:create pre-site --no-transfer-progress - -./mvnw javadoc:javadoc -Dmaven.javadoc.skip=false --no-transfer-progress - -if [ -e target/site/apidocs/element-list ] - then cp target/site/apidocs/element-list target/site/apidocs/package-list -fi - -RELEASE_VERSION=$(cat pom.xml | grep -oPm1 "(?<=)[^<]+") - -# GHA does shallow clones, so need the next 2 commands to have the gh-pages branch -git remote set-branches origin 'gh-pages' -git fetch -v - -git checkout gh-pages -mkdir -p $RELEASE_VERSION/htmlsingle -cp target/generated-docs/index.html $RELEASE_VERSION/htmlsingle -mkdir -p $RELEASE_VERSION/api -cp -r target/site/apidocs/* $RELEASE_VERSION/api/ -git add $RELEASE_VERSION/ - -if [[ $LATEST == "true" ]] - then - if [[ $RELEASE_VERSION == *[RCM]* ]] - then - DOC_DIR="milestone" - elif [[ $RELEASE_VERSION == *SNAPSHOT* ]] - then - DOC_DIR="snapshot" - else - DOC_DIR="stable" - fi - - mkdir -p $DOC_DIR/htmlsingle - cp target/generated-docs/index.html $DOC_DIR/htmlsingle - mkdir -p $DOC_DIR/api - cp -r target/site/apidocs/* $DOC_DIR/api/ - git add $DOC_DIR/ - -fi - -git commit -m "$MESSAGE" -git push origin gh-pages -git checkout main diff --git a/.ci/ubuntu/cluster/gha-setup.sh b/.ci/ubuntu/cluster/gha-setup.sh index 3c37d04f..babb75eb 100755 --- a/.ci/ubuntu/cluster/gha-setup.sh +++ b/.ci/ubuntu/cluster/gha-setup.sh @@ -19,12 +19,7 @@ function run_docker_compose docker compose --file "$script_dir/docker-compose.yml" $@ } -if [[ $2 == 'arm' ]] -then - readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq-arm64:main}" -else - readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}" -fi +readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4-management}" if [[ ! -v GITHUB_ACTIONS ]] then diff --git a/.ci/ubuntu/one-node/gha-setup.sh b/.ci/ubuntu/one-node/gha-setup.sh index bbcbcaa2..c170a28b 100755 --- a/.ci/ubuntu/one-node/gha-setup.sh +++ b/.ci/ubuntu/one-node/gha-setup.sh @@ -8,12 +8,7 @@ script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" readonly script_dir echo "[INFO] script_dir: '$script_dir'" -if [[ $3 == 'arm' ]] -then - readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq-arm64:main}" -else - readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}" -fi +readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4-management}" readonly docker_name_prefix='rabbitmq-amqp-dotnet-client' diff --git a/.ci/windows/versions.json b/.ci/windows/versions.json index 0b6ecdbe..91caa981 100644 --- a/.ci/windows/versions.json +++ b/.ci/windows/versions.json @@ -1,4 +1,4 @@ { "erlang": "27.0.1", - "rabbitmq": "4.0.0-beta.6" + "rabbitmq": "4.0.2" } diff --git a/.github/workflows/wf_build-and-test.yaml b/.github/workflows/wf_build-and-test.yaml index 539f9c29..dd7ed263 100644 --- a/.github/workflows/wf_build-and-test.yaml +++ b/.github/workflows/wf_build-and-test.yaml @@ -34,7 +34,7 @@ jobs: id: install-start-rabbitmq run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1 - name: Test - timeout-minutes: 20 + timeout-minutes: 25 run: dotnet test ${{ github.workspace }}\Build.csproj --no-restore --no-build --logger 'console;verbosity=detailed' - name: Check for errors in RabbitMQ logs run: ${{ github.workspace }}\.ci\windows\gha-log-check.ps1 @@ -64,7 +64,7 @@ jobs: id: start-rabbitmq run: ${{ github.workspace }}/.ci/ubuntu/cluster/gha-setup.sh - name: Test - timeout-minutes: 15 + timeout-minutes: 25 run: dotnet test ${{ github.workspace }}/Build.csproj --no-restore --no-build --logger "console;verbosity=detailed" - name: Check for errors in RabbitMQ logs run: ${{ github.workspace}}/.ci/ubuntu/cluster/gha-logs.sh check diff --git a/Makefile b/Makefile index fa1adbb3..03c88bf9 100644 --- a/Makefile +++ b/Makefile @@ -9,35 +9,8 @@ build: test: build dotnet test -c Debug $(CURDIR)/Tests/Tests.csproj --no-build --logger:"console;verbosity=detailed" -rabbitmq-server-start-arm: - ./.ci/ubuntu/one-node/gha-setup.sh start pull arm +rabbitmq-server-start: + ./.ci/ubuntu/one-node/gha-setup.sh start pull rabbitmq-server-stop: ./.ci/ubuntu/one-node/gha-setup.sh stop - - -# TODO: -## publish the documentation on github pages -## you should execute this command only on the `main` branch -# publish-github-pages: -# ## Create the PDF -# docker run -it -v $(shell pwd)/docs/:/client_doc/ asciidoctor/docker-asciidoctor /bin/bash -c "cd /client_doc/asciidoc && asciidoctor-pdf index.adoc" -# ## Create the HTML -# docker run -it -v $(shell pwd)/docs/:/client_doc/ asciidoctor/docker-asciidoctor /bin/bash -c "cd /client_doc/asciidoc && asciidoctor index.adoc" -# ## copy the PDF and HTML to temp folder -# rm -rf docs/temp -# mkdir -p docs/temp -# cp docs/asciidoc/index.pdf docs/temp/dotnet-stream-client.pdf -# cp docs/asciidoc/index.html docs/temp/index.html -# ## check out the gh-pages branch -# git checkout gh-pages -# ## copy the PDF and HTML to the root folder -# mv docs/temp/dotnet-stream-client.pdf stable/dotnet-stream-client.pdf -# mv docs/temp/index.html stable/htmlsingle/index.html -# ## commit and push -# git add stable/dotnet-stream-client.pdf -# git add stable/htmlsingle/index.html -# git commit -m "Update the documentation" -# git push origin gh-pages -# ## go back to the main branch -# git checkout main diff --git a/RabbitMQ.AMQP.Client/IConsumer.cs b/RabbitMQ.AMQP.Client/IConsumer.cs index a3c0b3a3..bd54d827 100644 --- a/RabbitMQ.AMQP.Client/IConsumer.cs +++ b/RabbitMQ.AMQP.Client/IConsumer.cs @@ -3,6 +3,7 @@ // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. using System; +using System.Collections.Generic; using System.Threading.Tasks; namespace RabbitMQ.AMQP.Client @@ -25,8 +26,66 @@ public interface IConsumer : ILifeCycle public interface IContext { + /// + /// Accept the message (AMQP 1.0 accepted outcome). + /// + /// This means the message has been processed and the broker can delete it. + /// + /// Task AcceptAsync(); + + /// + /// Discard the message (AMQP 1.0 rejected outcome). + ///This means the message cannot be processed because it is invalid, the broker can drop it + /// or dead-letter it if it is configured. + /// Task DiscardAsync(); + + /// + ///Discard the message with annotations to combine with the existing message annotations. + ///This means the message cannot be processed because it is invalid, the broker can drop it + ///or dead-letter it if it is configured. + ///Application-specific annotation keys must start with the x-opt- prefix. + ///Annotation keys the broker understands starts with x-, but not with x-opt- + ///. + ///This maps to the AMQP 1.0 + ///modified{delivery-failed = true, undeliverable-here = true} outcome. + /// annotations message annotations to combine with existing ones + ///AMQP + /// 1.0 modified outcome + /// + /// The annotations can be used only with Quorum queues, see https://www.rabbitmq.com/docs/amqp#modified-outcome + /// + Task DiscardAsync(Dictionary annotations); + /// + ///Requeue the message (AMQP 1.0 released outcome). + /// + ///This means the message has not been processed and the broker can requeue it and deliver it + /// to the same or a different consumer. + /// + /// Task RequeueAsync(); + + /// + ///Requeue the message with annotations to combine with the existing message annotations. + /// + ///This means the message has not been processed and the broker can requeue it and deliver it + /// to the same or a different consumer. + /// Application-specific annotation keys must start with the x-opt- prefix. + /// Annotation keys the broker understands starts with x-, but not with x-opt- + /// . + /// + /// This maps to the AMQP 1.0 + /// modified{delivery-failed = false, undeliverable-here = false} outcome. + /// + /// annotations message annotations to combine with existing ones + ///AMQP + /// 1.0 modified outcome + /// + ///The annotations can be used only with Quorum queues, see https://www.rabbitmq.com/docs/amqp#modified-outcome + /// + Task RequeueAsync(Dictionary annotations); } } diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs b/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs index c5e5d394..020d4646 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs @@ -130,7 +130,7 @@ public IMessage Annotation(string key, object value) public object Annotation(string key) { ThrowIfAnnotationsNotSet(); - return NativeMessage.MessageAnnotations[key]; + return NativeMessage.MessageAnnotations[new Symbol(key)]; } } } diff --git a/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs b/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs index 427554a0..d3746b2a 100644 --- a/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs +++ b/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs @@ -2,8 +2,10 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +using System.Collections.Generic; using System.Threading.Tasks; using Amqp; +using Amqp.Types; namespace RabbitMQ.AMQP.Client.Impl { @@ -54,6 +56,30 @@ public Task DiscardAsync() return rejectTask; } + public Task DiscardAsync(Dictionary annotations) + { + if (_link.IsClosed) + { + throw new ConsumerException("Link is closed"); + } + Utils.ValidateMessageAnnotations(annotations); + + Task rejectTask = Task.Run(() => + { + Fields messageAnnotations = new(); + foreach (var kvp in annotations) + { + messageAnnotations.Add(new Symbol(kvp.Key), kvp.Value); + } + + _link.Modify(_message, true, true, messageAnnotations); + _unsettledMessageCounter.Decrement(); + _message.Dispose(); + }); + + return rejectTask; + } + public Task RequeueAsync() { if (_link.IsClosed) @@ -70,5 +96,28 @@ public Task RequeueAsync() return requeueTask; } + + public Task RequeueAsync(Dictionary annotations) + { + if (_link.IsClosed) + { + throw new ConsumerException("Link is closed"); + } + Utils.ValidateMessageAnnotations(annotations); + Task requeueTask = Task.Run(() => + { + Fields messageAnnotations = new(); + foreach (var kvp in annotations) + { + messageAnnotations.Add(new Symbol(kvp.Key), kvp.Value); + } + + _link.Modify(_message, false, false, messageAnnotations); + _unsettledMessageCounter.Decrement(); + _message.Dispose(); + }); + + return requeueTask; + } } } diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt index a361a8d3..519edd8c 100644 --- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt @@ -125,7 +125,9 @@ RabbitMQ.AMQP.Client.IConsumerBuilder.Stream() -> RabbitMQ.AMQP.Client.IConsumer RabbitMQ.AMQP.Client.IContext RabbitMQ.AMQP.Client.IContext.AcceptAsync() -> System.Threading.Tasks.Task! RabbitMQ.AMQP.Client.IContext.DiscardAsync() -> System.Threading.Tasks.Task! +RabbitMQ.AMQP.Client.IContext.DiscardAsync(System.Collections.Generic.Dictionary! annotations) -> System.Threading.Tasks.Task! RabbitMQ.AMQP.Client.IContext.RequeueAsync() -> System.Threading.Tasks.Task! +RabbitMQ.AMQP.Client.IContext.RequeueAsync(System.Collections.Generic.Dictionary! annotations) -> System.Threading.Tasks.Task! RabbitMQ.AMQP.Client.IEntityInfo RabbitMQ.AMQP.Client.IEntityInfoSpecification RabbitMQ.AMQP.Client.IEntityInfoSpecification.DeclareAsync() -> System.Threading.Tasks.Task! diff --git a/RabbitMQ.AMQP.Client/Utils.cs b/RabbitMQ.AMQP.Client/Utils.cs index f0d39a22..1f5444fe 100644 --- a/RabbitMQ.AMQP.Client/Utils.cs +++ b/RabbitMQ.AMQP.Client/Utils.cs @@ -3,6 +3,8 @@ // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. using System; +using System.Collections.Generic; +using System.Linq; using System.Security.Cryptography; using System.Text; using System.Web; @@ -177,8 +179,18 @@ internal static bool CompareMap(Map map1, Map map2) return false; } } + return true; } + + internal static void ValidateMessageAnnotations(Dictionary annotations) + { + foreach (var kvp in annotations.Where(kvp => !kvp.Key.StartsWith("x-"))) + { + throw new ArgumentException( + $"Message annotation keys must start with 'x-': {kvp.Key}"); + } + } } // TODO why can't we use normal HTTP encoding? diff --git a/Tests/Consumer/ConsumerOutcomeTests.cs b/Tests/Consumer/ConsumerOutcomeTests.cs new file mode 100644 index 00000000..06df52be --- /dev/null +++ b/Tests/Consumer/ConsumerOutcomeTests.cs @@ -0,0 +1,160 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using EasyNetQ.Management.Client.Model; +using RabbitMQ.AMQP.Client; +using RabbitMQ.AMQP.Client.Impl; +using Xunit; +using Xunit.Abstractions; +using PublishResult = RabbitMQ.AMQP.Client.PublishResult; +using QueueType = RabbitMQ.AMQP.Client.QueueType; + +namespace Tests.Consumer; + +public class ConsumerOutcomeTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper) +{ + [Fact] + public void ValidateAnnotations() + { + const string wrongAnnotationKey = "missing-the-start-x-annotation-key"; + const string annotationValue = "annotation-value"; + // This should throw an exception because the annotation key does not start with "x-" + Assert.Throws(() => + Utils.ValidateMessageAnnotations(new Dictionary + { + { wrongAnnotationKey, annotationValue } + })); + + const string correctAnnotationKey = "x-otp-annotation-key"; + // This should not throw an exception because the annotation key starts with "x-" + Utils.ValidateMessageAnnotations(new Dictionary { { correctAnnotationKey, annotationValue } }); + } + + + /// + /// The test verifies that a requeued message with annotations will contain the annotations on redelivery. + /// The delivered message should contain the custom annotations and x-delivery-count + /// + [Fact] + public async Task RequeuedMessageWithAnnotationShouldContainAnnotationsOnRedelivery() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + TaskCompletionSource tcsRequeue = + new(TaskCreationOptions.RunContinuationsAsynchronously); + + const string annotationKey = "x-opt-annotation-key"; + const string annotationValue = "annotation-value"; + + const string annotationKey1 = "x-opt-annotation1-key"; + const string annotationValue1 = "annotation1-value"; + + int requeueCount = 0; + + + await _management.Queue().Type(QueueType.QUORUM).Name(_queueName).DeclareAsync(); + List messages = []; + IPublisher publisher = await _connection.PublisherBuilder().Queue(_queueName).BuildAsync(); + IConsumer consumer = await _connection.ConsumerBuilder().MessageHandler( + async (context, message) => + { + messages.Add(message); + if (requeueCount == 0) + { + requeueCount++; + await context.RequeueAsync(new Dictionary + { + { annotationKey, annotationValue }, { annotationKey1, annotationValue1 } + }); + } + else + { + await context.AcceptAsync(); + tcsRequeue.SetResult(true); + } + } + ).Queue(_queueName).BuildAndStartAsync(); + + IMessage message = new AmqpMessage($"message"); + PublishResult pr = await publisher.PublishAsync(message); + + Assert.Equal(OutcomeState.Accepted, pr.Outcome.State); + + await tcsRequeue.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + Assert.Equal(2, messages.Count); + Assert.Null(messages[0].Annotation(annotationKey)); + Assert.Null(messages[0].Annotation(annotationKey1)); + Assert.Null(messages[0].Annotation("x-delivery-count")); + + Assert.Equal(messages[1].Annotation(annotationKey), annotationValue); + Assert.Equal(messages[1].Annotation(annotationKey1), annotationValue1); + Assert.NotNull(messages[1].Annotation("x-delivery-count")); + HttpApiClient client = new(); + Queue q = await client.GetQueueAsync(_queueName); + Assert.Equal(0, q.Messages); + + await consumer.CloseAsync(); + } + + [Fact] + public async Task DiscardedMessageWithAnnotationsShouldBeDeadLeadLetteredAndContainAnnotationsWhenConfigured() + { + string dlqQueueName = $"dlq_{_queueName}"; + await DeclareDeadLetterTopology(_queueName, dlqQueueName); + Assert.NotNull(_connection); + Assert.NotNull(_management); + + const string annotationKey = "x-opt-annotation-key"; + const string annotationValue = "annotation-value"; + TaskCompletionSource tcs = + new(TaskCreationOptions.RunContinuationsAsynchronously); + IPublisher publisher = await _connection.PublisherBuilder().Queue(_queueName).BuildAsync(); + IConsumer consumer = await _connection.ConsumerBuilder().MessageHandler( + async (context, _) => + { + await context.DiscardAsync(new Dictionary { { annotationKey, annotationValue } }); + tcs.SetResult(true); + } + ).Queue(_queueName).BuildAndStartAsync(); + + IMessage message = new AmqpMessage($"message"); + PublishResult pr = await publisher.PublishAsync(message); + Assert.Equal(OutcomeState.Accepted, pr.Outcome.State); + await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + await consumer.CloseAsync(); + TaskCompletionSource tcsDl = + new(TaskCreationOptions.RunContinuationsAsynchronously); + IConsumer dlConsumer = await _connection.ConsumerBuilder().MessageHandler(async (context, message1) => + { + await context.AcceptAsync(); + tcsDl.SetResult(message1); + }).Queue(dlqQueueName).BuildAndStartAsync(); + + IMessage mResult = await tcsDl.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.NotNull(mResult); + Assert.Equal(mResult.Annotation(annotationKey), annotationValue); + + var client = new HttpApiClient(); + Queue q = await client.GetQueueAsync(_queueName); + Assert.Equal(0, q.Messages); + + Queue q1 = await client.GetQueueAsync(dlqQueueName); + Assert.Equal(0, q1.Messages); + + + + await dlConsumer.CloseAsync(); + } + + + private async Task DeclareDeadLetterTopology(string queueName, string dlxQueueName) + { + string dlx = $"{queueName}.dlx"; + Assert.NotNull(_management); + await _management.Queue().Name(queueName).Type(QueueType.QUORUM).DeadLetterExchange(dlx).DeclareAsync(); + await _management.Exchange(dlx).Type(ExchangeType.FANOUT).AutoDelete(true).DeclareAsync(); + await _management.Queue(dlxQueueName).Exclusive(true).DeclareAsync(); + await _management.Binding().SourceExchange(dlx).DestinationQueue(dlxQueueName).BindAsync(); + } +}