diff --git a/.reposync.yml b/.reposync.yml index 05ec9e3185..8b13789179 100644 --- a/.reposync.yml +++ b/.reposync.yml @@ -1,2 +1 @@ -exclusions: -- src/NServiceBus.snk + diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index cfe0a35952..bc50b29325 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -38,7 +38,7 @@ - + diff --git a/src/NServiceBus.snk b/src/NServiceBus.snk new file mode 100644 index 0000000000..6fa7ddec10 Binary files /dev/null and b/src/NServiceBus.snk differ diff --git a/src/ServiceControl.Config/Commands/ScmuCommandChecks.cs b/src/ServiceControl.Config/Commands/ScmuCommandChecks.cs index b89291e01c..b21e0b37fd 100644 --- a/src/ServiceControl.Config/Commands/ScmuCommandChecks.cs +++ b/src/ServiceControl.Config/Commands/ScmuCommandChecks.cs @@ -2,6 +2,7 @@ { using System; using System.Diagnostics; + using System.Text; using System.Threading.Tasks; using ServiceControl.Config.Framework; using ServiceControlInstaller.Engine; @@ -21,12 +22,19 @@ protected override async Task PromptForRabbitMqCheck(bool isUpgrade) { var title = isUpgrade ? "UPGRADE WARNING" : "INSTALL WARNING"; var beforeWhat = isUpgrade ? "upgrading" : "installing"; - var message = $"ServiceControl version {Constants.CurrentVersion} requires RabbitMQ broker version 3.10.0 or higher. Also, the stream_queue and quorum_queue feature flags must be enabled on the broker. Please confirm your broker meets the minimum requirements before {beforeWhat}."; var question = "Do you want to proceed?"; var yes = "Yes, my RabbitMQ broker meets the minimum requirements"; var no = "No, cancel the install"; - var continueInstall = await windowManager.ShowYesNoDialog(title, message, question, yes, no); + var message = new StringBuilder(); + message.AppendLine($"ServiceControl version {Constants.CurrentVersion} requires:"); + message.AppendLine("• RabbitMQ broker version 3.10.0 or higher"); + message.AppendLine("• The stream_queue and quorum_queue feature flags must be enabled"); + message.AppendLine($"• The management plugin API must be enabled and accessible. This might require custom settings to be added to the connection string before {beforeWhat}. See the ServiceControl documentation for details."); + message.AppendLine(); + message.AppendLine($"Please confirm your broker meets the minimum requirements before {beforeWhat}."); + + var continueInstall = await windowManager.ShowYesNoDialog(title, message.ToString(), question, yes, no); return continueInstall; } diff --git a/src/ServiceControl.Management.PowerShell/Validation/AcknowledgementValues.cs b/src/ServiceControl.Management.PowerShell/Validation/AcknowledgementValues.cs index 4b0b88a9c0..60de4a497d 100644 --- a/src/ServiceControl.Management.PowerShell/Validation/AcknowledgementValues.cs +++ b/src/ServiceControl.Management.PowerShell/Validation/AcknowledgementValues.cs @@ -3,5 +3,6 @@ static class AcknowledgementValues { public const string RabbitMQBrokerVersion310 = "RabbitMQBrokerVersion310"; + public const string RabbitMQManagementApi = "RabbitMQManagementApi"; } } diff --git a/src/ServiceControl.Management.PowerShell/Validation/PowerShellCommandChecks.cs b/src/ServiceControl.Management.PowerShell/Validation/PowerShellCommandChecks.cs index a53cb711a9..9d9ffd27bc 100644 --- a/src/ServiceControl.Management.PowerShell/Validation/PowerShellCommandChecks.cs +++ b/src/ServiceControl.Management.PowerShell/Validation/PowerShellCommandChecks.cs @@ -71,15 +71,23 @@ protected override Task NotifyForMissingSystemPrerequisites(string missingPrereq protected override Task PromptForRabbitMqCheck(bool isUpgrade) { - if (acknowledgements.Any(ack => ack.Equals(AcknowledgementValues.RabbitMQBrokerVersion310, StringComparison.OrdinalIgnoreCase))) + if (!acknowledgements.Any(ack => ack.Equals(AcknowledgementValues.RabbitMQBrokerVersion310, StringComparison.OrdinalIgnoreCase))) { - return Task.FromResult(true); + var terminateMsg = $"ServiceControl version {Constants.CurrentVersion} requires RabbitMQ broker version 3.10.0 or higher. Also, the stream_queue and quorum_queue feature flags must be enabled on the broker. Use -Acknowledgements {AcknowledgementValues.RabbitMQBrokerVersion310} if you are sure your broker meets these requirements."; + + Terminate(terminateMsg, "Install Error", ErrorCategory.InvalidArgument); + return Task.FromResult(false); } - var terminateMsg = $"ServiceControl version {Constants.CurrentVersion} requires RabbitMQ broker version 3.10.0 or higher. Also, the stream_queue and quorum_queue feature flags must be enabled on the broker. Use -Acknowledgements {AcknowledgementValues.RabbitMQBrokerVersion310} if you are sure your broker meets these requirements."; + if (!acknowledgements.Any(ack => ack.Equals(AcknowledgementValues.RabbitMQManagementApi, StringComparison.OrdinalIgnoreCase))) + { + var terminateMsg = $"ServiceControl version {Constants.CurrentVersion} requires that the management plugin API must be enabled and accessible. This might require custom settings to be added to the connection string. See the ServiceControl documentation for details. Use -Acknowledgements {AcknowledgementValues.RabbitMQManagementApi} if you are sure your broker meets these requirements."; - Terminate(terminateMsg, "Install Error", ErrorCategory.InvalidArgument); - return Task.FromResult(false); + Terminate(terminateMsg, "Install Error", ErrorCategory.InvalidArgument); + return Task.FromResult(false); + } + + return Task.FromResult(true); } protected override Task PromptToStopRunningInstance(BaseService instance) diff --git a/src/ServiceControl.Transports.RabbitMQ/ConnectionConfiguration.cs b/src/ServiceControl.Transports.RabbitMQ/ConnectionConfiguration.cs deleted file mode 100644 index 0f2cb882b5..0000000000 --- a/src/ServiceControl.Transports.RabbitMQ/ConnectionConfiguration.cs +++ /dev/null @@ -1,286 +0,0 @@ -namespace ServiceControl.Transports.RabbitMQ -{ - using System; - using System.Collections.Generic; - using System.Data.Common; - using System.Diagnostics; - using System.IO; - using System.Linq; - using System.Text; - using NServiceBus; - using NServiceBus.Support; - - class ConnectionConfiguration - { - const bool defaultUseTls = false; - const int defaultPort = 5672; - const int defaultTlsPort = 5671; - const string defaultVirtualHost = "/"; - const string defaultUserName = "guest"; - const string defaultPassword = "guest"; - const ushort defaultRequestedHeartbeat = 60; - static readonly TimeSpan defaultRetryDelay = TimeSpan.FromSeconds(10); - const string defaultCertPath = ""; - const string defaultCertPassphrase = null; - - public string Host { get; } - - public int Port { get; } - - public string VirtualHost { get; } - - public string UserName { get; } - - public string Password { get; } - - public TimeSpan RequestedHeartbeat { get; } - - public TimeSpan RetryDelay { get; } - - public bool UseTls { get; } - - public string CertPath { get; } - - public string CertPassphrase { get; } - - public Dictionary ClientProperties { get; } - - ConnectionConfiguration( - string host, - int port, - string virtualHost, - string userName, - string password, - TimeSpan requestedHeartbeat, - TimeSpan retryDelay, - bool useTls, - string certPath, - string certPassphrase, - Dictionary clientProperties) - { - Host = host; - Port = port; - VirtualHost = virtualHost; - UserName = userName; - Password = password; - RequestedHeartbeat = requestedHeartbeat; - RetryDelay = retryDelay; - UseTls = useTls; - CertPath = certPath; - CertPassphrase = certPassphrase; - ClientProperties = clientProperties; - } - - public static ConnectionConfiguration Create(string connectionString, string endpointName) - { - Dictionary dictionary; - var invalidOptionsMessage = new StringBuilder(); - - if (connectionString.StartsWith("amqp", StringComparison.OrdinalIgnoreCase)) - { - dictionary = ParseAmqpConnectionString(connectionString, invalidOptionsMessage); - } - else - { - dictionary = ParseNServiceBusConnectionString(connectionString, invalidOptionsMessage); - } - - var host = GetValue(dictionary, "host", default); - var useTls = GetValue(dictionary, "useTls", bool.TryParse, defaultUseTls, invalidOptionsMessage); - var port = GetValue(dictionary, "port", int.TryParse, useTls ? defaultTlsPort : defaultPort, invalidOptionsMessage); - var virtualHost = GetValue(dictionary, "virtualHost", defaultVirtualHost); - var userName = GetValue(dictionary, "userName", defaultUserName); - var password = GetValue(dictionary, "password", defaultPassword); - - var requestedHeartbeatSeconds = GetValue(dictionary, "requestedHeartbeat", ushort.TryParse, defaultRequestedHeartbeat, invalidOptionsMessage); - var requestedHeartbeat = TimeSpan.FromSeconds(requestedHeartbeatSeconds); - - var retryDelay = GetValue(dictionary, "retryDelay", TimeSpan.TryParse, defaultRetryDelay, invalidOptionsMessage); - var certPath = GetValue(dictionary, "certPath", defaultCertPath); - var certPassPhrase = GetValue(dictionary, "certPassphrase", defaultCertPassphrase); - - if (invalidOptionsMessage.Length > 0) - { - throw new NotSupportedException(invalidOptionsMessage.ToString().TrimEnd('\r', '\n')); - } - - var nsbVersion = FileVersionInfo.GetVersionInfo(typeof(Endpoint).Assembly.Location); - var nsbFileVersion = $"{nsbVersion.FileMajorPart}.{nsbVersion.FileMinorPart}.{nsbVersion.FileBuildPart}"; - - var rabbitMQVersion = - FileVersionInfo.GetVersionInfo(typeof(ConnectionConfiguration).Assembly.Location); - var rabbitMQFileVersion = $"{rabbitMQVersion.FileMajorPart}.{rabbitMQVersion.FileMinorPart}.{rabbitMQVersion.FileBuildPart}"; - - var applicationNameAndPath = Environment.GetCommandLineArgs()[0]; - var applicationName = Path.GetFileName(applicationNameAndPath); - var applicationPath = Path.GetDirectoryName(applicationNameAndPath); - - var hostname = RuntimeEnvironment.MachineName; - - var clientProperties = new Dictionary - { - { "client_api", "NServiceBus" }, - { "nservicebus_version", nsbFileVersion }, - { "nservicebus.rabbitmq_version", rabbitMQFileVersion }, - { "application", applicationName }, - { "application_location", applicationPath }, - { "machine_name", hostname }, - { "user", userName }, - { "endpoint_name", endpointName }, - }; - - return new ConnectionConfiguration( - host, port, virtualHost, userName, password, requestedHeartbeat, retryDelay, useTls, certPath, certPassPhrase, clientProperties); - } - - static Dictionary ParseAmqpConnectionString(string connectionString, StringBuilder invalidOptionsMessage) - { - var dictionary = new Dictionary(); - var uri = new Uri(connectionString); - - var usingTls = string.Equals("amqps", uri.Scheme, StringComparison.OrdinalIgnoreCase) ? bool.TrueString : bool.FalseString; - dictionary.Add("useTls", usingTls); - - dictionary.Add("host", uri.Host); - - if (!uri.IsDefaultPort) - { - dictionary.Add("port", uri.Port.ToString()); - } - - if (!string.IsNullOrEmpty(uri.UserInfo)) - { - var userPass = uri.UserInfo.Split(':'); - - if (userPass.Length > 2) - { - invalidOptionsMessage.AppendLine($"Bad user info in AMQP URI: {uri.UserInfo}"); - } - else - { - dictionary.Add("userName", UriDecode(userPass[0])); - - if (userPass.Length == 2) - { - dictionary.Add("password", UriDecode(userPass[1])); - } - } - } - - if (uri.Segments.Length > 2) - { - invalidOptionsMessage.AppendLine($"Multiple segments in path of AMQP URI: {string.Join(", ", uri.Segments)}"); - } - else if (uri.Segments.Length == 2) - { - dictionary.Add("virtualHost", UriDecode(uri.Segments[1])); - } - - return dictionary; - } - - static Dictionary ParseNServiceBusConnectionString(string connectionString, StringBuilder invalidOptionsMessage) - { - var dictionary = new DbConnectionStringBuilder { ConnectionString = connectionString } - .OfType>() - .ToDictionary(pair => pair.Key, pair => pair.Value.ToString(), StringComparer.OrdinalIgnoreCase); - - RegisterDeprecatedSettingsAsInvalidOptions(dictionary, invalidOptionsMessage); - - if (dictionary.TryGetValue("port", out var portValue) && !int.TryParse(portValue, out var port)) - { - invalidOptionsMessage.AppendLine($"'{portValue}' is not a valid Int32 value for the 'port' connection string option."); - } - - if (dictionary.TryGetValue("host", out var value)) - { - var firstHostAndPort = value.Split(',')[0]; - var parts = firstHostAndPort.Split(':'); - var host = parts.ElementAt(0); - - if (host.Length == 0) - { - invalidOptionsMessage.AppendLine("Empty host name in 'host' connection string option."); - } - - dictionary["host"] = host; - - if (parts.Length > 1) - { - if (!int.TryParse(parts[1], out port)) - { - invalidOptionsMessage.AppendLine($"'{parts[1]}' is not a valid Int32 value for the port in the 'host' connection string option."); - } - else - { - dictionary["port"] = port.ToString(); - } - } - } - else - { - invalidOptionsMessage.AppendLine("Invalid connection string. 'host' value must be supplied. e.g: \"host=myServer\""); - } - - return dictionary; - } - - static void RegisterDeprecatedSettingsAsInvalidOptions(Dictionary dictionary, StringBuilder invalidOptionsMessage) - { - if (dictionary.TryGetValue("host", out var value)) - { - var hostsAndPorts = value.Split(','); - - if (hostsAndPorts.Length > 1) - { - invalidOptionsMessage.AppendLine("Multiple hosts are no longer supported. If using RabbitMQ in a cluster, consider using a load balancer to represent the nodes as a single host."); - } - } - - if (dictionary.ContainsKey("dequeuetimeout")) - { - invalidOptionsMessage.AppendLine("The 'DequeueTimeout' connection string option has been removed. Consult the documentation for further information."); - } - - if (dictionary.ContainsKey("maxwaittimeforconfirms")) - { - invalidOptionsMessage.AppendLine("The 'MaxWaitTimeForConfirms' connection string option has been removed. Consult the documentation for further information."); - } - - if (dictionary.ContainsKey("prefetchcount")) - { - invalidOptionsMessage.AppendLine("The 'PrefetchCount' connection string option has been removed. Use 'EndpointConfiguration.UseTransport().PrefetchCount' instead."); - } - - if (dictionary.ContainsKey("usepublisherconfirms")) - { - invalidOptionsMessage.AppendLine("The 'UsePublisherConfirms' connection string option has been removed. Consult the documentation for further information."); - } - } - - static string GetValue(Dictionary dictionary, string key, string defaultValue) - { - return dictionary.TryGetValue(key, out var value) ? value : defaultValue; - } - - static string UriDecode(string value) - { - return Uri.UnescapeDataString(value); - } - - static T GetValue(Dictionary dictionary, string key, Convert convert, T defaultValue, StringBuilder invalidOptionsMessage) - { - if (dictionary.TryGetValue(key, out var value)) - { - if (!convert(value, out defaultValue)) - { - invalidOptionsMessage.AppendLine($"'{value}' is not a valid {typeof(T).Name} value for the '{key}' connection string option."); - } - } - - return defaultValue; - } - - delegate bool Convert(string input, out T output); - } -} diff --git a/src/ServiceControl.Transports.RabbitMQ/ConnectionFactory.cs b/src/ServiceControl.Transports.RabbitMQ/ConnectionFactory.cs deleted file mode 100644 index e52f7e1de3..0000000000 --- a/src/ServiceControl.Transports.RabbitMQ/ConnectionFactory.cs +++ /dev/null @@ -1,96 +0,0 @@ -namespace ServiceControl.Transports.RabbitMQ -{ - using System; - using System.Net.Security; - using System.Security.Authentication; - using System.Security.Cryptography.X509Certificates; - using global::RabbitMQ.Client; - - class ConnectionFactory - { - readonly string endpointName; - readonly global::RabbitMQ.Client.ConnectionFactory connectionFactory; - readonly object lockObject = new object(); - - public ConnectionFactory(string endpointName, ConnectionConfiguration connectionConfiguration, - X509Certificate2Collection clientCertificateCollection, bool disableRemoteCertificateValidation, - bool useExternalAuthMechanism, TimeSpan? heartbeatInterval, TimeSpan? networkRecoveryInterval) - { - if (endpointName is null) - { - throw new ArgumentNullException(nameof(endpointName)); - } - - if (endpointName == string.Empty) - { - throw new ArgumentException("The endpoint name cannot be empty.", nameof(endpointName)); - } - - this.endpointName = endpointName; - - if (connectionConfiguration == null) - { - throw new ArgumentNullException(nameof(connectionConfiguration)); - } - - if (connectionConfiguration.Host == null) - { - throw new ArgumentException("The connectionConfiguration has a null Host.", nameof(connectionConfiguration)); - } - - connectionFactory = new global::RabbitMQ.Client.ConnectionFactory - { - HostName = connectionConfiguration.Host, - Port = connectionConfiguration.Port, - VirtualHost = connectionConfiguration.VirtualHost, - UserName = connectionConfiguration.UserName, - Password = connectionConfiguration.Password, - RequestedHeartbeat = heartbeatInterval ?? connectionConfiguration.RequestedHeartbeat, - NetworkRecoveryInterval = networkRecoveryInterval ?? connectionConfiguration.RetryDelay, - }; - - connectionFactory.Ssl.ServerName = connectionConfiguration.Host; - connectionFactory.Ssl.Certs = clientCertificateCollection; - connectionFactory.Ssl.CertPath = connectionConfiguration.CertPath; - connectionFactory.Ssl.CertPassphrase = connectionConfiguration.CertPassphrase; - connectionFactory.Ssl.Version = SslProtocols.Tls12; - connectionFactory.Ssl.Enabled = connectionConfiguration.UseTls; - - if (disableRemoteCertificateValidation) - { - connectionFactory.Ssl.AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateChainErrors | - SslPolicyErrors.RemoteCertificateNameMismatch | - SslPolicyErrors.RemoteCertificateNotAvailable; - } - - if (useExternalAuthMechanism) - { - connectionFactory.AuthMechanisms = new[] { new ExternalMechanismFactory() }; - } - - connectionFactory.ClientProperties.Clear(); - - foreach (var item in connectionConfiguration.ClientProperties) - { - connectionFactory.ClientProperties.Add(item.Key, item.Value); - } - } - - public IConnection CreatePublishConnection() => CreateConnection($"{endpointName} Publish", false); - - public IConnection CreateAdministrationConnection() => CreateConnection($"{endpointName} Administration", false); - - public IConnection CreateConnection(string connectionName, bool automaticRecoveryEnabled = true) - { - lock (lockObject) - { - connectionFactory.AutomaticRecoveryEnabled = automaticRecoveryEnabled; - connectionFactory.ClientProperties["connected"] = DateTime.UtcNow.ToString("G"); - - var connection = connectionFactory.CreateConnection(connectionName); - - return connection; - } - } - } -} diff --git a/src/ServiceControl.Transports.RabbitMQ/IManagementClientProvider.cs b/src/ServiceControl.Transports.RabbitMQ/IManagementClientProvider.cs new file mode 100644 index 0000000000..ab0bd30b3b --- /dev/null +++ b/src/ServiceControl.Transports.RabbitMQ/IManagementClientProvider.cs @@ -0,0 +1,9 @@ +namespace ServiceControl.Transports.RabbitMQ; + +using System; +using NServiceBus.Transport.RabbitMQ.ManagementApi; + +interface IManagementClientProvider +{ + Lazy GetManagementClient(); +} diff --git a/src/ServiceControl.Transports.RabbitMQ/QueueLengthProvider.cs b/src/ServiceControl.Transports.RabbitMQ/QueueLengthProvider.cs index e7e97025c7..3c60019abe 100644 --- a/src/ServiceControl.Transports.RabbitMQ/QueueLengthProvider.cs +++ b/src/ServiceControl.Transports.RabbitMQ/QueueLengthProvider.cs @@ -2,18 +2,23 @@ { using System; using System.Collections.Concurrent; - using System.Data.Common; using System.Threading; using System.Threading.Tasks; - using global::RabbitMQ.Client; using NServiceBus.Logging; + using NServiceBus.Transport.RabbitMQ.ManagementApi; class QueueLengthProvider : AbstractQueueLengthProvider { - public QueueLengthProvider(TransportSettings settings, Action store) : base(settings, store) + public QueueLengthProvider(TransportSettings settings, Action store, ITransportCustomization transportCustomization) : base(settings, store) { - queryExecutor = new QueryExecutor(ConnectionString); - queryExecutor.Initialize(); + if (transportCustomization is IManagementClientProvider provider) + { + managementClient = provider.GetManagementClient(); + } + else + { + throw new ArgumentException($"Transport customization does not implement {nameof(IManagementClientProvider)}. Type: {transportCustomization.GetType().Name}", nameof(transportCustomization)); + } } public override void TrackEndpointInputQueue(EndpointToQueueMapping queueToTrack) => @@ -75,89 +80,29 @@ async Task FetchQueueLengths(CancellationToken cancellationToken) { foreach (var endpointQueuePair in endpointQueues) { - await queryExecutor.Execute(m => - { - var queueName = endpointQueuePair.Value; - - try - { - var size = (int)m.MessageCount(queueName); - - sizes.AddOrUpdate(queueName, _ => size, (_, __) => size); - } - catch (Exception e) - { - Logger.Warn($"Error querying queue length for {queueName}", e); - } - }, cancellationToken); - } - } + var queueName = endpointQueuePair.Value; - readonly QueryExecutor queryExecutor; - static readonly TimeSpan QueryDelayInterval = TimeSpan.FromMilliseconds(200); - - readonly ConcurrentDictionary endpointQueues = new ConcurrentDictionary(); - readonly ConcurrentDictionary sizes = new ConcurrentDictionary(); - - static readonly ILog Logger = LogManager.GetLogger(); - - class QueryExecutor(string connectionString) : IDisposable - { - - public void Initialize() - { - var connectionConfiguration = - ConnectionConfiguration.Create(connectionString, "ServiceControl.Monitoring"); - - var dbConnectionStringBuilder = new DbConnectionStringBuilder { ConnectionString = connectionString }; - - connectionFactory = new ConnectionFactory("ServiceControl.Monitoring", - connectionConfiguration, - null, //providing certificates is not supported yet - dbConnectionStringBuilder.GetBooleanValue("DisableRemoteCertificateValidation"), - dbConnectionStringBuilder.GetBooleanValue("UseExternalAuthMechanism"), - null, // value would come from config API in actual transport - null); // value would come from config API in actual transport - } - - public async Task Execute(Action action, CancellationToken cancellationToken = default) - { try { - connection ??= connectionFactory.CreateConnection("queue length monitor"); - - //Connection implements reconnection logic - while (!connection.IsOpen) - { - await Task.Delay(ReconnectionDelay, cancellationToken); - } - - if (model == null || model.IsClosed) - { - model?.Dispose(); - - model = connection.CreateModel(); - } + var queue = await managementClient.Value.GetQueue(queueName, cancellationToken); - action(model); - } - catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) - { - // no-op + var size = queue.MessageCount; + sizes.AddOrUpdate(queueName, _ => size, (_, _) => size); } catch (Exception e) { - Logger.Warn("Error querying queue length.", e); + Logger.Warn($"Error querying queue length for {queueName}", e); } } + } - public void Dispose() => connection?.Dispose(); + static readonly TimeSpan QueryDelayInterval = TimeSpan.FromMilliseconds(200); - IConnection connection; - IModel model; - ConnectionFactory connectionFactory; + readonly ConcurrentDictionary endpointQueues = new(); + readonly ConcurrentDictionary sizes = new(); - static readonly TimeSpan ReconnectionDelay = TimeSpan.FromSeconds(5); - } + static readonly ILog Logger = LogManager.GetLogger(); + + readonly Lazy managementClient; } } diff --git a/src/ServiceControl.Transports.RabbitMQ/RabbitMQBrokerQueueDetails.cs b/src/ServiceControl.Transports.RabbitMQ/RabbitMQBrokerQueue.cs similarity index 50% rename from src/ServiceControl.Transports.RabbitMQ/RabbitMQBrokerQueueDetails.cs rename to src/ServiceControl.Transports.RabbitMQ/RabbitMQBrokerQueue.cs index 742efa89a1..542e94ef7e 100644 --- a/src/ServiceControl.Transports.RabbitMQ/RabbitMQBrokerQueueDetails.cs +++ b/src/ServiceControl.Transports.RabbitMQ/RabbitMQBrokerQueue.cs @@ -2,22 +2,27 @@ namespace ServiceControl.Transports.RabbitMQ; using System.Collections.Generic; -using System.Text.Json; +using NServiceBus.Transport.RabbitMQ.ManagementApi; using ServiceControl.Transports.BrokerThroughput; -public class RabbitMQBrokerQueueDetails(JsonElement token) : IBrokerQueue +class RabbitMQBrokerQueue(Queue queue) : IBrokerQueue { - public string QueueName { get; } = token.GetProperty("name").GetString()!; + public string QueueName { get; } = queue.Name; + public string SanitizedName => QueueName; - public string Scope => VHost; - public string VHost { get; } = token.GetProperty("vhost").GetString()!; + + public string? Scope => null; + public List EndpointIndicators { get; } = []; - long? AckedMessages { get; set; } = FromToken(token); - long Baseline { get; set; } = FromToken(token) ?? 0; - public long CalculateThroughputFrom(RabbitMQBrokerQueueDetails newReading) + long? AckedMessages { get; set; } = queue.MessageStats?.Ack; + + long Baseline { get; set; } = queue.MessageStats?.Ack ?? 0; + + public long CalculateThroughputFrom(RabbitMQBrokerQueue newReading) { var newlyAckedMessages = 0L; + if (newReading.AckedMessages is null) { return newlyAckedMessages; @@ -28,13 +33,9 @@ public long CalculateThroughputFrom(RabbitMQBrokerQueueDetails newReading) newlyAckedMessages = newReading.AckedMessages.Value - Baseline; AckedMessages += newlyAckedMessages; } + Baseline = newReading.AckedMessages.Value; return newlyAckedMessages; } - - static long? FromToken(JsonElement jsonElement) => - jsonElement.TryGetProperty("message_stats", out var stats) && stats.TryGetProperty("ack", out var val) - ? val.GetInt64() - : null; } \ No newline at end of file diff --git a/src/ServiceControl.Transports.RabbitMQ/RabbitMQConventionalRoutingTransportCustomization.cs b/src/ServiceControl.Transports.RabbitMQ/RabbitMQConventionalRoutingTransportCustomization.cs index bc8db946f3..0a096c5222 100644 --- a/src/ServiceControl.Transports.RabbitMQ/RabbitMQConventionalRoutingTransportCustomization.cs +++ b/src/ServiceControl.Transports.RabbitMQ/RabbitMQConventionalRoutingTransportCustomization.cs @@ -5,34 +5,55 @@ using BrokerThroughput; using Microsoft.Extensions.DependencyInjection; using NServiceBus; + using NServiceBus.Transport.RabbitMQ.ManagementApi; - public abstract class RabbitMQConventionalRoutingTransportCustomization(QueueType queueType) - : TransportCustomization + public abstract class RabbitMQConventionalRoutingTransportCustomization(NServiceBus.QueueType queueType) : TransportCustomization, IManagementClientProvider { - protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { } + RabbitMQTransport transport; - protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { } + Lazy IManagementClientProvider.GetManagementClient() + { + return new(() => Get()); + + ManagementClient Get() + { + if (transport is null) + { + throw new InvalidOperationException("Management client not available because a CustomizeTransport method has not been called first."); + } - protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { } + // Since some tests don't actually start an endpoint, this is needed to ensure a management client is available + if (transport.ManagementClient is null) + { + return new ManagementClient(transport.ConnectionConfiguration, transport.ManagementApiConfiguration); + } + + return transport.ManagementClient; + } + } + + protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => transport = transportDefinition; + + protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => transport = transportDefinition; + + protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => transport = transportDefinition; protected override RabbitMQTransport CreateTransport(TransportSettings transportSettings, TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly) { - if (transportSettings.ConnectionString == null) + if (transportSettings.ConnectionString is null) { throw new InvalidOperationException("Connection string not configured"); } var transport = new RabbitMQTransport(RoutingTopology.Conventional(queueType), transportSettings.ConnectionString, enableDelayedDelivery: false); transport.TransportTransactionMode = transport.GetSupportedTransactionModes().Contains(preferredTransactionMode) ? preferredTransactionMode : TransportTransactionMode.ReceiveOnly; + transport.ApplySettingsFromConnectionString(transportSettings.ConnectionString); return transport; } - protected override void AddTransportForPrimaryCore(IServiceCollection services, - TransportSettings transportSettings) - { - services.AddSingleton(); - } + protected override void AddTransportForPrimaryCore(IServiceCollection services, TransportSettings transportSettings) + => services.AddSingleton(); protected sealed override void AddTransportForMonitoringCore(IServiceCollection services, TransportSettings transportSettings) { diff --git a/src/ServiceControl.Transports.RabbitMQ/RabbitMQDirectRoutingTransportCustomization.cs b/src/ServiceControl.Transports.RabbitMQ/RabbitMQDirectRoutingTransportCustomization.cs index c4e09725dd..2d497877d0 100644 --- a/src/ServiceControl.Transports.RabbitMQ/RabbitMQDirectRoutingTransportCustomization.cs +++ b/src/ServiceControl.Transports.RabbitMQ/RabbitMQDirectRoutingTransportCustomization.cs @@ -5,36 +5,55 @@ using BrokerThroughput; using Microsoft.Extensions.DependencyInjection; using NServiceBus; + using NServiceBus.Transport.RabbitMQ.ManagementApi; - public abstract class RabbitMQDirectRoutingTransportCustomization : TransportCustomization + public abstract class RabbitMQDirectRoutingTransportCustomization(NServiceBus.QueueType queueType) : TransportCustomization, IManagementClientProvider { - readonly QueueType queueType; + RabbitMQTransport transport; - protected RabbitMQDirectRoutingTransportCustomization(QueueType queueType) => this.queueType = queueType; + Lazy IManagementClientProvider.GetManagementClient() + { + return new(() => Get()); + + ManagementClient Get() + { + if (transport is null) + { + throw new InvalidOperationException("Management client not available because a CustomizeTransport method has not been called first."); + } + + // Since some tests don't actually start an endpoint, this is needed to ensure a management client is available + if (transport.ManagementClient is null) + { + return new ManagementClient(transport.ConnectionConfiguration, transport.ManagementApiConfiguration); + } + + return transport.ManagementClient; + } + } + + protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => transport = transportDefinition; - protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { } + protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => transport = transportDefinition; - protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { } + protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => transport = transportDefinition; - protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { } protected override RabbitMQTransport CreateTransport(TransportSettings transportSettings, TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly) { - if (transportSettings.ConnectionString == null) + if (transportSettings.ConnectionString is null) { throw new InvalidOperationException("Connection string not configured"); } var transport = new RabbitMQTransport(RoutingTopology.Direct(queueType, routingKeyConvention: type => type.FullName.Replace(".", "-")), transportSettings.ConnectionString, enableDelayedDelivery: false); transport.TransportTransactionMode = transport.GetSupportedTransactionModes().Contains(preferredTransactionMode) ? preferredTransactionMode : TransportTransactionMode.ReceiveOnly; + transport.ApplySettingsFromConnectionString(transportSettings.ConnectionString); return transport; } - protected override void AddTransportForPrimaryCore(IServiceCollection services, - TransportSettings transportSettings) - { - services.AddSingleton(); - } + protected override void AddTransportForPrimaryCore(IServiceCollection services, TransportSettings transportSettings) + => services.AddSingleton(); protected sealed override void AddTransportForMonitoringCore(IServiceCollection services, TransportSettings transportSettings) { diff --git a/src/ServiceControl.Transports.RabbitMQ/RabbitMQQuery.cs b/src/ServiceControl.Transports.RabbitMQ/RabbitMQQuery.cs index 9bacec74c3..8367d46bcb 100644 --- a/src/ServiceControl.Transports.RabbitMQ/RabbitMQQuery.cs +++ b/src/ServiceControl.Transports.RabbitMQ/RabbitMQQuery.cs @@ -7,122 +7,57 @@ namespace ServiceControl.Transports.RabbitMQ; using System.Linq; using System.Net; using System.Net.Http; -using System.Net.Http.Json; using System.Runtime.CompilerServices; -using System.Text.Json; -using System.Text.Json.Nodes; using System.Threading; using System.Threading.Tasks; -using System.Web; using Microsoft.Extensions.Logging; +using NServiceBus.Transport.RabbitMQ.ManagementApi; using Polly; using Polly.Retry; using ServiceControl.Transports.BrokerThroughput; public class RabbitMQQuery : BrokerThroughputQuery { - HttpClient? httpClient; - readonly ResiliencePipeline pipeline = new ResiliencePipelineBuilder() - .AddRetry(new RetryStrategyOptions()) // Add retry using the default options - .AddTimeout(TimeSpan.FromMinutes(2)) // Add timeout if it keeps failing - .Build(); - readonly ILogger logger; readonly TimeProvider timeProvider; - readonly ConnectionConfiguration connectionConfiguration; + readonly Lazy managementClient; - public RabbitMQQuery(ILogger logger, - TimeProvider timeProvider, - TransportSettings transportSettings) : base(logger, "RabbitMQ") - { - this.logger = logger; - this.timeProvider = timeProvider; - - connectionConfiguration = ConnectionConfiguration.Create(transportSettings.ConnectionString, string.Empty); - } + readonly ResiliencePipeline pipeline = new ResiliencePipelineBuilder() + .AddRetry(new RetryStrategyOptions()) // Add retry using the default options + .AddTimeout(TimeSpan.FromMinutes(2)) // Add timeout if it keeps failing + .Build(); - protected override void InitializeCore(ReadOnlyDictionary settings) + public RabbitMQQuery(ILogger logger, TimeProvider timeProvider, ITransportCustomization transportCustomization) : base(logger, "RabbitMQ") { - if (!settings.TryGetValue(RabbitMQSettings.UserName, out string? username) || - string.IsNullOrEmpty(username)) - { - logger.LogInformation("Using username from connectionstring"); - username = connectionConfiguration.UserName; - Diagnostics.AppendLine( - $"Username not set, defaulted to using \"{username}\" username from the ConnectionString used by instance"); - } - else - { - Diagnostics.AppendLine($"Username set to \"{username}\""); - } - - if (!settings.TryGetValue(RabbitMQSettings.Password, out string? password) || - string.IsNullOrEmpty(password)) - { - logger.LogInformation("Using password from connectionstring"); - password = connectionConfiguration.Password; - Diagnostics.AppendLine( - "Password not set, defaulted to using password from the ConnectionString used by instance"); - } - else - { - Diagnostics.AppendLine("Password set"); - } - - var defaultCredential = new NetworkCredential(username, password); + this.timeProvider = timeProvider; - if (!settings.TryGetValue(RabbitMQSettings.API, out string? apiUrl) || - string.IsNullOrEmpty(apiUrl)) + if (transportCustomization is IManagementClientProvider provider) { - apiUrl = - $"{(connectionConfiguration.UseTls ? $"https://{connectionConfiguration.Host}:15671" : $"http://{connectionConfiguration.Host}:15672")}"; - Diagnostics.AppendLine( - $"RabbitMQ API Url not set, defaulted to using \"{apiUrl}\" from the ConnectionString used by instance"); + managementClient = provider.GetManagementClient(); } else { - Diagnostics.AppendLine($"RabbitMQ API Url set to \"{apiUrl}\""); - if (!Uri.TryCreate(apiUrl, UriKind.Absolute, out _)) - { - InitialiseErrors.Add("API url configured is invalid"); - } - } - - if (InitialiseErrors.Count == 0) - { - // ideally we would use the HttpClientFactory, but it would be a bit more involved to set that up - // so for now we are using a virtual method that can be overriden in tests - // https://github.com/Particular/ServiceControl/issues/4493 - httpClient = CreateHttpClient(defaultCredential, apiUrl); + throw new ArgumentException($"Transport customization does not implement {nameof(IManagementClientProvider)}. Type: {transportCustomization.GetType().Name}", nameof(transportCustomization)); } } - protected virtual HttpClient CreateHttpClient(NetworkCredential defaultCredential, string apiUrl) => - new(new SocketsHttpHandler - { - Credentials = defaultCredential, - PooledConnectionLifetime = TimeSpan.FromMinutes(2) - }) - { BaseAddress = new Uri(apiUrl) }; - - public override async IAsyncEnumerable GetThroughputPerDay(IBrokerQueue brokerQueue, - DateOnly startDate, - [EnumeratorCancellation] CancellationToken cancellationToken = default) + public override async IAsyncEnumerable GetThroughputPerDay(IBrokerQueue brokerQueue, DateOnly startDate, [EnumeratorCancellation] CancellationToken cancellationToken = default) { - var queue = (RabbitMQBrokerQueueDetails)brokerQueue; - var url = $"/api/queues/{HttpUtility.UrlEncode(queue.VHost)}/{HttpUtility.UrlEncode(queue.QueueName)}"; + var rabbitBrokerQueue = (RabbitMQBrokerQueue)brokerQueue; + var queue = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetQueue(rabbitBrokerQueue.QueueName, token), cancellationToken); + var newReading = new RabbitMQBrokerQueue(queue); - logger.LogDebug($"Querying {url}"); - var newReading = await pipeline.ExecuteAsync(async token => new RabbitMQBrokerQueueDetails(await httpClient!.GetFromJsonAsync(url, token)), cancellationToken); - _ = queue.CalculateThroughputFrom(newReading); + _ = rabbitBrokerQueue.CalculateThroughputFrom(newReading); // looping for 24hrs, in 4 increments of 15 minutes for (var i = 0; i < 24 * 4; i++) { await Task.Delay(TimeSpan.FromMinutes(15), timeProvider, cancellationToken); - logger.LogDebug($"Querying {url}"); - newReading = await pipeline.ExecuteAsync(async token => new RabbitMQBrokerQueueDetails(await httpClient!.GetFromJsonAsync(url, token)), cancellationToken); - var newTotalThroughput = queue.CalculateThroughputFrom(newReading); + queue = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetQueue(rabbitBrokerQueue.QueueName, token), cancellationToken); + newReading = new RabbitMQBrokerQueue(queue); + + var newTotalThroughput = rabbitBrokerQueue.CalculateThroughputFrom(newReading); + yield return new QueueThroughput { DateUTC = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime), @@ -131,86 +66,59 @@ public override async IAsyncEnumerable GetThroughputPerDay(IBro } } - async Task<(string rabbitVersion, string managementVersion)> GetRabbitDetails(bool skipResiliencePipeline, CancellationToken cancellationToken) - { - var overviewUrl = "/api/overview"; - - JsonObject obj; - - if (skipResiliencePipeline) - { - obj = (await httpClient!.GetFromJsonAsync(overviewUrl, cancellationToken))!; - } - else - { - obj = (await pipeline.ExecuteAsync(async token => - await httpClient!.GetFromJsonAsync(overviewUrl, token), cancellationToken))!; - } - - var statsDisabled = obj["disable_stats"]?.GetValue() ?? false; - - if (statsDisabled) - { - throw new Exception("The RabbitMQ broker is configured with 'management.disable_stats = true' or 'management_agent.disable_metrics_collector = true' and as a result queue statistics cannot be collected using this tool. Consider changing the configuration of the RabbitMQ broker."); - } - - var rabbitVersion = obj["rabbitmq_version"] ?? obj["product_version"]; - var mgmtVersion = obj["management_version"]; - - return (rabbitVersion?.GetValue() ?? "Unknown", mgmtVersion?.GetValue() ?? "Unknown"); - } - - public override async IAsyncEnumerable GetQueueNames( - [EnumeratorCancellation] CancellationToken cancellationToken) + public override async IAsyncEnumerable GetQueueNames([EnumeratorCancellation] CancellationToken cancellationToken) { var page = 1; bool morePages; - var vHosts = new HashSet(StringComparer.CurrentCultureIgnoreCase); - (string rabbitVersion, string managementVersion) = await GetRabbitDetails(false, cancellationToken); - Data["RabbitMQVersion"] = rabbitVersion; - Data["RabbitMQManagementVersionVersion"] = managementVersion; + await GetBrokerDetails(cancellationToken); do { - (var queues, morePages) = await GetPage(page, cancellationToken); + (var queues, morePages) = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetQueues(page, 500, token), cancellationToken); - if (queues != null) + foreach (var queue in queues) { - foreach (var rabbitMQQueueDetails in queues) + if (queue.Name.StartsWith("nsb.delay-level-") || + queue.Name.StartsWith("nsb.v2.delay-level-") || + queue.Name.StartsWith("nsb.v2.verify-")) { - if (rabbitMQQueueDetails.QueueName.StartsWith("nsb.delay-level-") || - rabbitMQQueueDetails.QueueName.StartsWith("nsb.v2.delay-level-") || - rabbitMQQueueDetails.QueueName.StartsWith("nsb.v2.verify-")) - { - continue; - } - vHosts.Add(rabbitMQQueueDetails.VHost); - await AddAdditionalQueueDetails(rabbitMQQueueDetails, cancellationToken); - yield return rabbitMQQueueDetails; + continue; } + + var brokerQueue = new RabbitMQBrokerQueue(queue); + await AddEndpointIndicators(brokerQueue, cancellationToken); + yield return brokerQueue; } page++; } while (morePages); + } + + async Task GetBrokerDetails(CancellationToken cancellationToken) + { + var overview = await pipeline.ExecuteAsync(async async => await managementClient.Value.GetOverview(cancellationToken), cancellationToken); - ScopeType = vHosts.Count > 1 ? "VirtualHost" : null; + if (overview.DisableStats) + { + throw new Exception(disableStatsErrorMessage); + } + + Data["RabbitMQVersion"] = overview.BrokerVersion ?? "Unknown"; } - async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, CancellationToken cancellationToken) + async Task AddEndpointIndicators(RabbitMQBrokerQueue brokerQueue, CancellationToken cancellationToken) { try { - var bindingsUrl = $"/api/queues/{HttpUtility.UrlEncode(brokerQueue.VHost)}/{HttpUtility.UrlEncode(brokerQueue.QueueName)}/bindings"; - var bindings = await pipeline.ExecuteAsync(async token => await httpClient!.GetFromJsonAsync(bindingsUrl, token), cancellationToken); - var conventionalBindingFound = bindings?.Any(binding => binding!["source"]?.GetValue() == brokerQueue.QueueName - && binding["vhost"]?.GetValue() == brokerQueue.VHost - && binding["destination"]?.GetValue() == brokerQueue.QueueName - && binding["destination_type"]?.GetValue() == "queue" - && binding["routing_key"]?.GetValue() == string.Empty - && binding["properties_key"]?.GetValue() == "~") ?? false; + var bindings = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetBindingsForQueue(brokerQueue.QueueName, token), cancellationToken); - if (conventionalBindingFound) + // Check if conventional binding is found + if (bindings.Any(binding => binding.Source == brokerQueue.QueueName + && binding.Destination == brokerQueue.QueueName + && binding.DestinationType == "queue" + && binding.RoutingKey == string.Empty + && binding.PropertiesKey == "~")) { brokerQueue.EndpointIndicators.Add("ConventionalTopologyBinding"); } @@ -222,20 +130,13 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can try { - var exchangeUrl = $"/api/exchanges/{HttpUtility.UrlEncode(brokerQueue.VHost)}/{HttpUtility.UrlEncode(brokerQueue.QueueName)}/bindings/destination"; - var bindings = await pipeline.ExecuteAsync(async token => await httpClient!.GetFromJsonAsync(exchangeUrl, token), cancellationToken); - var delayBindingFound = bindings?.Any(binding => - { - var source = binding!["source"]?.GetValue(); - - return source is "nsb.v2.delay-delivery" or "nsb.delay-delivery" - && binding["vhost"]?.GetValue() == brokerQueue.VHost - && binding["destination"]?.GetValue() == brokerQueue.QueueName - && binding["destination_type"]?.GetValue() == "exchange" - && binding["routing_key"]?.GetValue() == $"#.{brokerQueue.QueueName}"; - }) ?? false; + var bindings = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetBindingsForExchange(brokerQueue.QueueName, token), cancellationToken); - if (delayBindingFound) + // Check if delayed binding is found + if (bindings.Any(binding => binding.Source is "nsb.v2.delay-delivery" or "nsb.delay-delivery" + && binding.Destination == brokerQueue.QueueName + && binding.DestinationType == "exchange" + && binding.RoutingKey == $"#.{brokerQueue.QueueName}")) { brokerQueue.EndpointIndicators.Add("DelayBinding"); } @@ -246,74 +147,29 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can } } - public async Task<(RabbitMQBrokerQueueDetails[]?, bool morePages)> GetPage(int page, CancellationToken cancellationToken) - { - var url = $"/api/queues/{HttpUtility.UrlEncode(connectionConfiguration.VirtualHost)}?page={page}&page_size=500&name=&use_regex=false&pagination=true"; - - var container = await pipeline.ExecuteAsync(async token => await httpClient!.GetFromJsonAsync(url, token), cancellationToken); - switch (container) - { - case JsonObject obj: - { - var pageCount = obj["page_count"]!.GetValue(); - var pageReturned = obj["page"]!.GetValue(); - - if (obj["items"] is not JsonArray items) - { - return (null, false); - } - - return (MaterializeQueueDetails(items), pageCount > pageReturned); - } - // Older versions of RabbitMQ API did not have paging and returned the array of items directly - case JsonArray arr: - { - return (MaterializeQueueDetails(arr), false); - } - default: - throw new Exception("Was not able to get list of queues from RabbitMQ broker."); - } - } - - static RabbitMQBrokerQueueDetails[] MaterializeQueueDetails(JsonArray items) - { - // It is not possible to directly operated on the JsonNode. When the JsonNode is a JObject - // and the indexer is access the internal dictionary is initialized which can cause key not found exceptions - // when the payload contains the same key multiple times (which happened in the past). - var queues = items.Select(item => new RabbitMQBrokerQueueDetails(item!.Deserialize())).ToArray(); - return queues; - } + public override KeyDescriptionPair[] Settings => []; - public override KeyDescriptionPair[] Settings => - [ - new KeyDescriptionPair(RabbitMQSettings.API, RabbitMQSettings.APIDescription), - new KeyDescriptionPair(RabbitMQSettings.UserName, RabbitMQSettings.UserNameDescription), - new KeyDescriptionPair(RabbitMQSettings.Password, RabbitMQSettings.PasswordDescription) - ]; - - protected override async Task<(bool Success, List Errors)> TestConnectionCore( - CancellationToken cancellationToken) + protected override async Task<(bool Success, List Errors)> TestConnectionCore(CancellationToken cancellationToken) { try { - await GetRabbitDetails(true, cancellationToken); + var overview = await managementClient.Value.GetOverview(cancellationToken); + + if (overview.DisableStats) + { + return (false, [disableStatsErrorMessage]); + } + + return (true, []); } - catch (HttpRequestException e) + catch (HttpRequestException ex) { - throw new Exception($"Failed to connect to '{httpClient!.BaseAddress}'", e); + throw new Exception($"Failed to connect to RabbitMQ management API", ex); } - - return (true, []); } - public static class RabbitMQSettings - { - public static readonly string API = "RabbitMQ/ApiUrl"; - public static readonly string APIDescription = "RabbitMQ management URL"; - public static readonly string UserName = "RabbitMQ/UserName"; - public static readonly string UserNameDescription = "Username to access the RabbitMQ management interface"; - public static readonly string Password = "RabbitMQ/Password"; - public static readonly string PasswordDescription = "Password to access the RabbitMQ management interface"; - } + protected override void InitializeCore(ReadOnlyDictionary settings) => Diagnostics.AppendLine("Using settings from connection string"); + + const string disableStatsErrorMessage = "The RabbitMQ broker is configured with 'management.disable_stats = true' or 'management_agent.disable_metrics_collector = true' and as a result queue statistics cannot be collected using this tool. Consider changing the configuration of the RabbitMQ broker."; } diff --git a/src/ServiceControl.Transports.RabbitMQ/RabbitMQTransportExtensions.cs b/src/ServiceControl.Transports.RabbitMQ/RabbitMQTransportExtensions.cs new file mode 100644 index 0000000000..ebf285839c --- /dev/null +++ b/src/ServiceControl.Transports.RabbitMQ/RabbitMQTransportExtensions.cs @@ -0,0 +1,46 @@ +namespace ServiceControl.Transports.RabbitMQ; + +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Linq; +using NServiceBus; + +static class RabbitMQTransportExtensions +{ + public static void ApplySettingsFromConnectionString(this RabbitMQTransport transport, string connectionString) + { + if (connectionString.StartsWith("amqp", StringComparison.OrdinalIgnoreCase)) + { + return; + } + + var dictionary = new DbConnectionStringBuilder { ConnectionString = connectionString } + .OfType>() + .ToDictionary(pair => pair.Key, pair => pair.Value.ToString(), StringComparer.OrdinalIgnoreCase); + + if (dictionary.TryGetValue("ValidateDeliveryLimits", out var validateDeliveryLimitsString)) + { + _ = bool.TryParse(validateDeliveryLimitsString, out var validateDeliveryLimits); + transport.ValidateDeliveryLimits = validateDeliveryLimits; + } + + dictionary.TryGetValue("ManagementApiUrl", out var url); + dictionary.TryGetValue("ManagementApiUserName", out var userName); + dictionary.TryGetValue("ManagementApiPassword", out var password); + + transport.ManagementApiConfiguration = ManagementApiConfiguration.Create(url, userName, password); + + if (dictionary.TryGetValue("DisableRemoteCertificateValidation", out var disableRemoteCertificateValidationString)) + { + _ = bool.TryParse(disableRemoteCertificateValidationString, out var disableRemoteCertificateValidation); + transport.ValidateRemoteCertificate = !disableRemoteCertificateValidation; + } + + if (dictionary.TryGetValue("UseExternalAuthMechanism", out var useExternalAuthMechanismString)) + { + _ = bool.TryParse(useExternalAuthMechanismString, out var useExternalAuthMechanism); + transport.UseExternalAuthMechanism = useExternalAuthMechanism; + } + } +} diff --git a/src/ServiceControl.Transports.RabbitMQ/ServiceControl.Transports.RabbitMQ.csproj b/src/ServiceControl.Transports.RabbitMQ/ServiceControl.Transports.RabbitMQ.csproj index b04877eea0..3450935934 100644 --- a/src/ServiceControl.Transports.RabbitMQ/ServiceControl.Transports.RabbitMQ.csproj +++ b/src/ServiceControl.Transports.RabbitMQ/ServiceControl.Transports.RabbitMQ.csproj @@ -2,6 +2,8 @@ net8.0 + true + ..\NServiceBus.snk true diff --git a/src/ServiceControl.Transports.RabbitMQ/TransportConfigurationExtensions.cs b/src/ServiceControl.Transports.RabbitMQ/TransportConfigurationExtensions.cs deleted file mode 100644 index ee10f15afb..0000000000 --- a/src/ServiceControl.Transports.RabbitMQ/TransportConfigurationExtensions.cs +++ /dev/null @@ -1,44 +0,0 @@ -namespace ServiceControl.Transports.RabbitMQ -{ - using NServiceBus; - using System; - using System.Data.Common; - - static class TransportConfigurationExtensions - { - public static void ApplyConnectionString(this TransportExtensions transport, string connectionString) - { - if (!connectionString.StartsWith("amqp", StringComparison.InvariantCultureIgnoreCase)) - { - var builder = new DbConnectionStringBuilder { ConnectionString = connectionString }; - - if (builder.GetBooleanValue("DisableRemoteCertificateValidation")) - { - transport.DisableRemoteCertificateValidation(); - } - - if (builder.GetBooleanValue("UseExternalAuthMechanism")) - { - transport.UseExternalAuthMechanism(); - } - } - - transport.ConnectionString(connectionString); - } - - public static bool GetBooleanValue(this DbConnectionStringBuilder dbConnectionStringBuilder, string key) - { - if (!dbConnectionStringBuilder.TryGetValue(key, out var rawValue)) - { - return false; - } - - if (!bool.TryParse(rawValue.ToString(), out var value)) - { - throw new Exception($"Can't parse key '{key}'. '{rawValue}' is not a valid boolean value."); - } - - return value; - } - } -} \ No newline at end of file diff --git a/src/ServiceControl.Transports.RabbitMQ/transport.manifest b/src/ServiceControl.Transports.RabbitMQ/transport.manifest index 2442343847..7203eb3f81 100644 --- a/src/ServiceControl.Transports.RabbitMQ/transport.manifest +++ b/src/ServiceControl.Transports.RabbitMQ/transport.manifest @@ -27,7 +27,7 @@ "DisplayName": "RabbitMQ - Conventional routing topology (quorum queues)", "AssemblyName": "ServiceControl.Transports.RabbitMQ", "TypeName": "ServiceControl.Transports.RabbitMQ.RabbitMQQuorumConventionalRoutingTransportCustomization, ServiceControl.Transports.RabbitMQ", - "SampleConnectionString": "host=;username=;password=;DisableRemoteCertificateValidation=;UseExternalAuthMechanism=", + "SampleConnectionString": "host=;username=;password=;DisableRemoteCertificateValidation=;UseExternalAuthMechanism=;ValidateDeliveryLimits=;ManagementApiUrl=;ManagementApiUserName=;ManagementApiPassword=", "AvailableInSCMU": true, "Aliases": [ "ServiceControl.Transports.RabbitMQ.QuorumConventialRoutingTopologyRabbitMQTransport, ServiceControl.Transports.RabbitMQ" @@ -38,7 +38,7 @@ "DisplayName": "RabbitMQ - Direct routing topology (quorum queues)", "AssemblyName": "ServiceControl.Transports.RabbitMQ", "TypeName": "ServiceControl.Transports.RabbitMQ.RabbitMQQuorumDirectRoutingTransportCustomization, ServiceControl.Transports.RabbitMQ", - "SampleConnectionString": "host=;username=;password=;DisableRemoteCertificateValidation=;UseExternalAuthMechanism=", + "SampleConnectionString": "host=;username=;password=;DisableRemoteCertificateValidation=;UseExternalAuthMechanism=;ValidateDeliveryLimits=;ManagementApiUrl=;ManagementApiUserName=;ManagementApiPassword=", "AvailableInSCMU": true, "Aliases": [ "ServiceControl.Transports.RabbitMQ.QuorumDirectRoutingTopologyRabbitMQTransport, ServiceControl.Transports.RabbitMQ" diff --git a/src/ServiceControl.Transports.RabbitMQClassicConventionalRouting.Tests/RabbitMQQueryTests.cs b/src/ServiceControl.Transports.RabbitMQClassicConventionalRouting.Tests/RabbitMQQueryTests.cs index 062d498524..25116c8a47 100644 --- a/src/ServiceControl.Transports.RabbitMQClassicConventionalRouting.Tests/RabbitMQQueryTests.cs +++ b/src/ServiceControl.Transports.RabbitMQClassicConventionalRouting.Tests/RabbitMQQueryTests.cs @@ -8,10 +8,11 @@ namespace ServiceControl.Transport.Tests; using System.Threading.Tasks; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Time.Testing; +using NServiceBus; using NUnit.Framework; +using ServiceControl.Transports.BrokerThroughput; using Transports; using Transports.RabbitMQ; -using ServiceControl.Transports.BrokerThroughput; [TestFixture] class RabbitMQQueryTests : TransportTestFixture @@ -20,25 +21,30 @@ class RabbitMQQueryTests : TransportTestFixture public async Task GetQueueNames_FindsQueues() { using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - CancellationToken token = cancellationTokenSource.Token; + var token = cancellationTokenSource.Token; + var provider = new FakeTimeProvider(); + var transportSettings = new TransportSettings { ConnectionString = configuration.ConnectionString, - MaxConcurrency = 1, EndpointName = Guid.NewGuid().ToString("N") }; - var query = new RabbitMQQuery(NullLogger.Instance, provider, transportSettings); - string[] additionalQueues = Enumerable.Range(1, 10).Select(i => $"myqueue{i}").ToArray(); + + configuration.TransportCustomization.CustomizePrimaryEndpoint(new EndpointConfiguration(transportSettings.EndpointName), transportSettings); + + var additionalQueues = Enumerable.Range(1, 10).Select(i => $"myqueue{i}").ToArray(); await configuration.TransportCustomization.ProvisionQueues(transportSettings, additionalQueues); + var query = new RabbitMQQuery(NullLogger.Instance, provider, configuration.TransportCustomization); query.Initialize(ReadOnlyDictionary.Empty); var queueNames = new List(); + await foreach (IBrokerQueue queueName in query.GetQueueNames(token)) { queueNames.Add(queueName); - Assert.That(queueName.Scope, Is.EqualTo("/")); + if (queueName.QueueName == transportSettings.EndpointName) { Assert.That(queueName.EndpointIndicators, Has.Member("ConventionalTopologyBinding")); diff --git a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQueryTests.TestConnectionWithInvalidSettings.approved.txt b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQueryTests.TestConnectionWithInvalidSettings.approved.txt deleted file mode 100644 index 5790b9e0da..0000000000 --- a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQueryTests.TestConnectionWithInvalidSettings.approved.txt +++ /dev/null @@ -1,7 +0,0 @@ -Connection test to RabbitMQ failed: -Failed to connect to 'http://localhost:12345/' - -Connection attempted with the following settings: -Username not set, defaulted to using "xxxxx" username from the ConnectionString used by instance -Password not set, defaulted to using password from the ConnectionString used by instance -RabbitMQ API Url set to "http://localhost:12345" diff --git a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQueryTests.TestConnectionWithValidSettings.approved.txt b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQueryTests.TestConnectionWithValidSettings.approved.txt deleted file mode 100644 index f64695447c..0000000000 --- a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQueryTests.TestConnectionWithValidSettings.approved.txt +++ /dev/null @@ -1,6 +0,0 @@ -Connection test to RabbitMQ was successful - -Connection settings used: -Username not set, defaulted to using "xxxxx" username from the ConnectionString used by instance -Password not set, defaulted to using password from the ConnectionString used by instance -RabbitMQ API Url not set, defaulted to using "xxxx" from the ConnectionString used by instance diff --git a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQuery_ResponseParsing_Tests.Should_fetch_queue_details_in_old_format.approved.txt b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQuery_ResponseParsing_Tests.Should_fetch_queue_details_in_old_format.approved.txt deleted file mode 100644 index 063ba3dd64..0000000000 --- a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQuery_ResponseParsing_Tests.Should_fetch_queue_details_in_old_format.approved.txt +++ /dev/null @@ -1,23 +0,0 @@ -[ - { - "QueueName": "queue1", - "SanitizedName": "queue1", - "Scope": "vhost1", - "VHost": "vhost1", - "EndpointIndicators": [] - }, - { - "QueueName": "queue2", - "SanitizedName": "queue2", - "Scope": "vhost2", - "VHost": "vhost2", - "EndpointIndicators": [] - }, - { - "QueueName": "queue3", - "SanitizedName": "queue3", - "Scope": "vhost1", - "VHost": "vhost1", - "EndpointIndicators": [] - } -] \ No newline at end of file diff --git a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQuery_ResponseParsing_Tests.Should_handle_duplicated_json_data.approved.txt b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQuery_ResponseParsing_Tests.Should_handle_duplicated_json_data.approved.txt deleted file mode 100644 index 016fd18d12..0000000000 --- a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQuery_ResponseParsing_Tests.Should_handle_duplicated_json_data.approved.txt +++ /dev/null @@ -1,16 +0,0 @@ -[ - { - "QueueName": "queue1", - "SanitizedName": "queue1", - "Scope": "vhost1", - "VHost": "vhost1", - "EndpointIndicators": [] - }, - { - "QueueName": "queue2", - "SanitizedName": "queue2", - "Scope": "vhost2", - "VHost": "vhost2", - "EndpointIndicators": [] - } -] \ No newline at end of file diff --git a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/RabbitMQQueryTests.cs b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/RabbitMQQueryTests.cs index 91b3a029ce..38a8b6f6ca 100644 --- a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/RabbitMQQueryTests.cs +++ b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/RabbitMQQueryTests.cs @@ -3,75 +3,65 @@ namespace ServiceControl.Transport.Tests; using System; using System.Collections.Generic; using System.Collections.ObjectModel; -using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Time.Testing; +using NServiceBus; using NUnit.Framework; -using Particular.Approvals; +using ServiceControl.Transports.BrokerThroughput; using Transports; using Transports.RabbitMQ; -using ServiceControl.Transports.BrokerThroughput; [TestFixture] class RabbitMQQueryTests : TransportTestFixture { - FakeTimeProvider provider; - TransportSettings transportSettings; - RabbitMQQuery query; - - [SetUp] - public void Initialise() - { - provider = new(); - provider.SetUtcNow(DateTimeOffset.UtcNow); - transportSettings = new TransportSettings - { - ConnectionString = configuration.ConnectionString, - MaxConcurrency = 1, - EndpointName = Guid.NewGuid().ToString("N") - }; - query = new RabbitMQQuery(NullLogger.Instance, provider, transportSettings); - } - [Test] public async Task TestConnectionWithInvalidSettings() { using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var token = cancellationTokenSource.Token; - var dictionary = new Dictionary + var provider = new FakeTimeProvider(DateTimeOffset.UtcNow); + + var transportSettings = new TransportSettings { - { RabbitMQQuery.RabbitMQSettings.API, "http://localhost:12345" } + ConnectionString = configuration.ConnectionString + ";ManagementApiUrl=http://localhost:12345", + EndpointName = Guid.NewGuid().ToString("N") }; - query.Initialize(new ReadOnlyDictionary(dictionary)); + + configuration.TransportCustomization.CustomizePrimaryEndpoint(new EndpointConfiguration(transportSettings.EndpointName), transportSettings); + + var query = new RabbitMQQuery(NullLogger.Instance, provider, configuration.TransportCustomization); + query.Initialize(ReadOnlyDictionary.Empty); + (bool success, _, string diagnostics) = await query.TestConnection(cancellationTokenSource.Token); Assert.That(success, Is.False); - Approver.Verify(diagnostics, - s => Regex.Replace(s, "defaulted to using \"\\w*\" username", "defaulted to using \"xxxxx\" username", - RegexOptions.Multiline)); } [Test] public async Task TestConnectionWithValidSettings() { using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var token = cancellationTokenSource.Token; + var provider = new FakeTimeProvider(DateTimeOffset.UtcNow); + + var transportSettings = new TransportSettings + { + ConnectionString = configuration.ConnectionString, + EndpointName = Guid.NewGuid().ToString("N") + }; + + configuration.TransportCustomization.CustomizePrimaryEndpoint(new EndpointConfiguration(transportSettings.EndpointName), transportSettings); + + var query = new RabbitMQQuery(NullLogger.Instance, provider, configuration.TransportCustomization); query.Initialize(ReadOnlyDictionary.Empty); + (bool success, _, string diagnostics) = await query.TestConnection(cancellationTokenSource.Token); Assert.That(success, Is.True); - Approver.Verify(diagnostics, - s => - { - s = Regex.Replace(s, - "RabbitMQ API Url not set, defaulted to using \"http://[\\w.]*:15672\" from the ConnectionString used by instance", - "RabbitMQ API Url not set, defaulted to using \"xxxx\" from the ConnectionString used by instance", - RegexOptions.Multiline); - return Regex.Replace(s, "defaulted to using \"\\w*\" username", "defaulted to using \"xxxxx\" username", - RegexOptions.Multiline); - }); } [Test] @@ -79,10 +69,21 @@ public async Task RunScenario() { // We need to wait a bit of time, because the scenario running takes on average 1 sec per run. using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromMinutes(3)); - CancellationToken token = cancellationTokenSource.Token; + var token = cancellationTokenSource.Token; + + var provider = new FakeTimeProvider(DateTimeOffset.UtcNow); + + var transportSettings = new TransportSettings + { + ConnectionString = configuration.ConnectionString, + EndpointName = Guid.NewGuid().ToString("N") + }; + + configuration.TransportCustomization.CustomizePrimaryEndpoint(new EndpointConfiguration(transportSettings.EndpointName), transportSettings); await CreateTestQueue(transportSettings.EndpointName); + var query = new RabbitMQQuery(NullLogger.Instance, provider, configuration.TransportCustomization); query.Initialize(ReadOnlyDictionary.Empty); var queueNames = new List(); @@ -123,7 +124,7 @@ public async Task RunScenario() reset.Set(); await runScenarioAndAdvanceTime.WaitAsync(token); - // Asserting that we have one message per hour during 24 hours, the first snapshot is not counted hence the 23 assertion. + // Asserting that we have one message per hour during 24 hours, the first snapshot is not counted hence the 23 assertion. Assert.That(total, Is.GreaterThan(numMessagesToIngest)); } } \ No newline at end of file diff --git a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/RabbitMQQuery_ResponseParsing_Tests.cs b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/RabbitMQQuery_ResponseParsing_Tests.cs deleted file mode 100644 index 9e706832b9..0000000000 --- a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/RabbitMQQuery_ResponseParsing_Tests.cs +++ /dev/null @@ -1,148 +0,0 @@ -namespace ServiceControl.Transport.Tests; - -using System; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Extensions.Logging.Abstractions; -using Microsoft.Extensions.Time.Testing; -using NUnit.Framework; -using Transports; -using Transports.RabbitMQ; -using System.Net.Http; -using Particular.Approvals; -using System.Collections.ObjectModel; -using System.Net; - -[TestFixture] -class RabbitMQQuery_ResponseParsing_Tests -{ - FakeTimeProvider provider; - TransportSettings transportSettings; - FakeHttpHandler httpHandler; - RabbitMQQuery rabbitMQQuery; - - [SetUp] - public void SetUp() - { - provider = new(); - provider.SetUtcNow(DateTimeOffset.UtcNow); - transportSettings = new TransportSettings - { - ConnectionString = "host=localhost;username=rabbitmq;password=rabbitmq", - MaxConcurrency = 1, - EndpointName = Guid.NewGuid().ToString("N") - }; - httpHandler = new FakeHttpHandler(); - var httpClient = new HttpClient(httpHandler) { BaseAddress = new Uri("http://localhost:15672") }; - - rabbitMQQuery = new TestableRabbitMQQuery(provider, transportSettings, httpClient); - rabbitMQQuery.Initialize(ReadOnlyDictionary.Empty); - } - - [TearDown] - public void TearDown() => httpHandler.Dispose(); - - public Func SendCallback - { - get => httpHandler.SendCallback; - set => httpHandler.SendCallback = value; - } - - [Test] - public async Task Should_handle_duplicated_json_data() - { - SendCallback = _ => - { - var response = new HttpResponseMessage - { - Content = new StringContent(""" - { - "items": [ - { - "name": "queue1", - "vhost": "vhost1", - "memory": 1024, - "memory": 1024, - "message_stats": { - "ack": 1 - } - }, - { - "name": "queue2", - "vhost": "vhost2", - "vhost": "vhost2", - "message_stats": { - "ack": 2 - } - } - ], - "page": 1, - "page_count": 1, - "page_size": 500, - "total_count": 2 - } - """) - }; - return response; - }; - - var queues = (await rabbitMQQuery.GetPage(1, default)).Item1; - Approver.Verify(queues); - } - - [Test] - public async Task Should_fetch_queue_details_in_old_format() - { - SendCallback = _ => - { - var response = new HttpResponseMessage - { - Content = new StringContent(""" - [ - { - "name": "queue1", - "vhost": "vhost1", - "memory": 1024, - "message_stats": { - "ack": 1 - } - }, - { - "name": "queue2", - "vhost": "vhost2", - "message_stats": { - "ack": 2 - } - }, - { - "name": "queue3", - "vhost": "vhost1" - } - ] - """) - }; - return response; - }; - - var queues = (await rabbitMQQuery.GetPage(1, default)).Item1; - Approver.Verify(queues); - } - - sealed class TestableRabbitMQQuery( - TimeProvider timeProvider, - TransportSettings transportSettings, - HttpClient customHttpClient) - : RabbitMQQuery(NullLogger.Instance, timeProvider, transportSettings) - { - protected override HttpClient CreateHttpClient(NetworkCredential defaultCredential, string apiUrl) => customHttpClient; - } - - sealed class FakeHttpHandler : HttpClientHandler - { - public Func SendCallback { get; set; } - - protected override HttpResponseMessage Send(HttpRequestMessage request, CancellationToken cancellationToken) => SendCallback(request); - - protected override Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) => Task.FromResult(SendCallback(request)); - } -} \ No newline at end of file diff --git a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests.csproj b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests.csproj index 6cb4aacd7d..32f68e736f 100644 --- a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests.csproj +++ b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests.csproj @@ -20,7 +20,6 @@ - diff --git a/src/ServiceControl.Transports.Tests/QueueLengthMonitoringTests.cs b/src/ServiceControl.Transports.Tests/QueueLengthMonitoringTests.cs index 5504962847..c121bcae6d 100644 --- a/src/ServiceControl.Transports.Tests/QueueLengthMonitoringTests.cs +++ b/src/ServiceControl.Transports.Tests/QueueLengthMonitoringTests.cs @@ -10,7 +10,7 @@ class QueueLengthMonitoringTests : TransportTestFixture [Test] public async Task Should_report_queue_length() { - var queueName = GetTestQueueName("queuelenght"); + var queueName = GetTestQueueName("queuelength"); await CreateTestQueue(queueName); diff --git a/src/ServiceControl.Transports.Tests/TransportTestFixture.cs b/src/ServiceControl.Transports.Tests/TransportTestFixture.cs index c6eefe193b..0f6d9027e1 100644 --- a/src/ServiceControl.Transports.Tests/TransportTestFixture.cs +++ b/src/ServiceControl.Transports.Tests/TransportTestFixture.cs @@ -8,6 +8,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; + using NServiceBus; using NServiceBus.Logging; using NServiceBus.Transport; using NUnit.Framework; @@ -87,21 +88,24 @@ protected async Task StartQueueLengthProvider(string queueName // currently working around by creating a service collection per start call and then disposing the provider // as part of the method scope. This could lead to potential problems later once we add disposable resources // but this code probably requires a major overhaul anyway. - var serviceCollection = new ServiceCollection(); + var transportSettings = new TransportSettings { ConnectionString = configuration.ConnectionString, EndpointName = queueName, MaxConcurrency = 1 }; + + var serviceCollection = new ServiceCollection(); + configuration.TransportCustomization.AddTransportForMonitoring(serviceCollection, transportSettings); - serviceCollection.AddSingleton>((qlt, _) => - onQueueLengthReported(qlt.First())); + configuration.TransportCustomization.CustomizeMonitoringEndpoint(new EndpointConfiguration("queueName"), transportSettings); + + serviceCollection.AddSingleton>((qlt, _) => onQueueLengthReported(qlt.First())); var serviceProvider = serviceCollection.BuildServiceProvider(); - queueLengthProvider = serviceProvider.GetRequiredService(); + queueLengthProvider = serviceProvider.GetRequiredService(); await queueLengthProvider.StartAsync(CancellationToken.None); - queueLengthProvider.TrackEndpointInputQueue(new EndpointToQueueMapping(queueName, queueName)); return new QueueLengthProviderScope(serviceProvider); diff --git a/src/ServiceControl.Transports/BrokerThroughput/IBrokerQueue.cs b/src/ServiceControl.Transports/BrokerThroughput/IBrokerQueue.cs index 01fb7cb5b5..6cd791267e 100644 --- a/src/ServiceControl.Transports/BrokerThroughput/IBrokerQueue.cs +++ b/src/ServiceControl.Transports/BrokerThroughput/IBrokerQueue.cs @@ -7,8 +7,11 @@ namespace ServiceControl.Transports.BrokerThroughput; public interface IBrokerQueue #pragma warning restore CA1711 { - public string QueueName { get; } - public string SanitizedName { get; } - public string? Scope { get; } - public List EndpointIndicators { get; } + string QueueName { get; } + + string SanitizedName { get; } + + string? Scope { get; } + + List EndpointIndicators { get; } } \ No newline at end of file diff --git a/src/ServiceControl.Transports/ServiceControl.Transports.csproj b/src/ServiceControl.Transports/ServiceControl.Transports.csproj index 6bd8eeea61..435491c160 100644 --- a/src/ServiceControl.Transports/ServiceControl.Transports.csproj +++ b/src/ServiceControl.Transports/ServiceControl.Transports.csproj @@ -2,6 +2,8 @@ net8.0 + true + ..\NServiceBus.snk diff --git a/src/ServiceControlInstaller.Engine/Configuration/ServiceControl/ServiceControlAppConfig.cs b/src/ServiceControlInstaller.Engine/Configuration/ServiceControl/ServiceControlAppConfig.cs index c00b1d8ee5..c3c79743bf 100644 --- a/src/ServiceControlInstaller.Engine/Configuration/ServiceControl/ServiceControlAppConfig.cs +++ b/src/ServiceControlInstaller.Engine/Configuration/ServiceControl/ServiceControlAppConfig.cs @@ -1,7 +1,11 @@ namespace ServiceControlInstaller.Engine.Configuration.ServiceControl { + using System; + using System.Configuration; + using System.Data.Common; using System.IO; using Instances; + using NuGet.Versioning; public class ServiceControlAppConfig : AppConfig { @@ -12,9 +16,12 @@ public ServiceControlAppConfig(IServiceControlInstance instance) : base(Path.Com protected override void UpdateSettings() { + UpdateConnectionString(); Config.ConnectionStrings.ConnectionStrings.Set("NServiceBus/Transport", details.ConnectionString); + var settings = Config.AppSettings.Settings; var version = details.Version; + settings.Set(ServiceControlSettings.InstanceName, details.InstanceName, version); settings.Set(ServiceControlSettings.VirtualDirectory, details.VirtualDirectory); settings.Set(ServiceControlSettings.Port, details.Port.ToString()); @@ -43,6 +50,9 @@ protected override void UpdateSettings() settings.RemoveIfRetired(ServiceControlSettings.AuditLogQueue, version); settings.RemoveIfRetired(ServiceControlSettings.ForwardAuditMessages, version); settings.RemoveIfRetired(ServiceControlSettings.InternalQueueName, version); + settings.RemoveIfRetired(ServiceControlSettings.LicensingComponentRabbitMqManagementApiUrl, version); + settings.RemoveIfRetired(ServiceControlSettings.LicensingComponentRabbitMqManagementApiUsername, version); + settings.RemoveIfRetired(ServiceControlSettings.LicensingComponentRabbitMqManagementApiPassword, version); RemoveRavenDB35Settings(settings, version); } @@ -68,6 +78,44 @@ public override void SetTransportType(string transportTypeName) settings.Set(ServiceControlSettings.TransportType, transportTypeName, version); } - IServiceControlInstance details; + void UpdateConnectionString() + { + if (details.TransportPackage.Name.Contains("rabbitmq", StringComparison.OrdinalIgnoreCase)) + { + MigrateLicensingComponentRabbitMqManagementApiSettings(); + } + } + + void MigrateLicensingComponentRabbitMqManagementApiSettings() + { + if (details.ConnectionString.StartsWith("amqp", StringComparison.OrdinalIgnoreCase)) + { + return; + } + + var shouldMigrate = VersionComparer.Version.Compare(details.Version, new SemanticVersion(6, 5, 0)) >= 0; + + if (shouldMigrate) + { + var connectionStringBuilder = new DbConnectionStringBuilder { ConnectionString = details.ConnectionString }; + var settings = Config.AppSettings.Settings; + + MigrateSetting(connectionStringBuilder, settings[ServiceControlSettings.LicensingComponentRabbitMqManagementApiUrl.Name], "ManagementApiUrl"); + MigrateSetting(connectionStringBuilder, settings[ServiceControlSettings.LicensingComponentRabbitMqManagementApiUsername.Name], "ManagementApiUserName"); + MigrateSetting(connectionStringBuilder, settings[ServiceControlSettings.LicensingComponentRabbitMqManagementApiPassword.Name], "ManagementApiPassword"); + + details.ConnectionString = connectionStringBuilder.ConnectionString; + } + + static void MigrateSetting(DbConnectionStringBuilder connectionStringBuilder, KeyValueConfigurationElement setting, string connectionStringSettingName) + { + if (setting is not null && !connectionStringBuilder.ContainsKey(connectionStringSettingName)) + { + connectionStringBuilder.Add(connectionStringSettingName, setting.Value); + } + } + } + + readonly IServiceControlInstance details; } } \ No newline at end of file diff --git a/src/ServiceControlInstaller.Engine/Configuration/ServiceControl/SettingsList.cs b/src/ServiceControlInstaller.Engine/Configuration/ServiceControl/SettingsList.cs index f982b8bc34..9dc2166000 100644 --- a/src/ServiceControlInstaller.Engine/Configuration/ServiceControl/SettingsList.cs +++ b/src/ServiceControlInstaller.Engine/Configuration/ServiceControl/SettingsList.cs @@ -95,5 +95,23 @@ public static class ServiceControlSettings Name = "ServiceControl/ShutdownTimeout", SupportedFrom = new SemanticVersion(6, 4, 1) }; + + public static readonly SettingInfo LicensingComponentRabbitMqManagementApiUrl = new() + { + Name = "LicensingComponent/RabbitMQ/ApiUrl", + RemovedFrom = new SemanticVersion(6, 5, 0) + }; + + public static readonly SettingInfo LicensingComponentRabbitMqManagementApiUsername = new() + { + Name = "LicensingComponent/RabbitMQ/UserName", + RemovedFrom = new SemanticVersion(6, 5, 0) + }; + + public static readonly SettingInfo LicensingComponentRabbitMqManagementApiPassword = new() + { + Name = "LicensingComponent/RabbitMQ/Password", + RemovedFrom = new SemanticVersion(6, 5, 0) + }; } -} \ No newline at end of file +} diff --git a/src/ServiceControlInstaller.Engine/Interfaces.cs b/src/ServiceControlInstaller.Engine/Interfaces.cs index 54a1c8728f..914be2cc43 100644 --- a/src/ServiceControlInstaller.Engine/Interfaces.cs +++ b/src/ServiceControlInstaller.Engine/Interfaces.cs @@ -16,7 +16,7 @@ public interface ILogging public interface ITransportConfig { TransportInfo TransportPackage { get; } - string ConnectionString { get; } + string ConnectionString { get; set; } } public interface IPersistenceConfig diff --git a/src/ServiceControlInstaller.Engine/Validation/AbstractCommandChecks.cs b/src/ServiceControlInstaller.Engine/Validation/AbstractCommandChecks.cs index 79bb2edfa7..8400dec2b2 100644 --- a/src/ServiceControlInstaller.Engine/Validation/AbstractCommandChecks.cs +++ b/src/ServiceControlInstaller.Engine/Validation/AbstractCommandChecks.cs @@ -4,6 +4,7 @@ using System.Linq; using System.ServiceProcess; using System.Threading.Tasks; + using NuGet.Versioning; using ServiceControl.LicenseManagement; using ServiceControlInstaller.Engine.Configuration.ServiceControl; using ServiceControlInstaller.Engine.Instances; @@ -46,7 +47,7 @@ public async Task ValidateNewInstance(params IServiceInstance[] instances) .Select(i => i.TransportPackage) .First(t => t is not null); - var continueInstall = await RabbitMqCheckIsOK(transport, false).ConfigureAwait(false); + var continueInstall = await RabbitMqCheckIsOK(transport, Constants.CurrentVersion, false).ConfigureAwait(false); return continueInstall; } @@ -81,12 +82,9 @@ async Task CanEditOrDelete(BaseService instance, bool isDelete) return true; } - async Task RabbitMqCheckIsOK(TransportInfo transport, bool isUpgrade) + async Task RabbitMqCheckIsOK(TransportInfo transport, SemanticVersion instanceVersion, bool isUpgrade) { - if (transport is null) - { - throw new ArgumentNullException(nameof(transport)); - } + ArgumentNullException.ThrowIfNull(transport); if (transport.ZipName != "RabbitMQ") { @@ -94,9 +92,9 @@ async Task RabbitMqCheckIsOK(TransportInfo transport, bool isUpgrade) return true; } - // Only way we DON'T need to warn is if we're updating an instance that's already on a "new" (AvailableInSCMU) Rabbit transport - var needToWarn = !(isUpgrade && transport.AvailableInSCMU); - if (!needToWarn) + var newerThan650 = VersionComparer.Version.Compare(instanceVersion, new SemanticVersion(6, 5, 0)) > 0; + + if (isUpgrade && newerThan650) { return true; } @@ -166,7 +164,7 @@ public async Task CanUpgradeInstance(BaseService instance, bool forceUpgra } } - if (!await RabbitMqCheckIsOK(instance.TransportPackage, isUpgrade: true).ConfigureAwait(false)) + if (!await RabbitMqCheckIsOK(instance.TransportPackage, instance.Version, isUpgrade: true).ConfigureAwait(false)) { return false; } diff --git a/src/TestHelper/TestHelper.csproj b/src/TestHelper/TestHelper.csproj index a310aef090..077bb6b110 100644 --- a/src/TestHelper/TestHelper.csproj +++ b/src/TestHelper/TestHelper.csproj @@ -14,5 +14,5 @@ - +