diff --git a/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs b/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs index a10140481..071cd537e 100644 --- a/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs @@ -8,7 +8,7 @@ namespace RabbitMQ.Client.ConsumerDispatching { internal sealed class AsyncConsumerDispatcher : ConsumerDispatcherChannelBase { - internal AsyncConsumerDispatcher(ChannelBase channel, ushort concurrency) + internal AsyncConsumerDispatcher(Channel channel, ushort concurrency) : base(channel, concurrency) { } diff --git a/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs index eb01959e4..92c273957 100644 --- a/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs +++ b/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs @@ -31,34 +31,35 @@ using System; using System.Threading; -using System.Threading.Channels; using System.Threading.Tasks; using RabbitMQ.Client.Events; -using RabbitMQ.Client.Impl; using RabbitMQ.Client.Logging; namespace RabbitMQ.Client.ConsumerDispatching { internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase, IConsumerDispatcher { - protected readonly ChannelBase _channel; - protected readonly ChannelReader _reader; - private readonly ChannelWriter _writer; + protected readonly Impl.Channel _channel; + protected readonly System.Threading.Channels.ChannelReader _reader; + private readonly System.Threading.Channels.ChannelWriter _writer; private readonly Task _worker; private readonly ushort _concurrency; private bool _quiesce = false; private bool _disposed; - internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency) + internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency) { _channel = channel; _concurrency = concurrency; - var workChannel = Channel.CreateUnbounded(new UnboundedChannelOptions + + var channelOpts = new System.Threading.Channels.UnboundedChannelOptions { SingleReader = _concurrency == 1, SingleWriter = false, AllowSynchronousContinuations = false - }); + }; + + var workChannel = System.Threading.Channels.Channel.CreateUnbounded(channelOpts); _reader = workChannel.Reader; _writer = workChannel.Writer; diff --git a/projects/RabbitMQ.Client/Framing/Channel.cs b/projects/RabbitMQ.Client/Framing/Channel.cs deleted file mode 100644 index a6c809d36..000000000 --- a/projects/RabbitMQ.Client/Framing/Channel.cs +++ /dev/null @@ -1,155 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. -//--------------------------------------------------------------------------- - -using System.Threading; -using System.Threading.Tasks; -using RabbitMQ.Client.Impl; - -namespace RabbitMQ.Client.Framing -{ - // TODO merge into ChannelBase - internal class Channel : ChannelBase - { - public Channel(ConnectionConfig config, ISession session, ushort? consumerDispatchConcurrency = null) - : base(config, session, consumerDispatchConcurrency) - { - } - - public override ValueTask BasicAckAsync(ulong deliveryTag, bool multiple, - CancellationToken cancellationToken) - { - var method = new BasicAck(deliveryTag, multiple); - return ModelSendAsync(in method, cancellationToken); - } - - public override ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue, - CancellationToken cancellationToken) - { - var method = new BasicNack(deliveryTag, multiple, requeue); - return ModelSendAsync(in method, cancellationToken); - } - - public override ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue, - CancellationToken cancellationToken) - { - var method = new BasicReject(deliveryTag, requeue); - return ModelSendAsync(in method, cancellationToken); - } - - /// - /// Returning true from this method means that the command was server-originated, - /// and handled already. - /// Returning false (the default) means that the incoming command is the response to - /// a client-initiated RPC call, and must be handled. - /// - /// The incoming command from the AMQP server - /// The cancellation token - /// - protected override Task DispatchCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken) - { - switch (cmd.CommandId) - { - case ProtocolCommandId.BasicCancel: - { - // Note: always returns true - return HandleBasicCancelAsync(cmd, cancellationToken); - } - case ProtocolCommandId.BasicDeliver: - { - // Note: always returns true - return HandleBasicDeliverAsync(cmd, cancellationToken); - } - case ProtocolCommandId.BasicAck: - { - return HandleBasicAck(cmd, cancellationToken); - } - case ProtocolCommandId.BasicNack: - { - return HandleBasicNack(cmd, cancellationToken); - } - case ProtocolCommandId.BasicReturn: - { - // Note: always returns true - return HandleBasicReturn(cmd, cancellationToken); - } - case ProtocolCommandId.ChannelClose: - { - // Note: always returns true - return HandleChannelCloseAsync(cmd, cancellationToken); - } - case ProtocolCommandId.ChannelCloseOk: - { - // Note: always returns true - return HandleChannelCloseOkAsync(cmd, cancellationToken); - } - case ProtocolCommandId.ChannelFlow: - { - // Note: always returns true - return HandleChannelFlowAsync(cmd, cancellationToken); - } - case ProtocolCommandId.ConnectionBlocked: - { - // Note: always returns true - return HandleConnectionBlockedAsync(cmd, cancellationToken); - } - case ProtocolCommandId.ConnectionClose: - { - // Note: always returns true - return HandleConnectionCloseAsync(cmd, cancellationToken); - } - case ProtocolCommandId.ConnectionSecure: - { - // Note: always returns true - return HandleConnectionSecureAsync(cmd, cancellationToken); - } - case ProtocolCommandId.ConnectionStart: - { - // Note: always returns true - return HandleConnectionStartAsync(cmd, cancellationToken); - } - case ProtocolCommandId.ConnectionTune: - { - // Note: always returns true - return HandleConnectionTuneAsync(cmd, cancellationToken); - } - case ProtocolCommandId.ConnectionUnblocked: - { - // Note: always returns true - return HandleConnectionUnblockedAsync(cancellationToken); - } - default: - { - return Task.FromResult(false); - } - } - } - } -} diff --git a/projects/RabbitMQ.Client/Impl/ChannelBase.cs b/projects/RabbitMQ.Client/Impl/Channel.cs similarity index 93% rename from projects/RabbitMQ.Client/Impl/ChannelBase.cs rename to projects/RabbitMQ.Client/Impl/Channel.cs index 6ba9ed60c..cce412984 100644 --- a/projects/RabbitMQ.Client/Impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -48,7 +48,7 @@ namespace RabbitMQ.Client.Impl { - internal abstract class ChannelBase : IChannel, IRecoverable + internal class Channel : IChannel, IRecoverable { ///Only used to kick-start a connection open ///sequence. See @@ -71,7 +71,7 @@ internal abstract class ChannelBase : IChannel, IRecoverable internal readonly IConsumerDispatcher ConsumerDispatcher; - protected ChannelBase(ConnectionConfig config, ISession session, ushort? perChannelConsumerDispatchConcurrency = null) + public Channel(ConnectionConfig config, ISession session, ushort? perChannelConsumerDispatchConcurrency = null) { ContinuationTimeout = config.ContinuationTimeout; ConsumerDispatcher = new AsyncConsumerDispatcher(this, @@ -92,6 +92,7 @@ protected ChannelBase(ConnectionConfig config, ISession session, ushort? perChan } internal TimeSpan HandshakeContinuationTimeout { get; set; } = TimeSpan.FromSeconds(10); + public TimeSpan ContinuationTimeout { get; set; } public event AsyncEventHandler BasicAcksAsync @@ -192,7 +193,7 @@ public void MaybeSetConnectionStartException(Exception ex) } } - protected void TakeOver(ChannelBase other) + protected void TakeOver(Channel other) { _basicAcksAsyncWrapper.Takeover(other._basicAcksAsyncWrapper); _basicNacksAsyncWrapper.Takeover(other._basicNacksAsyncWrapper); @@ -355,8 +356,6 @@ await ModelSendAsync(in method, k.CancellationToken) } } - protected abstract Task DispatchCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken); - protected bool Enqueue(IRpcContinuation k) { if (IsOpen) @@ -873,14 +872,26 @@ public async ValueTask GetNextPublishSequenceNumberAsync(CancellationToke } } - public abstract ValueTask BasicAckAsync(ulong deliveryTag, bool multiple, - CancellationToken cancellationToken); + public virtual ValueTask BasicAckAsync(ulong deliveryTag, bool multiple, + CancellationToken cancellationToken) + { + var method = new BasicAck(deliveryTag, multiple); + return ModelSendAsync(in method, cancellationToken); + } - public abstract ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue, - CancellationToken cancellationToken); + public virtual ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue, + CancellationToken cancellationToken) + { + var method = new BasicNack(deliveryTag, multiple, requeue); + return ModelSendAsync(in method, cancellationToken); + } - public abstract ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue, - CancellationToken cancellationToken); + public virtual ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue, + CancellationToken cancellationToken) + { + var method = new BasicReject(deliveryTag, requeue); + return ModelSendAsync(in method, cancellationToken); + } public async Task BasicCancelAsync(string consumerTag, bool noWait, CancellationToken cancellationToken) @@ -1881,5 +1892,93 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary headers } } } + + /// + /// Returning true from this method means that the command was server-originated, + /// and handled already. + /// Returning false (the default) means that the incoming command is the response to + /// a client-initiated RPC call, and must be handled. + /// + /// The incoming command from the AMQP server + /// The cancellation token + /// + private Task DispatchCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + switch (cmd.CommandId) + { + case ProtocolCommandId.BasicCancel: + { + // Note: always returns true + return HandleBasicCancelAsync(cmd, cancellationToken); + } + case ProtocolCommandId.BasicDeliver: + { + // Note: always returns true + return HandleBasicDeliverAsync(cmd, cancellationToken); + } + case ProtocolCommandId.BasicAck: + { + return HandleBasicAck(cmd, cancellationToken); + } + case ProtocolCommandId.BasicNack: + { + return HandleBasicNack(cmd, cancellationToken); + } + case ProtocolCommandId.BasicReturn: + { + // Note: always returns true + return HandleBasicReturn(cmd, cancellationToken); + } + case ProtocolCommandId.ChannelClose: + { + // Note: always returns true + return HandleChannelCloseAsync(cmd, cancellationToken); + } + case ProtocolCommandId.ChannelCloseOk: + { + // Note: always returns true + return HandleChannelCloseOkAsync(cmd, cancellationToken); + } + case ProtocolCommandId.ChannelFlow: + { + // Note: always returns true + return HandleChannelFlowAsync(cmd, cancellationToken); + } + case ProtocolCommandId.ConnectionBlocked: + { + // Note: always returns true + return HandleConnectionBlockedAsync(cmd, cancellationToken); + } + case ProtocolCommandId.ConnectionClose: + { + // Note: always returns true + return HandleConnectionCloseAsync(cmd, cancellationToken); + } + case ProtocolCommandId.ConnectionSecure: + { + // Note: always returns true + return HandleConnectionSecureAsync(cmd, cancellationToken); + } + case ProtocolCommandId.ConnectionStart: + { + // Note: always returns true + return HandleConnectionStartAsync(cmd, cancellationToken); + } + case ProtocolCommandId.ConnectionTune: + { + // Note: always returns true + return HandleConnectionTuneAsync(cmd, cancellationToken); + } + case ProtocolCommandId.ConnectionUnblocked: + { + // Note: always returns true + return HandleConnectionUnblockedAsync(cancellationToken); + } + default: + { + return Task.FromResult(false); + } + } + } } } diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index 373f6deea..333f24a4b 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -35,7 +35,6 @@ using System.IO; using System.Runtime.CompilerServices; using System.Threading; -using System.Threading.Channels; using System.Threading.Tasks; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; @@ -50,7 +49,7 @@ internal sealed partial class Connection : IConnection private volatile bool _closed; private readonly ConnectionConfig _config; - private readonly ChannelBase _channel0; // FUTURE Note: this is not disposed + private readonly Channel _channel0; private readonly MainSession _session0; private Guid _id = Guid.NewGuid(); @@ -351,7 +350,7 @@ await _session0.TransmitAsync(method, cancellationToken) .ConfigureAwait(false); } } - catch (ChannelClosedException) + catch (System.Threading.Channels.ChannelClosedException) { if (false == abort) { @@ -510,6 +509,9 @@ await this.AbortAsync() _session0.Dispose(); _mainLoopCts.Dispose(); + + await _channel0.DisposeAsync() + .ConfigureAwait(false); } catch (OperationInterruptedException) { diff --git a/projects/RabbitMQ.Client/Impl/SocketFrameHandler.cs b/projects/RabbitMQ.Client/Impl/SocketFrameHandler.cs index f9a413d27..850a5d8c0 100644 --- a/projects/RabbitMQ.Client/Impl/SocketFrameHandler.cs +++ b/projects/RabbitMQ.Client/Impl/SocketFrameHandler.cs @@ -65,7 +65,7 @@ private SocketFrameHandler(AmqpTcpEndpoint amqpTcpEndpoint, ITcpClient socket, S _socket = socket; _stream = stream; - var channel = Channel.CreateBounded( + var channel = System.Threading.Channels.Channel.CreateBounded( new BoundedChannelOptions(128) { AllowSynchronousContinuations = false,