@@ -1353,57 +1353,59 @@ bool CConnman::InactivityCheck(const CNode& node) const
1353
1353
return false ;
1354
1354
}
1355
1355
1356
- bool CConnman::GenerateSelectSet (std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set)
1356
+ bool CConnman::GenerateSelectSet (const std::vector<CNode*>& nodes,
1357
+ std::set<SOCKET>& recv_set,
1358
+ std::set<SOCKET>& send_set,
1359
+ std::set<SOCKET>& error_set)
1357
1360
{
1358
1361
for (const ListenSocket& hListenSocket : vhListenSocket) {
1359
1362
recv_set.insert (hListenSocket.socket );
1360
1363
}
1361
1364
1362
- {
1363
- LOCK (cs_vNodes);
1364
- for (CNode* pnode : vNodes)
1365
+ for (CNode* pnode : nodes) {
1366
+ // Implement the following logic:
1367
+ // * If there is data to send, select() for sending data. As this only
1368
+ // happens when optimistic write failed, we choose to first drain the
1369
+ // write buffer in this case before receiving more. This avoids
1370
+ // needlessly queueing received data, if the remote peer is not themselves
1371
+ // receiving data. This means properly utilizing TCP flow control signalling.
1372
+ // * Otherwise, if there is space left in the receive buffer, select() for
1373
+ // receiving data.
1374
+ // * Hand off all complete messages to the processor, to be handled without
1375
+ // blocking here.
1376
+
1377
+ bool select_recv = !pnode->fPauseRecv ;
1378
+ bool select_send;
1365
1379
{
1366
- // Implement the following logic:
1367
- // * If there is data to send, select() for sending data. As this only
1368
- // happens when optimistic write failed, we choose to first drain the
1369
- // write buffer in this case before receiving more. This avoids
1370
- // needlessly queueing received data, if the remote peer is not themselves
1371
- // receiving data. This means properly utilizing TCP flow control signalling.
1372
- // * Otherwise, if there is space left in the receive buffer, select() for
1373
- // receiving data.
1374
- // * Hand off all complete messages to the processor, to be handled without
1375
- // blocking here.
1376
-
1377
- bool select_recv = !pnode->fPauseRecv ;
1378
- bool select_send;
1379
- {
1380
- LOCK (pnode->cs_vSend );
1381
- select_send = !pnode->vSendMsg .empty ();
1382
- }
1380
+ LOCK (pnode->cs_vSend );
1381
+ select_send = !pnode->vSendMsg .empty ();
1382
+ }
1383
1383
1384
- LOCK (pnode->cs_hSocket );
1385
- if (pnode->hSocket == INVALID_SOCKET)
1386
- continue ;
1384
+ LOCK (pnode->cs_hSocket );
1385
+ if (pnode->hSocket == INVALID_SOCKET)
1386
+ continue ;
1387
1387
1388
- error_set.insert (pnode->hSocket );
1389
- if (select_send) {
1390
- send_set.insert (pnode->hSocket );
1391
- continue ;
1392
- }
1393
- if (select_recv) {
1394
- recv_set.insert (pnode->hSocket );
1395
- }
1388
+ error_set.insert (pnode->hSocket );
1389
+ if (select_send) {
1390
+ send_set.insert (pnode->hSocket );
1391
+ continue ;
1392
+ }
1393
+ if (select_recv) {
1394
+ recv_set.insert (pnode->hSocket );
1396
1395
}
1397
1396
}
1398
1397
1399
1398
return !recv_set.empty () || !send_set.empty () || !error_set.empty ();
1400
1399
}
1401
1400
1402
1401
#ifdef USE_POLL
1403
- void CConnman::SocketEvents (std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set)
1402
+ void CConnman::SocketEvents (const std::vector<CNode*>& nodes,
1403
+ std::set<SOCKET>& recv_set,
1404
+ std::set<SOCKET>& send_set,
1405
+ std::set<SOCKET>& error_set)
1404
1406
{
1405
1407
std::set<SOCKET> recv_select_set, send_select_set, error_select_set;
1406
- if (!GenerateSelectSet (recv_select_set, send_select_set, error_select_set)) {
1408
+ if (!GenerateSelectSet (nodes, recv_select_set, send_select_set, error_select_set)) {
1407
1409
interruptNet.sleep_for (std::chrono::milliseconds (SELECT_TIMEOUT_MILLISECONDS));
1408
1410
return ;
1409
1411
}
@@ -1442,10 +1444,13 @@ void CConnman::SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_s
1442
1444
}
1443
1445
}
1444
1446
#else
1445
- void CConnman::SocketEvents (std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set)
1447
+ void CConnman::SocketEvents (const std::vector<CNode*>& nodes,
1448
+ std::set<SOCKET>& recv_set,
1449
+ std::set<SOCKET>& send_set,
1450
+ std::set<SOCKET>& error_set)
1446
1451
{
1447
1452
std::set<SOCKET> recv_select_set, send_select_set, error_select_set;
1448
- if (!GenerateSelectSet (recv_select_set, send_select_set, error_select_set)) {
1453
+ if (!GenerateSelectSet (nodes, recv_select_set, send_select_set, error_select_set)) {
1449
1454
interruptNet.sleep_for (std::chrono::milliseconds (SELECT_TIMEOUT_MILLISECONDS));
1450
1455
return ;
1451
1456
}
@@ -1519,34 +1524,33 @@ void CConnman::SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_s
1519
1524
1520
1525
void CConnman::SocketHandler ()
1521
1526
{
1522
- std::set<SOCKET> recv_set, send_set, error_set;
1523
- SocketEvents (recv_set, send_set, error_set);
1524
-
1525
- if (interruptNet) return ;
1527
+ std::set<SOCKET> recv_set;
1528
+ std::set<SOCKET> send_set;
1529
+ std::set<SOCKET> error_set;
1526
1530
1527
- //
1528
- // Accept new connections
1529
- //
1530
- for (const ListenSocket& hListenSocket : vhListenSocket)
1531
1531
{
1532
- if (hListenSocket.socket != INVALID_SOCKET && recv_set.count (hListenSocket.socket ) > 0 )
1533
- {
1534
- AcceptConnection (hListenSocket);
1535
- }
1536
- }
1532
+ const NodesSnapshot snap{*this , /* shuffle=*/ false };
1537
1533
1538
- //
1539
- // Service each socket
1540
- //
1541
- std::vector<CNode*> vNodesCopy;
1542
- {
1543
- LOCK (cs_vNodes);
1544
- vNodesCopy = vNodes;
1545
- for (CNode* pnode : vNodesCopy)
1546
- pnode->AddRef ();
1534
+ // Check for the readiness of the already connected sockets and the
1535
+ // listening sockets in one call ("readiness" as in poll(2) or
1536
+ // select(2)). If none are ready, wait for a short while and return
1537
+ // empty sets.
1538
+ SocketEvents (snap.Nodes (), recv_set, send_set, error_set);
1539
+
1540
+ // Service (send/receive) each of the already connected nodes.
1541
+ SocketHandlerConnected (snap.Nodes (), recv_set, send_set, error_set);
1547
1542
}
1548
- for (CNode* pnode : vNodesCopy)
1549
- {
1543
+
1544
+ // Accept new connections from listening sockets.
1545
+ SocketHandlerListening (recv_set);
1546
+ }
1547
+
1548
+ void CConnman::SocketHandlerConnected (const std::vector<CNode*>& nodes,
1549
+ const std::set<SOCKET>& recv_set,
1550
+ const std::set<SOCKET>& send_set,
1551
+ const std::set<SOCKET>& error_set)
1552
+ {
1553
+ for (CNode* pnode : nodes) {
1550
1554
if (interruptNet)
1551
1555
return ;
1552
1556
@@ -1628,10 +1632,17 @@ void CConnman::SocketHandler()
1628
1632
1629
1633
if (InactivityCheck (*pnode)) pnode->fDisconnect = true ;
1630
1634
}
1631
- {
1632
- LOCK (cs_vNodes);
1633
- for (CNode* pnode : vNodesCopy)
1634
- pnode->Release ();
1635
+ }
1636
+
1637
+ void CConnman::SocketHandlerListening (const std::set<SOCKET>& recv_set)
1638
+ {
1639
+ for (const ListenSocket& listen_socket : vhListenSocket) {
1640
+ if (interruptNet) {
1641
+ return ;
1642
+ }
1643
+ if (listen_socket.socket != INVALID_SOCKET && recv_set.count (listen_socket.socket ) > 0 ) {
1644
+ AcceptConnection (listen_socket);
1645
+ }
1635
1646
}
1636
1647
}
1637
1648
@@ -2246,49 +2257,34 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
2246
2257
void CConnman::ThreadMessageHandler ()
2247
2258
{
2248
2259
SetSyscallSandboxPolicy (SyscallSandboxPolicy::MESSAGE_HANDLER);
2249
- FastRandomContext rng;
2250
2260
while (!flagInterruptMsgProc)
2251
2261
{
2252
- std::vector<CNode*> vNodesCopy;
2253
- {
2254
- LOCK (cs_vNodes);
2255
- vNodesCopy = vNodes;
2256
- for (CNode* pnode : vNodesCopy) {
2257
- pnode->AddRef ();
2258
- }
2259
- }
2260
-
2261
2262
bool fMoreWork = false ;
2262
2263
2263
- // Randomize the order in which we process messages from/to our peers.
2264
- // This prevents attacks in which an attacker exploits having multiple
2265
- // consecutive connections in the vNodes list.
2266
- Shuffle (vNodesCopy.begin (), vNodesCopy.end (), rng);
2267
-
2268
- for (CNode* pnode : vNodesCopy)
2269
2264
{
2270
- if (pnode->fDisconnect )
2271
- continue ;
2265
+ // Randomize the order in which we process messages from/to our peers.
2266
+ // This prevents attacks in which an attacker exploits having multiple
2267
+ // consecutive connections in the vNodes list.
2268
+ const NodesSnapshot snap{*this , /* shuffle=*/ true };
2272
2269
2273
- // Receive messages
2274
- bool fMoreNodeWork = m_msgproc->ProcessMessages (pnode, flagInterruptMsgProc);
2275
- fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend );
2276
- if (flagInterruptMsgProc)
2277
- return ;
2278
- // Send messages
2279
- {
2280
- LOCK (pnode->cs_sendProcessing );
2281
- m_msgproc->SendMessages (pnode);
2282
- }
2270
+ for (CNode* pnode : snap.Nodes ()) {
2271
+ if (pnode->fDisconnect )
2272
+ continue ;
2283
2273
2284
- if (flagInterruptMsgProc)
2285
- return ;
2286
- }
2274
+ // Receive messages
2275
+ bool fMoreNodeWork = m_msgproc->ProcessMessages (pnode, flagInterruptMsgProc);
2276
+ fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend );
2277
+ if (flagInterruptMsgProc)
2278
+ return ;
2279
+ // Send messages
2280
+ {
2281
+ LOCK (pnode->cs_sendProcessing );
2282
+ m_msgproc->SendMessages (pnode);
2283
+ }
2287
2284
2288
- {
2289
- LOCK (cs_vNodes);
2290
- for (CNode* pnode : vNodesCopy)
2291
- pnode->Release ();
2285
+ if (flagInterruptMsgProc)
2286
+ return ;
2287
+ }
2292
2288
}
2293
2289
2294
2290
WAIT_LOCK (mutexMsgProc, lock);
0 commit comments