@@ -1262,209 +1262,213 @@ void CConnman::InactivityCheck(CNode *pnode)
1262
1262
}
1263
1263
}
1264
1264
1265
- void CConnman::ThreadSocketHandler ()
1265
+ void CConnman::SocketHandler ()
1266
1266
{
1267
- while (!interruptNet)
1268
- {
1269
- DisconnectNodes ();
1270
- NotifyNumConnectionsChanged ();
1267
+ //
1268
+ // Find which sockets have data to receive
1269
+ //
1270
+ struct timeval timeout;
1271
+ timeout.tv_sec = 0 ;
1272
+ timeout.tv_usec = 50000 ; // frequency to poll pnode->vSend
1271
1273
1272
- //
1273
- // Find which sockets have data to receive
1274
- //
1275
- struct timeval timeout;
1276
- timeout.tv_sec = 0 ;
1277
- timeout.tv_usec = 50000 ; // frequency to poll pnode->vSend
1278
-
1279
- fd_set fdsetRecv;
1280
- fd_set fdsetSend;
1281
- fd_set fdsetError;
1282
- FD_ZERO (&fdsetRecv);
1283
- FD_ZERO (&fdsetSend);
1284
- FD_ZERO (&fdsetError);
1285
- SOCKET hSocketMax = 0 ;
1286
- bool have_fds = false ;
1274
+ fd_set fdsetRecv;
1275
+ fd_set fdsetSend;
1276
+ fd_set fdsetError;
1277
+ FD_ZERO (&fdsetRecv);
1278
+ FD_ZERO (&fdsetSend);
1279
+ FD_ZERO (&fdsetError);
1280
+ SOCKET hSocketMax = 0 ;
1281
+ bool have_fds = false ;
1287
1282
1288
- for (const ListenSocket& hListenSocket : vhListenSocket) {
1289
- FD_SET (hListenSocket.socket , &fdsetRecv);
1290
- hSocketMax = std::max (hSocketMax, hListenSocket.socket );
1291
- have_fds = true ;
1292
- }
1283
+ for (const ListenSocket& hListenSocket : vhListenSocket) {
1284
+ FD_SET (hListenSocket.socket , &fdsetRecv);
1285
+ hSocketMax = std::max (hSocketMax, hListenSocket.socket );
1286
+ have_fds = true ;
1287
+ }
1293
1288
1289
+ {
1290
+ LOCK (cs_vNodes);
1291
+ for (CNode* pnode : vNodes)
1294
1292
{
1295
- LOCK (cs_vNodes);
1296
- for (CNode* pnode : vNodes)
1293
+ // Implement the following logic:
1294
+ // * If there is data to send, select() for sending data. As this only
1295
+ // happens when optimistic write failed, we choose to first drain the
1296
+ // write buffer in this case before receiving more. This avoids
1297
+ // needlessly queueing received data, if the remote peer is not themselves
1298
+ // receiving data. This means properly utilizing TCP flow control signalling.
1299
+ // * Otherwise, if there is space left in the receive buffer, select() for
1300
+ // receiving data.
1301
+ // * Hand off all complete messages to the processor, to be handled without
1302
+ // blocking here.
1303
+
1304
+ bool select_recv = !pnode->fPauseRecv ;
1305
+ bool select_send;
1297
1306
{
1298
- // Implement the following logic:
1299
- // * If there is data to send, select() for sending data. As this only
1300
- // happens when optimistic write failed, we choose to first drain the
1301
- // write buffer in this case before receiving more. This avoids
1302
- // needlessly queueing received data, if the remote peer is not themselves
1303
- // receiving data. This means properly utilizing TCP flow control signalling.
1304
- // * Otherwise, if there is space left in the receive buffer, select() for
1305
- // receiving data.
1306
- // * Hand off all complete messages to the processor, to be handled without
1307
- // blocking here.
1308
-
1309
- bool select_recv = !pnode->fPauseRecv ;
1310
- bool select_send;
1311
- {
1312
- LOCK (pnode->cs_vSend );
1313
- select_send = !pnode->vSendMsg .empty ();
1314
- }
1307
+ LOCK (pnode->cs_vSend );
1308
+ select_send = !pnode->vSendMsg .empty ();
1309
+ }
1315
1310
1316
- LOCK (pnode->cs_hSocket );
1317
- if (pnode->hSocket == INVALID_SOCKET)
1318
- continue ;
1311
+ LOCK (pnode->cs_hSocket );
1312
+ if (pnode->hSocket == INVALID_SOCKET)
1313
+ continue ;
1319
1314
1320
- FD_SET (pnode->hSocket , &fdsetError);
1321
- hSocketMax = std::max (hSocketMax, pnode->hSocket );
1322
- have_fds = true ;
1315
+ FD_SET (pnode->hSocket , &fdsetError);
1316
+ hSocketMax = std::max (hSocketMax, pnode->hSocket );
1317
+ have_fds = true ;
1323
1318
1324
- if (select_send) {
1325
- FD_SET (pnode->hSocket , &fdsetSend);
1326
- continue ;
1327
- }
1328
- if (select_recv) {
1329
- FD_SET (pnode->hSocket , &fdsetRecv);
1330
- }
1319
+ if (select_send) {
1320
+ FD_SET (pnode->hSocket , &fdsetSend);
1321
+ continue ;
1322
+ }
1323
+ if (select_recv) {
1324
+ FD_SET (pnode->hSocket , &fdsetRecv);
1331
1325
}
1332
1326
}
1327
+ }
1333
1328
1334
- int nSelect = select (have_fds ? hSocketMax + 1 : 0 ,
1335
- &fdsetRecv, &fdsetSend, &fdsetError, &timeout);
1336
- if (interruptNet)
1337
- return ;
1329
+ int nSelect = select (have_fds ? hSocketMax + 1 : 0 ,
1330
+ &fdsetRecv, &fdsetSend, &fdsetError, &timeout);
1331
+ if (interruptNet)
1332
+ return ;
1338
1333
1339
- if (nSelect == SOCKET_ERROR)
1334
+ if (nSelect == SOCKET_ERROR)
1335
+ {
1336
+ if (have_fds)
1340
1337
{
1341
- if (have_fds)
1342
- {
1343
- int nErr = WSAGetLastError ();
1344
- LogPrintf (" socket select error %s\n " , NetworkErrorString (nErr));
1345
- for (unsigned int i = 0 ; i <= hSocketMax; i++)
1346
- FD_SET (i, &fdsetRecv);
1347
- }
1348
- FD_ZERO (&fdsetSend);
1349
- FD_ZERO (&fdsetError);
1350
- if (!interruptNet.sleep_for (std::chrono::milliseconds (timeout.tv_usec /1000 )))
1351
- return ;
1338
+ int nErr = WSAGetLastError ();
1339
+ LogPrintf (" socket select error %s\n " , NetworkErrorString (nErr));
1340
+ for (unsigned int i = 0 ; i <= hSocketMax; i++)
1341
+ FD_SET (i, &fdsetRecv);
1352
1342
}
1343
+ FD_ZERO (&fdsetSend);
1344
+ FD_ZERO (&fdsetError);
1345
+ if (!interruptNet.sleep_for (std::chrono::milliseconds (timeout.tv_usec /1000 )))
1346
+ return ;
1347
+ }
1353
1348
1354
- //
1355
- // Accept new connections
1356
- //
1357
- for (const ListenSocket& hListenSocket : vhListenSocket)
1349
+ //
1350
+ // Accept new connections
1351
+ //
1352
+ for (const ListenSocket& hListenSocket : vhListenSocket)
1353
+ {
1354
+ if (hListenSocket.socket != INVALID_SOCKET && FD_ISSET (hListenSocket.socket , &fdsetRecv))
1358
1355
{
1359
- if (hListenSocket.socket != INVALID_SOCKET && FD_ISSET (hListenSocket.socket , &fdsetRecv))
1360
- {
1361
- AcceptConnection (hListenSocket);
1362
- }
1356
+ AcceptConnection (hListenSocket);
1363
1357
}
1358
+ }
1359
+
1360
+ //
1361
+ // Service each socket
1362
+ //
1363
+ std::vector<CNode*> vNodesCopy;
1364
+ {
1365
+ LOCK (cs_vNodes);
1366
+ vNodesCopy = vNodes;
1367
+ for (CNode* pnode : vNodesCopy)
1368
+ pnode->AddRef ();
1369
+ }
1370
+ for (CNode* pnode : vNodesCopy)
1371
+ {
1372
+ if (interruptNet)
1373
+ return ;
1364
1374
1365
1375
//
1366
- // Service each socket
1376
+ // Receive
1367
1377
//
1368
- std::vector<CNode*> vNodesCopy;
1378
+ bool recvSet = false ;
1379
+ bool sendSet = false ;
1380
+ bool errorSet = false ;
1369
1381
{
1370
- LOCK (cs_vNodes);
1371
- vNodesCopy = vNodes;
1372
- for (CNode* pnode : vNodesCopy)
1373
- pnode->AddRef ();
1382
+ LOCK (pnode->cs_hSocket );
1383
+ if (pnode->hSocket == INVALID_SOCKET)
1384
+ continue ;
1385
+ recvSet = FD_ISSET (pnode->hSocket , &fdsetRecv);
1386
+ sendSet = FD_ISSET (pnode->hSocket , &fdsetSend);
1387
+ errorSet = FD_ISSET (pnode->hSocket , &fdsetError);
1374
1388
}
1375
- for (CNode* pnode : vNodesCopy )
1389
+ if (recvSet || errorSet )
1376
1390
{
1377
- if (interruptNet)
1378
- return ;
1379
-
1380
- //
1381
- // Receive
1382
- //
1383
- bool recvSet = false ;
1384
- bool sendSet = false ;
1385
- bool errorSet = false ;
1391
+ // typical socket buffer is 8K-64K
1392
+ char pchBuf[0x10000 ];
1393
+ int nBytes = 0 ;
1386
1394
{
1387
1395
LOCK (pnode->cs_hSocket );
1388
1396
if (pnode->hSocket == INVALID_SOCKET)
1389
1397
continue ;
1390
- recvSet = FD_ISSET (pnode->hSocket , &fdsetRecv);
1391
- sendSet = FD_ISSET (pnode->hSocket , &fdsetSend);
1392
- errorSet = FD_ISSET (pnode->hSocket , &fdsetError);
1398
+ nBytes = recv (pnode->hSocket , pchBuf, sizeof (pchBuf), MSG_DONTWAIT);
1393
1399
}
1394
- if (recvSet || errorSet )
1400
+ if (nBytes > 0 )
1395
1401
{
1396
- // typical socket buffer is 8K-64K
1397
- char pchBuf[0x10000 ];
1398
- int nBytes = 0 ;
1399
- {
1400
- LOCK (pnode->cs_hSocket );
1401
- if (pnode->hSocket == INVALID_SOCKET)
1402
- continue ;
1403
- nBytes = recv (pnode->hSocket , pchBuf, sizeof (pchBuf), MSG_DONTWAIT);
1404
- }
1405
- if (nBytes > 0 )
1406
- {
1407
- bool notify = false ;
1408
- if (!pnode->ReceiveMsgBytes (pchBuf, nBytes, notify))
1409
- pnode->CloseSocketDisconnect ();
1410
- RecordBytesRecv (nBytes);
1411
- if (notify) {
1412
- size_t nSizeAdded = 0 ;
1413
- auto it (pnode->vRecvMsg .begin ());
1414
- for (; it != pnode->vRecvMsg .end (); ++it) {
1415
- if (!it->complete ())
1416
- break ;
1417
- nSizeAdded += it->vRecv .size () + CMessageHeader::HEADER_SIZE;
1418
- }
1419
- {
1420
- LOCK (pnode->cs_vProcessMsg );
1421
- pnode->vProcessMsg .splice (pnode->vProcessMsg .end (), pnode->vRecvMsg , pnode->vRecvMsg .begin (), it);
1422
- pnode->nProcessQueueSize += nSizeAdded;
1423
- pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
1424
- }
1425
- WakeMessageHandler ();
1426
- }
1427
- }
1428
- else if (nBytes == 0 )
1429
- {
1430
- // socket closed gracefully
1431
- if (!pnode->fDisconnect ) {
1432
- LogPrint (BCLog::NET, " socket closed\n " );
1433
- }
1402
+ bool notify = false ;
1403
+ if (!pnode->ReceiveMsgBytes (pchBuf, nBytes, notify))
1434
1404
pnode->CloseSocketDisconnect ();
1435
- }
1436
- else if (nBytes < 0 )
1437
- {
1438
- // error
1439
- int nErr = WSAGetLastError ();
1440
- if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
1405
+ RecordBytesRecv (nBytes);
1406
+ if (notify) {
1407
+ size_t nSizeAdded = 0 ;
1408
+ auto it (pnode->vRecvMsg .begin ());
1409
+ for (; it != pnode->vRecvMsg .end (); ++it) {
1410
+ if (!it->complete ())
1411
+ break ;
1412
+ nSizeAdded += it->vRecv .size () + CMessageHeader::HEADER_SIZE;
1413
+ }
1441
1414
{
1442
- if (!pnode->fDisconnect )
1443
- LogPrintf (" socket recv error %s\n " , NetworkErrorString (nErr));
1444
- pnode->CloseSocketDisconnect ();
1415
+ LOCK (pnode->cs_vProcessMsg );
1416
+ pnode->vProcessMsg .splice (pnode->vProcessMsg .end (), pnode->vRecvMsg , pnode->vRecvMsg .begin (), it);
1417
+ pnode->nProcessQueueSize += nSizeAdded;
1418
+ pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
1445
1419
}
1420
+ WakeMessageHandler ();
1446
1421
}
1447
1422
}
1448
-
1449
- //
1450
- // Send
1451
- //
1452
- if (sendSet)
1423
+ else if (nBytes == 0 )
1453
1424
{
1454
- LOCK (pnode->cs_vSend );
1455
- size_t nBytes = SocketSendData (pnode);
1456
- if (nBytes) {
1457
- RecordBytesSent (nBytes);
1425
+ // socket closed gracefully
1426
+ if (!pnode->fDisconnect ) {
1427
+ LogPrint (BCLog::NET, " socket closed\n " );
1428
+ }
1429
+ pnode->CloseSocketDisconnect ();
1430
+ }
1431
+ else if (nBytes < 0 )
1432
+ {
1433
+ // error
1434
+ int nErr = WSAGetLastError ();
1435
+ if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
1436
+ {
1437
+ if (!pnode->fDisconnect )
1438
+ LogPrintf (" socket recv error %s\n " , NetworkErrorString (nErr));
1439
+ pnode->CloseSocketDisconnect ();
1458
1440
}
1459
1441
}
1460
-
1461
- InactivityCheck (pnode);
1462
1442
}
1443
+
1444
+ //
1445
+ // Send
1446
+ //
1447
+ if (sendSet)
1463
1448
{
1464
- LOCK (cs_vNodes);
1465
- for (CNode* pnode : vNodesCopy)
1466
- pnode->Release ();
1449
+ LOCK (pnode->cs_vSend );
1450
+ size_t nBytes = SocketSendData (pnode);
1451
+ if (nBytes) {
1452
+ RecordBytesSent (nBytes);
1453
+ }
1467
1454
}
1455
+
1456
+ InactivityCheck (pnode);
1457
+ }
1458
+ {
1459
+ LOCK (cs_vNodes);
1460
+ for (CNode* pnode : vNodesCopy)
1461
+ pnode->Release ();
1462
+ }
1463
+ }
1464
+
1465
+ void CConnman::ThreadSocketHandler ()
1466
+ {
1467
+ while (!interruptNet)
1468
+ {
1469
+ DisconnectNodes ();
1470
+ NotifyNumConnectionsChanged ();
1471
+ SocketHandler ();
1468
1472
}
1469
1473
}
1470
1474
0 commit comments