Skip to content

Commit e24f2b1

Browse files
Travis Nickelsbording
authored andcommitted
Allow RabbitMQQuery to access the transport
1 parent d5f6979 commit e24f2b1

File tree

7 files changed

+65
-28
lines changed

7 files changed

+65
-28
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
namespace ServiceControl.Transports.RabbitMQ
2+
{
3+
using NServiceBus;
4+
5+
public interface IRabbitMQTransportExtensions
6+
{
7+
RabbitMQTransport GetTransport();
8+
}
9+
}

src/ServiceControl.Transports.RabbitMQ/RabbitMQConventionalRoutingTransportCustomization.cs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@
99
using NServiceBus;
1010

1111
public abstract class RabbitMQConventionalRoutingTransportCustomization(QueueType queueType)
12-
: TransportCustomization<RabbitMQTransport>
12+
: TransportCustomization<RabbitMQTransport>, IRabbitMQTransportExtensions
1313
{
14+
15+
RabbitMQTransport rabbitMQTransport;
16+
1417
protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { }
1518

1619
protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { }
@@ -36,14 +39,13 @@ protected override RabbitMQTransport CreateTransport(TransportSettings transport
3639
transport.ManagementApiUrl = GetValue(connectionStringDictionary, "ManagementApiUrl", string.Empty);
3740
transport.UseManagementApi = disableManagementApi.Equals("false", StringComparison.OrdinalIgnoreCase);
3841

42+
rabbitMQTransport = transport;
43+
3944
return transport;
4045
}
4146

42-
protected override void AddTransportForPrimaryCore(IServiceCollection services,
43-
TransportSettings transportSettings)
44-
{
45-
services.AddSingleton<IBrokerThroughputQuery, RabbitMQQuery>();
46-
}
47+
protected override void AddTransportForPrimaryCore(IServiceCollection services, TransportSettings transportSettings)
48+
=> services.AddSingleton<IBrokerThroughputQuery, RabbitMQQuery>();
4749

4850
protected sealed override void AddTransportForMonitoringCore(IServiceCollection services, TransportSettings transportSettings)
4951
{
@@ -52,8 +54,15 @@ protected sealed override void AddTransportForMonitoringCore(IServiceCollection
5254
}
5355

5456
static string GetValue(Dictionary<string, string> dictionary, string key, string defaultValue)
57+
=> dictionary.TryGetValue(key, out var value) ? value : defaultValue;
58+
59+
RabbitMQTransport IRabbitMQTransportExtensions.GetTransport()
5560
{
56-
return dictionary.TryGetValue(key, out var value) ? value : defaultValue;
61+
if (rabbitMQTransport == null)
62+
{
63+
throw new InvalidOperationException("Transport instance has not been created yet. Make sure CreateTransport() is called before accessing the transport.");
64+
};
65+
return rabbitMQTransport;
5766
}
5867
}
5968
}

src/ServiceControl.Transports.RabbitMQ/RabbitMQDirectRoutingTransportCustomization.cs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@
88
using Microsoft.Extensions.DependencyInjection;
99
using NServiceBus;
1010

