Skip to content

Commit 0547739

Browse files
committed
#375 Outbox clean up in low priority batches with regular intervals
Signed-off-by: Richard Pringle <richardpringle@gmail.com>
1 parent 17a8721 commit 0547739

File tree

17 files changed

+673
-284
lines changed

17 files changed

+673
-284
lines changed

docs/plugin_outbox.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
1212
- [UseTransactionScope](#usetransactionscope)
1313
- [UseSqlTransaction](#usesqltransaction)
1414
- [How it works](#how-it-works)
15+
- [Clean up](#clean-up)
1516
- [Important note](#important-note)
1617

1718
## Introduction
@@ -215,7 +216,15 @@ When applied on the (child) bus level then all consumers (or handlers) will inhe
215216

216217
- Once a message is picked from outbox and successfully delivered then it is marked as sent in the outbox table.
217218

218-
- At configured intervals (`MessageCleanup.Interval`), and after a configured time span (`MessageCleanup.Age`), the sent messages are removed from the outbox table.
219+
## Clean up
220+
On starting SMB, messages that are older than `MessageCleanup.Age` will be removed from the `Outbox` table in batches of `MessageCleanup.BatchSize` until no sent messages of the specified age remain. The process is then repeated every `MessageCleanup.Interval` period.
221+
222+
| Property | Description | Default |
223+
| --------- | -------------------------------------------------- | ------- |
224+
| Enabled | `True` if messages are to be removed | true |
225+
| Interval | Time between exections | 1 hour |
226+
| Age | Minimum age of a sent message to delete | 1 hour |
227+
| BatchSize | Number of messages to be removed in each iteration | 10 000 |
219228

220229
## Important note
221230

docs/plugin_outbox.t.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
1212
- [UseTransactionScope](#usetransactionscope)
1313
- [UseSqlTransaction](#usesqltransaction)
1414
- [How it works](#how-it-works)
15+
- [Clean up](#clean-up)
1516
- [Important note](#important-note)
1617

1718
## Introduction
@@ -146,7 +147,15 @@ When applied on the (child) bus level then all consumers (or handlers) will inhe
146147

147148
- Once a message is picked from outbox and successfully delivered then it is marked as sent in the outbox table.
148149

149-
- At configured intervals (`MessageCleanup.Interval`), and after a configured time span (`MessageCleanup.Age`), the sent messages are removed from the outbox table.
150+
## Clean up
151+
On starting SMB, messages that are older than `MessageCleanup.Age` will be removed from the `Outbox` table in batches of `MessageCleanup.BatchSize` until no sent messages of the specified age remain. The process is then repeated every `MessageCleanup.Interval` period.
152+
153+
| Property | Description | Default |
154+
| --------- | -------------------------------------------------- | ------- |
155+
| Enabled | `True` if messages are to be removed | true |
156+
| Interval | Time between exections | 1 hour |
157+
| Age | Minimum age of a sent message to delete | 1 hour |
158+
| BatchSize | Number of messages to be removed in each iteration | 10 000 |
150159

151160
## Important note
152161

src/SlimMessageBus.Host.Outbox.Sql/MeasuringSqlMessageOutboxRepositoryDecorator.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ public Task AbortDelivery(IReadOnlyCollection<Guid> ids, CancellationToken cance
4242
public Task<IHasId> Create(string busName, IDictionary<string, object> headers, string path, string messageType, byte[] messagePayload, CancellationToken cancellationToken)
4343
=> MeasureMethod(nameof(Create), () => target.Create(busName, headers, path, messageType, messagePayload, cancellationToken));
4444

45-
public Task DeleteSent(DateTime olderThan, CancellationToken cancellationToken)
46-
=> MeasureMethod(nameof(DeleteSent), () => target.DeleteSent(olderThan, cancellationToken));
45+
public Task<int> DeleteSent(DateTimeOffset olderThan, int batchSize, CancellationToken cancellationToken)
46+
=> MeasureMethod(nameof(DeleteSent), () => target.DeleteSent(olderThan, batchSize, cancellationToken));
4747

4848
public Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int maxDeliveryAttempts, CancellationToken cancellationToken)
4949
=> MeasureMethod(nameof(IncrementDeliveryAttempt), () => target.IncrementDeliveryAttempt(ids, maxDeliveryAttempts, cancellationToken));

src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxMessageRepository.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
using System;
44

55
/// <summary>
6-
/// The MS SQL implmentation of the <see cref="IOutboxMessageRepository{TOutboxMessage, TOutboxMessageKey}"/>
6+
/// The MS SQL implementation of the <see cref="IOutboxMessageRepository{TOutboxMessage, TOutboxMessageKey}"/>
77
/// </summary>
88
public class SqlOutboxMessageRepository : CommonSqlRepository, ISqlMessageOutboxRepository
99
{
@@ -182,17 +182,23 @@ public async Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int ma
182182
}
183183
}
184184

185-
public async Task DeleteSent(DateTime olderThan, CancellationToken cancellationToken)
185+
public async Task<int> DeleteSent(DateTimeOffset olderThan, int batchSize, CancellationToken cancellationToken)
186186
{
187187
await EnsureConnection();
188188

189189
var affected = await ExecuteNonQuery(
190190
Settings.SqlSettings.OperationRetry,
191191
_sqlTemplate.SqlOutboxMessageDeleteSent,
192-
cmd => cmd.Parameters.Add("@Timestamp", SqlDbType.DateTimeOffset).Value = olderThan,
192+
cmd =>
193+
{
194+
cmd.Parameters.Add("@BatchSize", SqlDbType.Int).Value = batchSize;
195+
cmd.Parameters.Add("@Timestamp", SqlDbType.DateTimeOffset).Value = olderThan;
196+
},
193197
cancellationToken);
194198

195199
Logger.Log(affected > 0 ? LogLevel.Information : LogLevel.Debug, "Removed {MessageCount} sent messages from outbox table", affected);
200+
201+
return affected;
196202
}
197203

198204
public async Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken cancellationToken)

src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxTemplate.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,18 @@ OUTPUT INSERTED.[Id]
3939
SqlOutboxMessageInsertWithDatabaseIdSequential = insertWith("NEWSEQUENTIALID()");
4040

4141
SqlOutboxMessageDeleteSent = $"""
42+
SET DEADLOCK_PRIORITY LOW;
43+
WITH CTE AS (SELECT TOP (@BatchSize) Id
44+
FROM {TableNameQualified}
45+
WHERE DeliveryComplete = 1
46+
AND Timestamp < @Timestamp)
4247
DELETE FROM {TableNameQualified}
43-
WHERE [DeliveryComplete] = 1
44-
AND [Timestamp] < @Timestamp
48+
WHERE Id IN (SELECT Id FROM CTE);
4549
""";
4650

4751
SqlOutboxMessageLockAndSelect = $"""
4852
WITH Batch AS (SELECT TOP (@BatchSize) *
49-
FROM {TableNameQualified}
53+
FROM {TableNameQualified} WITH (ROWLOCK, UPDLOCK, READPAST)
5054
WHERE DeliveryComplete = 0
5155
AND (LockInstanceId = @InstanceId
5256
OR LockExpiresOn < GETUTCDATE())

src/SlimMessageBus.Host.Outbox/Configuration/MessageBusBuilderExtensions.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ public static MessageBusBuilder AddOutbox<TOutboxMessage, TOutboxMessageKey>(thi
3838
services.TryAddEnumerable(ServiceDescriptor.Singleton<IMessageBusLifecycleInterceptor, OutboxSendingTask<TOutboxMessage, TOutboxMessageKey>>(sp => sp.GetRequiredService<OutboxSendingTask<TOutboxMessage, TOutboxMessageKey>>()));
3939
services.TryAddSingleton<IOutboxNotificationService>(sp => sp.GetRequiredService<OutboxSendingTask<TOutboxMessage, TOutboxMessageKey>>());
4040

41+
services.AddSingleton<OutboxCleanUpTask<TOutboxMessage, TOutboxMessageKey>>();
42+
services.TryAddEnumerable(ServiceDescriptor.Singleton<IMessageBusLifecycleInterceptor, OutboxCleanUpTask<TOutboxMessage, TOutboxMessageKey>>(sp => sp.GetRequiredService<OutboxCleanUpTask<TOutboxMessage, TOutboxMessageKey>>()));
43+
4144
services.TryAddSingleton<IInstanceIdProvider, DefaultInstanceIdProvider>();
4245
services.TryAddSingleton<IOutboxLockRenewalTimerFactory, OutboxLockRenewalTimerFactory<TOutboxMessage, TOutboxMessageKey>>();
4346

src/SlimMessageBus.Host.Outbox/Configuration/OutboxMessageCleanupSettings.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,8 @@ public class OutboxMessageCleanupSettings
1414
/// Message age from which the sent messages are removed from.
1515
/// </summary>
1616
public TimeSpan Age { get; set; } = TimeSpan.FromHours(1);
17+
/// <summary>
18+
/// Maximum number of sent messages to remove per statement.
19+
/// </summary>
20+
public int BatchSize { get; set; } = 10_000;
1721
}

src/SlimMessageBus.Host.Outbox/Repositories/IOutboxMessageRepository.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ public interface IOutboxMessageRepository<TOutboxMessage, TOutboxMessageKey>
77
Task AbortDelivery(IReadOnlyCollection<TOutboxMessageKey> ids, CancellationToken cancellationToken);
88
Task UpdateToSent(IReadOnlyCollection<TOutboxMessageKey> ids, CancellationToken cancellationToken);
99
Task IncrementDeliveryAttempt(IReadOnlyCollection<TOutboxMessageKey> ids, int maxDeliveryAttempts, CancellationToken cancellationToken);
10-
Task DeleteSent(DateTime olderThan, CancellationToken cancellationToken);
10+
Task<int> DeleteSent(DateTimeOffset olderThan, int batchSize, CancellationToken cancellationToken);
1111
Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken cancellationToken);
12-
}
13-
12+
}
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
#nullable enable
2+
namespace SlimMessageBus.Host.Outbox.Services;
3+
4+
using System;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
8+
using Microsoft.Extensions.Hosting;
9+
10+
using SlimMessageBus;
11+
12+
public class OutboxCleanUpTask<TOutboxMessage, TOutboxMessageKey> : IMessageBusLifecycleInterceptor, IAsyncDisposable
13+
where TOutboxMessage : OutboxMessage<TOutboxMessageKey>
14+
{
15+
private readonly ILogger<OutboxCleanUpTask<TOutboxMessage, TOutboxMessageKey>> _logger;
16+
private readonly OutboxSettings _outboxSettings;
17+
private readonly TimeProvider _timeProvider;
18+
private readonly IHostApplicationLifetime _hostApplicationLifetime;
19+
private readonly IServiceProvider _serviceProvider;
20+
private readonly SemaphoreSlim _slim;
21+
22+
private CancellationTokenSource? _cts;
23+
private Task? _executingTask;
24+
private int _busStartCount;
25+
26+
public OutboxCleanUpTask(
27+
ILogger<OutboxCleanUpTask<TOutboxMessage, TOutboxMessageKey>> logger,
28+
OutboxSettings outboxSettings,
29+
TimeProvider currentTimeProvider,
30+
IHostApplicationLifetime hostApplicationLifetime,
31+
IServiceProvider serviceProvider)
32+
{
33+
_logger = logger;
34+
_outboxSettings = outboxSettings;
35+
_timeProvider = currentTimeProvider;
36+
_hostApplicationLifetime = hostApplicationLifetime;
37+
_serviceProvider = serviceProvider;
38+
_slim = new SemaphoreSlim(1);
39+
}
40+
41+
public async ValueTask DisposeAsync()
42+
{
43+
await Shutdown();
44+
GC.SuppressFinalize(this);
45+
}
46+
47+
public async Task OnBusLifecycle(MessageBusLifecycleEventType eventType, IMessageBus bus)
48+
{
49+
if (!_outboxSettings.MessageCleanup.Enabled)
50+
{
51+
return;
52+
}
53+
54+
switch (eventType)
55+
{
56+
case MessageBusLifecycleEventType.Started:
57+
if (Interlocked.Increment(ref _busStartCount) != 1)
58+
{
59+
return;
60+
}
61+
62+
await Startup();
63+
break;
64+
65+
case MessageBusLifecycleEventType.Stopping:
66+
if (Interlocked.Decrement(ref _busStartCount) != 0 || _executingTask == null)
67+
{
68+
return;
69+
}
70+
71+
await Shutdown();
72+
break;
73+
}
74+
}
75+
76+
protected async Task CleanUpLoop(CancellationToken cancellationToken)
77+
{
78+
var batchSize = _outboxSettings.MessageCleanup.BatchSize;
79+
while (!cancellationToken.IsCancellationRequested)
80+
{
81+
var scope = _serviceProvider.CreateScope();
82+
try
83+
{
84+
var outboxRepository = scope.ServiceProvider.GetRequiredService<IOutboxMessageRepository<TOutboxMessage, TOutboxMessageKey>>();
85+
var timestamp = _timeProvider.GetUtcNow().Add(-_outboxSettings.MessageCleanup.Age);
86+
while (!cancellationToken.IsCancellationRequested)
87+
{
88+
if (await outboxRepository.DeleteSent(timestamp, batchSize, cancellationToken) < batchSize)
89+
{
90+
break;
91+
}
92+
}
93+
94+
}
95+
catch (Exception ex)
96+
{
97+
_logger.LogError(ex, "Error while processing outbox clean up");
98+
}
99+
finally
100+
{
101+
if (scope is IAsyncDisposable asyncDisposable)
102+
{
103+
await asyncDisposable.DisposeAsync().ConfigureAwait(false);
104+
}
105+
else
106+
{
107+
scope.Dispose();
108+
}
109+
110+
_logger.LogDebug("Outbox clean up loop stopped");
111+
}
112+
113+
await Sleep(_outboxSettings.MessageCleanup.Interval, cancellationToken).ConfigureAwait(false);
114+
}
115+
}
116+
117+
protected virtual async Task Sleep(TimeSpan delay, CancellationToken cancellationToken)
118+
{
119+
try
120+
{
121+
#if !NET8_0_OR_GREATER
122+
await _timeProvider.Delay(delay, cancellationToken).ConfigureAwait(false);
123+
#endif
124+
125+
#if NET8_0_OR_GREATER
126+
await Task.Delay(delay, _timeProvider, cancellationToken).ConfigureAwait(false);
127+
#endif
128+
}
129+
catch (TaskCanceledException)
130+
{
131+
// do nothing, will be evaluated in calling method
132+
}
133+
}
134+
135+
private async Task Startup()
136+
{
137+
await _slim.WaitAsync(_hostApplicationLifetime.ApplicationStopping);
138+
try
139+
{
140+
Debug.Assert(_executingTask == null);
141+
Debug.Assert(_cts == null);
142+
143+
_cts = CancellationTokenSource.CreateLinkedTokenSource(_hostApplicationLifetime.ApplicationStopping);
144+
_executingTask = CleanUpLoop(_cts.Token);
145+
}
146+
finally
147+
{
148+
_slim.Release();
149+
}
150+
}
151+
152+
private async Task Shutdown()
153+
{
154+
if (_cts == null)
155+
{
156+
Debug.Assert(_executingTask == null);
157+
return;
158+
}
159+
160+
await _slim.WaitAsync();
161+
try
162+
{
163+
if (_executingTask == null)
164+
{
165+
return;
166+
}
167+
168+
try
169+
{
170+
await _cts!.CancelAsync();
171+
await _executingTask;
172+
}
173+
finally
174+
{
175+
_cts!.Dispose();
176+
_cts = null;
177+
_executingTask = null;
178+
}
179+
}
180+
finally
181+
{
182+
_slim.Release();
183+
}
184+
}
185+
}

src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
internal class OutboxSendingTask<TOutboxMessage, TOutboxMessageKey>(
44
ILoggerFactory loggerFactory,
55
OutboxSettings outboxSettings,
6-
TimeProvider timeProvider,
76
IServiceProvider serviceProvider)
87
: IMessageBusLifecycleInterceptor, IOutboxNotificationService, IAsyncDisposable
98
where TOutboxMessage : OutboxMessage<TOutboxMessageKey>
@@ -23,24 +22,6 @@ internal class OutboxSendingTask<TOutboxMessage, TOutboxMessageKey>(
2322

2423
private int _busStartCount;
2524

26-
private DateTimeOffset? _cleanupNextRun;
27-
28-
private bool ShouldRunCleanup()
29-
{
30-
if (_outboxSettings.MessageCleanup?.Enabled == true)
31-
{
32-
var currentTime = timeProvider.GetUtcNow();
33-
var trigger = !_cleanupNextRun.HasValue || currentTime > _cleanupNextRun.Value;
34-
if (trigger)
35-
{
36-
_cleanupNextRun = currentTime.Add(_outboxSettings.MessageCleanup.Interval);
37-
}
38-
39-
return trigger;
40-
}
41-
return false;
42-
}
43-
4425
public async ValueTask DisposeAsync()
4526
{
4627
await DisposeAsyncCore().ConfigureAwait(false);
@@ -177,12 +158,6 @@ private async Task Run()
177158
try
178159
{
179160
await SendMessages(scope.ServiceProvider, outboxRepository, _loopCts.Token);
180-
181-
if (!_loopCts.IsCancellationRequested && ShouldRunCleanup())
182-
{
183-
_logger.LogTrace("Running cleanup of sent messages");
184-
await outboxRepository.DeleteSent(timeProvider.GetUtcNow().DateTime.Add(-_outboxSettings.MessageCleanup.Age), _loopCts.Token).ConfigureAwait(false);
185-
}
186161
}
187162
catch (Exception e)
188163
{

0 commit comments

Comments
 (0)