From cb6614b39ef699afba6b3fe64550ac50cae93a8e Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Tue, 19 Aug 2025 11:28:55 +0200 Subject: [PATCH 1/3] Introduce mongo client provider --- .../When_custom_provider_registered.cs | 83 ++++++++++++ .../PersistenceTestsConfiguration.cs | 11 +- .../APIApprovals.Approve.approved.txt | 4 + .../Outbox/OutboxTestsConfiguration.cs | 2 +- .../SubscriptionStorageTests.cs | 6 +- .../DefaultMongoClientProvider.cs | 17 +++ .../Configuration/IMongoClientProvider.cs | 17 +++ .../MongoClientProvidedByConfiguration.cs | 8 ++ .../Configuration/MongoSettingsExtensions.cs | 2 +- .../Configuration/SettingsKeys.cs | 1 - .../MongoPersistence.cs | 9 +- .../Outbox/OutboxInstaller.cs | 6 +- .../Outbox/OutboxPersister.cs | 14 +- .../Outbox/OutboxStorage.cs | 9 +- .../Sagas/SagaInstaller.cs | 6 +- .../Subscriptions/SubscriptionInstaller.cs | 6 +- .../Subscriptions/SubscriptionPersister.cs | 11 +- .../Subscriptions/SubscriptionStorage.cs | 9 +- .../SynchronizedStorage.cs | 125 ++++++++++-------- 19 files changed, 235 insertions(+), 111 deletions(-) create mode 100644 src/NServiceBus.Storage.MongoDB.AcceptanceTests/When_custom_provider_registered.cs create mode 100644 src/NServiceBus.Storage.MongoDB/Configuration/DefaultMongoClientProvider.cs create mode 100644 src/NServiceBus.Storage.MongoDB/Configuration/IMongoClientProvider.cs create mode 100644 src/NServiceBus.Storage.MongoDB/Configuration/MongoClientProvidedByConfiguration.cs diff --git a/src/NServiceBus.Storage.MongoDB.AcceptanceTests/When_custom_provider_registered.cs b/src/NServiceBus.Storage.MongoDB.AcceptanceTests/When_custom_provider_registered.cs new file mode 100644 index 00000000..77877f4c --- /dev/null +++ b/src/NServiceBus.Storage.MongoDB.AcceptanceTests/When_custom_provider_registered.cs @@ -0,0 +1,83 @@ +namespace NServiceBus.AcceptanceTests; + +using System; +using System.Threading.Tasks; +using AcceptanceTesting; +using EndpointTemplates; +using Microsoft.Extensions.DependencyInjection; +using MongoDB.Driver; +using NUnit.Framework; +using Storage.MongoDB; + +public class When_custom_provider_registered : NServiceBusAcceptanceTest +{ + [Test] + public async Task Should_be_used() + { + Context context = await Scenario.Define() + .WithEndpoint(b => b.When(session => session.SendLocal(new StartSaga1 { DataId = Guid.NewGuid() }))) + .Done(c => c.SagaReceivedMessage) + .Run(); + + Assert.That(context.ProviderWasCalled, Is.True); + } + + public class Context : ScenarioContext + { + public bool SagaReceivedMessage { get; set; } + public bool ProviderWasCalled { get; set; } + } + + public class EndpointWithCustomProvider : EndpointConfigurationBuilder + { + public EndpointWithCustomProvider() => + EndpointSetup(config => + { + config.RegisterComponents(c => + c.AddSingleton(b => new CustomProvider(b.GetService()))); + }); + + public class JustASaga(Context testContext) : Saga, IAmStartedByMessages + { + public Task Handle(StartSaga1 message, IMessageHandlerContext context) + { + Data.DataId = message.DataId; + testContext.SagaReceivedMessage = true; + MarkAsComplete(); + return Task.CompletedTask; + } + + protected override void ConfigureHowToFindSaga(SagaPropertyMapper mapper) => mapper.ConfigureMapping(m => m.DataId).ToSaga(s => s.DataId); + } + + public class CustomProvider(Context testContext) : IMongoClientProvider + { + public IMongoClient Client + { + get + { + if (field is not null) + { + return field; + } + + var containerConnectionString = Environment.GetEnvironmentVariable("NServiceBusStorageMongoDB_ConnectionString"); + + field = string.IsNullOrWhiteSpace(containerConnectionString) ? new MongoClient() : new MongoClient(containerConnectionString); + testContext.ProviderWasCalled = true; + return field; + } + } + } + + public class JustASagaData : ContainSagaData + { + public virtual Guid DataId { get; set; } + } + } + + public class StartSaga1 : ICommand + { + public Guid DataId { get; set; } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Storage.MongoDB.PersistenceTests/PersistenceTestsConfiguration.cs b/src/NServiceBus.Storage.MongoDB.PersistenceTests/PersistenceTestsConfiguration.cs index 996205fd..76cc6e71 100644 --- a/src/NServiceBus.Storage.MongoDB.PersistenceTests/PersistenceTestsConfiguration.cs +++ b/src/NServiceBus.Storage.MongoDB.PersistenceTests/PersistenceTestsConfiguration.cs @@ -7,7 +7,6 @@ using MongoDB.Bson; using MongoDB.Bson.Serialization; using MongoDB.Bson.Serialization.Serializers; -using MongoDB.Driver; using NServiceBus.Outbox; using NServiceBus.Sagas; using Persistence; @@ -40,15 +39,9 @@ public async Task Configure(CancellationToken cancellationToken = default) BsonSerializer.TryRegisterSerializer(new GuidSerializer(GuidRepresentation.Standard)); var memberMapCache = new MemberMapCache(); - var databaseSettings = new MongoDatabaseSettings - { - ReadConcern = ReadConcern.Majority, - ReadPreference = ReadPreference.Primary, - WriteConcern = WriteConcern.WMajority - }; SagaStorageFeature.RegisterSagaEntityClassMappings(SagaMetadataCollection, memberMapCache); - await SagaInstaller.CreateInfrastructureForSagaDataTypes(ClientProvider.Client, databaseSettings, memberMapCache, databaseName, + await SagaInstaller.CreateInfrastructureForSagaDataTypes(ClientProvider.Client, MongoPersistence.DefaultDatabaseSettings, memberMapCache, databaseName, MongoPersistence.DefaultCollectionNamingConvention, MongoPersistence.DefaultCollectionSettings, SagaMetadataCollection, cancellationToken); SagaStorage = new SagaPersister(SagaPersister.DefaultVersionElementName, memberMapCache); @@ -60,7 +53,7 @@ await SagaInstaller.CreateInfrastructureForSagaDataTypes(ClientProvider.Client, OutboxStorageFeature.RegisterOutboxClassMappings(); await OutboxInstaller.CreateInfrastructureForOutboxTypes(ClientProvider.Client, databaseName, MongoPersistence.DefaultDatabaseSettings, MongoPersistence.DefaultCollectionNamingConvention, MongoPersistence.DefaultCollectionSettings, TimeSpan.FromDays(7), cancellationToken); - OutboxStorage = new OutboxPersister(ClientProvider.Client, databaseName, MongoPersistence.DefaultCollectionNamingConvention); + OutboxStorage = new OutboxPersister(ClientProvider.Client, databaseName, MongoPersistence.DefaultDatabaseSettings, MongoPersistence.DefaultCollectionNamingConvention, MongoPersistence.DefaultCollectionSettings); } public async Task Cleanup(CancellationToken cancellationToken = default) => diff --git a/src/NServiceBus.Storage.MongoDB.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt b/src/NServiceBus.Storage.MongoDB.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt index 3689cf7c..cdcc021b 100644 --- a/src/NServiceBus.Storage.MongoDB.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt +++ b/src/NServiceBus.Storage.MongoDB.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt @@ -28,6 +28,10 @@ namespace NServiceBus } namespace NServiceBus.Storage.MongoDB { + public interface IMongoClientProvider + { + MongoDB.Driver.IMongoClient Client { get; } + } public interface IMongoSynchronizedStorageSession { MongoDB.Driver.IClientSessionHandle? MongoSession { get; } diff --git a/src/NServiceBus.Storage.MongoDB.Tests/Outbox/OutboxTestsConfiguration.cs b/src/NServiceBus.Storage.MongoDB.Tests/Outbox/OutboxTestsConfiguration.cs index 37d75e4e..a720def7 100644 --- a/src/NServiceBus.Storage.MongoDB.Tests/Outbox/OutboxTestsConfiguration.cs +++ b/src/NServiceBus.Storage.MongoDB.Tests/Outbox/OutboxTestsConfiguration.cs @@ -41,7 +41,7 @@ public async Task Configure() await OutboxInstaller.CreateInfrastructureForOutboxTypes(ClientProvider.Client, DatabaseName, MongoPersistence.DefaultDatabaseSettings, CollectionNamingConvention, MongoPersistence.DefaultCollectionSettings, TimeSpan.FromHours(1)); - OutboxStorage = new OutboxPersister(ClientProvider.Client, DatabaseName, CollectionNamingConvention); + OutboxStorage = new OutboxPersister(ClientProvider.Client, DatabaseName, MongoPersistence.DefaultDatabaseSettings, CollectionNamingConvention, MongoPersistence.DefaultCollectionSettings); } public async Task Cleanup() => await ClientProvider.Client.DropDatabaseAsync(DatabaseName); diff --git a/src/NServiceBus.Storage.MongoDB.Tests/SubscriptionStorageTests.cs b/src/NServiceBus.Storage.MongoDB.Tests/SubscriptionStorageTests.cs index 3864f7b0..a5be3455 100644 --- a/src/NServiceBus.Storage.MongoDB.Tests/SubscriptionStorageTests.cs +++ b/src/NServiceBus.Storage.MongoDB.Tests/SubscriptionStorageTests.cs @@ -18,12 +18,10 @@ public async Task OneTimeSetUp() { DatabaseName = "Test_" + DateTime.UtcNow.Ticks.ToString(CultureInfo.InvariantCulture); - var subscriptionCollection = ClientProvider.Client.GetDatabase(DatabaseName, MongoPersistence.DefaultDatabaseSettings) - .GetCollection("eventsubscriptions", MongoPersistence.DefaultCollectionSettings); - await SubscriptionInstaller.CreateInfrastructureForSubscriptionTypes(ClientProvider.Client, MongoPersistence.DefaultDatabaseSettings, DatabaseName, MongoPersistence.DefaultCollectionSettings, _ => "eventsubscriptions"); - var subscriptionPersister = new SubscriptionPersister(subscriptionCollection); + var subscriptionPersister = new SubscriptionPersister(ClientProvider.Client, DatabaseName, MongoPersistence.DefaultDatabaseSettings, + _ => "eventsubscriptions", MongoPersistence.DefaultCollectionSettings); storage = subscriptionPersister; } diff --git a/src/NServiceBus.Storage.MongoDB/Configuration/DefaultMongoClientProvider.cs b/src/NServiceBus.Storage.MongoDB/Configuration/DefaultMongoClientProvider.cs new file mode 100644 index 00000000..1c0e1392 --- /dev/null +++ b/src/NServiceBus.Storage.MongoDB/Configuration/DefaultMongoClientProvider.cs @@ -0,0 +1,17 @@ +namespace NServiceBus.Storage.MongoDB; + +using System.Diagnostics.CodeAnalysis; +using global::MongoDB.Driver; + +sealed class DefaultMongoClientProvider : IMongoClientProvider +{ + [field: AllowNull, MaybeNull] + public IMongoClient Client + { + get + { + field ??= new MongoClient(); + return field; + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Storage.MongoDB/Configuration/IMongoClientProvider.cs b/src/NServiceBus.Storage.MongoDB/Configuration/IMongoClientProvider.cs new file mode 100644 index 00000000..f5ce8c10 --- /dev/null +++ b/src/NServiceBus.Storage.MongoDB/Configuration/IMongoClientProvider.cs @@ -0,0 +1,17 @@ +namespace NServiceBus.Storage.MongoDB; + +using global::MongoDB.Driver; + +/// +/// Provides a mongo client via dependency injection. A custom implementation can be registered on the container and will be picked up by the persistence. +/// +/// The client provided will not be disposed by the persistence. It is the responsibility of the provider to take care of proper resource disposal if necessary. +/// +/// +public interface IMongoClientProvider +{ + /// + /// The mongo client to use. + /// + IMongoClient Client { get; } +} \ No newline at end of file diff --git a/src/NServiceBus.Storage.MongoDB/Configuration/MongoClientProvidedByConfiguration.cs b/src/NServiceBus.Storage.MongoDB/Configuration/MongoClientProvidedByConfiguration.cs new file mode 100644 index 00000000..c3ab7dd2 --- /dev/null +++ b/src/NServiceBus.Storage.MongoDB/Configuration/MongoClientProvidedByConfiguration.cs @@ -0,0 +1,8 @@ +namespace NServiceBus.Storage.MongoDB; + +using global::MongoDB.Driver; + +class MongoClientProvidedByConfiguration(IMongoClient client) : IMongoClientProvider +{ + public IMongoClient Client { get; } = client; +} \ No newline at end of file diff --git a/src/NServiceBus.Storage.MongoDB/Configuration/MongoSettingsExtensions.cs b/src/NServiceBus.Storage.MongoDB/Configuration/MongoSettingsExtensions.cs index 5d360c47..d6e8302f 100644 --- a/src/NServiceBus.Storage.MongoDB/Configuration/MongoSettingsExtensions.cs +++ b/src/NServiceBus.Storage.MongoDB/Configuration/MongoSettingsExtensions.cs @@ -19,7 +19,7 @@ public static PersistenceExtensions MongoClient( ArgumentNullException.ThrowIfNull(persistenceExtensions); ArgumentNullException.ThrowIfNull(mongoClient); - persistenceExtensions.GetSettings().Set(SettingsKeys.MongoClient, () => mongoClient); + persistenceExtensions.GetSettings().Set(new MongoClientProvidedByConfiguration(mongoClient)); return persistenceExtensions; } diff --git a/src/NServiceBus.Storage.MongoDB/Configuration/SettingsKeys.cs b/src/NServiceBus.Storage.MongoDB/Configuration/SettingsKeys.cs index d1a8cac3..7435de24 100644 --- a/src/NServiceBus.Storage.MongoDB/Configuration/SettingsKeys.cs +++ b/src/NServiceBus.Storage.MongoDB/Configuration/SettingsKeys.cs @@ -6,7 +6,6 @@ static class SettingsKeys public const string VersionElementName = baseName + nameof(VersionElementName); public const string CollectionNamingConvention = baseName + nameof(CollectionNamingConvention); public const string DatabaseName = baseName + nameof(DatabaseName); - public const string MongoClient = baseName + nameof(MongoClient); public const string UseTransactions = baseName + nameof(UseTransactions); public const string TimeToKeepOutboxDeduplicationData = baseName + nameof(TimeToKeepOutboxDeduplicationData); } \ No newline at end of file diff --git a/src/NServiceBus.Storage.MongoDB/MongoPersistence.cs b/src/NServiceBus.Storage.MongoDB/MongoPersistence.cs index 567d84a4..6752418b 100644 --- a/src/NServiceBus.Storage.MongoDB/MongoPersistence.cs +++ b/src/NServiceBus.Storage.MongoDB/MongoPersistence.cs @@ -26,12 +26,7 @@ public MongoPersistence() // Can't do much earlier due to static nature of class mappings and serialization extensions. SafeRegisterDefaultGuidSerializer(); - s.SetDefault(SettingsKeys.MongoClient, static () => - { - defaultClient ??= new MongoClient(); - - return defaultClient; - }); + s.SetDefault(new DefaultMongoClientProvider()); s.SetDefault(SettingsKeys.DatabaseName, s.EndpointName()); @@ -97,6 +92,4 @@ internal static void SafeRegisterDefaultGuidSerializer() internal static readonly TimeSpan DefaultTransactionTimeout = TimeSpan.FromSeconds(60); internal static readonly Func DefaultCollectionNamingConvention = type => type.Name.ToLower(); - - static IMongoClient? defaultClient; } \ No newline at end of file diff --git a/src/NServiceBus.Storage.MongoDB/Outbox/OutboxInstaller.cs b/src/NServiceBus.Storage.MongoDB/Outbox/OutboxInstaller.cs index 3e0860e7..4e76ed5a 100644 --- a/src/NServiceBus.Storage.MongoDB/Outbox/OutboxInstaller.cs +++ b/src/NServiceBus.Storage.MongoDB/Outbox/OutboxInstaller.cs @@ -9,12 +9,12 @@ using Installation; using Settings; -sealed class OutboxInstaller(IReadOnlySettings settings) : INeedToInstallSomething +sealed class OutboxInstaller(IReadOnlySettings settings, IMongoClientProvider clientProvider) : INeedToInstallSomething { public async Task Install(string identity, CancellationToken cancellationToken = default) { var installerSettings = settings.Get(); - if (installerSettings.Disabled || !settings.TryGet>(SettingsKeys.MongoClient, out Func? client)) + if (installerSettings.Disabled) { return; } @@ -29,7 +29,7 @@ public async Task Install(string identity, CancellationToken cancellationToken = timeToKeepOutboxDeduplicationData = DefaultTimeToKeepOutboxDeduplicationData; } - await CreateInfrastructureForOutboxTypes(client(), databaseName, databaseSettings, collectionNamingConvention, collectionSettings, timeToKeepOutboxDeduplicationData, cancellationToken) + await CreateInfrastructureForOutboxTypes(clientProvider.Client, databaseName, databaseSettings, collectionNamingConvention, collectionSettings, timeToKeepOutboxDeduplicationData, cancellationToken) .ConfigureAwait(false); } diff --git a/src/NServiceBus.Storage.MongoDB/Outbox/OutboxPersister.cs b/src/NServiceBus.Storage.MongoDB/Outbox/OutboxPersister.cs index a226a872..0c44792c 100644 --- a/src/NServiceBus.Storage.MongoDB/Outbox/OutboxPersister.cs +++ b/src/NServiceBus.Storage.MongoDB/Outbox/OutboxPersister.cs @@ -10,19 +10,11 @@ class OutboxPersister : IOutboxStorage { - public OutboxPersister(IMongoClient client, string databaseName, Func collectionNamingConvention) + public OutboxPersister(IMongoClient client, string databaseName, MongoDatabaseSettings databaseSettings, Func collectionNamingConvention, MongoCollectionSettings collectionSettings) { - outboxTransactionFactory = new MongoOutboxTransactionFactory(client, databaseName, collectionNamingConvention, - MongoPersistence.DefaultTransactionTimeout); + outboxTransactionFactory = new MongoOutboxTransactionFactory(client, databaseName, collectionNamingConvention, MongoPersistence.DefaultTransactionTimeout); - var collectionSettings = new MongoCollectionSettings - { - ReadConcern = ReadConcern.Majority, - ReadPreference = ReadPreference.Primary, - WriteConcern = WriteConcern.WMajority - }; - - outboxRecordCollection = client.GetDatabase(databaseName) + outboxRecordCollection = client.GetDatabase(databaseName, databaseSettings) .GetCollection(collectionNamingConvention(typeof(OutboxRecord)), collectionSettings); } diff --git a/src/NServiceBus.Storage.MongoDB/Outbox/OutboxStorage.cs b/src/NServiceBus.Storage.MongoDB/Outbox/OutboxStorage.cs index 8c24a981..631dd4b5 100644 --- a/src/NServiceBus.Storage.MongoDB/Outbox/OutboxStorage.cs +++ b/src/NServiceBus.Storage.MongoDB/Outbox/OutboxStorage.cs @@ -28,15 +28,12 @@ protected override void Setup(FeatureConfigurationContext context) $"Transactions are required when the Outbox is enabled, but they have been disabled by calling 'EndpointConfiguration.UsePersistence<{nameof(MongoPersistence)}>().UseTransactions(false)'."); } - if (!context.Settings.TryGet>(SettingsKeys.MongoClient, out Func? client)) - { - return; - } - var databaseName = context.Settings.Get(SettingsKeys.DatabaseName); var collectionNamingConvention = context.Settings.Get>(SettingsKeys.CollectionNamingConvention); + var databaseSettings = context.Settings.Get(); + var collectionSettings = context.Settings.Get(); - context.Services.AddSingleton(new OutboxPersister(client(), databaseName, collectionNamingConvention)); + context.Services.AddSingleton(sp => new OutboxPersister(sp.GetRequiredService().Client, databaseName, databaseSettings, collectionNamingConvention, collectionSettings)); RegisterOutboxClassMappings(); } diff --git a/src/NServiceBus.Storage.MongoDB/Sagas/SagaInstaller.cs b/src/NServiceBus.Storage.MongoDB/Sagas/SagaInstaller.cs index 817fce24..33aa598c 100644 --- a/src/NServiceBus.Storage.MongoDB/Sagas/SagaInstaller.cs +++ b/src/NServiceBus.Storage.MongoDB/Sagas/SagaInstaller.cs @@ -9,12 +9,12 @@ using Sagas; using Settings; -sealed class SagaInstaller(IReadOnlySettings settings) : INeedToInstallSomething +sealed class SagaInstaller(IReadOnlySettings settings, IMongoClientProvider clientProvider) : INeedToInstallSomething { public async Task Install(string identity, CancellationToken cancellationToken = default) { var installerSettings = settings.Get(); - if (installerSettings.Disabled || !settings.TryGet>(SettingsKeys.MongoClient, out Func? client)) + if (installerSettings.Disabled) { return; } @@ -26,7 +26,7 @@ public async Task Install(string identity, CancellationToken cancellationToken = var collectionSettings = settings.Get(); var memberMapCache = settings.Get(); - await CreateInfrastructureForSagaDataTypes(client(), databaseSettings, memberMapCache, databaseName, collectionNamingConvention, collectionSettings, sagaMetadataCollection, cancellationToken) + await CreateInfrastructureForSagaDataTypes(clientProvider.Client, databaseSettings, memberMapCache, databaseName, collectionNamingConvention, collectionSettings, sagaMetadataCollection, cancellationToken) .ConfigureAwait(false); } diff --git a/src/NServiceBus.Storage.MongoDB/Subscriptions/SubscriptionInstaller.cs b/src/NServiceBus.Storage.MongoDB/Subscriptions/SubscriptionInstaller.cs index 7d7e3396..bb19579f 100644 --- a/src/NServiceBus.Storage.MongoDB/Subscriptions/SubscriptionInstaller.cs +++ b/src/NServiceBus.Storage.MongoDB/Subscriptions/SubscriptionInstaller.cs @@ -7,12 +7,12 @@ namespace NServiceBus.Storage.MongoDB; using Installation; using Settings; -sealed class SubscriptionInstaller(IReadOnlySettings settings) : INeedToInstallSomething +sealed class SubscriptionInstaller(IReadOnlySettings settings, IMongoClientProvider clientProvider) : INeedToInstallSomething { public async Task Install(string identity, CancellationToken cancellationToken = default) { var installerSettings = settings.Get(); - if (installerSettings.Disabled || !settings.TryGet>(SettingsKeys.MongoClient, out Func? client)) + if (installerSettings.Disabled) { return; } @@ -22,7 +22,7 @@ public async Task Install(string identity, CancellationToken cancellationToken = var collectionSettings = settings.Get(); var collectionNamingConvention = settings.Get>(SettingsKeys.CollectionNamingConvention); - await CreateInfrastructureForSubscriptionTypes(client(), databaseSettings, databaseName, collectionSettings, collectionNamingConvention, cancellationToken) + await CreateInfrastructureForSubscriptionTypes(clientProvider.Client, databaseSettings, databaseName, collectionSettings, collectionNamingConvention, cancellationToken) .ConfigureAwait(false); } diff --git a/src/NServiceBus.Storage.MongoDB/Subscriptions/SubscriptionPersister.cs b/src/NServiceBus.Storage.MongoDB/Subscriptions/SubscriptionPersister.cs index d7f3a24e..2a94fca3 100644 --- a/src/NServiceBus.Storage.MongoDB/Subscriptions/SubscriptionPersister.cs +++ b/src/NServiceBus.Storage.MongoDB/Subscriptions/SubscriptionPersister.cs @@ -1,5 +1,6 @@ namespace NServiceBus.Storage.MongoDB; +using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -9,8 +10,14 @@ using Unicast.Subscriptions; using Unicast.Subscriptions.MessageDrivenSubscriptions; -class SubscriptionPersister(IMongoCollection subscriptionsCollection) : ISubscriptionStorage +class SubscriptionPersister : ISubscriptionStorage { + public SubscriptionPersister(IMongoClient client, string databaseName, MongoDatabaseSettings databaseSettings, Func collectionNamingConvention, MongoCollectionSettings collectionSettings) + { + var subscriptionCollectionName = collectionNamingConvention(typeof(EventSubscription)); + subscriptionsCollection = client.GetDatabase(databaseName, databaseSettings).GetCollection(subscriptionCollectionName, collectionSettings); + } + public Task Subscribe(Subscriber subscriber, MessageType messageType, ContextBag context, CancellationToken cancellationToken = default) { @@ -140,6 +147,8 @@ async Task AddOrUpdateSubscription(EventSubscription subscription, CancellationT } } + readonly IMongoCollection subscriptionsCollection; + const int DuplicateKeyErrorCode = 11000; static readonly ILog Log = LogManager.GetLogger(); static readonly FilterDefinitionBuilder filterBuilder = Builders.Filter; diff --git a/src/NServiceBus.Storage.MongoDB/Subscriptions/SubscriptionStorage.cs b/src/NServiceBus.Storage.MongoDB/Subscriptions/SubscriptionStorage.cs index 66ec8604..f0a31385 100644 --- a/src/NServiceBus.Storage.MongoDB/Subscriptions/SubscriptionStorage.cs +++ b/src/NServiceBus.Storage.MongoDB/Subscriptions/SubscriptionStorage.cs @@ -12,14 +12,11 @@ class SubscriptionStorage : Feature protected override void Setup(FeatureConfigurationContext context) { - var client = context.Settings.Get>(SettingsKeys.MongoClient)(); var databaseName = context.Settings.Get(SettingsKeys.DatabaseName); + var collectionNamingConvention = context.Settings.Get>(SettingsKeys.CollectionNamingConvention); var databaseSettings = context.Settings.Get(); - var collectionNamingConvention = - context.Settings.Get>(SettingsKeys.CollectionNamingConvention); - var subscriptionCollectionName = collectionNamingConvention(typeof(EventSubscription)); - var collection = client.GetDatabase(databaseName, databaseSettings).GetCollection(subscriptionCollectionName); + var collectionSettings = context.Settings.Get(); - context.Services.AddSingleton(new SubscriptionPersister(collection)); + context.Services.AddSingleton(sp => new SubscriptionPersister(sp.GetRequiredService().Client, databaseName, databaseSettings, collectionNamingConvention, collectionSettings)); } } \ No newline at end of file diff --git a/src/NServiceBus.Storage.MongoDB/SynchronizedStorage/SynchronizedStorage.cs b/src/NServiceBus.Storage.MongoDB/SynchronizedStorage/SynchronizedStorage.cs index 76f42a0a..b1897ed5 100644 --- a/src/NServiceBus.Storage.MongoDB/SynchronizedStorage/SynchronizedStorage.cs +++ b/src/NServiceBus.Storage.MongoDB/SynchronizedStorage/SynchronizedStorage.cs @@ -1,10 +1,12 @@ namespace NServiceBus.Storage.MongoDB; using System; +using System.Threading; +using System.Threading.Tasks; using Features; -using global::MongoDB.Driver; using global::MongoDB.Driver.Core.Clusters; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; using Persistence; class SynchronizedStorage : Feature @@ -15,76 +17,91 @@ public SynchronizedStorage() => protected override void Setup(FeatureConfigurationContext context) { - var client = context.Settings.Get>(SettingsKeys.MongoClient)(); + var mongoClientProvider = context.Settings.Get(); + context.Services.TryAddSingleton(mongoClientProvider); + var databaseName = context.Settings.Get(SettingsKeys.DatabaseName); - var collectionNamingConvention = - context.Settings.Get>(SettingsKeys.CollectionNamingConvention); + var collectionNamingConvention = context.Settings.Get>(SettingsKeys.CollectionNamingConvention); if (!context.Settings.TryGet(SettingsKeys.UseTransactions, out bool useTransactions)) { useTransactions = true; } - try - { - var database = client.GetDatabase(databaseName); - - // perform a query to the server to make sure cluster details are loaded - database.ListCollectionNames(); + context.RegisterStartupTask(sp => new VerifyClusterDetails(sp.GetRequiredService(), databaseName, useTransactions)); - using var session = client.StartSession(); + context.Services.AddScoped(); + context.Services.AddScoped(sp => (sp.GetService() as IMongoSynchronizedStorageSession)!); + context.Services.AddSingleton(sp => new StorageSessionFactory(sp.GetRequiredService().Client, useTransactions, databaseName, collectionNamingConvention, MongoPersistence.DefaultTransactionTimeout)); + } - if (useTransactions) + class VerifyClusterDetails(IMongoClientProvider clientProvider, string databaseName, bool useTransactions) : FeatureStartupTask + { + protected override async Task OnStart(IMessageSession session, CancellationToken cancellationToken = default) + { + try { - var clusterType = client.Cluster.Description.Type; + var client = clientProvider.Client; + var database = client.GetDatabase(databaseName); - //HINT: cluster configuration check is needed as the built-in checks, executed during "StartTransaction() call, - // do not detect if the cluster configuration is a supported one. Only the version ranges are validated. - // Without this check, exceptions will be thrown during message processing. - if (clusterType is not ClusterType.ReplicaSet and not ClusterType.Sharded) - { - throw new Exception( - $"The cluster type in use is {clusterType}, but transactions are only supported on replica sets or sharded clusters. Disable support for transactions by calling 'EndpointConfiguration.UsePersistence<{nameof(MongoPersistence)}>().UseTransactions(false)'."); - } + // perform a query to the server to make sure cluster details are loaded + await database.ListCollectionNamesAsync(cancellationToken: cancellationToken).ConfigureAwait(false); - try - { - session.StartTransaction(); - session.AbortTransaction(); - } - catch (NotSupportedException ex) + using var mongoSession = await client.StartSessionAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + + if (useTransactions) { - throw new Exception( - $"Transactions are not supported by the MongoDB server. Disable support for transactions by calling 'EndpointConfiguration.UsePersistence<{nameof(MongoPersistence)}>().UseTransactions(false)'.", - ex); + var clusterType = client.Cluster.Description.Type; + + //HINT: cluster configuration check is needed as the built-in checks, executed during "StartTransaction() call, + // do not detect if the cluster configuration is a supported one. Only the version ranges are validated. + // Without this check, exceptions will be thrown during message processing. + if (clusterType is not ClusterType.ReplicaSet and not ClusterType.Sharded) + { + throw new Exception( + $"The cluster type in use is {clusterType}, but transactions are only supported on replica sets or sharded clusters. Disable support for transactions by calling 'EndpointConfiguration.UsePersistence<{nameof(MongoPersistence)}>().UseTransactions(false)'."); + } + + try + { + mongoSession.StartTransaction(); + await mongoSession.AbortTransactionAsync(cancellationToken).ConfigureAwait(false); + } + catch (NotSupportedException ex) + { + throw new Exception( + $"Transactions are not supported by the MongoDB server. Disable support for transactions by calling 'EndpointConfiguration.UsePersistence<{nameof(MongoPersistence)}>().UseTransactions(false)'.", + ex); + } } } + catch (ArgumentException ex) + { + throw new Exception( + $"The persistence database name '{databaseName}' is invalid. Configure a valid database name by calling 'EndpointConfiguration.UsePersistence<{nameof(MongoPersistence)}>().DatabaseName(databaseName)'.", + ex); + } + catch (NotSupportedException ex) + { + throw new Exception( + "Sessions are not supported by the MongoDB server. The NServiceBus.Storage.MongoDB persistence requires MongoDB server version 3.6 or greater.", + ex); + } + catch (TimeoutException ex) + { + throw new Exception( + "Unable to connect to the MongoDB server. Check the connection settings, and verify the server is running and accessible.", + ex); + } } - catch (ArgumentException ex) - { - throw new Exception( - $"The persistence database name '{databaseName}' is invalid. Configure a valid database name by calling 'EndpointConfiguration.UsePersistence<{nameof(MongoPersistence)}>().DatabaseName(databaseName)'.", - ex); - } - catch (NotSupportedException ex) - { - throw new Exception( - "Sessions are not supported by the MongoDB server. The NServiceBus.Storage.MongoDB persistence requires MongoDB server version 3.6 or greater.", - ex); - } - catch (TimeoutException ex) + + protected override Task OnStop(IMessageSession session, CancellationToken cancellationToken = default) { - throw new Exception( - "Unable to connect to the MongoDB server. Check the connection settings, and verify the server is running and accessible.", - ex); + if (clientProvider is DefaultMongoClientProvider { Client: { } client }) + { + client.Dispose(); + } + return Task.CompletedTask; } - - context.Services.AddScoped(); - context.Services.AddScoped(sp => - (sp.GetService() as IMongoSynchronizedStorageSession)!); - - context.Services.AddSingleton( - new StorageSessionFactory(client, useTransactions, databaseName, collectionNamingConvention, - MongoPersistence.DefaultTransactionTimeout)); } } \ No newline at end of file From 8481aba5b1af856f2055ccc38a1a1e7889d4c79f Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Tue, 19 Aug 2025 12:07:54 +0200 Subject: [PATCH 2/3] Properly conditionally activate the installers --- .../Outbox/OutboxInstaller.cs | 11 ++++++++--- .../Sagas/SagaInstaller.cs | 9 +++++++-- .../Subscriptions/SubscriptionInstaller.cs | 9 +++++++-- .../Subscriptions/SubscriptionStorage.cs | 3 +++ .../SynchronizedStorage/SynchronizedStorage.cs | 4 ++-- 5 files changed, 27 insertions(+), 9 deletions(-) diff --git a/src/NServiceBus.Storage.MongoDB/Outbox/OutboxInstaller.cs b/src/NServiceBus.Storage.MongoDB/Outbox/OutboxInstaller.cs index 4e76ed5a..c0bf693a 100644 --- a/src/NServiceBus.Storage.MongoDB/Outbox/OutboxInstaller.cs +++ b/src/NServiceBus.Storage.MongoDB/Outbox/OutboxInstaller.cs @@ -4,17 +4,19 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using Features; using global::MongoDB.Bson; using global::MongoDB.Driver; using Installation; +using Microsoft.Extensions.DependencyInjection; using Settings; -sealed class OutboxInstaller(IReadOnlySettings settings, IMongoClientProvider clientProvider) : INeedToInstallSomething +sealed class OutboxInstaller(IReadOnlySettings settings, IServiceProvider serviceProvider) : INeedToInstallSomething { public async Task Install(string identity, CancellationToken cancellationToken = default) { var installerSettings = settings.Get(); - if (installerSettings.Disabled) + if (installerSettings.Disabled || !settings.IsFeatureActive(typeof(OutboxStorage))) { return; } @@ -29,7 +31,10 @@ public async Task Install(string identity, CancellationToken cancellationToken = timeToKeepOutboxDeduplicationData = DefaultTimeToKeepOutboxDeduplicationData; } - await CreateInfrastructureForOutboxTypes(clientProvider.Client, databaseName, databaseSettings, collectionNamingConvention, collectionSettings, timeToKeepOutboxDeduplicationData, cancellationToken) + // We have to resolve the client provider here because at the time of the creation of the installer the provider might not be registered yet. + var clientProvider = serviceProvider.GetRequiredService(); + + await CreateInfrastructureForOutboxTypes(clientProvider!.Client, databaseName, databaseSettings, collectionNamingConvention, collectionSettings, timeToKeepOutboxDeduplicationData, cancellationToken) .ConfigureAwait(false); } diff --git a/src/NServiceBus.Storage.MongoDB/Sagas/SagaInstaller.cs b/src/NServiceBus.Storage.MongoDB/Sagas/SagaInstaller.cs index 33aa598c..0e0cda43 100644 --- a/src/NServiceBus.Storage.MongoDB/Sagas/SagaInstaller.cs +++ b/src/NServiceBus.Storage.MongoDB/Sagas/SagaInstaller.cs @@ -3,18 +3,20 @@ using System; using System.Threading; using System.Threading.Tasks; +using Features; using global::MongoDB.Bson; using global::MongoDB.Driver; using Installation; +using Microsoft.Extensions.DependencyInjection; using Sagas; using Settings; -sealed class SagaInstaller(IReadOnlySettings settings, IMongoClientProvider clientProvider) : INeedToInstallSomething +sealed class SagaInstaller(IReadOnlySettings settings, IServiceProvider serviceProvider) : INeedToInstallSomething { public async Task Install(string identity, CancellationToken cancellationToken = default) { var installerSettings = settings.Get(); - if (installerSettings.Disabled) + if (installerSettings.Disabled || !settings.IsFeatureActive(typeof(SagaStorage))) { return; } @@ -26,6 +28,9 @@ public async Task Install(string identity, CancellationToken cancellationToken = var collectionSettings = settings.Get(); var memberMapCache = settings.Get(); + // We have to resolve the client provider here because at the time of the creation of the installer the provider might not be registered yet. + var clientProvider = serviceProvider.GetRequiredService(); + await CreateInfrastructureForSagaDataTypes(clientProvider.Client, databaseSettings, memberMapCache, databaseName, collectionNamingConvention, collectionSettings, sagaMetadataCollection, cancellationToken) .ConfigureAwait(false); } diff --git a/src/NServiceBus.Storage.MongoDB/Subscriptions/SubscriptionInstaller.cs b/src/NServiceBus.Storage.MongoDB/Subscriptions/SubscriptionInstaller.cs index bb19579f..879be6e0 100644 --- a/src/NServiceBus.Storage.MongoDB/Subscriptions/SubscriptionInstaller.cs +++ b/src/NServiceBus.Storage.MongoDB/Subscriptions/SubscriptionInstaller.cs @@ -3,16 +3,18 @@ namespace NServiceBus.Storage.MongoDB; using System; using System.Threading; using System.Threading.Tasks; +using Features; using global::MongoDB.Driver; using Installation; +using Microsoft.Extensions.DependencyInjection; using Settings; -sealed class SubscriptionInstaller(IReadOnlySettings settings, IMongoClientProvider clientProvider) : INeedToInstallSomething +sealed class SubscriptionInstaller(IReadOnlySettings settings, IServiceProvider serviceProvider) : INeedToInstallSomething { public async Task Install(string identity, CancellationToken cancellationToken = default) { var installerSettings = settings.Get(); - if (installerSettings.Disabled) + if (installerSettings.Disabled || !settings.IsFeatureActive(typeof(SubscriptionStorage))) { return; } @@ -22,6 +24,9 @@ public async Task Install(string identity, CancellationToken cancellationToken = var collectionSettings = settings.Get(); var collectionNamingConvention = settings.Get>(SettingsKeys.CollectionNamingConvention); + // We have to resolve the client provider here because at the time of the creation of the installer the provider might not be registered yet. + var clientProvider = serviceProvider.GetRequiredService(); + await CreateInfrastructureForSubscriptionTypes(clientProvider.Client, databaseSettings, databaseName, collectionSettings, collectionNamingConvention, cancellationToken) .ConfigureAwait(false); } diff --git a/src/NServiceBus.Storage.MongoDB/Subscriptions/SubscriptionStorage.cs b/src/NServiceBus.Storage.MongoDB/Subscriptions/SubscriptionStorage.cs index f0a31385..e5ceb9ed 100644 --- a/src/NServiceBus.Storage.MongoDB/Subscriptions/SubscriptionStorage.cs +++ b/src/NServiceBus.Storage.MongoDB/Subscriptions/SubscriptionStorage.cs @@ -4,6 +4,7 @@ using Features; using global::MongoDB.Driver; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; using Unicast.Subscriptions.MessageDrivenSubscriptions; class SubscriptionStorage : Feature @@ -12,6 +13,8 @@ class SubscriptionStorage : Feature protected override void Setup(FeatureConfigurationContext context) { + context.Services.TryAddSingleton(context.Settings.Get()); + var databaseName = context.Settings.Get(SettingsKeys.DatabaseName); var collectionNamingConvention = context.Settings.Get>(SettingsKeys.CollectionNamingConvention); var databaseSettings = context.Settings.Get(); diff --git a/src/NServiceBus.Storage.MongoDB/SynchronizedStorage/SynchronizedStorage.cs b/src/NServiceBus.Storage.MongoDB/SynchronizedStorage/SynchronizedStorage.cs index b1897ed5..a6ac1e1d 100644 --- a/src/NServiceBus.Storage.MongoDB/SynchronizedStorage/SynchronizedStorage.cs +++ b/src/NServiceBus.Storage.MongoDB/SynchronizedStorage/SynchronizedStorage.cs @@ -17,8 +17,8 @@ public SynchronizedStorage() => protected override void Setup(FeatureConfigurationContext context) { - var mongoClientProvider = context.Settings.Get(); - context.Services.TryAddSingleton(mongoClientProvider); + // In case the persistence is used without the SynchronizedStorage feature, we still need to try to register the IMongoClientProvider + context.Services.TryAddSingleton(context.Settings.Get()); var databaseName = context.Settings.Get(SettingsKeys.DatabaseName); var collectionNamingConvention = context.Settings.Get>(SettingsKeys.CollectionNamingConvention); From 0126fa9cd03d9460ed17633a68815c8394cbc7ab Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Tue, 19 Aug 2025 12:18:33 +0200 Subject: [PATCH 3/3] Field annotation --- .../When_custom_provider_registered.cs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/NServiceBus.Storage.MongoDB.AcceptanceTests/When_custom_provider_registered.cs b/src/NServiceBus.Storage.MongoDB.AcceptanceTests/When_custom_provider_registered.cs index 77877f4c..283a90c8 100644 --- a/src/NServiceBus.Storage.MongoDB.AcceptanceTests/When_custom_provider_registered.cs +++ b/src/NServiceBus.Storage.MongoDB.AcceptanceTests/When_custom_provider_registered.cs @@ -1,6 +1,9 @@ -namespace NServiceBus.AcceptanceTests; +#nullable enable + +namespace NServiceBus.AcceptanceTests; using System; +using System.Diagnostics.CodeAnalysis; using System.Threading.Tasks; using AcceptanceTesting; using EndpointTemplates; @@ -34,7 +37,7 @@ public EndpointWithCustomProvider() => EndpointSetup(config => { config.RegisterComponents(c => - c.AddSingleton(b => new CustomProvider(b.GetService()))); + c.AddSingleton(b => new CustomProvider(b.GetRequiredService()))); }); public class JustASaga(Context testContext) : Saga, IAmStartedByMessages @@ -52,6 +55,7 @@ public Task Handle(StartSaga1 message, IMessageHandlerContext context) public class CustomProvider(Context testContext) : IMongoClientProvider { + [field: AllowNull, MaybeNull] public IMongoClient Client { get