Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public async Task Should_create_topology_for_events_to_migrate()
SubscribingQueueName = "SubscribingQueue",
Client = client,
AdministrationClient = administrationClient
}, topologyOptions);
}, topologyOptions, new ManifestItems());

await subscriptionManager.SubscribeAll([new MessageMetadata(typeof(MyEvent1)), new MessageMetadata(typeof(MyEvent2))], new ContextBag());

Expand Down Expand Up @@ -72,7 +72,7 @@ public async Task Should_create_topology_for_migrated_and_not_migrated_events()
SubscribingQueueName = "SubscribingQueue",
Client = client,
AdministrationClient = administrationClient
}, topologyOptions);
}, topologyOptions, new ManifestItems());

await subscriptionManager.SubscribeAll([new MessageMetadata(typeof(MyEvent1)), new MessageMetadata(typeof(MyEvent2))], new ContextBag());

Expand All @@ -98,7 +98,7 @@ public async Task Should_throw_when_event_is_not_mapped()
SubscribingQueueName = "SubscribingQueue",
Client = client,
AdministrationClient = administrationClient
}, topologyOptions);
}, topologyOptions, new ManifestItems());

await Assert.ThatAsync(() => subscriptionManager.SubscribeAll([new MessageMetadata(typeof(MyEvent1))], new ContextBag()), Throws.Exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public async Task Should_create_topology_for_mapped_events()
SubscribingQueueName = "SubscribingQueue",
Client = client,
AdministrationClient = administrationClient
}, topologyOptions);
}, topologyOptions, new ManifestItems());

await subscriptionManager.SubscribeAll([new MessageMetadata(typeof(MyEvent1)), new MessageMetadata(typeof(MyEvent2))], new ContextBag());

Expand All @@ -56,7 +56,7 @@ public async Task Should_create_topology_for_unmapped_events()
SubscribingQueueName = "SubscribingQueue",
Client = client,
AdministrationClient = administrationClient
}, topologyOptions);
}, topologyOptions, new ManifestItems());

await subscriptionManager.SubscribeAll([new MessageMetadata(typeof(MyEvent1)), new MessageMetadata(typeof(MyEvent2))], new ContextBag());

Expand Down
21 changes: 20 additions & 1 deletion src/Transport/AzureServiceBusTransportInfrastructure.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
});

WriteStartupDiagnostics(hostSettings.StartupDiagnostic);
WriteManifest(hostSettings.Manifest);
}

void WriteStartupDiagnostics(StartupDiagnosticEntries startupDiagnostic) =>
Expand Down Expand Up @@ -89,7 +90,7 @@
SetupInfrastructure = hostSettings.SetupInfrastructure,
SubscribingQueueName = receiveAddress,
EntityMaximumSizeInMegabytes = transportSettings.EntityMaximumSizeInMegabytes
})
}, hostSettings)
: null,
subQueue
);
Expand Down Expand Up @@ -142,4 +143,22 @@
QueueAddressQualifier.DeadLetterQueue.Equals(address.Qualifier, StringComparison.OrdinalIgnoreCase)
? SubQueue.DeadLetter
: SubQueue.None;

void WriteManifest(ManifestItems manifest)

Check failure on line 147 in src/Transport/AzureServiceBusTransportInfrastructure.cs

View workflow job for this annotation

GitHub Actions / Windows

The type or namespace name 'ManifestItems' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 147 in src/Transport/AzureServiceBusTransportInfrastructure.cs

View workflow job for this annotation

GitHub Actions / Windows

The type or namespace name 'ManifestItems' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 147 in src/Transport/AzureServiceBusTransportInfrastructure.cs

View workflow job for this annotation

GitHub Actions / Linux

The type or namespace name 'ManifestItems' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 147 in src/Transport/AzureServiceBusTransportInfrastructure.cs

View workflow job for this annotation

GitHub Actions / Linux

