Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion docs/plugin_outbox.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [UseTransactionScope](#usetransactionscope)
- [UseSqlTransaction](#usesqltransaction)
- [How it works](#how-it-works)
- [Clean up](#clean-up)
- [Important note](#important-note)

## Introduction
Expand Down Expand Up @@ -215,7 +216,15 @@ When applied on the (child) bus level then all consumers (or handlers) will inhe

- Once a message is picked from outbox and successfully delivered then it is marked as sent in the outbox table.

- At configured intervals (`MessageCleanup.Interval`), and after a configured time span (`MessageCleanup.Age`), the sent messages are removed from the outbox table.
## Clean up
On starting SMB, messages that are older than `MessageCleanup.Age` will be removed from the `Outbox` table in batches of `MessageCleanup.BatchSize` until no sent messages of the specified age remain. The process is then repeated every `MessageCleanup.Interval` period.

| Property | Description | Default |
| --------- | -------------------------------------------------- | ------- |
| Enabled | `True` if messages are to be removed | true |
| Interval | Time between exections | 1 hour |
| Age | Minimum age of a sent message to delete | 1 hour |
| BatchSize | Number of messages to be removed in each iteration | 10 000 |

## Important note

Expand Down
11 changes: 10 additions & 1 deletion docs/plugin_outbox.t.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [UseTransactionScope](#usetransactionscope)
- [UseSqlTransaction](#usesqltransaction)
- [How it works](#how-it-works)
- [Clean up](#clean-up)
- [Important note](#important-note)

## Introduction
Expand Down Expand Up @@ -146,7 +147,15 @@ When applied on the (child) bus level then all consumers (or handlers) will inhe

- Once a message is picked from outbox and successfully delivered then it is marked as sent in the outbox table.

- At configured intervals (`MessageCleanup.Interval`), and after a configured time span (`MessageCleanup.Age`), the sent messages are removed from the outbox table.
## Clean up
On starting SMB, messages that are older than `MessageCleanup.Age` will be removed from the `Outbox` table in batches of `MessageCleanup.BatchSize` until no sent messages of the specified age remain. The process is then repeated every `MessageCleanup.Interval` period.

| Property | Description | Default |
| --------- | -------------------------------------------------- | ------- |
| Enabled | `True` if messages are to be removed | true |
| Interval | Time between exections | 1 hour |
| Age | Minimum age of a sent message to delete | 1 hour |
| BatchSize | Number of messages to be removed in each iteration | 10 000 |

## Important note

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public Task AbortDelivery(IReadOnlyCollection<Guid> ids, CancellationToken cance
public Task<IHasId> Create(string busName, IDictionary<string, object> headers, string path, string messageType, byte[] messagePayload, CancellationToken cancellationToken)
=> MeasureMethod(nameof(Create), () => target.Create(busName, headers, path, messageType, messagePayload, cancellationToken));

public Task DeleteSent(DateTime olderThan, CancellationToken cancellationToken)
=> MeasureMethod(nameof(DeleteSent), () => target.DeleteSent(olderThan, cancellationToken));
public Task<int> DeleteSent(DateTimeOffset olderThan, int batchSize, CancellationToken cancellationToken)
=> MeasureMethod(nameof(DeleteSent), () => target.DeleteSent(olderThan, batchSize, cancellationToken));

public Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int maxDeliveryAttempts, CancellationToken cancellationToken)
=> MeasureMethod(nameof(IncrementDeliveryAttempt), () => target.IncrementDeliveryAttempt(ids, maxDeliveryAttempts, cancellationToken));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
namespace SlimMessageBus.Host.Outbox.Sql;

public class SqlOutboxMessage : OutboxMessage<Guid>
{
public DateTime Timestamp { get; set; }
public string LockInstanceId { get; set; } = string.Empty;
public DateTime? LockExpiresOn { get; set; } = null;
public int DeliveryAttempt { get; set; } = 0;
public bool DeliveryComplete { get; set; } = false;
public bool DeliveryAborted { get; set; } = false;
}
namespace SlimMessageBus.Host.Outbox.Sql;

public class SqlOutboxMessage : OutboxMessage<Guid>;

public class SqlOutboxAdminMessage : SqlOutboxMessage
{
public DateTime Timestamp { get; set; }
public string InstanceId { get; set; }
public string LockInstanceId { get; set; } = string.Empty;
public DateTime? LockExpiresOn { get; set; } = null;
public int DeliveryAttempt { get; set; } = 0;
public bool DeliveryComplete { get; set; } = false;
public bool DeliveryAborted { get; set; } = false;
}
60 changes: 47 additions & 13 deletions src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxMessageRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using System;

/// <summary>
/// The MS SQL implmentation of the <see cref="IOutboxMessageRepository{TOutboxMessage, TOutboxMessageKey}"/>
/// The MS SQL implementation of the <see cref="IOutboxMessageRepository{TOutboxMessage, TOutboxMessageKey}"/>
/// </summary>
public class SqlOutboxMessageRepository : CommonSqlRepository, ISqlMessageOutboxRepository
{
Expand Down Expand Up @@ -46,7 +46,7 @@ public SqlOutboxMessageRepository(

public virtual async Task<IHasId> Create(string busName, IDictionary<string, object> headers, string path, string messageType, byte[] messagePayload, CancellationToken cancellationToken)
{
var om = new SqlOutboxMessage
var om = new SqlOutboxAdminMessage
{
Timestamp = _timeProvider.GetUtcNow().DateTime,
InstanceId = _instanceIdProvider.GetInstanceId(),
Expand Down Expand Up @@ -102,7 +102,40 @@ public async Task<IReadOnlyCollection<SqlOutboxMessage>> LockAndSelect(string in
cmd.Parameters.Add("@BatchSize", SqlDbType.Int).Value = batchSize;
cmd.Parameters.Add("@LockDuration", SqlDbType.Int).Value = lockDuration.TotalSeconds;

return await ReadMessages(cmd, cancellationToken).ConfigureAwait(false);
using var reader = await cmd.ExecuteReaderAsync(cancellationToken);

var idOrdinal = reader.GetOrdinal("Id");
var busNameOrdinal = reader.GetOrdinal("BusName");
var typeOrdinal = reader.GetOrdinal("MessageType");
var payloadOrdinal = reader.GetOrdinal("MessagePayload");
var headersOrdinal = reader.GetOrdinal("Headers");
var pathOrdinal = reader.GetOrdinal("Path");

var items = new List<SqlOutboxMessage>();
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
var headers = reader.IsDBNull(headersOrdinal)
? null
: reader.GetString(headersOrdinal);

var message = new SqlOutboxMessage
{
Id = reader.GetGuid(idOrdinal),
BusName = reader.GetString(busNameOrdinal),
MessageType = reader.GetString(typeOrdinal),
MessagePayload = reader.GetSqlBinary(payloadOrdinal).Value,
Headers = headers == null
? null
: JsonSerializer.Deserialize<IDictionary<string, object>>(headers, _jsonOptions),
Path = reader.IsDBNull(pathOrdinal)
? null
: reader.GetString(pathOrdinal)
};

items.Add(message);
}

return items;
}

public async Task AbortDelivery(IReadOnlyCollection<Guid> ids, CancellationToken cancellationToken)
Expand Down Expand Up @@ -182,17 +215,23 @@ public async Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int ma
}
}

public async Task DeleteSent(DateTime olderThan, CancellationToken cancellationToken)
public async Task<int> DeleteSent(DateTimeOffset olderThan, int batchSize, CancellationToken cancellationToken)
{
await EnsureConnection();

var affected = await ExecuteNonQuery(
Settings.SqlSettings.OperationRetry,
_sqlTemplate.SqlOutboxMessageDeleteSent,
cmd => cmd.Parameters.Add("@Timestamp", SqlDbType.DateTimeOffset).Value = olderThan,
cmd =>
{
cmd.Parameters.Add("@BatchSize", SqlDbType.Int).Value = batchSize;
cmd.Parameters.Add("@Timestamp", SqlDbType.DateTimeOffset).Value = olderThan;
},
cancellationToken);

Logger.Log(affected > 0 ? LogLevel.Information : LogLevel.Debug, "Removed {MessageCount} sent messages from outbox table", affected);

return affected;
}

public async Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken cancellationToken)
Expand All @@ -207,18 +246,13 @@ public async Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, Canc
return await cmd.ExecuteNonQueryAsync(cancellationToken) > 0;
}

internal async Task<IReadOnlyCollection<SqlOutboxMessage>> GetAllMessages(CancellationToken cancellationToken)
internal async Task<IReadOnlyCollection<SqlOutboxAdminMessage>> GetAllMessages(CancellationToken cancellationToken)
{
await EnsureConnection();

using var cmd = CreateCommand();
cmd.CommandText = _sqlTemplate.SqlOutboxAllMessages;

return await ReadMessages(cmd, cancellationToken).ConfigureAwait(false);
}

private static async Task<IReadOnlyCollection<SqlOutboxMessage>> ReadMessages(SqlCommand cmd, CancellationToken cancellationToken)
{
using var reader = await cmd.ExecuteReaderAsync(cancellationToken);

var idOrdinal = reader.GetOrdinal("Id");
Expand All @@ -235,14 +269,14 @@ private static async Task<IReadOnlyCollection<SqlOutboxMessage>> ReadMessages(Sq
var deliveryCompleteOrdinal = reader.GetOrdinal("DeliveryComplete");
var deliveryAbortedOrdinal = reader.GetOrdinal("DeliveryAborted");

var items = new List<SqlOutboxMessage>();
var items = new List<SqlOutboxAdminMessage>();
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
var headers = reader.IsDBNull(headersOrdinal)
? null
: reader.GetString(headersOrdinal);

var message = new SqlOutboxMessage
var message = new SqlOutboxAdminMessage
{
Id = reader.GetGuid(idOrdinal),
Timestamp = reader.GetDateTime(timestampOrdinal),
Expand Down
14 changes: 14 additions & 0 deletions src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxMigrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,19 @@ await TryApplyMigration("20240831000000_SMB_RemoveOutboxIdType",
DROP TYPE IF EXISTS {qualifiedOutboxIdTypeName};
""",
token);

await TryApplyMigration("20250307000000_SMB_BatchedDeletes",
$"""
-- drop old indexes
DROP INDEX IF EXISTS IX_Outbox_LockExpiresOn_LockInstanceId_DeliveryComplete_0_DeliveryAborted_0 ON {qualifiedTableName};
DROP INDEX IF EXISTS IX_Outbox_Timestamp_DeliveryComplete_1_DeliveryAborted_0 ON {qualifiedTableName};

-- SqlOutboxTemplate.SqlOutboxMessageDeleteSent + SqlOutboxTemplate.SqlOutboxMessageLockTableAndSelect
CREATE CLUSTERED INDEX IX_Outbox_DeliveryComplete_Timestamp ON {qualifiedTableName} (DeliveryComplete, Timestamp);

-- SqlOutboxTemplate.SqlOutboxMessageDeleteSent
CREATE INDEX IX_Outbox_LockExpiredOn_LockInstanceId__DeliveryComplete_0_DeliveryAborted_0 ON {qualifiedTableName} (LockExpiresOn, LockInstanceId) WHERE [DeliveryComplete] = 0 and [DeliveryAborted] = 0;
""",
token);
}
}
44 changes: 19 additions & 25 deletions src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxTemplate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,19 @@ OUTPUT INSERTED.[Id]
SqlOutboxMessageInsertWithDatabaseIdSequential = insertWith("NEWSEQUENTIALID()");

SqlOutboxMessageDeleteSent = $"""
SET DEADLOCK_PRIORITY LOW;
WITH CTE AS (SELECT TOP (@BatchSize) Id
FROM {TableNameQualified} WITH (ROWLOCK, READPAST)
WHERE DeliveryComplete = 1
AND Timestamp < @Timestamp
ORDER BY Timestamp ASC)
DELETE FROM {TableNameQualified}
WHERE [DeliveryComplete] = 1
AND [Timestamp] < @Timestamp
WHERE Id IN (SELECT Id FROM CTE);
""";

SqlOutboxMessageLockAndSelect = $"""
WITH Batch AS (SELECT TOP (@BatchSize) *
FROM {TableNameQualified}
FROM {TableNameQualified} WITH (ROWLOCK, UPDLOCK, READPAST)
WHERE DeliveryComplete = 0
AND (LockInstanceId = @InstanceId
OR LockExpiresOn < GETUTCDATE())
Expand All @@ -56,18 +61,11 @@ UPDATE Batch
SET LockInstanceId = @InstanceId,
LockExpiresOn = DATEADD(SECOND, @LockDuration, GETUTCDATE())
OUTPUT INSERTED.Id
, INSERTED.Timestamp
, INSERTED.BusName
, INSERTED.MessageType
, INSERTED.MessagePayload
, INSERTED.Headers
, INSERTED.Path
, INSERTED.InstanceId
, INSERTED.LockInstanceId
, INSERTED.LockExpiresOn
, INSERTED.DeliveryAttempt
, INSERTED.DeliveryComplete
, INSERTED.DeliveryAborted;
, INSERTED.Path;
""";

// Only create lock if there are no active locks from another instance.
Expand All @@ -93,18 +91,11 @@ UPDATE UpdatedRows
END;

SELECT TOP (@BatchSize) Id
, Timestamp
, BusName
, MessageType
, MessagePayload
, Headers
, Path
, InstanceId
, LockInstanceId
, LockExpiresOn
, DeliveryAttempt
, DeliveryComplete
, DeliveryAborted
FROM {TableNameQualified}
WHERE LockInstanceId = @InstanceId
AND LockExpiresOn > GETUTCDATE()
Expand All @@ -115,27 +106,30 @@ SELECT TOP (@BatchSize) Id

// See https://learn.microsoft.com/en-us/sql/t-sql/functions/string-split-transact-sql?view=sql-server-ver16
// See https://stackoverflow.com/a/47777878/1906057
var inIdsSql = $"SELECT CONVERT(uniqueidentifier, [value]) from STRING_SPLIT(@Ids, '{InIdsSeparator}')";
var inIdsSql = $"SELECT CAST([value] AS uniqueidentifier) Id from STRING_SPLIT(@Ids, '{InIdsSeparator}')";

SqlOutboxMessageUpdateSent = $"""
UPDATE {TableNameQualified}
UPDATE T
SET [DeliveryComplete] = 1,
[DeliveryAttempt] = DeliveryAttempt + 1
WHERE [Id] IN ({inIdsSql});
FROM {TableNameQualified} T
INNER JOIN ({inIdsSql}) Ids ON T.Id = Ids.id;
""";

SqlOutboxMessageIncrementDeliveryAttempt = $"""
UPDATE {TableNameQualified}
UPDATE T
SET [DeliveryAttempt] = DeliveryAttempt + 1,
[DeliveryAborted] = CASE WHEN [DeliveryAttempt] >= @MaxDeliveryAttempts THEN 1 ELSE 0 END
WHERE [Id] IN ({inIdsSql});
FROM {TableNameQualified} T
INNER JOIN ({inIdsSql}) Ids ON T.Id = Ids.id;
""";

SqlOutboxMessageAbortDelivery = $"""
UPDATE {TableNameQualified}
UPDATE T
SET [DeliveryAttempt] = DeliveryAttempt + 1,
[DeliveryAborted] = 1
WHERE [Id] IN ({inIdsSql});
FROM {TableNameQualified} T
INNER JOIN ({inIdsSql}) Ids ON T.Id = Ids.id;
""";

SqlOutboxMessageRenewLock = $"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public static MessageBusBuilder AddOutbox<TOutboxMessage, TOutboxMessageKey>(thi
services.TryAddEnumerable(ServiceDescriptor.Singleton<IMessageBusLifecycleInterceptor, OutboxSendingTask<TOutboxMessage, TOutboxMessageKey>>(sp => sp.GetRequiredService<OutboxSendingTask<TOutboxMessage, TOutboxMessageKey>>()));
services.TryAddSingleton<IOutboxNotificationService>(sp => sp.GetRequiredService<OutboxSendingTask<TOutboxMessage, TOutboxMessageKey>>());

services.AddSingleton<OutboxCleanUpTask<TOutboxMessage, TOutboxMessageKey>>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<IMessageBusLifecycleInterceptor, OutboxCleanUpTask<TOutboxMessage, TOutboxMessageKey>>(sp => sp.GetRequiredService<OutboxCleanUpTask<TOutboxMessage, TOutboxMessageKey>>()));

services.TryAddSingleton<IInstanceIdProvider, DefaultInstanceIdProvider>();
services.TryAddSingleton<IOutboxLockRenewalTimerFactory, OutboxLockRenewalTimerFactory<TOutboxMessage, TOutboxMessageKey>>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@ public class OutboxMessageCleanupSettings
/// Message age from which the sent messages are removed from.
/// </summary>
public TimeSpan Age { get; set; } = TimeSpan.FromHours(1);
/// <summary>
/// Maximum number of sent messages to remove per statement.
/// </summary>
public int BatchSize { get; set; } = 10_000;
}
2 changes: 2 additions & 0 deletions src/SlimMessageBus.Host.Outbox/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
global using Microsoft.Extensions.Logging;
global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.DependencyInjection.Extensions;
global using Microsoft.Extensions.Hosting;

global using System.Diagnostics;

global using SlimMessageBus;
global using SlimMessageBus.Host;
global using SlimMessageBus.Host.Interceptor;
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ public interface IOutboxMessageRepository<TOutboxMessage, TOutboxMessageKey>
Task AbortDelivery(IReadOnlyCollection<TOutboxMessageKey> ids, CancellationToken cancellationToken);
Task UpdateToSent(IReadOnlyCollection<TOutboxMessageKey> ids, CancellationToken cancellationToken);
Task IncrementDeliveryAttempt(IReadOnlyCollection<TOutboxMessageKey> ids, int maxDeliveryAttempts, CancellationToken cancellationToken);
Task DeleteSent(DateTime olderThan, CancellationToken cancellationToken);
Task<int> DeleteSent(DateTimeOffset olderThan, int batchSize, CancellationToken cancellationToken);
Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken cancellationToken);
}

}
3 changes: 0 additions & 3 deletions src/SlimMessageBus.Host.Outbox/Repositories/OutboxMessage.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.Outbox;



public class OutboxMessage<TOutboxMessageKey> : IHasId<TOutboxMessageKey>
{
public TOutboxMessageKey Id { get; set; }
Expand All @@ -10,7 +8,6 @@ public class OutboxMessage<TOutboxMessageKey> : IHasId<TOutboxMessageKey>
public byte[] MessagePayload { get; set; }
public string Path { get; set; }
public IDictionary<string, object> Headers { get; set; }
public string InstanceId { get; set; }

object IHasId.Id => Id;
}
Loading
Loading