11-
public abstract class RabbitMQDirectRoutingTransportCustomization : TransportCustomization<RabbitMQTransport>
11+
public abstract class RabbitMQDirectRoutingTransportCustomization(QueueType queueType)
12+
: TransportCustomization<RabbitMQTransport>, IRabbitMQTransportExtensions
1213
{
13-
readonly QueueType queueType;
14-
15-
protected RabbitMQDirectRoutingTransportCustomization(QueueType queueType) => this.queueType = queueType;
14+
RabbitMQTransport rabbitMQTransport;
1615

1716
protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { }
1817

@@ -38,14 +37,13 @@ protected override RabbitMQTransport CreateTransport(TransportSettings transport
3837
transport.ManagementApiUrl = GetValue(connectionStringDictionary, "ManagementApiUrl", string.Empty);
3938
transport.UseManagementApi = disableManagementApi.Equals("false", StringComparison.OrdinalIgnoreCase);
4039

40+
rabbitMQTransport = transport;
41+
4142
return transport;
4243
}
4344

44-
protected override void AddTransportForPrimaryCore(IServiceCollection services,
45-
TransportSettings transportSettings)
46-
{
47-
services.AddSingleton<IBrokerThroughputQuery, RabbitMQQuery>();
48-
}
45+
protected override void AddTransportForPrimaryCore(IServiceCollection services, TransportSettings transportSettings)
46+
=> services.AddSingleton<IBrokerThroughputQuery, RabbitMQQuery>();
4947

5048
protected sealed override void AddTransportForMonitoringCore(IServiceCollection services, TransportSettings transportSettings)
5149
{
@@ -54,8 +52,15 @@ protected sealed override void AddTransportForMonitoringCore(IServiceCollection
5452
}
5553

5654
static string GetValue(Dictionary<string, string> dictionary, string key, string defaultValue)
55+
=> dictionary.TryGetValue(key, out var value) ? value : defaultValue;
56+
57+
RabbitMQTransport IRabbitMQTransportExtensions.GetTransport()
5758
{
58-
return dictionary.TryGetValue(key, out var value) ? value : defaultValue;
59+
if (rabbitMQTransport == null)
60+
{
61+
throw new InvalidOperationException("Transport instance has not been created yet. Make sure CreateTransport() is called before accessing the transport.");
62+
};
63+
return rabbitMQTransport;
5964
}
6065
}
6166
}

src/ServiceControl.Transports.RabbitMQ/RabbitMQQuery.cs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ namespace ServiceControl.Transports.RabbitMQ;
1616
using System.Threading.Tasks;
1717
using System.Web;
1818
using Microsoft.Extensions.Logging;
19+
using NServiceBus;
1920
using Polly;
2021
using Polly.Retry;
2122
using ServiceControl.Transports.BrokerThroughput;
@@ -30,23 +31,35 @@ public class RabbitMQQuery : BrokerThroughputQuery
3031
readonly ILogger<RabbitMQQuery> logger;
3132
readonly TimeProvider timeProvider;
3233
readonly ConnectionConfiguration connectionConfiguration;
33-
readonly string connectionString;
34+
readonly TransportSettings transportSettings;
35+
readonly RabbitMQTransport rabbitMQTransport;
3436

3537
public RabbitMQQuery(ILogger<RabbitMQQuery> logger,
3638
TimeProvider timeProvider,
37-
TransportSettings transportSettings) : base(logger, "RabbitMQ")
39+
TransportSettings transportSettings,
40+
ITransportCustomization transportCustomization) : base(logger, "RabbitMQ")
3841
{
3942
this.logger = logger;
4043
this.timeProvider = timeProvider;
41-
connectionString = transportSettings.ConnectionString;
44+
this.transportSettings = transportSettings;
45+
if (transportCustomization is IRabbitMQTransportExtensions rabbitMQTransportCustomization)
46+
{
47+
rabbitMQTransport = rabbitMQTransportCustomization.GetTransport();
48+
}
49+
else
50+
{
51+
throw new InvalidOperationException($"Expected a RabbitMQTransport but received {transportCustomization.GetType().Name}.");
52+
}
4253

43-
connectionConfiguration = ConnectionConfiguration.Create(connectionString, string.Empty);
54+
connectionConfiguration = ConnectionConfiguration.Create(transportSettings.ConnectionString, string.Empty);
4455
}
4556

