Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<PackageVersion Include="AWSSDK.CloudWatch" Version="4.0.1" />
<PackageVersion Include="AWSSDK.SecurityToken" Version="4.0.0.4" />
<PackageVersion Include="Azure.Identity" Version="1.13.2" />
<PackageVersion Include="Azure.Monitor.Query" Version="1.6.0" />
<PackageVersion Include="Azure.Monitor.Query.Metrics" Version="1.0.0" />
<PackageVersion Include="Azure.ResourceManager.ServiceBus" Version="1.1.0" />
<PackageVersion Include="ByteSize" Version="2.1.2" />
<PackageVersion Include="Caliburn.Micro" Version="4.0.230" />
Expand All @@ -17,8 +17,8 @@
<PackageVersion Include="Fody" Version="6.9.1" />
<PackageVersion Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageVersion Include="HdrHistogram" Version="2.5.0" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="8.0.17" />
<PackageVersion Include="Microsoft.AspNetCore.SignalR.Client" Version="8.0.17" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="8.0.21" />
<PackageVersion Include="Microsoft.AspNetCore.SignalR.Client" Version="8.0.21" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.DependencyModel" Version="8.0.2" />
<PackageVersion Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
Expand Down
103 changes: 68 additions & 35 deletions src/ServiceControl.Transports.ASBS/AzureQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ namespace ServiceControl.Transports.ASBS;
using Azure.Core;
using Azure.Core.Pipeline;
using Azure.Identity;
using Azure.Monitor.Query;
using Azure.Monitor.Query.Models;
using Azure.Monitor.Query.Metrics;
using Azure.Monitor.Query.Metrics.Models;
using Azure.ResourceManager;
using Azure.ResourceManager.Resources;
using Azure.ResourceManager.ServiceBus;
Expand All @@ -26,10 +26,13 @@ public class AzureQuery(ILogger<AzureQuery> logger, TimeProvider timeProvider, T
: BrokerThroughputQuery(logger, "AzureServiceBus")
{
string serviceBusName = string.Empty;
MetricsQueryClient? client;
MetricsClient? client;
ArmClient? armClient;
TokenCredential? credential;
Uri? metricsEndpoint;
string? resourceId;
ArmEnvironment armEnvironment;
MetricsClientAudience metricsQueryAudience;

protected override void InitializeCore(ReadOnlyDictionary<string, string> settings)
{
Expand Down Expand Up @@ -102,7 +105,7 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin
Diagnostics.AppendLine("Client secret set");
}

(armEnvironment, var metricsQueryAudience) = GetEnvironment();
(armEnvironment, metricsQueryAudience) = GetEnvironment();

if (managementUrl == null)
{
Expand All @@ -118,28 +121,17 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin
return;
}

TokenCredential clientCredentials;
if (connectionSettings.AuthenticationMethod is TokenCredentialAuthentication tokenCredentialAuthentication)
{
Diagnostics.AppendLine("Attempting to use managed identity");
clientCredentials = tokenCredentialAuthentication.Credential;
credential = tokenCredentialAuthentication.Credential;
}
else
{
clientCredentials = new ClientSecretCredential(tenantId, clientId, clientSecret);
credential = new ClientSecretCredential(tenantId, clientId, clientSecret);
}

client = new MetricsQueryClient(armEnvironment.Endpoint, clientCredentials,
new MetricsQueryClientOptions
{
Audience = metricsQueryAudience,
Transport = new HttpClientTransport(
new HttpClient(new SocketsHttpHandler
{
PooledConnectionIdleTimeout = TimeSpan.FromMinutes(2)
}))
});
armClient = new ArmClient(clientCredentials, subscriptionId,
armClient = new ArmClient(credential, subscriptionId,
new ArmClientOptions
{
Environment = armEnvironment,
Expand All @@ -152,31 +144,31 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin

return;

(ArmEnvironment armEnvironment, MetricsQueryAudience metricsQueryAudience) GetEnvironment()
(ArmEnvironment armEnvironment, MetricsClientAudience metricsQueryAudience) GetEnvironment()
{
if (managementUrlParsed == null)
{
return (ArmEnvironment.AzurePublicCloud, MetricsQueryAudience.AzurePublicCloud);
return (ArmEnvironment.AzurePublicCloud, MetricsClientAudience.AzurePublicCloud);
}

if (managementUrlParsed == ArmEnvironment.AzurePublicCloud.Endpoint)
{
return (ArmEnvironment.AzurePublicCloud, MetricsQueryAudience.AzurePublicCloud);
return (ArmEnvironment.AzurePublicCloud, MetricsClientAudience.AzurePublicCloud);
}

if (managementUrlParsed == ArmEnvironment.AzureChina.Endpoint)
{
return (ArmEnvironment.AzureChina, MetricsQueryAudience.AzureChina);
return (ArmEnvironment.AzureChina, MetricsClientAudience.AzureChina);
}

if (managementUrlParsed == ArmEnvironment.AzureGermany.Endpoint)
{
return (ArmEnvironment.AzureGermany, MetricsQueryAudience.AzurePublicCloud);
return (ArmEnvironment.AzureGermany, MetricsClientAudience.AzurePublicCloud);
}

if (managementUrlParsed == ArmEnvironment.AzureGovernment.Endpoint)
{
return (ArmEnvironment.AzureGovernment, MetricsQueryAudience.AzureGovernment);
return (ArmEnvironment.AzureGovernment, MetricsClientAudience.AzureGovernment);
}

string options = string.Join(", ",
Expand All @@ -187,7 +179,7 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin
}.Select(armEnvironment => $"\"{armEnvironment.Endpoint}\""));
InitialiseErrors.Add($"Management url configuration is invalid, available options are {options}");

return (ArmEnvironment.AzurePublicCloud, MetricsQueryAudience.AzurePublicCloud);
return (ArmEnvironment.AzurePublicCloud, MetricsClientAudience.AzurePublicCloud);
}
}

Expand Down Expand Up @@ -229,7 +221,6 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
while (currentDate <= endDate)
{
data.Add(currentDate, new QueueThroughput { TotalThroughput = 0, DateUTC = currentDate });

currentDate = currentDate.AddDays(1);
}

Expand All @@ -247,20 +238,22 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
async Task<IReadOnlyList<MetricValue>> GetMetrics(string queueName, DateOnly startTime, DateOnly endTime,
CancellationToken cancellationToken = default)
{
var response = await client!.QueryResourceAsync(resourceId,
new[] { "CompleteMessage" },
new MetricsQueryOptions
var response = await client!.QueryResourcesAsync(
[new ResourceIdentifier(resourceId!)],
["CompleteMessage"],
"Microsoft.ServiceBus/namespaces",
new MetricsQueryResourcesOptions
{
Filter = $"EntityName eq '{queueName}'",
TimeRange = new QueryTimeRange(startTime.ToDateTime(TimeOnly.MinValue, DateTimeKind.Utc), endTime.ToDateTime(TimeOnly.MaxValue, DateTimeKind.Utc)),
TimeRange = new MetricsQueryTimeRange(startTime.ToDateTime(TimeOnly.MinValue, DateTimeKind.Utc), endTime.ToDateTime(TimeOnly.MaxValue, DateTimeKind.Utc)),
Granularity = TimeSpan.FromDays(1)
},
cancellationToken);

var metricValues =
response.Value.Metrics.FirstOrDefault()?.TimeSeries.FirstOrDefault()?.Values ?? [];
response.Value.Values.FirstOrDefault()?.Metrics.FirstOrDefault()?.TimeSeries.FirstOrDefault()?.Values ?? [];

return metricValues;
return metricValues.AsReadOnly();
}

public override async IAsyncEnumerable<IBrokerQueue> GetQueueNames(
Expand All @@ -272,12 +265,37 @@ public override async IAsyncEnumerable<IBrokerQueue> GetQueueNames(
var namespaces =
subscription.GetServiceBusNamespacesAsync(cancellationToken);

await foreach (var serviceBusNamespaceResource in namespaces.WithCancellation(
cancellationToken))
await foreach (var serviceBusNamespaceResource in namespaces.WithCancellation(cancellationToken))
{
if (validNamespaces.Contains(serviceBusNamespaceResource.Data.Name))
{
resourceId = serviceBusNamespaceResource.Id;

// Determine the region of the namespace
var regionName = serviceBusNamespaceResource.Data.Location.Name;

// Build the regional Azure Monitor Metrics endpoint from the audience
var newEndpoint = BuildMetricsEndpointFromAudience(metricsQueryAudience, regionName);

// Create or refresh the MetricsClient if it's missing or points to a different region
if (client is null || metricsEndpoint?.ToString() != newEndpoint.ToString())
{
metricsEndpoint = newEndpoint;

client = new MetricsClient(
metricsEndpoint,
credential!,
new MetricsClientOptions
{
Audience = metricsQueryAudience,
Transport = new HttpClientTransport(
new HttpClient(new SocketsHttpHandler
{
PooledConnectionIdleTimeout = TimeSpan.FromMinutes(2)
}))
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the motivation to move the initialisation of the client to this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The metricsEndpoint needs to specify the region which can only be done once we have the ASB resource. Currently, that information is not available in InitializeCore. As per discussion, we are considering alternative options.

}

await foreach (var queue in serviceBusNamespaceResource.GetServiceBusQueues()
.WithCancellation(cancellationToken))
{
Expand All @@ -301,6 +319,21 @@ public override async IAsyncEnumerable<IBrokerQueue> GetQueueNames(
{ ArmEnvironment.AzureChina, "servicebus.chinacloudapi.cn" },
};

// Build metrics endpoint host directly from the configured audience.
Uri BuildMetricsEndpointFromAudience(MetricsClientAudience audience, string regionName)
{
var region = regionName.ToLowerInvariant();

var audienceUri = new Uri(audience.ToString());
var audienceHost = audienceUri.Host; // e.g., "metrics.monitor.azure.com"

var regionalHost = audienceHost.StartsWith("metrics.", StringComparison.OrdinalIgnoreCase)
? audienceHost.Replace("metrics.", $"{region}.metrics.")
: $"{region}.metrics.{audienceHost}";

return new Uri($"https://{regionalHost}");
}

async Task<HashSet<string>> GetValidNamespaceNames(CancellationToken cancellationToken = default)
{
var validNamespaces = new HashSet<string>(StringComparer.OrdinalIgnoreCase) { serviceBusName };
Expand Down Expand Up @@ -372,4 +405,4 @@ public static class AzureServiceBusSettings
public static readonly string ManagementUrl = "ASB/ManagementUrl";
public static readonly string ManagementUrlDescription = "Azure management URL";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

<ItemGroup>
<PackageReference Include="Azure.Identity" />
<PackageReference Include="Azure.Monitor.Query" />
<PackageReference Include="Azure.Monitor.Query.Metrics" />
<PackageReference Include="Azure.ResourceManager.ServiceBus" />
<PackageReference Include="DnsClient" />
<PackageReference Include="NServiceBus.CustomChecks" />
Expand Down