Skip to content

Commit 2ed3075

Browse files
committed
Make SessionShutdown event async
* Replace `lock` with `SemaphoreSlim` * Pass cancellation token through recovery * Fix not waiting for async methods * Propagate cancellation token * Add `CancellationToken` to `AsyncEventHandler` arguments * Add async version of `ConnectionShutdown` event * Ignore `OperationCanceledException` if it is due to the consumer dispatcher shutting down * shellcheck fixes, allow pulling fresh docker images * More async methods * More changes around cancellation tokens * Switch to TrySetException * Comment-out TestDisposedWithSocketClosedOutOfBand flaky test
1 parent bf86447 commit 2ed3075

38 files changed

+498
-351
lines changed

.ci/ubuntu/gha-setup.sh

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,13 @@ else
3131
readonly run_toxiproxy='false'
3232
fi
3333

34+
if [[ $2 == 'pull' ]]
35+
then
36+
readonly docker_pull_args='--pull always'
37+
else
38+
readonly docker_pull_args=''
39+
fi
40+
3441
set -o nounset
3542

3643
declare -r rabbitmq_docker_name="$docker_name_prefix-rabbitmq"
@@ -43,7 +50,8 @@ function start_toxiproxy
4350
# sudo ss -4nlp
4451
echo "[INFO] starting Toxiproxy server docker container"
4552
docker rm --force "$toxiproxy_docker_name" 2>/dev/null || echo "[INFO] $toxiproxy_docker_name was not running"
46-
docker run --detach \
53+
# shellcheck disable=SC2086
54+
docker run --detach $docker_pull_args \
4755
--name "$toxiproxy_docker_name" \
4856
--hostname "$toxiproxy_docker_name" \
4957
--publish 8474:8474 \
@@ -58,7 +66,8 @@ function start_rabbitmq
5866
echo "[INFO] starting RabbitMQ server docker container"
5967
chmod 0777 "$GITHUB_WORKSPACE/.ci/ubuntu/log"
6068
docker rm --force "$rabbitmq_docker_name" 2>/dev/null || echo "[INFO] $rabbitmq_docker_name was not running"
61-
docker run --detach \
69+
# shellcheck disable=SC2086
70+
docker run --detach $docker_pull_args \
6271
--name "$rabbitmq_docker_name" \
6372
--hostname "$rabbitmq_docker_name" \
6473
--publish 5671:5671 \
@@ -101,7 +110,8 @@ function wait_rabbitmq
101110

