@@ -12,8 +12,8 @@ namespace ServiceControl.Transports.ASBS;
1212using Azure . Core ;
1313using Azure . Core . Pipeline ;
1414using Azure . Identity ;
15- using Azure . Monitor . Query ;
16- using Azure . Monitor . Query . Models ;
15+ using Azure . Monitor . Query . Metrics ;
16+ using Azure . Monitor . Query . Metrics . Models ;
1717using Azure . ResourceManager ;
1818using Azure . ResourceManager . Resources ;
1919using Azure . ResourceManager . ServiceBus ;
@@ -26,10 +26,12 @@ public class AzureQuery(ILogger<AzureQuery> logger, TimeProvider timeProvider, T
2626 : BrokerThroughputQuery ( logger , "AzureServiceBus" )
2727{
2828 string serviceBusName = string . Empty ;
29- MetricsQueryClient ? client ;
3029 ArmClient ? armClient ;
31- string ? resourceId ;
30+ TokenCredential ? credential ;
31+ ResourceIdentifier ? resourceId ;
3232 ArmEnvironment armEnvironment ;
33+ MetricsClientAudience metricsClientAudience ;
34+ MetricsClient ? metricsClient ;
3335
3436 protected override void InitializeCore ( ReadOnlyDictionary < string , string > settings )
3537 {
@@ -102,7 +104,7 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin
102104 Diagnostics . AppendLine ( "Client secret set" ) ;
103105 }
104106
105- ( armEnvironment , var metricsQueryAudience ) = GetEnvironment ( ) ;
107+ ( armEnvironment , metricsClientAudience ) = GetEnvironment ( ) ;
106108
107109 if ( managementUrl == null )
108110 {
@@ -118,28 +120,17 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin
118120 return ;
119121 }
120122
121- TokenCredential clientCredentials ;
122123 if ( connectionSettings . AuthenticationMethod is TokenCredentialAuthentication tokenCredentialAuthentication )
123124 {
124125 Diagnostics . AppendLine ( "Attempting to use managed identity" ) ;
125- clientCredentials = tokenCredentialAuthentication . Credential ;
126+ credential = tokenCredentialAuthentication . Credential ;
126127 }
127128 else
128129 {
129- clientCredentials = new ClientSecretCredential ( tenantId , clientId , clientSecret ) ;
130+ credential = new ClientSecretCredential ( tenantId , clientId , clientSecret ) ;
130131 }
131132
132- client = new MetricsQueryClient ( armEnvironment . Endpoint , clientCredentials ,
133- new MetricsQueryClientOptions
134- {
135- Audience = metricsQueryAudience ,
136- Transport = new HttpClientTransport (
137- new HttpClient ( new SocketsHttpHandler
138- {
139- PooledConnectionIdleTimeout = TimeSpan . FromMinutes ( 2 )
140- } ) )
141- } ) ;
142- armClient = new ArmClient ( clientCredentials , subscriptionId ,
133+ armClient = new ArmClient ( credential , subscriptionId ,
143134 new ArmClientOptions
144135 {
145136 Environment = armEnvironment ,
@@ -152,31 +143,31 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin
152143
153144 return ;
154145
155- ( ArmEnvironment armEnvironment , MetricsQueryAudience metricsQueryAudience ) GetEnvironment ( )
146+ ( ArmEnvironment armEnvironment , MetricsClientAudience metricsClientAudience ) GetEnvironment ( )
156147 {
157148 if ( managementUrlParsed == null )
158149 {
159- return ( ArmEnvironment . AzurePublicCloud , MetricsQueryAudience . AzurePublicCloud ) ;
150+ return ( ArmEnvironment . AzurePublicCloud , MetricsClientAudience . AzurePublicCloud ) ;
160151 }
161152
162153 if ( managementUrlParsed == ArmEnvironment . AzurePublicCloud . Endpoint )
163154 {
164- return ( ArmEnvironment . AzurePublicCloud , MetricsQueryAudience . AzurePublicCloud ) ;
155+ return ( ArmEnvironment . AzurePublicCloud , MetricsClientAudience . AzurePublicCloud ) ;
165156 }
166157
167158 if ( managementUrlParsed == ArmEnvironment . AzureChina . Endpoint )
168159 {
169- return ( ArmEnvironment . AzureChina , MetricsQueryAudience . AzureChina ) ;
160+ return ( ArmEnvironment . AzureChina , MetricsClientAudience . AzureChina ) ;
170161 }
171162
172163 if ( managementUrlParsed == ArmEnvironment . AzureGermany . Endpoint )
173164 {
174- return ( ArmEnvironment . AzureGermany , MetricsQueryAudience . AzurePublicCloud ) ;
165+ return ( ArmEnvironment . AzureGermany , MetricsClientAudience . AzurePublicCloud ) ;
175166 }
176167
177168 if ( managementUrlParsed == ArmEnvironment . AzureGovernment . Endpoint )
178169 {
179- return ( ArmEnvironment . AzureGovernment , MetricsQueryAudience . AzureGovernment ) ;
170+ return ( ArmEnvironment . AzureGovernment , MetricsClientAudience . AzureGovernment ) ;
180171 }
181172
182173 string options = string . Join ( ", " ,
@@ -187,7 +178,7 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin
187178 } . Select ( armEnvironment => $ "\" { armEnvironment . Endpoint } \" ") ) ;
188179 InitialiseErrors . Add ( $ "Management url configuration is invalid, available options are { options } ") ;
189180
190- return ( ArmEnvironment . AzurePublicCloud , MetricsQueryAudience . AzurePublicCloud ) ;
181+ return ( ArmEnvironment . AzurePublicCloud , MetricsClientAudience . AzurePublicCloud ) ;
191182 }
192183 }
193184
@@ -229,7 +220,6 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
229220 while ( currentDate <= endDate )
230221 {
231222 data . Add ( currentDate , new QueueThroughput { TotalThroughput = 0 , DateUTC = currentDate } ) ;
232-
233223 currentDate = currentDate . AddDays ( 1 ) ;
234224 }
235225
@@ -244,23 +234,67 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
244234 }
245235 }
246236
237+ async Task < MetricsClient > InitializeMetricsClient ( CancellationToken cancellationToken = default )
238+ {
239+ if ( resourceId is null || armClient is null || credential is null )
240+ {
241+ throw new InvalidOperationException ( "AzureQuery has not been initialized correctly." ) ;
242+ }
243+
244+ var serviceBusNamespaceResource = await armClient
245+ . GetServiceBusNamespaceResource ( resourceId ) . GetAsync ( cancellationToken )
246+ ?? throw new Exception ( $ "Could not find ServiceBus with resource Id: \" { resourceId } \" ") ;
247+
248+ // Determine the region of the namespace
249+ var regionName = serviceBusNamespaceResource . Value . Data . Location . Name ;
250+
251+ // Build the regional Azure Monitor Metrics endpoint from the audience
252+ var metricsEndpoint = BuildMetricsEndpoint ( metricsClientAudience , regionName ) ;
253+
254+ // CreateNewOnMetadataUpdateAttribute the MetricsClient for this namespace
255+ return new MetricsClient (
256+ metricsEndpoint ,
257+ credential ! ,
258+ new MetricsClientOptions
259+ {
260+ Audience = metricsClientAudience ,
261+ Transport = new HttpClientTransport (
262+ new HttpClient ( new SocketsHttpHandler
263+ {
264+ PooledConnectionIdleTimeout = TimeSpan . FromMinutes ( 2 )
265+ } ) )
266+ } ) ;
267+ }
268+
269+ static Uri BuildMetricsEndpoint ( MetricsClientAudience audience , string regionName )
270+ {
271+ var region = regionName . ToLowerInvariant ( ) ;
272+ var builder = new UriBuilder ( audience . ToString ( ) ) ;
273+ builder . Host = $ "{ region } .{ builder . Host } ";
274+ return builder . Uri ;
275+ }
276+
247277 async Task < IReadOnlyList < MetricValue > > GetMetrics ( string queueName , DateOnly startTime , DateOnly endTime ,
248278 CancellationToken cancellationToken = default )
249279 {
250- var response = await client ! . QueryResourceAsync ( resourceId ,
251- new [ ] { "CompleteMessage" } ,
252- new MetricsQueryOptions
280+ metricsClient ??= await InitializeMetricsClient ( cancellationToken ) ;
281+
282+ var response = await metricsClient . QueryResourcesAsync (
283+ [ resourceId ! ] ,
284+ [ "CompleteMessage" ] ,
285+ "Microsoft.ServiceBus/namespaces" ,
286+ new MetricsQueryResourcesOptions
253287 {
254288 Filter = $ "EntityName eq '{ queueName } '",
255- TimeRange = new QueryTimeRange ( startTime . ToDateTime ( TimeOnly . MinValue , DateTimeKind . Utc ) , endTime . ToDateTime ( TimeOnly . MaxValue , DateTimeKind . Utc ) ) ,
289+ TimeRange = new MetricsQueryTimeRange ( startTime . ToDateTime ( TimeOnly . MinValue , DateTimeKind . Utc ) , endTime . ToDateTime ( TimeOnly . MaxValue , DateTimeKind . Utc ) ) ,
256290 Granularity = TimeSpan . FromDays ( 1 )
257291 } ,
258292 cancellationToken ) ;
259293
260294 var metricValues =
261- response . Value . Metrics . FirstOrDefault ( ) ? . TimeSeries . FirstOrDefault ( ) ? . Values ?? [ ] ;
295+ response . Value . Values . FirstOrDefault ( ) ? . Metrics . FirstOrDefault ( ) ? . TimeSeries . FirstOrDefault ( ) ? . Values ?? [ ] ;
262296
263- return metricValues ;
297+ return metricValues . AsReadOnly ( ) ;
264298 }
265299
266300 public override async IAsyncEnumerable < IBrokerQueue > GetQueueNames (
@@ -269,15 +303,14 @@ public override async IAsyncEnumerable<IBrokerQueue> GetQueueNames(
269303 var validNamespaces = await GetValidNamespaceNames ( cancellationToken ) ;
270304
271305 SubscriptionResource ? subscription = await armClient ! . GetDefaultSubscriptionAsync ( cancellationToken ) ;
272- var namespaces =
273- subscription . GetServiceBusNamespacesAsync ( cancellationToken ) ;
306+ var namespaces = subscription . GetServiceBusNamespacesAsync ( cancellationToken ) ;
274307
275- await foreach ( var serviceBusNamespaceResource in namespaces . WithCancellation (
276- cancellationToken ) )
308+ await foreach ( var serviceBusNamespaceResource in namespaces . WithCancellation ( cancellationToken ) )
277309 {
278310 if ( validNamespaces . Contains ( serviceBusNamespaceResource . Data . Name ) )
279311 {
280312 resourceId = serviceBusNamespaceResource . Id ;
313+
281314 await foreach ( var queue in serviceBusNamespaceResource . GetServiceBusQueues ( )
282315 . WithCancellation ( cancellationToken ) )
283316 {
@@ -372,4 +405,4 @@ public static class AzureServiceBusSettings
372405 public static readonly string ManagementUrl = "ASB/ManagementUrl" ;
373406 public static readonly string ManagementUrlDescription = "Azure management URL" ;
374407 }
375- }
408+ }
0 commit comments