Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#nullable enable

namespace NServiceBus.AcceptanceTests;

using System;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using Microsoft.Extensions.DependencyInjection;
using MongoDB.Driver;
using NUnit.Framework;
using Storage.MongoDB;

public class When_custom_provider_registered : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_be_used()
{
Context context = await Scenario.Define<Context>()
.WithEndpoint<EndpointWithCustomProvider>(b => b.When(session => session.SendLocal(new StartSaga1 { DataId = Guid.NewGuid() })))
.Done(c => c.SagaReceivedMessage)
.Run();

Assert.That(context.ProviderWasCalled, Is.True);
}

public class Context : ScenarioContext
{
public bool SagaReceivedMessage { get; set; }
public bool ProviderWasCalled { get; set; }
}

public class EndpointWithCustomProvider : EndpointConfigurationBuilder
{
public EndpointWithCustomProvider() =>
EndpointSetup<DefaultServer>(config =>
{
config.RegisterComponents(c =>
c.AddSingleton<IMongoClientProvider>(b => new CustomProvider(b.GetRequiredService<Context>())));
});

public class JustASaga(Context testContext) : Saga<JustASagaData>, IAmStartedByMessages<StartSaga1>
{
public Task Handle(StartSaga1 message, IMessageHandlerContext context)
{
Data.DataId = message.DataId;
testContext.SagaReceivedMessage = true;
MarkAsComplete();
return Task.CompletedTask;
}

protected override void ConfigureHowToFindSaga(SagaPropertyMapper<JustASagaData> mapper) => mapper.ConfigureMapping<StartSaga1>(m => m.DataId).ToSaga(s => s.DataId);
}

public class CustomProvider(Context testContext) : IMongoClientProvider
{
[field: AllowNull, MaybeNull]
public IMongoClient Client
{
get
{
if (field is not null)
{
return field;
}

var containerConnectionString = Environment.GetEnvironmentVariable("NServiceBusStorageMongoDB_ConnectionString");

field = string.IsNullOrWhiteSpace(containerConnectionString) ? new MongoClient() : new MongoClient(containerConnectionString);
testContext.ProviderWasCalled = true;
return field;
}
}
}

public class JustASagaData : ContainSagaData
{
public virtual Guid DataId { get; set; }
}
}