The type or namespace name 'ManifestItems' could not be found (are you missing a using directive or an assembly reference?)
{
var inputQueues = receiveSettingsAndClientPairs
.Select(settingsAndClient => (ManifestItems.ManifestItem)ToTransportAddress(settingsAndClient.receiveSettings.ReceiveAddress).ToLower())
.ToArray();

manifest.Add("asbSettings", new ManifestItems.ManifestItem
{
ItemValue = [
new("entityMaximumSize", $"{transportSettings.EntityMaximumSize}GB"),
new("prefetchCount", transportSettings.PrefetchCount?.ToString() ?? "default"),
new("prefetchMultiplier", transportSettings.PrefetchMultiplier.ToString()),
new("enablePartitioning", transportSettings.EnablePartitioning.ToString().ToLower())
]
});
manifest.Add("inputQueues", new ManifestItems.ManifestItem { ArrayValue = inputQueues });
}
}
3 changes: 2 additions & 1 deletion src/Transport/EventRouting/MigrationTopology.cs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ protected override string GetPublishDestinationCore(string eventTypeFullName)
throw new Exception($"When using migration topology, every event needs to be marked either as migrated or pending migration to avoid message loss. In the topology configuration use either MigratedPublishedEvent<'{eventTypeFullName}'>() or EventToMigrate<'{eventTypeFullName}'>(), depending on the migration state of this event.");
}

internal override SubscriptionManager CreateSubscriptionManager(SubscriptionManagerCreationOptions creationOptions) => new MigrationTopologySubscriptionManager(creationOptions, Options);
internal override SubscriptionManager CreateSubscriptionManager(SubscriptionManagerCreationOptions creationOptions, HostSettings hostSettings) =>
new MigrationTopologySubscriptionManager(creationOptions, Options, hostSettings.Manifest);

sealed class OptionsValidatorDecorator(IValidateOptions<MigrationTopologyOptions> decorated)
: IValidateOptions<TopologyOptions>
Expand Down
31 changes: 26 additions & 5 deletions src/Transport/EventRouting/MigrationTopologySubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,49 @@
#pragma warning disable CS0618 // Type or member is obsolete
readonly MigrationTopologyOptions topologyOptions;
#pragma warning restore CS0618 // Type or member is obsolete
readonly ManifestItems manifest;

Check failure on line 19 in src/Transport/EventRouting/MigrationTopologySubscriptionManager.cs

View workflow job for this annotation

GitHub Actions / Windows

The type or namespace name 'ManifestItems' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 19 in src/Transport/EventRouting/MigrationTopologySubscriptionManager.cs

View workflow job for this annotation

GitHub Actions / Windows

The type or namespace name 'ManifestItems' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 19 in src/Transport/EventRouting/MigrationTopologySubscriptionManager.cs

View workflow job for this annotation

GitHub Actions / Linux

The type or namespace name 'ManifestItems' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 19 in src/Transport/EventRouting/MigrationTopologySubscriptionManager.cs

View workflow job for this annotation

GitHub Actions / Linux

The type or namespace name 'ManifestItems' could not be found (are you missing a using directive or an assembly reference?)
readonly string subscriptionName;

#pragma warning disable CS0618 // Type or member is obsolete
public MigrationTopologySubscriptionManager(SubscriptionManagerCreationOptions creationOptions, MigrationTopologyOptions topologyOptions) : base(creationOptions)
public MigrationTopologySubscriptionManager(SubscriptionManagerCreationOptions creationOptions, MigrationTopologyOptions topologyOptions, ManifestItems manifest) : base(creationOptions)

Check failure on line 23 in src/Transport/EventRouting/MigrationTopologySubscriptionManager.cs

View workflow job for this annotation

GitHub Actions / Windows

The type or namespace name 'ManifestItems' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 23 in src/Transport/EventRouting/MigrationTopologySubscriptionManager.cs

View workflow job for this annotation

GitHub Actions / Windows

The type or namespace name 'ManifestItems' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 23 in src/Transport/EventRouting/MigrationTopologySubscriptionManager.cs

View workflow job for this annotation

GitHub Actions / Linux

The type or namespace name 'ManifestItems' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 23 in src/Transport/EventRouting/MigrationTopologySubscriptionManager.cs

View workflow job for this annotation

GitHub Actions / Linux

The type or namespace name 'ManifestItems' could not be found (are you missing a using directive or an assembly reference?)
#pragma warning restore CS0618 // Type or member is obsolete
{
this.topologyOptions = topologyOptions;
this.manifest = manifest;
subscriptionName = topologyOptions.QueueNameToSubscriptionNameMap.GetValueOrDefault(CreationOptions.SubscribingQueueName, CreationOptions.SubscribingQueueName);
}

static readonly ILog Logger = LogManager.GetLogger<MigrationTopologySubscriptionManager>();

