Skip to content

Commit 032488e

Browse files
committed
Move SocketHandler logic to private method.
1 parent 2af9cff commit 032488e

File tree

2 files changed

+167
-162
lines changed

2 files changed

+167
-162
lines changed

src/net.cpp

Lines changed: 166 additions & 162 deletions
Original file line numberDiff line numberDiff line change
@@ -1262,209 +1262,213 @@ void CConnman::InactivityCheck(CNode *pnode)
12621262
}
12631263
}
12641264

1265-
void CConnman::ThreadSocketHandler()
1265+
void CConnman::SocketHandler()
12661266
{
1267-
while (!interruptNet)
1268-
{
1269-
DisconnectNodes();
1270-
NotifyNumConnectionsChanged();
1267+
//
1268+
// Find which sockets have data to receive
1269+
//
1270+
struct timeval timeout;
1271+
timeout.tv_sec = 0;
1272+
timeout.tv_usec = 50000; // frequency to poll pnode->vSend
12711273

1272-
//
1273-
// Find which sockets have data to receive
1274-
//
1275-
struct timeval timeout;
1276-
timeout.tv_sec = 0;
1277-
timeout.tv_usec = 50000; // frequency to poll pnode->vSend
1278-
1279-
fd_set fdsetRecv;
1280-
fd_set fdsetSend;
1281-
fd_set fdsetError;
1282-
FD_ZERO(&fdsetRecv);
1283-
FD_ZERO(&fdsetSend);
1284-
FD_ZERO(&fdsetError);
1285-
SOCKET hSocketMax = 0;
1286-
bool have_fds = false;
1274+
fd_set fdsetRecv;
1275+
fd_set fdsetSend;
1276+
fd_set fdsetError;
1277+
FD_ZERO(&fdsetRecv);
1278+
FD_ZERO(&fdsetSend);
1279+
FD_ZERO(&fdsetError);
1280+
SOCKET hSocketMax = 0;
1281+
bool have_fds = false;
12871282

1288-
for (const ListenSocket& hListenSocket : vhListenSocket) {
1289-
FD_SET(hListenSocket.socket, &fdsetRecv);
1290-
hSocketMax = std::max(hSocketMax, hListenSocket.socket);
1291-
have_fds = true;
1292-
}
1283+
for (const ListenSocket& hListenSocket : vhListenSocket) {
1284+
FD_SET(hListenSocket.socket, &fdsetRecv);
1285+
hSocketMax = std::max(hSocketMax, hListenSocket.socket);
1286+
have_fds = true;
1287+
}
12931288

1289+
{
1290+
LOCK(cs_vNodes);
1291+
for (CNode* pnode : vNodes)
12941292
{
1295-
LOCK(cs_vNodes);
1296-
for (CNode* pnode : vNodes)
1293+
// Implement the following logic:
1294+
// * If there is data to send, select() for sending data. As this only
1295+
// happens when optimistic write failed, we choose to first drain the
1296+
// write buffer in this case before receiving more. This avoids
1297+
// needlessly queueing received data, if the remote peer is not themselves
1298+
// receiving data. This means properly utilizing TCP flow control signalling.
1299+
// * Otherwise, if there is space left in the receive buffer, select() for
1300+
// receiving data.
1301+
// * Hand off all complete messages to the processor, to be handled without
1302+
// blocking here.
1303+
1304+
bool select_recv = !pnode->fPauseRecv;
1305+
bool select_send;
12971306
{
1298-
// Implement the following logic:
1299-
// * If there is data to send, select() for sending data. As this only
1300-
// happens when optimistic write failed, we choose to first drain the
1301-
// write buffer in this case before receiving more. This avoids
1302-
// needlessly queueing received data, if the remote peer is not themselves
1303-
// receiving data. This means properly utilizing TCP flow control signalling.
1304-
// * Otherwise, if there is space left in the receive buffer, select() for
1305-
// receiving data.
1306-
// * Hand off all complete messages to the processor, to be handled without
1307-
// blocking here.
1308-
1309-
bool select_recv = !pnode->fPauseRecv;
1310-
bool select_send;
1311-
{
1312-
LOCK(pnode->cs_vSend);
1313-
select_send = !pnode->vSendMsg.empty();
1314-
}
1307+
LOCK(pnode->cs_vSend);
1308+
select_send = !pnode->vSendMsg.empty();
1309+
}
13151310

1316-
LOCK(pnode->cs_hSocket);
1317-
if (pnode->hSocket == INVALID_SOCKET)
1318-
continue;
1311+
LOCK(pnode->cs_hSocket);
1312+
if (pnode->hSocket == INVALID_SOCKET)
1313+
continue;
13191314

1320-
FD_SET(pnode->hSocket, &fdsetError);
1321-
hSocketMax = std::max(hSocketMax, pnode->hSocket);
1322-
have_fds = true;
1315+
FD_SET(pnode->hSocket, &fdsetError);
1316+
hSocketMax = std::max(hSocketMax, pnode->hSocket);
1317+
have_fds = true;
13231318

1324-
if (select_send) {
1325-
FD_SET(pnode->hSocket, &fdsetSend);
1326-
continue;
1327-
}
1328-
if (select_recv) {
1329-
FD_SET(pnode->hSocket, &fdsetRecv);
1330-
}
1319+
if (select_send) {
1320+
FD_SET(pnode->hSocket, &fdsetSend);
1321+
continue;
1322+
}
1323+
if (select_recv) {
1324+
FD_SET(pnode->hSocket, &fdsetRecv);
13311325
}
13321326
}
1327+
}
13331328

1334-
int nSelect = select(have_fds ? hSocketMax + 1 : 0,
1335-
&fdsetRecv, &fdsetSend, &fdsetError, &timeout);
1336-
if (interruptNet)
1337-
return;
1329+
int nSelect = select(have_fds ? hSocketMax + 1 : 0,
1330+
&fdsetRecv, &fdsetSend, &fdsetError, &timeout);
1331+
if (interruptNet)
1332+
return;
13381333

1339-
if (nSelect == SOCKET_ERROR)
1334+
if (nSelect == SOCKET_ERROR)
1335+
{
1336+
if (have_fds)
13401337
{
1341-
if (have_fds)
1342-
{
1343-
int nErr = WSAGetLastError();
1344-
LogPrintf("socket select error %s\n", NetworkErrorString(nErr));
1345-
for (unsigned int i = 0; i <= hSocketMax; i++)
1346-
FD_SET(i, &fdsetRecv);
1347-
}
1348-
FD_ZERO(&fdsetSend);
1349-
FD_ZERO(&fdsetError);
1350-
if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000)))
1351-
return;
1338+
int nErr = WSAGetLastError();
1339+
LogPrintf("socket select error %s\n", NetworkErrorString(nErr));
1340+
for (unsigned int i = 0; i <= hSocketMax; i++)
1341+
FD_SET(i, &fdsetRecv);
13521342
}
1343+
FD_ZERO(&fdsetSend);
1344+
FD_ZERO(&fdsetError);
1345+
if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000)))
1346+
return;
1347+
}
13531348

