Skip to content

Commit c1e4f5d

Browse files
authored
DiscardAsync and RequeueAsync with Annotations (#70)
* Add RequeueAsync(Dictionary<string, object> annotations); * Add DiscardAsync(Dictionary<string, object> annotations); * Change the Docker image from PivotalHub to Official DockerHub --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 35cc6f6 commit c1e4f5d

File tree

12 files changed

+290
-94
lines changed

12 files changed

+290
-94
lines changed

.ci/publish-documentation-to-github-pages.sh

Lines changed: 0 additions & 49 deletions
This file was deleted.

.ci/ubuntu/cluster/gha-setup.sh

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,7 @@ function run_docker_compose
1919
docker compose --file "$script_dir/docker-compose.yml" $@
2020
}
2121

22-
if [[ $2 == 'arm' ]]
23-
then
24-
readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq-arm64:main}"
25-
else
26-
readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}"
27-
fi
22+
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4-management}"
2823

2924
if [[ ! -v GITHUB_ACTIONS ]]
3025
then

.ci/ubuntu/one-node/gha-setup.sh

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,7 @@ script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
88
readonly script_dir
99
echo "[INFO] script_dir: '$script_dir'"
1010

11-
if [[ $3 == 'arm' ]]
12-
then
13-
readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq-arm64:main}"
14-
else
15-
readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}"
16-
fi
11+
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4-management}"
1712

1813

1914
readonly docker_name_prefix='rabbitmq-amqp-dotnet-client'

.ci/windows/versions.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"erlang": "27.0.1",
3-
"rabbitmq": "4.0.0-beta.6"
3+
"rabbitmq": "4.0.2"
44
}

.github/workflows/wf_build-and-test.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ jobs:
3434
id: install-start-rabbitmq
3535
run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1
3636
- name: Test
37-
timeout-minutes: 20
37+
timeout-minutes: 25
3838
run: dotnet test ${{ github.workspace }}\Build.csproj --no-restore --no-build --logger 'console;verbosity=detailed'
3939
- name: Check for errors in RabbitMQ logs
4040
run: ${{ github.workspace }}\.ci\windows\gha-log-check.ps1
@@ -64,7 +64,7 @@ jobs:
6464
id: start-rabbitmq
6565
run: ${{ github.workspace }}/.ci/ubuntu/cluster/gha-setup.sh
6666
- name: Test
67-
timeout-minutes: 15
67+
timeout-minutes: 25
6868
run: dotnet test ${{ github.workspace }}/Build.csproj --no-restore --no-build --logger "console;verbosity=detailed"
6969
- name: Check for errors in RabbitMQ logs
7070
run: ${{ github.workspace}}/.ci/ubuntu/cluster/gha-logs.sh check

Makefile

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,35 +9,8 @@ build:
99
test: build
1010
dotnet test -c Debug $(CURDIR)/Tests/Tests.csproj --no-build --logger:"console;verbosity=detailed"
1111

12-
rabbitmq-server-start-arm:
13-
./.ci/ubuntu/one-node/gha-setup.sh start pull arm
12+
rabbitmq-server-start:
13+
./.ci/ubuntu/one-node/gha-setup.sh start pull
1414

