Skip to content

Commit aba1188

Browse files
committed
Finish up review of enqueued
1 parent aff8436 commit aba1188

File tree

1 file changed

+99
-68
lines changed

1 file changed

+99
-68
lines changed

projects/RabbitMQ.Client/Impl/ChannelBase.cs

Lines changed: 99 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -949,22 +949,27 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken)
949949
try
950950
{
951951
enqueued = Enqueue(k);
952+
if (enqueued)
953+
{
954+
var method = new BasicGet(queue, autoAck);
955+
await ModelSendAsync(in method, k.CancellationToken)
956+
.ConfigureAwait(false);
952957

953-
var method = new BasicGet(queue, autoAck);
954-
await ModelSendAsync(in method, k.CancellationToken)
955-
.ConfigureAwait(false);
956-
957-
BasicGetResult? result = await k;
958-
959-
using Activity? activity = result != null
960-
? RabbitMQActivitySource.Receive(result.RoutingKey,
961-
result.Exchange,
962-
result.DeliveryTag, result.BasicProperties, result.Body.Length)
963-
: RabbitMQActivitySource.ReceiveEmpty(queue);
958+
BasicGetResult? result = await k;
964959

965-
activity?.SetStartTime(k.StartTime);
960+
using Activity? activity = result != null
961+
? RabbitMQActivitySource.Receive(result.RoutingKey,
962+
result.Exchange,
963+
result.DeliveryTag, result.BasicProperties, result.Body.Length)
964+
: RabbitMQActivitySource.ReceiveEmpty(queue);
966965

967-
return result;
966+
activity?.SetStartTime(k.StartTime);
967+
return result;
968+
}
969+
else
970+
{
971+
return null;
972+
}
968973
}
969974
finally
970975
{
@@ -1168,14 +1173,17 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken)
11681173
try
11691174
{
11701175
enqueued = Enqueue(k);
1176+
if (enqueued)
1177+
{
1178+
byte[] newSecretBytes = Encoding.UTF8.GetBytes(newSecret);
1179+
var method = new ConnectionUpdateSecret(newSecretBytes, reason);
1180+
await ModelSendAsync(in method, k.CancellationToken)
1181+
.ConfigureAwait(false);
11711182

1172-
byte[] newSecretBytes = Encoding.UTF8.GetBytes(newSecret);
1173-
var method = new ConnectionUpdateSecret(newSecretBytes, reason);
1174-
await ModelSendAsync(in method, k.CancellationToken)
1175-
.ConfigureAwait(false);
1183+
bool result = await k;
1184+
Debug.Assert(result);
1185+
}
11761186

1177-
bool result = await k;
1178-
Debug.Assert(result);
11791187
return;
11801188
}
11811189
finally
@@ -1199,13 +1207,16 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken)
11991207
try
12001208
{
12011209
enqueued = Enqueue(k);
1210+
if (enqueued)
1211+
{
1212+
var method = new BasicQos(prefetchSize, prefetchCount, global);
1213+
await ModelSendAsync(in method, k.CancellationToken)
1214+
.ConfigureAwait(false);
12021215

1203-
var method = new BasicQos(prefetchSize, prefetchCount, global);
1204-
await ModelSendAsync(in method, k.CancellationToken)
1205-
.ConfigureAwait(false);
1216+
bool result = await k;
1217+
Debug.Assert(result);
1218+
}
12061219

1207-
bool result = await k;
1208-
Debug.Assert(result);
12091220
return;
12101221
}
12111222
finally
@@ -1239,17 +1250,19 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken)
12391250
}
12401251

12411252
enqueued = Enqueue(k);
1253+
if (enqueued)
1254+
{
1255+
var method = new ConfirmSelect(false);
1256+
await ModelSendAsync(in method, k.CancellationToken)
1257+
.ConfigureAwait(false);
12421258

1243-
var method = new ConfirmSelect(false);
1244-
await ModelSendAsync(in method, k.CancellationToken)
1245-
.ConfigureAwait(false);
1246-
1247-
bool result = await k;
1248-
Debug.Assert(result);
1259+
bool result = await k;
1260+
Debug.Assert(result);
12491261

1250-
// Note:
1251-
// Non-null means confirms are enabled
1252-
_confirmSemaphore = new SemaphoreSlim(1, 1);
1262+
// Note:
1263+
// Non-null means confirms are enabled
1264+
_confirmSemaphore = new SemaphoreSlim(1, 1);
1265+
}
12531266

