@@ -11,7 +11,6 @@ namespace ServiceControl.Transports.RabbitMQ;
1111using System . Threading ;
1212using System . Threading . Tasks ;
1313using Microsoft . Extensions . Logging ;
14- using NServiceBus ;
1514using NServiceBus . Transport . RabbitMQ . ManagementApi ;
1615using Polly ;
1716using Polly . Retry ;
@@ -21,7 +20,8 @@ public class RabbitMQQuery : BrokerThroughputQuery
2120{
2221 readonly ILogger < RabbitMQQuery > logger ;
2322 readonly TimeProvider timeProvider ;
24- readonly RabbitMQTransport rabbitMQTransport ;
23+ readonly ManagementClient managementClient ;
24+
2525 readonly ResiliencePipeline pipeline = new ResiliencePipelineBuilder ( )
2626 . AddRetry ( new RetryStrategyOptions ( ) ) // Add retry using the default options
2727 . AddTimeout ( TimeSpan . FromMinutes ( 2 ) ) // Add timeout if it keeps failing
@@ -31,7 +31,15 @@ public RabbitMQQuery(ILogger<RabbitMQQuery> logger, TimeProvider timeProvider, T
3131 {
3232 this . logger = logger ;
3333 this . timeProvider = timeProvider ;
34- rabbitMQTransport = GetRabbitMQTransport ( transportCustomization ) ;
34+
35+ if ( transportCustomization is IManagementClientProvider provider )
36+ {
37+ managementClient = provider . ManagementClient ;
38+ }
39+ else
40+ {
41+ throw new ArgumentException ( $ "Transport customization does not implement { nameof ( IManagementClientProvider ) } . Type: { transportCustomization . GetType ( ) . Name } ", nameof ( transportCustomization ) ) ;
42+ }
3543 }
3644
3745 protected override void InitializeCore ( ReadOnlyDictionary < string , string > settings )
@@ -43,16 +51,6 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin
4351 CheckLegacySettings ( settings , RabbitMQSettings . API ) ;
4452 }
4553
46- static RabbitMQTransport GetRabbitMQTransport ( ITransportCustomization transportCustomization )
47- {
48- if ( transportCustomization is IRabbitMQTransportExtensions rabbitMQTransportCustomization )
49- {
50- return rabbitMQTransportCustomization . GetTransport ( ) ;
51- }
52-
53- throw new InvalidOperationException ( $ "Expected a RabbitMQTransport but received { transportCustomization . GetType ( ) . Name } .") ;
54- }
55-
5654 void CheckLegacySettings ( ReadOnlyDictionary < string , string > settings , string key )
5755 {
5856 if ( settings . TryGetValue ( key , out _ ) )
@@ -65,7 +63,7 @@ void CheckLegacySettings(ReadOnlyDictionary<string, string> settings, string key
6563 public override async IAsyncEnumerable < QueueThroughput > GetThroughputPerDay ( IBrokerQueue brokerQueue , DateOnly startDate , [ EnumeratorCancellation ] CancellationToken cancellationToken = default )
6664 {
6765 var queue = ( RabbitMQBrokerQueueDetails ) brokerQueue ;
68- var response = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetQueue ( queue . QueueName , token ) , cancellationToken ) ;
66+ var response = await pipeline . ExecuteAsync ( async token => await managementClient . GetQueue ( queue . QueueName , token ) , cancellationToken ) ;
6967
7068 if ( response . Value is null )
7169 {
@@ -81,7 +79,7 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
8179 {
8280 await Task . Delay ( TimeSpan . FromMinutes ( 15 ) , timeProvider , cancellationToken ) ;
8381
84- response = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetQueue ( queue . QueueName , token ) , cancellationToken ) ;
82+ response = await pipeline . ExecuteAsync ( async token => await managementClient . GetQueue ( queue . QueueName , token ) , cancellationToken ) ;
8583
8684 if ( response . Value is null )
8785 {
@@ -101,7 +99,7 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
10199
102100 async Task GetRabbitDetails ( CancellationToken cancellationToken )
103101 {
104- var response = await pipeline . ExecuteAsync ( async async => await rabbitMQTransport . ManagementClient . GetOverview ( cancellationToken ) , cancellationToken ) ;
102+ var response = await pipeline . ExecuteAsync ( async async => await managementClient . GetOverview ( cancellationToken ) , cancellationToken ) ;
105103
106104 ValidateResponse ( response ) ;
107105
@@ -162,7 +160,7 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can
162160 {
163161 try
164162 {
165- var response = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetBindingsForQueue ( brokerQueue . QueueName , token ) , cancellationToken ) ;
163+ var response = await pipeline . ExecuteAsync ( async token => await managementClient . GetBindingsForQueue ( brokerQueue . QueueName , token ) , cancellationToken ) ;
166164
167165 // Check if conventional binding is found
168166 if ( response . Value . Any ( binding => binding ? . Source == brokerQueue . QueueName
@@ -181,7 +179,7 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can
181179
182180 try
183181 {
184- var response = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetBindingsForExchange ( brokerQueue . QueueName , token ) , cancellationToken ) ;
182+ var response = await pipeline . ExecuteAsync ( async token => await managementClient . GetBindingsForExchange ( brokerQueue . QueueName , token ) , cancellationToken ) ;
185183
186184 // Check if delayed binding is found
187185 if ( response . Value . Any ( binding => binding ? . Source is "nsb.v2.delay-delivery" or "nsb.delay-delivery"
@@ -200,7 +198,7 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can
200198
201199 internal async Task < ( List < RabbitMQBrokerQueueDetails > ? , bool morePages ) > GetPage ( int page , CancellationToken cancellationToken )
202200 {
203- var ( StatusCode , Reason , Value , MorePages ) = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetQueues ( page , 500 , token ) , cancellationToken ) ;
201+ var ( StatusCode , Reason , Value , MorePages ) = await pipeline . ExecuteAsync ( async token => await managementClient . GetQueues ( page , 500 , token ) , cancellationToken ) ;
204202
205203 ValidateResponse ( ( StatusCode , Reason , Value ) ) ;
206204
@@ -229,7 +227,7 @@ static List<RabbitMQBrokerQueueDetails> MaterializeQueueDetails(List<Queue> item
229227 {
230228 try
231229 {
232- var ( statusCode , reason , value ) = await rabbitMQTransport . ManagementClient . GetOverview ( cancellationToken ) ;
230+ var ( statusCode , reason , value ) = await managementClient . GetOverview ( cancellationToken ) ;
233231
234232 if ( value is not null )
235233 {
0 commit comments