diff --git a/projects/Directory.Packages.props b/projects/Directory.Packages.props index 34cca5f24..21c278ef3 100644 --- a/projects/Directory.Packages.props +++ b/projects/Directory.Packages.props @@ -34,6 +34,7 @@ + @@ -46,4 +47,4 @@ - \ No newline at end of file + diff --git a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj index 22a045b8e..fbcdd2122 100644 --- a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj +++ b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj @@ -77,5 +77,6 @@ + diff --git a/projects/RabbitMQ.Client/client/framing/Channel.cs b/projects/RabbitMQ.Client/client/framing/Channel.cs index 5ad4ea258..7456f876a 100644 --- a/projects/RabbitMQ.Client/client/framing/Channel.cs +++ b/projects/RabbitMQ.Client/client/framing/Channel.cs @@ -47,21 +47,21 @@ public override ValueTask BasicAckAsync(ulong deliveryTag, bool multiple, CancellationToken cancellationToken) { var method = new BasicAck(deliveryTag, multiple); - return ModelSendAsync(method, cancellationToken); + return ModelSendAsync(in method, cancellationToken); } public override ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue, CancellationToken cancellationToken) { var method = new BasicNack(deliveryTag, multiple, requeue); - return ModelSendAsync(method, cancellationToken); + return ModelSendAsync(in method, cancellationToken); } public override ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue, CancellationToken cancellationToken) { var method = new BasicReject(deliveryTag, requeue); - return ModelSendAsync(method, cancellationToken); + return ModelSendAsync(in method, cancellationToken); } /// diff --git a/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs b/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs new file mode 100644 index 000000000..c667f7044 --- /dev/null +++ b/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs @@ -0,0 +1,155 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +using System; +using System.Diagnostics.CodeAnalysis; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Sources; + +namespace RabbitMQ.Client.client.impl +{ + sealed class AsyncManualResetEvent : IValueTaskSource + { + private ManualResetValueTaskSourceCore _valueTaskSource; + private bool _isSet; + + public AsyncManualResetEvent(bool initialState = false) + { + _isSet = initialState; + _valueTaskSource.Reset(); + if (initialState) + { + _valueTaskSource.SetResult(true); + } + } + + public bool IsSet => Volatile.Read(ref _isSet); + + public async ValueTask WaitAsync(CancellationToken cancellationToken) + { + if (IsSet) + { + return; + } + + cancellationToken.ThrowIfCancellationRequested(); + + CancellationTokenRegistration tokenRegistration = +#if NET6_0_OR_GREATER + cancellationToken.UnsafeRegister( + static state => + { + var (source, token) = ((ManualResetValueTaskSourceCore, CancellationToken))state!; + source.SetException(new OperationCanceledException(token)); + }, (_valueTaskSource, cancellationToken)); +#else + cancellationToken.Register( + static state => + { + var (source, token) = ((ManualResetValueTaskSourceCore, CancellationToken))state!; + source.SetException(new OperationCanceledException(token)); + }, + state: (_valueTaskSource, cancellationToken), useSynchronizationContext: false); +#endif + try + { + await new ValueTask(this, _valueTaskSource.Version) + .ConfigureAwait(false); + } + finally + { +#if NET6_0_OR_GREATER + await tokenRegistration.DisposeAsync() + .ConfigureAwait(false); +#else + tokenRegistration.Dispose(); +#endif + } + } + + public void Set() + { + if (IsSet) + { + return; + } + + Volatile.Write(ref _isSet, true); + _valueTaskSource.SetResult(true); + } + + public void Reset() + { + if (!IsSet) + { + return; + } + + Volatile.Write(ref _isSet, false); + _valueTaskSource.Reset(); + } + + void IValueTaskSource.GetResult(short token) + { + if (token != _valueTaskSource.Version) + { + ThrowIncorrectTokenException(); + } + + _valueTaskSource.GetResult(token); + } + + ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) + { + if (token != _valueTaskSource.Version) + { + ThrowIncorrectTokenException(); + } + + return _valueTaskSource.GetStatus(token); + } + + void IValueTaskSource.OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) + { + if (token != _valueTaskSource.Version) + { + ThrowIncorrectTokenException(); + } + + _valueTaskSource.OnCompleted(continuation, state, token, flags); + } + + [DoesNotReturn] + static void ThrowIncorrectTokenException() => + throw new InvalidOperationException("ValueTask cannot be awaited multiple times."); + } +} diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 9cbc9d514..3d94112ea 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -58,7 +58,7 @@ internal abstract class ChannelBase : IChannel, IRecoverable // AMQP only allows one RPC operation to be active at a time. protected readonly SemaphoreSlim _rpcSemaphore = new SemaphoreSlim(1, 1); private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue(); - private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true); + private readonly AsyncManualResetEvent _flowControlBlock = new AsyncManualResetEvent(true); private ulong _nextPublishSeqNo; private SemaphoreSlim? _confirmSemaphore; @@ -231,7 +231,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) { var method = new ChannelClose( args.ReplyCode, args.ReplyText, args.ClassId, args.MethodId); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); } @@ -277,9 +277,9 @@ internal async ValueTask ConnectionOpenAsync(string virtualHost, CancellationTok { using var timeoutTokenSource = new CancellationTokenSource(HandshakeContinuationTimeout); using var lts = CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSource.Token, cancellationToken); - var m = new ConnectionOpen(virtualHost); + var method = new ConnectionOpen(virtualHost); // Note: must be awaited or else the timeoutTokenSource instance will be disposed - await ModelSendAsync(m, lts.Token).ConfigureAwait(false); + await ModelSendAsync(in method, lts.Token).ConfigureAwait(false); } internal async ValueTask ConnectionSecureOkAsync(byte[] response, @@ -297,7 +297,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) try { var method = new ConnectionSecureOk(response); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); } catch (AlreadyClosedException) @@ -336,7 +336,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) try { var method = new ConnectionStartOk(clientProperties, mechanism, response, locale); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); } catch (AlreadyClosedException) @@ -386,7 +386,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) enqueued = Enqueue(k); var method = new ChannelOpen(); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -451,16 +451,22 @@ protected ValueTask ModelSendAsync(in T method, CancellationToken cancellatio } [MethodImpl(MethodImplOptions.AggressiveInlining)] - protected ValueTask ModelSendObserveFlowControlAsync(in TMethod method, in THeader header, ReadOnlyMemory body, CancellationToken cancellationToken) + protected ValueTask ModelSendAsync(in TMethod method, in THeader header, ReadOnlyMemory body, CancellationToken cancellationToken) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader { - if (!_flowControlBlock.IsSet) + return Session.TransmitAsync(in method, in header, body, cancellationToken); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + protected ValueTask EnforceFlowControlAsync(CancellationToken cancellationToken) + { + if (_flowControlBlock.IsSet) { - _flowControlBlock.Wait(cancellationToken); + return default; } - return Session.TransmitAsync(in method, in header, body, cancellationToken); + return _flowControlBlock.WaitAsync(cancellationToken); } internal Task OnCallbackExceptionAsync(CallbackExceptionEventArgs args) @@ -568,7 +574,7 @@ protected virtual void Dispose(bool disposing) public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken) { var method = new ConnectionTuneOk(channelMax, frameMax, heartbeat); - return ModelSendAsync(method, cancellationToken).AsTask(); + return ModelSendAsync(in method, cancellationToken).AsTask(); } protected async Task HandleBasicAck(IncomingCommand cmd, CancellationToken cancellationToken) @@ -662,7 +668,7 @@ await Session.CloseAsync(_closeReason, false, cancellationToken) .ConfigureAwait(false); var method = new ChannelCloseOk(); - await ModelSendAsync(method, cancellationToken) + await ModelSendAsync(in method, cancellationToken) .ConfigureAwait(false); await Session.NotifyAsync(cancellationToken) @@ -702,7 +708,7 @@ protected async Task HandleChannelFlowAsync(IncomingCommand cmd, Cancellat } var method = new ChannelFlowOk(active); - await ModelSendAsync(method, cancellationToken). + await ModelSendAsync(in method, cancellationToken). ConfigureAwait(false); if (!_flowControlAsyncWrapper.IsEmpty) @@ -732,7 +738,7 @@ await Session.Connection.ClosedViaPeerAsync(reason, cancellationToken) .ConfigureAwait(false); var replyMethod = new ConnectionCloseOk(); - await ModelSendAsync(replyMethod, cancellationToken) + await ModelSendAsync(in replyMethod, cancellationToken) .ConfigureAwait(false); SetCloseReason(Session.Connection.CloseReason!); @@ -847,7 +853,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) if (noWait) { - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); ConsumerDispatcher.GetAndRemoveConsumer(consumerTag); } @@ -855,7 +861,7 @@ await ModelSendAsync(method, k.CancellationToken) { enqueued = Enqueue(k); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -891,7 +897,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) enqueued = Enqueue(k); var method = new Client.Framing.Impl.BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, false, arguments); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); return await k; @@ -920,7 +926,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) enqueued = Enqueue(k); var method = new BasicGet(queue, autoAck); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); BasicGetResult? result = await k; @@ -985,18 +991,24 @@ await _confirmSemaphore.WaitAsync(cancellationToken) BasicProperties? props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); if (props is null) { - await ModelSendObserveFlowControlAsync(in cmd, in basicProperties, body, cancellationToken) + await EnforceFlowControlAsync(cancellationToken) + .ConfigureAwait(false); + await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) .ConfigureAwait(false); } else { - await ModelSendObserveFlowControlAsync(in cmd, in props, body, cancellationToken) + await EnforceFlowControlAsync(cancellationToken) + .ConfigureAwait(false); + await ModelSendAsync(in cmd, in props, body, cancellationToken) .ConfigureAwait(false); } } else { - await ModelSendObserveFlowControlAsync(in cmd, in basicProperties, body, cancellationToken) + await EnforceFlowControlAsync(cancellationToken) + .ConfigureAwait(false); + await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) .ConfigureAwait(false); } } @@ -1064,18 +1076,24 @@ await _confirmSemaphore.WaitAsync(cancellationToken) BasicProperties? props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); if (props is null) { - await ModelSendObserveFlowControlAsync(in cmd, in basicProperties, body, cancellationToken) + await EnforceFlowControlAsync(cancellationToken) + .ConfigureAwait(false); + await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) .ConfigureAwait(false); } else { - await ModelSendObserveFlowControlAsync(in cmd, in props, body, cancellationToken) + await EnforceFlowControlAsync(cancellationToken) + .ConfigureAwait(false); + await ModelSendAsync(in cmd, in props, body, cancellationToken) .ConfigureAwait(false); } } else { - await ModelSendObserveFlowControlAsync(in cmd, in basicProperties, body, cancellationToken) + await EnforceFlowControlAsync(cancellationToken) + .ConfigureAwait(false); + await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) .ConfigureAwait(false); } } @@ -1128,7 +1146,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) byte[] newSecretBytes = Encoding.UTF8.GetBytes(newSecret); var method = new ConnectionUpdateSecret(newSecretBytes, reason); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1158,7 +1176,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) enqueued = Enqueue(k); var method = new BasicQos(prefetchSize, prefetchCount, global); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1198,7 +1216,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) enqueued = Enqueue(k); var method = new ConfirmSelect(false); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1235,14 +1253,14 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) if (noWait) { - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); } else { enqueued = Enqueue(k); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1282,14 +1300,14 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) var method = new ExchangeDeclare(exchange, type, passive, durable, autoDelete, false, noWait, arguments); if (noWait) { - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); } else { enqueued = Enqueue(k); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1322,14 +1340,14 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) if (noWait) { - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); } else { enqueued = Enqueue(k); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1363,14 +1381,14 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) if (noWait) { - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); } else { enqueued = Enqueue(k); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1426,7 +1444,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) if (noWait) { - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); if (false == passive) @@ -1440,7 +1458,7 @@ await ModelSendAsync(method, k.CancellationToken) { enqueued = Enqueue(k); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); QueueDeclareOk result = await k; @@ -1477,14 +1495,14 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) if (noWait) { - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); } else { enqueued = Enqueue(k); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1533,7 +1551,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) if (noWait) { - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); return 0; @@ -1542,7 +1560,7 @@ await ModelSendAsync(method, k.CancellationToken) { enqueued = Enqueue(k); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); return await k; @@ -1571,7 +1589,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) enqueued = Enqueue(k); var method = new QueuePurge(queue, false); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); return await k; @@ -1600,7 +1618,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) Enqueue(k); var method = new QueueUnbind(queue, exchange, routingKey, arguments); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1629,7 +1647,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) enqueued = Enqueue(k); var method = new TxCommit(); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1658,7 +1676,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) enqueued = Enqueue(k); var method = new TxRollback(); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1687,7 +1705,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) Enqueue(k); var method = new TxSelect(); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k;