Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<PackageVersion Include="AWSSDK.CloudWatch" Version="4.0.1" />
<PackageVersion Include="AWSSDK.SecurityToken" Version="4.0.0.4" />
<PackageVersion Include="Azure.Identity" Version="1.13.2" />
<PackageVersion Include="Azure.Monitor.Query" Version="1.6.0" />
<PackageVersion Include="Azure.Monitor.Query.Metrics" Version="1.0.0" />
<PackageVersion Include="Azure.ResourceManager.ServiceBus" Version="1.1.0" />
<PackageVersion Include="ByteSize" Version="2.1.2" />
<PackageVersion Include="Caliburn.Micro" Version="4.0.230" />
Expand All @@ -17,8 +17,8 @@
<PackageVersion Include="Fody" Version="6.9.1" />
<PackageVersion Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageVersion Include="HdrHistogram" Version="2.5.0" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="8.0.17" />
<PackageVersion Include="Microsoft.AspNetCore.SignalR.Client" Version="8.0.17" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="8.0.21" />
<PackageVersion Include="Microsoft.AspNetCore.SignalR.Client" Version="8.0.21" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.DependencyModel" Version="8.0.2" />
<PackageVersion Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
Expand Down
109 changes: 71 additions & 38 deletions src/ServiceControl.Transports.ASBS/AzureQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ namespace ServiceControl.Transports.ASBS;
using Azure.Core;
using Azure.Core.Pipeline;
using Azure.Identity;
using Azure.Monitor.Query;
using Azure.Monitor.Query.Models;
using Azure.Monitor.Query.Metrics;
using Azure.Monitor.Query.Metrics.Models;
using Azure.ResourceManager;
using Azure.ResourceManager.Resources;
using Azure.ResourceManager.ServiceBus;
Expand All @@ -26,10 +26,12 @@ public class AzureQuery(ILogger<AzureQuery> logger, TimeProvider timeProvider, T
: BrokerThroughputQuery(logger, "AzureServiceBus")
{
string serviceBusName = string.Empty;
MetricsQueryClient? client;
ArmClient? armClient;
string? resourceId;
TokenCredential? credential;
ResourceIdentifier? resourceId;
ArmEnvironment armEnvironment;
MetricsClientAudience metricsClientAudience;
MetricsClient? metricsClient;

protected override void InitializeCore(ReadOnlyDictionary<string, string> settings)
{
Expand Down Expand Up @@ -102,7 +104,7 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin
Diagnostics.AppendLine("Client secret set");
}

(armEnvironment, var metricsQueryAudience) = GetEnvironment();
(armEnvironment, metricsClientAudience) = GetEnvironment();

if (managementUrl == null)
{
Expand All @@ -118,28 +120,17 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin
return;
}

TokenCredential clientCredentials;
if (connectionSettings.AuthenticationMethod is TokenCredentialAuthentication tokenCredentialAuthentication)
{
Diagnostics.AppendLine("Attempting to use managed identity");
clientCredentials = tokenCredentialAuthentication.Credential;
credential = tokenCredentialAuthentication.Credential;
}
else
{
clientCredentials = new ClientSecretCredential(tenantId, clientId, clientSecret);
credential = new ClientSecretCredential(tenantId, clientId, clientSecret);
}

client = new MetricsQueryClient(armEnvironment.Endpoint, clientCredentials,
new MetricsQueryClientOptions
{
Audience = metricsQueryAudience,
Transport = new HttpClientTransport(
new HttpClient(new SocketsHttpHandler
{
PooledConnectionIdleTimeout = TimeSpan.FromMinutes(2)
}))
});
armClient = new ArmClient(clientCredentials, subscriptionId,
armClient = new ArmClient(credential, subscriptionId,
new ArmClientOptions
{
Environment = armEnvironment,
Expand All @@ -152,31 +143,31 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin

return;

(ArmEnvironment armEnvironment, MetricsQueryAudience metricsQueryAudience) GetEnvironment()
(ArmEnvironment armEnvironment, MetricsClientAudience metricsClientAudience) GetEnvironment()
{
if (managementUrlParsed == null)
{
return (ArmEnvironment.AzurePublicCloud, MetricsQueryAudience.AzurePublicCloud);
return (ArmEnvironment.AzurePublicCloud, MetricsClientAudience.AzurePublicCloud);
}

if (managementUrlParsed == ArmEnvironment.AzurePublicCloud.Endpoint)
{
return (ArmEnvironment.AzurePublicCloud, MetricsQueryAudience.AzurePublicCloud);
return (ArmEnvironment.AzurePublicCloud, MetricsClientAudience.AzurePublicCloud);
}

if (managementUrlParsed == ArmEnvironment.AzureChina.Endpoint)
{
return (ArmEnvironment.AzureChina, MetricsQueryAudience.AzureChina);
return (ArmEnvironment.AzureChina, MetricsClientAudience.AzureChina);
}

if (managementUrlParsed == ArmEnvironment.AzureGermany.Endpoint)
{
return (ArmEnvironment.AzureGermany, MetricsQueryAudience.AzurePublicCloud);
return (ArmEnvironment.AzureGermany, MetricsClientAudience.AzurePublicCloud);
}

if (managementUrlParsed == ArmEnvironment.AzureGovernment.Endpoint)
{
return (ArmEnvironment.AzureGovernment, MetricsQueryAudience.AzureGovernment);
return (ArmEnvironment.AzureGovernment, MetricsClientAudience.AzureGovernment);
}

string options = string.Join(", ",
Expand All @@ -187,7 +178,7 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin
}.Select(armEnvironment => $"\"{armEnvironment.Endpoint}\""));
InitialiseErrors.Add($"Management url configuration is invalid, available options are {options}");

return (ArmEnvironment.AzurePublicCloud, MetricsQueryAudience.AzurePublicCloud);
return (ArmEnvironment.AzurePublicCloud, MetricsClientAudience.AzurePublicCloud);
}
}

Expand Down Expand Up @@ -229,7 +220,6 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
while (currentDate <= endDate)
{
data.Add(currentDate, new QueueThroughput { TotalThroughput = 0, DateUTC = currentDate });

currentDate = currentDate.AddDays(1);
}

Expand All @@ -244,23 +234,67 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
}
}

async Task<MetricsClient> InitializeMetricsClient(CancellationToken cancellationToken = default)
{
if (resourceId is null || armClient is null || credential is null)
{
throw new InvalidOperationException("AzureQuery has not been initialized correctly.");
}

var serviceBusNamespaceResource = await armClient
.GetServiceBusNamespaceResource(resourceId).GetAsync(cancellationToken)
?? throw new Exception($"Could not find ServiceBus with resource Id: \"{resourceId}\"");

// Determine the region of the namespace
var regionName = serviceBusNamespaceResource.Value.Data.Location.Name;

// Build the regional Azure Monitor Metrics endpoint from the audience
var metricsEndpoint = BuildMetricsEndpoint(metricsClientAudience, regionName);

// CreateNewOnMetadataUpdateAttribute the MetricsClient for this namespace
return new MetricsClient(
metricsEndpoint,
credential!,
new MetricsClientOptions
{
Audience = metricsClientAudience,
Transport = new HttpClientTransport(
new HttpClient(new SocketsHttpHandler
{
PooledConnectionIdleTimeout = TimeSpan.FromMinutes(2)
}))
});
}

static Uri BuildMetricsEndpoint(MetricsClientAudience audience, string regionName)
{
var region = regionName.ToLowerInvariant();
var builder = new UriBuilder(audience.ToString());
builder.Host = $"{region}.{builder.Host}";
return builder.Uri;
}

async Task<IReadOnlyList<MetricValue>> GetMetrics(string queueName, DateOnly startTime, DateOnly endTime,
CancellationToken cancellationToken = default)
{
var response = await client!.QueryResourceAsync(resourceId,
new[] { "CompleteMessage" },
new MetricsQueryOptions
metricsClient ??= await InitializeMetricsClient(cancellationToken);

var response = await metricsClient.QueryResourcesAsync(
[new ResourceIdentifier(resourceId!)],
["CompleteMessage"],
"Microsoft.ServiceBus/namespaces",
new MetricsQueryResourcesOptions
{
Filter = $"EntityName eq '{queueName}'",
TimeRange = new QueryTimeRange(startTime.ToDateTime(TimeOnly.MinValue, DateTimeKind.Utc), endTime.ToDateTime(TimeOnly.MaxValue, DateTimeKind.Utc)),
TimeRange = new MetricsQueryTimeRange(startTime.ToDateTime(TimeOnly.MinValue, DateTimeKind.Utc), endTime.ToDateTime(TimeOnly.MaxValue, DateTimeKind.Utc)),
Granularity = TimeSpan.FromDays(1)
},
cancellationToken);

var metricValues =
response.Value.Metrics.FirstOrDefault()?.TimeSeries.FirstOrDefault()?.Values ?? [];
response.Value.Values.FirstOrDefault()?.Metrics.FirstOrDefault()?.TimeSeries.FirstOrDefault()?.Values ?? [];

return metricValues;
return metricValues.AsReadOnly();
}

public override async IAsyncEnumerable<IBrokerQueue> GetQueueNames(
Expand All @@ -269,15 +303,14 @@ public override async IAsyncEnumerable<IBrokerQueue> GetQueueNames(
var validNamespaces = await GetValidNamespaceNames(cancellationToken);

SubscriptionResource? subscription = await armClient!.GetDefaultSubscriptionAsync(cancellationToken);
var namespaces =
subscription.GetServiceBusNamespacesAsync(cancellationToken);
var namespaces = subscription.GetServiceBusNamespacesAsync(cancellationToken);

await foreach (var serviceBusNamespaceResource in namespaces.WithCancellation(
cancellationToken))
await foreach (var serviceBusNamespaceResource in namespaces.WithCancellation(cancellationToken))
{
if (validNamespaces.Contains(serviceBusNamespaceResource.Data.Name))
{
resourceId = serviceBusNamespaceResource.Id;

await foreach (var queue in serviceBusNamespaceResource.GetServiceBusQueues()
.WithCancellation(cancellationToken))
{
Expand Down Expand Up @@ -372,4 +405,4 @@ public static class AzureServiceBusSettings
public static readonly string ManagementUrl = "ASB/ManagementUrl";
public static readonly string ManagementUrlDescription = "Azure management URL";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

<ItemGroup>
<PackageReference Include="Azure.Identity" />
<PackageReference Include="Azure.Monitor.Query" />
<PackageReference Include="Azure.Monitor.Query.Metrics" />
<PackageReference Include="Azure.ResourceManager.ServiceBus" />
<PackageReference Include="DnsClient" />
<PackageReference Include="NServiceBus.CustomChecks" />
Expand Down