Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
32b7c73
add installer
hasancanguler Jun 25, 2025
4edf443
Undo public API and feature rename
danielmarbach Aug 12, 2025
530991f
Installer Settings without public API
danielmarbach Aug 12, 2025
fac7a60
SubscriptionSchemaInstaller
danielmarbach Aug 12, 2025
d0a7c1b
Cleanup SagaStorage
danielmarbach Aug 12, 2025
f312a03
Unify database and collection settings for now
danielmarbach Aug 12, 2025
110b791
TODO
danielmarbach Aug 12, 2025
36ebee1
More passing of settings
danielmarbach Aug 12, 2025
b6c2421
Another TOOD
danielmarbach Aug 12, 2025
5f5e010
Alignment
danielmarbach Aug 12, 2025
9c690f2
Another TODO
danielmarbach Aug 12, 2025
0ff19a8
Move class map out of installer
danielmarbach Aug 13, 2025
4516c20
Remove unnecessary code
danielmarbach Aug 13, 2025
072df37
Register outbox class mapping
danielmarbach Aug 13, 2025
6dd1aa4
Use collection settings
danielmarbach Aug 13, 2025
0bf60dd
Async installers
danielmarbach Aug 13, 2025
b7d97c3
Rename outbox installer method
danielmarbach Aug 13, 2025
5d819a3
Rename for better clarity
danielmarbach Aug 13, 2025
fc2a64a
Create the outbox collection too
danielmarbach Aug 13, 2025
b1ccc28
Use installers where possible
danielmarbach Aug 13, 2025
a10efd9
Test collection creation
danielmarbach Aug 13, 2025
8a791df
Public API to disable installers
danielmarbach Aug 13, 2025
5d1ed09
Move slightly up
danielmarbach Aug 13, 2025
32b4e1b
Remove unnecessary check because collections can get auto-created on …
danielmarbach Aug 13, 2025
5e5b62e
Probably out of scope for now
danielmarbach Aug 13, 2025
4674775
Rename installers
danielmarbach Aug 13, 2025
ad06eb6
Preserve caching behavior and make sure class maps are registered first
danielmarbach Aug 13, 2025
b54b791
Get rid of installer feature since we don't need additional DI stuff …
danielmarbach Aug 13, 2025
d15fa8c
Also create subscription table
danielmarbach Aug 13, 2025
729a9c3
Move to extension method
danielmarbach Aug 14, 2025
ed8bca8
Extract default and move
danielmarbach Aug 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
using Persistence;
using Storage.MongoDB;
using Storage.MongoDB.Tests;
using OutboxStorageFeature = Storage.MongoDB.OutboxStorage;
using SagaStorageFeature = Storage.MongoDB.SagaStorage;
using SynchronizedStorageSession = Storage.MongoDB.SynchronizedStorageSession;

public partial class PersistenceTestsConfiguration
Expand All @@ -38,25 +40,27 @@ public async Task Configure(CancellationToken cancellationToken = default)
BsonSerializer.TryRegisterSerializer(new GuidSerializer(GuidRepresentation.Standard));

var memberMapCache = new MemberMapCache();
Storage.MongoDB.SagaStorage.InitializeSagaDataTypes(ClientProvider.Client, memberMapCache, databaseName,
MongoPersistence.DefaultCollectionNamingConvention, SagaMetadataCollection);
SagaStorage = new SagaPersister(SagaPersister.DefaultVersionElementName, memberMapCache);
var synchronizedStorage = new StorageSessionFactory(ClientProvider.Client, true, databaseName,
MongoPersistence.DefaultCollectionNamingConvention,
SessionTimeout ?? MongoPersistence.DefaultTransactionTimeout);
CreateStorageSession = () => new SynchronizedStorageSession(synchronizedStorage);

var databaseSettings = new MongoDatabaseSettings
{
ReadConcern = ReadConcern.Majority,
ReadPreference = ReadPreference.Primary,
WriteConcern = WriteConcern.WMajority
};
var database = ClientProvider.Client.GetDatabase(databaseName, databaseSettings);
await database.CreateCollectionAsync(MongoPersistence.DefaultCollectionNamingConvention(typeof(OutboxRecord)),
cancellationToken: cancellationToken);
OutboxStorage = new OutboxPersister(ClientProvider.Client, databaseName,
MongoPersistence.DefaultCollectionNamingConvention);

