Skip to content

Commit f35ae99

Browse files
move instance concurrency config from manifest to transport customisation (#4486)
* move instance concurrency config from manifest to transport customisation * update approval files * Remove EndpointType because it is not necessary * Proper cleanup --------- Co-authored-by: danielmarbach <[email protected]>
1 parent 432c019 commit f35ae99

File tree

18 files changed

+87
-40
lines changed

18 files changed

+87
-40
lines changed

src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
"MaxBodySizeToStore": 102400,
2525
"InstanceName": "Particular.ServiceControl.Audit",
2626
"TransportConnectionString": null,
27-
"MaximumConcurrencyLevel": 32,
27+
"MaximumConcurrencyLevel": null,
2828
"ServiceControlQueueAddress": "Particular.ServiceControl",
2929
"TimeToRestartAuditIngestionAfterFailure": "00:01:00",
3030
"EnableFullTextSearchOnBodies": true

src/ServiceControl.Audit.UnitTests/Infrastructure/When_instance_is_setup.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,7 @@ public void CustomizeMonitoringEndpoint(EndpointConfiguration endpointConfigurat
7373

7474
public void AddTransportForMonitoring(IServiceCollection services, TransportSettings transportSettings) => throw new NotImplementedException();
7575

76-
public Task ProvisionQueues(TransportSettings transportSettings,
77-
IEnumerable<string> additionalQueues)
76+
public Task ProvisionQueues(TransportSettings transportSettings, IEnumerable<string> additionalQueues)
7877
{
7978
QueuesCreated = new List<string>(additionalQueues)
8079
{
@@ -84,8 +83,8 @@ public Task ProvisionQueues(TransportSettings transportSettings,
8483
return Task.CompletedTask;
8584
}
8685

87-
public Task<TransportInfrastructure> CreateTransportInfrastructure(string name, TransportSettings transportSettings, OnMessage onMessage = null,
88-
OnError onError = null, Func<string, Exception, Task> onCriticalError = null,
86+
public Task<TransportInfrastructure> CreateTransportInfrastructure(string name, TransportSettings transportSettings,
87+
OnMessage onMessage = null, OnError onError = null, Func<string, Exception, Task> onCriticalError = null,
8988
TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly) =>
9089
throw new NotImplementedException();
9190
public string ToTransportQualifiedQueueName(string queueName) => queueName;

src/ServiceControl.Audit/Auditing/AuditIngestion.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,12 @@ public AuditIngestion(
4444
batchDurationMeter = metrics.GetMeter("Audit ingestion - batch processing duration", FrequencyInMilliseconds);
4545
receivedMeter = metrics.GetCounter("Audit ingestion - received");
4646

47-
channel = Channel.CreateBounded<MessageContext>(new BoundedChannelOptions(settings.MaximumConcurrencyLevel)
47+
if (!transportSettings.MaxConcurrency.HasValue)
48+
{
49+
throw new ArgumentException("MaxConcurrency is not set in TransportSettings");
50+
}
51+
52+
channel = Channel.CreateBounded<MessageContext>(new BoundedChannelOptions(transportSettings.MaxConcurrency.Value)
4853
{
4954
SingleReader = true,
5055
SingleWriter = false,
@@ -199,7 +204,7 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
199204

200205
async Task Loop()
201206
{
202-
var contexts = new List<MessageContext>(settings.MaximumConcurrencyLevel);
207+
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);
203208

204209
while (await channel.Reader.WaitToReadAsync())
205210
{

src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public Settings(string transportType = null, string persisterType = null, Loggin
4545
Port = SettingsReader.Read(SettingsRootNamespace, "Port", 44444);
4646
};
4747

48-
MaximumConcurrencyLevel = SettingsReader.Read(SettingsRootNamespace, "MaximumConcurrencyLevel", TransportManifestLibrary.Find(TransportType)?.DefaultAuditMaximumConcurrencyLevel ?? 32);
48+
MaximumConcurrencyLevel = SettingsReader.Read<int?>(SettingsRootNamespace, "MaximumConcurrencyLevel");
4949
ServiceControlQueueAddress = SettingsReader.Read<string>(SettingsRootNamespace, "ServiceControlQueueAddress");
5050
TimeToRestartAuditIngestionAfterFailure = GetTimeToRestartAuditIngestionAfterFailure();
5151
EnableFullTextSearchOnBodies = SettingsReader.Read(SettingsRootNamespace, "EnableFullTextSearchOnBodies", true);
@@ -144,7 +144,7 @@ public int MaxBodySizeToStore
144144
public string InstanceName { get; init; } = DEFAULT_INSTANCE_NAME;
145145

146146
public string TransportConnectionString { get; set; }
147-
public int MaximumConcurrencyLevel { get; set; }
147+
public int? MaximumConcurrencyLevel { get; set; }
148148
public string ServiceControlQueueAddress { get; set; }
149149

150150
public TimeSpan TimeToRestartAuditIngestionAfterFailure { get; set; }

src/ServiceControl.Monitoring.UnitTests/ApprovalFiles/SettingsTests.PlatformSampleSettings.approved.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,6 @@
1414
"HttpPort": "9999",
1515
"EndpointUptimeGracePeriod": "00:00:40",
1616
"RootUrl": "http://localhost:9999/",
17-
"MaximumConcurrencyLevel": 32,
17+
"MaximumConcurrencyLevel": null,
1818
"ServiceControlThroughputDataQueue": "ServiceControl.ThroughputData"
1919
}

src/ServiceControl.Monitoring/HostApplicationBuilderExtensions.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,11 @@ static void ConfigureEndpoint(EndpointConfiguration config, Func<ICriticalErrorC
8989

9090
config.GetSettings().Set(settings);
9191
config.SetDiagnosticsPath(settings.LoggingSettings.LogPath);
92-
config.LimitMessageProcessingConcurrencyTo(settings.MaximumConcurrencyLevel);
92+
if (!transportSettings.MaxConcurrency.HasValue)
93+
{
94+
throw new ArgumentException("MaxConcurrency is not set in TransportSettings");
95+
}
96+
config.LimitMessageProcessingConcurrencyTo(transportSettings.MaxConcurrency.Value);
9397

9498
config.UseSerialization<SystemJsonSerializer>();
9599
config.UsePersistence<NonDurablePersistence>();

src/ServiceControl.Monitoring/Settings.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public Settings(LoggingSettings loggingSettings = null, string transportType = n
3737
}
3838

3939
EndpointUptimeGracePeriod = TimeSpan.Parse(SettingsReader.Read(SettingsRootNamespace, "EndpointUptimeGracePeriod", "00:00:40"));
40-
MaximumConcurrencyLevel = SettingsReader.Read(SettingsRootNamespace, "MaximumConcurrencyLevel", TransportManifestLibrary.Find(TransportType)?.DefaultMonitoringMaximumConcurrencyLevel ?? 32);
40+
MaximumConcurrencyLevel = SettingsReader.Read<int?>(SettingsRootNamespace, "MaximumConcurrencyLevel");
4141
ServiceControlThroughputDataQueue = SettingsReader.Read(SettingsRootNamespace, "ServiceControlThroughputDataQueue", "ServiceControl.ThroughputData");
4242

4343
AssemblyLoadContextResolver = static assemblyPath => new PluginAssemblyLoadContext(assemblyPath);
@@ -64,7 +64,7 @@ public Settings(LoggingSettings loggingSettings = null, string transportType = n
6464

6565
public string RootUrl => $"http://{HttpHostName}:{HttpPort}/";
6666

67-
public int MaximumConcurrencyLevel { get; set; }
67+
public int? MaximumConcurrencyLevel { get; set; }
6868

6969
public string ServiceControlThroughputDataQueue { get; set; }
7070

src/ServiceControl.Transports.PostgreSql/PostgreSqlTransportCustomization.cs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,27 @@
1010

1111
public class PostgreSqlTransportCustomization : TransportCustomization<PostgreSqlTransport>
1212
{
13-
protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, PostgreSqlTransport transportDefinition, TransportSettings transportSettings) =>
13+
protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, PostgreSqlTransport transportDefinition, TransportSettings transportSettings)
14+
{
1415
transportDefinition.TransportTransactionMode = TransportTransactionMode.SendsAtomicWithReceive;
1516

16-
protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, PostgreSqlTransport transportDefinition, TransportSettings transportSettings) =>
17+
transportSettings.MaxConcurrency ??= 10;
18+
}
19+
20+
protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, PostgreSqlTransport transportDefinition, TransportSettings transportSettings)
21+
{
1722
transportDefinition.TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
1823

19-
protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, PostgreSqlTransport transportDefinition, TransportSettings transportSettings) =>
24+
transportSettings.MaxConcurrency ??= 10;
25+
}
26+
27+
protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, PostgreSqlTransport transportDefinition, TransportSettings transportSettings)
28+
{
2029
transportDefinition.TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
2130

31+
transportSettings.MaxConcurrency ??= 10;
32+
}
33+
2234
protected override void AddTransportForPrimaryCore(IServiceCollection services,
2335
TransportSettings transportSettings) =>
2436
services.AddSingleton<IBrokerThroughputQuery, PostgreSqlQuery>();

src/ServiceControl.Transports.PostgreSql/transport.manifest

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@
55
"DisplayName": "PostgreSQL",
66
"AssemblyName": "ServiceControl.Transports.PostgreSql",
77
"TypeName": "ServiceControl.Transports.PostgreSql.PostgreSqlTransportCustomization, ServiceControl.Transports.PostgreSql",
8-
"DefaultPrimaryMaximumConcurrencyLevel": 10,
9-
"DefaultAuditMaximumConcurrencyLevel": 10,
10-
"DefaultMonitoringMaximumConcurrencyLevel": 10,
118
"SampleConnectionString": "Server=<ServerName>;Database=nservicebus;Port=5432;User Id=<Username>;Password=<Password>;Queue Schema=myschema;Subscriptions Table=schema.tablename",
129
"AvailableInSCMU": true,
1310
"Help": "Specify optional 'Queue Schema' to override the default schema. Specify optional 'Subscriptions Table' to override the default subscriptions table location."

src/ServiceControl.Transports.SqlServer/SqlServerTransportCustomization.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,25 @@ protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfigurati
1616
transportDefinition.TransportTransactionMode = TransportTransactionMode.SendsAtomicWithReceive;
1717
var routing = new RoutingSettings(endpointConfiguration.GetSettings());
1818
routing.EnableMessageDrivenPubSubCompatibilityMode();
19+
20+
transportSettings.MaxConcurrency ??= 10;
1921
}
2022

2123
//Do not EnableMessageDrivenPubSubCompatibilityMode for send-only endpoint
22-
protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, SqlServerTransport transportDefinition, TransportSettings transportSettings) =>
24+
protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, SqlServerTransport transportDefinition, TransportSettings transportSettings)
25+
{
2326
transportDefinition.TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
2427

25-
protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, SqlServerTransport transportDefinition, TransportSettings transportSettings) =>
28+
transportSettings.MaxConcurrency ??= 10;
29+
}
30+
31+
protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, SqlServerTransport transportDefinition, TransportSettings transportSettings)
32+
{
2633
transportDefinition.TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
2734

35+
transportSettings.MaxConcurrency ??= 10;
36+
}
37+
2838
protected override void AddTransportForPrimaryCore(IServiceCollection services,
2939
TransportSettings transportSettings)
3040
{

0 commit comments

Comments
 (0)