Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 0 additions & 49 deletions .ci/publish-documentation-to-github-pages.sh

This file was deleted.

7 changes: 1 addition & 6 deletions .ci/ubuntu/cluster/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,7 @@ function run_docker_compose
docker compose --file "$script_dir/docker-compose.yml" $@
}

if [[ $2 == 'arm' ]]
then
readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq-arm64:main}"
else
readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}"
fi
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4-management}"

if [[ ! -v GITHUB_ACTIONS ]]
then
Expand Down
7 changes: 1 addition & 6 deletions .ci/ubuntu/one-node/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,7 @@ script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
readonly script_dir
echo "[INFO] script_dir: '$script_dir'"

if [[ $3 == 'arm' ]]
then
readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq-arm64:main}"
else
readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}"
fi
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4-management}"


readonly docker_name_prefix='rabbitmq-amqp-dotnet-client'
Expand Down
2 changes: 1 addition & 1 deletion .ci/windows/versions.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"erlang": "27.0.1",
"rabbitmq": "4.0.0-beta.6"
"rabbitmq": "4.0.2"
}
4 changes: 2 additions & 2 deletions .github/workflows/wf_build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
id: install-start-rabbitmq
run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1
- name: Test
timeout-minutes: 20
timeout-minutes: 25
run: dotnet test ${{ github.workspace }}\Build.csproj --no-restore --no-build --logger 'console;verbosity=detailed'
- name: Check for errors in RabbitMQ logs
run: ${{ github.workspace }}\.ci\windows\gha-log-check.ps1
Expand Down Expand Up @@ -64,7 +64,7 @@ jobs:
id: start-rabbitmq
run: ${{ github.workspace }}/.ci/ubuntu/cluster/gha-setup.sh
- name: Test
timeout-minutes: 15
timeout-minutes: 25
run: dotnet test ${{ github.workspace }}/Build.csproj --no-restore --no-build --logger "console;verbosity=detailed"
- name: Check for errors in RabbitMQ logs
run: ${{ github.workspace}}/.ci/ubuntu/cluster/gha-logs.sh check
Expand Down
31 changes: 2 additions & 29 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,8 @@ build:
test: build
dotnet test -c Debug $(CURDIR)/Tests/Tests.csproj --no-build --logger:"console;verbosity=detailed"

rabbitmq-server-start-arm:
./.ci/ubuntu/one-node/gha-setup.sh start pull arm
rabbitmq-server-start:
./.ci/ubuntu/one-node/gha-setup.sh start pull

rabbitmq-server-stop:
./.ci/ubuntu/one-node/gha-setup.sh stop


# TODO:
## publish the documentation on github pages
## you should execute this command only on the `main` branch
# publish-github-pages:
# ## Create the PDF
# docker run -it -v $(shell pwd)/docs/:/client_doc/ asciidoctor/docker-asciidoctor /bin/bash -c "cd /client_doc/asciidoc && asciidoctor-pdf index.adoc"
# ## Create the HTML
# docker run -it -v $(shell pwd)/docs/:/client_doc/ asciidoctor/docker-asciidoctor /bin/bash -c "cd /client_doc/asciidoc && asciidoctor index.adoc"
# ## copy the PDF and HTML to temp folder
# rm -rf docs/temp
# mkdir -p docs/temp
# cp docs/asciidoc/index.pdf docs/temp/dotnet-stream-client.pdf
# cp docs/asciidoc/index.html docs/temp/index.html
# ## check out the gh-pages branch
# git checkout gh-pages
# ## copy the PDF and HTML to the root folder
# mv docs/temp/dotnet-stream-client.pdf stable/dotnet-stream-client.pdf
# mv docs/temp/index.html stable/htmlsingle/index.html
# ## commit and push
# git add stable/dotnet-stream-client.pdf
# git add stable/htmlsingle/index.html
# git commit -m "Update the documentation"
# git push origin gh-pages
# ## go back to the main branch
# git checkout main
59 changes: 59 additions & 0 deletions RabbitMQ.AMQP.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace RabbitMQ.AMQP.Client
Expand All @@ -25,8 +26,66 @@ public interface IConsumer : ILifeCycle

