Skip to content

Commit 75e87e4

Browse files
committed
Pass through token into args on declaration
1 parent c2f4c19 commit 75e87e4

File tree

6 files changed

+31
-21
lines changed

6 files changed

+31
-21
lines changed

projects/RabbitMQ.Client/client/events/AsyncEventArgs.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,16 @@ public AsyncEventArgs(CancellationToken cancellationToken = default)
6868
/// </summary>
6969
public CancellationToken CancellationToken { get; }
7070

71+
public static AsyncEventArgs CreateOrDefault(CancellationToken cancellationToken)
72+
{
73+
if (cancellationToken.CanBeCanceled)
74+
{
75+
return new AsyncEventArgs(cancellationToken);
76+
}
77+
78+
return Empty;
79+
}
80+
7181
/// <summary>
7282
/// Provides a value to use with events that do not have event data.
7383
/// </summary>

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
9898
case ProtocolCommandId.BasicReturn:
9999
{
100100
// Note: always returns true
101-
return HandleBasicReturn(cmd);
101+
return HandleBasicReturn(cmd, cancellationToken);
102102
}
103103
case ProtocolCommandId.ChannelClose:
104104
{

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ await _connection.RecoverConsumersAsync(this, newChannel, recordedEntitiesSemaph
213213
.ConfigureAwait(false);
214214
}
215215

216-
await _innerChannel.RunRecoveryEventHandlers(this)
216+
await _innerChannel.RunRecoveryEventHandlers(this, cancellationToken)
217217
.ConfigureAwait(false);
218218

219219
return true;

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ await RecoverChannelsAndItsConsumersAsync(recordedEntitiesSemaphoreHeld: true, c
206206

207207
ESLog.Info("Connection recovery completed");
208208
ThrowIfDisposed();
209-
await _recoverySucceededAsyncWrapper.InvokeAsync(this, AsyncEventArgs.Empty)
209+
await _recoverySucceededAsyncWrapper.InvokeAsync(this, AsyncEventArgs.CreateOrDefault(cancellationToken))
210210
.ConfigureAwait(false);
211211

212212
return true;
@@ -272,7 +272,7 @@ await maybeNewInnerConnection.OpenAsync(cancellationToken)
272272
if (!_connectionRecoveryErrorAsyncWrapper.IsEmpty)
273273
{
274274
// Note: recordedEntities semaphore is _NOT_ held at this point
275-
await _connectionRecoveryErrorAsyncWrapper.InvokeAsync(this, new ConnectionRecoveryErrorEventArgs(e))
275+
await _connectionRecoveryErrorAsyncWrapper.InvokeAsync(this, new ConnectionRecoveryErrorEventArgs(e, cancellationToken))
276276
.ConfigureAwait(false);
277277
}
278278

@@ -386,7 +386,7 @@ await RecordQueueAsync(new RecordedQueue(newName, recordedQueue),
386386
try
387387
{
388388
_recordedEntitiesSemaphore.Release();
389-
await _queueNameChangedAfterRecoveryAsyncWrapper.InvokeAsync(this, new QueueNameChangedAfterRecoveryEventArgs(oldName, newName))
389+
await _queueNameChangedAfterRecoveryAsyncWrapper.InvokeAsync(this, new QueueNameChangedAfterRecoveryEventArgs(oldName, newName, cancellationToken))
390390
.ConfigureAwait(false);
391391
}
392392
finally
@@ -520,7 +520,7 @@ internal async ValueTask RecoverConsumersAsync(AutorecoveringChannel channelToRe
520520
try
521521
{
522522
_recordedEntitiesSemaphore.Release();
523-
await _recoveringConsumerAsyncWrapper.InvokeAsync(this, new RecoveringConsumerEventArgs(consumer.ConsumerTag, consumer.Arguments))
523+
await _recoveringConsumerAsyncWrapper.InvokeAsync(this, new RecoveringConsumerEventArgs(consumer.ConsumerTag, consumer.Arguments, cancellationToken))
524524
.ConfigureAwait(false);
525525
}
526526
finally
@@ -542,7 +542,7 @@ await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
542542
try
543543
{
544544
_recordedEntitiesSemaphore.Release();
545-
await _consumerTagChangeAfterRecoveryAsyncWrapper.InvokeAsync(this, new ConsumerTagChangedAfterRecoveryEventArgs(oldTag, newTag))
545+
await _consumerTagChangeAfterRecoveryAsyncWrapper.InvokeAsync(this, new ConsumerTagChangedAfterRecoveryEventArgs(oldTag, newTag, cancellationToken))
546546
.ConfigureAwait(false);
547547
}
548548
finally

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,9 @@ public event AsyncEventHandler<AsyncEventArgs> RecoveryAsync
163163

164164
private AsyncEventingWrapper<AsyncEventArgs> _recoveryAsyncWrapper;
165165

166-
internal Task RunRecoveryEventHandlers(object sender)
166+
internal Task RunRecoveryEventHandlers(object sender, CancellationToken cancellationToken)
167167
{
168-
return _recoveryAsyncWrapper.InvokeAsync(sender, AsyncEventArgs.Empty);
168+
return _recoveryAsyncWrapper.InvokeAsync(sender, AsyncEventArgs.CreateOrDefault(cancellationToken));
169169
}
170170

171171
public int ChannelNumber => ((Session)Session).ChannelNumber;
@@ -494,7 +494,7 @@ await _channelShutdownAsyncWrapper.InvokeAsync(this, reason)
494494

495495
if (ConfirmsAreEnabled)
496496
{
497-
await _confirmSemaphore.WaitAsync()
497+
await _confirmSemaphore.WaitAsync(reason.CancellationToken)
498498
.ConfigureAwait(false);
499499
try
500500
{
@@ -582,7 +582,7 @@ protected async Task<bool> HandleBasicAck(IncomingCommand cmd, CancellationToken
582582
var ack = new BasicAck(cmd.MethodSpan);
583583
if (!_basicAcksAsyncWrapper.IsEmpty)
584584
{
585-
var args = new BasicAckEventArgs(ack._deliveryTag, ack._multiple);
585+
var args = new BasicAckEventArgs(ack._deliveryTag, ack._multiple, cancellationToken);
586586
await _basicAcksAsyncWrapper.InvokeAsync(this, args)
587587
.ConfigureAwait(false);
588588
}
@@ -598,7 +598,7 @@ protected async Task<bool> HandleBasicNack(IncomingCommand cmd, CancellationToke
598598
if (!_basicNacksAsyncWrapper.IsEmpty)
599599
{
600600
var args = new BasicNackEventArgs(
601-
nack._deliveryTag, nack._multiple, nack._requeue);
601+
nack._deliveryTag, nack._multiple, nack._requeue, cancellationToken);
602602
await _basicNacksAsyncWrapper.InvokeAsync(this, args)
603603
.ConfigureAwait(false);
604604
}
@@ -641,14 +641,14 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag)
641641
return deliveryTag;
642642
}
643643

644-
protected async Task<bool> HandleBasicReturn(IncomingCommand cmd)
644+
protected async Task<bool> HandleBasicReturn(IncomingCommand cmd, CancellationToken cancellationToken)
645645
{
646646
if (!_basicReturnAsyncWrapper.IsEmpty)
647647
{
648648
var basicReturn = new BasicReturn(cmd.MethodSpan);
649649
var e = new BasicReturnEventArgs(basicReturn._replyCode, basicReturn._replyText,
650650
basicReturn._exchange, basicReturn._routingKey,
651-
new ReadOnlyBasicProperties(cmd.HeaderSpan), cmd.Body.Memory);
651+
new ReadOnlyBasicProperties(cmd.HeaderSpan), cmd.Body.Memory, cancellationToken);
652652
await _basicReturnAsyncWrapper.InvokeAsync(this, e)
653653
.ConfigureAwait(false);
654654
}
@@ -713,7 +713,7 @@ await ModelSendAsync(in method, cancellationToken).
713713

714714
if (!_flowControlAsyncWrapper.IsEmpty)
715715
{
716-
await _flowControlAsyncWrapper.InvokeAsync(this, new FlowControlEventArgs(active))
716+
await _flowControlAsyncWrapper.InvokeAsync(this, new FlowControlEventArgs(active, cancellationToken))
717717
.ConfigureAwait(false);
718718
}
719719

@@ -723,7 +723,7 @@ await ModelSendAsync(in method, cancellationToken).
723723
protected async Task<bool> HandleConnectionBlockedAsync(IncomingCommand cmd, CancellationToken cancellationToken)
724724
{
725725
string reason = new ConnectionBlocked(cmd.MethodSpan)._reason;
726-
await Session.Connection.HandleConnectionBlockedAsync(reason)
726+
await Session.Connection.HandleConnectionBlockedAsync(reason, cancellationToken)
727727
.ConfigureAwait(false);
728728
return true;
729729
}
@@ -801,7 +801,7 @@ await k.HandleCommandAsync(cmd)
801801

802802
protected async Task<bool> HandleConnectionUnblockedAsync(CancellationToken cancellationToken)
803803
{
804-
await Session.Connection.HandleConnectionUnblockedAsync()
804+
await Session.Connection.HandleConnectionUnblockedAsync(cancellationToken)
805805
.ConfigureAwait(false);
806806
return true;
807807
}

projects/RabbitMQ.Client/client/impl/Connection.Commands.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,20 +54,20 @@ internal void NotifyReceivedCloseOk()
5454
_closed = true;
5555
}
5656

57-
internal Task HandleConnectionBlockedAsync(string reason)
57+
internal Task HandleConnectionBlockedAsync(string reason, CancellationToken cancellationToken)
5858
{
5959
if (!_connectionBlockedAsyncWrapper.IsEmpty)
6060
{
61-
return _connectionBlockedAsyncWrapper.InvokeAsync(this, new ConnectionBlockedEventArgs(reason));
61+
return _connectionBlockedAsyncWrapper.InvokeAsync(this, new ConnectionBlockedEventArgs(reason, cancellationToken));
6262
}
6363
return Task.CompletedTask;
6464
}
6565

66-
internal Task HandleConnectionUnblockedAsync()
66+
internal Task HandleConnectionUnblockedAsync(CancellationToken cancellationToken)
6767
{
6868
if (!_connectionUnblockedAsyncWrapper.IsEmpty)
6969
{
70-
return _connectionUnblockedAsyncWrapper.InvokeAsync(this, AsyncEventArgs.Empty);
70+
return _connectionUnblockedAsyncWrapper.InvokeAsync(this, AsyncEventArgs.CreateOrDefault(cancellationToken));
7171
}
7272
return Task.CompletedTask;
7373
}

0 commit comments

Comments
 (0)