Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
8348554
Add an acceptance test
danielmarbach Aug 26, 2025
9603a74
Simple straight forward implementation to make the test pass
danielmarbach Aug 26, 2025
3137ab7
Use an equality predicate with fallback
danielmarbach Aug 26, 2025
1f447cb
Upconvert old legacy entries
danielmarbach Aug 26, 2025
eb9e8c0
Remove todo for now
danielmarbach Aug 26, 2025
6445a6f
Move time to keep duplication data to outbox settings
danielmarbach Aug 26, 2025
e7d5675
Making sure the outbox class mappings are always present before the c…
danielmarbach Aug 26, 2025
52a4036
Proper namespaces and wire up new configuration
danielmarbach Aug 26, 2025
1aa7a3a
Remove unneeded Mongo prefix
danielmarbach Aug 26, 2025
d93a971
Remove unneeded check
danielmarbach Aug 26, 2025
f917653
Introduce read fallback and run persistence tests without
danielmarbach Aug 26, 2025
3e1e6b6
Fix default class map indicator
danielmarbach Aug 26, 2025
1cfe791
Forgot to approve the rename
danielmarbach Aug 26, 2025
6aee372
Bring back the default of seven days for TTL
danielmarbach Aug 26, 2025
78ccdac
Make sure the processor endpoint is taken into account
danielmarbach Aug 26, 2025
5baeae5
Disable outbox installation when processor endpoint is set
danielmarbach Aug 26, 2025
9afcacb
Suppress null warning
danielmarbach Aug 26, 2025
00d6270
Make outbox settings extensions still fluent since others are too
danielmarbach Aug 26, 2025
b98b545
Inline the hierarchy since we really only have these outbox tests and…
danielmarbach Aug 27, 2025
392684c
Some minor formattingˆ
danielmarbach Aug 27, 2025
34b3e64
Additional test
danielmarbach Aug 27, 2025
88a5956
Use explicit fallback sacrificing network latency for a more safe by …
danielmarbach Aug 27, 2025
4b1a306
Remove TODO for now since we can deal with this elsewhere
danielmarbach Aug 27, 2025
ec1258f
Extract filtering
danielmarbach Sep 1, 2025
efaef8c
Comment
danielmarbach Sep 1, 2025
379c950
Comment
danielmarbach Sep 1, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
namespace NServiceBus.AcceptanceTests.Outbox;

using System;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using AcceptanceTests;
using EndpointTemplates;
using NUnit.Framework;

public class When_subscribers_handles_the_same_event : NServiceBusAcceptanceTest
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andreasohlund I took over the test you wrote. I think this should be moved into Core ideally. Thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed

{
[Test]
public async Task Should_be_processed_by_all_subscribers()
{
var context = await Scenario.Define<Context>()
.WithEndpoint<Publisher>(b =>
b.When(c => c.Subscriber1Subscribed && c.Subscriber2Subscribed, session => session.Publish(new MyEvent()))
)
.WithEndpoint<Subscriber1>()
.WithEndpoint<Subscriber2>()
.Done(c => c.Subscriber1GotTheEvent && c.Subscriber2GotTheEvent)
.Run(TimeSpan.FromSeconds(10));

Assert.Multiple(() =>
{
Assert.That(context.Subscriber1GotTheEvent, Is.True);
Assert.That(context.Subscriber2GotTheEvent, Is.True);
});
}

public class Context : ScenarioContext
{
public bool Subscriber1Subscribed { get; set; }
public bool Subscriber2Subscribed { get; set; }

public bool Subscriber1GotTheEvent { get; set; }
public bool Subscriber2GotTheEvent { get; set; }
}

public class Publisher : EndpointConfigurationBuilder
{
public Publisher() => EndpointSetup<DefaultPublisher>(c =>
{
c.OnEndpointSubscribed<Context>((s, context) =>
{
var subscriber1 = Conventions.EndpointNamingConvention(typeof(Subscriber1));
if (s.SubscriberEndpoint.Contains(subscriber1))
{
context.Subscriber1Subscribed = true;
}

var subscriber2 = Conventions.EndpointNamingConvention(typeof(Subscriber2));
if (s.SubscriberEndpoint.Contains(subscriber2))
{
context.Subscriber2Subscribed = true;
}
});
});
}

public class Subscriber1 : EndpointConfigurationBuilder
{
public Subscriber1() =>
EndpointSetup<DefaultServer>(c =>
{
c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
c.EnableOutbox();
}, metadata => metadata.RegisterPublisherFor<MyEvent>(typeof(Publisher)));

public class MyHandler(Context testContext) : IHandleMessages<MyEvent>
{
public Task Handle(MyEvent message, IMessageHandlerContext context)
{
testContext.Subscriber1GotTheEvent = true;
return Task.CompletedTask;
}
}
}

public class Subscriber2 : EndpointConfigurationBuilder
{
public Subscriber2() =>
EndpointSetup<DefaultServer>(c =>
{
c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
c.EnableOutbox();
}, metadata => metadata.RegisterPublisherFor<MyEvent>(typeof(Publisher)));

public class MyHandler(Context testContext) : IHandleMessages<MyEvent>
{
public Task Handle(MyEvent messageThatIsEnlisted, IMessageHandlerContext context)
{
testContext.Subscriber2GotTheEvent = true;
return Task.CompletedTask;
}
}
}

public class MyEvent : IEvent;
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ await SagaInstaller.CreateInfrastructureForSagaDataTypes(ClientProvider.Client,
OutboxStorageFeature.RegisterOutboxClassMappings();
await OutboxInstaller.CreateInfrastructureForOutboxTypes(ClientProvider.Client, databaseName, MongoPersistence.DefaultDatabaseSettings, MongoPersistence.DefaultCollectionNamingConvention, MongoPersistence.DefaultCollectionSettings, TimeSpan.FromDays(7), cancellationToken);

OutboxStorage = new OutboxPersister(ClientProvider.Client, databaseName, MongoPersistence.DefaultDatabaseSettings, MongoPersistence.DefaultCollectionNamingConvention, MongoPersistence.DefaultCollectionSettings);
OutboxStorage = new OutboxPersister(ClientProvider.Client, string.Empty, false, databaseName, MongoPersistence.DefaultDatabaseSettings, MongoPersistence.DefaultCollectionNamingConvention, MongoPersistence.DefaultCollectionSettings);
}

public async Task Cleanup(CancellationToken cancellationToken = default) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo("NServiceBus.Storage.MongoDB.PersistenceTests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo("NServiceBus.Storage.MongoDB.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo("NServiceBus.Storage.MongoDB.TransactionalSession")]
namespace NServiceBus
{
public class CompatibilitySettings : NServiceBus.Configuration.AdvancedExtensibility.ExposeSettings
{
public NServiceBus.CompatibilitySettings CollectionNamingConvention(System.Func<System.Type, string> collectionNamingConvention) { }
public NServiceBus.CompatibilitySettings VersionElementName(string versionElementName) { }
}
public static class MongoOutboxSettingsExtensions
{
public static NServiceBus.Outbox.OutboxSettings DisableReadFallback(this NServiceBus.Outbox.OutboxSettings outboxSettings) { }
public static NServiceBus.Outbox.OutboxSettings TimeToKeepOutboxDeduplicationData(this NServiceBus.Outbox.OutboxSettings outboxSettings, System.TimeSpan timeToKeepOutboxDeduplicationData) { }
}
public class MongoPersistence : NServiceBus.Persistence.PersistenceDefinition
{
public MongoPersistence() { }
Expand All @@ -17,6 +23,8 @@ namespace NServiceBus
public static NServiceBus.PersistenceExtensions<NServiceBus.MongoPersistence> DatabaseName(this NServiceBus.PersistenceExtensions<NServiceBus.MongoPersistence> persistenceExtensions, string databaseName) { }
public static void DisableInstaller(this NServiceBus.PersistenceExtensions<NServiceBus.MongoPersistence> persistenceExtensions) { }
public static NServiceBus.PersistenceExtensions<NServiceBus.MongoPersistence> MongoClient(this NServiceBus.PersistenceExtensions<NServiceBus.MongoPersistence> persistenceExtensions, MongoDB.Driver.IMongoClient mongoClient) { }
[System.Obsolete("Use \'MongoOutboxSettingsExtensions.TimeToKeepOutboxDeduplicationData\' instead. Wi" +
"ll be removed in version 8.0.0.", true)]
public static NServiceBus.PersistenceExtensions<NServiceBus.MongoPersistence> TimeToKeepOutboxDeduplicationData(this NServiceBus.PersistenceExtensions<NServiceBus.MongoPersistence> persistenceExtensions, System.TimeSpan timeToKeepOutboxDeduplicationData) { }
public static NServiceBus.PersistenceExtensions<NServiceBus.MongoPersistence> UseTransactions(this NServiceBus.PersistenceExtensions<NServiceBus.MongoPersistence> persistenceExtensions, bool useTransactions) { }
}
Expand Down
1 change: 1 addition & 0 deletions src/NServiceBus.Storage.MongoDB.Tests/ClientProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public static IMongoClient Client
}

MongoPersistence.SafeRegisterDefaultGuidSerializer();
OutboxStorage.RegisterOutboxClassMappings();

var containerConnectionString =
Environment.GetEnvironmentVariable("NServiceBusStorageMongoDB_ConnectionString");
Expand Down

This file was deleted.

173 changes: 159 additions & 14 deletions src/NServiceBus.Storage.MongoDB.Tests/Outbox/OutboxTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,136 @@

using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NServiceBus.Extensibility;
using Extensibility;
using NServiceBus.Outbox;
using NUnit.Framework;

public class OutboxStorageTest : OutboxPersisterTests
public class OutboxStorageTest
{
[OneTimeSetUp]
public virtual async Task OneTimeSetUp()
{
databaseName = $"Test_{DateTime.UtcNow.Ticks.ToString(CultureInfo.InvariantCulture)}";

var database = ClientProvider.Client.GetDatabase(databaseName, MongoPersistence.DefaultDatabaseSettings);

await database.CreateCollectionAsync(collectionNamingConvention(typeof(OutboxRecord)));

await OutboxInstaller.CreateInfrastructureForOutboxTypes(ClientProvider.Client, databaseName,
MongoPersistence.DefaultDatabaseSettings, collectionNamingConvention,
MongoPersistence.DefaultCollectionSettings, TimeSpan.FromHours(1));

transactionFactory = new OutboxTransactionFactory(ClientProvider.Client, databaseName,
MongoPersistence.DefaultDatabaseSettings,
collectionNamingConvention, MongoPersistence.DefaultTransactionTimeout);
}

[OneTimeTearDown]
public virtual async Task OneTimeTearDown() => await ClientProvider.Client.DropDatabaseAsync(databaseName);

[Test]
public async Task Should_return_same_data()
{
var persister = SetupPersister();

var msgId = RandomString();

var operations = new Outbox.TransportOperation[]
var operations = new TransportOperation[]
{
new Outbox.TransportOperation(RandomString(), FillDictionary(new Transport.DispatchProperties(), 3),
new(RandomString(), FillDictionary(new Transport.DispatchProperties(), 3),
Encoding.UTF8.GetBytes(RandomString()), FillDictionary(new Dictionary<string, string>(), 3)),
new Outbox.TransportOperation(RandomString(), FillDictionary(new Transport.DispatchProperties(), 3),
new(RandomString(), FillDictionary(new Transport.DispatchProperties(), 3),
Encoding.UTF8.GetBytes(RandomString()), FillDictionary(new Dictionary<string, string>(), 3)),
new Outbox.TransportOperation(RandomString(), FillDictionary(new Transport.DispatchProperties(), 3),
new(RandomString(), FillDictionary(new Transport.DispatchProperties(), 3),
Encoding.UTF8.GetBytes(RandomString()), FillDictionary(new Dictionary<string, string>(), 3)),
};
var testMessage = new Outbox.OutboxMessage(msgId, operations);
var testMessage = new OutboxMessage(msgId, operations);

var context = new ContextBag();
var empty = await configuration.OutboxStorage.Get(msgId, context);
var empty = await persister.Get(msgId, context);
Assert.That(empty, Is.Null);

using (var transaction = await configuration.CreateTransaction(context))
using (var transaction = await CreateTransaction(context))
{
await configuration.OutboxStorage.Store(testMessage, transaction, context);
await persister.Store(testMessage, transaction, context);
await transaction.Commit();
}

var received = await configuration.OutboxStorage.Get(msgId, context);
var received = await persister.Get(msgId, context);

AreSame(received, msgId, operations);
}

[Test]
public async Task Should_support_legacy_format_in_get()
{
var persister = SetupPersister(enableReadFallback: true);

var msgId = RandomString();

var database = ClientProvider.Client.GetDatabase(databaseName, MongoPersistence.DefaultDatabaseSettings);
var outboxCollection = database.GetCollection<DuckTypeOutboxRecord>(
collectionNamingConvention(typeof(OutboxRecord)), MongoPersistence.DefaultCollectionSettings);

var transportOperation = new TransportOperation(RandomString(),
FillDictionary(new Transport.DispatchProperties(), 3),
Encoding.UTF8.GetBytes(RandomString()), FillDictionary(new Dictionary<string, string>(), 3));

var transportOperations = new[] { transportOperation };

var storageOperations = transportOperations.Select(o => new StorageTransportOperation(o)).ToArray();

await outboxCollection.InsertOneAsync(new DuckTypeOutboxRecord
{
Id = msgId,
TransportOperations = storageOperations,
});

var context = new ContextBag();

var received = await persister.Get(msgId, context);

AreSame(received, msgId, transportOperations);
}

[Test]
public async Task Should_treat_as_new_record_when_fallback_disabled()
{
var persister = SetupPersister(enableReadFallback: false);

var msgId = RandomString();

var database = ClientProvider.Client.GetDatabase(databaseName, MongoPersistence.DefaultDatabaseSettings);
var outboxCollection = database.GetCollection<DuckTypeOutboxRecord>(
collectionNamingConvention(typeof(OutboxRecord)), MongoPersistence.DefaultCollectionSettings);

var transportOperation = new TransportOperation(RandomString(),
FillDictionary(new Transport.DispatchProperties(), 3),
Encoding.UTF8.GetBytes(RandomString()), FillDictionary(new Dictionary<string, string>(), 3));

var transportOperations = new[] { transportOperation };

var storageOperations = transportOperations.Select(o => new StorageTransportOperation(o)).ToArray();

await outboxCollection.InsertOneAsync(new DuckTypeOutboxRecord
{
Id = msgId,
TransportOperations = storageOperations,
});

var context = new ContextBag();

var received = await persister.Get(msgId, context);

Assert.That(received, Is.Null);
}

static void AreSame(OutboxMessage received, string msgId, TransportOperation[] operations)
{
Assert.Multiple(() =>
{
Assert.That(received.MessageId, Is.EqualTo(msgId));
Expand Down Expand Up @@ -66,12 +161,56 @@ public async Task Should_return_same_data()
}
}

string RandomString()
[Test]
public async Task Should_support_legacy_format_in_set_as_dispatched()
{
return Guid.NewGuid().ToString();
var persister = SetupPersister(enableReadFallback: true);

var msgId = RandomString();

var database = ClientProvider.Client.GetDatabase(databaseName, MongoPersistence.DefaultDatabaseSettings);
var outboxCollection = database.GetCollection<DuckTypeOutboxRecord>(
collectionNamingConvention(typeof(OutboxRecord)), MongoPersistence.DefaultCollectionSettings);

var transportOperation = new TransportOperation(RandomString(),
FillDictionary(new Transport.DispatchProperties(), 3),
Encoding.UTF8.GetBytes(RandomString()), FillDictionary(new Dictionary<string, string>(), 3));

var transportOperations = new[] { transportOperation };

var storageOperations = transportOperations.Select(o => new StorageTransportOperation(o)).ToArray();

await outboxCollection.InsertOneAsync(new DuckTypeOutboxRecord
{
Id = msgId,
TransportOperations = storageOperations,
});

var context = new ContextBag();

await persister.SetAsDispatched(msgId, context);

var receivedAfter = await persister.Get(msgId, context);

Assert.That(receivedAfter.TransportOperations, Is.Empty);
}

T FillDictionary<T>(T dictionary, int count)
OutboxPersister SetupPersister(bool enableReadFallback = true, string partitionKey = "") => new(
ClientProvider.Client, partitionKey, enableReadFallback, databaseName, MongoPersistence.DefaultDatabaseSettings,
collectionNamingConvention, MongoPersistence.DefaultCollectionSettings);


class DuckTypeOutboxRecord
{
public string Id { get; set; }
public DateTime? Dispatched { get; set; }

public StorageTransportOperation[] TransportOperations { get; set; }
}

static string RandomString() => Guid.NewGuid().ToString();

static T FillDictionary<T>(T dictionary, int count)
where T : Dictionary<string, string>
{
for (var i = 0; i < count; i++)
Expand All @@ -81,4 +220,10 @@ T FillDictionary<T>(T dictionary, int count)

return dictionary;
}

Task<IOutboxTransaction> CreateTransaction(ContextBag context) => transactionFactory.BeginTransaction(context);

string databaseName;
OutboxTransactionFactory transactionFactory;
readonly Func<Type, string> collectionNamingConvention = t => t.Name.ToLower();
}
Loading
Loading