-
Notifications
You must be signed in to change notification settings - Fork 48
Expand file tree
/
Copy pathPostgreSqlTransportCustomization.cs
More file actions
96 lines (72 loc) · 4.22 KB
/
PostgreSqlTransportCustomization.cs
File metadata and controls
96 lines (72 loc) · 4.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
namespace ServiceControl.Transports.PostgreSql;
using System.Linq;
using System.Runtime.CompilerServices;
using BrokerThroughput;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using NServiceBus;
using NServiceBus.Transport.PostgreSql;
public class PostgreSqlTransportCustomization(ILogger<PostgreSqlTransportCustomization> logger) : TransportCustomization<PostgreSqlTransport>
{
protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, PostgreSqlTransport transportDefinition, TransportSettings transportSettings)
{
transportDefinition.TransportTransactionMode = TransportTransactionMode.SendsAtomicWithReceive;
transportSettings.MaxConcurrency ??= 10;
}
protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, PostgreSqlTransport transportDefinition, TransportSettings transportSettings)
{
transportDefinition.TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
transportSettings.MaxConcurrency ??= 10;
}
protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, PostgreSqlTransport transportDefinition, TransportSettings transportSettings)
{
transportDefinition.TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
transportSettings.MaxConcurrency ??= 10;
}
protected override void AddTransportForPrimaryCore(IServiceCollection services,
TransportSettings transportSettings) =>
services.AddSingleton<IBrokerThroughputQuery, PostgreSqlQuery>();
protected override void AddTransportForMonitoringCore(IServiceCollection services, TransportSettings transportSettings)
{
services.AddSingleton<IProvideQueueLength, QueueLengthProvider>();
services.AddHostedService(provider => provider.GetRequiredService<IProvideQueueLength>());
}
protected override PostgreSqlTransport CreateTransport(TransportSettings transportSettings, TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly)
{
var connectionString = transportSettings.ConnectionString.RemoveCustomConnectionStringParts(out var customSchema, out var subscriptionsTableSetting);
var transport = new PostgreSqlTransport(connectionString);
var subscriptions = transport.Subscriptions;
if (customSchema != null)
{
transport.DefaultSchema = customSchema;
subscriptions.SubscriptionTableName = new SubscriptionTableName(DefaultSubscriptionTableName, customSchema);
}
if (subscriptionsTableSetting != null)
{
var subscriptionsAddress = QueueAddress.Parse(subscriptionsTableSetting);
subscriptions.SubscriptionTableName =
new SubscriptionTableName(subscriptionsAddress.Table,
subscriptionsAddress.Schema ?? customSchema);
}
if (transportSettings.GetOrDefault<bool>("TransportSettings.EnableDtc"))
{
logger.LogError("The EnableDtc setting is no longer supported natively within ServiceControl. If you require distributed transactions, you will have to use a Transport Adapter (https://docs.particular.net/servicecontrol/transport-adapter/)");
}
DisableDelayedDelivery(transport) = true;
transport.TransportTransactionMode = transport.GetSupportedTransactionModes().Contains(preferredTransactionMode) ? preferredTransactionMode : TransportTransactionMode.ReceiveOnly;
return transport;
}
protected override string ToTransportQualifiedQueueNameCore(string queueName)
{
const string delimiter = "\"";
const string escapedDelimiter = delimiter + delimiter;
if (queueName.StartsWith(delimiter) || queueName.EndsWith(delimiter))
{
return queueName;
}
return delimiter + queueName.Replace(delimiter, escapedDelimiter) + delimiter;
}
[UnsafeAccessor(UnsafeAccessorKind.Field, Name = "<DisableDelayedDelivery>k__BackingField")]
static extern ref bool DisableDelayedDelivery(PostgreSqlTransport transport);
const string DefaultSubscriptionTableName = "SubscriptionRouting";
}