public override Task SubscribeAll(MessageMetadata[] eventTypes, ContextBag context,
CancellationToken cancellationToken = default) =>
eventTypes.Length switch
CancellationToken cancellationToken = default)
{
var subscriptions = eventTypes
.Select(eventType => eventType.MessageType.FullName ?? throw new InvalidOperationException("Message type full name is null"))
.SelectMany(eventTypeFullName =>
topologyOptions.SubscribedEventToTopicsMap
.GetValueOrDefault(eventTypeFullName, [eventTypeFullName])
.Select(topicName => new { Topic = topicName.ToLower(), MessageType = eventTypeFullName }))
.GroupBy(topicAndMessageType => topicAndMessageType.Topic)
.Select(group => new ManifestItems.ManifestItem
{
ItemValue = [
new("topicName", group.Key),
new("messageTypes", new ManifestItems.ManifestItem { ArrayValue = group.Select(topicAndMessageType => (ManifestItems.ManifestItem)topicAndMessageType.MessageType).ToArray() })
]
})
.ToArray();
manifest.Add("subscriptions", new ManifestItems.ManifestItem { ArrayValue = subscriptions });

return eventTypes.Length switch
{
0 => Task.CompletedTask,
1 => SubscribeEvent(eventTypes[0].MessageType.FullName ?? throw new InvalidOperationException("Message type full name is null"), cancellationToken),
1 => SubscribeEvent(eventTypes[0].MessageType.FullName!, cancellationToken),
_ => Task.WhenAll(eventTypes.Select(eventType =>
SubscribeEvent(eventType.MessageType.FullName ?? throw new InvalidOperationException("Message type full name is null"), cancellationToken))
SubscribeEvent(eventType.MessageType.FullName!, cancellationToken))
.ToArray())
};
}

async Task SubscribeEvent(string eventTypeFullName, CancellationToken cancellationToken)
{
Expand Down
4 changes: 2 additions & 2 deletions src/Transport/EventRouting/TopicPerEventTopology.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,6 @@ protected override string GetPublishDestinationCore(string eventTypeFullName)
}

internal override SubscriptionManager CreateSubscriptionManager(
SubscriptionManagerCreationOptions creationOptions) =>
new TopicPerEventTopologySubscriptionManager(creationOptions, Options);
SubscriptionManagerCreationOptions creationOptions, HostSettings hostSettings) =>
new TopicPerEventTopologySubscriptionManager(creationOptions, Options, hostSettings.Manifest);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,49 @@
sealed class TopicPerEventTopologySubscriptionManager : SubscriptionManager
{
readonly TopologyOptions topologyOptions;
readonly ManifestItems manifest;

Check failure on line 17 in src/Transport/EventRouting/TopicPerEventTopologySubscriptionManager.cs

View workflow job for this annotation

GitHub Actions / Windows

The type or namespace name 'ManifestItems' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 17 in src/Transport/EventRouting/TopicPerEventTopologySubscriptionManager.cs

View workflow job for this annotation

GitHub Actions / Windows

The type or namespace name 'ManifestItems' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 17 in src/Transport/EventRouting/TopicPerEventTopologySubscriptionManager.cs

View workflow job for this annotation

GitHub Actions / Linux

The type or namespace name 'ManifestItems' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 17 in src/Transport/EventRouting/TopicPerEventTopologySubscriptionManager.cs

View workflow job for this annotation

GitHub Actions / Linux

The type or namespace name 'ManifestItems' could not be found (are you missing a using directive or an assembly reference?)
readonly string subscriptionName;

public TopicPerEventTopologySubscriptionManager(SubscriptionManagerCreationOptions creationOptions,
TopologyOptions topologyOptions) : base(creationOptions)
TopologyOptions topologyOptions,
ManifestItems manifest) : base(creationOptions)

Check failure on line 22 in src/Transport/EventRouting/TopicPerEventTopologySubscriptionManager.cs

View workflow job for this annotation

GitHub Actions / Windows

The type or namespace name 'ManifestItems' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 22 in src/Transport/EventRouting/TopicPerEventTopologySubscriptionManager.cs

View workflow job for this annotation

GitHub Actions / Windows

The type or namespace name 'ManifestItems' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 22 in src/Transport/EventRouting/TopicPerEventTopologySubscriptionManager.cs

View workflow job for this annotation

GitHub Actions / Linux

