@@ -19,18 +19,15 @@ namespace ServiceControl.Transports.RabbitMQ;
1919
2020public class RabbitMQQuery : BrokerThroughputQuery
2121{
22- readonly ResiliencePipeline pipeline = new ResiliencePipelineBuilder ( )
23- . AddRetry ( new RetryStrategyOptions ( ) ) // Add retry using the default options
24- . AddTimeout ( TimeSpan . FromMinutes ( 2 ) ) // Add timeout if it keeps failing
25- . Build ( ) ;
2622 readonly ILogger < RabbitMQQuery > logger ;
2723 readonly TimeProvider timeProvider ;
2824 readonly RabbitMQTransport rabbitMQTransport ;
25+ readonly ResiliencePipeline pipeline = new ResiliencePipelineBuilder ( )
26+ . AddRetry ( new RetryStrategyOptions ( ) ) // Add retry using the default options
27+ . AddTimeout ( TimeSpan . FromMinutes ( 2 ) ) // Add timeout if it keeps failing
28+ . Build ( ) ;
2929
30- public RabbitMQQuery ( ILogger < RabbitMQQuery > logger ,
31- TimeProvider timeProvider ,
32- TransportSettings transportSettings ,
33- ITransportCustomization transportCustomization ) : base ( logger , "RabbitMQ" )
30+ public RabbitMQQuery ( ILogger < RabbitMQQuery > logger , TimeProvider timeProvider , TransportSettings transportSettings , ITransportCustomization transportCustomization ) : base ( logger , "RabbitMQ" )
3431 {
3532 this . logger = logger ;
3633 this . timeProvider = timeProvider ;
@@ -65,16 +62,10 @@ void CheckLegacySettings(ReadOnlyDictionary<string, string> settings, string key
6562 }
6663 }
6764
68- public override async IAsyncEnumerable < QueueThroughput > GetThroughputPerDay ( IBrokerQueue brokerQueue ,
69- DateOnly startDate ,
70- [ EnumeratorCancellation ] CancellationToken cancellationToken = default )
65+ public override async IAsyncEnumerable < QueueThroughput > GetThroughputPerDay ( IBrokerQueue brokerQueue , DateOnly startDate , [ EnumeratorCancellation ] CancellationToken cancellationToken = default )
7166 {
7267 var queue = ( RabbitMQBrokerQueueDetails ) brokerQueue ;
73- //var url = $"/api/queues/{HttpUtility.UrlEncode(queue.VHost)}/{HttpUtility.UrlEncode(queue.QueueName)}";
74-
75- //logger.LogDebug($"Querying {url}");
76-
77- var response = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetQueue ( queue . QueueName , cancellationToken ) , cancellationToken ) ;
68+ var response = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetQueue ( queue . QueueName , token ) , cancellationToken ) ;
7869
7970 if ( response . Value is null )
8071 {
@@ -89,8 +80,8 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
8980 for ( var i = 0 ; i < 24 * 4 ; i ++ )
9081 {
9182 await Task . Delay ( TimeSpan . FromMinutes ( 15 ) , timeProvider , cancellationToken ) ;
92- //logger.LogDebug($"Querying {url}");
93- response = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetQueue ( queue . QueueName , cancellationToken ) , cancellationToken ) ;
83+
84+ response = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetQueue ( queue . QueueName , token ) , cancellationToken ) ;
9485
9586 if ( response . Value is null )
9687 {
@@ -99,6 +90,7 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
9990
10091 newReading = new RabbitMQBrokerQueueDetails ( response . Value ) ;
10192 var newTotalThroughput = queue . CalculateThroughputFrom ( newReading ) ;
93+
10294 yield return new QueueThroughput
10395 {
10496 DateUTC = DateOnly . FromDateTime ( timeProvider . GetUtcNow ( ) . DateTime ) ,
@@ -107,12 +99,9 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
10799 }
108100 }
109101
110- async Task GetRabbitDetails ( bool skipResiliencePipeline , CancellationToken cancellationToken )
102+ async Task GetRabbitDetails ( CancellationToken cancellationToken )
111103 {
112-
113- var response = skipResiliencePipeline
114- ? await rabbitMQTransport . ManagementClient . GetOverview ( cancellationToken )
115- : await pipeline . ExecuteAsync ( async async => await rabbitMQTransport . ManagementClient . GetOverview ( cancellationToken ) , cancellationToken ) ;
104+ var response = await pipeline . ExecuteAsync ( async async => await rabbitMQTransport . ManagementClient . GetOverview ( cancellationToken ) , cancellationToken ) ;
116105
117106 ValidateResponse ( response ) ;
118107
@@ -142,15 +131,14 @@ public override async IAsyncEnumerable<IBrokerQueue> GetQueueNames([EnumeratorCa
142131 {
143132 var page = 1 ;
144133 bool morePages ;
145- //var vHosts = new HashSet<string>(StringComparer.CurrentCultureIgnoreCase);
146134
147- await GetRabbitDetails ( false , cancellationToken ) ;
135+ await GetRabbitDetails ( cancellationToken ) ;
148136
149137 do
150138 {
151139 ( var queues , morePages ) = await GetPage ( page , cancellationToken ) ;
152140
153- if ( queues != null )
141+ if ( queues is not null )
154142 {
155143 foreach ( var rabbitMQQueueDetails in queues )
156144 {
@@ -160,27 +148,24 @@ public override async IAsyncEnumerable<IBrokerQueue> GetQueueNames([EnumeratorCa
160148 {
161149 continue ;
162150 }
163- //vHosts.Add(rabbitMQQueueDetails.VHost);
151+
164152 await AddAdditionalQueueDetails ( rabbitMQQueueDetails , cancellationToken ) ;
165153 yield return rabbitMQQueueDetails ;
166154 }
167155 }
168156
169157 page ++ ;
170158 } while ( morePages ) ;
171-
172- //ScopeType = vHosts.Count > 1 ? "VirtualHost" : null;
173159 }
174160
175161 async Task AddAdditionalQueueDetails ( RabbitMQBrokerQueueDetails brokerQueue , CancellationToken cancellationToken )
176162 {
177163 try
178164 {
179- var response = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetBindingsForQueue ( brokerQueue . QueueName , cancellationToken ) , cancellationToken ) ;
165+ var response = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetBindingsForQueue ( brokerQueue . QueueName , token ) , cancellationToken ) ;
180166
181167 // Check if conventional binding is found
182168 if ( response . Value . Any ( binding => binding ? . Source == brokerQueue . QueueName
183- //&& binding?.Vhost == brokerQueue.VHost
184169 && binding ? . Destination == brokerQueue . QueueName
185170 && binding ? . DestinationType == "queue"
186171 && binding ? . RoutingKey == string . Empty
@@ -196,11 +181,10 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can
196181
197182 try
198183 {
199- var response = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetBindingsForExchange ( brokerQueue . QueueName , cancellationToken ) , cancellationToken ) ;
184+ var response = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetBindingsForExchange ( brokerQueue . QueueName , token ) , cancellationToken ) ;
200185
201186 // Check if delayed binding is found
202187 if ( response . Value . Any ( binding => binding ? . Source is "nsb.v2.delay-delivery" or "nsb.delay-delivery"
203- //&& binding?.Vhost == brokerQueue.VHost
204188 && binding ? . Destination == brokerQueue . QueueName
205189 && binding ? . DestinationType == "exchange"
206190 && binding ? . RoutingKey == $ "#.{ brokerQueue . QueueName } ") )
@@ -216,7 +200,7 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can
216200
217201 internal async Task < ( List < RabbitMQBrokerQueueDetails > ? , bool morePages ) > GetPage ( int page , CancellationToken cancellationToken )
218202 {
219- var ( StatusCode , Reason , Value , MorePages ) = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetQueues ( page , 500 , cancellationToken ) , cancellationToken ) ;
203+ var ( StatusCode , Reason , Value , MorePages ) = await pipeline . ExecuteAsync ( async token => await rabbitMQTransport . ManagementClient . GetQueues ( page , 500 , token ) , cancellationToken ) ;
220204
221205 ValidateResponse ( ( StatusCode , Reason , Value ) ) ;
222206
@@ -241,19 +225,25 @@ static List<RabbitMQBrokerQueueDetails> MaterializeQueueDetails(List<Queue> item
241225 new KeyDescriptionPair ( RabbitMQSettings . Password , RabbitMQSettings . PasswordDescription )
242226 ] ;
243227
244- protected override async Task < ( bool Success , List < string > Errors ) > TestConnectionCore (
245- CancellationToken cancellationToken )
228+ protected override async Task < ( bool Success , List < string > Errors ) > TestConnectionCore ( CancellationToken cancellationToken )
246229 {
247230 try
248231 {
249- await GetRabbitDetails ( true , cancellationToken ) ;
232+ var ( statusCode , reason , value ) = await rabbitMQTransport . ManagementClient . GetOverview ( cancellationToken ) ;
233+
234+ if ( value is not null )
235+ {
236+ return ( true , [ ] ) ;
237+ }
238+ else
239+ {
240+ return ( false , [ $ "{ statusCode } : { reason } "] ) ;
241+ }
250242 }
251- catch ( HttpRequestException e )
243+ catch ( HttpRequestException ex )
252244 {
253- throw new Exception ( $ "Failed to connect to management API", e ) ;
245+ throw new Exception ( $ "Failed to connect to RabbitMQ management API", ex ) ;
254246 }
255-
256- return ( true , [ ] ) ;
257247 }
258248
259249 public static class RabbitMQSettings
0 commit comments