Skip to content

Commit 253ff23

Browse files
authored
Use large value instead of unlimited when setting delivery limit via policy (#1704)
1 parent cc53138 commit 253ff23

File tree

3 files changed

+88
-10
lines changed

3 files changed

+88
-10
lines changed

src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,79 @@ public async Task ValidateDeliveryLimit_Should_Set_Delivery_Limit_Policy()
4949
for (int i = 0; i < attempts; i++)
5050
{
5151
var queue = await managementClient.GetQueue(queueName);
52+
var deliveryLimit = queue.GetDeliveryLimit();
5253

53-
if (queue.DeliveryLimit == -1 && queue.AppliedPolicyName == policyName)
54+
if (deliveryLimit == Queue.BigValueInsteadOfActuallyUnlimited && queue.AppliedPolicyName == policyName)
55+
{
56+
// Policy applied successfully
57+
return;
58+
}
59+
60+
await Task.Delay(TimeSpan.FromSeconds(3));
61+
}
62+
63+
Assert.Fail($"Policy '{policyName}' was not applied to queue '{queueName}'.");
64+
}
65+
66+
[Test]
67+
public async Task ValidateDeliveryLimit_Should_Update_Old_Unlimited_Policy_Created_By_Transport()
68+
{
69+
var managementClient = new ManagementClient(connectionConfiguration);
70+
using var brokerVerifier = new BrokerVerifier(managementClient, BrokerRequirementChecks.None, true);
71+
await brokerVerifier.Initialize();
72+
73+
if (brokerVerifier.BrokerVersion < BrokerVerifier.BrokerVersion4)
74+
{
75+
Assert.Ignore("Test not valid for broker versions before 4.0.0");
76+
}
77+
78+
var queueName = nameof(ValidateDeliveryLimit_Should_Set_Delivery_Limit_Policy);
79+
var policyName = $"nsb.{queueName}.delivery-limit";
80+
81+
var oldUnlimitedPolicy = new Policy
82+
{
83+
ApplyTo = PolicyTarget.QuorumQueues,
84+
Definition = new PolicyDefinition { DeliveryLimit = -1 },
85+
Pattern = queueName,
86+
Priority = 0
87+
};
88+
89+
await CreateQueue(queueName);
90+
await managementClient.CreatePolicy(policyName, oldUnlimitedPolicy);
91+
92+
// It can take some time for updated policies to be applied, so we need to wait.
93+
// If this test is randomly failing, consider increasing the attempts
94+
var attempts = 20;
95+
96+
int deliveryLimit = 0;
97+
98+
for (int i = 0; i < attempts; i++)
99+
{
100+
var queue = await managementClient.GetQueue(queueName);
101+
deliveryLimit = queue.GetDeliveryLimit();
102+
103+
if (deliveryLimit == -1 && queue.AppliedPolicyName == policyName)
104+
{
105+
// Policy applied successfully
106+
break;
107+
}
108+
109+
await Task.Delay(TimeSpan.FromSeconds(3));
110+
}
111+
112+
if (deliveryLimit != -1)
113+
{
114+
Assert.Fail($"Old unlimited Policy '{policyName}' was not applied to queue '{queueName}'.");
115+
}
116+
117+
await brokerVerifier.ValidateDeliveryLimit(queueName);
118+
119+
for (int i = 0; i < attempts; i++)
120+
{
121+
var queue = await managementClient.GetQueue(queueName);
122+
deliveryLimit = queue.GetDeliveryLimit();
123+
124+
if (deliveryLimit == Queue.BigValueInsteadOfActuallyUnlimited && queue.AppliedPolicyName == policyName)
54125
{
55126
// Policy applied successfully
56127
return;
@@ -124,7 +195,7 @@ public async Task ValidateDeliveryLimit_Should_Throw_When_A_Policy_On_Queue_Has_
124195

125196
await managementClient.CreatePolicy(policyName, policy).ConfigureAwait(false);
126197

127-
// If this test appears flaky, the delay should be increased to give the broker more time to apply the policy before calling ValidateDeliveryLimit
198+
// If this test appears flaky, the delay should be increased to give the broker more time to apply the oldUnlimitedPolicy before calling ValidateDeliveryLimit
128199
await Task.Delay(TimeSpan.FromSeconds(30));
129200

130201
var exception = Assert.ThrowsAsync<InvalidOperationException>(async () => await brokerVerifier.ValidateDeliveryLimit(queueName));

src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -171,11 +171,16 @@ bool ShouldOverrideDeliveryLimit(Queue queue)
171171

172172
var limit = queue.GetDeliveryLimit();
173173

174-
if (limit == -1)
174+
if (limit is Queue.BigValueInsteadOfActuallyUnlimited)
175175
{
176176
return false;
177177
}
178178

179+
if (limit == -1 && queue.AppliedPolicyName == $"nsb.{queue.Name}.delivery-limit")
180+
{
181+
return true;
182+
}
183+
179184
if (queue.Arguments.DeliveryLimit.HasValue || (queue.EffectivePolicyDefinition?.DeliveryLimit.HasValue ?? false))
180185
{
181186
throw new InvalidOperationException($"The delivery limit for '{queue.Name}' is set to the non-default value of '{limit}'. Remove any delivery limit settings from queue arguments, user policies or operator policies to correct this.");
@@ -191,17 +196,17 @@ async Task SetDeliveryLimitViaPolicy(Queue queue, CancellationToken cancellation
191196
throw new InvalidOperationException($"Cannot create unlimited delivery limit policies in RabbitMQ versions prior to 4.0. The version is: {brokerVersion}.");
192197
}
193198

194-
if (!string.IsNullOrEmpty(queue.AppliedPolicyName))
199+
var policyName = $"nsb.{queue.Name}.delivery-limit";
200+
201+
if (!string.IsNullOrEmpty(queue.AppliedPolicyName) && queue.AppliedPolicyName != policyName)
195202
{
196203
throw new InvalidOperationException($"An unlimited delivery limit policy cannot be applied to the '{queue.Name}' queue because it already has a '{queue.AppliedPolicyName}' policy applied.");
197204
}
198205

199-
var policyName = $"nsb.{queue.Name}.delivery-limit";
200-
201206
var policy = new Policy
202207
{
203208
ApplyTo = PolicyTarget.QuorumQueues,
204-
Definition = new PolicyDefinition { DeliveryLimit = -1 },
209+
Definition = new PolicyDefinition { DeliveryLimit = Queue.BigValueInsteadOfActuallyUnlimited },
205210
Pattern = queue.Name,
206211
Priority = 0
207212
};

src/NServiceBus.Transport.RabbitMQ/Administration/ManagementApi/Models/Queue.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ public int GetDeliveryLimit()
4343
}
4444

4545
// RabbitMQ 3.x
46-
// The broker doesn't tell us what the actual delivery limit is, so we have to figure it out
47-
// We have to find the lowest value from the possible places in can be configured
46+
// The broker doesn't tell us what the actual delivery limit is, so we have to figure it out.
47+
// We have to find the lowest value from the possible places it can be configured.
4848

4949
int? limit = null;
5050

@@ -67,6 +67,8 @@ public int GetDeliveryLimit()
6767
limit = 0;
6868
}
6969

70-
return limit ?? -1;
70+
return limit ?? BigValueInsteadOfActuallyUnlimited;
7171
}
72+
73+
public const int BigValueInsteadOfActuallyUnlimited = 100_000;
7274
}

0 commit comments

Comments
 (0)