33namespace NServiceBus . Transport . RabbitMQ
44{
55 using System ;
6- using System . Collections . Concurrent ;
76 using System . Threading ;
87 using System . Threading . Tasks ;
98 using global ::RabbitMQ . Client ;
109 using global ::RabbitMQ . Client . Events ;
1110 using Logging ;
1211
13- class ChannelProvider : IAsyncDisposable
12+ class ChannelProvider ( ConnectionFactory connectionFactory , TimeSpan retryDelay , IRoutingTopology routingTopology )
13+ : IAsyncDisposable
1414 {
15- public ChannelProvider ( ConnectionFactory connectionFactory , TimeSpan retryDelay , IRoutingTopology routingTopology )
16- {
17- this . connectionFactory = connectionFactory ;
18- this . retryDelay = retryDelay ;
19-
20- this . routingTopology = routingTopology ;
21-
22- channels = new ConcurrentQueue < ConfirmsAwareChannel > ( ) ;
23- }
24-
2515 public async Task Initialize ( CancellationToken cancellationToken = default ) => connection = await CreateConnectionWithShutdownListener ( cancellationToken ) . ConfigureAwait ( false ) ;
2616
2717 async Task < IConnection > CreateConnectionWithShutdownListener ( CancellationToken cancellationToken )
@@ -60,7 +50,7 @@ async Task ReconnectSwallowingExceptions(string? connectionName, CancellationTok
6050
6151 var newConnection = await CreateConnectionWithShutdownListener ( cancellationToken ) . ConfigureAwait ( false ) ;
6252
63- // A race condition is possible where CreatePublishConnection is invoked during Dispose
53+ // A race condition is possible where CreatePublishConnection is invoked during Dispose
6454 // where the returned connection isn't disposed so invoking Dispose to be sure
6555 if ( cancellationToken . IsCancellationRequested )
6656 {
@@ -94,32 +84,57 @@ protected virtual void FireAndForget(Func<CancellationToken, Task> action, Cance
9484
9585 public async ValueTask < ConfirmsAwareChannel > GetPublishChannel ( CancellationToken cancellationToken = default )
9686 {
97- if ( channels . TryDequeue ( out var channel ) && ! channel . IsClosed )
87+ if ( publishChannel is { IsOpen : true } )
9888 {
99- return channel ;
89+ return publishChannel ;
10090 }
10191
102- if ( channel is not null )
92+ try
10393 {
104- await channel . DisposeAsync ( )
105- . ConfigureAwait ( false ) ;
106- }
94+ await publishChannelSemaphore . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
95+ if ( publishChannel is { IsOpen : true } )
96+ {
97+ return publishChannel ;
98+ }
10799
108- channel = new ConfirmsAwareChannel ( connection , routingTopology ) ;
109- await channel . Initialize ( cancellationToken ) . ConfigureAwait ( false ) ;
100+ var oldChannel = publishChannel ;
101+ if ( oldChannel is not null )
102+ {
103+ await oldChannel . DisposeAsync ( ) . ConfigureAwait ( false ) ;
104+ }
110105
111- return channel ;
106+ var newChannel = new ConfirmsAwareChannel ( connection , routingTopology ) ;
107+ await newChannel . Initialize ( cancellationToken ) . ConfigureAwait ( false ) ;
108+ publishChannel = newChannel ;
109+ return newChannel ;
110+ }
111+ finally
112+ {
113+ publishChannelSemaphore . Release ( ) ;
114+ }
112115 }
113116
114- public ValueTask ReturnPublishChannel ( ConfirmsAwareChannel channel , CancellationToken cancellationToken = default )
117+ public async ValueTask ReturnPublishChannel ( ConfirmsAwareChannel channel , CancellationToken cancellationToken = default )
115118 {
116119 if ( channel . IsOpen )
117120 {
118- channels . Enqueue ( channel ) ;
119- return ValueTask . CompletedTask ;
121+ return ;
120122 }
121123
122- return channel . DisposeAsync ( ) ;
124+ try
125+ {
126+ await publishChannelSemaphore . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
127+
128+ if ( ReferenceEquals ( publishChannel , channel ) )
129+ {
130+ await channel . DisposeAsync ( ) . ConfigureAwait ( false ) ;
131+ publishChannel = null ;
132+ }
133+ }
134+ finally
135+ {
136+ publishChannelSemaphore . Release ( ) ;
137+ }
123138 }
124139
125140#pragma warning disable PS0018
@@ -137,20 +152,19 @@ public async ValueTask DisposeAsync()
137152 var oldConnection = Interlocked . Exchange ( ref connection , null ) ;
138153 oldConnection ? . Dispose ( ) ;
139154
140- foreach ( var channel in channels )
155+ var oldChannel = Interlocked . Exchange ( ref publishChannel , null ) ;
156+ if ( oldChannel is not null )
141157 {
142- await channel . DisposeAsync ( ) . ConfigureAwait ( false ) ;
158+ await oldChannel . DisposeAsync ( ) . ConfigureAwait ( false ) ;
143159 }
144160
145161 disposed = true ;
146162 }
147163
148- readonly ConnectionFactory connectionFactory ;
149- readonly TimeSpan retryDelay ;
150- readonly IRoutingTopology routingTopology ;
151- readonly ConcurrentQueue < ConfirmsAwareChannel > channels ;
152164 readonly CancellationTokenSource stoppingTokenSource = new ( ) ;
153165 volatile IConnection ? connection ;
166+ readonly SemaphoreSlim publishChannelSemaphore = new ( 1 , 1 ) ;
167+ volatile ConfirmsAwareChannel ? publishChannel ;
154168 bool disposed ;
155169
156170 static readonly ILog Logger = LogManager . GetLogger ( typeof ( ChannelProvider ) ) ;
0 commit comments