102111
function get_rabbitmq_id
103112
{
104-
local rabbitmq_docker_id="$(docker inspect --format='{{.Id}}' "$rabbitmq_docker_name")"
113+
local rabbitmq_docker_id
114+
rabbitmq_docker_id="$(docker inspect --format='{{.Id}}' "$rabbitmq_docker_name")"
105115
echo "[INFO] '$rabbitmq_docker_name' docker id is '$rabbitmq_docker_id'"
106116
if [[ -v GITHUB_OUTPUT ]]
107117
then

projects/RabbitMQ.Client/PublicAPI.Unshipped.txt

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,7 @@ RabbitMQ.Client.IConnection.CloseReason.get -> RabbitMQ.Client.ShutdownEventArgs
506506
RabbitMQ.Client.IConnection.ConnectionBlocked -> System.EventHandler<RabbitMQ.Client.Events.ConnectionBlockedEventArgs>
507507
RabbitMQ.Client.IConnection.ConnectionRecoveryError -> System.EventHandler<RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs>
508508
RabbitMQ.Client.IConnection.ConnectionShutdown -> System.EventHandler<RabbitMQ.Client.ShutdownEventArgs>
509+
RabbitMQ.Client.IConnection.ConnectionShutdownAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.ShutdownEventArgs>
509510
RabbitMQ.Client.IConnection.ConnectionUnblocked -> System.EventHandler<System.EventArgs>
510511
RabbitMQ.Client.IConnection.ConsumerTagChangeAfterRecovery -> System.EventHandler<RabbitMQ.Client.Events.ConsumerTagChangedAfterRecoveryEventArgs>
511512
RabbitMQ.Client.IConnection.Endpoint.get -> RabbitMQ.Client.AmqpTcpEndpoint
@@ -929,31 +930,31 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
929930
~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
930931
~RabbitMQ.Client.ICredentialsRefresher.Register(RabbitMQ.Client.ICredentialsProvider provider, RabbitMQ.Client.ICredentialsRefresher.NotifyCredentialRefreshedAsync callback) -> RabbitMQ.Client.ICredentialsProvider
931932
~RabbitMQ.Client.TimerBasedCredentialRefresher.Register(RabbitMQ.Client.ICredentialsProvider provider, RabbitMQ.Client.ICredentialsRefresher.NotifyCredentialRefreshedAsync callback) -> RabbitMQ.Client.ICredentialsProvider
932-
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandlerAsync.get -> System.Func<RabbitMQ.Client.IRecordedBinding, System.Exception, RabbitMQ.Client.IConnection, System.Threading.Tasks.Task>
933+
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandlerAsync.get -> System.Func<RabbitMQ.Client.IRecordedBinding, System.Exception, RabbitMQ.Client.IConnection, System.Threading.CancellationToken, System.Threading.Tasks.Task>
933934
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandlerAsync.set -> void
934-
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandlerAsync.get -> System.Func<RabbitMQ.Client.IRecordedConsumer, System.Exception, RabbitMQ.Client.IConnection, System.Threading.Tasks.Task>
935+
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandlerAsync.get -> System.Func<RabbitMQ.Client.IRecordedConsumer, System.Exception, RabbitMQ.Client.IConnection, System.Threading.CancellationToken, System.Threading.Tasks.Task>
935936
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandlerAsync.set -> void
936-
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandlerAsync.get -> System.Func<RabbitMQ.Client.IRecordedExchange, System.Exception, RabbitMQ.Client.IConnection, System.Threading.Tasks.Task>
937+
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandlerAsync.get -> System.Func<RabbitMQ.Client.IRecordedExchange, System.Exception, RabbitMQ.Client.IConnection, System.Threading.CancellationToken, System.Threading.Tasks.Task>
937938
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandlerAsync.set -> void
938-
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.get -> System.Func<RabbitMQ.Client.IRecordedQueue, System.Exception, RabbitMQ.Client.IConnection, System.Threading.Tasks.Task>
939+
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.get -> System.Func<RabbitMQ.Client.IRecordedQueue, System.Exception, RabbitMQ.Client.IConnection, System.Threading.CancellationToken, System.Threading.Tasks.Task>
939940
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.set -> void
940941
~static RabbitMQ.Client.IChannelExtensions.AbortAsync(this RabbitMQ.Client.IChannel channel) -> System.Threading.Tasks.Task
941942
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.IBasicConsumer consumer, string queue, bool autoAck = false, string consumerTag = "", bool noLocal = false, bool exclusive = false, System.Collections.Generic.IDictionary<string, object> arguments = null) -> System.Threading.Tasks.Task<string>
942943
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.Task<string>
943944
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.Task<string>
944945
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, System.Collections.Generic.IDictionary<string, object> arguments, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.Task<string>
945-
~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel) -> System.Threading.Tasks.Task
946+
~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
946947
~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, ushort replyCode, string replyText) -> System.Threading.Tasks.Task
947948
~static RabbitMQ.Client.IChannelExtensions.ExchangeDeclareAsync(this RabbitMQ.Client.IChannel channel, string exchange, string type, bool durable = false, bool autoDelete = false, System.Collections.Generic.IDictionary<string, object> arguments = null, bool noWait = false) -> System.Threading.Tasks.Task
948949
~static RabbitMQ.Client.IChannelExtensions.QueueDeclareAsync(this RabbitMQ.Client.IChannel channel, string queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true, System.Collections.Generic.IDictionary<string, object> arguments = null, bool noWait = false) -> System.Threading.Tasks.Task<RabbitMQ.Client.QueueDeclareOk>
949950
~static RabbitMQ.Client.IChannelExtensions.QueueDeleteAsync(this RabbitMQ.Client.IChannel channel, string queue, bool ifUnused = false, bool ifEmpty = false) -> System.Threading.Tasks.Task<uint>
950951
~static RabbitMQ.Client.IChannelExtensions.QueueUnbindAsync(this RabbitMQ.Client.IChannel channel, string queue, string exchange, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments = null) -> System.Threading.Tasks.Task
951-
~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection) -> System.Threading.Tasks.Task
952-
~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task
953-
~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText) -> System.Threading.Tasks.Task
954-
~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task
955-
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection) -> System.Threading.Tasks.Task
956-
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task
957-
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText) -> System.Threading.Tasks.Task
958-
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task
952+
~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
953+
~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
954+
~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
955+
~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
956+
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
957+
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
958+
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
959+
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
959960
~virtual RabbitMQ.Client.DefaultBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory<byte> body) -> System.Threading.Tasks.Task

projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4+
using System.Threading;
45
using System.Threading.Tasks;
56
using RabbitMQ.Client.Events;
67
using RabbitMQ.Client.Impl;
@@ -141,7 +142,8 @@ public virtual async Task OnCancel(params string[] consumerTags)
141142
IsRunning = false;
142143
if (!_consumerCancelledWrapper.IsEmpty)
143144
{
144-
await _consumerCancelledWrapper.InvokeAsync(this, new ConsumerEventArgs(consumerTags))
145+
// TODO cancellation token
146+
await _consumerCancelledWrapper.InvokeAsync(this, new ConsumerEventArgs(consumerTags), CancellationToken.None)
145147
.ConfigureAwait(false);
146148
}
147149
foreach (string consumerTag in consumerTags)

projects/RabbitMQ.Client/client/api/IChannelExtensions.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Threading;
3435
using System.Threading.Tasks;
3536
using RabbitMQ.Client.client.impl;
3637

@@ -145,7 +146,7 @@ public static Task QueueUnbindAsync(this IChannel channel, string queue, string
145146
/// method does nothing but wait for the in-progress close
146147
/// operation to complete. This method will not return to the
147148
/// caller until the shutdown is complete.
148-
/// In comparison to normal <see cref="CloseAsync(IChannel)"/> method, <see cref="AbortAsync(IChannel)"/> will not throw
149+
/// In comparison to normal <see cref="CloseAsync(IChannel, CancellationToken)"/> method, <see cref="AbortAsync(IChannel)"/> will not throw
149150
/// <see cref="Exceptions.AlreadyClosedException"/> or <see cref="System.IO.IOException"/> or any other <see cref="Exception"/> during closing channel.
150151
/// </remarks>
151152
public static Task AbortAsync(this IChannel channel)
@@ -160,9 +161,10 @@ public static Task AbortAsync(this IChannel channel)
160161
/// operation to complete. This method will not return to the
161162
/// caller until the shutdown is complete.
162163
/// </remarks>
163-
public static Task CloseAsync(this IChannel channel)
164+
public static Task CloseAsync(this IChannel channel, CancellationToken cancellationToken = default)
164165
{
165-
return channel.CloseAsync(Constants.ReplySuccess, "Goodbye", false);
166+
return channel.CloseAsync(Constants.ReplySuccess, "Goodbye", false,
167+
cancellationToken);
166168
}
167169

168170
/// <summary>

projects/RabbitMQ.Client/client/api/IConnection.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,16 @@ public interface IConnection : INetworkConnection, IDisposable
157157
/// </remarks>
158158
event EventHandler<ShutdownEventArgs> ConnectionShutdown;
159159

160+
/// <summary>
161+
/// Asynchronously raised when the connection is destroyed.
162+
/// </summary>
163+
/// <remarks>
164+
/// If the connection is already destroyed at the time an
165+
/// event handler is added to this event, the event handler
166+
/// will be fired immediately.
167+
/// </remarks>
168+
event AsyncEventHandler<ShutdownEventArgs> ConnectionShutdownAsync;
169+
160170
/// <summary>
161171
/// Raised when the connection completes recovery.
162172
/// </summary>

0 commit comments

Comments
 (0)