diff --git a/projects/RabbitMQ.Client/Impl/ChannelBase.cs b/projects/RabbitMQ.Client/Impl/ChannelBase.cs index 4cd9118d3..ed5306cde 100644 --- a/projects/RabbitMQ.Client/Impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/Impl/ChannelBase.cs @@ -381,13 +381,15 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) try { enqueued = Enqueue(k); + if (enqueued) + { + var method = new ChannelOpen(); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); - var method = new ChannelOpen(); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); - - bool result = await k; - Debug.Assert(result); + bool result = await k; + Debug.Assert(result); + } return this; } finally @@ -877,12 +879,13 @@ await ModelSendAsync(in method, k.CancellationToken) else { enqueued = Enqueue(k); - - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); - - bool result = await k; - Debug.Assert(result); + if (enqueued) + { + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + bool result = await k; + Debug.Assert(result); + } } return; @@ -912,12 +915,17 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) try { enqueued = Enqueue(k); - - var method = new BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, false, arguments); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); - - return await k; + if (enqueued) + { + var method = new BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, false, arguments); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + return await k; + } + else + { + return string.Empty; + } } finally { @@ -941,22 +949,27 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) try { enqueued = Enqueue(k); + if (enqueued) + { + var method = new BasicGet(queue, autoAck); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); - var method = new BasicGet(queue, autoAck); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); - - BasicGetResult? result = await k; - - using Activity? activity = result != null - ? RabbitMQActivitySource.Receive(result.RoutingKey, - result.Exchange, - result.DeliveryTag, result.BasicProperties, result.Body.Length) - : RabbitMQActivitySource.ReceiveEmpty(queue); + BasicGetResult? result = await k; - activity?.SetStartTime(k.StartTime); + using Activity? activity = result != null + ? RabbitMQActivitySource.Receive(result.RoutingKey, + result.Exchange, + result.DeliveryTag, result.BasicProperties, result.Body.Length) + : RabbitMQActivitySource.ReceiveEmpty(queue); - return result; + activity?.SetStartTime(k.StartTime); + return result; + } + else + { + return null; + } } finally { @@ -1160,14 +1173,17 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) try { enqueued = Enqueue(k); + if (enqueued) + { + byte[] newSecretBytes = Encoding.UTF8.GetBytes(newSecret); + var method = new ConnectionUpdateSecret(newSecretBytes, reason); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); - byte[] newSecretBytes = Encoding.UTF8.GetBytes(newSecret); - var method = new ConnectionUpdateSecret(newSecretBytes, reason); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + bool result = await k; + Debug.Assert(result); + } - bool result = await k; - Debug.Assert(result); return; } finally @@ -1191,13 +1207,16 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) try { enqueued = Enqueue(k); + if (enqueued) + { + var method = new BasicQos(prefetchSize, prefetchCount, global); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); - var method = new BasicQos(prefetchSize, prefetchCount, global); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + bool result = await k; + Debug.Assert(result); + } - bool result = await k; - Debug.Assert(result); return; } finally @@ -1231,17 +1250,19 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) } enqueued = Enqueue(k); + if (enqueued) + { + var method = new ConfirmSelect(false); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); - var method = new ConfirmSelect(false); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); - - bool result = await k; - Debug.Assert(result); + bool result = await k; + Debug.Assert(result); - // Note: - // Non-null means confirms are enabled - _confirmSemaphore = new SemaphoreSlim(1, 1); + // Note: + // Non-null means confirms are enabled + _confirmSemaphore = new SemaphoreSlim(1, 1); + } return; } @@ -1276,12 +1297,14 @@ await ModelSendAsync(in method, k.CancellationToken) else { enqueued = Enqueue(k); + if (enqueued) + { + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); - - bool result = await k; - Debug.Assert(result); + bool result = await k; + Debug.Assert(result); + } } return; @@ -1323,12 +1346,14 @@ await ModelSendAsync(in method, k.CancellationToken) else { enqueued = Enqueue(k); + if (enqueued) + { + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); - - bool result = await k; - Debug.Assert(result); + bool result = await k; + Debug.Assert(result); + } } return; @@ -1363,12 +1388,14 @@ await ModelSendAsync(in method, k.CancellationToken) else { enqueued = Enqueue(k); + if (enqueued) + { + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); - - bool result = await k; - Debug.Assert(result); + bool result = await k; + Debug.Assert(result); + } } return; @@ -1404,12 +1431,14 @@ await ModelSendAsync(in method, k.CancellationToken) else { enqueued = Enqueue(k); + if (enqueued) + { + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); - - bool result = await k; - Debug.Assert(result); + bool result = await k; + Debug.Assert(result); + } } return; @@ -1473,6 +1502,8 @@ await ModelSendAsync(in method, k.CancellationToken) } else { + // Note: since this method must return something + // we don't check enqueued here enqueued = Enqueue(k); await ModelSendAsync(in method, k.CancellationToken) @@ -1518,12 +1549,14 @@ await ModelSendAsync(in method, k.CancellationToken) else { enqueued = Enqueue(k); + if (enqueued) + { + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); - - bool result = await k; - Debug.Assert(result); + bool result = await k; + Debug.Assert(result); + } } return; @@ -1632,14 +1665,16 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - Enqueue(k); - - var method = new QueueUnbind(queue, exchange, routingKey, arguments); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + enqueued = Enqueue(k); + if (enqueued) + { + var method = new QueueUnbind(queue, exchange, routingKey, arguments); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); - bool result = await k; - Debug.Assert(result); + bool result = await k; + Debug.Assert(result); + } return; } finally @@ -1662,13 +1697,16 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) try { enqueued = Enqueue(k); + if (enqueued) + { + var method = new TxCommit(); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); - var method = new TxCommit(); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + bool result = await k; + Debug.Assert(result); + } - bool result = await k; - Debug.Assert(result); return; } finally @@ -1691,13 +1729,16 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) try { enqueued = Enqueue(k); + if (enqueued) + { + var method = new TxRollback(); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); - var method = new TxRollback(); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + bool result = await k; + Debug.Assert(result); + } - bool result = await k; - Debug.Assert(result); return; } finally @@ -1719,14 +1760,16 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - Enqueue(k); - - var method = new TxSelect(); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + enqueued = Enqueue(k); + if (enqueued) + { + var method = new TxSelect(); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); - bool result = await k; - Debug.Assert(result); + bool result = await k; + Debug.Assert(result); + } return; } finally