Skip to content

Commit 3874315

Browse files
authored
Add disposition example (#126)
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent c3262ca commit 3874315

File tree

4 files changed

+99
-0
lines changed

4 files changed

+99
-0
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net8.0</TargetFramework>
6+
<ImplicitUsings>enable</ImplicitUsings>
7+
<Nullable>enable</Nullable>
8+
<RootNamespace>Dispositions</RootNamespace>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<ProjectReference Include="..\RabbitMQ.AMQP.Client\RabbitMQ.AMQP.Client.csproj" />
13+
</ItemGroup>
14+
15+
</Project>

BatchDispositions/Program.cs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// See https://aka.ms/new-console-template for more information
2+
3+
using System.Diagnostics;
4+
using Amqp;
5+
using RabbitMQ.AMQP.Client;
6+
using RabbitMQ.AMQP.Client.Impl;
7+
using IConnection = RabbitMQ.AMQP.Client.IConnection;
8+
using Trace = Amqp.Trace;
9+
using TraceLevel = Amqp.TraceLevel;
10+
11+
Trace.TraceLevel = TraceLevel.Information;
12+
13+
ConsoleTraceListener consoleListener = new();
14+
Trace.TraceListener = (l, f, a) =>
15+
consoleListener.WriteLine($"[{DateTime.Now}] [{l}] - {f}");
16+
17+
Trace.WriteLine(TraceLevel.Information, "Starting the example...");
18+
const string containerId = "dispositions-id";
19+
20+
IEnvironment environment = AmqpEnvironment.Create(
21+
ConnectionSettingsBuilder.Create().ContainerId(containerId).Build());
22+
23+
IConnection connection = await environment.CreateConnectionAsync().ConfigureAwait(false);
24+
25+
Trace.WriteLine(TraceLevel.Information, $"Connected to the broker {connection} successfully");
26+
27+
IManagement management = connection.Management();
28+
const string queueName = "dispositions";
29+
IQueueSpecification queueSpec = management.Queue(queueName).Type(QueueType.QUORUM);
30+
await queueSpec.DeclareAsync().ConfigureAwait(false);
31+
32+
IPublisher publisher = await connection.PublisherBuilder().Queue(queueName)
33+
.BuildAsync().ConfigureAwait(false);
34+
35+
BatchDeliveryContext batchContext = new();
36+
IConsumer consumer = await connection.ConsumerBuilder().Queue(queueName).MessageHandler(
37+
(context, message) =>
38+
{
39+
Trace.WriteLine(TraceLevel.Information, $"[Consumer] Message: {message.BodyAsString()} received");
40+
batchContext.Add(context);
41+
if (batchContext.Count() >= 10)
42+
{
43+
Trace.WriteLine(TraceLevel.Information, "[Consumer] Committing batch of messages");
44+
// here the batch is committed, all messages in the batch will be accepted
45+
// the contexts will be disposed and deleted after the batch is committed
46+
batchContext.Accept();
47+
}
48+
else
49+
{
50+
Trace.WriteLine(TraceLevel.Information, "[Consumer] Adding message to batch");
51+
}
52+
return Task.CompletedTask;
53+
}
54+
).BuildAndStartAsync().ConfigureAwait(false);
55+
56+
const int total = 10;
57+
for (int i = 0; i < total; i++)
58+
{
59+
string body = $"Message {i}";
60+
Trace.WriteLine(TraceLevel.Information, $"[Publisher] Publishing message: {body}");
61+
var message = new AmqpMessage($"Hello World_{i}");
62+
await publisher.PublishAsync(message).ConfigureAwait(false);
63+
// ignoring the publish result for this example
64+
}
65+
66+
Console.WriteLine("Press any key to delete the queue and close the environment.");
67+
Console.ReadKey();
68+
69+
await publisher.CloseAsync().ConfigureAwait(false);
70+
await consumer.CloseAsync().ConfigureAwait(false);
71+
await queueSpec.DeleteAsync().ConfigureAwait(false);
72+
await environment.CloseAsync().ConfigureAwait(false);

RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ public class BatchDeliveryContext : IBatchContext
171171

172172
///<summary>
173173
/// Accept all messages in the batch context (AMQP 1.0 <c>accepted</c> outcome).
174+
/// Contexts are cleared after the operation.
174175
/// </summary>
175176
public void Accept()
176177
{
@@ -192,6 +193,7 @@ public void Accept()
192193

193194
///<summary>
194195
/// Discard all messages in the batch context (AMQP 1.0 <c>rejected</c> outcome).
196+
/// Contexts are cleared after the operation.
195197
/// </summary>
196198
public void Discard()
197199
{
@@ -213,6 +215,7 @@ public void Discard()
213215

214216
///<summary>
215217
/// Discard all messages in the batch context with annotations
218+
/// Contexts are cleared after the operation.
216219
/// </summary>
217220
public void Discard(Dictionary<string, object> annotations)
218221
{
@@ -236,6 +239,7 @@ public void Discard(Dictionary<string, object> annotations)
236239

237240
///<summary>
238241
/// Requeue all messages in the batch context (AMQP 1.0 <c>released</c> outcome).
242+
/// Contexts are cleared after the operation.
239243
/// </summary>
240244
public void Requeue()
241245
{
@@ -257,6 +261,7 @@ public void Requeue()
257261

258262
///<summary>
259263
/// Requeue all messages in the batch context with annotations
264+
/// Contexts are cleared after the operation.
260265
/// </summary>
261266
public void Requeue(Dictionary<string, object> annotations)
262267
{

rabbitmq-amqp-dotnet-client.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Examples", "Examples", "{91
3535
EndProject
3636
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OAuth2", "docs\Examples\OAuth2\OAuth2.csproj", "{C1EA4B66-E60E-4945-A4C6-91B433F9BA65}"
3737
EndProject
38+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BatchDispositions", "BatchDispositions\BatchDispositions.csproj", "{AB353D28-5DAC-4CE7-8EF3-1250F9F3CBBD}"
39+
EndProject
3840
Global
3941
GlobalSection(SolutionConfigurationPlatforms) = preSolution
4042
Debug|Any CPU = Debug|Any CPU
@@ -73,6 +75,10 @@ Global
7375
{C1EA4B66-E60E-4945-A4C6-91B433F9BA65}.Debug|Any CPU.Build.0 = Debug|Any CPU
7476
{C1EA4B66-E60E-4945-A4C6-91B433F9BA65}.Release|Any CPU.ActiveCfg = Release|Any CPU
7577
{C1EA4B66-E60E-4945-A4C6-91B433F9BA65}.Release|Any CPU.Build.0 = Release|Any CPU
78+
{AB353D28-5DAC-4CE7-8EF3-1250F9F3CBBD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
79+
{AB353D28-5DAC-4CE7-8EF3-1250F9F3CBBD}.Debug|Any CPU.Build.0 = Debug|Any CPU
80+
{AB353D28-5DAC-4CE7-8EF3-1250F9F3CBBD}.Release|Any CPU.ActiveCfg = Release|Any CPU
81+
{AB353D28-5DAC-4CE7-8EF3-1250F9F3CBBD}.Release|Any CPU.Build.0 = Release|Any CPU
7682
EndGlobalSection
7783
GlobalSection(SolutionProperties) = preSolution
7884
HideSolutionNode = FALSE
@@ -84,5 +90,6 @@ Global
8490
{59CB2F07-4A5A-4871-8C97-02EC21C68D6B} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C}
8591
{D74F49FC-2C9A-4227-8988-30925C509388} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C}
8692
{C1EA4B66-E60E-4945-A4C6-91B433F9BA65} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C}
93+
{AB353D28-5DAC-4CE7-8EF3-1250F9F3CBBD} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C}
8794
EndGlobalSection
8895
EndGlobal

0 commit comments

Comments
 (0)