@@ -59,7 +59,7 @@ internal abstract class ChannelBase : IChannel, IRecoverable
59
59
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue ( ) ;
60
60
private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim ( true ) ;
61
61
62
- private readonly object _confirmLock = new object ( ) ;
62
+ private object _confirmLock ;
63
63
private readonly LinkedList < ulong > _pendingDeliveryTags = new LinkedList < ulong > ( ) ;
64
64
65
65
private bool _onlyAcksReceived = true ;
@@ -183,7 +183,6 @@ public IBasicConsumer DefaultConsumer
183
183
184
184
public bool IsOpen => CloseReason is null ;
185
185
186
- // TODO add private bool for Confirm mode
187
186
public ulong NextPublishSeqNo { get ; private set ; }
188
187
189
188
public string CurrentQueue { get ; private set ; }
@@ -376,19 +375,22 @@ protected bool Enqueue(IRpcContinuation k)
376
375
}
377
376
}
378
377
379
- internal async Task < IChannel > OpenAsync ( )
378
+ internal async Task < IChannel > OpenAsync ( CancellationToken cancellationToken )
380
379
{
381
380
bool enqueued = false ;
382
381
var k = new ChannelOpenAsyncRpcContinuation ( ContinuationTimeout ) ;
383
382
384
- await _rpcSemaphore . WaitAsync ( k . CancellationToken )
383
+ using CancellationTokenSource lts =
384
+ CancellationTokenSource . CreateLinkedTokenSource ( cancellationToken , k . CancellationToken ) ;
385
+
386
+ await _rpcSemaphore . WaitAsync ( lts . Token )
385
387
. ConfigureAwait ( false ) ;
386
388
try
387
389
{
388
390
enqueued = Enqueue ( k ) ;
389
391
390
392
var method = new ChannelOpen ( ) ;
391
- await ModelSendAsync ( method , k . CancellationToken )
393
+ await ModelSendAsync ( method , lts . Token )
392
394
. ConfigureAwait ( false ) ;
393
395
394
396
bool result = await k ;
@@ -416,6 +418,8 @@ internal void FinishClose()
416
418
m_connectionStartCell ? . TrySetResult ( null ) ;
417
419
}
418
420
421
+ private bool ConfirmsAreEnabled => _confirmLock != null ;
422
+
419
423
private async Task HandleCommandAsync ( IncomingCommand cmd , CancellationToken cancellationToken )
420
424
{
421
425
/*
@@ -475,17 +479,21 @@ private void OnChannelShutdown(ShutdownEventArgs reason)
475
479
{
476
480
_continuationQueue . HandleChannelShutdown ( reason ) ;
477
481
_channelShutdownWrapper . Invoke ( this , reason ) ;
478
- lock ( _confirmLock )
482
+
483
+ if ( ConfirmsAreEnabled )
479
484
{
480
- if ( _confirmsTaskCompletionSources ? . Count > 0 )
485
+ lock ( _confirmLock )
481
486
{
482
- var exception = new AlreadyClosedException ( reason ) ;
483
- foreach ( var confirmsTaskCompletionSource in _confirmsTaskCompletionSources )
487
+ if ( _confirmsTaskCompletionSources ? . Count > 0 )
484
488
{
485
- confirmsTaskCompletionSource . TrySetException ( exception ) ;
486
- }
489
+ var exception = new AlreadyClosedException ( reason ) ;
490
+ foreach ( var confirmsTaskCompletionSource in _confirmsTaskCompletionSources )
491
+ {
492
+ confirmsTaskCompletionSource . TrySetException ( exception ) ;
493
+ }
487
494
488
- _confirmsTaskCompletionSources . Clear ( ) ;
495
+ _confirmsTaskCompletionSources . Clear ( ) ;
496
+ }
489
497
}
490
498
}
491
499
@@ -581,7 +589,7 @@ protected void HandleBasicNack(in IncomingCommand cmd)
581
589
protected void HandleAckNack ( ulong deliveryTag , bool multiple , bool isNack )
582
590
{
583
591
// No need to do this if publisher confirms have never been enabled.
584
- if ( NextPublishSeqNo > 0 )
592
+ if ( ConfirmsAreEnabled )
585
593
{
586
594
// let's take a lock so we can assume that deliveryTags are unique, never duplicated and always sorted
587
595
lock ( _confirmLock )
@@ -1017,7 +1025,7 @@ await ModelSendAsync(method, k.CancellationToken)
1017
1025
public async ValueTask BasicPublishAsync < TProperties > ( string exchange , string routingKey , TProperties basicProperties , ReadOnlyMemory < byte > body , bool mandatory )
1018
1026
where TProperties : IReadOnlyBasicProperties , IAmqpHeader
1019
1027
{
1020
- if ( NextPublishSeqNo > 0 )
1028
+ if ( ConfirmsAreEnabled )
1021
1029
{
1022
1030
lock ( _confirmLock )
1023
1031
{
@@ -1047,7 +1055,7 @@ public async ValueTask BasicPublishAsync<TProperties>(string exchange, string ro
1047
1055
}
1048
1056
catch
1049
1057
{
1050
- if ( NextPublishSeqNo > 0 )
1058
+ if ( ConfirmsAreEnabled )
1051
1059
{
1052
1060
lock ( _confirmLock )
1053
1061
{
@@ -1078,7 +1086,7 @@ public async void BasicPublish<TProperties>(CachedString exchange, CachedString
1078
1086
TProperties basicProperties , ReadOnlyMemory < byte > body , bool mandatory )
1079
1087
where TProperties : IReadOnlyBasicProperties , IAmqpHeader
1080
1088
{
1081
- if ( NextPublishSeqNo > 0 )
1089
+ if ( ConfirmsAreEnabled )
1082
1090
{
1083
1091
lock ( _confirmLock )
1084
1092
{
@@ -1109,7 +1117,7 @@ public async void BasicPublish<TProperties>(CachedString exchange, CachedString
1109
1117
}
1110
1118
catch
1111
1119
{
1112
- if ( NextPublishSeqNo > 0 )
1120
+ if ( ConfirmsAreEnabled )
1113
1121
{
1114
1122
lock ( _confirmLock )
1115
1123
{
@@ -1126,7 +1134,7 @@ public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, Cac
1126
1134
TProperties basicProperties , ReadOnlyMemory < byte > body , bool mandatory )
1127
1135
where TProperties : IReadOnlyBasicProperties , IAmqpHeader
1128
1136
{
1129
- if ( NextPublishSeqNo > 0 )
1137
+ if ( ConfirmsAreEnabled )
1130
1138
{
1131
1139
lock ( _confirmLock )
1132
1140
{
@@ -1157,7 +1165,7 @@ public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, Cac
1157
1165
}
1158
1166
catch
1159
1167
{
1160
- if ( NextPublishSeqNo > 0 )
1168
+ if ( ConfirmsAreEnabled )
1161
1169
{
1162
1170
lock ( _confirmLock )
1163
1171
{
@@ -1263,6 +1271,10 @@ await ModelSendAsync(method, k.CancellationToken)
1263
1271
bool result = await k ;
1264
1272
Debug . Assert ( result ) ;
1265
1273
1274
+ // Note:
1275
+ // Non-null means confirms are enabled
1276
+ _confirmLock = new object ( ) ;
1277
+
1266
1278
return ;
1267
1279
}
1268
1280
finally
@@ -1747,7 +1759,7 @@ await ModelSendAsync(method, k.CancellationToken)
1747
1759
1748
1760
public Task < bool > WaitForConfirmsAsync ( CancellationToken token = default )
1749
1761
{
1750
- if ( NextPublishSeqNo == 0UL )
1762
+ if ( false == ConfirmsAreEnabled )
1751
1763
{
1752
1764
throw new InvalidOperationException ( "Confirms not selected" ) ;
1753
1765
}
@@ -1821,17 +1833,15 @@ public async Task WaitForConfirmsOrDieAsync(CancellationToken token = default)
1821
1833
await CloseAsync ( ea , false )
1822
1834
. ConfigureAwait ( false ) ;
1823
1835
}
1824
- catch ( TaskCanceledException )
1836
+ catch ( OperationCanceledException ex )
1825
1837
{
1826
1838
const string msg = "timed out waiting for acks" ;
1827
-
1828
- var ex = new IOException ( msg ) ;
1829
1839
var ea = new ShutdownEventArgs ( ShutdownInitiator . Library , Constants . ReplySuccess , msg , ex ) ;
1830
1840
1831
1841
await CloseAsync ( ea , false )
1832
1842
. ConfigureAwait ( false ) ;
1833
1843
1834
- throw ex ;
1844
+ throw ;
1835
1845
}
1836
1846
}
1837
1847
0 commit comments