From a8c7832efefe9ecc80c4992dfb66a0ead0b08bc1 Mon Sep 17 00:00:00 2001 From: Michael Fyffe <6224270+TraGicCode@users.noreply.github.com> Date: Sat, 6 Dec 2025 17:17:25 -0600 Subject: [PATCH 1/2] Add Routing Topology and QueueType to RabbitMQ Transport --- .../Config/RabbitmqTransportConfig.cs | 18 +++++++- .../RabbitMQTransportConfigValidator.cs | 3 ++ .../Factories/RawEndpointFactory.cs | 11 +++-- .../Spectre/AppConfiguration.cs | 10 +++-- website/docs/transports/rabbitmq.md | 42 +++++++++++++++++-- 5 files changed, 72 insertions(+), 12 deletions(-) diff --git a/src/BuslyCLI.Console/Config/RabbitmqTransportConfig.cs b/src/BuslyCLI.Console/Config/RabbitmqTransportConfig.cs index c4ffbd5..9e15795 100644 --- a/src/BuslyCLI.Console/Config/RabbitmqTransportConfig.cs +++ b/src/BuslyCLI.Console/Config/RabbitmqTransportConfig.cs @@ -2,15 +2,29 @@ public class RabbitmqTransportConfig : ITransportConfig { + // TODO: Add Support for TLS Client Certificate Authentication + // https://github.com/Particular/NServiceBus.RabbitMQ/blob/master/src/NServiceBus.Transport.RabbitMQ/Connection/ConnectionFactory.cs#L69 // TODO: Test TLS Connections to broker using "amqps://your-username:your-password@your.rabbitmq.host:5671/vhost" public string AmqpConnectionString { get; set; } - // TODO: Add Support for TLS Client Certificate Authentication - // https://github.com/Particular/NServiceBus.RabbitMQ/blob/master/src/NServiceBus.Transport.RabbitMQ/Connection/ConnectionFactory.cs#L69 + public RabbitmqRoutingTopology RoutingTopology { get; set; } = RabbitmqRoutingTopology.Conventional; + public RabbitmqQueueType QueueType { get; set; } = RabbitmqQueueType.Quorum; public ManagementApi ManagementApi { get; set; } } +public enum RabbitmqRoutingTopology +{ + Conventional, + Direct, +} + +public enum RabbitmqQueueType +{ + Quorum, + Classic, +} + public class ManagementApi { public string Url { get; set; } diff --git a/src/BuslyCLI.Console/Config/Validators/RabbitMQTransportConfigValidator.cs b/src/BuslyCLI.Console/Config/Validators/RabbitMQTransportConfigValidator.cs index b93002a..7d0b905 100644 --- a/src/BuslyCLI.Console/Config/Validators/RabbitMQTransportConfigValidator.cs +++ b/src/BuslyCLI.Console/Config/Validators/RabbitMQTransportConfigValidator.cs @@ -9,6 +9,9 @@ public RabbitMQTransportConfigValidator() RuleFor(x => x.AmqpConnectionString) .NotEmpty(); + RuleFor(x => x.RoutingTopology) + .IsInEnum(); + RuleFor(x => x.ManagementApi) .SetValidator(new ManagementApiConfigValidator()); } diff --git a/src/BuslyCLI.Console/Factories/RawEndpointFactory.cs b/src/BuslyCLI.Console/Factories/RawEndpointFactory.cs index 9b6c6a4..55191b7 100644 --- a/src/BuslyCLI.Console/Factories/RawEndpointFactory.cs +++ b/src/BuslyCLI.Console/Factories/RawEndpointFactory.cs @@ -69,12 +69,17 @@ private TransportDefinition CreateSqlServerTransport(SqlServerTransportConfig sq private RabbitMQTransport CreateRabbitMQTransport(RabbitmqTransportConfig rabbitmqTransportConfig) { - var t = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), rabbitmqTransportConfig.AmqpConnectionString); + var routingTopology = rabbitmqTransportConfig.RoutingTopology switch + { + RabbitmqRoutingTopology.Conventional => RoutingTopology.Conventional(rabbitmqTransportConfig.QueueType == RabbitmqQueueType.Quorum ? QueueType.Quorum : QueueType.Classic), + RabbitmqRoutingTopology.Direct => RoutingTopology.Direct(rabbitmqTransportConfig.QueueType == RabbitmqQueueType.Quorum ? QueueType.Quorum : QueueType.Classic), + _ => throw new Exception("Unknown RabbitMQ routing topology: " + rabbitmqTransportConfig.RoutingTopology) + }; + var t = new RabbitMQTransport(routingTopology, rabbitmqTransportConfig.AmqpConnectionString); if (rabbitmqTransportConfig.ManagementApi != null) { - t.ManagementApiConfiguration = - CreateManagementApiConfig(rabbitmqTransportConfig.ManagementApi); + t.ManagementApiConfiguration = CreateManagementApiConfig(rabbitmqTransportConfig.ManagementApi); } return t; } diff --git a/src/BuslyCLI.Console/Spectre/AppConfiguration.cs b/src/BuslyCLI.Console/Spectre/AppConfiguration.cs index aea9ff8..76db710 100644 --- a/src/BuslyCLI.Console/Spectre/AppConfiguration.cs +++ b/src/BuslyCLI.Console/Spectre/AppConfiguration.cs @@ -3,7 +3,9 @@ using BuslyCLI.Commands.Demo; using BuslyCLI.Commands.Event; using BuslyCLI.Commands.Transport; +using Spectre.Console; using Spectre.Console.Cli; +using YamlDotNet.Core; namespace BuslyCLI.Spectre; @@ -53,9 +55,11 @@ public static Action GetSpectreCommandConfiguration() demo.AddCommand("start") .WithDescription("Start a demo endpoint that can receive any command and a single 'Messages.Events.OrderPlaced' event."); }); -#if DEBUG - config.PropagateExceptions(); -#endif + + config.SetExceptionHandler((ex, _) => + { + AnsiConsole.WriteException(ex, ExceptionFormats.ShortenEverything); + }); }; } } \ No newline at end of file diff --git a/website/docs/transports/rabbitmq.md b/website/docs/transports/rabbitmq.md index 47dd171..488849f 100644 --- a/website/docs/transports/rabbitmq.md +++ b/website/docs/transports/rabbitmq.md @@ -22,10 +22,12 @@ transports: ## `rabbitmq-transport-config` Fields -| Field | Required | Type | Default | Description | -| ------------------------ | -------- | ------ | ------- | ----------------------------------------------------------------- | -| `amqp-connection-string` | **Yes** | string | — | Full AMQP connection string used to connect to RabbitMQ. | -| `management-api` | No | object | — | Optional configuration to connect to the RabbitMQ Management API. | +| Field | Required | Type | Default | Description | +| ------------------------ | -------- | ------ | ------------ | ----------------------------------------------------------------- | +| `amqp-connection-string` | **Yes** | string | — | Full AMQP connection string used to connect to RabbitMQ. | +| `routing-toplogy` | No | string | conventional | Routing toplogy to be used (Conventional or Direct). | +| `queue-type` | No | string | quorum | Type of queues RabbitMQ is using (Quorum or Classic). | +| `management-api` | No | object | — | Optional configuration to connect to the RabbitMQ Management API. | --- @@ -57,6 +59,38 @@ amqp-connection-string: amqps://user:pass@rabbitmq.example.com:5671/my-vhost --- +### `routing-topology` (optional) + +The Routing topology to be used for the transport + +Examples: + +```yaml +routing-toplogy: conventional +``` + +```yaml +routing-toplogy: direct +``` + +--- + +### `queue-type` (optional) + +The type of queues being used. + +Examples: + +```yaml +queue-type: quorum +``` + +```yaml +queue-type: classic +``` + +--- + ### `management-api` (optional) Allows Busly to interact with the RabbitMQ Management API for monitoring or queue management. From 38400d93d571ba1727daab364a0dcafa2907181c Mon Sep 17 00:00:00 2001 From: Michael Fyffe <6224270+TraGicCode@users.noreply.github.com> Date: Sat, 6 Dec 2025 17:21:08 -0600 Subject: [PATCH 2/2] Adding comments that i might need to use later --- src/BuslyCLI.Console/Spectre/AppConfiguration.cs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/BuslyCLI.Console/Spectre/AppConfiguration.cs b/src/BuslyCLI.Console/Spectre/AppConfiguration.cs index 76db710..15c1433 100644 --- a/src/BuslyCLI.Console/Spectre/AppConfiguration.cs +++ b/src/BuslyCLI.Console/Spectre/AppConfiguration.cs @@ -58,8 +58,21 @@ public static Action GetSpectreCommandConfiguration() config.SetExceptionHandler((ex, _) => { + // if (ex.InnerException is OptionsValidationException) + // { + // AnsiConsole.Write(new Markup($"{ConsoleExtensions.ErrorMarkup}{ex.InnerException.Message}")); + // return; + // } + // if (ex is CommandAppException) + // { + // AnsiConsole.Write(new Markup($"{ConsoleExtensions.ErrorMarkup}{ex.Message}")); + // return; + // } + // AnsiConsole.WriteException(ex, ExceptionFormats.ShortenPaths); AnsiConsole.WriteException(ex, ExceptionFormats.ShortenEverything); }); + + }; } } \ No newline at end of file