Skip to content

Commit 486988b

Browse files
committed
#375 Outbox clean up in low priority batches with regular intervals
Signed-off-by: Richard Pringle <richardpringle@gmail.com>
1 parent 17a8721 commit 486988b

File tree

22 files changed

+766
-337
lines changed

22 files changed

+766
-337
lines changed

docs/plugin_outbox.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
1212
- [UseTransactionScope](#usetransactionscope)
1313
- [UseSqlTransaction](#usesqltransaction)
1414
- [How it works](#how-it-works)
15+
- [Clean up](#clean-up)
1516
- [Important note](#important-note)
1617

1718
## Introduction
@@ -215,7 +216,15 @@ When applied on the (child) bus level then all consumers (or handlers) will inhe
215216

216217
- Once a message is picked from outbox and successfully delivered then it is marked as sent in the outbox table.
217218

218-
- At configured intervals (`MessageCleanup.Interval`), and after a configured time span (`MessageCleanup.Age`), the sent messages are removed from the outbox table.
219+
## Clean up
220+
On starting SMB, sent messages that are older than `MessageCleanup.Age` will be removed from the `Outbox` table in batches of `MessageCleanup.BatchSize` until no sent messages of the specified age remain. The process is then repeated every `MessageCleanup.Interval` period.
221+
222+
| Property | Description | Default |
223+
| --------- | -------------------------------------------------- | ------- |
224+
| Enabled | `True` if messages are to be removed | true |
225+
| Interval | Time between executions | 1 hour |
226+
| Age | Minimum age of a sent message to delete | 1 hour |
227+
| BatchSize | Number of messages to be removed in each iteration | 10 000 |
219228

220229
## Important note
221230

docs/plugin_outbox.t.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
1212
- [UseTransactionScope](#usetransactionscope)
1313
- [UseSqlTransaction](#usesqltransaction)
1414
- [How it works](#how-it-works)
15+
- [Clean up](#clean-up)
1516
- [Important note](#important-note)
1617

1718
## Introduction
@@ -146,7 +147,15 @@ When applied on the (child) bus level then all consumers (or handlers) will inhe
146147

147148
- Once a message is picked from outbox and successfully delivered then it is marked as sent in the outbox table.
148149

149-
- At configured intervals (`MessageCleanup.Interval`), and after a configured time span (`MessageCleanup.Age`), the sent messages are removed from the outbox table.
150+
## Clean up
151+
On starting SMB, sent messages that are older than `MessageCleanup.Age` will be removed from the `Outbox` table in batches of `MessageCleanup.BatchSize` until no sent messages of the specified age remain. The process is then repeated every `MessageCleanup.Interval` period.
152+
153+
| Property | Description | Default |
154+
| --------- | -------------------------------------------------- | ------- |
155+
| Enabled | `True` if messages are to be removed | true |
156+
| Interval | Time between executions | 1 hour |
157+
| Age | Minimum age of a sent message to delete | 1 hour |
158+
| BatchSize | Number of messages to be removed in each iteration | 10 000 |
150159

151160
## Important note
152161

src/SlimMessageBus.Host.Outbox.Sql/MeasuringSqlMessageOutboxRepositoryDecorator.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ public Task AbortDelivery(IReadOnlyCollection<Guid> ids, CancellationToken cance
4242
public Task<IHasId> Create(string busName, IDictionary<string, object> headers, string path, string messageType, byte[] messagePayload, CancellationToken cancellationToken)
4343
=> MeasureMethod(nameof(Create), () => target.Create(busName, headers, path, messageType, messagePayload, cancellationToken));
4444

45-
public Task DeleteSent(DateTime olderThan, CancellationToken cancellationToken)
46-
=> MeasureMethod(nameof(DeleteSent), () => target.DeleteSent(olderThan, cancellationToken));
45+
public Task<int> DeleteSent(DateTimeOffset olderThan, int batchSize, CancellationToken cancellationToken)
46+
=> MeasureMethod(nameof(DeleteSent), () => target.DeleteSent(olderThan, batchSize, cancellationToken));
4747

4848
public Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int maxDeliveryAttempts, CancellationToken cancellationToken)
4949
=> MeasureMethod(nameof(IncrementDeliveryAttempt), () => target.IncrementDeliveryAttempt(ids, maxDeliveryAttempts, cancellationToken));

src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxMessage.cs renamed to src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxAdminMessage.cs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1-
namespace SlimMessageBus.Host.Outbox.Sql;
2-
3-
public class SqlOutboxMessage : OutboxMessage<Guid>
4-
{
5-
public DateTime Timestamp { get; set; }
6-
public string LockInstanceId { get; set; } = string.Empty;
7-
public DateTime? LockExpiresOn { get; set; } = null;
8-
public int DeliveryAttempt { get; set; } = 0;
9-
public bool DeliveryComplete { get; set; } = false;
10-
public bool DeliveryAborted { get; set; } = false;
11-
}
1+
namespace SlimMessageBus.Host.Outbox.Sql;
2+
3+
public class SqlOutboxMessage : OutboxMessage<Guid>;
4+
5+
public class SqlOutboxAdminMessage : SqlOutboxMessage
6+
{
7+
public DateTime Timestamp { get; set; }
8+
public string InstanceId { get; set; }
9+
public string LockInstanceId { get; set; } = string.Empty;
10+
public DateTime? LockExpiresOn { get; set; } = null;
11+
public int DeliveryAttempt { get; set; } = 0;
12+
public bool DeliveryComplete { get; set; } = false;
13+
public bool DeliveryAborted { get; set; } = false;
14+
}

src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxMessageRepository.cs

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
using System;
44

55
/// <summary>
6-
/// The MS SQL implmentation of the <see cref="IOutboxMessageRepository{TOutboxMessage, TOutboxMessageKey}"/>
6+
/// The MS SQL implementation of the <see cref="IOutboxMessageRepository{TOutboxMessage, TOutboxMessageKey}"/>
77
/// </summary>
88
public class SqlOutboxMessageRepository : CommonSqlRepository, ISqlMessageOutboxRepository
99
{
@@ -46,7 +46,7 @@ public SqlOutboxMessageRepository(
4646

4747
public virtual async Task<IHasId> Create(string busName, IDictionary<string, object> headers, string path, string messageType, byte[] messagePayload, CancellationToken cancellationToken)
4848
{
49-
var om = new SqlOutboxMessage
49+
var om = new SqlOutboxAdminMessage
5050
{
5151
Timestamp = _timeProvider.GetUtcNow().DateTime,
5252
InstanceId = _instanceIdProvider.GetInstanceId(),
@@ -102,7 +102,40 @@ public async Task<IReadOnlyCollection<SqlOutboxMessage>> LockAndSelect(string in
102102
cmd.Parameters.Add("@BatchSize", SqlDbType.Int).Value = batchSize;
103103
cmd.Parameters.Add("@LockDuration", SqlDbType.Int).Value = lockDuration.TotalSeconds;
104104

105-
return await ReadMessages(cmd, cancellationToken).ConfigureAwait(false);
105+
using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
106+
107+
var idOrdinal = reader.GetOrdinal("Id");
108+
var busNameOrdinal = reader.GetOrdinal("BusName");
109+
var typeOrdinal = reader.GetOrdinal("MessageType");
110+
var payloadOrdinal = reader.GetOrdinal("MessagePayload");
111+
var headersOrdinal = reader.GetOrdinal("Headers");
112+
var pathOrdinal = reader.GetOrdinal("Path");
113+
114+
var items = new List<SqlOutboxMessage>();
115+
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
116+
{
117+
var headers = reader.IsDBNull(headersOrdinal)
118+
? null
119+
: reader.GetString(headersOrdinal);
120+
121+
var message = new SqlOutboxMessage
122+
{
123+
Id = reader.GetGuid(idOrdinal),
124+
BusName = reader.GetString(busNameOrdinal),
125+
MessageType = reader.GetString(typeOrdinal),
126+
MessagePayload = reader.GetSqlBinary(payloadOrdinal).Value,
127+
Headers = headers == null
128+
? null
129+
: JsonSerializer.Deserialize<IDictionary<string, object>>(headers, _jsonOptions),
130+
Path = reader.IsDBNull(pathOrdinal)
131+
? null
132+
: reader.GetString(pathOrdinal)
133+
};
134+
135+
items.Add(message);
136+
}
137+
138+
return items;
106139
}
107140

108141
public async Task AbortDelivery(IReadOnlyCollection<Guid> ids, CancellationToken cancellationToken)
@@ -182,17 +215,23 @@ public async Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int ma
182215
}
183216
}
184217

185-
public async Task DeleteSent(DateTime olderThan, CancellationToken cancellationToken)
218+
public async Task<int> DeleteSent(DateTimeOffset olderThan, int batchSize, CancellationToken cancellationToken)
186219
{
187220
await EnsureConnection();
188221

189222
var affected = await ExecuteNonQuery(
190223
Settings.SqlSettings.OperationRetry,
191224
_sqlTemplate.SqlOutboxMessageDeleteSent,
192-
cmd => cmd.Parameters.Add("@Timestamp", SqlDbType.DateTimeOffset).Value = olderThan,
225+
cmd =>
226+
{
227+
cmd.Parameters.Add("@BatchSize", SqlDbType.Int).Value = batchSize;
228+
cmd.Parameters.Add("@Timestamp", SqlDbType.DateTimeOffset).Value = olderThan;
229+
},
193230
cancellationToken);
194231

195232
Logger.Log(affected > 0 ? LogLevel.Information : LogLevel.Debug, "Removed {MessageCount} sent messages from outbox table", affected);
233+
234+
return affected;
196235
}
197236

198237
public async Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken cancellationToken)
@@ -207,18 +246,13 @@ public async Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, Canc
207246
return await cmd.ExecuteNonQueryAsync(cancellationToken) > 0;
208247
}
209248

210-
internal async Task<IReadOnlyCollection<SqlOutboxMessage>> GetAllMessages(CancellationToken cancellationToken)
249+
internal async Task<IReadOnlyCollection<SqlOutboxAdminMessage>> GetAllMessages(CancellationToken cancellationToken)
211250
{
212251
await EnsureConnection();
213252

214253
using var cmd = CreateCommand();
215254
cmd.CommandText = _sqlTemplate.SqlOutboxAllMessages;
216255

217-
return await ReadMessages(cmd, cancellationToken).ConfigureAwait(false);
218-
}
219-
220-
private static async Task<IReadOnlyCollection<SqlOutboxMessage>> ReadMessages(SqlCommand cmd, CancellationToken cancellationToken)
221-
{
222256
using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
223257

224258
var idOrdinal = reader.GetOrdinal("Id");
@@ -235,14 +269,14 @@ private static async Task<IReadOnlyCollection<SqlOutboxMessage>> ReadMessages(Sq
235269
var deliveryCompleteOrdinal = reader.GetOrdinal("DeliveryComplete");
236270
var deliveryAbortedOrdinal = reader.GetOrdinal("DeliveryAborted");
237271

238-
var items = new List<SqlOutboxMessage>();
272+
var items = new List<SqlOutboxAdminMessage>();
239273
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
240274
{
241275
var headers = reader.IsDBNull(headersOrdinal)
242276
? null
243277
: reader.GetString(headersOrdinal);
244278

245-
var message = new SqlOutboxMessage
279+
var message = new SqlOutboxAdminMessage
246280
{
247281
Id = reader.GetGuid(idOrdinal),
248282
Timestamp = reader.GetDateTime(timestampOrdinal),

src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxMigrationService.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,5 +92,19 @@ await TryApplyMigration("20240831000000_SMB_RemoveOutboxIdType",
9292
DROP TYPE IF EXISTS {qualifiedOutboxIdTypeName};
9393
""",
9494
token);
95+
96+
await TryApplyMigration("20250307000000_SMB_BatchedDeletes",
97+
$"""
98+
-- drop old indexes
99+
DROP INDEX IF EXISTS IX_Outbox_LockExpiresOn_LockInstanceId_DeliveryComplete_0_DeliveryAborted_0 ON {qualifiedTableName};
100+
DROP INDEX IF EXISTS IX_Outbox_Timestamp_DeliveryComplete_1_DeliveryAborted_0 ON {qualifiedTableName};
101+
102+
-- SqlOutboxTemplate.SqlOutboxMessageDeleteSent + SqlOutboxTemplate.SqlOutboxMessageLockTableAndSelect
103+
CREATE CLUSTERED INDEX IX_Outbox_DeliveryComplete_Timestamp ON {qualifiedTableName} (DeliveryComplete, Timestamp);
104+
105+
-- SqlOutboxTemplate.SqlOutboxMessageDeleteSent
106+
CREATE INDEX IX_Outbox_LockExpiredOn_LockInstanceId__DeliveryComplete_0_DeliveryAborted_0 ON {qualifiedTableName} (LockExpiresOn, LockInstanceId) WHERE [DeliveryComplete] = 0 and [DeliveryAborted] = 0;
107+
""",
108+
token);
95109
}
96110
}

src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxTemplate.cs

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,19 @@ OUTPUT INSERTED.[Id]
3939
SqlOutboxMessageInsertWithDatabaseIdSequential = insertWith("NEWSEQUENTIALID()");
4040

4141
SqlOutboxMessageDeleteSent = $"""
42+
SET DEADLOCK_PRIORITY LOW;
43+
WITH CTE AS (SELECT TOP (@BatchSize) Id
44+
FROM {TableNameQualified} WITH (ROWLOCK, READPAST)
45+
WHERE DeliveryComplete = 1
46+
AND Timestamp < @Timestamp
47+
ORDER BY Timestamp ASC)
4248
DELETE FROM {TableNameQualified}
43-
WHERE [DeliveryComplete] = 1
44-
AND [Timestamp] < @Timestamp
49+
WHERE Id IN (SELECT Id FROM CTE);
4550
""";
4651

4752
SqlOutboxMessageLockAndSelect = $"""
4853
WITH Batch AS (SELECT TOP (@BatchSize) *
49-
FROM {TableNameQualified}
54+
FROM {TableNameQualified} WITH (ROWLOCK, UPDLOCK, READPAST)
5055
WHERE DeliveryComplete = 0
5156
AND (LockInstanceId = @InstanceId
5257
OR LockExpiresOn < GETUTCDATE())
@@ -56,18 +61,11 @@ UPDATE Batch
5661
SET LockInstanceId = @InstanceId,
5762
LockExpiresOn = DATEADD(SECOND, @LockDuration, GETUTCDATE())
5863
OUTPUT INSERTED.Id
59-
, INSERTED.Timestamp
6064
, INSERTED.BusName
6165
, INSERTED.MessageType
6266
, INSERTED.MessagePayload
6367
, INSERTED.Headers
64-
, INSERTED.Path
65-
, INSERTED.InstanceId
66-
, INSERTED.LockInstanceId
67-
, INSERTED.LockExpiresOn
68-
, INSERTED.DeliveryAttempt
69-
, INSERTED.DeliveryComplete
70-
, INSERTED.DeliveryAborted;
68+
, INSERTED.Path;
7169
""";
7270

7371
// Only create lock if there are no active locks from another instance.
@@ -93,18 +91,11 @@ UPDATE UpdatedRows
9391
END;
9492
9593
SELECT TOP (@BatchSize) Id
96-
, Timestamp
9794
, BusName
9895
, MessageType
9996
, MessagePayload
10097
, Headers
10198
, Path
102-
, InstanceId
103-
, LockInstanceId
104-
, LockExpiresOn
105-
, DeliveryAttempt
106-
, DeliveryComplete
107-
, DeliveryAborted
10899
FROM {TableNameQualified}
109100
WHERE LockInstanceId = @InstanceId
110101
AND LockExpiresOn > GETUTCDATE()
@@ -115,27 +106,30 @@ SELECT TOP (@BatchSize) Id
115106

116107
// See https://learn.microsoft.com/en-us/sql/t-sql/functions/string-split-transact-sql?view=sql-server-ver16
117108
// See https://stackoverflow.com/a/47777878/1906057
118-
var inIdsSql = $"SELECT CONVERT(uniqueidentifier, [value]) from STRING_SPLIT(@Ids, '{InIdsSeparator}')";
109+
var inIdsSql = $"SELECT CAST([value] AS uniqueidentifier) Id from STRING_SPLIT(@Ids, '{InIdsSeparator}')";
119110

120111
SqlOutboxMessageUpdateSent = $"""
121-
UPDATE {TableNameQualified}
112+
UPDATE T
122113
SET [DeliveryComplete] = 1,
123114
[DeliveryAttempt] = DeliveryAttempt + 1
124-
WHERE [Id] IN ({inIdsSql});
115+
FROM {TableNameQualified} T
116+
INNER JOIN ({inIdsSql}) Ids ON T.Id = Ids.id;
125117
""";
126118

127119
SqlOutboxMessageIncrementDeliveryAttempt = $"""
128-
UPDATE {TableNameQualified}
120+
UPDATE T
129121
SET [DeliveryAttempt] = DeliveryAttempt + 1,
130122
[DeliveryAborted] = CASE WHEN [DeliveryAttempt] >= @MaxDeliveryAttempts THEN 1 ELSE 0 END
131-
WHERE [Id] IN ({inIdsSql});
123+
FROM {TableNameQualified} T
124+
INNER JOIN ({inIdsSql}) Ids ON T.Id = Ids.id;
132125
""";
133126

134127
SqlOutboxMessageAbortDelivery = $"""
135-
UPDATE {TableNameQualified}
128+
UPDATE T
136129
SET [DeliveryAttempt] = DeliveryAttempt + 1,
137130
[DeliveryAborted] = 1
138-
WHERE [Id] IN ({inIdsSql});
131+
FROM {TableNameQualified} T
132+
INNER JOIN ({inIdsSql}) Ids ON T.Id = Ids.id;
139133
""";
140134

141135
SqlOutboxMessageRenewLock = $"""

src/SlimMessageBus.Host.Outbox/Configuration/MessageBusBuilderExtensions.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ public static MessageBusBuilder AddOutbox<TOutboxMessage, TOutboxMessageKey>(thi
3838
services.TryAddEnumerable(ServiceDescriptor.Singleton<IMessageBusLifecycleInterceptor, OutboxSendingTask<TOutboxMessage, TOutboxMessageKey>>(sp => sp.GetRequiredService<OutboxSendingTask<TOutboxMessage, TOutboxMessageKey>>()));
3939
services.TryAddSingleton<IOutboxNotificationService>(sp => sp.GetRequiredService<OutboxSendingTask<TOutboxMessage, TOutboxMessageKey>>());
4040

41+
services.AddSingleton<OutboxCleanUpTask<TOutboxMessage, TOutboxMessageKey>>();
42+
services.TryAddEnumerable(ServiceDescriptor.Singleton<IMessageBusLifecycleInterceptor, OutboxCleanUpTask<TOutboxMessage, TOutboxMessageKey>>(sp => sp.GetRequiredService<OutboxCleanUpTask<TOutboxMessage, TOutboxMessageKey>>()));
43+
4144
services.TryAddSingleton<IInstanceIdProvider, DefaultInstanceIdProvider>();
4245
services.TryAddSingleton<IOutboxLockRenewalTimerFactory, OutboxLockRenewalTimerFactory<TOutboxMessage, TOutboxMessageKey>>();
4346

src/SlimMessageBus.Host.Outbox/Configuration/OutboxMessageCleanupSettings.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,8 @@ public class OutboxMessageCleanupSettings
1414
/// Message age from which the sent messages are removed from.
1515
/// </summary>
1616
public TimeSpan Age { get; set; } = TimeSpan.FromHours(1);
17+
/// <summary>
18+
/// Maximum number of sent messages to remove per statement.
19+
/// </summary>
20+
public int BatchSize { get; set; } = 10_000;
1721
}

src/SlimMessageBus.Host.Outbox/Repositories/IOutboxMessageRepository.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ public interface IOutboxMessageRepository<TOutboxMessage, TOutboxMessageKey>
77
Task AbortDelivery(IReadOnlyCollection<TOutboxMessageKey> ids, CancellationToken cancellationToken);
88
Task UpdateToSent(IReadOnlyCollection<TOutboxMessageKey> ids, CancellationToken cancellationToken);
99
Task IncrementDeliveryAttempt(IReadOnlyCollection<TOutboxMessageKey> ids, int maxDeliveryAttempts, CancellationToken cancellationToken);
10-
Task DeleteSent(DateTime olderThan, CancellationToken cancellationToken);
10+
Task<int> DeleteSent(DateTimeOffset olderThan, int batchSize, CancellationToken cancellationToken);
1111
Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken cancellationToken);
12-
}
13-
12+
}

0 commit comments

Comments
 (0)