public class StartSaga1 : ICommand
{
public Guid DataId { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Bson.Serialization.Serializers;
using MongoDB.Driver;
using NServiceBus.Outbox;
using NServiceBus.Sagas;
using Persistence;
Expand Down Expand Up @@ -40,15 +39,9 @@ public async Task Configure(CancellationToken cancellationToken = default)
BsonSerializer.TryRegisterSerializer(new GuidSerializer(GuidRepresentation.Standard));

var memberMapCache = new MemberMapCache();
var databaseSettings = new MongoDatabaseSettings
{
ReadConcern = ReadConcern.Majority,
ReadPreference = ReadPreference.Primary,
WriteConcern = WriteConcern.WMajority
};

SagaStorageFeature.RegisterSagaEntityClassMappings(SagaMetadataCollection, memberMapCache);
await SagaInstaller.CreateInfrastructureForSagaDataTypes(ClientProvider.Client, databaseSettings, memberMapCache, databaseName,
await SagaInstaller.CreateInfrastructureForSagaDataTypes(ClientProvider.Client, MongoPersistence.DefaultDatabaseSettings, memberMapCache, databaseName,
MongoPersistence.DefaultCollectionNamingConvention, MongoPersistence.DefaultCollectionSettings, SagaMetadataCollection, cancellationToken);

SagaStorage = new SagaPersister(SagaPersister.DefaultVersionElementName, memberMapCache);
Expand All @@ -60,7 +53,7 @@ await SagaInstaller.CreateInfrastructureForSagaDataTypes(ClientProvider.Client,
OutboxStorageFeature.RegisterOutboxClassMappings();
await OutboxInstaller.CreateInfrastructureForOutboxTypes(ClientProvider.Client, databaseName, MongoPersistence.DefaultDatabaseSettings, MongoPersistence.DefaultCollectionNamingConvention, MongoPersistence.DefaultCollectionSettings, TimeSpan.FromDays(7), cancellationToken);

OutboxStorage = new OutboxPersister(ClientProvider.Client, databaseName, MongoPersistence.DefaultCollectionNamingConvention);
OutboxStorage = new OutboxPersister(ClientProvider.Client, databaseName, MongoPersistence.DefaultDatabaseSettings, MongoPersistence.DefaultCollectionNamingConvention, MongoPersistence.DefaultCollectionSettings);
}

public async Task Cleanup(CancellationToken cancellationToken = default) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ namespace NServiceBus
}
namespace NServiceBus.Storage.MongoDB
{
public interface IMongoClientProvider
{
MongoDB.Driver.IMongoClient Client { get; }
}
public interface IMongoSynchronizedStorageSession
{
MongoDB.Driver.IClientSessionHandle? MongoSession { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public async Task Configure()
await OutboxInstaller.CreateInfrastructureForOutboxTypes(ClientProvider.Client, DatabaseName, MongoPersistence.DefaultDatabaseSettings, CollectionNamingConvention,
MongoPersistence.DefaultCollectionSettings, TimeSpan.FromHours(1));

OutboxStorage = new OutboxPersister(ClientProvider.Client, DatabaseName, CollectionNamingConvention);
OutboxStorage = new OutboxPersister(ClientProvider.Client, DatabaseName, MongoPersistence.DefaultDatabaseSettings, CollectionNamingConvention, MongoPersistence.DefaultCollectionSettings);
}

public async Task Cleanup() => await ClientProvider.Client.DropDatabaseAsync(DatabaseName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ public async Task OneTimeSetUp()
{
DatabaseName = "Test_" + DateTime.UtcNow.Ticks.ToString(CultureInfo.InvariantCulture);

var subscriptionCollection = ClientProvider.Client.GetDatabase(DatabaseName, MongoPersistence.DefaultDatabaseSettings)
.GetCollection<EventSubscription>("eventsubscriptions", MongoPersistence.DefaultCollectionSettings);

await SubscriptionInstaller.CreateInfrastructureForSubscriptionTypes(ClientProvider.Client, MongoPersistence.DefaultDatabaseSettings, DatabaseName, MongoPersistence.DefaultCollectionSettings, _ => "eventsubscriptions");

var subscriptionPersister = new SubscriptionPersister(subscriptionCollection);
var subscriptionPersister = new SubscriptionPersister(ClientProvider.Client, DatabaseName, MongoPersistence.DefaultDatabaseSettings,
_ => "eventsubscriptions", MongoPersistence.DefaultCollectionSettings);
storage = subscriptionPersister;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace NServiceBus.Storage.MongoDB;

using System.Diagnostics.CodeAnalysis;
using global::MongoDB.Driver;

sealed class DefaultMongoClientProvider : IMongoClientProvider
{
[field: AllowNull, MaybeNull]
public IMongoClient Client
{
get
{
field ??= new MongoClient();
return field;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace NServiceBus.Storage.MongoDB;

using global::MongoDB.Driver;

/// <summary>
/// Provides a mongo client via dependency injection. A custom implementation can be registered on the container and will be picked up by the persistence.
/// <remarks>
/// The client provided will not be disposed by the persistence. It is the responsibility of the provider to take care of proper resource disposal if necessary.
/// </remarks>
/// </summary>
public interface IMongoClientProvider
{
/// <summary>
/// The mongo client to use.
/// </summary>
IMongoClient Client { get; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace NServiceBus.Storage.MongoDB;

using global::MongoDB.Driver;

class MongoClientProvidedByConfiguration(IMongoClient client) : IMongoClientProvider
{
public IMongoClient Client { get; } = client;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public static PersistenceExtensions<MongoPersistence> MongoClient(
ArgumentNullException.ThrowIfNull(persistenceExtensions);
ArgumentNullException.ThrowIfNull(mongoClient);

persistenceExtensions.GetSettings().Set(SettingsKeys.MongoClient, () => mongoClient);
persistenceExtensions.GetSettings().Set<IMongoClientProvider>(new MongoClientProvidedByConfiguration(mongoClient));
return persistenceExtensions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ static class SettingsKeys
public const string VersionElementName = baseName + nameof(VersionElementName);
public const string CollectionNamingConvention = baseName + nameof(CollectionNamingConvention);
public const string DatabaseName = baseName + nameof(DatabaseName);
public const string MongoClient = baseName + nameof(MongoClient);
public const string UseTransactions = baseName + nameof(UseTransactions);
public const string TimeToKeepOutboxDeduplicationData = baseName + nameof(TimeToKeepOutboxDeduplicationData);
}
9 changes: 1 addition & 8 deletions src/NServiceBus.Storage.MongoDB/MongoPersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,7 @@ public MongoPersistence()
// Can't do much earlier due to static nature of class mappings and serialization extensions.
SafeRegisterDefaultGuidSerializer();

s.SetDefault(SettingsKeys.MongoClient, static () =>
{
defaultClient ??= new MongoClient();

return defaultClient;
});
s.SetDefault<IMongoClientProvider>(new DefaultMongoClientProvider());

s.SetDefault(SettingsKeys.DatabaseName, s.EndpointName());

Expand Down Expand Up @@ -97,6 +92,4 @@ internal static void SafeRegisterDefaultGuidSerializer()

internal static readonly TimeSpan DefaultTransactionTimeout = TimeSpan.FromSeconds(60);
internal static readonly Func<Type, string> DefaultCollectionNamingConvention = type => type.Name.ToLower();

static IMongoClient? defaultClient;
}
11 changes: 8 additions & 3 deletions src/NServiceBus.Storage.MongoDB/Outbox/OutboxInstaller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Features;
using global::MongoDB.Bson;
using global::MongoDB.Driver;
using Installation;
using Microsoft.Extensions.DependencyInjection;
using Settings;

sealed class OutboxInstaller(IReadOnlySettings settings) : INeedToInstallSomething
sealed class OutboxInstaller(IReadOnlySettings settings, IServiceProvider serviceProvider) : INeedToInstallSomething
{
public async Task Install(string identity, CancellationToken cancellationToken = default)
{
var installerSettings = settings.Get<InstallerSettings>();
if (installerSettings.Disabled || !settings.TryGet<Func<IMongoClient>>(SettingsKeys.MongoClient, out Func<IMongoClient>? client))
if (installerSettings.Disabled || !settings.IsFeatureActive(typeof(OutboxStorage)))
{
return;
}
Expand All @@ -29,7 +31,10 @@ public async Task Install(string identity, CancellationToken cancellationToken =
timeToKeepOutboxDeduplicationData = DefaultTimeToKeepOutboxDeduplicationData;
}

await CreateInfrastructureForOutboxTypes(client(), databaseName, databaseSettings, collectionNamingConvention, collectionSettings, timeToKeepOutboxDeduplicationData, cancellationToken)
// We have to resolve the client provider here because at the time of the creation of the installer the provider might not be registered yet.
var clientProvider = serviceProvider.GetRequiredService<IMongoClientProvider>();

await CreateInfrastructureForOutboxTypes(clientProvider!.Client, databaseName, databaseSettings, collectionNamingConvention, collectionSettings, timeToKeepOutboxDeduplicationData, cancellationToken)
.ConfigureAwait(false);
}

Expand Down
14 changes: 3 additions & 11 deletions src/NServiceBus.Storage.MongoDB/Outbox/OutboxPersister.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,11 @@

class OutboxPersister : IOutboxStorage
{
public OutboxPersister(IMongoClient client, string databaseName, Func<Type, string> collectionNamingConvention)
public OutboxPersister(IMongoClient client, string databaseName, MongoDatabaseSettings databaseSettings, Func<Type, string> collectionNamingConvention, MongoCollectionSettings collectionSettings)
{
outboxTransactionFactory = new MongoOutboxTransactionFactory(client, databaseName, collectionNamingConvention,
MongoPersistence.DefaultTransactionTimeout);
outboxTransactionFactory = new MongoOutboxTransactionFactory(client, databaseName, collectionNamingConvention, MongoPersistence.DefaultTransactionTimeout);

var collectionSettings = new MongoCollectionSettings
{
ReadConcern = ReadConcern.Majority,
ReadPreference = ReadPreference.Primary,
WriteConcern = WriteConcern.WMajority
};

outboxRecordCollection = client.GetDatabase(databaseName)
outboxRecordCollection = client.GetDatabase(databaseName, databaseSettings)
.GetCollection<OutboxRecord>(collectionNamingConvention(typeof(OutboxRecord)), collectionSettings);
}

Expand Down
9 changes: 3 additions & 6 deletions src/NServiceBus.Storage.MongoDB/Outbox/OutboxStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,12 @@ protected override void Setup(FeatureConfigurationContext context)
$"Transactions are required when the Outbox is enabled, but they have been disabled by calling 'EndpointConfiguration.UsePersistence<{nameof(MongoPersistence)}>().UseTransactions(false)'.");
}

if (!context.Settings.TryGet<Func<IMongoClient>>(SettingsKeys.MongoClient, out Func<IMongoClient>? client))
{
return;
}

var databaseName = context.Settings.Get<string>(SettingsKeys.DatabaseName);
var collectionNamingConvention = context.Settings.Get<Func<Type, string>>(SettingsKeys.CollectionNamingConvention);
var databaseSettings = context.Settings.Get<MongoDatabaseSettings>();
var collectionSettings = context.Settings.Get<MongoCollectionSettings>();

context.Services.AddSingleton<IOutboxStorage>(new OutboxPersister(client(), databaseName, collectionNamingConvention));
context.Services.AddSingleton<IOutboxStorage>(sp => new OutboxPersister(sp.GetRequiredService<IMongoClientProvider>().Client, databaseName, databaseSettings, collectionNamingConvention, collectionSettings));

RegisterOutboxClassMappings();
}
Expand Down
11 changes: 8 additions & 3 deletions src/NServiceBus.Storage.MongoDB/Sagas/SagaInstaller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Features;
using global::MongoDB.Bson;
using global::MongoDB.Driver;
using Installation;
using Microsoft.Extensions.DependencyInjection;
using Sagas;
using Settings;

sealed class SagaInstaller(IReadOnlySettings settings) : INeedToInstallSomething
sealed class SagaInstaller(IReadOnlySettings settings, IServiceProvider serviceProvider) : INeedToInstallSomething
{
public async Task Install(string identity, CancellationToken cancellationToken = default)
{
var installerSettings = settings.Get<InstallerSettings>();
if (installerSettings.Disabled || !settings.TryGet<Func<IMongoClient>>(SettingsKeys.MongoClient, out Func<IMongoClient>? client))
if (installerSettings.Disabled || !settings.IsFeatureActive(typeof(SagaStorage)))
{
return;
}
Expand All @@ -26,7 +28,10 @@ public async Task Install(string identity, CancellationToken cancellationToken =
var collectionSettings = settings.Get<MongoCollectionSettings>();
var memberMapCache = settings.Get<MemberMapCache>();

await CreateInfrastructureForSagaDataTypes(client(), databaseSettings, memberMapCache, databaseName, collectionNamingConvention, collectionSettings, sagaMetadataCollection, cancellationToken)
// We have to resolve the client provider here because at the time of the creation of the installer the provider might not be registered yet.
var clientProvider = serviceProvider.GetRequiredService<IMongoClientProvider>();

await CreateInfrastructureForSagaDataTypes(clientProvider.Client, databaseSettings, memberMapCache, databaseName, collectionNamingConvention, collectionSettings, sagaMetadataCollection, cancellationToken)
.ConfigureAwait(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ namespace NServiceBus.Storage.MongoDB;
using System;
using System.Threading;
using System.Threading.Tasks;
using Features;
using global::MongoDB.Driver;
using Installation;
using Microsoft.Extensions.DependencyInjection;
using Settings;

sealed class SubscriptionInstaller(IReadOnlySettings settings) : INeedToInstallSomething
sealed class SubscriptionInstaller(IReadOnlySettings settings, IServiceProvider serviceProvider) : INeedToInstallSomething
{
public async Task Install(string identity, CancellationToken cancellationToken = default)
{
var installerSettings = settings.Get<InstallerSettings>();
if (installerSettings.Disabled || !settings.TryGet<Func<IMongoClient>>(SettingsKeys.MongoClient, out Func<IMongoClient>? client))
if (installerSettings.Disabled || !settings.IsFeatureActive(typeof(SubscriptionStorage)))
{
return;
}
Expand All @@ -22,7 +24,10 @@ public async Task Install(string identity, CancellationToken cancellationToken =
var collectionSettings = settings.Get<MongoCollectionSettings>();
var collectionNamingConvention = settings.Get<Func<Type, string>>(SettingsKeys.CollectionNamingConvention);

await CreateInfrastructureForSubscriptionTypes(client(), databaseSettings, databaseName, collectionSettings, collectionNamingConvention, cancellationToken)
// We have to resolve the client provider here because at the time of the creation of the installer the provider might not be registered yet.
var clientProvider = serviceProvider.GetRequiredService<IMongoClientProvider>();

await CreateInfrastructureForSubscriptionTypes(clientProvider.Client, databaseSettings, databaseName, collectionSettings, collectionNamingConvention, cancellationToken)
.ConfigureAwait(false);
}

Expand Down
Loading
Loading