Skip to content

Commit 47a596b

Browse files
Add QueueDeleted event to IQueue<T> for queue deletion notifications
Adds a QueueDeleted async event raised after a queue is deleted via DeleteQueueAsync. This enables behaviors and event handlers to react to queue deletion (e.g., cleanup, logging, cache invalidation). - Add QueueDeletedEventArgs<T> and QueueDeleted event to IQueue<T> - Implement OnQueueDeletedAsync in QueueBase with parallel invocation - Refactor DeleteQueueAsync into base class with DeleteQueueImplAsync pattern - Move trace logging from InMemoryQueue to QueueBase for all implementations - Wire QueueDeleted into QueueBehaviorBase for behavior support - Dispose QueueDeleted event in QueueBase.Dispose - Update queue documentation with QueueDeleted event and behavior examples Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 41c091f commit 47a596b

File tree

6 files changed

+109
-6
lines changed

6 files changed

+109
-6
lines changed

docs/guide/queues.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public interface IQueue<T> : IQueue where T : class
1515
AsyncEvent<LockRenewedEventArgs<T>> LockRenewed { get; }
1616
AsyncEvent<CompletedEventArgs<T>> Completed { get; }
1717
AsyncEvent<AbandonedEventArgs<T>> Abandoned { get; }
18+
AsyncEvent<QueueDeletedEventArgs<T>> QueueDeleted { get; }
1819