1354-
//
1355-
// Accept new connections
1356-
//
1357-
for (const ListenSocket& hListenSocket : vhListenSocket)
1349+
//
1350+
// Accept new connections
1351+
//
1352+
for (const ListenSocket& hListenSocket : vhListenSocket)
1353+
{
1354+
if (hListenSocket.socket != INVALID_SOCKET && FD_ISSET(hListenSocket.socket, &fdsetRecv))
13581355
{
1359-
if (hListenSocket.socket != INVALID_SOCKET && FD_ISSET(hListenSocket.socket, &fdsetRecv))
1360-
{
1361-
AcceptConnection(hListenSocket);
1362-
}
1356+
AcceptConnection(hListenSocket);
13631357
}
1358+
}
1359+
1360+
//
1361+
// Service each socket
1362+
//
1363+
std::vector<CNode*> vNodesCopy;
1364+
{
1365+
LOCK(cs_vNodes);
1366+
vNodesCopy = vNodes;
1367+
for (CNode* pnode : vNodesCopy)
1368+
pnode->AddRef();
1369+
}
1370+
for (CNode* pnode : vNodesCopy)
1371+
{
1372+
if (interruptNet)
1373+
return;
13641374

13651375
//
1366-
// Service each socket
1376+
// Receive
13671377
//
1368-
std::vector<CNode*> vNodesCopy;
1378+
bool recvSet = false;
1379+
bool sendSet = false;
1380+
bool errorSet = false;
13691381
{
1370-
LOCK(cs_vNodes);
1371-
vNodesCopy = vNodes;
1372-
for (CNode* pnode : vNodesCopy)
1373-
pnode->AddRef();
1382+
LOCK(pnode->cs_hSocket);
1383+
if (pnode->hSocket == INVALID_SOCKET)
1384+
continue;
1385+
recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv);
1386+
sendSet = FD_ISSET(pnode->hSocket, &fdsetSend);
1387+
errorSet = FD_ISSET(pnode->hSocket, &fdsetError);
13741388
}
1375-
for (CNode* pnode : vNodesCopy)
1389+
if (recvSet || errorSet)
13761390
{
1377-
if (interruptNet)
1378-
return;
1379-
1380-
//
1381-
// Receive
1382-
//
1383-
bool recvSet = false;
1384-
bool sendSet = false;
1385-
bool errorSet = false;
1391+
// typical socket buffer is 8K-64K
1392+
char pchBuf[0x10000];
1393+
int nBytes = 0;
13861394
{
13871395
LOCK(pnode->cs_hSocket);
13881396
if (pnode->hSocket == INVALID_SOCKET)
13891397
continue;
1390-
recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv);
1391-
sendSet = FD_ISSET(pnode->hSocket, &fdsetSend);
1392-
errorSet = FD_ISSET(pnode->hSocket, &fdsetError);
1398+
nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
13931399
}
1394-
if (recvSet || errorSet)
1400+
if (nBytes > 0)
13951401
{
1396-
// typical socket buffer is 8K-64K
1397-
char pchBuf[0x10000];
1398-
int nBytes = 0;
1399-
{
1400-
LOCK(pnode->cs_hSocket);
1401-
if (pnode->hSocket == INVALID_SOCKET)
1402-
continue;
1403-
nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
1404-
}
1405-
if (nBytes > 0)
1406-
{
1407-
bool notify = false;
1408-
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
1409-
pnode->CloseSocketDisconnect();
1410-
RecordBytesRecv(nBytes);
1411-
if (notify) {
1412-
size_t nSizeAdded = 0;
1413-
auto it(pnode->vRecvMsg.begin());
1414-
for (; it != pnode->vRecvMsg.end(); ++it) {
1415-
if (!it->complete())
1416-
break;
1417-
nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
1418-
}
1419-
{
1420-
LOCK(pnode->cs_vProcessMsg);
1421-
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
1422-
pnode->nProcessQueueSize += nSizeAdded;
1423-
pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
1424-
}
1425-
WakeMessageHandler();
1426-
}
1427-
}
1428-
else if (nBytes == 0)
1429-
{
1430-
// socket closed gracefully
1431-
if (!pnode->fDisconnect) {
1432-
LogPrint(BCLog::NET, "socket closed\n");
1433-
}
1402+
bool notify = false;
1403+
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
14341404
pnode->CloseSocketDisconnect();
1435-
}
1436-
else if (nBytes < 0)
1437-
{
1438-
// error
1439-
int nErr = WSAGetLastError();
1440-
if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
1405+
RecordBytesRecv(nBytes);
1406+
if (notify) {
1407+
size_t nSizeAdded = 0;
1408+
auto it(pnode->vRecvMsg.begin());
1409+
for (; it != pnode->vRecvMsg.end(); ++it) {
1410+
if (!it->complete())
1411+
break;
1412+
nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
1413+
}
14411414
{
1442-
if (!pnode->fDisconnect)
1443-
LogPrintf("socket recv error %s\n", NetworkErrorString(nErr));
1444-
pnode->CloseSocketDisconnect();
1415+
LOCK(pnode->cs_vProcessMsg);
1416+
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
1417+
pnode->nProcessQueueSize += nSizeAdded;
1418+
pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
14451419
}
1420+
WakeMessageHandler();
14461421
}
14471422
}
1448-
1449-
//
1450-
// Send
1451-
//
1452-
if (sendSet)
1423+
else if (nBytes == 0)
14531424
{
1454-
LOCK(pnode->cs_vSend);
1455-
size_t nBytes = SocketSendData(pnode);
1456-
if (nBytes) {
1457-
RecordBytesSent(nBytes);
1425+
// socket closed gracefully
1426+
if (!pnode->fDisconnect) {
1427+
LogPrint(BCLog::NET, "socket closed\n");
1428+
}
1429+
pnode->CloseSocketDisconnect();
1430+
}
1431+
else if (nBytes < 0)
1432+
{
1433+
// error
1434+
int nErr = WSAGetLastError();
1435+
if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
1436+
{
1437+
if (!pnode->fDisconnect)
1438+
LogPrintf("socket recv error %s\n", NetworkErrorString(nErr));
1439+
pnode->CloseSocketDisconnect();
14581440
}
14591441
}
1460-
1461-
InactivityCheck(pnode);
14621442
}
1443+
1444+
//
1445+
// Send
1446+
//
1447+
if (sendSet)
14631448
{
1464-
LOCK(cs_vNodes);
1465-
for (CNode* pnode : vNodesCopy)
1466-
pnode->Release();
1449+
LOCK(pnode->cs_vSend);
1450+
size_t nBytes = SocketSendData(pnode);
1451+
if (nBytes) {
1452+
RecordBytesSent(nBytes);
1453+
}
14671454
}
1455+
1456+
InactivityCheck(pnode);
1457+
}
1458+
{
1459+
LOCK(cs_vNodes);
1460+
for (CNode* pnode : vNodesCopy)
1461+
pnode->Release();
1462+
}
1463+
}
1464+
1465+
void CConnman::ThreadSocketHandler()
1466+
{
1467+
while (!interruptNet)
1468+
{
1469+
DisconnectNodes();
1470+
NotifyNumConnectionsChanged();
1471+
SocketHandler();
14681472
}
14691473
}
14701474

src/net.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,7 @@ class CConnman
339339
void DisconnectNodes();
340340
void NotifyNumConnectionsChanged();
341341
void InactivityCheck(CNode *pnode);
342+
void SocketHandler();
342343
void ThreadSocketHandler();
343344
void ThreadDNSAddressSeed();
344345

0 commit comments

Comments
 (0)