Skip to content

Commit bb7146e

Browse files
authored
Fix race condition in InMemoryQueue.AbandonAsync causing flaky RunUntilEmptyAsync (#470)
* Fix race condition in InMemoryQueue causing RunUntilEmptyAsync to exit prematurely During AbandonAsync, there is a window between removing an entry from _dequeued and re-enqueueing it for retry where the item exists in neither collection. If RunUntilEmptyAsync checks queue stats during this gap, it sees Queued=0 + Working=0 and terminates the job loop while retryable items are still in flight. Add a _pendingRetryCount that bridges the gap: incremented before TryRemove, decremented after the item lands in its destination (re-queued, deadlettered, or scheduled for delayed retry). The count is included in the Queued stat so the continuation callback sees items in transit. For delayed retries (RetryDelay > 0), the counter is decremented immediately after scheduling since the item is intentionally parked and RunUntilEmptyAsync should not spin-wait for it. Fixes flaky CanRunQueueJobWithLockFailAsync test. Made-with: Cursor * Move _pendingRetryCount decrement before _deadletterQueue.Enqueue In the deadletter path, the item moves out of Queued/Working entirely, so decrementing _pendingRetryCount before enqueuing to the deadletter queue avoids a transient overcount where the item appears in both the Queued stat (via _pendingRetryCount) and the Deadletter stat (via _deadletterQueue.Count). The synchronous retry path intentionally keeps the current ordering (Retry then Decrement) because decrementing first would re-open the race window this PR fixes: between the decrement and _queue.Enqueue inside Retry(), both _pendingRetryCount and _queue.Count would be zero, allowing RunUntilEmptyAsync to exit prematurely. Made-with: Cursor
1 parent 877f769 commit bb7146e

File tree

3 files changed

+13
-4
lines changed

3 files changed

+13
-4
lines changed

src/Foundatio/Extensions/CacheClientExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public static class CacheClientExtensions
1616
/// below any real-world cache TTL.
1717
/// </summary>
1818
public static readonly TimeSpan MinimumExpiration = TimeSpan.FromMilliseconds(5);
19-
19+
2020
public static async Task<T> GetAsync<T>(this ICacheClient client, string key, T defaultValue)
2121
{
2222
var cacheValue = await client.GetAsync<T>(key).AnyContext();

src/Foundatio/Queues/InMemoryQueue.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public class InMemoryQueue<T> : QueueBase<T, InMemoryQueueOptions<T>> where T :
2626
private int _abandonedCount;
2727
private int _workerErrorCount;
2828
private int _workerItemTimeoutCount;
29+
private int _pendingRetryCount;
2930

3031
public InMemoryQueue() : this(o => o) { }
3132

@@ -51,7 +52,7 @@ protected override QueueStats GetMetricsQueueStats()
5152
{
5253
return new QueueStats
5354
{
54-
Queued = _queue.Count,
55+
Queued = _queue.Count + _pendingRetryCount,
5556
Working = _dequeued.Count,
5657
Deadletter = _deadletterQueue.Count,
5758
Enqueued = _enqueuedCount,
@@ -292,8 +293,12 @@ public override async Task AbandonAsync(IQueueEntry<T> queueEntry)
292293
if (queueEntry.IsAbandoned || queueEntry.IsCompleted)
293294
throw new InvalidOperationException("Queue entry has already been completed or abandoned");
294295

296+
Interlocked.Increment(ref _pendingRetryCount);
297+
295298
if (!_dequeued.TryRemove(queueEntry.Id, out var targetEntry) || targetEntry == null)
296299
{
300+
Interlocked.Decrement(ref _pendingRetryCount);
301+
297302
foreach (var kvp in _queue)
298303
{
299304
if (kvp.Id == queueEntry.Id)
@@ -323,8 +328,9 @@ public override async Task AbandonAsync(IQueueEntry<T> queueEntry)
323328
var retryEntry = targetEntry.CreateRetryEntry();
324329
if (_options.RetryDelay > TimeSpan.Zero)
325330
{
331+
Interlocked.Decrement(ref _pendingRetryCount);
326332
_logger.LogTrace("Adding item to wait list for future retry: {QueueEntryId} Attempts: {QueueEntryAttempts}", queueEntry.Id, queueEntry.Attempts);
327-
var unawaited = Run.DelayedAsync(GetRetryDelay(targetEntry.Attempts), () =>
333+
_ = Run.DelayedAsync(GetRetryDelay(targetEntry.Attempts), () =>
328334
{
329335
Retry(retryEntry);
330336
return Task.CompletedTask;
@@ -334,11 +340,13 @@ public override async Task AbandonAsync(IQueueEntry<T> queueEntry)
334340
{
335341
_logger.LogTrace("Adding item back to queue for retry: {QueueEntryId} Attempts: {QueueEntryAttempts}", queueEntry.Id, queueEntry.Attempts);
336342
Retry(retryEntry);
343+
Interlocked.Decrement(ref _pendingRetryCount);
337344
}
338345
}
339346
else
340347
{
341348
_logger.LogInformation("Exceeded retry limit ({Attempts}/{Retries}), moving message {QueueEntryId} to dead letter", targetEntry.Attempts, _options.Retries, queueEntry.Id);
349+
Interlocked.Decrement(ref _pendingRetryCount);
342350
_deadletterQueue.Enqueue(targetEntry);
343351
}
344352
}
@@ -373,6 +381,7 @@ protected override Task DeleteQueueImplAsync()
373381
_completedCount = 0;
374382
_abandonedCount = 0;
375383
_workerErrorCount = 0;
384+
_pendingRetryCount = 0;
376385

377386
return Task.CompletedTask;
378387
}

src/Foundatio/Utility/FoundatioDiagnostics.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,4 @@ public static Activity AddException(this Activity activity, Exception exception,
6969
return activity.AddEvent(new ActivityEvent(ExceptionEventName, timestamp, exceptionTags));
7070
}
7171
}
72-
#endif
72+
#endif

0 commit comments

Comments
 (0)