1920
void AttachBehavior(IQueueBehavior<T> behavior);
2021
Task<string> EnqueueAsync(T data, QueueEntryOptions options = null);
@@ -414,6 +415,11 @@ queue.Abandoned.AddHandler(async (sender, args) =>
414415
{
415416
_logger.LogWarning("Abandoned: {Id}", args.Entry.Id);
416417
});
418+
419+
queue.QueueDeleted.AddHandler(async (sender, args) =>
420+
{
421+
_logger.LogInformation("Queue deleted");
422+
});
417423
```
418424

419425
## Queue Behaviors
@@ -454,6 +460,12 @@ public class LoggingQueueBehavior<T> : QueueBehaviorBase<T> where T : class
454460
args.Entry.Id, args.Entry.Attempts);
455461
return Task.CompletedTask;
456462
}
463+
464+
protected override Task OnQueueDeleted(object sender, QueueDeletedEventArgs<T> args)
465+
{
466+
_logger.LogInformation("Queue deleted");
467+
return Task.CompletedTask;
468+
}
457469
}
458470

459471
// Attach to queue

src/Foundatio/Queues/IQueue.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ public interface IQueue<T> : IQueue where T : class
4646
/// </summary>
4747
AsyncEvent<AbandonedEventArgs<T>> Abandoned { get; }
4848

49+
/// <summary>
50+
/// Raised after the queue has been deleted.
51+
/// </summary>
52+
AsyncEvent<QueueDeletedEventArgs<T>> QueueDeleted { get; }
53+
4954
/// <summary>
5055
/// Attaches a behavior that can intercept and modify queue operations.
5156
/// </summary>
@@ -326,3 +331,15 @@ public class AbandonedEventArgs<T> : EventArgs where T : class
326331
/// </summary>
327332
public IQueueEntry<T> Entry { get; set; }
328333
}
334+
335+
/// <summary>
336+
/// Event arguments for the <see cref="IQueue{T}.QueueDeleted"/> event.
337+
/// </summary>
338+
/// <typeparam name="T">The type of message payload.</typeparam>
339+
public class QueueDeletedEventArgs<T> : EventArgs where T : class
340+
{
341+
/// <summary>
342+
/// The queue raising the event.
343+
/// </summary>
344+
public IQueue<T> Queue { get; set; }
345+
}

src/Foundatio/Queues/InMemoryQueue.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -350,10 +350,8 @@ protected override Task<IEnumerable<T>> GetDeadletterItemsImplAsync(Cancellation
350350
return Task.FromResult(_deadletterQueue.Select(i => i.Value));
351351
}
352352

353-
public override Task DeleteQueueAsync()
353+
protected override Task DeleteQueueImplAsync()
354354
{
355-
_logger.LogTrace("Deleting queue: {QueueName} ({QueueId})", _options.Name, QueueId);
356-
357355
_queue.Clear();
358356
_deadletterQueue.Clear();
359357
_dequeued.Clear();

src/Foundatio/Queues/QueueBase.cs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,14 @@ protected virtual QueueStats GetMetricsQueueStats()
178178
return GetQueueStatsAsync().AnyContext().GetAwaiter().GetResult();
179179
}
180180

181-
public abstract Task DeleteQueueAsync();
181+
protected abstract Task DeleteQueueImplAsync();
182+
183+
public async Task DeleteQueueAsync()
184+
{
185+
_logger.LogTrace("Deleting queue: {QueueName} ({QueueId})", _options.Name, QueueId);
186+
await DeleteQueueImplAsync().AnyContext();
187+
await OnQueueDeletedAsync().AnyContext();
188+
}
182189

183190
protected abstract void StartWorkingImpl(Func<IQueueEntry<T>, CancellationToken, Task> handler, bool autoComplete, CancellationToken cancellationToken);
184191
public async Task StartWorkingAsync(Func<IQueueEntry<T>, CancellationToken, Task> handler, bool autoComplete = false, CancellationToken cancellationToken = default)
@@ -335,6 +342,17 @@ protected virtual async Task OnAbandonedAsync(IQueueEntry<T> entry)
335342
}
336343
}
337344

345+
public AsyncEvent<QueueDeletedEventArgs<T>> QueueDeleted { get; } = new AsyncEvent<QueueDeletedEventArgs<T>>(true);
346+
347+
protected virtual async Task OnQueueDeletedAsync()
348+
{
349+
if (QueueDeleted is not null)
350+
{
351+
var args = new QueueDeletedEventArgs<T> { Queue = this };
352+
await QueueDeleted.InvokeAsync(this, args).AnyContext();
353+
}
354+
}
355+
338356
protected string GetSubMetricName(T data)
339357
{
340358
var haveStatName = data as IHaveSubMetricName;
@@ -402,6 +420,7 @@ public override void Dispose()
402420
Enqueued?.Dispose();
403421
Enqueuing?.Dispose();
404422
LockRenewed?.Dispose();
423+
QueueDeleted?.Dispose();
405424

406425
foreach (var behavior in _behaviors.OfType<IDisposable>())
407426
behavior.Dispose();

src/Foundatio/Queues/QueueBehaviour.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System;
22
using System.Collections.Generic;
33
using System.Threading.Tasks;
44

@@ -24,6 +24,7 @@ public virtual void Attach(IQueue<T> queue)
2424
_disposables.Add(_queue.LockRenewed.AddHandler(OnLockRenewed));
2525
_disposables.Add(_queue.Completed.AddHandler(OnCompleted));
2626
_disposables.Add(_queue.Abandoned.AddHandler(OnAbandoned));
27+
_disposables.Add(_queue.QueueDeleted.AddHandler(OnQueueDeleted));
2728
}
2829

2930
protected virtual Task OnEnqueuing(object sender, EnqueuingEventArgs<T> enqueuingEventArgs)
@@ -56,6 +57,11 @@ protected virtual Task OnAbandoned(object sender, AbandonedEventArgs<T> abandone
5657
return Task.CompletedTask;
5758
}
5859

60+
protected virtual Task OnQueueDeleted(object sender, QueueDeletedEventArgs<T> queueDeletedEventArgs)
61+
{
62+
return Task.CompletedTask;
63+
}
64+
5965
public virtual void Dispose()
6066
{
6167
foreach (var disposable in _disposables)

tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System;
22
using System.Collections.Generic;
33
using System.Diagnostics;
44
using System.Threading;
@@ -290,6 +290,57 @@ public async Task CanGetCompletedEntries()
290290
Assert.Equal(10, q.GetCompletedEntries().Count);
291291
}
292292

293+
[Fact]
294+
public async Task DeleteQueueAsync_WithEventHandler_RaisesQueueDeletedEvent()
295+
{
296+
// Arrange
297+
using var q = new InMemoryQueue<SimpleWorkItem>(o => o.LoggerFactory(Log));
298+
bool eventFired = false;
299+
300+
using var handler = q.QueueDeleted.AddHandler((sender, args) =>
301+
{
302+
eventFired = true;
303+
Assert.Same(q, args.Queue);
304+
return Task.CompletedTask;
305+
});
306+
307+
await q.EnqueueAsync(new SimpleWorkItem());
308+
309+
// Act
310+
await q.DeleteQueueAsync();
311+
312+
// Assert
313+
Assert.True(eventFired);
314+
}
315+
316+
[Fact]
317+
public async Task DeleteQueueAsync_WithAttachedBehavior_InvokesBehaviorOnQueueDeleted()
318+
{
319+
// Arrange
320+
using var q = new InMemoryQueue<SimpleWorkItem>(o => o.LoggerFactory(Log));
321+
var behavior = new QueueDeletedTestBehavior<SimpleWorkItem>();
322+
q.AttachBehavior(behavior);
323+
324+
await q.EnqueueAsync(new SimpleWorkItem());
325+
326+
// Act
327+
await q.DeleteQueueAsync();
328+
329+
// Assert
330+
Assert.True(behavior.QueueDeletedCalled);
331+
}
332+
333+
private class QueueDeletedTestBehavior<T> : QueueBehaviorBase<T> where T : class
334+
{
335+
public bool QueueDeletedCalled { get; private set; }
336+
337+
protected override Task OnQueueDeleted(object sender, QueueDeletedEventArgs<T> queueDeletedEventArgs)
338+
{
339+
QueueDeletedCalled = true;
340+
return Task.CompletedTask;
341+
}
342+
}
343+
293344
#region Issue239
294345

295346
class QueueEntry_Issue239<T> : IQueueEntry<T> where T : class

0 commit comments

Comments
 (0)