Skip to content

Commit c1ae8e2

Browse files
Travis Nickelsbording
authored andcommitted
Use alpha version of RabbitMQ transport
1 parent f69913b commit c1ae8e2

File tree

2 files changed

+10
-9
lines changed

2 files changed

+10
-9
lines changed

src/Directory.Packages.props

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
<PackageVersion Include="NServiceBus.Metrics" Version="5.0.1" />
3939
<PackageVersion Include="NServiceBus.Metrics.ServiceControl" Version="5.0.0" />
4040
<PackageVersion Include="NServiceBus.Persistence.NonDurable" Version="2.0.1" />
41-
<PackageVersion Include="NServiceBus.RabbitMQ" Version="9.2.0" />
41+
<PackageVersion Include="NServiceBus.RabbitMQ" Version="10.0.0-alpha.0.92" />
42+
<!--<PackageVersion Include="NServiceBus.RabbitMQ" Version="9.2.0" />-->
4243
<PackageVersion Include="NServiceBus.SagaAudit" Version="5.0.2" />
4344
<PackageVersion Include="NServiceBus.Testing" Version="9.0.1" />
4445
<PackageVersion Include="NServiceBus.Transport.AzureServiceBus" Version="5.0.0" />

src/ServiceControl.Transports.RabbitMQ/QueueLengthProvider.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,13 @@ async Task FetchQueueLengths(CancellationToken cancellationToken)
7575
{
7676
foreach (var endpointQueuePair in endpointQueues)
7777
{
78-
await queryExecutor.Execute(m =>
78+
await queryExecutor.Execute(async m =>
7979
{
8080
var queueName = endpointQueuePair.Value;
8181

8282
try
8383
{
84-
var size = (int)m.MessageCount(queueName);
84+
var size = (int)await m.MessageCountAsync(queueName, cancellationToken).ConfigureAwait(false);
8585

8686
sizes.AddOrUpdate(queueName, _ => size, (_, __) => size);
8787
}
@@ -120,7 +120,7 @@ public void Initialize()
120120
null); // value would come from config API in actual transport
121121
}
122122

123-
public async Task Execute(Action<IModel> action, CancellationToken cancellationToken = default)
123+
public async Task Execute(Action<IChannel> action, CancellationToken cancellationToken = default)
124124
{
125125
try
126126
{
@@ -132,14 +132,14 @@ public async Task Execute(Action<IModel> action, CancellationToken cancellationT
132132
await Task.Delay(ReconnectionDelay, cancellationToken);
133133
}
134134

135-
if (model == null || model.IsClosed)
135+
if (channel == null || channel.IsClosed)
136136
{
137-
model?.Dispose();
137+
channel?.Dispose();
138138

139-
model = connection.CreateModel();
139+
channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
140140
}
141141

142-
action(model);
142+
action(channel);
143143
}
144144
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
145145
{
@@ -154,7 +154,7 @@ public async Task Execute(Action<IModel> action, CancellationToken cancellationT
154154
public void Dispose() => connection?.Dispose();
155155

156156
IConnection connection;
157-
IModel model;
157+
IChannel channel;
158158
ConnectionFactory connectionFactory;
159159

160160
static readonly TimeSpan ReconnectionDelay = TimeSpan.FromSeconds(5);

0 commit comments

Comments
 (0)