Skip to content

Commit 5344258

Browse files
committed
Update QueueLengthProvider to use management client
1 parent be96ca6 commit 5344258

File tree

4 files changed

+28
-77
lines changed

4 files changed

+28
-77
lines changed

src/Directory.Packages.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
<PackageVersion Include="NServiceBus.Metrics" Version="5.0.1" />
3939
<PackageVersion Include="NServiceBus.Metrics.ServiceControl" Version="5.0.0" />
4040
<PackageVersion Include="NServiceBus.Persistence.NonDurable" Version="2.0.1" />
41-
<PackageVersion Include="NServiceBus.RabbitMQ" Version="10.0.0-beta.1" />
41+
<PackageVersion Include="NServiceBus.RabbitMQ" Version="10.0.0-beta.2" />
4242
<PackageVersion Include="NServiceBus.SagaAudit" Version="5.0.2" />
4343
<PackageVersion Include="NServiceBus.Testing" Version="9.0.1" />
4444
<PackageVersion Include="NServiceBus.Transport.AzureServiceBus" Version="5.0.0" />

src/ServiceControl.Transports.RabbitMQ/QueueLengthProvider.cs

Lines changed: 25 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,21 @@
44
using System.Collections.Concurrent;
55
using System.Threading;
66
using System.Threading.Tasks;
7-
using global::RabbitMQ.Client;
87
using NServiceBus.Logging;
9-
using NServiceBus.Transport.RabbitMQ;
10-
using ConnectionFactory = NServiceBus.Transport.RabbitMQ.ConnectionFactory;
8+
using NServiceBus.Transport.RabbitMQ.ManagementApi;
119

