@@ -41,8 +41,8 @@ public RabbitMQQuery(ILogger<RabbitMQQuery> logger,
4141
4242 protected override void InitializeCore ( ReadOnlyDictionary < string , string > settings )
4343 {
44- //// TODO: Update documentation
45- //// https://docs.particular.net/servicecontrol/servicecontrol-instances/configuration#usage-reporting-when-using-the-rabbitmq-transport
44+ // TODO: Update documentation
45+ // https://docs.particular.net/servicecontrol/servicecontrol-instances/configuration#usage-reporting-when-using-the-rabbitmq-transport
4646 CheckLegacySettings ( settings , RabbitMQSettings . UserName ) ;
4747 CheckLegacySettings ( settings , RabbitMQSettings . Password ) ;
4848 CheckLegacySettings ( settings , RabbitMQSettings . API ) ;
@@ -87,7 +87,7 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
8787
8888 var response = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetQueue ( queue . QueueName , cancellationToken ) , cancellationToken ) ;
8989
90- if ( ! response . HasValue )
90+ if ( response . Value is null )
9191 {
9292 throw new InvalidOperationException ( $ "Could not access RabbitMQ Management API. ({ response . StatusCode } : { response . Reason } )") ;
9393 }
@@ -103,7 +103,7 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
103103 logger . LogDebug ( $ "Querying { url } ") ;
104104 response = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetQueue ( queue . QueueName , cancellationToken ) , cancellationToken ) ;
105105
106- if ( ! response . HasValue )
106+ if ( response . Value is not null )
107107 {
108108 throw new InvalidOperationException ( $ "Could not access RabbitMQ Management API. ({ response . StatusCode } : { response . Reason } )") ;
109109 }
@@ -118,46 +118,44 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
118118 }
119119 }
120120
121- async Task < ( string rabbitVersion , string managementVersion ) > GetRabbitDetails ( bool skipResiliencePipeline , CancellationToken cancellationToken )
121+ async Task GetRabbitDetails ( bool skipResiliencePipeline , CancellationToken cancellationToken )
122122 {
123- Response < Overview ? > response = skipResiliencePipeline
123+
124+ var response = skipResiliencePipeline
124125 ? await rabbitMQTransport . ManagementClient . GetOverview ( cancellationToken )
125126 : await pipeline . ExecuteAsync ( async async => await rabbitMQTransport . ManagementClient . GetOverview ( cancellationToken ) , cancellationToken ) ;
126127
127- var overview = GetResponseValue ( response ) ;
128+ ValidateResponse ( response ) ;
128129
129- if ( overview . DisableStats )
130+ if ( response . Value ! . DisableStats )
130131 {
131132 throw new Exception ( "The RabbitMQ broker is configured with 'management.disable_stats = true' or 'management_agent.disable_metrics_collector = true' " +
132133 "and as a result queue statistics cannot be collected using this tool. Consider changing the configuration of the RabbitMQ broker." ) ;
133134 }
134135
135- var rabbitVersion = response . Value ? . BrokerVersion ?? response . Value ? . ProductVersion ;
136- var mgmtVersion = response . Value ? . ManagementVersion ;
137-
138- return ( rabbitVersion ? . ToString ( ) ?? "Unknown" , mgmtVersion ? . ToString ( ) ?? "Unknown" ) ;
136+ Data [ "RabbitMQVersion" ] = response . Value ? . BrokerVersion ?? "Unknown" ;
139137 }
140138
141- static T GetResponseValue < T > ( Response < T ? > response ) where T : class
139+ void ValidateResponse < T > ( ( HttpStatusCode StatusCode , string Reason , T ? Value ) response )
142140 {
143- if ( ! response . HasValue || response . Value is null )
141+ if ( response . StatusCode != HttpStatusCode . OK )
144142 {
145- throw new InvalidOperationException ( $ "Could not access RabbitMQ Management API. ( { response . StatusCode } : { response . Reason } ) ") ;
143+ throw new HttpRequestException ( $ "Request failed with status code { response . StatusCode } : { response . Reason } ") ;
146144 }
147145
148- return response . Value ;
146+ if ( response . Value is null )
147+ {
148+ throw new InvalidOperationException ( "Request was successful, but the response body was null when a value was expected" ) ;
149+ }
149150 }
150151
151- public override async IAsyncEnumerable < IBrokerQueue > GetQueueNames (
152- [ EnumeratorCancellation ] CancellationToken cancellationToken )
152+ public override async IAsyncEnumerable < IBrokerQueue > GetQueueNames ( [ EnumeratorCancellation ] CancellationToken cancellationToken )
153153 {
154154 var page = 1 ;
155155 bool morePages ;
156156 var vHosts = new HashSet < string > ( StringComparer . CurrentCultureIgnoreCase ) ;
157157
158- ( string rabbitVersion , string managementVersion ) = await GetRabbitDetails ( false , cancellationToken ) ;
159- Data [ "RabbitMQVersion" ] = rabbitVersion ;
160- Data [ "RabbitMQManagementVersionVersion" ] = managementVersion ;
158+ await GetRabbitDetails ( false , cancellationToken ) ;
161159
162160 do
163161 {
@@ -189,7 +187,7 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can
189187 {
190188 try
191189 {
192- var response = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetQueueBindings ( brokerQueue . QueueName , cancellationToken ) , cancellationToken ) ;
190+ var response = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetBindingsForQueue ( brokerQueue . QueueName , cancellationToken ) , cancellationToken ) ;
193191
194192 // Check if conventional binding is found
195193 if ( response . Value . Any ( binding => binding ? . Source == brokerQueue . QueueName
@@ -209,7 +207,7 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can
209207
210208 try
211209 {
212- var response = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetExchangeBindingsDestination ( brokerQueue . QueueName , cancellationToken ) , cancellationToken ) ;
210+ var response = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetBindingsForExchange ( brokerQueue . QueueName , cancellationToken ) , cancellationToken ) ;
213211
214212 // Check if delayed binding is found
215213 if ( response . Value . Any ( binding => binding ? . Source is "nsb.v2.delay-delivery" or "nsb.delay-delivery"
@@ -229,29 +227,11 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can
229227
230228 internal async Task < ( List < RabbitMQBrokerQueueDetails > ? , bool morePages ) > GetPage ( int page , CancellationToken cancellationToken )
231229 {
232- var pagination = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetPage ( page , cancellationToken ) , cancellationToken ) ;
233- switch ( pagination . Value )
234- {
235- case Pagination obj :
236- {
237- var pageCount = obj . PageCount ;
238- var pageReturned = obj . Page ;
230+ var ( StatusCode , Reason , Value , MorePages ) = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetQueues ( page , 500 , cancellationToken ) , cancellationToken ) ;
239231
240- if ( obj . Items is null ) //is not JsonArray items
241- {
242- return ( null , false ) ;
243- }
232+ ValidateResponse ( ( StatusCode , Reason , Value ) ) ;
244233
245- return ( MaterializeQueueDetails ( obj . Items ) , pageCount > pageReturned ) ;
246- }
247- // Older versions of RabbitMQ API did not have paging and returned the array of items directly
248- //case JsonArray arr:
249- // {
250- // return (MaterializeQueueDetails(arr), false);
251- // }
252- default :
253- throw new Exception ( "Was not able to get list of queues from RabbitMQ broker." ) ;
254- }
234+ return ( MaterializeQueueDetails ( Value ) , MorePages ) ;
255235 }
256236
257237 static List < RabbitMQBrokerQueueDetails > MaterializeQueueDetails ( List < Queue > items )
0 commit comments