Skip to content

Commit 9587913

Browse files
committed
Update to 10.0.0-beta.4
1 parent 3d58e87 commit 9587913

File tree

4 files changed

+45
-101
lines changed

4 files changed

+45
-101
lines changed

src/Directory.Packages.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
<PackageVersion Include="NServiceBus.Metrics" Version="5.0.1" />
3939
<PackageVersion Include="NServiceBus.Metrics.ServiceControl" Version="5.0.0" />
4040
<PackageVersion Include="NServiceBus.Persistence.NonDurable" Version="2.0.1" />
41-
<PackageVersion Include="NServiceBus.RabbitMQ" Version="10.0.0-beta.3" />
41+
<PackageVersion Include="NServiceBus.RabbitMQ" Version="10.0.0-beta.4" />
4242
<PackageVersion Include="NServiceBus.SagaAudit" Version="5.0.2" />
4343
<PackageVersion Include="NServiceBus.Testing" Version="9.0.1" />
4444
<PackageVersion Include="NServiceBus.Transport.AzureServiceBus" Version="5.0.0" />

src/ServiceControl.Transports.RabbitMQ/QueueLengthProvider.cs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -84,18 +84,10 @@ async Task FetchQueueLengths(CancellationToken cancellationToken)
8484

8585
try
8686
{
87-
var (statusCode, reason, queue) = await managementClient.Value.GetQueue(queueName, cancellationToken);
88-
89-
if (queue is not null)
90-
{
91-
var size = queue.MessageCount;
92-
sizes.AddOrUpdate(queueName, _ => size, (_, _) => size);
93-
}
94-
else
95-
{
96-
Logger.Warn($"Error querying queue length for {queueName}. {statusCode}: {reason}");
97-
}
87+
var queue = await managementClient.Value.GetQueue(queueName, cancellationToken);
9888

89+
var size = queue.MessageCount;
90+
sizes.AddOrUpdate(queueName, _ => size, (_, _) => size);
9991
}
10092
catch (Exception e)
10193
{

src/ServiceControl.Transports.RabbitMQ/RabbitMQQuery.cs

Lines changed: 36 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -42,32 +42,21 @@ public RabbitMQQuery(ILogger<RabbitMQQuery> logger, TimeProvider timeProvider, I
4242

4343
public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBrokerQueue brokerQueue, DateOnly startDate, [EnumeratorCancellation] CancellationToken cancellationToken = default)
4444
{
45-
var queue = (RabbitMQBrokerQueue)brokerQueue;
46-
var response = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetQueue(queue.QueueName, token), cancellationToken);
45+
var rabbitBrokerQueue = (RabbitMQBrokerQueue)brokerQueue;
46+
var queue = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetQueue(rabbitBrokerQueue.QueueName, token), cancellationToken);
47+
var newReading = new RabbitMQBrokerQueue(queue);
4748

48-
if (response.Value is null)
49-
{
50-
throw new InvalidOperationException($"Could not access RabbitMQ Management API. ({response.StatusCode}: {response.Reason})");
51-
}
52-
53-
var newReading = new RabbitMQBrokerQueue(response.Value);
54-
55-
_ = queue.CalculateThroughputFrom(newReading);
49+
_ = rabbitBrokerQueue.CalculateThroughputFrom(newReading);
5650

5751
// looping for 24hrs, in 4 increments of 15 minutes
5852
for (var i = 0; i < 24 * 4; i++)
5953
{
6054
await Task.Delay(TimeSpan.FromMinutes(15), timeProvider, cancellationToken);
6155

62-
response = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetQueue(queue.QueueName, token), cancellationToken);
63-
64-
if (response.Value is null)
65-
{
66-
throw new InvalidOperationException($"Could not access RabbitMQ Management API. ({response.StatusCode}: {response.Reason})");
67-
}
56+
queue = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetQueue(rabbitBrokerQueue.QueueName, token), cancellationToken);
57+
newReading = new RabbitMQBrokerQueue(queue);
6858

