Skip to content

Commit 122649f

Browse files
committed
Make IManagementClientProvider lazy to ensure endpoints have started
1 parent 5dd9aa7 commit 122649f

File tree

5 files changed

+60
-21
lines changed

5 files changed

+60
-21
lines changed
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
namespace ServiceControl.Transports.RabbitMQ;
22

3+
using System;
34
using NServiceBus.Transport.RabbitMQ.ManagementApi;
45

56
interface IManagementClientProvider
67
{
7-
ManagementClient ManagementClient { get; }
8+
Lazy<ManagementClient> GetManagementClient();
89
}

src/ServiceControl.Transports.RabbitMQ/QueueLengthProvider.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public QueueLengthProvider(TransportSettings settings, Action<QueueLengthEntry[]
1313
{
1414
if (transportCustomization is IManagementClientProvider provider)
1515
{
16-
managementClient = provider.ManagementClient;
16+
managementClient = provider.GetManagementClient();
1717
}
1818
else
1919
{
@@ -84,7 +84,7 @@ async Task FetchQueueLengths(CancellationToken cancellationToken)
8484

8585
try
8686
{
87-
var (statusCode, reason, queue) = await managementClient.GetQueue(queueName, cancellationToken);
87+
var (statusCode, reason, queue) = await managementClient.Value.GetQueue(queueName, cancellationToken);
8888

8989
if (queue is not null)
9090
{
@@ -111,6 +111,6 @@ async Task FetchQueueLengths(CancellationToken cancellationToken)
111111

112112
static readonly ILog Logger = LogManager.GetLogger<QueueLengthProvider>();
113113

114-
readonly ManagementClient managementClient;
114+
readonly Lazy<ManagementClient> managementClient;
115115
}
116116
}

src/ServiceControl.Transports.RabbitMQ/RabbitMQConventionalRoutingTransportCustomization.cs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,34 @@
99

1010
public abstract class RabbitMQConventionalRoutingTransportCustomization(NServiceBus.QueueType queueType) : TransportCustomization<RabbitMQTransport>, IManagementClientProvider
1111
{
12-
RabbitMQTransport rabbitMQTransport;
12+
RabbitMQTransport transport;
1313

14-
ManagementClient IManagementClientProvider.ManagementClient => rabbitMQTransport?.ManagementClient ?? new ManagementClient(rabbitMQTransport.ConnectionConfiguration, rabbitMQTransport.ManagementApiConfiguration);
14+
Lazy<ManagementClient> IManagementClientProvider.GetManagementClient()
15+
{
16+
return new(() => Get());
17+
18+
ManagementClient Get()
19+
{
20+
if (transport is null)
21+
{
22+
throw new InvalidOperationException("Management client not available because a CustomizeTransport method has not been called first.");
23+
}
24+
25+
// Since some tests don't actually start an endpoint, this is needed to ensure a management client is available
26+
if (transport.ManagementClient is null)
27+
{
28+
return new ManagementClient(transport.ConnectionConfiguration, transport.ManagementApiConfiguration);
29+
}
30+
31+
return transport.ManagementClient;
32+
}
33+
}
1534

16-
protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => rabbitMQTransport = transportDefinition;
35+
protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => transport = transportDefinition;
1736

1837
protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { }
1938

20-
protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => rabbitMQTransport = transportDefinition;
39+
protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => transport = transportDefinition;
2140

2241
protected override RabbitMQTransport CreateTransport(TransportSettings transportSettings, TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly)
2342
{

src/ServiceControl.Transports.RabbitMQ/RabbitMQDirectRoutingTransportCustomization.cs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,34 @@
99

1010
public abstract class RabbitMQDirectRoutingTransportCustomization(NServiceBus.QueueType queueType) : TransportCustomization<RabbitMQTransport>, IManagementClientProvider
1111
{
12-
RabbitMQTransport rabbitMQTransport;
12+
RabbitMQTransport transport;
1313

14-
ManagementClient IManagementClientProvider.ManagementClient => rabbitMQTransport?.ManagementClient ?? new ManagementClient(rabbitMQTransport.ConnectionConfiguration, rabbitMQTransport.ManagementApiConfiguration);
14+
Lazy<ManagementClient> IManagementClientProvider.GetManagementClient()
15+
{
16+
return new(() => Get());
17+
18+
ManagementClient Get()
19+
{
20+
if (transport is null)
21+
{
22+
throw new InvalidOperationException("Management client not available because a CustomizeTransport method has not been called first.");
23+
}
24+
25+
// Since some tests don't actually start an endpoint, this is needed to ensure a management client is available
26+
if (transport.ManagementClient is null)
27+
{
28+
return new ManagementClient(transport.ConnectionConfiguration, transport.ManagementApiConfiguration);
29+
}
30+
31+
return transport.ManagementClient;
32+
}
33+
}
1534

16-
protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => rabbitMQTransport = transportDefinition;
35+
protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => transport = transportDefinition;
1736

1837
protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { }
1938

20-
protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => rabbitMQTransport = transportDefinition;
39+
protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => transport = transportDefinition;
2140

2241
protected override RabbitMQTransport CreateTransport(TransportSettings transportSettings, TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly)
2342
{

src/ServiceControl.Transports.RabbitMQ/RabbitMQQuery.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public class RabbitMQQuery : BrokerThroughputQuery
2020
{
2121
readonly ILogger<RabbitMQQuery> logger;
2222
readonly TimeProvider timeProvider;
23-
readonly ManagementClient managementClient;
23+
readonly Lazy<ManagementClient> managementClient;
2424

2525
readonly ResiliencePipeline pipeline = new ResiliencePipelineBuilder()
2626
.AddRetry(new RetryStrategyOptions()) // Add retry using the default options
@@ -34,7 +34,7 @@ public RabbitMQQuery(ILogger<RabbitMQQuery> logger, TimeProvider timeProvider, I
3434

3535
if (transportCustomization is IManagementClientProvider provider)
3636
{
37-
managementClient = provider.ManagementClient;
37+
managementClient = provider.GetManagementClient();
3838
}
3939
else
4040
{
@@ -63,7 +63,7 @@ void CheckLegacySettings(ReadOnlyDictionary<string, string> settings, string key
6363
public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBrokerQueue brokerQueue, DateOnly startDate, [EnumeratorCancellation] CancellationToken cancellationToken = default)
6464
{
6565
var queue = (RabbitMQBrokerQueueDetails)brokerQueue;
66-
var response = await pipeline.ExecuteAsync(async token => await managementClient.GetQueue(queue.QueueName, token), cancellationToken);
66+
var response = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetQueue(queue.QueueName, token), cancellationToken);
6767

6868
if (response.Value is null)
6969
{
@@ -79,7 +79,7 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
7979
{
8080
await Task.Delay(TimeSpan.FromMinutes(15), timeProvider, cancellationToken);
8181

82-
response = await pipeline.ExecuteAsync(async token => await managementClient.GetQueue(queue.QueueName, token), cancellationToken);
82+
response = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetQueue(queue.QueueName, token), cancellationToken);
8383

8484
if (response.Value is null)
8585
{
@@ -99,7 +99,7 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
9999

100100
async Task GetRabbitDetails(CancellationToken cancellationToken)
101101
{
102-
var response = await pipeline.ExecuteAsync(async async => await managementClient.GetOverview(cancellationToken), cancellationToken);
102+
var response = await pipeline.ExecuteAsync(async async => await managementClient.Value.GetOverview(cancellationToken), cancellationToken);
103103

104104
ValidateResponse(response);
105105

@@ -160,7 +160,7 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can
160160
{
161161
try
162162
{
163-
var response = await pipeline.ExecuteAsync(async token => await managementClient.GetBindingsForQueue(brokerQueue.QueueName, token), cancellationToken);
163+
var response = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetBindingsForQueue(brokerQueue.QueueName, token), cancellationToken);
164164

165165
// Check if conventional binding is found
166166
if (response.Value.Any(binding => binding?.Source == brokerQueue.QueueName
@@ -179,7 +179,7 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can
179179

180180
try
181181
{
182-
var response = await pipeline.ExecuteAsync(async token => await managementClient.GetBindingsForExchange(brokerQueue.QueueName, token), cancellationToken);
182+
var response = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetBindingsForExchange(brokerQueue.QueueName, token), cancellationToken);
183183

184184
// Check if delayed binding is found
185185
if (response.Value.Any(binding => binding?.Source is "nsb.v2.delay-delivery" or "nsb.delay-delivery"
@@ -198,7 +198,7 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can
198198

199199
internal async Task<(List<RabbitMQBrokerQueueDetails>?, bool morePages)> GetPage(int page, CancellationToken cancellationToken)
200200
{
201-
var (StatusCode, Reason, Value, MorePages) = await pipeline.ExecuteAsync(async token => await managementClient.GetQueues(page, 500, token), cancellationToken);
201+
var (StatusCode, Reason, Value, MorePages) = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetQueues(page, 500, token), cancellationToken);
202202

203203
ValidateResponse((StatusCode, Reason, Value));
204204

@@ -227,7 +227,7 @@ static List<RabbitMQBrokerQueueDetails> MaterializeQueueDetails(List<Queue> item
227227
{
228228
try
229229
{
230-
var (statusCode, reason, value) = await managementClient.GetOverview(cancellationToken);
230+
var (statusCode, reason, value) = await managementClient.Value.GetOverview(cancellationToken);
231231

232232
if (value is not null)
233233
{

0 commit comments

Comments
 (0)