1515
rabbitmq-server-stop:
1616
./.ci/ubuntu/one-node/gha-setup.sh stop
17-
18-
19-
# TODO:
20-
## publish the documentation on github pages
21-
## you should execute this command only on the `main` branch
22-
# publish-github-pages:
23-
# ## Create the PDF
24-
# docker run -it -v $(shell pwd)/docs/:/client_doc/ asciidoctor/docker-asciidoctor /bin/bash -c "cd /client_doc/asciidoc && asciidoctor-pdf index.adoc"
25-
# ## Create the HTML
26-
# docker run -it -v $(shell pwd)/docs/:/client_doc/ asciidoctor/docker-asciidoctor /bin/bash -c "cd /client_doc/asciidoc && asciidoctor index.adoc"
27-
# ## copy the PDF and HTML to temp folder
28-
# rm -rf docs/temp
29-
# mkdir -p docs/temp
30-
# cp docs/asciidoc/index.pdf docs/temp/dotnet-stream-client.pdf
31-
# cp docs/asciidoc/index.html docs/temp/index.html
32-
# ## check out the gh-pages branch
33-
# git checkout gh-pages
34-
# ## copy the PDF and HTML to the root folder
35-
# mv docs/temp/dotnet-stream-client.pdf stable/dotnet-stream-client.pdf
36-
# mv docs/temp/index.html stable/htmlsingle/index.html
37-
# ## commit and push
38-
# git add stable/dotnet-stream-client.pdf
39-
# git add stable/htmlsingle/index.html
40-
# git commit -m "Update the documentation"
41-
# git push origin gh-pages
42-
# ## go back to the main branch
43-
# git checkout main

RabbitMQ.AMQP.Client/IConsumer.cs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

55
using System;
6+
using System.Collections.Generic;
67
using System.Threading.Tasks;
78

89
namespace RabbitMQ.AMQP.Client
@@ -25,8 +26,66 @@ public interface IConsumer : ILifeCycle
2526

2627
public interface IContext
2728
{
29+
///<summary>
30+
/// Accept the message (AMQP 1.0 <code>accepted</code> outcome).
31+
///
32+
/// This means the message has been processed and the broker can delete it.
33+
///
34+
/// </summary>
2835
Task AcceptAsync();
36+
37+
///<summary>
38+
/// Discard the message (AMQP 1.0 <code>rejected</code> outcome).
39+
///This means the message cannot be processed because it is invalid, the broker can drop it
40+
/// or dead-letter it if it is configured.
41+
///</summary>
2942
Task DiscardAsync();
43+
44+
///<summary>
45+
///Discard the message with annotations to combine with the existing message annotations.
46+
///This means the message cannot be processed because it is invalid, the broker can drop it
47+
///or dead-letter it if it is configured.
48+
///Application-specific annotation keys must start with the <code>x-opt-</code> prefix.
49+
///Annotation keys the broker understands starts with <code>x-</code>, but not with <code>x-opt-
50+
///</code>.
51+
///This maps to the AMQP 1.0 <code>
52+
///modified{delivery-failed = true, undeliverable-here = true}</code> outcome.
53+
/// <param name="annotations"> annotations message annotations to combine with existing ones </param>
54+
///<a
55+
/// href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-modified">AMQP
56+
/// 1.0 <code>modified</code> outcome</a>
57+
///
58+
/// The annotations can be used only with Quorum queues, see https://www.rabbitmq.com/docs/amqp#modified-outcome
59+
///</summary>
60+
Task DiscardAsync(Dictionary<string, object> annotations);
61+
///<summary>
62+
///Requeue the message (AMQP 1.0 <code>released</code> outcome).
63+
///
64+
///This means the message has not been processed and the broker can requeue it and deliver it
65+
/// to the same or a different consumer.
66+
///
67+
/// </summary>
3068
Task RequeueAsync();
69+
70+
///<summary>
71+
///Requeue the message with annotations to combine with the existing message annotations.
72+
///
73+
///This means the message has not been processed and the broker can requeue it and deliver it
74+
/// to the same or a different consumer.
75+
/// Application-specific annotation keys must start with the <code>x-opt-</code> prefix.
76+
/// Annotation keys the broker understands starts with <code>x-</code>, but not with <code>x-opt-
77+
/// </code>.
78+
///
79+
/// This maps to the AMQP 1.0 <code>
80+
/// modified{delivery-failed = false, undeliverable-here = false}</code> outcome.
81+
///
82+
/// <param name="annotations"> annotations message annotations to combine with existing ones </param>
83+
///<a
84+
/// href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-modified">AMQP
85+
/// 1.0 <code>modified</code> outcome</a>
86+
///
87+
///The annotations can be used only with Quorum queues, see https://www.rabbitmq.com/docs/amqp#modified-outcome
88+
///</summary>
89+
Task RequeueAsync(Dictionary<string, object> annotations);
3190
}
3291
}

RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public IMessage Annotation(string key, object value)
130130
public object Annotation(string key)
131131
{
132132
ThrowIfAnnotationsNotSet();
133-
return NativeMessage.MessageAnnotations[key];
133+
return NativeMessage.MessageAnnotations[new Symbol(key)];
134134
}
135135
}
136136
}

RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
// 2.0, and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

5+
using System.Collections.Generic;
56
using System.Threading.Tasks;
67
using Amqp;
8+
using Amqp.Types;
79

810
namespace RabbitMQ.AMQP.Client.Impl
911
{
@@ -54,6 +56,30 @@ public Task DiscardAsync()
5456
return rejectTask;
5557
}
5658

59+
public Task DiscardAsync(Dictionary<string, object> annotations)
60+
{
61+
if (_link.IsClosed)
62+
{
63+
throw new ConsumerException("Link is closed");
64+
}
65+
Utils.ValidateMessageAnnotations(annotations);
66+
67+
Task rejectTask = Task.Run(() =>
68+
{
69+
Fields messageAnnotations = new();
70+
foreach (var kvp in annotations)
71+
{
72+
messageAnnotations.Add(new Symbol(kvp.Key), kvp.Value);
73+
}
74+
75+
_link.Modify(_message, true, true, messageAnnotations);
76+
_unsettledMessageCounter.Decrement();
77+
_message.Dispose();
78+
});
79+
80+
return rejectTask;
81+
}
82+
5783
public Task RequeueAsync()
5884
{
5985
if (_link.IsClosed)
@@ -70,5 +96,28 @@ public Task RequeueAsync()
7096

7197
return requeueTask;
7298
}
99+
100+
public Task RequeueAsync(Dictionary<string, object> annotations)
101+
{
102+
if (_link.IsClosed)
103+
{
104+
throw new ConsumerException("Link is closed");
105+
}
106+
Utils.ValidateMessageAnnotations(annotations);
107+
Task requeueTask = Task.Run(() =>
108+
{
109+
Fields messageAnnotations = new();
110+
foreach (var kvp in annotations)
111+
{
112+
messageAnnotations.Add(new Symbol(kvp.Key), kvp.Value);
113+
}
114+
115+
_link.Modify(_message, false, false, messageAnnotations);
116+
_unsettledMessageCounter.Decrement();
117+
_message.Dispose();
118+
});
119+
120+
return requeueTask;
121+
}
73122
}
74123
}

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,9 @@ RabbitMQ.AMQP.Client.IConsumerBuilder.Stream() -> RabbitMQ.AMQP.Client.IConsumer
125125
RabbitMQ.AMQP.Client.IContext
126126
RabbitMQ.AMQP.Client.IContext.AcceptAsync() -> System.Threading.Tasks.Task!
127127
RabbitMQ.AMQP.Client.IContext.DiscardAsync() -> System.Threading.Tasks.Task!
128+
RabbitMQ.AMQP.Client.IContext.DiscardAsync(System.Collections.Generic.Dictionary<string!, object!>! annotations) -> System.Threading.Tasks.Task!
128129
RabbitMQ.AMQP.Client.IContext.RequeueAsync() -> System.Threading.Tasks.Task!
130+
RabbitMQ.AMQP.Client.IContext.RequeueAsync(System.Collections.Generic.Dictionary<string!, object!>! annotations) -> System.Threading.Tasks.Task!
129131
RabbitMQ.AMQP.Client.IEntityInfo
130132
RabbitMQ.AMQP.Client.IEntityInfoSpecification<T>
131133
RabbitMQ.AMQP.Client.IEntityInfoSpecification<T>.DeclareAsync() -> System.Threading.Tasks.Task<T>!

0 commit comments

Comments
 (0)