1210
class QueueLengthProvider : AbstractQueueLengthProvider
1311
{
14-
public QueueLengthProvider(TransportSettings settings, Action<QueueLengthEntry[], EndpointToQueueMapping> store) : base(settings, store)
12+
public QueueLengthProvider(TransportSettings settings, Action<QueueLengthEntry[], EndpointToQueueMapping> store, ITransportCustomization transportCustomization) : base(settings, store)
1513
{
16-
queryExecutor = new QueryExecutor(ConnectionString);
17-
queryExecutor.Initialize();
14+
if (transportCustomization is IManagementClientProvider provider)
15+
{
16+
managementClient = provider.ManagementClient;
17+
}
18+
else
19+
{
20+
throw new ArgumentException($"Transport customization does not implement {nameof(IManagementClientProvider)}. Type: {transportCustomization.GetType().Name}", nameof(transportCustomization));
21+
}
1822
}
1923

2024
public override void TrackEndpointInputQueue(EndpointToQueueMapping queueToTrack) =>
@@ -76,90 +80,37 @@ async Task FetchQueueLengths(CancellationToken cancellationToken)
7680
{
7781
foreach (var endpointQueuePair in endpointQueues)
7882
{
79-
await queryExecutor.Execute(async m =>
80-
{
81-
var queueName = endpointQueuePair.Value;
82-
83-
try
84-
{
85-
var size = (int)await m.MessageCountAsync(queueName, cancellationToken).ConfigureAwait(false);
86-
87-
sizes.AddOrUpdate(queueName, _ => size, (_, __) => size);
88-
}
89-
catch (Exception e)
90-
{
91-
Logger.Warn($"Error querying queue length for {queueName}", e);
92-
}
93-
}, cancellationToken);
94-
}
95-
}
96-
97-
readonly QueryExecutor queryExecutor;
98-
static readonly TimeSpan QueryDelayInterval = TimeSpan.FromMilliseconds(200);
99-
100-
readonly ConcurrentDictionary<string, string> endpointQueues = new ConcurrentDictionary<string, string>();
101-
readonly ConcurrentDictionary<string, int> sizes = new ConcurrentDictionary<string, int>();
102-
103-
static readonly ILog Logger = LogManager.GetLogger<QueueLengthProvider>();
104-
105-
class QueryExecutor(string connectionString) : IDisposable
106-
{
107-
108-
public void Initialize()
109-
{
110-
var connectionConfiguration = ConnectionConfiguration.Create(connectionString);
111-
112-
// TODO Fix this up
113-
//var dbConnectionStringBuilder = new DbConnectionStringBuilder { ConnectionString = connectionString };
114-
115-
connectionFactory = new("ServiceControl.Monitoring",
116-
connectionConfiguration,
117-
null, //providing certificates is not supported yet
118-
false, // TODO Fix dbConnectionStringBuilder.GetBooleanValue("DisableRemoteCertificateValidation"),
119-
false, // TODO fix dbConnectionStringBuilder.GetBooleanValue("UseExternalAuthMechanism"),
120-
TimeSpan.FromSeconds(60), // value would come from config API in actual transport
121-
TimeSpan.FromSeconds(10), // value would come from config API in actual transport
122-
null); // value would come from config API in actual transport
123-
}
83+
var queueName = endpointQueuePair.Value;
12484

125-
public async Task Execute(Action<IChannel> action, CancellationToken cancellationToken = default)
126-
{
12785
try
12886
{
129-
connection ??= await connectionFactory.CreateConnection("queue length monitor", cancellationToken: cancellationToken);
87+
var (statusCode, reason, queue) = await managementClient.GetQueue(queueName, cancellationToken);
13088

131-
//Connection implements reconnection logic
132-
while (!connection.IsOpen)
89+
if (queue is not null)
13390
{
134-
await Task.Delay(ReconnectionDelay, cancellationToken);
91+
var size = queue.MessageCount;
92+
sizes.AddOrUpdate(queueName, _ => size, (_, _) => size);
13593
}
136-
137-
if (channel == null || channel.IsClosed)
94+
else
13895
{
139-
channel?.Dispose();
140-
141-
channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
96+
Logger.Warn($"Error querying queue length for {queueName}. {statusCode}: {reason}");
14297
}
14398

144-
action(channel);
145-
}
146-
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
147-
{
148-
// no-op
14999
}
150100
catch (Exception e)
151101
{
152-
Logger.Warn("Error querying queue length.", e);
102+
Logger.Warn($"Error querying queue length for {queueName}", e);
153103
}
154104
}
105+
}
155106

156-
public void Dispose() => connection?.Dispose();
107+
static readonly TimeSpan QueryDelayInterval = TimeSpan.FromMilliseconds(200);
157108

158-
IConnection connection;
159-
IChannel channel;
160-
ConnectionFactory connectionFactory;
109+
readonly ConcurrentDictionary<string, string> endpointQueues = new();
110+
readonly ConcurrentDictionary<string, long> sizes = new();
161111

162-
static readonly TimeSpan ReconnectionDelay = TimeSpan.FromSeconds(5);
163-
}
112+
static readonly ILog Logger = LogManager.GetLogger<QueueLengthProvider>();
113+
114+
readonly ManagementClient managementClient;
164115
}
165116
}

src/ServiceControl.Transports.RabbitMQ/RabbitMQConventionalRoutingTransportCustomization.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public abstract class RabbitMQConventionalRoutingTransportCustomization(NService
1717

1818
protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { }
1919

20-
protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { }
20+
protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => rabbitMQTransport = transportDefinition;
2121

2222
protected override RabbitMQTransport CreateTransport(TransportSettings transportSettings, TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly)
2323
{

src/ServiceControl.Transports.RabbitMQ/RabbitMQDirectRoutingTransportCustomization.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public abstract class RabbitMQDirectRoutingTransportCustomization(NServiceBus.Qu
1717

1818
protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { }
1919

20-
protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { }
20+
protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => rabbitMQTransport = transportDefinition;
2121

2222
protected override RabbitMQTransport CreateTransport(TransportSettings transportSettings, TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly)
2323
{

0 commit comments

Comments
 (0)