11namespace ServiceControl . Transports . ASBS
22{
33 using System . Linq ;
4+ using System . Text . Json ;
45 using BrokerThroughput ;
6+ using Configuration ;
57 using Microsoft . Extensions . DependencyInjection ;
68 using NServiceBus ;
9+ using NServiceBus . Transport . AzureServiceBus ;
710
811 public class ASBSTransportCustomization : TransportCustomization < AzureServiceBusTransport >
912 {
@@ -19,18 +22,27 @@ protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfigur
1922 protected override AzureServiceBusTransport CreateTransport ( TransportSettings transportSettings , TransportTransactionMode preferredTransactionMode = TransportTransactionMode . ReceiveOnly )
2023 {
2124 var connectionSettings = ConnectionStringParser . Parse ( transportSettings . ConnectionString ) ;
22- var transport = connectionSettings . AuthenticationMethod . CreateTransportDefinition ( connectionSettings ) ;
23- transport . UseWebSockets = connectionSettings . UseWebSockets ;
2425
25- if ( connectionSettings . TopicName != null )
26+ if ( ! transportSettings . TryGet ( out TopicTopology selectedTopology ) )
2627 {
27- transport . Topology = TopicTopology . Single ( connectionSettings . TopicName ) ;
28+ //Topology is pre-selected and customized only when creating transport for the primary instance
29+ //For all other cases use the connection string to determine which topology to use
30+ if ( connectionSettings . TopicName != null )
31+ {
32+ #pragma warning disable CS0618 // Type or member is obsolete
33+ selectedTopology = TopicTopology . MigrateFromNamedSingleTopic ( connectionSettings . TopicName ) ;
34+ #pragma warning restore CS0618 // Type or member is obsolete
35+ }
36+ else
37+ {
38+ selectedTopology = TopicTopology . Default ;
39+ }
2840 }
2941
42+ var transport = connectionSettings . AuthenticationMethod . CreateTransportDefinition ( connectionSettings , selectedTopology ) ;
43+ transport . UseWebSockets = connectionSettings . UseWebSockets ;
3044 transport . EnablePartitioning = connectionSettings . EnablePartitioning ;
3145
32- transport . ConfigureNameShorteners ( ) ;
33-
3446 transport . TransportTransactionMode = transport . GetSupportedTransactionModes ( ) . Contains ( preferredTransactionMode ) ? preferredTransactionMode : TransportTransactionMode . ReceiveOnly ;
3547
3648 return transport ;
@@ -40,6 +52,46 @@ protected override void AddTransportForPrimaryCore(IServiceCollection services,
4052 TransportSettings transportSettings )
4153 {
4254 services . AddSingleton < IBrokerThroughputQuery , AzureQuery > ( ) ;
55+
56+ var connectionSettings = ConnectionStringParser . Parse ( transportSettings . ConnectionString ) ;
57+ TopicTopology selectedTopology ;
58+
59+ var serviceBusRootNamespace = new SettingsRootNamespace ( "ServiceControl.Transport.ASBS" ) ;
60+ if ( connectionSettings . TopicName != null )
61+ {
62+ //Bundle name provided -> use migration topology
63+ //Need to explicitly specific events to be published on the single topic
64+ #pragma warning disable CS0618 // Type or member is obsolete
65+ selectedTopology = TopicTopology . FromOptions ( new MigrationTopologyOptions
66+ #pragma warning restore CS0618 // Type or member is obsolete
67+ {
68+ TopicToPublishTo = connectionSettings . TopicName ,
69+ TopicToSubscribeOn = connectionSettings . TopicName ,
70+ EventsToMigrateMap = [
71+ "ServiceControl.Contracts.CustomCheckFailed" ,
72+ "ServiceControl.Contracts.CustomCheckSucceeded" ,
73+ "ServiceControl.Contracts.HeartbeatRestored" ,
74+ "ServiceControl.Contracts.HeartbeatStopped" ,
75+ "ServiceControl.Contracts.FailedMessagesArchived" ,
76+ "ServiceControl.Contracts.FailedMessagesUnArchived" ,
77+ "ServiceControl.Contracts.MessageFailed" ,
78+ "ServiceControl.Contracts.MessageFailureResolvedByRetry" ,
79+ "ServiceControl.Contracts.MessageFailureResolvedManually"
80+ ]
81+ } ) ;
82+ }
83+ else if ( SettingsReader . TryRead < string > ( serviceBusRootNamespace , "Topology" , out var topologyJson ) )
84+ {
85+ //Load topology from json
86+ selectedTopology = TopicTopology . FromOptions ( JsonSerializer . Deserialize ( topologyJson , TopologyOptionsSerializationContext . Default . TopologyOptions ) ) ;
87+ }
88+ else
89+ {
90+ //Default to topic-per-event topology
91+ selectedTopology = TopicTopology . Default ;
92+ }
93+
94+ transportSettings . Set ( selectedTopology ) ;
4395 }
4496
4597 protected override void AddTransportForMonitoringCore ( IServiceCollection services , TransportSettings transportSettings )
0 commit comments