Skip to content

Commit fa1dc4b

Browse files
committed
Propagate token into wrapper
1 parent a0b3a03 commit fa1dc4b

File tree

5 files changed

+20
-18
lines changed

5 files changed

+20
-18
lines changed

projects/RabbitMQ.Client/client/events/CallbackExceptionEventArgs.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,28 +78,29 @@ public class CallbackExceptionEventArgs : BaseExceptionEventArgs
7878
private const string ContextString = "context";
7979
private const string ConsumerString = "consumer";
8080

81+
// TODO Why is this public when there is a build method?
8182
public CallbackExceptionEventArgs(IDictionary<string, object> detail, Exception exception, CancellationToken cancellationToken = default)
8283
: base(detail, exception, cancellationToken)
8384
{
8485
}
8586

86-
public static CallbackExceptionEventArgs Build(Exception e, string context)
87+
public static CallbackExceptionEventArgs Build(Exception e, string context, CancellationToken cancellationToken = default)
8788
{
8889
var details = new Dictionary<string, object>(1)
8990
{
9091
{ContextString, context}
9192
};
92-
return new CallbackExceptionEventArgs(details, e);
93+
return new CallbackExceptionEventArgs(details, e, cancellationToken);
9394
}
9495

95-
public static CallbackExceptionEventArgs Build(Exception e, string context, object consumer)
96+
public static CallbackExceptionEventArgs Build(Exception e, string context, object consumer, CancellationToken cancellationToken = default)
9697
{
9798
var details = new Dictionary<string, object>(2)
9899
{
99100
{ContextString, context},
100101
{ConsumerString, consumer}
101102
};
102-
return new CallbackExceptionEventArgs(details, e);
103+
return new CallbackExceptionEventArgs(details, e, cancellationToken);
103104
}
104105
}
105106
}

projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34
using RabbitMQ.Client.Events;
45

@@ -9,11 +10,11 @@ internal struct AsyncEventingWrapper<T> where T : AsyncEventArgs
910
private event AsyncEventHandler<T>? _event;
1011
private Delegate[]? _handlers;
1112
private string? _context;
12-
private Func<Exception, string, Task>? _onException;
13+
private Func<Exception, string, CancellationToken, Task>? _onException;
1314

1415
public readonly bool IsEmpty => _event is null;
1516

16-
public AsyncEventingWrapper(string context, Func<Exception, string, Task> onException)
17+
public AsyncEventingWrapper(string context, Func<Exception, string, CancellationToken, Task> onException)
1718
{
1819
_event = null;
1920
_handlers = null;
@@ -34,7 +35,7 @@ public void RemoveHandler(AsyncEventHandler<T>? handler)
3435
}
3536

3637
// Do not make this function async! (This type is a struct that gets copied at the start of an async method => empty _handlers is copied)
37-
public Task InvokeAsync(object sender, T parameter)
38+
public Task InvokeAsync(object sender, T parameter, CancellationToken cancellationToken = default)
3839
{
3940
Delegate[]? handlers = _handlers;
4041
if (handlers is null)
@@ -48,10 +49,10 @@ public Task InvokeAsync(object sender, T parameter)
4849
_handlers = handlers;
4950
}
5051

51-
return InternalInvoke(handlers, sender, parameter);
52+
return InternalInvoke(handlers, sender, parameter, cancellationToken);
5253
}
5354

54-
private readonly async Task InternalInvoke(Delegate[] handlers, object sender, T parameter)
55+
private readonly async Task InternalInvoke(Delegate[] handlers, object sender, T parameter, CancellationToken cancellationToken)
5556
{
5657
foreach (AsyncEventHandler<T> action in handlers)
5758
{
@@ -64,7 +65,7 @@ await action(sender, parameter)
6465
{
6566
if (_onException != null)
6667
{
67-
await _onException(exception, _context!)
68+
await _onException(exception, _context!, cancellationToken)
6869
.ConfigureAwait(false);
6970
}
7071
else

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ internal AutorecoveringConnection(ConnectionConfig config, IEndpointResolver end
8383
_recoveringConsumerAsyncWrapper =
8484
new AsyncEventingWrapper<RecoveringConsumerEventArgs>("OnRecoveringConsumer", onExceptionAsync);
8585

86-
Task onExceptionAsync(Exception exception, string context) =>
87-
_innerConnection.OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context));
86+
Task onExceptionAsync(Exception exception, string context, CancellationToken cancellationToken) =>
87+
_innerConnection.OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken));
8888
}
8989

9090
internal static async ValueTask<AutorecoveringConnection> CreateAsync(ConnectionConfig config, IEndpointResolver endpoints,

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,13 @@ protected ChannelBase(ConnectionConfig config, ISession session,
7979
ContinuationTimeout = config.ContinuationTimeout;
8080
ConsumerDispatcher = new AsyncConsumerDispatcher(this,
8181
perChannelConsumerDispatchConcurrency.GetValueOrDefault(config.ConsumerDispatchConcurrency));
82-
Func<Exception, string, Task> onExceptionAsync = (exception, context) =>
83-
OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context));
82+
Func<Exception, string, CancellationToken, Task> onExceptionAsync = (exception, context, cancellationToken) =>
83+
OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken));
8484
_basicAcksAsyncWrapper = new AsyncEventingWrapper<BasicAckEventArgs>("OnBasicAck", onExceptionAsync);
8585
_basicNacksAsyncWrapper = new AsyncEventingWrapper<BasicNackEventArgs>("OnBasicNack", onExceptionAsync);
8686
_basicReturnAsyncWrapper = new AsyncEventingWrapper<BasicReturnEventArgs>("OnBasicReturn", onExceptionAsync);
8787
_callbackExceptionAsyncWrapper =
88-
new AsyncEventingWrapper<CallbackExceptionEventArgs>(string.Empty, (exception, context) => Task.CompletedTask);
88+
new AsyncEventingWrapper<CallbackExceptionEventArgs>(string.Empty, (exception, context, cancellationToken) => Task.CompletedTask);
8989
_flowControlAsyncWrapper = new AsyncEventingWrapper<FlowControlEventArgs>("OnFlowControl", onExceptionAsync);
9090
_channelShutdownAsyncWrapper = new AsyncEventingWrapper<ShutdownEventArgs>("OnChannelShutdownAsync", onExceptionAsync);
9191
_recoveryAsyncWrapper = new AsyncEventingWrapper<AsyncEventArgs>("OnChannelRecovery", onExceptionAsync);

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler)
6666

6767
_callbackExceptionAsyncWrapper =
6868
new AsyncEventingWrapper<CallbackExceptionEventArgs>(string.Empty,
69-
(exception, context) => Task.CompletedTask);
69+
(exception, context, cancellationToken) => Task.CompletedTask);
7070

7171
_connectionBlockedAsyncWrapper =
7272
new AsyncEventingWrapper<ConnectionBlockedEventArgs>("OnConnectionBlocked", onExceptionAsync);
@@ -89,8 +89,8 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler)
8989

9090
_mainLoopTask = Task.CompletedTask;
9191

92-
Task onExceptionAsync(Exception exception, string context) =>
93-
OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context));
92+
Task onExceptionAsync(Exception exception, string context, CancellationToken cancellationToken) =>
93+
OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken));
9494
}
9595

9696
public Guid Id => _id;

0 commit comments

Comments
 (0)