69-
newReading = new RabbitMQBrokerQueue(response.Value);
70-
var newTotalThroughput = queue.CalculateThroughputFrom(newReading);
59+
var newTotalThroughput = rabbitBrokerQueue.CalculateThroughputFrom(newReading);
7160

7261
yield return new QueueThroughput
7362
{
@@ -86,25 +75,20 @@ public override async IAsyncEnumerable<IBrokerQueue> GetQueueNames([EnumeratorCa
8675

8776
do
8877
{
89-
(var statusCode, var reason, var queues, morePages) = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetQueues(page, 500, token), cancellationToken);
90-
91-
ValidateResponse((statusCode, reason, queues));
78+
(var queues, morePages) = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetQueues(page, 500, token), cancellationToken);
9279

93-
if (queues is not null)
80+
foreach (var queue in queues)
9481
{
95-
foreach (var queue in queues)
82+
if (queue.Name.StartsWith("nsb.delay-level-") ||
83+
queue.Name.StartsWith("nsb.v2.delay-level-") ||
84+
queue.Name.StartsWith("nsb.v2.verify-"))
9685
{
97-
if (queue.Name.StartsWith("nsb.delay-level-") ||
98-
queue.Name.StartsWith("nsb.v2.delay-level-") ||
99-
queue.Name.StartsWith("nsb.v2.verify-"))
100-
{
101-
continue;
102-
}
103-
104-
var brokerQueue = new RabbitMQBrokerQueue(queue);
105-
await AddEndpointIndicators(brokerQueue, cancellationToken);
106-
yield return brokerQueue;
86+
continue;
10787
}
88+
89+
var brokerQueue = new RabbitMQBrokerQueue(queue);
90+
await AddEndpointIndicators(brokerQueue, cancellationToken);
91+
yield return brokerQueue;
10892
}
10993

11094
page++;
@@ -113,30 +97,28 @@ public override async IAsyncEnumerable<IBrokerQueue> GetQueueNames([EnumeratorCa
11397

11498
async Task GetBrokerDetails(CancellationToken cancellationToken)
11599
{
116-
var response = await pipeline.ExecuteAsync(async async => await managementClient.Value.GetOverview(cancellationToken), cancellationToken);
100+
var overview = await pipeline.ExecuteAsync(async async => await managementClient.Value.GetOverview(cancellationToken), cancellationToken);
117101

118-
ValidateResponse(response);
119-
120-
if (response.Value?.DisableStats ?? false)
102+
if (overview.DisableStats)
121103
{
122104
throw new Exception(disableStatsErrorMessage);
123105
}
124106

125-
Data["RabbitMQVersion"] = response.Value?.BrokerVersion ?? "Unknown";
107+
Data["RabbitMQVersion"] = overview.BrokerVersion ?? "Unknown";
126108
}
127109

128110
async Task AddEndpointIndicators(RabbitMQBrokerQueue brokerQueue, CancellationToken cancellationToken)
129111
{
130112
try
131113
{
132-
var response = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetBindingsForQueue(brokerQueue.QueueName, token), cancellationToken);
114+
var bindings = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetBindingsForQueue(brokerQueue.QueueName, token), cancellationToken);
133115

134116
// Check if conventional binding is found
135-
if (response.Value.Any(binding => binding?.Source == brokerQueue.QueueName
136-
&& binding?.Destination == brokerQueue.QueueName
137-
&& binding?.DestinationType == "queue"
138-
&& binding?.RoutingKey == string.Empty
139-
&& binding?.PropertiesKey == "~"))
117+
if (bindings.Any(binding => binding.Source == brokerQueue.QueueName
118+
&& binding.Destination == brokerQueue.QueueName
119+
&& binding.DestinationType == "queue"
120+
&& binding.RoutingKey == string.Empty
121+
&& binding.PropertiesKey == "~"))
140122
{
141123
brokerQueue.EndpointIndicators.Add("ConventionalTopologyBinding");
142124
}
@@ -148,13 +130,13 @@ async Task AddEndpointIndicators(RabbitMQBrokerQueue brokerQueue, CancellationTo
148130

149131
try
150132
{
151-
var response = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetBindingsForExchange(brokerQueue.QueueName, token), cancellationToken);
133+
var bindings = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetBindingsForExchange(brokerQueue.QueueName, token), cancellationToken);
152134

153135
// Check if delayed binding is found
154-
if (response.Value.Any(binding => binding?.Source is "nsb.v2.delay-delivery" or "nsb.delay-delivery"
155-
&& binding?.Destination == brokerQueue.QueueName
156-
&& binding?.DestinationType == "exchange"
157-
&& binding?.RoutingKey == $"#.{brokerQueue.QueueName}"))
136+
if (bindings.Any(binding => binding.Source is "nsb.v2.delay-delivery" or "nsb.delay-delivery"
137+
&& binding.Destination == brokerQueue.QueueName
138+
&& binding.DestinationType == "exchange"
139+
&& binding.RoutingKey == $"#.{brokerQueue.QueueName}"))
158140
{
159141
brokerQueue.EndpointIndicators.Add("DelayBinding");
160142
}
@@ -165,40 +147,20 @@ async Task AddEndpointIndicators(RabbitMQBrokerQueue brokerQueue, CancellationTo
165147
}
166148
}
167149

168-
void ValidateResponse<T>((HttpStatusCode StatusCode, string Reason, T? Value) response)
169-
{
170-
if (response.StatusCode != HttpStatusCode.OK)
171-
{
172-
throw new HttpRequestException($"Request failed with status code {response.StatusCode}: {response.Reason}");
173-
}
174-
175-
if (response.Value is null)
176-
{
177-
throw new InvalidOperationException("Request was successful, but the response body was null when a value was expected");
178-
}
179-
}
180-
181150
public override KeyDescriptionPair[] Settings => [];
182151

183152
protected override async Task<(bool Success, List<string> Errors)> TestConnectionCore(CancellationToken cancellationToken)
184153
{
185154
try
186155
{
187-
var (statusCode, reason, value) = await managementClient.Value.GetOverview(cancellationToken);
156+
var overview = await managementClient.Value.GetOverview(cancellationToken);
188157

189-
if (value is not null)
158+
if (overview.DisableStats)
190159
{
191-
if (value.DisableStats)
192-
{
193-
return (false, [disableStatsErrorMessage]);
194-
}
195-
196-
return (true, []);
197-
}
198-
else
199-
{
200-
return (false, [$"{statusCode}: {reason}"]);
160+
return (false, [disableStatsErrorMessage]);
201161
}
162+
163+
return (true, []);
202164
}
203165
catch (HttpRequestException ex)
204166
{

src/ServiceControl.Transports.RabbitMQ/RabbitMQTransportExtensions.cs

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,11 @@ public static void ApplySettingsFromConnectionString(this RabbitMQTransport tran
2525
transport.ValidateDeliveryLimits = validateDeliveryLimits;
2626
}
2727

28-
if (dictionary.TryGetValue("ManagementApiUrl", out var url))
29-
{
30-
if (dictionary.TryGetValue("ManagementApiUserName", out var userName) && dictionary.TryGetValue("ManagementApiPassword", out var password))
31-
{
32-
transport.ManagementApiConfiguration = new(url, userName, password);
33-
}
34-
else
35-
{
36-
transport.ManagementApiConfiguration = new(url);
37-
}
38-
}
39-
else if (dictionary.TryGetValue("ManagementApiUserName", out var userName) && dictionary.TryGetValue("ManagementApiPassword", out var password))
40-
{
41-
transport.ManagementApiConfiguration = new(userName, password);
42-
}
28+
dictionary.TryGetValue("ManagementApiUrl", out var url);
29+
dictionary.TryGetValue("ManagementApiUserName", out var userName);
30+
dictionary.TryGetValue("ManagementApiPassword", out var password);
31+
32+
transport.ManagementApiConfiguration = ManagementApiConfiguration.Create(url, userName, password);
4333

4434
if (dictionary.TryGetValue("DisableRemoteCertificateValidation", out var disableRemoteCertificateValidationString))
4535
{

0 commit comments

Comments
 (0)