The type or namespace name 'ManifestItems' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 22 in src/Transport/EventRouting/TopicPerEventTopologySubscriptionManager.cs

View workflow job for this annotation

GitHub Actions / Linux

The type or namespace name 'ManifestItems' could not be found (are you missing a using directive or an assembly reference?)
{
this.topologyOptions = topologyOptions;
this.manifest = manifest;
subscriptionName = topologyOptions.QueueNameToSubscriptionNameMap.GetValueOrDefault(CreationOptions.SubscribingQueueName, CreationOptions.SubscribingQueueName);
}

static readonly ILog Logger = LogManager.GetLogger<TopicPerEventTopologySubscriptionManager>();

public override Task SubscribeAll(MessageMetadata[] eventTypes, ContextBag context,
CancellationToken cancellationToken = default) =>
eventTypes.Length switch
CancellationToken cancellationToken = default)
{
var subscriptions = eventTypes
.Select(eventType => eventType.MessageType.FullName ?? throw new InvalidOperationException("Message type full name is null"))
.SelectMany(eventTypeFullName =>
topologyOptions.SubscribedEventToTopicsMap
.GetValueOrDefault(eventTypeFullName, [eventTypeFullName])
.Select(topicName => new { Topic = topicName.ToLower(), MessageType = eventTypeFullName }))
.GroupBy(topicAndMessageType => topicAndMessageType.Topic)
.Select(group => new ManifestItems.ManifestItem
{
ItemValue = [
new("topicName", group.Key),
new("messageTypes", new ManifestItems.ManifestItem { ArrayValue = group.Select(topicAndMessageType => (ManifestItems.ManifestItem)topicAndMessageType.MessageType).ToArray() })
]
})
.ToArray();
manifest.Add("subscriptions", new ManifestItems.ManifestItem { ArrayValue = subscriptions });

return eventTypes.Length switch
{
0 => Task.CompletedTask,
1 => SubscribeEvent(eventTypes[0].MessageType.FullName ?? throw new InvalidOperationException("Message type full name is null"), cancellationToken),
1 => SubscribeEvent(eventTypes[0].MessageType.FullName!, cancellationToken),
_ => Task.WhenAll(eventTypes.Select(eventType =>
SubscribeEvent(eventType.MessageType.FullName ?? throw new InvalidOperationException("Message type full name is null"), cancellationToken))
SubscribeEvent(eventType.MessageType.FullName!, cancellationToken))
.ToArray())
};
}

Task SubscribeEvent(string eventTypeFullName, CancellationToken cancellationToken)
{
Expand Down
3 changes: 2 additions & 1 deletion src/Transport/EventRouting/TopicTopology.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ namespace NServiceBus
using System;
using System.ComponentModel.DataAnnotations;
using Microsoft.Extensions.Options;
using NServiceBus.Transport;
using Particular.Obsoletes;
using Transport.AzureServiceBus;

Expand Down Expand Up @@ -113,7 +114,7 @@ internal string GetPublishDestination(Type eventType)

// By having this internal abstract method it is not possible to extend the topology with a custom topology outside
// of this assembly. That is a deliberate design decision.
internal abstract SubscriptionManager CreateSubscriptionManager(SubscriptionManagerCreationOptions creationOptions);
internal abstract SubscriptionManager CreateSubscriptionManager(SubscriptionManagerCreationOptions creationOptions, HostSettings hostSettings);

/// <summary>
/// Returns instructions where to publish a given event.
Expand Down
3 changes: 3 additions & 0 deletions src/TransportTests/When_using_dlq_qualifier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ async Task SeamReceiveFromDlqAndSendTestMessage(TransportTransactionMode mode, C
inputQueueName,
string.Empty,
new StartupDiagnosticEntries(),
new ManifestItems(),
(message, ex, token) => { },
false
);
Expand Down Expand Up @@ -168,6 +169,7 @@ async Task SendViaTransportSeam(CancellationToken cancellationToken)
inputQueueName,
string.Empty,
new StartupDiagnosticEntries(),
new ManifestItems(),
(_, _, _) => { },
true
);
Expand Down Expand Up @@ -211,6 +213,7 @@ async Task BlockUntilReceivedViaTransportSeam(CancellationToken cancellationToke
inputQueueName,
string.Empty,
new StartupDiagnosticEntries(),
new ManifestItems(),
(message, ex, token) => { },
true
);
Expand Down
Loading