@@ -184,12 +184,17 @@ public event AsyncEventHandler<RecoveringConsumerEventArgs> RecoveringConsumerAs
184
184
185
185
public IProtocol Protocol => Endpoint . Protocol ;
186
186
187
- public async ValueTask < RecoveryAwareChannel > CreateNonRecoveringChannelAsync ( ushort consumerDispatchConcurrency ,
187
+ public async ValueTask < RecoveryAwareChannel > CreateNonRecoveringChannelAsync (
188
+ bool publisherConfirmationsEnabled = false ,
189
+ bool publisherConfirmationTrackingEnabled = false ,
190
+ ushort ? consumerDispatchConcurrency = null ,
188
191
CancellationToken cancellationToken = default )
189
192
{
190
193
ISession session = InnerConnection . CreateSession ( ) ;
191
194
var result = new RecoveryAwareChannel ( _config , session , consumerDispatchConcurrency ) ;
192
- return ( RecoveryAwareChannel ) await result . OpenAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
195
+ return ( RecoveryAwareChannel ) await result . OpenAsync (
196
+ publisherConfirmationsEnabled , publisherConfirmationTrackingEnabled , cancellationToken )
197
+ . ConfigureAwait ( false ) ;
193
198
}
194
199
195
200
public override string ToString ( )
@@ -251,27 +256,29 @@ await CloseInnerConnectionAsync()
251
256
}
252
257
}
253
258
254
- public async Task < IChannel > CreateChannelAsync ( bool publisherConfirmations = false , bool publisherConfirmationTracking = false ,
255
- ushort ? consumerDispatchConcurrency = null , CancellationToken cancellationToken = default )
259
+ public async Task < IChannel > CreateChannelAsync ( bool publisherConfirmationsEnabled = false ,
260
+ bool publisherConfirmationTrackingEnabled = false ,
261
+ ushort ? consumerDispatchConcurrency = null ,
262
+ CancellationToken cancellationToken = default )
256
263
{
257
264
EnsureIsOpen ( ) ;
258
265
259
266
ushort cdc = consumerDispatchConcurrency . GetValueOrDefault ( _config . ConsumerDispatchConcurrency ) ;
260
267
261
- RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync ( cdc , cancellationToken )
268
+ RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync (
269
+ publisherConfirmationsEnabled , publisherConfirmationTrackingEnabled , cdc , cancellationToken )
262
270
. ConfigureAwait ( false ) ;
263
271
264
- AutorecoveringChannel channel = new AutorecoveringChannel ( this , recoveryAwareChannel , cdc ) ;
265
- if ( publisherConfirmations )
272
+ var autorecoveringChannel = new AutorecoveringChannel ( this , recoveryAwareChannel , cdc ) ;
273
+ if ( publisherConfirmationsEnabled )
266
274
{
267
- await channel . ConfirmSelectAsync ( trackConfirmations : publisherConfirmationTracking ,
268
- cancellationToken : cancellationToken ) . ConfigureAwait ( false ) ;
275
+ // TODO yes, this is necessary, not sure why
276
+ await autorecoveringChannel . ConfirmSelectAsync ( publisherConfirmationTrackingEnabled , cancellationToken )
277
+ . ConfigureAwait ( false ) ;
269
278
}
270
-
271
- await RecordChannelAsync ( channel , channelsSemaphoreHeld : false , cancellationToken : cancellationToken )
279
+ await RecordChannelAsync ( autorecoveringChannel , channelsSemaphoreHeld : false , cancellationToken : cancellationToken )
272
280
. ConfigureAwait ( false ) ;
273
-
274
- return channel ;
281
+ return autorecoveringChannel ;
275
282
}
276
283
277
284
public void Dispose ( ) => DisposeAsync ( ) . AsTask ( ) . GetAwaiter ( ) . GetResult ( ) ;
0 commit comments