Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<PackageVersion Include="AWSSDK.CloudWatch" Version="4.0.1" />
<PackageVersion Include="AWSSDK.SecurityToken" Version="4.0.0.4" />
<PackageVersion Include="Azure.Identity" Version="1.13.2" />
<PackageVersion Include="Azure.Monitor.Query" Version="1.6.0" />
<PackageVersion Include="Azure.Monitor.Query.Metrics" Version="1.0.0" />
<PackageVersion Include="Azure.ResourceManager.ServiceBus" Version="1.1.0" />
<PackageVersion Include="ByteSize" Version="2.1.2" />
<PackageVersion Include="Caliburn.Micro" Version="4.0.230" />
Expand All @@ -17,8 +17,8 @@
<PackageVersion Include="Fody" Version="6.9.1" />
<PackageVersion Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageVersion Include="HdrHistogram" Version="2.5.0" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="8.0.17" />
<PackageVersion Include="Microsoft.AspNetCore.SignalR.Client" Version="8.0.17" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="8.0.21" />
<PackageVersion Include="Microsoft.AspNetCore.SignalR.Client" Version="8.0.21" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.DependencyModel" Version="8.0.2" />
<PackageVersion Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
Expand Down
97 changes: 62 additions & 35 deletions src/ServiceControl.Transports.ASBS/AzureQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ namespace ServiceControl.Transports.ASBS;
using Azure.Core;
using Azure.Core.Pipeline;
using Azure.Identity;
using Azure.Monitor.Query;
using Azure.Monitor.Query.Models;
using Azure.Monitor.Query.Metrics;
using Azure.Monitor.Query.Metrics.Models;
using Azure.ResourceManager;
using Azure.ResourceManager.Resources;
using Azure.ResourceManager.ServiceBus;
Expand All @@ -26,10 +26,12 @@ public class AzureQuery(ILogger<AzureQuery> logger, TimeProvider timeProvider, T
: BrokerThroughputQuery(logger, "AzureServiceBus")
{
string serviceBusName = string.Empty;
MetricsQueryClient? client;
MetricsClient? client;
ArmClient? armClient;
TokenCredential? credential;
string? resourceId;
ArmEnvironment armEnvironment;
MetricsClientAudience metricsQueryAudience;

protected override void InitializeCore(ReadOnlyDictionary<string, string> settings)
{
Expand Down Expand Up @@ -102,7 +104,7 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin
Diagnostics.AppendLine("Client secret set");
}

(armEnvironment, var metricsQueryAudience) = GetEnvironment();
(armEnvironment, metricsQueryAudience) = GetEnvironment();

if (managementUrl == null)
{
Expand All @@ -118,28 +120,17 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin
return;
}

TokenCredential clientCredentials;
if (connectionSettings.AuthenticationMethod is TokenCredentialAuthentication tokenCredentialAuthentication)
{
Diagnostics.AppendLine("Attempting to use managed identity");
clientCredentials = tokenCredentialAuthentication.Credential;
credential = tokenCredentialAuthentication.Credential;
}
else
{
clientCredentials = new ClientSecretCredential(tenantId, clientId, clientSecret);
credential = new ClientSecretCredential(tenantId, clientId, clientSecret);
}

client = new MetricsQueryClient(armEnvironment.Endpoint, clientCredentials,
new MetricsQueryClientOptions
{
Audience = metricsQueryAudience,
Transport = new HttpClientTransport(
new HttpClient(new SocketsHttpHandler
{
PooledConnectionIdleTimeout = TimeSpan.FromMinutes(2)
}))
});
armClient = new ArmClient(clientCredentials, subscriptionId,
armClient = new ArmClient(credential, subscriptionId,
new ArmClientOptions
{
Environment = armEnvironment,
Expand All @@ -152,31 +143,31 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin

return;

(ArmEnvironment armEnvironment, MetricsQueryAudience metricsQueryAudience) GetEnvironment()
(ArmEnvironment armEnvironment, MetricsClientAudience metricsQueryAudience) GetEnvironment()
{
if (managementUrlParsed == null)
{
return (ArmEnvironment.AzurePublicCloud, MetricsQueryAudience.AzurePublicCloud);
return (ArmEnvironment.AzurePublicCloud, MetricsClientAudience.AzurePublicCloud);
}

if (managementUrlParsed == ArmEnvironment.AzurePublicCloud.Endpoint)
{
return (ArmEnvironment.AzurePublicCloud, MetricsQueryAudience.AzurePublicCloud);
return (ArmEnvironment.AzurePublicCloud, MetricsClientAudience.AzurePublicCloud);
}

if (managementUrlParsed == ArmEnvironment.AzureChina.Endpoint)
{
return (ArmEnvironment.AzureChina, MetricsQueryAudience.AzureChina);
return (ArmEnvironment.AzureChina, MetricsClientAudience.AzureChina);
}

if (managementUrlParsed == ArmEnvironment.AzureGermany.Endpoint)
{
return (ArmEnvironment.AzureGermany, MetricsQueryAudience.AzurePublicCloud);
return (ArmEnvironment.AzureGermany, MetricsClientAudience.AzurePublicCloud);
}

if (managementUrlParsed == ArmEnvironment.AzureGovernment.Endpoint)
{
return (ArmEnvironment.AzureGovernment, MetricsQueryAudience.AzureGovernment);
return (ArmEnvironment.AzureGovernment, MetricsClientAudience.AzureGovernment);
}

string options = string.Join(", ",
Expand All @@ -187,7 +178,7 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin
}.Select(armEnvironment => $"\"{armEnvironment.Endpoint}\""));
InitialiseErrors.Add($"Management url configuration is invalid, available options are {options}");

return (ArmEnvironment.AzurePublicCloud, MetricsQueryAudience.AzurePublicCloud);
return (ArmEnvironment.AzurePublicCloud, MetricsClientAudience.AzurePublicCloud);
}
}

Expand Down Expand Up @@ -229,7 +220,6 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
while (currentDate <= endDate)
{
data.Add(currentDate, new QueueThroughput { TotalThroughput = 0, DateUTC = currentDate });

currentDate = currentDate.AddDays(1);
}

Expand All @@ -247,20 +237,22 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
async Task<IReadOnlyList<MetricValue>> GetMetrics(string queueName, DateOnly startTime, DateOnly endTime,
CancellationToken cancellationToken = default)
{
var response = await client!.QueryResourceAsync(resourceId,
new[] { "CompleteMessage" },
new MetricsQueryOptions
var response = await client!.QueryResourcesAsync(
[new ResourceIdentifier(resourceId!)],
["CompleteMessage"],
"Microsoft.ServiceBus/namespaces",
new MetricsQueryResourcesOptions
{
Filter = $"EntityName eq '{queueName}'",
TimeRange = new QueryTimeRange(startTime.ToDateTime(TimeOnly.MinValue, DateTimeKind.Utc), endTime.ToDateTime(TimeOnly.MaxValue, DateTimeKind.Utc)),
TimeRange = new MetricsQueryTimeRange(startTime.ToDateTime(TimeOnly.MinValue, DateTimeKind.Utc), endTime.ToDateTime(TimeOnly.MaxValue, DateTimeKind.Utc)),
Granularity = TimeSpan.FromDays(1)
},
cancellationToken);

var metricValues =
response.Value.Metrics.FirstOrDefault()?.TimeSeries.FirstOrDefault()?.Values ?? [];
response.Value.Values.FirstOrDefault()?.Metrics.FirstOrDefault()?.TimeSeries.FirstOrDefault()?.Values ?? [];

return metricValues;
return metricValues.AsReadOnly();
}

public override async IAsyncEnumerable<IBrokerQueue> GetQueueNames(
Expand All @@ -272,12 +264,32 @@ public override async IAsyncEnumerable<IBrokerQueue> GetQueueNames(
var namespaces =
subscription.GetServiceBusNamespacesAsync(cancellationToken);

await foreach (var serviceBusNamespaceResource in namespaces.WithCancellation(
cancellationToken))
await foreach (var serviceBusNamespaceResource in namespaces.WithCancellation(cancellationToken))
{
if (validNamespaces.Contains(serviceBusNamespaceResource.Data.Name))
{
resourceId = serviceBusNamespaceResource.Id;

// Determine the region of the namespace
var regionName = serviceBusNamespaceResource.Data.Location.Name;

// Build the regional Azure Monitor Metrics endpoint from the audience
var metricsEndpoint = BuildMetricsEndpointFromAudience(metricsQueryAudience, regionName);

// CreateNewOnMetadataUpdateAttribute the MetricsClient for this namespace
client = new MetricsClient(
metricsEndpoint,
credential!,
new MetricsClientOptions
{
Audience = metricsQueryAudience,
Transport = new HttpClientTransport(
new HttpClient(new SocketsHttpHandler
{
PooledConnectionIdleTimeout = TimeSpan.FromMinutes(2)
}))
});

await foreach (var queue in serviceBusNamespaceResource.GetServiceBusQueues()
.WithCancellation(cancellationToken))
{
Expand All @@ -301,6 +313,21 @@ public override async IAsyncEnumerable<IBrokerQueue> GetQueueNames(
{ ArmEnvironment.AzureChina, "servicebus.chinacloudapi.cn" },
};

// Build metrics endpoint host directly from the configured audience.
Uri BuildMetricsEndpointFromAudience(MetricsClientAudience audience, string regionName)
{
var region = regionName.ToLowerInvariant();

var audienceUri = new Uri(audience.ToString());
var audienceHost = audienceUri.Host; // e.g., "metrics.monitor.azure.com"

var regionalHost = audienceHost.StartsWith("metrics.", StringComparison.OrdinalIgnoreCase)
? audienceHost.Replace("metrics.", $"{region}.metrics.")
: $"{region}.metrics.{audienceHost}";

return new Uri($"https://{regionalHost}");
}

async Task<HashSet<string>> GetValidNamespaceNames(CancellationToken cancellationToken = default)
{
var validNamespaces = new HashSet<string>(StringComparer.OrdinalIgnoreCase) { serviceBusName };
Expand Down Expand Up @@ -372,4 +399,4 @@ public static class AzureServiceBusSettings
public static readonly string ManagementUrl = "ASB/ManagementUrl";
public static readonly string ManagementUrlDescription = "Azure management URL";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

<ItemGroup>
<PackageReference Include="Azure.Identity" />
<PackageReference Include="Azure.Monitor.Query" />
<PackageReference Include="Azure.Monitor.Query.Metrics" />
<PackageReference Include="Azure.ResourceManager.ServiceBus" />
<PackageReference Include="DnsClient" />
<PackageReference Include="NServiceBus.CustomChecks" />
Expand Down