Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions BatchDispositions/BatchDispositions.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<RootNamespace>Dispositions</RootNamespace>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\RabbitMQ.AMQP.Client\RabbitMQ.AMQP.Client.csproj" />
</ItemGroup>

</Project>
72 changes: 72 additions & 0 deletions BatchDispositions/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// See https://aka.ms/new-console-template for more information

using System.Diagnostics;
using Amqp;
using RabbitMQ.AMQP.Client;
using RabbitMQ.AMQP.Client.Impl;
using IConnection = RabbitMQ.AMQP.Client.IConnection;
using Trace = Amqp.Trace;
using TraceLevel = Amqp.TraceLevel;

Trace.TraceLevel = TraceLevel.Information;

ConsoleTraceListener consoleListener = new();
Trace.TraceListener = (l, f, a) =>
consoleListener.WriteLine($"[{DateTime.Now}] [{l}] - {f}");

Trace.WriteLine(TraceLevel.Information, "Starting the example...");
const string containerId = "dispositions-id";

IEnvironment environment = AmqpEnvironment.Create(
ConnectionSettingsBuilder.Create().ContainerId(containerId).Build());

IConnection connection = await environment.CreateConnectionAsync().ConfigureAwait(false);

Trace.WriteLine(TraceLevel.Information, $"Connected to the broker {connection} successfully");

IManagement management = connection.Management();
const string queueName = "dispositions";
IQueueSpecification queueSpec = management.Queue(queueName).Type(QueueType.QUORUM);
await queueSpec.DeclareAsync().ConfigureAwait(false);

IPublisher publisher = await connection.PublisherBuilder().Queue(queueName)
.BuildAsync().ConfigureAwait(false);

BatchDeliveryContext batchContext = new();
IConsumer consumer = await connection.ConsumerBuilder().Queue(queueName).MessageHandler(
(context, message) =>
{
Trace.WriteLine(TraceLevel.Information, $"[Consumer] Message: {message.BodyAsString()} received");
batchContext.Add(context);
if (batchContext.Count() >= 10)
{
Trace.WriteLine(TraceLevel.Information, "[Consumer] Committing batch of messages");
// here the batch is committed, all messages in the batch will be accepted
// the contexts will be disposed and deleted after the batch is committed
batchContext.Accept();
}
else
{
Trace.WriteLine(TraceLevel.Information, "[Consumer] Adding message to batch");
}
return Task.CompletedTask;
}
).BuildAndStartAsync().ConfigureAwait(false);

const int total = 10;
for (int i = 0; i < total; i++)
{
string body = $"Message {i}";
Trace.WriteLine(TraceLevel.Information, $"[Publisher] Publishing message: {body}");
var message = new AmqpMessage($"Hello World_{i}");
await publisher.PublishAsync(message).ConfigureAwait(false);
// ignoring the publish result for this example
}

Console.WriteLine("Press any key to delete the queue and close the environment.");
Console.ReadKey();

await publisher.CloseAsync().ConfigureAwait(false);
await consumer.CloseAsync().ConfigureAwait(false);
await queueSpec.DeleteAsync().ConfigureAwait(false);
await environment.CloseAsync().ConfigureAwait(false);
5 changes: 5 additions & 0 deletions RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ public class BatchDeliveryContext : IBatchContext

///<summary>
/// Accept all messages in the batch context (AMQP 1.0 <c>accepted</c> outcome).
/// Contexts are cleared after the operation.
/// </summary>
public void Accept()
{
Expand All @@ -192,6 +193,7 @@ public void Accept()

///<summary>
/// Discard all messages in the batch context (AMQP 1.0 <c>rejected</c> outcome).
/// Contexts are cleared after the operation.
/// </summary>
public void Discard()
{
Expand All @@ -213,6 +215,7 @@ public void Discard()

///<summary>
/// Discard all messages in the batch context with annotations
/// Contexts are cleared after the operation.
/// </summary>
public void Discard(Dictionary<string, object> annotations)
{
Expand All @@ -236,6 +239,7 @@ public void Discard(Dictionary<string, object> annotations)

///<summary>
/// Requeue all messages in the batch context (AMQP 1.0 <c>released</c> outcome).
/// Contexts are cleared after the operation.
/// </summary>
public void Requeue()
{
Expand All @@ -257,6 +261,7 @@ public void Requeue()

///<summary>
/// Requeue all messages in the batch context with annotations
/// Contexts are cleared after the operation.
/// </summary>
public void Requeue(Dictionary<string, object> annotations)
{
Expand Down
7 changes: 7 additions & 0 deletions rabbitmq-amqp-dotnet-client.sln
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Examples", "Examples", "{91
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OAuth2", "docs\Examples\OAuth2\OAuth2.csproj", "{C1EA4B66-E60E-4945-A4C6-91B433F9BA65}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BatchDispositions", "BatchDispositions\BatchDispositions.csproj", "{AB353D28-5DAC-4CE7-8EF3-1250F9F3CBBD}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -73,6 +75,10 @@ Global
{C1EA4B66-E60E-4945-A4C6-91B433F9BA65}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C1EA4B66-E60E-4945-A4C6-91B433F9BA65}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C1EA4B66-E60E-4945-A4C6-91B433F9BA65}.Release|Any CPU.Build.0 = Release|Any CPU
{AB353D28-5DAC-4CE7-8EF3-1250F9F3CBBD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AB353D28-5DAC-4CE7-8EF3-1250F9F3CBBD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AB353D28-5DAC-4CE7-8EF3-1250F9F3CBBD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AB353D28-5DAC-4CE7-8EF3-1250F9F3CBBD}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -84,5 +90,6 @@ Global
{59CB2F07-4A5A-4871-8C97-02EC21C68D6B} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C}
{D74F49FC-2C9A-4227-8988-30925C509388} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C}
{C1EA4B66-E60E-4945-A4C6-91B433F9BA65} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C}
{AB353D28-5DAC-4CE7-8EF3-1250F9F3CBBD} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C}
EndGlobalSection
EndGlobal