Skip to content

Commit 5b10f97

Browse files
authored
Use proper regular expression for policy pattern (#1708)
1 parent 253ff23 commit 5b10f97

File tree

2 files changed

+83
-7
lines changed

2 files changed

+83
-7
lines changed

src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public async Task ValidateDeliveryLimit_Should_Update_Old_Unlimited_Policy_Creat
7575
Assert.Ignore("Test not valid for broker versions before 4.0.0");
7676
}
7777

78-
var queueName = nameof(ValidateDeliveryLimit_Should_Set_Delivery_Limit_Policy);
78+
var queueName = nameof(ValidateDeliveryLimit_Should_Update_Old_Unlimited_Policy_Created_By_Transport);
7979
var policyName = $"nsb.{queueName}.delivery-limit";
8080

8181
var oldUnlimitedPolicy = new Policy
@@ -133,6 +133,79 @@ public async Task ValidateDeliveryLimit_Should_Update_Old_Unlimited_Policy_Creat
133133
Assert.Fail($"Policy '{policyName}' was not applied to queue '{queueName}'.");
134134
}
135135

136+
[Test]
137+
public async Task ValidateDeliveryLimit_Should_Update_Old_Overeager_Policy()
138+
{
139+
var managementClient = new ManagementClient(connectionConfiguration);
140+
using var brokerVerifier = new BrokerVerifier(managementClient, BrokerRequirementChecks.None, true);
141+
await brokerVerifier.Initialize();
142+
143+
if (brokerVerifier.BrokerVersion < BrokerVerifier.BrokerVersion4)
144+
{
145+
Assert.Ignore("Test not valid for broker versions before 4.0.0");
146+
}
147+
148+
var queueName = nameof(ValidateDeliveryLimit_Should_Update_Old_Overeager_Policy);
149+
var policyName = $"nsb.{queueName}.delivery-limit";
150+
151+
var oldQueueName = queueName[..^7];
152+
var oldPolicyName = $"nsb.{oldQueueName}.delivery-limit";
153+
154+
var oldPolicy = new Policy
155+
{
156+
ApplyTo = PolicyTarget.QuorumQueues,
157+
Definition = new PolicyDefinition { DeliveryLimit = -1 },
158+
Pattern = oldQueueName,
159+
Priority = 0
160+
};
161+
162+
await CreateQueue(queueName);
163+
await managementClient.CreatePolicy(oldPolicyName, oldPolicy);
164+
165+
// It can take some time for updated policies to be applied, so we need to wait.
166+
// If this test is randomly failing, consider increasing the attempts
167+
var attempts = 20;
168+
169+
int deliveryLimit = 0;
170+
171+
for (int i = 0; i < attempts; i++)
172+
{
173+
var queue = await managementClient.GetQueue(queueName);
174+
deliveryLimit = queue.GetDeliveryLimit();
175+
176+
if (deliveryLimit == -1 && queue.AppliedPolicyName == oldPolicyName)
177+
{
178+
// Policy applied successfully
179+
break;
180+
}
181+
182+
await Task.Delay(TimeSpan.FromSeconds(3));
183+
}
184+
185+
if (deliveryLimit != -1)
186+
{
187+
Assert.Fail($"Old Policy '{oldPolicyName}' was not applied to queue '{queueName}'.");
188+
}
189+
190+
await brokerVerifier.ValidateDeliveryLimit(queueName);
191+
192+
for (int i = 0; i < attempts; i++)
193+
{
194+
var queue = await managementClient.GetQueue(queueName);
195+
deliveryLimit = queue.GetDeliveryLimit();
196+
197+
if (deliveryLimit == Queue.BigValueInsteadOfActuallyUnlimited && queue.AppliedPolicyName == policyName)
198+
{
199+
// Policy applied successfully
200+
return;
201+
}
202+
203+
await Task.Delay(TimeSpan.FromSeconds(3));
204+
}
205+
206+
Assert.Fail($"Policy '{policyName}' was not applied to queue '{queueName}'.");
207+
}
208+
136209
[Test]
137210
public async Task ValidateDeliveryLimit_Should_Throw_When_Queue_Does_Not_Exist()
138211
{
@@ -195,7 +268,7 @@ public async Task ValidateDeliveryLimit_Should_Throw_When_A_Policy_On_Queue_Has_
195268

196269
await managementClient.CreatePolicy(policyName, policy).ConfigureAwait(false);
197270

198-
// If this test appears flaky, the delay should be increased to give the broker more time to apply the oldUnlimitedPolicy before calling ValidateDeliveryLimit
271+
// If this test appears flaky, the delay should be increased to give the broker more time to apply the policy before calling ValidateDeliveryLimit
199272
await Task.Delay(TimeSpan.FromSeconds(30));
200273

201274
var exception = Assert.ThrowsAsync<InvalidOperationException>(async () => await brokerVerifier.ValidateDeliveryLimit(queueName));

src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ namespace NServiceBus.Transport.RabbitMQ;
55
using System;
66
using System.Net;
77
using System.Net.Http;
8+
using System.Text.RegularExpressions;
89
using System.Threading;
910
using System.Threading.Tasks;
1011
using NServiceBus.Transport.RabbitMQ.ManagementApi;
@@ -176,7 +177,7 @@ bool ShouldOverrideDeliveryLimit(Queue queue)
176177
return false;
177178
}
178179

179-
if (limit == -1 && queue.AppliedPolicyName == $"nsb.{queue.Name}.delivery-limit")
180+
if (limit == -1 && IsTransportPolicy(queue.AppliedPolicyName))
180181
{
181182
return true;
182183
}
@@ -196,18 +197,18 @@ async Task SetDeliveryLimitViaPolicy(Queue queue, CancellationToken cancellation
196197
throw new InvalidOperationException($"Cannot create unlimited delivery limit policies in RabbitMQ versions prior to 4.0. The version is: {brokerVersion}.");
197198
}
198199

199-
var policyName = $"nsb.{queue.Name}.delivery-limit";
200-
201-
if (!string.IsNullOrEmpty(queue.AppliedPolicyName) && queue.AppliedPolicyName != policyName)
200+
if (!string.IsNullOrEmpty(queue.AppliedPolicyName) && !IsTransportPolicy(queue.AppliedPolicyName))
202201
{
203202
throw new InvalidOperationException($"An unlimited delivery limit policy cannot be applied to the '{queue.Name}' queue because it already has a '{queue.AppliedPolicyName}' policy applied.");
204203
}
205204

205+
var policyName = $"nsb.{queue.Name}.delivery-limit";
206+
206207
var policy = new Policy
207208
{
208209
ApplyTo = PolicyTarget.QuorumQueues,
209210
Definition = new PolicyDefinition { DeliveryLimit = Queue.BigValueInsteadOfActuallyUnlimited },
210-
Pattern = queue.Name,
211+
Pattern = $"^{Regex.Escape(queue.Name)}$",
211212
Priority = 0
212213
};
213214

@@ -221,6 +222,8 @@ async Task SetDeliveryLimitViaPolicy(Queue queue, CancellationToken cancellation
221222
}
222223
}
223224

225+
static bool IsTransportPolicy(string? policyName) => policyName is not null && policyName.StartsWith("nsb.") && policyName.EndsWith(".delivery-limit");
226+
224227
static void LogWarning(string message)
225228
{
226229
#if !COMMANDLINE

0 commit comments

Comments
 (0)