Skip to content

Commit 09d1705

Browse files
committed
Fix race condition in InMemoryQueue causing RunUntilEmptyAsync to exit prematurely
During AbandonAsync, there is a window between removing an entry from _dequeued and re-enqueueing it for retry where the item exists in neither collection. If RunUntilEmptyAsync checks queue stats during this gap, it sees Queued=0 + Working=0 and terminates the job loop while retryable items are still in flight. Add a _pendingRetryCount that bridges the gap: incremented before TryRemove, decremented after the item lands in its destination (re-queued, deadlettered, or scheduled for delayed retry). The count is included in the Queued stat so the continuation callback sees items in transit. For delayed retries (RetryDelay > 0), the counter is decremented immediately after scheduling since the item is intentionally parked and RunUntilEmptyAsync should not spin-wait for it. Fixes flaky CanRunQueueJobWithLockFailAsync test. Made-with: Cursor
1 parent 6c6608c commit 09d1705

File tree

1 file changed

+11
-2
lines changed

1 file changed

+11
-2
lines changed

src/Foundatio/Queues/InMemoryQueue.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public class InMemoryQueue<T> : QueueBase<T, InMemoryQueueOptions<T>> where T :
2626
private int _abandonedCount;
2727
private int _workerErrorCount;
2828
private int _workerItemTimeoutCount;
29+
private int _pendingRetryCount;
2930

3031
public InMemoryQueue() : this(o => o) { }
3132

@@ -51,7 +52,7 @@ protected override QueueStats GetMetricsQueueStats()
5152
{
5253
return new QueueStats
5354
{
54-
Queued = _queue.Count,
55+
Queued = _queue.Count + _pendingRetryCount,
5556
Working = _dequeued.Count,
5657
Deadletter = _deadletterQueue.Count,
5758
Enqueued = _enqueuedCount,
@@ -292,8 +293,12 @@ public override async Task AbandonAsync(IQueueEntry<T> queueEntry)
292293
if (queueEntry.IsAbandoned || queueEntry.IsCompleted)
293294
throw new InvalidOperationException("Queue entry has already been completed or abandoned");
294295

296+
Interlocked.Increment(ref _pendingRetryCount);
297+
295298
if (!_dequeued.TryRemove(queueEntry.Id, out var targetEntry) || targetEntry == null)
296299
{
300+
Interlocked.Decrement(ref _pendingRetryCount);
301+
297302
foreach (var kvp in _queue)
298303
{
299304
if (kvp.Id == queueEntry.Id)
@@ -323,8 +328,9 @@ public override async Task AbandonAsync(IQueueEntry<T> queueEntry)
323328
var retryEntry = targetEntry.CreateRetryEntry();
324329
if (_options.RetryDelay > TimeSpan.Zero)
325330
{
331+
Interlocked.Decrement(ref _pendingRetryCount);
326332
_logger.LogTrace("Adding item to wait list for future retry: {QueueEntryId} Attempts: {QueueEntryAttempts}", queueEntry.Id, queueEntry.Attempts);
327-
var unawaited = Run.DelayedAsync(GetRetryDelay(targetEntry.Attempts), () =>
333+
_ = Run.DelayedAsync(GetRetryDelay(targetEntry.Attempts), () =>
328334
{
329335
Retry(retryEntry);
330336
return Task.CompletedTask;
@@ -334,12 +340,14 @@ public override async Task AbandonAsync(IQueueEntry<T> queueEntry)
334340
{
335341
_logger.LogTrace("Adding item back to queue for retry: {QueueEntryId} Attempts: {QueueEntryAttempts}", queueEntry.Id, queueEntry.Attempts);
336342
Retry(retryEntry);
343+
Interlocked.Decrement(ref _pendingRetryCount);
337344
}
338345
}
339346
else
340347
{
341348
_logger.LogInformation("Exceeded retry limit ({Attempts}/{Retries}), moving message {QueueEntryId} to dead letter", targetEntry.Attempts, _options.Retries, queueEntry.Id);
342349
_deadletterQueue.Enqueue(targetEntry);
350+
Interlocked.Decrement(ref _pendingRetryCount);
343351
}
344352
}
345353
}
@@ -373,6 +381,7 @@ protected override Task DeleteQueueImplAsync()
373381
_completedCount = 0;
374382
_abandonedCount = 0;
375383
_workerErrorCount = 0;
384+
_pendingRetryCount = 0;
376385

377386
return Task.CompletedTask;
378387
}

0 commit comments

Comments
 (0)