Skip to content

Commit a226649

Browse files
Travis Nickelsbording
authored andcommitted
Update RabbitMQQuery to use the transport management client
1 parent 0d18b21 commit a226649

File tree

2 files changed

+94
-136
lines changed

2 files changed

+94
-136
lines changed

src/ServiceControl.Transports.RabbitMQ/RabbitMQBrokerQueueDetails.cs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,18 @@
22
namespace ServiceControl.Transports.RabbitMQ;
33

44
using System.Collections.Generic;
5-
using System.Text.Json;
65
using ServiceControl.Transports.BrokerThroughput;
6+
using NServiceBus.Transport.RabbitMQ.ManagementApi;
77

8-
public class RabbitMQBrokerQueueDetails(JsonElement token) : IBrokerQueue
8+
class RabbitMQBrokerQueueDetails(Queue queue) : IBrokerQueue
99
{
10-
public string QueueName { get; } = token.GetProperty("name").GetString()!;
10+
public string QueueName { get; } = queue.Name;
1111
public string SanitizedName => QueueName;
1212
public string Scope => VHost;
13-
public string VHost { get; } = token.GetProperty("vhost").GetString()!;
13+
public string VHost { get; } = queue.Vhost;
1414
public List<string> EndpointIndicators { get; } = [];
15-
long? AckedMessages { get; set; } = FromToken(token);
16-
long Baseline { get; set; } = FromToken(token) ?? 0;
15+
long? AckedMessages { get; set; } = queue.MessageStats?.Ack;
16+
long Baseline { get; set; } = queue.MessageStats?.Ack ?? 0;
1717

1818
public long CalculateThroughputFrom(RabbitMQBrokerQueueDetails newReading)
1919
{
@@ -32,9 +32,4 @@ public long CalculateThroughputFrom(RabbitMQBrokerQueueDetails newReading)
3232

3333
return newlyAckedMessages;
3434
}
35-
36-
static long? FromToken(JsonElement jsonElement) =>
37-
jsonElement.TryGetProperty("message_stats", out var stats) && stats.TryGetProperty("ack", out var val)
38-
? val.GetInt64()
39-
: null;
4035
}

src/ServiceControl.Transports.RabbitMQ/RabbitMQQuery.cs

Lines changed: 88 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,7 @@ namespace ServiceControl.Transports.RabbitMQ;
77
using System.Linq;
88
using System.Net;
99
using System.Net.Http;
10-
using System.Net.Http.Json;
1110
using System.Runtime.CompilerServices;
12-
using System.Text.Json;
13-
using System.Text.Json.Nodes;
1411
using System.Threading;
1512
using System.Threading.Tasks;
1613
using System.Web;
@@ -19,6 +16,7 @@ namespace ServiceControl.Transports.RabbitMQ;
1916
using Polly;
2017
using Polly.Retry;
2118
using ServiceControl.Transports.BrokerThroughput;
19+
using NServiceBus.Transport.RabbitMQ.ManagementApi;
2220

