diff --git a/src/Tests/EventRouting/MigrationTopologySubscriptionManagerTests.cs b/src/Tests/EventRouting/MigrationTopologySubscriptionManagerTests.cs index ec374452..d8c55c3b 100644 --- a/src/Tests/EventRouting/MigrationTopologySubscriptionManagerTests.cs +++ b/src/Tests/EventRouting/MigrationTopologySubscriptionManagerTests.cs @@ -41,7 +41,7 @@ public async Task Should_create_topology_for_events_to_migrate() SubscribingQueueName = "SubscribingQueue", Client = client, AdministrationClient = administrationClient - }, topologyOptions); + }, topologyOptions, new ManifestItems()); await subscriptionManager.SubscribeAll([new MessageMetadata(typeof(MyEvent1)), new MessageMetadata(typeof(MyEvent2))], new ContextBag()); @@ -72,7 +72,7 @@ public async Task Should_create_topology_for_migrated_and_not_migrated_events() SubscribingQueueName = "SubscribingQueue", Client = client, AdministrationClient = administrationClient - }, topologyOptions); + }, topologyOptions, new ManifestItems()); await subscriptionManager.SubscribeAll([new MessageMetadata(typeof(MyEvent1)), new MessageMetadata(typeof(MyEvent2))], new ContextBag()); @@ -98,7 +98,7 @@ public async Task Should_throw_when_event_is_not_mapped() SubscribingQueueName = "SubscribingQueue", Client = client, AdministrationClient = administrationClient - }, topologyOptions); + }, topologyOptions, new ManifestItems()); await Assert.ThatAsync(() => subscriptionManager.SubscribeAll([new MessageMetadata(typeof(MyEvent1))], new ContextBag()), Throws.Exception); } diff --git a/src/Tests/EventRouting/TopicPerEventSubscriptionManagerTests.cs b/src/Tests/EventRouting/TopicPerEventSubscriptionManagerTests.cs index 27e33e1a..b9ed0a07 100644 --- a/src/Tests/EventRouting/TopicPerEventSubscriptionManagerTests.cs +++ b/src/Tests/EventRouting/TopicPerEventSubscriptionManagerTests.cs @@ -32,7 +32,7 @@ public async Task Should_create_topology_for_mapped_events() SubscribingQueueName = "SubscribingQueue", Client = client, AdministrationClient = administrationClient - }, topologyOptions); + }, topologyOptions, new ManifestItems()); await subscriptionManager.SubscribeAll([new MessageMetadata(typeof(MyEvent1)), new MessageMetadata(typeof(MyEvent2))], new ContextBag()); @@ -56,7 +56,7 @@ public async Task Should_create_topology_for_unmapped_events() SubscribingQueueName = "SubscribingQueue", Client = client, AdministrationClient = administrationClient - }, topologyOptions); + }, topologyOptions, new ManifestItems()); await subscriptionManager.SubscribeAll([new MessageMetadata(typeof(MyEvent1)), new MessageMetadata(typeof(MyEvent2))], new ContextBag()); diff --git a/src/Transport/AzureServiceBusTransportInfrastructure.cs b/src/Transport/AzureServiceBusTransportInfrastructure.cs index b79d4786..9a2191c1 100644 --- a/src/Transport/AzureServiceBusTransportInfrastructure.cs +++ b/src/Transport/AzureServiceBusTransportInfrastructure.cs @@ -52,6 +52,7 @@ ServiceBusAdministrationClient administrationClient }); WriteStartupDiagnostics(hostSettings.StartupDiagnostic); + WriteManifest(hostSettings.Manifest); } void WriteStartupDiagnostics(StartupDiagnosticEntries startupDiagnostic) => @@ -89,7 +90,7 @@ IMessageReceiver CreateMessagePump(ReceiveSettings receiveSettings, ServiceBusCl SetupInfrastructure = hostSettings.SetupInfrastructure, SubscribingQueueName = receiveAddress, EntityMaximumSizeInMegabytes = transportSettings.EntityMaximumSizeInMegabytes - }) + }, hostSettings) : null, subQueue ); @@ -142,4 +143,22 @@ static SubQueue ToSubQueue(QueueAddress address) => QueueAddressQualifier.DeadLetterQueue.Equals(address.Qualifier, StringComparison.OrdinalIgnoreCase) ? SubQueue.DeadLetter : SubQueue.None; + + void WriteManifest(ManifestItems manifest) + { + var inputQueues = receiveSettingsAndClientPairs + .Select(settingsAndClient => (ManifestItems.ManifestItem)ToTransportAddress(settingsAndClient.receiveSettings.ReceiveAddress).ToLower()) + .ToArray(); + + manifest.Add("asbSettings", new ManifestItems.ManifestItem + { + ItemValue = [ + new("entityMaximumSize", $"{transportSettings.EntityMaximumSize}GB"), + new("prefetchCount", transportSettings.PrefetchCount?.ToString() ?? "default"), + new("prefetchMultiplier", transportSettings.PrefetchMultiplier.ToString()), + new("enablePartitioning", transportSettings.EnablePartitioning.ToString().ToLower()) + ] + }); + manifest.Add("inputQueues", new ManifestItems.ManifestItem { ArrayValue = inputQueues }); + } } \ No newline at end of file diff --git a/src/Transport/EventRouting/MigrationTopology.cs b/src/Transport/EventRouting/MigrationTopology.cs index ab03f128..3f86f070 100644 --- a/src/Transport/EventRouting/MigrationTopology.cs +++ b/src/Transport/EventRouting/MigrationTopology.cs @@ -187,7 +187,8 @@ protected override string GetPublishDestinationCore(string eventTypeFullName) throw new Exception($"When using migration topology, every event needs to be marked either as migrated or pending migration to avoid message loss. In the topology configuration use either MigratedPublishedEvent<'{eventTypeFullName}'>() or EventToMigrate<'{eventTypeFullName}'>(), depending on the migration state of this event."); } - internal override SubscriptionManager CreateSubscriptionManager(SubscriptionManagerCreationOptions creationOptions) => new MigrationTopologySubscriptionManager(creationOptions, Options); + internal override SubscriptionManager CreateSubscriptionManager(SubscriptionManagerCreationOptions creationOptions, HostSettings hostSettings) => + new MigrationTopologySubscriptionManager(creationOptions, Options, hostSettings.Manifest); sealed class OptionsValidatorDecorator(IValidateOptions decorated) : IValidateOptions diff --git a/src/Transport/EventRouting/MigrationTopologySubscriptionManager.cs b/src/Transport/EventRouting/MigrationTopologySubscriptionManager.cs index 1466b36b..fa0d868f 100644 --- a/src/Transport/EventRouting/MigrationTopologySubscriptionManager.cs +++ b/src/Transport/EventRouting/MigrationTopologySubscriptionManager.cs @@ -16,28 +16,49 @@ sealed class MigrationTopologySubscriptionManager : SubscriptionManager #pragma warning disable CS0618 // Type or member is obsolete readonly MigrationTopologyOptions topologyOptions; #pragma warning restore CS0618 // Type or member is obsolete + readonly ManifestItems manifest; readonly string subscriptionName; #pragma warning disable CS0618 // Type or member is obsolete - public MigrationTopologySubscriptionManager(SubscriptionManagerCreationOptions creationOptions, MigrationTopologyOptions topologyOptions) : base(creationOptions) + public MigrationTopologySubscriptionManager(SubscriptionManagerCreationOptions creationOptions, MigrationTopologyOptions topologyOptions, ManifestItems manifest) : base(creationOptions) #pragma warning restore CS0618 // Type or member is obsolete { this.topologyOptions = topologyOptions; + this.manifest = manifest; subscriptionName = topologyOptions.QueueNameToSubscriptionNameMap.GetValueOrDefault(CreationOptions.SubscribingQueueName, CreationOptions.SubscribingQueueName); } static readonly ILog Logger = LogManager.GetLogger(); public override Task SubscribeAll(MessageMetadata[] eventTypes, ContextBag context, - CancellationToken cancellationToken = default) => - eventTypes.Length switch + CancellationToken cancellationToken = default) + { + var subscriptions = eventTypes + .Select(eventType => eventType.MessageType.FullName ?? throw new InvalidOperationException("Message type full name is null")) + .SelectMany(eventTypeFullName => + topologyOptions.SubscribedEventToTopicsMap + .GetValueOrDefault(eventTypeFullName, [eventTypeFullName]) + .Select(topicName => new { Topic = topicName.ToLower(), MessageType = eventTypeFullName })) + .GroupBy(topicAndMessageType => topicAndMessageType.Topic) + .Select(group => new ManifestItems.ManifestItem + { + ItemValue = [ + new("topicName", group.Key), + new("messageTypes", new ManifestItems.ManifestItem { ArrayValue = group.Select(topicAndMessageType => (ManifestItems.ManifestItem)topicAndMessageType.MessageType).ToArray() }) + ] + }) + .ToArray(); + manifest.Add("subscriptions", new ManifestItems.ManifestItem { ArrayValue = subscriptions }); + + return eventTypes.Length switch { 0 => Task.CompletedTask, - 1 => SubscribeEvent(eventTypes[0].MessageType.FullName ?? throw new InvalidOperationException("Message type full name is null"), cancellationToken), + 1 => SubscribeEvent(eventTypes[0].MessageType.FullName!, cancellationToken), _ => Task.WhenAll(eventTypes.Select(eventType => - SubscribeEvent(eventType.MessageType.FullName ?? throw new InvalidOperationException("Message type full name is null"), cancellationToken)) + SubscribeEvent(eventType.MessageType.FullName!, cancellationToken)) .ToArray()) }; + } async Task SubscribeEvent(string eventTypeFullName, CancellationToken cancellationToken) { diff --git a/src/Transport/EventRouting/TopicPerEventTopology.cs b/src/Transport/EventRouting/TopicPerEventTopology.cs index c7d8e2cd..11ad6a0d 100644 --- a/src/Transport/EventRouting/TopicPerEventTopology.cs +++ b/src/Transport/EventRouting/TopicPerEventTopology.cs @@ -113,6 +113,6 @@ protected override string GetPublishDestinationCore(string eventTypeFullName) } internal override SubscriptionManager CreateSubscriptionManager( - SubscriptionManagerCreationOptions creationOptions) => - new TopicPerEventTopologySubscriptionManager(creationOptions, Options); + SubscriptionManagerCreationOptions creationOptions, HostSettings hostSettings) => + new TopicPerEventTopologySubscriptionManager(creationOptions, Options, hostSettings.Manifest); } \ No newline at end of file diff --git a/src/Transport/EventRouting/TopicPerEventTopologySubscriptionManager.cs b/src/Transport/EventRouting/TopicPerEventTopologySubscriptionManager.cs index 385d0716..60234513 100644 --- a/src/Transport/EventRouting/TopicPerEventTopologySubscriptionManager.cs +++ b/src/Transport/EventRouting/TopicPerEventTopologySubscriptionManager.cs @@ -14,27 +14,49 @@ namespace NServiceBus.Transport.AzureServiceBus; sealed class TopicPerEventTopologySubscriptionManager : SubscriptionManager { readonly TopologyOptions topologyOptions; + readonly ManifestItems manifest; readonly string subscriptionName; public TopicPerEventTopologySubscriptionManager(SubscriptionManagerCreationOptions creationOptions, - TopologyOptions topologyOptions) : base(creationOptions) + TopologyOptions topologyOptions, + ManifestItems manifest) : base(creationOptions) { this.topologyOptions = topologyOptions; + this.manifest = manifest; subscriptionName = topologyOptions.QueueNameToSubscriptionNameMap.GetValueOrDefault(CreationOptions.SubscribingQueueName, CreationOptions.SubscribingQueueName); } static readonly ILog Logger = LogManager.GetLogger(); public override Task SubscribeAll(MessageMetadata[] eventTypes, ContextBag context, - CancellationToken cancellationToken = default) => - eventTypes.Length switch + CancellationToken cancellationToken = default) + { + var subscriptions = eventTypes + .Select(eventType => eventType.MessageType.FullName ?? throw new InvalidOperationException("Message type full name is null")) + .SelectMany(eventTypeFullName => + topologyOptions.SubscribedEventToTopicsMap + .GetValueOrDefault(eventTypeFullName, [eventTypeFullName]) + .Select(topicName => new { Topic = topicName.ToLower(), MessageType = eventTypeFullName })) + .GroupBy(topicAndMessageType => topicAndMessageType.Topic) + .Select(group => new ManifestItems.ManifestItem + { + ItemValue = [ + new("topicName", group.Key), + new("messageTypes", new ManifestItems.ManifestItem { ArrayValue = group.Select(topicAndMessageType => (ManifestItems.ManifestItem)topicAndMessageType.MessageType).ToArray() }) + ] + }) + .ToArray(); + manifest.Add("subscriptions", new ManifestItems.ManifestItem { ArrayValue = subscriptions }); + + return eventTypes.Length switch { 0 => Task.CompletedTask, - 1 => SubscribeEvent(eventTypes[0].MessageType.FullName ?? throw new InvalidOperationException("Message type full name is null"), cancellationToken), + 1 => SubscribeEvent(eventTypes[0].MessageType.FullName!, cancellationToken), _ => Task.WhenAll(eventTypes.Select(eventType => - SubscribeEvent(eventType.MessageType.FullName ?? throw new InvalidOperationException("Message type full name is null"), cancellationToken)) + SubscribeEvent(eventType.MessageType.FullName!, cancellationToken)) .ToArray()) }; + } Task SubscribeEvent(string eventTypeFullName, CancellationToken cancellationToken) { diff --git a/src/Transport/EventRouting/TopicTopology.cs b/src/Transport/EventRouting/TopicTopology.cs index bb736c9b..62c3378d 100644 --- a/src/Transport/EventRouting/TopicTopology.cs +++ b/src/Transport/EventRouting/TopicTopology.cs @@ -3,6 +3,7 @@ namespace NServiceBus using System; using System.ComponentModel.DataAnnotations; using Microsoft.Extensions.Options; + using NServiceBus.Transport; using Particular.Obsoletes; using Transport.AzureServiceBus; @@ -113,7 +114,7 @@ internal string GetPublishDestination(Type eventType) // By having this internal abstract method it is not possible to extend the topology with a custom topology outside // of this assembly. That is a deliberate design decision. - internal abstract SubscriptionManager CreateSubscriptionManager(SubscriptionManagerCreationOptions creationOptions); + internal abstract SubscriptionManager CreateSubscriptionManager(SubscriptionManagerCreationOptions creationOptions, HostSettings hostSettings); /// /// Returns instructions where to publish a given event. diff --git a/src/TransportTests/When_using_dlq_qualifier.cs b/src/TransportTests/When_using_dlq_qualifier.cs index cd38dbbe..4045e201 100644 --- a/src/TransportTests/When_using_dlq_qualifier.cs +++ b/src/TransportTests/When_using_dlq_qualifier.cs @@ -109,6 +109,7 @@ async Task SeamReceiveFromDlqAndSendTestMessage(TransportTransactionMode mode, C inputQueueName, string.Empty, new StartupDiagnosticEntries(), + new ManifestItems(), (message, ex, token) => { }, false ); @@ -168,6 +169,7 @@ async Task SendViaTransportSeam(CancellationToken cancellationToken) inputQueueName, string.Empty, new StartupDiagnosticEntries(), + new ManifestItems(), (_, _, _) => { }, true ); @@ -211,6 +213,7 @@ async Task BlockUntilReceivedViaTransportSeam(CancellationToken cancellationToke inputQueueName, string.Empty, new StartupDiagnosticEntries(), + new ManifestItems(), (message, ex, token) => { }, true );