Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 143 additions & 100 deletions projects/RabbitMQ.Client/Impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -381,13 +381,15 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken)
try
{
enqueued = Enqueue(k);
if (enqueued)
{
var method = new ChannelOpen();
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

var method = new ChannelOpen();
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

bool result = await k;
Debug.Assert(result);
bool result = await k;
Debug.Assert(result);
}
return this;
}
finally
Expand Down Expand Up @@ -877,12 +879,13 @@ await ModelSendAsync(in method, k.CancellationToken)
else
{
enqueued = Enqueue(k);

await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

bool result = await k;
Debug.Assert(result);
if (enqueued)
{
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);
bool result = await k;
Debug.Assert(result);
}
}

return;
Expand Down Expand Up @@ -912,12 +915,17 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken)
try
{
enqueued = Enqueue(k);

var method = new BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, false, arguments);
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

return await k;
if (enqueued)
{
var method = new BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, false, arguments);
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);
return await k;
}
else
{
return string.Empty;
}
}
finally
{
Expand All @@ -941,22 +949,27 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken)
try
{
enqueued = Enqueue(k);
if (enqueued)
{
var method = new BasicGet(queue, autoAck);
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

var method = new BasicGet(queue, autoAck);
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

BasicGetResult? result = await k;

using Activity? activity = result != null
? RabbitMQActivitySource.Receive(result.RoutingKey,
result.Exchange,
result.DeliveryTag, result.BasicProperties, result.Body.Length)
: RabbitMQActivitySource.ReceiveEmpty(queue);
BasicGetResult? result = await k;

activity?.SetStartTime(k.StartTime);
using Activity? activity = result != null
? RabbitMQActivitySource.Receive(result.RoutingKey,
result.Exchange,
result.DeliveryTag, result.BasicProperties, result.Body.Length)
: RabbitMQActivitySource.ReceiveEmpty(queue);

return result;
activity?.SetStartTime(k.StartTime);
return result;
}
else
{
return null;
}
}
finally
{
Expand Down Expand Up @@ -1160,14 +1173,17 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken)
try
{
enqueued = Enqueue(k);
if (enqueued)
{
byte[] newSecretBytes = Encoding.UTF8.GetBytes(newSecret);
var method = new ConnectionUpdateSecret(newSecretBytes, reason);
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

byte[] newSecretBytes = Encoding.UTF8.GetBytes(newSecret);
var method = new ConnectionUpdateSecret(newSecretBytes, reason);
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);
bool result = await k;
Debug.Assert(result);
}

bool result = await k;
Debug.Assert(result);
return;
}
finally
Expand All @@ -1191,13 +1207,16 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken)
try
{
enqueued = Enqueue(k);
if (enqueued)
{
var method = new BasicQos(prefetchSize, prefetchCount, global);
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

var method = new BasicQos(prefetchSize, prefetchCount, global);
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);
bool result = await k;
Debug.Assert(result);
}

bool result = await k;
Debug.Assert(result);
return;
}
finally
Expand Down Expand Up @@ -1231,17 +1250,19 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken)
}

enqueued = Enqueue(k);
if (enqueued)
{
var method = new ConfirmSelect(false);
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

var method = new ConfirmSelect(false);
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

bool result = await k;
Debug.Assert(result);
bool result = await k;
Debug.Assert(result);

// Note:
// Non-null means confirms are enabled
_confirmSemaphore = new SemaphoreSlim(1, 1);
// Note:
// Non-null means confirms are enabled
_confirmSemaphore = new SemaphoreSlim(1, 1);
}

return;
}
Expand Down Expand Up @@ -1276,12 +1297,14 @@ await ModelSendAsync(in method, k.CancellationToken)
else
{
enqueued = Enqueue(k);
if (enqueued)
{
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

bool result = await k;
Debug.Assert(result);
bool result = await k;
Debug.Assert(result);
}
}

return;
Expand Down Expand Up @@ -1323,12 +1346,14 @@ await ModelSendAsync(in method, k.CancellationToken)
else
{
enqueued = Enqueue(k);
if (enqueued)
{
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

bool result = await k;
Debug.Assert(result);
bool result = await k;
Debug.Assert(result);
}
}

return;
Expand Down Expand Up @@ -1363,12 +1388,14 @@ await ModelSendAsync(in method, k.CancellationToken)
else
{
enqueued = Enqueue(k);
if (enqueued)
{
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

bool result = await k;
Debug.Assert(result);
bool result = await k;
Debug.Assert(result);
}
}

return;
Expand Down Expand Up @@ -1404,12 +1431,14 @@ await ModelSendAsync(in method, k.CancellationToken)
else
{
enqueued = Enqueue(k);
if (enqueued)
{
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

bool result = await k;
Debug.Assert(result);
bool result = await k;
Debug.Assert(result);
}
}

return;
Expand Down Expand Up @@ -1473,6 +1502,8 @@ await ModelSendAsync(in method, k.CancellationToken)
}
else
{
// Note: since this method must return something
// we don't check enqueued here
enqueued = Enqueue(k);

await ModelSendAsync(in method, k.CancellationToken)
Expand Down Expand Up @@ -1518,12 +1549,14 @@ await ModelSendAsync(in method, k.CancellationToken)
else
{
enqueued = Enqueue(k);
if (enqueued)
{
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

bool result = await k;
Debug.Assert(result);
bool result = await k;
Debug.Assert(result);
}
}

return;
Expand Down Expand Up @@ -1632,14 +1665,16 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken)
.ConfigureAwait(false);
try
{
Enqueue(k);

var method = new QueueUnbind(queue, exchange, routingKey, arguments);
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);
enqueued = Enqueue(k);
if (enqueued)
{
var method = new QueueUnbind(queue, exchange, routingKey, arguments);
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

bool result = await k;
Debug.Assert(result);
bool result = await k;
Debug.Assert(result);
}
return;
}
finally
Expand All @@ -1662,13 +1697,16 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken)
try
{
enqueued = Enqueue(k);
if (enqueued)
{
var method = new TxCommit();
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

var method = new TxCommit();
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);
bool result = await k;
Debug.Assert(result);
}

bool result = await k;
Debug.Assert(result);
return;
}
finally
Expand All @@ -1691,13 +1729,16 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken)
try
{
enqueued = Enqueue(k);
if (enqueued)
{
var method = new TxRollback();
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

var method = new TxRollback();
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);
bool result = await k;
Debug.Assert(result);
}

bool result = await k;
Debug.Assert(result);
return;
}
finally
Expand All @@ -1719,14 +1760,16 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken)
.ConfigureAwait(false);
try
{
Enqueue(k);

var method = new TxSelect();
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);
enqueued = Enqueue(k);
if (enqueued)
{
var method = new TxSelect();
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

bool result = await k;
Debug.Assert(result);
bool result = await k;
Debug.Assert(result);
}
return;
}
finally
Expand Down
Loading