2321
public class RabbitMQQuery : BrokerThroughputQuery
2422
{
@@ -29,7 +27,6 @@ public class RabbitMQQuery : BrokerThroughputQuery
2927
.Build();
3028
readonly ILogger<RabbitMQQuery> logger;
3129
readonly TimeProvider timeProvider;
32-
readonly ConnectionConfiguration connectionConfiguration;
3330
readonly RabbitMQTransport rabbitMQTransport;
3431

3532
public RabbitMQQuery(ILogger<RabbitMQQuery> logger,
@@ -39,81 +36,38 @@ public RabbitMQQuery(ILogger<RabbitMQQuery> logger,
3936
{
4037
this.logger = logger;
4138
this.timeProvider = timeProvider;
42-
if (transportCustomization is IRabbitMQTransportExtensions rabbitMQTransportCustomization)
43-
{
44-
rabbitMQTransport = rabbitMQTransportCustomization.GetTransport();
45-
_ = rabbitMQTransport;
46-
}
47-
else
48-
{
49-
throw new InvalidOperationException($"Expected a RabbitMQTransport but received {transportCustomization.GetType().Name}.");
50-
}
51-
52-
connectionConfiguration = ConnectionConfiguration.Create(transportSettings.ConnectionString, string.Empty);
39+
rabbitMQTransport = GetRabbitMQTransport(transportCustomization);
5340
}
5441

5542
protected override void InitializeCore(ReadOnlyDictionary<string, string> settings)
5643
{
57-
// The licensing component configurations take precedence over the management API connection string configuration options
58-
// https://docs.particular.net/servicecontrol/servicecontrol-instances/configuration#usage-reporting-when-using-the-rabbitmq-transport
59-
var userName = GetSettingsValue(settings, RabbitMQSettings.UserName, rabbitMQTransport.ManagementApiConfiguration.UserName);
60-
var password = GetSettingsValue(settings, RabbitMQSettings.Password, rabbitMQTransport.ManagementApiConfiguration.Password);
61-
var apiUrl = GetSettingsValue(settings, RabbitMQSettings.API, rabbitMQTransport.ManagementApiConfiguration.Url);
62-
63-
if (userName != rabbitMQTransport.ManagementApiConfiguration.UserName)
64-
{
65-
_ = Diagnostics.AppendLine($"UserName in settings is different from Management API URL: {userName} != {rabbitMQTransport.ManagementApiConfiguration.UserName}");
66-
}
67-
68-
if (password != rabbitMQTransport.ManagementApiConfiguration.Password)
69-
{
70-
_ = Diagnostics.AppendLine($"Password in settings is different from Management API URL.");
71-
}
72-
73-
if (apiUrl != rabbitMQTransport.ManagementApiConfiguration.Url)
74-
{
75-
_ = Diagnostics.AppendLine($"API URL in settings is different from Management API URL: {apiUrl} != {rabbitMQTransport.ManagementApiConfiguration.Url}");
76-
}
44+
//// TODO: Update documentation
45+
//// https://docs.particular.net/servicecontrol/servicecontrol-instances/configuration#usage-reporting-when-using-the-rabbitmq-transport
46+
CheckLegacySettings(settings, RabbitMQSettings.UserName);
47+
CheckLegacySettings(settings, RabbitMQSettings.Password);
48+
CheckLegacySettings(settings, RabbitMQSettings.API);
49+
}
7750

78-
if (!Uri.TryCreate(apiUrl, UriKind.Absolute, out _))
51+
static RabbitMQTransport GetRabbitMQTransport(ITransportCustomization transportCustomization)
52+
{
53+
if (transportCustomization is IRabbitMQTransportExtensions rabbitMQTransportCustomization)
7954
{
80-
InitialiseErrors.Add("API url configured is invalid");
55+
return rabbitMQTransportCustomization.GetTransport();
8156
}
8257

83-
var defaultCredential = new NetworkCredential(userName, password);
84-
85-
if (InitialiseErrors.Count == 0)
86-
{
87-
// ideally we would use the HttpClientFactory, but it would be a bit more involved to set that up
88-
// so for now we are using a virtual method that can be overriden in tests
89-
// https://github.com/Particular/ServiceControl/issues/4493
90-
httpClient = CreateHttpClient(defaultCredential, apiUrl);
91-
}
58+
throw new InvalidOperationException($"Expected a RabbitMQTransport but received {transportCustomization.GetType().Name}.");
9259
}
9360

94-
string GetSettingsValue(ReadOnlyDictionary<string, string> settings, string key, string defaultValue)
61+
void CheckLegacySettings(ReadOnlyDictionary<string, string> settings, string key)
9562
{
96-
if (!settings.TryGetValue(key, out string? value) ||
97-
string.IsNullOrEmpty(value))
98-
{
99-
logger.LogInformation($"Using {key} from connection string");
100-
value = defaultValue;
101-
_ = Diagnostics.AppendLine(
102-
$"{key} not set, defaulted to using {key} from the ConnectionString used by instance");
103-
}
104-
else
63+
if (settings.TryGetValue(key, out _))
10564
{
106-
if (key == RabbitMQSettings.Password)
107-
{
108-
_ = Diagnostics.AppendLine($"{key} set.");
109-
}
110-
111-
_ = Diagnostics.AppendLine($"{key} set to {value}.");
65+
logger.LogInformation($"The legacy LicensingComponent/{key} is still defined in the app.config or environment variables");
66+
_ = Diagnostics.AppendLine($"LicensingComponent/{key} is still defined in the app.config or environment variables");
11267
}
113-
114-
return value;
11568
}
11669

70+
// TODO: Determine if this needs to be updated in the RabbitMQ Transport
11771
protected virtual HttpClient CreateHttpClient(NetworkCredential defaultCredential, string apiUrl) =>
11872
new(new SocketsHttpHandler
11973
{
@@ -130,16 +84,31 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
13084
var url = $"/api/queues/{HttpUtility.UrlEncode(queue.VHost)}/{HttpUtility.UrlEncode(queue.QueueName)}";
13185

13286
logger.LogDebug($"Querying {url}");
133-
var newReading = await pipeline.ExecuteAsync(async token => new RabbitMQBrokerQueueDetails(await httpClient!.GetFromJsonAsync<JsonElement>(url, token)), cancellationToken);
87+
88+
var response = await pipeline.ExecuteAsync(async token => await rabbitMQTransport.ManagementClient.GetQueue(queue.QueueName, cancellationToken), cancellationToken);
89+
90+
if (!response.HasValue)
91+
{
92+
throw new InvalidOperationException($"Could not access RabbitMQ Management API. ({response.StatusCode}: {response.Reason})");
93+
}
94+
95+
var newReading = new RabbitMQBrokerQueueDetails(response.Value);
96+
13497
_ = queue.CalculateThroughputFrom(newReading);
13598

13699
// looping for 24hrs, in 4 increments of 15 minutes
137100
for (var i = 0; i < 24 * 4; i++)
138101
{
139102
await Task.Delay(TimeSpan.FromMinutes(15), timeProvider, cancellationToken);
140103
logger.LogDebug($"Querying {url}");
141-
newReading = await pipeline.ExecuteAsync(async token => new RabbitMQBrokerQueueDetails(await httpClient!.GetFromJsonAsync<JsonElement>(url, token)), cancellationToken);
104+
response = await pipeline.ExecuteAsync(async token => await rabbitMQTransport.ManagementClient.GetQueue(queue.QueueName, cancellationToken), cancellationToken);
142105

106+
if (!response.HasValue)
107+
{
108+
throw new InvalidOperationException($"Could not access RabbitMQ Management API. ({response.StatusCode}: {response.Reason})");
109+
}
110+
111+
newReading = new RabbitMQBrokerQueueDetails(response.Value);
143112
var newTotalThroughput = queue.CalculateThroughputFrom(newReading);
144113
yield return new QueueThroughput
145114
{
@@ -151,31 +120,32 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
151120

152121
async Task<(string rabbitVersion, string managementVersion)> GetRabbitDetails(bool skipResiliencePipeline, CancellationToken cancellationToken)
153122
{
154-
var overviewUrl = "/api/overview";
123+
Response<Overview?> response = skipResiliencePipeline
124+
? await rabbitMQTransport.ManagementClient.GetOverview(cancellationToken)
125+
: await pipeline.ExecuteAsync(async async => await rabbitMQTransport.ManagementClient.GetOverview(cancellationToken), cancellationToken);
155126

156-
JsonObject obj;
127+
var overview = GetResponseValue(response);
157128

158-
if (skipResiliencePipeline)
129+
if (overview.DisableStats)
159130
{
160-
obj = (await httpClient!.GetFromJsonAsync<JsonObject>(overviewUrl, cancellationToken))!;
161-
}
162-
else
163-
{
164-
obj = (await pipeline.ExecuteAsync(async token =>
165-
await httpClient!.GetFromJsonAsync<JsonObject>(overviewUrl, token), cancellationToken))!;
131+
throw new Exception("The RabbitMQ broker is configured with 'management.disable_stats = true' or 'management_agent.disable_metrics_collector = true' " +
132+
"and as a result queue statistics cannot be collected using this tool. Consider changing the configuration of the RabbitMQ broker.");
166133
}
167134

168-
var statsDisabled = obj["disable_stats"]?.GetValue<bool>() ?? false;
135+
var rabbitVersion = response.Value?.BrokerVersion ?? response.Value?.ProductVersion;
136+
var mgmtVersion = response.Value?.ManagementVersion;
169137

170-
if (statsDisabled)
138+
return (rabbitVersion?.ToString() ?? "Unknown", mgmtVersion?.ToString() ?? "Unknown");
139+
}
140+
141+
static T GetResponseValue<T>(Response<T?> response) where T : class
142+
{
143+
if (!response.HasValue || response.Value is null)
171144
{
172-
throw new Exception("The RabbitMQ broker is configured with 'management.disable_stats = true' or 'management_agent.disable_metrics_collector = true' and as a result queue statistics cannot be collected using this tool. Consider changing the configuration of the RabbitMQ broker.");
145+
throw new InvalidOperationException($"Could not access RabbitMQ Management API. ({response.StatusCode}: {response.Reason})");
173146
}
174147

175-
var rabbitVersion = obj["rabbitmq_version"] ?? obj["product_version"];
176-
var mgmtVersion = obj["management_version"];
177-
178-
return (rabbitVersion?.GetValue<string>() ?? "Unknown", mgmtVersion?.GetValue<string>() ?? "Unknown");
148+
return response.Value;
179149
}
180150

181151
public override async IAsyncEnumerable<IBrokerQueue> GetQueueNames(
@@ -219,16 +189,15 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can
219189
{
220190
try
221191
{
222-
var bindingsUrl = $"/api/queues/{HttpUtility.UrlEncode(brokerQueue.VHost)}/{HttpUtility.UrlEncode(brokerQueue.QueueName)}/bindings";
223-
var bindings = await pipeline.ExecuteAsync(async token => await httpClient!.GetFromJsonAsync<JsonArray>(bindingsUrl, token), cancellationToken);
224-
var conventionalBindingFound = bindings?.Any(binding => binding!["source"]?.GetValue<string>() == brokerQueue.QueueName
225-
&& binding["vhost"]?.GetValue<string>() == brokerQueue.VHost
226-
&& binding["destination"]?.GetValue<string>() == brokerQueue.QueueName
227-
&& binding["destination_type"]?.GetValue<string>() == "queue"
228-
&& binding["routing_key"]?.GetValue<string>() == string.Empty
229-
&& binding["properties_key"]?.GetValue<string>() == "~") ?? false;
230-
231-
if (conventionalBindingFound)
192+
var response = await pipeline.ExecuteAsync(async token => await rabbitMQTransport.ManagementClient.GetQueueBindings(brokerQueue.QueueName, cancellationToken), cancellationToken);
193+
194+
// Check if conventional binding is found
195+
if (response.Value.Any(binding => binding?.Source == brokerQueue.QueueName
196+
&& binding?.Vhost == brokerQueue.VHost
197+
&& binding?.Destination == brokerQueue.QueueName
198+
&& binding?.DestinationType == "queue"
199+
&& binding?.RoutingKey == string.Empty
200+
&& binding?.PropertiesKey == "~"))
232201
{
233202
brokerQueue.EndpointIndicators.Add("ConventionalTopologyBinding");
234203
}
@@ -240,20 +209,14 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can
240209

241210
try
242211
{
243-
var exchangeUrl = $"/api/exchanges/{HttpUtility.UrlEncode(brokerQueue.VHost)}/{HttpUtility.UrlEncode(brokerQueue.QueueName)}/bindings/destination";
244-
var bindings = await pipeline.ExecuteAsync(async token => await httpClient!.GetFromJsonAsync<JsonArray>(exchangeUrl, token), cancellationToken);
245-
var delayBindingFound = bindings?.Any(binding =>
246-
{
247-
var source = binding!["source"]?.GetValue<string>();
248-
249-
return source is "nsb.v2.delay-delivery" or "nsb.delay-delivery"
250-
&& binding["vhost"]?.GetValue<string>() == brokerQueue.VHost
251-
&& binding["destination"]?.GetValue<string>() == brokerQueue.QueueName
252-
&& binding["destination_type"]?.GetValue<string>() == "exchange"
253-
&& binding["routing_key"]?.GetValue<string>() == $"#.{brokerQueue.QueueName}";
254-
}) ?? false;
255-
256-
if (delayBindingFound)
212+
var response = await pipeline.ExecuteAsync(async token => await rabbitMQTransport.ManagementClient.GetExchangeBindingsDestination(brokerQueue.QueueName, cancellationToken), cancellationToken);
213+
214+
// Check if delayed binding is found
215+
if (response.Value.Any(binding => binding?.Source is "nsb.v2.delay-delivery" or "nsb.delay-delivery"
216+
&& binding?.Vhost == brokerQueue.VHost
217+
&& binding?.Destination == brokerQueue.QueueName
218+
&& binding?.DestinationType == "exchange"
219+
&& binding?.RoutingKey == $"#.{brokerQueue.QueueName}"))
257220
{
258221
brokerQueue.EndpointIndicators.Add("DelayBinding");
259222
}
@@ -264,41 +227,41 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can
264227
}
265228
}
266229

267-
public async Task<(RabbitMQBrokerQueueDetails[]?, bool morePages)> GetPage(int page, CancellationToken cancellationToken)
230+
internal async Task<(List<RabbitMQBrokerQueueDetails>?, bool morePages)> GetPage(int page, CancellationToken cancellationToken)
268231
{
269-
var url = $"/api/queues/{HttpUtility.UrlEncode(connectionConfiguration.VirtualHost)}?page={page}&page_size=500&name=&use_regex=false&pagination=true";
270-
271-
var container = await pipeline.ExecuteAsync(async token => await httpClient!.GetFromJsonAsync<JsonNode>(url, token), cancellationToken);
272-
switch (container)
232+
var pagination = await pipeline.ExecuteAsync(async token => await rabbitMQTransport.ManagementClient.GetPage(page, cancellationToken), cancellationToken);
233+
switch (pagination.Value)
273234
{
274-
case JsonObject obj:
235+
case Pagination obj:
275236
{
276-
var pageCount = obj["page_count"]!.GetValue<int>();
277-
var pageReturned = obj["page"]!.GetValue<int>();
237+
var pageCount = obj.PageCount;
238+
var pageReturned = obj.Page;
278239

279-
if (obj["items"] is not JsonArray items)
240+
if (obj.Items is null) //is not JsonArray items
280241
{
281242
return (null, false);
282243
}
283244

284-
return (MaterializeQueueDetails(items), pageCount > pageReturned);
245+
return (MaterializeQueueDetails(obj.Items), pageCount > pageReturned);
285246
}
286247
// Older versions of RabbitMQ API did not have paging and returned the array of items directly
287-
case JsonArray arr:
288-
{
289-
return (MaterializeQueueDetails(arr), false);
290-
}
248+
//case JsonArray arr:
249+
// {
250+
// return (MaterializeQueueDetails(arr), false);
251+
// }
291252
default:
292253
throw new Exception("Was not able to get list of queues from RabbitMQ broker.");
293254
}
294255
}
295256

296-
static RabbitMQBrokerQueueDetails[] MaterializeQueueDetails(JsonArray items)
257+
static List<RabbitMQBrokerQueueDetails> MaterializeQueueDetails(List<Queue> items)
297258
{
298-
// It is not possible to directly operated on the JsonNode. When the JsonNode is a JObject
299-
// and the indexer is access the internal dictionary is initialized which can cause key not found exceptions
300-
// when the payload contains the same key multiple times (which happened in the past).
301-
var queues = items.Select(item => new RabbitMQBrokerQueueDetails(item!.Deserialize<JsonElement>())).ToArray();
259+
var queues = new List<RabbitMQBrokerQueueDetails>();
260+
foreach (var item in items)
261+
{
262+
queues.Add(new RabbitMQBrokerQueueDetails(item));
263+
}
264+
302265
return queues;
303266
}
304267

0 commit comments

Comments
 (0)