4657
protected override void InitializeCore(ReadOnlyDictionary<string, string> settings)
4758
{
4859
var mangementApiUrl = GetManagementApiUrl();
4960

61+
// The licensing component configurations take precedence over the management API connection string configuration options
62+
// https://docs.particular.net/servicecontrol/servicecontrol-instances/configuration#usage-reporting-when-using-the-rabbitmq-transport
5063
var userName = GetSettingsValue(settings, RabbitMQSettings.UserName, mangementApiUrl.UserName);
5164
var password = GetSettingsValue(settings, RabbitMQSettings.Password, mangementApiUrl.Password);
5265
var apiUrl = GetSettingsValue(settings, RabbitMQSettings.API, mangementApiUrl.Uri.AbsoluteUri);
@@ -107,7 +120,7 @@ string GetSettingsValue(ReadOnlyDictionary<string, string> settings, string key,
107120

108121
UriBuilder GetManagementApiUrl()
109122
{
110-
var dictionary = ConnectionConfiguration.ParseNServiceBusConnectionString(connectionString, new StringBuilder());
123+
var dictionary = ConnectionConfiguration.ParseNServiceBusConnectionString(transportSettings.ConnectionString, new StringBuilder());
111124
UriBuilder uriBuilder;
112125

113126
var managementApiUrl = GetValue(dictionary, "ManagementApiUrl", "");

src/ServiceControl.Transports.RabbitMQClassicConventionalRouting.Tests/RabbitMQQueryTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public async Task GetQueueNames_FindsQueues()
2828
MaxConcurrency = 1,
2929
EndpointName = Guid.NewGuid().ToString("N")
3030
};
31-
var query = new RabbitMQQuery(NullLogger<RabbitMQQuery>.Instance, provider, transportSettings);
31+
var query = new RabbitMQQuery(NullLogger<RabbitMQQuery>.Instance, provider, transportSettings, configuration.TransportCustomization);
3232
string[] additionalQueues = Enumerable.Range(1, 10).Select(i => $"myqueue{i}").ToArray();
3333
await configuration.TransportCustomization.ProvisionQueues(transportSettings, additionalQueues);
3434

src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/RabbitMQQueryTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public void Initialise()
3232
MaxConcurrency = 1,
3333
EndpointName = Guid.NewGuid().ToString("N")
3434
};
35-
query = new RabbitMQQuery(NullLogger<RabbitMQQuery>.Instance, provider, transportSettings);
35+
query = new RabbitMQQuery(NullLogger<RabbitMQQuery>.Instance, provider, transportSettings, configuration.TransportCustomization);
3636
}
3737

3838
[Test]

src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/RabbitMQQuery_ResponseParsing_Tests.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ namespace ServiceControl.Transport.Tests;
1414
using System.Net;
1515

1616
[TestFixture]
17-
class RabbitMQQuery_ResponseParsing_Tests
17+
class RabbitMQQuery_ResponseParsing_Tests : TransportTestFixture
1818
{
1919
FakeTimeProvider provider;
2020
TransportSettings transportSettings;
@@ -35,7 +35,7 @@ public void SetUp()
3535
httpHandler = new FakeHttpHandler();
3636
var httpClient = new HttpClient(httpHandler) { BaseAddress = new Uri("http://localhost:15672") };
3737

38-
rabbitMQQuery = new TestableRabbitMQQuery(provider, transportSettings, httpClient);
38+
rabbitMQQuery = new TestableRabbitMQQuery(provider, transportSettings, httpClient, configuration.TransportCustomization);
3939
rabbitMQQuery.Initialize(ReadOnlyDictionary<string, string>.Empty);
4040
}
4141

@@ -131,8 +131,9 @@ public async Task Should_fetch_queue_details_in_old_format()
131131
sealed class TestableRabbitMQQuery(
132132
TimeProvider timeProvider,
133133
TransportSettings transportSettings,
134-
HttpClient customHttpClient)
135-
: RabbitMQQuery(NullLogger<RabbitMQQuery>.Instance, timeProvider, transportSettings)
134+
HttpClient customHttpClient,
135+
ITransportCustomization transportCustomization)
136+
: RabbitMQQuery(NullLogger<RabbitMQQuery>.Instance, timeProvider, transportSettings, transportCustomization)
136137
{
137138
protected override HttpClient CreateHttpClient(NetworkCredential defaultCredential, string apiUrl) => customHttpClient;
138139
}

0 commit comments

Comments
 (0)