SagaStorageFeature.RegisterSagaEntityClassMappings(SagaMetadataCollection, memberMapCache);
await SagaInstaller.CreateInfrastructureForSagaDataTypes(ClientProvider.Client, databaseSettings, memberMapCache, databaseName,
MongoPersistence.DefaultCollectionNamingConvention, MongoPersistence.DefaultCollectionSettings, SagaMetadataCollection, cancellationToken);

SagaStorage = new SagaPersister(SagaPersister.DefaultVersionElementName, memberMapCache);
var synchronizedStorage = new StorageSessionFactory(ClientProvider.Client, true, databaseName,
MongoPersistence.DefaultCollectionNamingConvention,
SessionTimeout ?? MongoPersistence.DefaultTransactionTimeout);
CreateStorageSession = () => new SynchronizedStorageSession(synchronizedStorage);

OutboxStorageFeature.RegisterOutboxClassMappings();
await OutboxInstaller.CreateInfrastructureForOutboxTypes(ClientProvider.Client, databaseName, MongoPersistence.DefaultDatabaseSettings, MongoPersistence.DefaultCollectionNamingConvention, MongoPersistence.DefaultCollectionSettings, TimeSpan.FromDays(7), cancellationToken);

OutboxStorage = new OutboxPersister(ClientProvider.Client, databaseName, MongoPersistence.DefaultCollectionNamingConvention);
}

