Skip to content

Commit 2766d6f

Browse files
committed
- Only add topic mappings if running a primary instance
- Connection string has precedence over custom config
1 parent 8567382 commit 2766d6f

File tree

1 file changed

+40
-23
lines changed

1 file changed

+40
-23
lines changed

src/ServiceControl.Transports.ASBS/ASBSTransportCustomization.cs

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@
1010

1111
public class ASBSTransportCustomization : TransportCustomization<AzureServiceBusTransport>
1212
{
13-
const string DefaultSingleTopic = "bundle-1";
14-
1513
protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, AzureServiceBusTransport transportDefinition, TransportSettings transportSettings) =>
1614
transportDefinition.TransportTransactionMode = TransportTransactionMode.SendsAtomicWithReceive;
1715

@@ -24,21 +22,47 @@ protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfigur
2422
protected override AzureServiceBusTransport CreateTransport(TransportSettings transportSettings, TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly)
2523
{
2624
var connectionSettings = ConnectionStringParser.Parse(transportSettings.ConnectionString);
27-
TopicTopology selectedTopology;
2825

29-
var serviceBusRootNamespace = new SettingsRootNamespace("ServiceControl.Transport.ASBS");
30-
if (SettingsReader.TryRead<string>(serviceBusRootNamespace, "Topology", out var topologyJson))
26+
if (!transportSettings.TryGet(out TopicTopology selectedTopology))
3127
{
32-
//Load topology from json
33-
selectedTopology = TopicTopology.FromOptions(JsonSerializer.Deserialize<TopologyOptions>(topologyJson));
28+
//Topology is pre-selected and customized only when creating transport for the primary instance
29+
//For all other cases use the connection string to determine which topology to use
30+
if (connectionSettings.TopicName != null)
31+
{
32+
selectedTopology = TopicTopology.MigrateFromNamedSingleTopic(connectionSettings.TopicName);
33+
}
34+
else
35+
{
36+
selectedTopology = TopicTopology.Default;
37+
}
3438
}
35-
else if (connectionSettings.TopicName != null)
39+
40+
var transport = connectionSettings.AuthenticationMethod.CreateTransportDefinition(connectionSettings, selectedTopology);
41+
transport.UseWebSockets = connectionSettings.UseWebSockets;
42+
transport.EnablePartitioning = connectionSettings.EnablePartitioning;
43+
44+
transport.TransportTransactionMode = transport.GetSupportedTransactionModes().Contains(preferredTransactionMode) ? preferredTransactionMode : TransportTransactionMode.ReceiveOnly;
45+
46+
return transport;
47+
}
48+
49+
protected override void AddTransportForPrimaryCore(IServiceCollection services,
50+
TransportSettings transportSettings)
51+
{
52+
services.AddSingleton<IBrokerThroughputQuery, AzureQuery>();
53+
54+
var connectionSettings = ConnectionStringParser.Parse(transportSettings.ConnectionString);
55+
TopicTopology selectedTopology;
56+
57+
var serviceBusRootNamespace = new SettingsRootNamespace("ServiceControl.Transport.ASBS");
58+
if (connectionSettings.TopicName != null)
3659
{
3760
//Bundle name provided -> use migration topology
61+
//Need to explicitly specific events to be published on the single topic
3862
selectedTopology = TopicTopology.FromOptions(new MigrationTopologyOptions
3963
{
40-
TopicToPublishTo = connectionSettings.TopicName ?? DefaultSingleTopic,
41-
TopicToSubscribeOn = connectionSettings.TopicName ?? DefaultSingleTopic,
64+
TopicToPublishTo = connectionSettings.TopicName,
65+
TopicToSubscribeOn = connectionSettings.TopicName,
4266
EventsToMigrateMap = [
4367
"ServiceControl.Contracts.CustomCheckFailed",
4468
"ServiceControl.Contracts.CustomCheckSucceeded",
@@ -52,25 +76,18 @@ protected override AzureServiceBusTransport CreateTransport(TransportSettings tr
5276
]
5377
});
5478
}
79+
else if (SettingsReader.TryRead<string>(serviceBusRootNamespace, "Topology", out var topologyJson))
80+
{
81+
//Load topology from json
82+
selectedTopology = TopicTopology.FromOptions(JsonSerializer.Deserialize(topologyJson, TopologyOptionsSerializationContext.Default.TopologyOptions));
83+
}
5584
else
5685
{
5786
//Default to topic-per-event topology
5887
selectedTopology = TopicTopology.Default;
5988
}
6089

61-
var transport = connectionSettings.AuthenticationMethod.CreateTransportDefinition(connectionSettings, selectedTopology);
62-
transport.UseWebSockets = connectionSettings.UseWebSockets;
63-
transport.EnablePartitioning = connectionSettings.EnablePartitioning;
64-
65-
transport.TransportTransactionMode = transport.GetSupportedTransactionModes().Contains(preferredTransactionMode) ? preferredTransactionMode : TransportTransactionMode.ReceiveOnly;
66-
67-
return transport;
68-
}
69-
70-
protected override void AddTransportForPrimaryCore(IServiceCollection services,
71-
TransportSettings transportSettings)
72-
{
73-
services.AddSingleton<IBrokerThroughputQuery, AzureQuery>();
90+
transportSettings.Set(selectedTopology);
7491
}
7592

7693
protected override void AddTransportForMonitoringCore(IServiceCollection services, TransportSettings transportSettings)

0 commit comments

Comments
 (0)