12541267
return;
12551268
}
@@ -1284,12 +1297,14 @@ await ModelSendAsync(in method, k.CancellationToken)
12841297
else
12851298
{
12861299
enqueued = Enqueue(k);
1300+
if (enqueued)
1301+
{
1302+
await ModelSendAsync(in method, k.CancellationToken)
1303+
.ConfigureAwait(false);
12871304

1288-
await ModelSendAsync(in method, k.CancellationToken)
1289-
.ConfigureAwait(false);
1290-
1291-
bool result = await k;
1292-
Debug.Assert(result);
1305+
bool result = await k;
1306+
Debug.Assert(result);
1307+
}
12931308
}
12941309

12951310
return;
@@ -1331,12 +1346,14 @@ await ModelSendAsync(in method, k.CancellationToken)
13311346
else
13321347
{
13331348
enqueued = Enqueue(k);
1349+
if (enqueued)
1350+
{
1351+
await ModelSendAsync(in method, k.CancellationToken)
1352+
.ConfigureAwait(false);
13341353

1335-
await ModelSendAsync(in method, k.CancellationToken)
1336-
.ConfigureAwait(false);
1337-
1338-
bool result = await k;
1339-
Debug.Assert(result);
1354+
bool result = await k;
1355+
Debug.Assert(result);
1356+
}
13401357
}
13411358

13421359
return;
@@ -1371,12 +1388,14 @@ await ModelSendAsync(in method, k.CancellationToken)
13711388
else
13721389
{
13731390
enqueued = Enqueue(k);
1391+
if (enqueued)
1392+
{
1393+
await ModelSendAsync(in method, k.CancellationToken)
1394+
.ConfigureAwait(false);
13741395

1375-
await ModelSendAsync(in method, k.CancellationToken)
1376-
.ConfigureAwait(false);
1377-
1378-
bool result = await k;
1379-
Debug.Assert(result);
1396+
bool result = await k;
1397+
Debug.Assert(result);
1398+
}
13801399
}
13811400

13821401
return;
@@ -1412,12 +1431,14 @@ await ModelSendAsync(in method, k.CancellationToken)
14121431
else
14131432
{
14141433
enqueued = Enqueue(k);
1434+
if (enqueued)
1435+
{
1436+
await ModelSendAsync(in method, k.CancellationToken)
1437+
.ConfigureAwait(false);
14151438

1416-
await ModelSendAsync(in method, k.CancellationToken)
1417-
.ConfigureAwait(false);
1418-
1419-
bool result = await k;
1420-
Debug.Assert(result);
1439+
bool result = await k;
1440+
Debug.Assert(result);
1441+
}
14211442
}
14221443

14231444
return;
@@ -1481,6 +1502,8 @@ await ModelSendAsync(in method, k.CancellationToken)
14811502
}
14821503
else
14831504
{
1505+
// Note: since this method must return something
1506+
// we don't check enqueued here
14841507
enqueued = Enqueue(k);
14851508

14861509
await ModelSendAsync(in method, k.CancellationToken)
@@ -1526,12 +1549,14 @@ await ModelSendAsync(in method, k.CancellationToken)
15261549
else
15271550
{
15281551
enqueued = Enqueue(k);
1552+
if (enqueued)
1553+
{
1554+
await ModelSendAsync(in method, k.CancellationToken)
1555+
.ConfigureAwait(false);
15291556

1530-
await ModelSendAsync(in method, k.CancellationToken)
1531-
.ConfigureAwait(false);
1532-
1533-
bool result = await k;
1534-
Debug.Assert(result);
1557+
bool result = await k;
1558+
Debug.Assert(result);
1559+
}
15351560
}
15361561

15371562
return;
@@ -1672,13 +1697,16 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken)
16721697
try
16731698
{
16741699
enqueued = Enqueue(k);
1700+
if (enqueued)
1701+
{
1702+
var method = new TxCommit();
1703+
await ModelSendAsync(in method, k.CancellationToken)
1704+
.ConfigureAwait(false);
16751705

1676-
var method = new TxCommit();
1677-
await ModelSendAsync(in method, k.CancellationToken)
1678-
.ConfigureAwait(false);
1706+
bool result = await k;
1707+
Debug.Assert(result);
1708+
}
16791709

1680-
bool result = await k;
1681-
Debug.Assert(result);
16821710
return;
16831711
}
16841712
finally
@@ -1701,13 +1729,16 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken)
17011729
try
17021730
{
17031731
enqueued = Enqueue(k);
1732+
if (enqueued)
1733+
{
1734+
var method = new TxRollback();
1735+
await ModelSendAsync(in method, k.CancellationToken)
1736+
.ConfigureAwait(false);
17041737

1705-
var method = new TxRollback();
1706-
await ModelSendAsync(in method, k.CancellationToken)
1707-
.ConfigureAwait(false);
1738+
bool result = await k;
1739+
Debug.Assert(result);
1740+
}
17081741

1709-
bool result = await k;
1710-
Debug.Assert(result);
17111742
return;
17121743
}
17131744
finally

0 commit comments

Comments
 (0)