public async Task Cleanup(CancellationToken cancellationToken = default) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace NServiceBus
{
public static NServiceBus.CompatibilitySettings CommunityPersistenceCompatibility(this NServiceBus.PersistenceExtensions<NServiceBus.MongoPersistence> persistenceExtensions) { }
public static NServiceBus.PersistenceExtensions<NServiceBus.MongoPersistence> DatabaseName(this NServiceBus.PersistenceExtensions<NServiceBus.MongoPersistence> persistenceExtensions, string databaseName) { }
public static void DisableInstaller(this NServiceBus.PersistenceExtensions<NServiceBus.MongoPersistence> persistenceExtensions) { }
public static NServiceBus.PersistenceExtensions<NServiceBus.MongoPersistence> MongoClient(this NServiceBus.PersistenceExtensions<NServiceBus.MongoPersistence> persistenceExtensions, MongoDB.Driver.IMongoClient mongoClient) { }
public static NServiceBus.PersistenceExtensions<NServiceBus.MongoPersistence> TimeToKeepOutboxDeduplicationData(this NServiceBus.PersistenceExtensions<NServiceBus.MongoPersistence> persistenceExtensions, System.TimeSpan timeToKeepOutboxDeduplicationData) { }
public static NServiceBus.PersistenceExtensions<NServiceBus.MongoPersistence> UseTransactions(this NServiceBus.PersistenceExtensions<NServiceBus.MongoPersistence> persistenceExtensions, bool useTransactions) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,49 +12,54 @@ namespace NServiceBus.Storage.MongoDB.Tests;
public class OutboxInitializationTests
{
[OneTimeSetUp]
public async Task OneTimeSetUp()
public Task OneTimeSetUp()
{
databaseName = "Test_" + DateTime.UtcNow.Ticks.ToString(CultureInfo.InvariantCulture);

var databaseSettings = new MongoDatabaseSettings
{
ReadConcern = ReadConcern.Majority,
ReadPreference = ReadPreference.Primary,
WriteConcern = WriteConcern.WMajority
};
database = ClientProvider.Client.GetDatabase(databaseName, MongoPersistence.DefaultDatabaseSettings);
outboxCollection = database.GetCollection<OutboxRecord>(CollectionNamingConvention<OutboxRecord>(), MongoPersistence.DefaultCollectionSettings);

var database = ClientProvider.Client.GetDatabase(databaseName, databaseSettings);

await database.CreateCollectionAsync(CollectionNamingConvention<OutboxRecord>());

outboxCollection = ClientProvider.Client.GetDatabase(databaseName)
.GetCollection<OutboxRecord>(CollectionNamingConvention<OutboxRecord>());
return Task.CompletedTask;
}

static string CollectionNamingConvention<T>() => CollectionNamingConvention(typeof(T));

static string CollectionNamingConvention(Type type) => type.Name.ToLower();

[SetUp]
public async Task Setup() => await outboxCollection.Indexes.DropAllAsync();
public async Task Setup() => await database.DropCollectionAsync(CollectionNamingConvention<OutboxRecord>());

[Theory]
public async Task Should_create_collection_when_it_doesnt_exist(TimeSpan timeToKeepOutboxDeduplicationData)
{
await database.DropCollectionAsync(CollectionNamingConvention<OutboxRecord>());

await OutboxInstaller.CreateInfrastructureForOutboxTypes(ClientProvider.Client, databaseName, MongoPersistence.DefaultDatabaseSettings, CollectionNamingConvention,
MongoPersistence.DefaultCollectionSettings, timeToKeepOutboxDeduplicationData);

var collections = await (await database
.ListCollectionsAsync()).ToListAsync();

Assert.That(collections, Has.One.Matches<BsonDocument>(b => b.GetElement("name").Value == CollectionNamingConvention<OutboxRecord>()));
}

[Theory]
public async Task Should_create_index_when_it_doesnt_exist(TimeSpan timeToKeepOutboxDeduplicationData)
{
OutboxStorage.InitializeOutboxTypes(ClientProvider.Client, databaseName, CollectionNamingConvention,
timeToKeepOutboxDeduplicationData);
await OutboxInstaller.CreateInfrastructureForOutboxTypes(ClientProvider.Client, databaseName, MongoPersistence.DefaultDatabaseSettings, CollectionNamingConvention,
MongoPersistence.DefaultCollectionSettings, timeToKeepOutboxDeduplicationData);

await AssertIndexCorrect(outboxCollection, timeToKeepOutboxDeduplicationData);
}

[Theory]
public async Task Should_recreate_when_expiry_drifts(TimeSpan timeToKeepOutboxDeduplicationData)
{
OutboxStorage.InitializeOutboxTypes(ClientProvider.Client, databaseName, CollectionNamingConvention,
timeToKeepOutboxDeduplicationData.Add(TimeSpan.FromSeconds(30)));
await OutboxInstaller.CreateInfrastructureForOutboxTypes(ClientProvider.Client, databaseName, MongoPersistence.DefaultDatabaseSettings, CollectionNamingConvention,
MongoPersistence.DefaultCollectionSettings, timeToKeepOutboxDeduplicationData.Add(TimeSpan.FromSeconds(30)));

OutboxStorage.InitializeOutboxTypes(ClientProvider.Client, databaseName, CollectionNamingConvention,
timeToKeepOutboxDeduplicationData);
await OutboxInstaller.CreateInfrastructureForOutboxTypes(ClientProvider.Client, databaseName, MongoPersistence.DefaultDatabaseSettings, CollectionNamingConvention,
MongoPersistence.DefaultCollectionSettings, timeToKeepOutboxDeduplicationData);

await AssertIndexCorrect(outboxCollection, timeToKeepOutboxDeduplicationData);
}
Expand All @@ -64,22 +69,22 @@ public async Task Should_recreate_when_expiry_column_dropped(TimeSpan timeToKeep
{
var indexModel = new CreateIndexModel<OutboxRecord>(
Builders<OutboxRecord>.IndexKeys.Ascending(record => record.Dispatched),
new CreateIndexOptions { Name = OutboxStorage.OutboxCleanupIndexName, Background = true });
new CreateIndexOptions { Name = OutboxInstaller.OutboxCleanupIndexName, Background = true });
await outboxCollection.Indexes.CreateOneAsync(indexModel);

OutboxStorage.InitializeOutboxTypes(ClientProvider.Client, databaseName, CollectionNamingConvention,
timeToKeepOutboxDeduplicationData);
await OutboxInstaller.CreateInfrastructureForOutboxTypes(ClientProvider.Client, databaseName, MongoPersistence.DefaultDatabaseSettings, CollectionNamingConvention,
MongoPersistence.DefaultCollectionSettings, timeToKeepOutboxDeduplicationData);

await AssertIndexCorrect(outboxCollection, timeToKeepOutboxDeduplicationData);
}

[DatapointSource]
public TimeSpan[] Expiry = new TimeSpan[] { TimeSpan.FromHours(1), TimeSpan.FromHours(3), TimeSpan.FromDays(1) };
public TimeSpan[] Expiry = [TimeSpan.FromHours(1), TimeSpan.FromHours(3), TimeSpan.FromDays(1)];

static async Task AssertIndexCorrect(IMongoCollection<OutboxRecord> outboxCollection, TimeSpan expiry)
{
var outboxCleanupIndex = (await outboxCollection.Indexes.ListAsync()).ToList().SingleOrDefault(indexDocument =>
indexDocument.GetElement("name").Value == OutboxStorage.OutboxCleanupIndexName);
indexDocument.GetElement("name").Value == OutboxInstaller.OutboxCleanupIndexName);

Assert.That(outboxCleanupIndex, Is.Not.Null);

Expand All @@ -92,4 +97,5 @@ static async Task AssertIndexCorrect(IMongoCollection<OutboxRecord> outboxCollec

IMongoCollection<OutboxRecord> outboxCollection;
string databaseName;
IMongoDatabase database;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
using System.Globalization;
using System.Threading.Tasks;
using Extensibility;
using global::MongoDB.Driver;
using NServiceBus.Outbox;
using Outbox;

public class OutboxTestsConfiguration
{
Expand All @@ -31,34 +30,21 @@ public OutboxTestsConfiguration(TimeSpan? transactionTimeout = null) : this(t =>

public IOutboxStorage OutboxStorage { get; private set; }

public Task<IOutboxTransaction> CreateTransaction(ContextBag context)
{
return transactionFactory.BeginTransaction(context);
}
public Task<IOutboxTransaction> CreateTransaction(ContextBag context) => transactionFactory.BeginTransaction(context);

public async Task Configure()
{
var databaseSettings = new MongoDatabaseSettings
{
ReadConcern = ReadConcern.Majority,
ReadPreference = ReadPreference.Primary,
WriteConcern = WriteConcern.WMajority
};

var database = ClientProvider.Client.GetDatabase(DatabaseName, databaseSettings);
var database = ClientProvider.Client.GetDatabase(DatabaseName, MongoPersistence.DefaultDatabaseSettings);

await database.CreateCollectionAsync(CollectionNamingConvention(typeof(OutboxRecord)));

MongoDB.OutboxStorage.InitializeOutboxTypes(ClientProvider.Client, DatabaseName, CollectionNamingConvention,
TimeSpan.FromHours(1));
await OutboxInstaller.CreateInfrastructureForOutboxTypes(ClientProvider.Client, DatabaseName, MongoPersistence.DefaultDatabaseSettings, CollectionNamingConvention,
MongoPersistence.DefaultCollectionSettings, TimeSpan.FromHours(1));

OutboxStorage = new OutboxPersister(ClientProvider.Client, DatabaseName, CollectionNamingConvention);
}

public async Task Cleanup()
{
await ClientProvider.Client.DropDatabaseAsync(DatabaseName);
}
public async Task Cleanup() => await ClientProvider.Client.DropDatabaseAsync(DatabaseName);

readonly MongoOutboxTransactionFactory transactionFactory;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using global::MongoDB.Driver;
using MongoDB;
using Sagas;
using SagaStorageFeature = SagaStorage;
using SynchronizedStorageSession = SynchronizedStorageSession;

public class SagaTestsConfiguration
Expand Down Expand Up @@ -76,12 +77,8 @@ public async Task Configure()
WriteConcern = WriteConcern.WMajority
};

var database = ClientProvider.Client.GetDatabase(DatabaseName, databaseSettings);

await database.CreateCollectionAsync(CollectionNamingConvention(typeof(OutboxRecord)));

MongoDB.SagaStorage.InitializeSagaDataTypes(ClientProvider.Client, memberMapCache, DatabaseName,
CollectionNamingConvention, SagaMetadataCollection);
SagaStorageFeature.RegisterSagaEntityClassMappings(SagaMetadataCollection, memberMapCache);
await SagaInstaller.CreateInfrastructureForSagaDataTypes(ClientProvider.Client, databaseSettings, memberMapCache, DatabaseName, CollectionNamingConvention, MongoPersistence.DefaultCollectionSettings, SagaMetadataCollection);
}

public async Task Cleanup() => await ClientProvider.Client.DropDatabaseAsync(DatabaseName);
Expand Down
Loading
Loading