From 9db5e041124f55000545f3a8812bdfa64cf5cb02 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 18 Jun 2025 15:33:05 +0200 Subject: [PATCH] Add disposition example Signed-off-by: Gabriele Santomaggio --- BatchDispositions/BatchDispositions.csproj | 15 ++++ BatchDispositions/Program.cs | 72 ++++++++++++++++++++ RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs | 5 ++ rabbitmq-amqp-dotnet-client.sln | 7 ++ 4 files changed, 99 insertions(+) create mode 100644 BatchDispositions/BatchDispositions.csproj create mode 100644 BatchDispositions/Program.cs diff --git a/BatchDispositions/BatchDispositions.csproj b/BatchDispositions/BatchDispositions.csproj new file mode 100644 index 0000000..1b442de --- /dev/null +++ b/BatchDispositions/BatchDispositions.csproj @@ -0,0 +1,15 @@ + + + + Exe + net8.0 + enable + enable + Dispositions + + + + + + + diff --git a/BatchDispositions/Program.cs b/BatchDispositions/Program.cs new file mode 100644 index 0000000..bf35de9 --- /dev/null +++ b/BatchDispositions/Program.cs @@ -0,0 +1,72 @@ +// See https://aka.ms/new-console-template for more information + +using System.Diagnostics; +using Amqp; +using RabbitMQ.AMQP.Client; +using RabbitMQ.AMQP.Client.Impl; +using IConnection = RabbitMQ.AMQP.Client.IConnection; +using Trace = Amqp.Trace; +using TraceLevel = Amqp.TraceLevel; + +Trace.TraceLevel = TraceLevel.Information; + +ConsoleTraceListener consoleListener = new(); +Trace.TraceListener = (l, f, a) => + consoleListener.WriteLine($"[{DateTime.Now}] [{l}] - {f}"); + +Trace.WriteLine(TraceLevel.Information, "Starting the example..."); +const string containerId = "dispositions-id"; + +IEnvironment environment = AmqpEnvironment.Create( + ConnectionSettingsBuilder.Create().ContainerId(containerId).Build()); + +IConnection connection = await environment.CreateConnectionAsync().ConfigureAwait(false); + +Trace.WriteLine(TraceLevel.Information, $"Connected to the broker {connection} successfully"); + +IManagement management = connection.Management(); +const string queueName = "dispositions"; +IQueueSpecification queueSpec = management.Queue(queueName).Type(QueueType.QUORUM); +await queueSpec.DeclareAsync().ConfigureAwait(false); + +IPublisher publisher = await connection.PublisherBuilder().Queue(queueName) + .BuildAsync().ConfigureAwait(false); + +BatchDeliveryContext batchContext = new(); +IConsumer consumer = await connection.ConsumerBuilder().Queue(queueName).MessageHandler( + (context, message) => + { + Trace.WriteLine(TraceLevel.Information, $"[Consumer] Message: {message.BodyAsString()} received"); + batchContext.Add(context); + if (batchContext.Count() >= 10) + { + Trace.WriteLine(TraceLevel.Information, "[Consumer] Committing batch of messages"); + // here the batch is committed, all messages in the batch will be accepted + // the contexts will be disposed and deleted after the batch is committed + batchContext.Accept(); + } + else + { + Trace.WriteLine(TraceLevel.Information, "[Consumer] Adding message to batch"); + } + return Task.CompletedTask; + } +).BuildAndStartAsync().ConfigureAwait(false); + +const int total = 10; +for (int i = 0; i < total; i++) +{ + string body = $"Message {i}"; + Trace.WriteLine(TraceLevel.Information, $"[Publisher] Publishing message: {body}"); + var message = new AmqpMessage($"Hello World_{i}"); + await publisher.PublishAsync(message).ConfigureAwait(false); + // ignoring the publish result for this example +} + +Console.WriteLine("Press any key to delete the queue and close the environment."); +Console.ReadKey(); + +await publisher.CloseAsync().ConfigureAwait(false); +await consumer.CloseAsync().ConfigureAwait(false); +await queueSpec.DeleteAsync().ConfigureAwait(false); +await environment.CloseAsync().ConfigureAwait(false); diff --git a/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs b/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs index 52efcc4..03173a6 100644 --- a/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs +++ b/RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs @@ -171,6 +171,7 @@ public class BatchDeliveryContext : IBatchContext /// /// Accept all messages in the batch context (AMQP 1.0 accepted outcome). + /// Contexts are cleared after the operation. /// public void Accept() { @@ -192,6 +193,7 @@ public void Accept() /// /// Discard all messages in the batch context (AMQP 1.0 rejected outcome). + /// Contexts are cleared after the operation. /// public void Discard() { @@ -213,6 +215,7 @@ public void Discard() /// /// Discard all messages in the batch context with annotations + /// Contexts are cleared after the operation. /// public void Discard(Dictionary annotations) { @@ -236,6 +239,7 @@ public void Discard(Dictionary annotations) /// /// Requeue all messages in the batch context (AMQP 1.0 released outcome). + /// Contexts are cleared after the operation. /// public void Requeue() { @@ -257,6 +261,7 @@ public void Requeue() /// /// Requeue all messages in the batch context with annotations + /// Contexts are cleared after the operation. /// public void Requeue(Dictionary annotations) { diff --git a/rabbitmq-amqp-dotnet-client.sln b/rabbitmq-amqp-dotnet-client.sln index 5098c1b..6305435 100644 --- a/rabbitmq-amqp-dotnet-client.sln +++ b/rabbitmq-amqp-dotnet-client.sln @@ -35,6 +35,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Examples", "Examples", "{91 EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OAuth2", "docs\Examples\OAuth2\OAuth2.csproj", "{C1EA4B66-E60E-4945-A4C6-91B433F9BA65}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BatchDispositions", "BatchDispositions\BatchDispositions.csproj", "{AB353D28-5DAC-4CE7-8EF3-1250F9F3CBBD}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -73,6 +75,10 @@ Global {C1EA4B66-E60E-4945-A4C6-91B433F9BA65}.Debug|Any CPU.Build.0 = Debug|Any CPU {C1EA4B66-E60E-4945-A4C6-91B433F9BA65}.Release|Any CPU.ActiveCfg = Release|Any CPU {C1EA4B66-E60E-4945-A4C6-91B433F9BA65}.Release|Any CPU.Build.0 = Release|Any CPU + {AB353D28-5DAC-4CE7-8EF3-1250F9F3CBBD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {AB353D28-5DAC-4CE7-8EF3-1250F9F3CBBD}.Debug|Any CPU.Build.0 = Debug|Any CPU + {AB353D28-5DAC-4CE7-8EF3-1250F9F3CBBD}.Release|Any CPU.ActiveCfg = Release|Any CPU + {AB353D28-5DAC-4CE7-8EF3-1250F9F3CBBD}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -84,5 +90,6 @@ Global {59CB2F07-4A5A-4871-8C97-02EC21C68D6B} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C} {D74F49FC-2C9A-4227-8988-30925C509388} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C} {C1EA4B66-E60E-4945-A4C6-91B433F9BA65} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C} + {AB353D28-5DAC-4CE7-8EF3-1250F9F3CBBD} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C} EndGlobalSection EndGlobal