Skip to content

Commit be31dd6

Browse files
committed
Address more TODOs
* Remove `EventingBasicConsumer` and subclasses * Begin making `IAsyncBasicConsumer` callbacks async * Try to figure out what's up with `TestDisposedWithSocketClosedOutOfBand` * Listen for `ConnectionShutdown` in `TestDisposedWithSocketClosedOutOfBand` * Remove `SetSessionClosing` * Always abort connections and channels on Dispose * Pass cancellation token through consumer handler methods. * Remove `DispatchConsumersAsync` since all consumers will implement `IAsyncBasicConsumer` * Finish up passing cancellation tokens * Restore `IsOpen` check
1 parent eb5be9f commit be31dd6

File tree

55 files changed

+450
-1036
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+450
-1036
lines changed

projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs

Lines changed: 8 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace RabbitMQ.Benchmarks
88
{
9-
internal sealed class AsyncBasicConsumerFake : IAsyncBasicConsumer, IBasicConsumer
9+
internal sealed class AsyncBasicConsumerFake : IAsyncBasicConsumer
1010
{
1111
private readonly ManualResetEventSlim _autoResetEvent;
1212
private int _current;
@@ -18,64 +18,33 @@ public AsyncBasicConsumerFake(ManualResetEventSlim autoResetEvent)
1818
_autoResetEvent = autoResetEvent;
1919
}
2020

21-
public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
22-
in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
21+
Task IAsyncBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
22+
ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body,
23+
CancellationToken cancellationToken)
2324
{
2425
if (Interlocked.Increment(ref _current) == Count)
2526
{
2627
_current = 0;
2728
_autoResetEvent.Set();
2829
}
29-
return Task.CompletedTask;
30-
}
3130

32-
Task IBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
33-
ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
34-
{
35-
if (Interlocked.Increment(ref _current) == Count)
36-
{
37-
_current = 0;
38-
_autoResetEvent.Set();
39-
}
4031
return Task.CompletedTask;
4132
}
4233

43-
public Task HandleBasicCancel(string consumerTag) => Task.CompletedTask;
34+
Task IAsyncBasicConsumer.HandleBasicCancelAsync(string consumerTag, CancellationToken _) => Task.CompletedTask;
4435

45-
public Task HandleBasicCancelOk(string consumerTag) => Task.CompletedTask;
36+
Task IAsyncBasicConsumer.HandleBasicCancelOkAsync(string consumerTag, CancellationToken _) => Task.CompletedTask;
4637

47-
public Task HandleBasicConsumeOk(string consumerTag) => Task.CompletedTask;
38+
Task IAsyncBasicConsumer.HandleBasicConsumeOkAsync(string consumerTag, CancellationToken _) => Task.CompletedTask;
4839

49-
public Task HandleChannelShutdown(object channel, ShutdownEventArgs reason) => Task.CompletedTask;
40+
Task IAsyncBasicConsumer.HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason, CancellationToken _) => Task.CompletedTask;
5041

5142
public IChannel Channel { get; }
5243

53-
event EventHandler<ConsumerEventArgs> IBasicConsumer.ConsumerCancelled
54-
{
55-
add { }
56-
remove { }
57-
}
58-
5944
public event AsyncEventHandler<ConsumerEventArgs> ConsumerCancelled
6045
{
6146
add { }
6247
remove { }
6348
}
64-
65-
void IBasicConsumer.HandleBasicCancelOk(string consumerTag)
66-
{
67-
}
68-
69-
void IBasicConsumer.HandleBasicConsumeOk(string consumerTag)
70-
{
71-
}
72-
73-
void IBasicConsumer.HandleChannelShutdown(object channel, ShutdownEventArgs reason)
74-
{
75-
}
76-
77-
void IBasicConsumer.HandleBasicCancel(string consumerTag)
78-
{
79-
}
8049
}
8150
}

projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ await _dispatcher.HandleBasicDeliverAsync(_consumerTag, _deliveryTag, false, _ex
6464
public async Task SetUpConsumer()
6565
{
6666
_consumer.Count = Count;
67-
_dispatcher = new ConsumerDispatcher(null, Concurrency);
67+
_dispatcher = new AsyncConsumerDispatcher(null, Concurrency);
6868
await _dispatcher.HandleBasicConsumeOkAsync(_consumer, _consumerTag, CancellationToken.None);
6969
}
7070

projects/Benchmarks/Networking/Networking_BasicDeliver_Commons.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@ public static async Task Publish_Hello_World(IConnection connection, uint messag
1616
{
1717
QueueDeclareOk queue = await channel.QueueDeclareAsync();
1818
int consumed = 0;
19-
var consumer = new EventingBasicConsumer(channel);
20-
consumer.Received += (s, args) =>
19+
var consumer = new AsyncEventingBasicConsumer(channel);
20+
consumer.Received += (s, args, ct) =>
2121
{
2222
if (Interlocked.Increment(ref consumed) == messageCount)
2323
{
2424
tcs.SetResult(true);
2525
}
26+
27+
return Task.CompletedTask;
2628
};
29+
2730
await channel.BasicConsumeAsync(queue.QueueName, true, consumer);
2831

2932
for (int i = 0; i < messageCount; i++)

projects/RabbitMQ.Client/PublicAPI.Unshipped.txt

Lines changed: 25 additions & 65 deletions
Large diffs are not rendered by default.

projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs

Lines changed: 23 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@
88

99
namespace RabbitMQ.Client
1010
{
11-
public class AsyncDefaultBasicConsumer : IBasicConsumer, IAsyncBasicConsumer
11+
public class AsyncDefaultBasicConsumer : IAsyncBasicConsumer
1212
{
1313
private readonly HashSet<string> _consumerTags = new HashSet<string>();
1414

1515
/// <summary>
16-
/// Creates a new instance of an <see cref="DefaultBasicConsumer"/>.
16+
/// Creates a new instance of an <see cref="AsyncDefaultBasicConsumer"/>.
1717
/// </summary>
1818
public AsyncDefaultBasicConsumer()
1919
{
@@ -70,28 +70,31 @@ public event AsyncEventHandler<ConsumerEventArgs> ConsumerCancelled
7070
/// <summary>
7171
/// Called when the consumer is cancelled for reasons other than by a basicCancel:
7272
/// e.g. the queue has been deleted (either by this channel or by any other channel).
73-
/// See <see cref="HandleBasicCancelOk"/> for notification of consumer cancellation due to basicCancel
73+
/// See <see cref="HandleBasicCancelOkAsync"/> for notification of consumer cancellation due to basicCancel
7474
/// </summary>
7575
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
76-
public virtual Task HandleBasicCancel(string consumerTag)
76+
/// <param name="cancellationToken">The cancellation token for this operation.</param>
77+
public virtual Task HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken)
7778
{
78-
return OnCancel(consumerTag);
79+
return OnCancelAsync(cancellationToken, consumerTag);
7980
}
8081

8182
/// <summary>
8283
/// Called upon successful deregistration of the consumer from the broker.
8384
/// </summary>
8485
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
85-
public virtual Task HandleBasicCancelOk(string consumerTag)
86+
/// <param name="cancellationToken">The cancellation token for this operation.</param>
87+
public virtual Task HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken)
8688
{
87-
return OnCancel(consumerTag);
89+
return OnCancelAsync(cancellationToken, consumerTag);
8890
}
8991

9092
/// <summary>
9193
/// Called upon successful registration of the consumer with the broker.
9294
/// </summary>
9395
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
94-
public virtual Task HandleBasicConsumeOk(string consumerTag)
96+
/// <param name="cancellationToken">The cancellation token for this operation.</param>
97+
public virtual Task HandleBasicConsumeOkAsync(string consumerTag, CancellationToken cancellationToken)
9598
{
9699
_consumerTags.Add(consumerTag);
97100
IsRunning = true;
@@ -108,13 +111,14 @@ public virtual Task HandleBasicConsumeOk(string consumerTag)
108111
/// Accessing the body at a later point is unsafe as its memory can
109112
/// be already released.
110113
/// </remarks>
111-
public virtual Task HandleBasicDeliver(string consumerTag,
114+
public virtual Task HandleBasicDeliverAsync(string consumerTag,
112115
ulong deliveryTag,
113116
bool redelivered,
114117
string exchange,
115118
string routingKey,
116-
in ReadOnlyBasicProperties properties,
117-
ReadOnlyMemory<byte> body)
119+
ReadOnlyBasicProperties properties,
120+
ReadOnlyMemory<byte> body,
121+
CancellationToken cancellationToken)
118122
{
119123
// Nothing to do here.
120124
return Task.CompletedTask;
@@ -125,63 +129,33 @@ public virtual Task HandleBasicDeliver(string consumerTag,
125129
/// </summary>
126130
/// <param name="channel">A channel this consumer was registered on.</param>
127131
/// <param name="reason">Shutdown context.</param>
128-
public virtual Task HandleChannelShutdown(object channel, ShutdownEventArgs reason)
132+
/// <param name="cancellationToken">The cancellation token for this operation.</param>
133+
public virtual Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason,
134+
CancellationToken cancellationToken)
129135
{
130136
ShutdownReason = reason;
131-
return OnCancel(_consumerTags.ToArray());
137+
return OnCancelAsync(cancellationToken, _consumerTags.ToArray());
132138
}
133139

134140
/// <summary>
135141
/// Default implementation - overridable in subclasses.</summary>
136-
/// <param name="consumerTags">The set of consumer tags that where cancelled</param>
137142
/// <remarks>
143+
/// <param name="cancellationToken">The cancellation token for this operation.</param>
144+
/// <param name="consumerTags">The set of consumer tags that where cancelled</param>
138145
/// This default implementation simply sets the <see cref="IsRunning"/> property to false, and takes no further action.
139146
/// </remarks>
140-
public virtual async Task OnCancel(params string[] consumerTags)
147+
public virtual async Task OnCancelAsync(CancellationToken cancellationToken, params string[] consumerTags)
141148
{
142149
IsRunning = false;
143150
if (!_consumerCancelledWrapper.IsEmpty)
144151
{
145-
// TODO cancellation token
146-
await _consumerCancelledWrapper.InvokeAsync(this, new ConsumerEventArgs(consumerTags), CancellationToken.None)
152+
await _consumerCancelledWrapper.InvokeAsync(this, new ConsumerEventArgs(consumerTags), cancellationToken)
147153
.ConfigureAwait(false);
148154
}
149155
foreach (string consumerTag in consumerTags)
150156
{
151157
_consumerTags.Remove(consumerTag);
152158
}
153159
}
154-
155-
event EventHandler<ConsumerEventArgs> IBasicConsumer.ConsumerCancelled
156-
{
157-
add { throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'."); }
158-
remove { throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'."); }
159-
}
160-
161-
void IBasicConsumer.HandleBasicCancelOk(string consumerTag)
162-
{
163-
throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'.");
164-
}
165-
166-
void IBasicConsumer.HandleBasicConsumeOk(string consumerTag)
167-
{
168-
throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'.");
169-
}
170-
171-
Task IBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
172-
ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
173-
{
174-
throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'.");
175-
}
176-
177-
void IBasicConsumer.HandleChannelShutdown(object channel, ShutdownEventArgs reason)
178-
{
179-
throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'.");
180-
}
181-
182-
void IBasicConsumer.HandleBasicCancel(string consumerTag)
183-
{
184-
throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'.");
185-
}
186160
}
187161
}

projects/RabbitMQ.Client/client/api/ConnectionConfig.cs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -132,12 +132,7 @@ public sealed class ConnectionConfig
132132
public readonly TimeSpan RequestedConnectionTimeout;
133133

134134
/// <summary>
135-
/// Set to true will enable an asynchronous consumer dispatcher which is compatible with <see cref="IAsyncBasicConsumer"/>.
136-
/// </summary>
137-
public readonly bool DispatchConsumersAsync;
138-
139-
/// <summary>
140-
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IBasicConsumer"/>
135+
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
141136
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
142137
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
143138
/// </summary>
@@ -152,7 +147,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
152147
ushort maxChannelCount, uint maxFrameSize, bool topologyRecoveryEnabled,
153148
TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler,
154149
TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout,
155-
bool dispatchConsumersAsync, int dispatchConsumerConcurrency,
150+
int dispatchConsumerConcurrency,
156151
Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
157152
{
158153
VirtualHost = virtualHost;
@@ -173,7 +168,6 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
173168
ContinuationTimeout = continuationTimeout;
174169
HandshakeContinuationTimeout = handshakeContinuationTimeout;
175170
RequestedConnectionTimeout = requestedConnectionTimeout;
176-
DispatchConsumersAsync = dispatchConsumersAsync;
177171
DispatchConsumerConcurrency = dispatchConsumerConcurrency;
178172
FrameHandlerFactoryAsync = frameHandlerFactoryAsync;
179173
}

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -175,13 +175,7 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
175175
public bool AutomaticRecoveryEnabled { get; set; } = true;
176176

177177
/// <summary>
178-
/// Set to true will enable an asynchronous consumer dispatcher which is compatible with <see cref="IAsyncBasicConsumer"/>.
179-
/// Defaults to false.
180-
/// </summary>
181-
public bool DispatchConsumersAsync { get; set; } = false;
182-
183-
/// <summary>
184-
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IBasicConsumer"/>
178+
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
185179
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
186180
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
187181
/// Defaults to 1.
@@ -610,7 +604,6 @@ private ConnectionConfig CreateConfig(string clientProvidedName)
610604
ContinuationTimeout,
611605
HandshakeContinuationTimeout,
612606
RequestedConnectionTimeout,
613-
DispatchConsumersAsync,
614607
ConsumerDispatchConcurrency,
615608
CreateFrameHandlerAsync);
616609
}

0 commit comments

Comments
 (0)