From 29801bece95c1e246fbad63b3fa86d40dcfed3e7 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Wed, 18 Sep 2024 14:51:20 +0200 Subject: [PATCH 01/11] Basic implementation of AsyncManualResetEvent --- .../client/impl/AsyncManualResetEvent.cs | 108 ++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs diff --git a/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs b/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs new file mode 100644 index 000000000..60d89a296 --- /dev/null +++ b/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs @@ -0,0 +1,108 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +using System.Threading; +using System.Threading.Tasks; + +namespace RabbitMQ.Client.client.impl +{ + /// + /// Inspired by http://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266920.aspx + /// + sealed class AsyncManualResetEvent + { + public AsyncManualResetEvent(bool initialState) + { + if (initialState) + { + _taskCompletionSource.SetResult(true); + } + } + + public bool IsSet => _taskCompletionSource.Task.IsCompleted; + + public async Task WaitAsync(CancellationToken cancellationToken) + { + CancellationTokenRegistration tokenRegistration = +#if NET6_0_OR_GREATER + cancellationToken.UnsafeRegister( + state => ((TaskCompletionSource)state!).TrySetCanceled(), _taskCompletionSource); +#else + cancellationToken.Register( + state => ((TaskCompletionSource)state!).TrySetCanceled(), + state: _taskCompletionSource, useSynchronizationContext: false); +#endif + try + { + await _taskCompletionSource.Task.ConfigureAwait(false); + } + finally + { +#if NET6_0_OR_GREATER + await tokenRegistration.DisposeAsync() + .ConfigureAwait(false); +#else + tokenRegistration.Dispose(); +#endif + } + } + + public void Set() + { + _taskCompletionSource.TrySetResult(true); + } + + public void Reset() + { + var sw = new SpinWait(); + + do + { + var currentTaskCompletionSource = _taskCompletionSource; + if (!currentTaskCompletionSource.Task.IsCompleted) + { + return; + } + + var nextTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + if (Interlocked.CompareExchange(ref _taskCompletionSource, nextTaskCompletionSource, currentTaskCompletionSource) == currentTaskCompletionSource) + { + return; + } + + sw.SpinOnce(); + } + while (true); + } + + volatile TaskCompletionSource _taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } +} From cbd994b01f68ff8ed2bd7d62df36aed97d8cee10 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Wed, 18 Sep 2024 14:51:42 +0200 Subject: [PATCH 02/11] Adjust channel to use new async primitive --- .../RabbitMQ.Client/client/framing/Channel.cs | 6 +- .../client/impl/ChannelBase.cs | 111 ++++++++++-------- 2 files changed, 67 insertions(+), 50 deletions(-) diff --git a/projects/RabbitMQ.Client/client/framing/Channel.cs b/projects/RabbitMQ.Client/client/framing/Channel.cs index 5ad4ea258..7456f876a 100644 --- a/projects/RabbitMQ.Client/client/framing/Channel.cs +++ b/projects/RabbitMQ.Client/client/framing/Channel.cs @@ -47,21 +47,21 @@ public override ValueTask BasicAckAsync(ulong deliveryTag, bool multiple, CancellationToken cancellationToken) { var method = new BasicAck(deliveryTag, multiple); - return ModelSendAsync(method, cancellationToken); + return ModelSendAsync(in method, cancellationToken); } public override ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue, CancellationToken cancellationToken) { var method = new BasicNack(deliveryTag, multiple, requeue); - return ModelSendAsync(method, cancellationToken); + return ModelSendAsync(in method, cancellationToken); } public override ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue, CancellationToken cancellationToken) { var method = new BasicReject(deliveryTag, requeue); - return ModelSendAsync(method, cancellationToken); + return ModelSendAsync(in method, cancellationToken); } /// diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 9cbc9d514..d3ad88a0b 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -58,7 +58,7 @@ internal abstract class ChannelBase : IChannel, IRecoverable // AMQP only allows one RPC operation to be active at a time. protected readonly SemaphoreSlim _rpcSemaphore = new SemaphoreSlim(1, 1); private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue(); - private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true); + private readonly AsyncManualResetEvent _flowControlBlock = new AsyncManualResetEvent(true); private ulong _nextPublishSeqNo; private SemaphoreSlim? _confirmSemaphore; @@ -231,7 +231,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) { var method = new ChannelClose( args.ReplyCode, args.ReplyText, args.ClassId, args.MethodId); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); } @@ -277,9 +277,9 @@ internal async ValueTask ConnectionOpenAsync(string virtualHost, CancellationTok { using var timeoutTokenSource = new CancellationTokenSource(HandshakeContinuationTimeout); using var lts = CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSource.Token, cancellationToken); - var m = new ConnectionOpen(virtualHost); + var method = new ConnectionOpen(virtualHost); // Note: must be awaited or else the timeoutTokenSource instance will be disposed - await ModelSendAsync(m, lts.Token).ConfigureAwait(false); + await ModelSendAsync(in method, lts.Token).ConfigureAwait(false); } internal async ValueTask ConnectionSecureOkAsync(byte[] response, @@ -297,7 +297,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) try { var method = new ConnectionSecureOk(response); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); } catch (AlreadyClosedException) @@ -336,7 +336,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) try { var method = new ConnectionStartOk(clientProperties, mechanism, response, locale); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); } catch (AlreadyClosedException) @@ -386,7 +386,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) enqueued = Enqueue(k); var method = new ChannelOpen(); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -451,16 +451,21 @@ protected ValueTask ModelSendAsync(in T method, CancellationToken cancellatio } [MethodImpl(MethodImplOptions.AggressiveInlining)] - protected ValueTask ModelSendObserveFlowControlAsync(in TMethod method, in THeader header, ReadOnlyMemory body, CancellationToken cancellationToken) + protected ValueTask ModelSendAsync(in TMethod method, in THeader header, ReadOnlyMemory body, CancellationToken cancellationToken) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader + { + return Session.TransmitAsync(in method, in header, body, cancellationToken); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + protected async Task FlowControlAsync(CancellationToken cancellationToken) { if (!_flowControlBlock.IsSet) { - _flowControlBlock.Wait(cancellationToken); + await _flowControlBlock.WaitAsync(cancellationToken) + .ConfigureAwait(false); } - - return Session.TransmitAsync(in method, in header, body, cancellationToken); } internal Task OnCallbackExceptionAsync(CallbackExceptionEventArgs args) @@ -568,7 +573,7 @@ protected virtual void Dispose(bool disposing) public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken) { var method = new ConnectionTuneOk(channelMax, frameMax, heartbeat); - return ModelSendAsync(method, cancellationToken).AsTask(); + return ModelSendAsync(in method, cancellationToken).AsTask(); } protected async Task HandleBasicAck(IncomingCommand cmd, CancellationToken cancellationToken) @@ -662,7 +667,7 @@ await Session.CloseAsync(_closeReason, false, cancellationToken) .ConfigureAwait(false); var method = new ChannelCloseOk(); - await ModelSendAsync(method, cancellationToken) + await ModelSendAsync(in method, cancellationToken) .ConfigureAwait(false); await Session.NotifyAsync(cancellationToken) @@ -702,7 +707,7 @@ protected async Task HandleChannelFlowAsync(IncomingCommand cmd, Cancellat } var method = new ChannelFlowOk(active); - await ModelSendAsync(method, cancellationToken). + await ModelSendAsync(in method, cancellationToken). ConfigureAwait(false); if (!_flowControlAsyncWrapper.IsEmpty) @@ -732,7 +737,7 @@ await Session.Connection.ClosedViaPeerAsync(reason, cancellationToken) .ConfigureAwait(false); var replyMethod = new ConnectionCloseOk(); - await ModelSendAsync(replyMethod, cancellationToken) + await ModelSendAsync(in replyMethod, cancellationToken) .ConfigureAwait(false); SetCloseReason(Session.Connection.CloseReason!); @@ -847,7 +852,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) if (noWait) { - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); ConsumerDispatcher.GetAndRemoveConsumer(consumerTag); } @@ -855,7 +860,7 @@ await ModelSendAsync(method, k.CancellationToken) { enqueued = Enqueue(k); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -891,7 +896,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) enqueued = Enqueue(k); var method = new Client.Framing.Impl.BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, false, arguments); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); return await k; @@ -920,7 +925,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) enqueued = Enqueue(k); var method = new BasicGet(queue, autoAck); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); BasicGetResult? result = await k; @@ -985,18 +990,24 @@ await _confirmSemaphore.WaitAsync(cancellationToken) BasicProperties? props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); if (props is null) { - await ModelSendObserveFlowControlAsync(in cmd, in basicProperties, body, cancellationToken) + await FlowControlAsync(cancellationToken) + .ConfigureAwait(false); + await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) .ConfigureAwait(false); } else { - await ModelSendObserveFlowControlAsync(in cmd, in props, body, cancellationToken) + await FlowControlAsync(cancellationToken) + .ConfigureAwait(false); + await ModelSendAsync(in cmd, in props, body, cancellationToken) .ConfigureAwait(false); } } else { - await ModelSendObserveFlowControlAsync(in cmd, in basicProperties, body, cancellationToken) + await FlowControlAsync(cancellationToken) + .ConfigureAwait(false); + await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) .ConfigureAwait(false); } } @@ -1064,18 +1075,24 @@ await _confirmSemaphore.WaitAsync(cancellationToken) BasicProperties? props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); if (props is null) { - await ModelSendObserveFlowControlAsync(in cmd, in basicProperties, body, cancellationToken) + await FlowControlAsync(cancellationToken) + .ConfigureAwait(false); + await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) .ConfigureAwait(false); } else { - await ModelSendObserveFlowControlAsync(in cmd, in props, body, cancellationToken) + await FlowControlAsync(cancellationToken) + .ConfigureAwait(false); + await ModelSendAsync(in cmd, in props, body, cancellationToken) .ConfigureAwait(false); } } else { - await ModelSendObserveFlowControlAsync(in cmd, in basicProperties, body, cancellationToken) + await FlowControlAsync(cancellationToken) + .ConfigureAwait(false); + await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) .ConfigureAwait(false); } } @@ -1128,7 +1145,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) byte[] newSecretBytes = Encoding.UTF8.GetBytes(newSecret); var method = new ConnectionUpdateSecret(newSecretBytes, reason); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1158,7 +1175,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) enqueued = Enqueue(k); var method = new BasicQos(prefetchSize, prefetchCount, global); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1198,7 +1215,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) enqueued = Enqueue(k); var method = new ConfirmSelect(false); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1235,14 +1252,14 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) if (noWait) { - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); } else { enqueued = Enqueue(k); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1282,14 +1299,14 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) var method = new ExchangeDeclare(exchange, type, passive, durable, autoDelete, false, noWait, arguments); if (noWait) { - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); } else { enqueued = Enqueue(k); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1322,14 +1339,14 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) if (noWait) { - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); } else { enqueued = Enqueue(k); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1363,14 +1380,14 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) if (noWait) { - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); } else { enqueued = Enqueue(k); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1426,7 +1443,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) if (noWait) { - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); if (false == passive) @@ -1440,7 +1457,7 @@ await ModelSendAsync(method, k.CancellationToken) { enqueued = Enqueue(k); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); QueueDeclareOk result = await k; @@ -1477,14 +1494,14 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) if (noWait) { - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); } else { enqueued = Enqueue(k); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1533,7 +1550,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) if (noWait) { - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); return 0; @@ -1542,7 +1559,7 @@ await ModelSendAsync(method, k.CancellationToken) { enqueued = Enqueue(k); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); return await k; @@ -1571,7 +1588,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) enqueued = Enqueue(k); var method = new QueuePurge(queue, false); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); return await k; @@ -1600,7 +1617,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) Enqueue(k); var method = new QueueUnbind(queue, exchange, routingKey, arguments); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1629,7 +1646,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) enqueued = Enqueue(k); var method = new TxCommit(); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1658,7 +1675,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) enqueued = Enqueue(k); var method = new TxRollback(); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; @@ -1687,7 +1704,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) Enqueue(k); var method = new TxSelect(); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); bool result = await k; From d7695082caf8690ff3101e3de95d02de15021fef Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Wed, 18 Sep 2024 15:18:29 +0200 Subject: [PATCH 03/11] Use ManualResetValueTaskSourceCore for more efficiency --- projects/Directory.Packages.props | 3 +- .../RabbitMQ.Client/RabbitMQ.Client.csproj | 1 + .../client/impl/AsyncManualResetEvent.cs | 77 ++++++++++++------- 3 files changed, 52 insertions(+), 29 deletions(-) diff --git a/projects/Directory.Packages.props b/projects/Directory.Packages.props index 34cca5f24..21c278ef3 100644 --- a/projects/Directory.Packages.props +++ b/projects/Directory.Packages.props @@ -34,6 +34,7 @@ + @@ -46,4 +47,4 @@ - \ No newline at end of file + diff --git a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj index 22a045b8e..fbcdd2122 100644 --- a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj +++ b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj @@ -77,5 +77,6 @@ + diff --git a/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs b/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs index 60d89a296..faf1025a0 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs @@ -29,40 +29,63 @@ // Copyright (c) 2007-2024 Broadcom. All Rights Reserved. //--------------------------------------------------------------------------- +using System; using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Sources; namespace RabbitMQ.Client.client.impl { /// /// Inspired by http://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266920.aspx /// - sealed class AsyncManualResetEvent + sealed class AsyncManualResetEvent : IValueTaskSource { - public AsyncManualResetEvent(bool initialState) + private ManualResetValueTaskSourceCore _valueTaskSource; + private volatile bool _isSet; + + public AsyncManualResetEvent(bool initialState = false) { + _isSet = initialState; + _valueTaskSource.Reset(); if (initialState) { - _taskCompletionSource.SetResult(true); + _valueTaskSource.SetResult(true); } } - public bool IsSet => _taskCompletionSource.Task.IsCompleted; + public bool IsSet => _isSet; - public async Task WaitAsync(CancellationToken cancellationToken) + public async ValueTask WaitAsync(CancellationToken cancellationToken) { + if (_isSet) + { + return; + } + + cancellationToken.ThrowIfCancellationRequested(); + CancellationTokenRegistration tokenRegistration = #if NET6_0_OR_GREATER cancellationToken.UnsafeRegister( - state => ((TaskCompletionSource)state!).TrySetCanceled(), _taskCompletionSource); + static state => + { + var (source, token) = ((ManualResetValueTaskSourceCore, CancellationToken))state!; + source.SetException(new OperationCanceledException(token)); + }, (_valueTaskSource, cancellationToken)); #else cancellationToken.Register( - state => ((TaskCompletionSource)state!).TrySetCanceled(), - state: _taskCompletionSource, useSynchronizationContext: false); + static state => + { + var (source, token) = ((ManualResetValueTaskSourceCore, CancellationToken))state!; + source.SetException(new OperationCanceledException(token)); + }, + state: (_valueTaskSource, cancellationToken), useSynchronizationContext: false); #endif try { - await _taskCompletionSource.Task.ConfigureAwait(false); + await new ValueTask(this, _valueTaskSource.Version) + .ConfigureAwait(false); } finally { @@ -77,32 +100,30 @@ await tokenRegistration.DisposeAsync() public void Set() { - _taskCompletionSource.TrySetResult(true); + if (_isSet) + { + return; + } + + _isSet = true; + _valueTaskSource.SetResult(true); } public void Reset() { - var sw = new SpinWait(); - - do + if (!_isSet) { - var currentTaskCompletionSource = _taskCompletionSource; - if (!currentTaskCompletionSource.Task.IsCompleted) - { - return; - } - - var nextTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - if (Interlocked.CompareExchange(ref _taskCompletionSource, nextTaskCompletionSource, currentTaskCompletionSource) == currentTaskCompletionSource) - { - return; - } - - sw.SpinOnce(); + return; } - while (true); + + _isSet = false; + _valueTaskSource.Reset(); } - volatile TaskCompletionSource _taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + void IValueTaskSource.GetResult(short token) => _valueTaskSource.GetResult(token); + + ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => _valueTaskSource.GetStatus(token); + + void IValueTaskSource.OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _valueTaskSource.OnCompleted(continuation, state, token, flags); } } From 13ec02368ba0a04e403615205f4189d7c45dd8dd Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Wed, 18 Sep 2024 15:20:21 +0200 Subject: [PATCH 04/11] Propagate value task --- projects/RabbitMQ.Client/client/impl/ChannelBase.cs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index d3ad88a0b..21d8e3ac6 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -459,13 +459,14 @@ protected ValueTask ModelSendAsync(in TMethod method, in THead } [MethodImpl(MethodImplOptions.AggressiveInlining)] - protected async Task FlowControlAsync(CancellationToken cancellationToken) + protected ValueTask FlowControlAsync(CancellationToken cancellationToken) { - if (!_flowControlBlock.IsSet) + if (_flowControlBlock.IsSet) { - await _flowControlBlock.WaitAsync(cancellationToken) - .ConfigureAwait(false); + return new ValueTask(); } + + return _flowControlBlock.WaitAsync(cancellationToken); } internal Task OnCallbackExceptionAsync(CallbackExceptionEventArgs args) From b52c3d160fe65f5065981352f1bac88f50be28ec Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Wed, 18 Sep 2024 15:22:21 +0200 Subject: [PATCH 05/11] Remove now unnecessary link --- projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs | 3 --- 1 file changed, 3 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs b/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs index faf1025a0..83e58e9ba 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs @@ -36,9 +36,6 @@ namespace RabbitMQ.Client.client.impl { - /// - /// Inspired by http://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266920.aspx - /// sealed class AsyncManualResetEvent : IValueTaskSource { private ManualResetValueTaskSourceCore _valueTaskSource; From 382c0e7f6a594f90f8d73f46ede192827991eb43 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Wed, 18 Sep 2024 17:37:11 +0200 Subject: [PATCH 06/11] Volatile.Read and Write --- .../client/impl/AsyncManualResetEvent.cs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs b/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs index 83e58e9ba..fd78ddd0d 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs @@ -39,7 +39,7 @@ namespace RabbitMQ.Client.client.impl sealed class AsyncManualResetEvent : IValueTaskSource { private ManualResetValueTaskSourceCore _valueTaskSource; - private volatile bool _isSet; + private bool _isSet; public AsyncManualResetEvent(bool initialState = false) { @@ -51,7 +51,7 @@ public AsyncManualResetEvent(bool initialState = false) } } - public bool IsSet => _isSet; + public bool IsSet => Volatile.Read(ref _isSet); public async ValueTask WaitAsync(CancellationToken cancellationToken) { @@ -97,23 +97,23 @@ await tokenRegistration.DisposeAsync() public void Set() { - if (_isSet) + if (IsSet) { return; } - _isSet = true; + Volatile.Write(ref _isSet, true); _valueTaskSource.SetResult(true); } public void Reset() { - if (!_isSet) + if (!IsSet) { return; } - _isSet = false; + Volatile.Write(ref _isSet, false); _valueTaskSource.Reset(); } From ddbf555f2ec7057f737ceb1ced855e6b2df37149 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Wed, 18 Sep 2024 10:30:20 -0700 Subject: [PATCH 07/11] Minor formatting change. --- .../RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs b/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs index fd78ddd0d..b8d557a3e 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs @@ -117,10 +117,13 @@ public void Reset() _valueTaskSource.Reset(); } - void IValueTaskSource.GetResult(short token) => _valueTaskSource.GetResult(token); + void IValueTaskSource.GetResult(short token) => + _valueTaskSource.GetResult(token); - ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => _valueTaskSource.GetStatus(token); + ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => + _valueTaskSource.GetStatus(token); - void IValueTaskSource.OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _valueTaskSource.OnCompleted(continuation, state, token, flags); + void IValueTaskSource.OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => + _valueTaskSource.OnCompleted(continuation, state, token, flags); } } From e206659153b71b62f5975b81aa242eb2a0c850bd Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Wed, 18 Sep 2024 21:08:26 +0200 Subject: [PATCH 08/11] Use IsSet Co-authored-by: Paulo Morgado <470455+paulomorgado@users.noreply.github.com> --- projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs b/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs index b8d557a3e..58facb2bd 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs @@ -55,7 +55,7 @@ public AsyncManualResetEvent(bool initialState = false) public async ValueTask WaitAsync(CancellationToken cancellationToken) { - if (_isSet) + if (IsSet) { return; } From 393dacff6cdba13ff29d6761a3d0a5af3e384528 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Wed, 18 Sep 2024 19:31:08 +0000 Subject: [PATCH 09/11] Validate token --- .../client/impl/AsyncManualResetEvent.cs | 34 ++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs b/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs index 58facb2bd..c667f7044 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Diagnostics.CodeAnalysis; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Sources; @@ -117,13 +118,38 @@ public void Reset() _valueTaskSource.Reset(); } - void IValueTaskSource.GetResult(short token) => + void IValueTaskSource.GetResult(short token) + { + if (token != _valueTaskSource.Version) + { + ThrowIncorrectTokenException(); + } + _valueTaskSource.GetResult(token); + } + + ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) + { + if (token != _valueTaskSource.Version) + { + ThrowIncorrectTokenException(); + } + + return _valueTaskSource.GetStatus(token); + } - ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => - _valueTaskSource.GetStatus(token); + void IValueTaskSource.OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) + { + if (token != _valueTaskSource.Version) + { + ThrowIncorrectTokenException(); + } - void IValueTaskSource.OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _valueTaskSource.OnCompleted(continuation, state, token, flags); + } + + [DoesNotReturn] + static void ThrowIncorrectTokenException() => + throw new InvalidOperationException("ValueTask cannot be awaited multiple times."); } } From ea6fdccf6a99ea1b5ad9ee1f9439b27f31134242 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Wed, 18 Sep 2024 19:32:23 +0000 Subject: [PATCH 10/11] Use default --- projects/RabbitMQ.Client/client/impl/ChannelBase.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 21d8e3ac6..e10c2b1bd 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -463,7 +463,7 @@ protected ValueTask FlowControlAsync(CancellationToken cancellationToken) { if (_flowControlBlock.IsSet) { - return new ValueTask(); + return default; } return _flowControlBlock.WaitAsync(cancellationToken); From a15c1f8d0b121f987d36d42b585a8e0724277d38 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Wed, 18 Sep 2024 12:43:18 -0700 Subject: [PATCH 11/11] Rename to fix conflict with event. --- .../RabbitMQ.Client/client/impl/ChannelBase.cs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index e10c2b1bd..3d94112ea 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -459,7 +459,7 @@ protected ValueTask ModelSendAsync(in TMethod method, in THead } [MethodImpl(MethodImplOptions.AggressiveInlining)] - protected ValueTask FlowControlAsync(CancellationToken cancellationToken) + protected ValueTask EnforceFlowControlAsync(CancellationToken cancellationToken) { if (_flowControlBlock.IsSet) { @@ -991,14 +991,14 @@ await _confirmSemaphore.WaitAsync(cancellationToken) BasicProperties? props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); if (props is null) { - await FlowControlAsync(cancellationToken) + await EnforceFlowControlAsync(cancellationToken) .ConfigureAwait(false); await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) .ConfigureAwait(false); } else { - await FlowControlAsync(cancellationToken) + await EnforceFlowControlAsync(cancellationToken) .ConfigureAwait(false); await ModelSendAsync(in cmd, in props, body, cancellationToken) .ConfigureAwait(false); @@ -1006,7 +1006,7 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken) } else { - await FlowControlAsync(cancellationToken) + await EnforceFlowControlAsync(cancellationToken) .ConfigureAwait(false); await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) .ConfigureAwait(false); @@ -1076,14 +1076,14 @@ await _confirmSemaphore.WaitAsync(cancellationToken) BasicProperties? props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); if (props is null) { - await FlowControlAsync(cancellationToken) + await EnforceFlowControlAsync(cancellationToken) .ConfigureAwait(false); await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) .ConfigureAwait(false); } else { - await FlowControlAsync(cancellationToken) + await EnforceFlowControlAsync(cancellationToken) .ConfigureAwait(false); await ModelSendAsync(in cmd, in props, body, cancellationToken) .ConfigureAwait(false); @@ -1091,7 +1091,7 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken) } else { - await FlowControlAsync(cancellationToken) + await EnforceFlowControlAsync(cancellationToken) .ConfigureAwait(false); await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) .ConfigureAwait(false);