public interface IContext
{
///<summary>
/// Accept the message (AMQP 1.0 <code>accepted</code> outcome).
///
/// This means the message has been processed and the broker can delete it.
///
/// </summary>
Task AcceptAsync();

///<summary>
/// Discard the message (AMQP 1.0 <code>rejected</code> outcome).
///This means the message cannot be processed because it is invalid, the broker can drop it
/// or dead-letter it if it is configured.
///</summary>
Task DiscardAsync();

///<summary>
///Discard the message with annotations to combine with the existing message annotations.
///This means the message cannot be processed because it is invalid, the broker can drop it
///or dead-letter it if it is configured.
///Application-specific annotation keys must start with the <code>x-opt-</code> prefix.
///Annotation keys the broker understands starts with <code>x-</code>, but not with <code>x-opt-
///</code>.
///This maps to the AMQP 1.0 <code>
///modified{delivery-failed = true, undeliverable-here = true}</code> outcome.
/// <param name="annotations"> annotations message annotations to combine with existing ones </param>
///<a
/// href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-modified">AMQP
/// 1.0 <code>modified</code> outcome</a>
///
/// The annotations can be used only with Quorum queues, see https://www.rabbitmq.com/docs/amqp#modified-outcome
///</summary>
Task DiscardAsync(Dictionary<string, object> annotations);
///<summary>
///Requeue the message (AMQP 1.0 <code>released</code> outcome).
///
///This means the message has not been processed and the broker can requeue it and deliver it
/// to the same or a different consumer.
///
/// </summary>
Task RequeueAsync();

///<summary>
///Requeue the message with annotations to combine with the existing message annotations.
///
///This means the message has not been processed and the broker can requeue it and deliver it
/// to the same or a different consumer.
/// Application-specific annotation keys must start with the <code>x-opt-</code> prefix.
/// Annotation keys the broker understands starts with <code>x-</code>, but not with <code>x-opt-
/// </code>.
///
/// This maps to the AMQP 1.0 <code>
/// modified{delivery-failed = false, undeliverable-here = false}</code> outcome.
///
/// <param name="annotations"> annotations message annotations to combine with existing ones </param>
///<a
/// href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-modified">AMQP
/// 1.0 <code>modified</code> outcome</a>
///
///The annotations can be used only with Quorum queues, see https://www.rabbitmq.com/docs/amqp#modified-outcome
///</summary>
Task RequeueAsync(Dictionary<string, object> annotations);
}
}
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public IMessage Annotation(string key, object value)
public object Annotation(string key)
{
ThrowIfAnnotationsNotSet();
return NativeMessage.MessageAnnotations[key];
return NativeMessage.MessageAnnotations[new Symbol(key)];
}
}
}
49 changes: 49 additions & 0 deletions RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System.Collections.Generic;
using System.Threading.Tasks;
using Amqp;
using Amqp.Types;

namespace RabbitMQ.AMQP.Client.Impl
{
Expand Down Expand Up @@ -54,6 +56,30 @@ public Task DiscardAsync()
return rejectTask;
}

public Task DiscardAsync(Dictionary<string, object> annotations)
{
if (_link.IsClosed)
{
throw new ConsumerException("Link is closed");
}
Utils.ValidateMessageAnnotations(annotations);

Task rejectTask = Task.Run(() =>
{
Fields messageAnnotations = new();
foreach (var kvp in annotations)
{
messageAnnotations.Add(new Symbol(kvp.Key), kvp.Value);
}

_link.Modify(_message, true, true, messageAnnotations);
_unsettledMessageCounter.Decrement();
_message.Dispose();
});

return rejectTask;
}

public Task RequeueAsync()
{
if (_link.IsClosed)
Expand All @@ -70,5 +96,28 @@ public Task RequeueAsync()

return requeueTask;
}

public Task RequeueAsync(Dictionary<string, object> annotations)
{
if (_link.IsClosed)
{
throw new ConsumerException("Link is closed");
}
Utils.ValidateMessageAnnotations(annotations);
Task requeueTask = Task.Run(() =>
{
Fields messageAnnotations = new();
foreach (var kvp in annotations)
{
messageAnnotations.Add(new Symbol(kvp.Key), kvp.Value);
}

_link.Modify(_message, false, false, messageAnnotations);
_unsettledMessageCounter.Decrement();
_message.Dispose();
});

return requeueTask;
}
}
}
2 changes: 2 additions & 0 deletions RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ RabbitMQ.AMQP.Client.IConsumerBuilder.Stream() -> RabbitMQ.AMQP.Client.IConsumer
RabbitMQ.AMQP.Client.IContext
RabbitMQ.AMQP.Client.IContext.AcceptAsync() -> System.Threading.Tasks.Task!
RabbitMQ.AMQP.Client.IContext.DiscardAsync() -> System.Threading.Tasks.Task!
RabbitMQ.AMQP.Client.IContext.DiscardAsync(System.Collections.Generic.Dictionary<string!, object!>! annotations) -> System.Threading.Tasks.Task!
RabbitMQ.AMQP.Client.IContext.RequeueAsync() -> System.Threading.Tasks.Task!
RabbitMQ.AMQP.Client.IContext.RequeueAsync(System.Collections.Generic.Dictionary<string!, object!>! annotations) -> System.Threading.Tasks.Task!
RabbitMQ.AMQP.Client.IEntityInfo
RabbitMQ.AMQP.Client.IEntityInfoSpecification<T>
RabbitMQ.AMQP.Client.IEntityInfoSpecification<T>.DeclareAsync() -> System.Threading.Tasks.Task<T>!
Expand Down
12 changes: 12 additions & 0 deletions RabbitMQ.AMQP.Client/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Web;
Expand Down Expand Up @@ -177,8 +179,18 @@ internal static bool CompareMap(Map map1, Map map2)
return false;
}
}

return true;
}

internal static void ValidateMessageAnnotations(Dictionary<string, object> annotations)
{
foreach (var kvp in annotations.Where(kvp => !kvp.Key.StartsWith("x-")))
{
throw new ArgumentException(
$"Message annotation keys must start with 'x-': {kvp.Key}");
}
}
}

// TODO why can't we use normal HTTP encoding?
Expand Down
Loading