Skip to content

Commit 0985052

Browse files
committed
net: make net interruptible
Also now that net threads are interruptible, switch them to use std threads/binds/mutexes/condvars.
1 parent 799df91 commit 0985052

File tree

3 files changed

+91
-37
lines changed

3 files changed

+91
-37
lines changed

src/init.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ void Interrupt(boost::thread_group& threadGroup)
176176
InterruptRPC();
177177
InterruptREST();
178178
InterruptTorControl();
179+
if (g_connman)
180+
g_connman->Interrupt();
179181
threadGroup.interrupt_all();
180182
}
181183

@@ -1572,7 +1574,7 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
15721574
connOptions.nMaxOutboundTimeframe = nMaxOutboundTimeframe;
15731575
connOptions.nMaxOutboundLimit = nMaxOutboundLimit;
15741576

1575-
if(!connman.Start(threadGroup, scheduler, strNodeError, connOptions))
1577+
if (!connman.Start(scheduler, strNodeError, connOptions))
15761578
return InitError(strNodeError);
15771579

15781580
// ********************************************************* Step 12: finished

src/net.cpp

Lines changed: 71 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1042,7 +1042,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
10421042
void CConnman::ThreadSocketHandler()
10431043
{
10441044
unsigned int nPrevNodeCount = 0;
1045-
while (true)
1045+
while (!interruptNet)
10461046
{
10471047
//
10481048
// Disconnect nodes
@@ -1180,7 +1180,8 @@ void CConnman::ThreadSocketHandler()
11801180

11811181
int nSelect = select(have_fds ? hSocketMax + 1 : 0,
11821182
&fdsetRecv, &fdsetSend, &fdsetError, &timeout);
1183-
boost::this_thread::interruption_point();
1183+
if (interruptNet)
1184+
return;
11841185

11851186
if (nSelect == SOCKET_ERROR)
11861187
{
@@ -1193,7 +1194,8 @@ void CConnman::ThreadSocketHandler()
11931194
}
11941195
FD_ZERO(&fdsetSend);
11951196
FD_ZERO(&fdsetError);
1196-
MilliSleep(timeout.tv_usec/1000);
1197+
if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000)))
1198+
return;
11971199
}
11981200

11991201
//
@@ -1219,7 +1221,8 @@ void CConnman::ThreadSocketHandler()
12191221
}
12201222
BOOST_FOREACH(CNode* pnode, vNodesCopy)
12211223
{
1222-
boost::this_thread::interruption_point();
1224+
if (interruptNet)
1225+
return;
12231226

12241227
//
12251228
// Receive
@@ -1241,7 +1244,7 @@ void CConnman::ThreadSocketHandler()
12411244
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
12421245
pnode->CloseSocketDisconnect();
12431246
if(notify)
1244-
messageHandlerCondition.notify_one();
1247+
condMsgProc.notify_one();
12451248
pnode->nLastRecv = GetTime();
12461249
pnode->nRecvBytes += nBytes;
12471250
RecordBytesRecv(nBytes);
@@ -1469,7 +1472,8 @@ void CConnman::ThreadDNSAddressSeed()
14691472
// less influence on the network topology, and reduces traffic to the seeds.
14701473
if ((addrman.size() > 0) &&
14711474
(!GetBoolArg("-forcednsseed", DEFAULT_FORCEDNSSEED))) {
1472-
MilliSleep(11 * 1000);
1475+
if (!interruptNet.sleep_for(std::chrono::seconds(11)))
1476+
return;
14731477

14741478
LOCK(cs_vNodes);
14751479
int nRelevant = 0;
@@ -1580,10 +1584,12 @@ void CConnman::ThreadOpenConnections()
15801584
OpenNetworkConnection(addr, false, NULL, strAddr.c_str());
15811585
for (int i = 0; i < 10 && i < nLoop; i++)
15821586
{
1583-
MilliSleep(500);
1587+
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
1588+
return;
15841589
}
15851590
}
1586-
MilliSleep(500);
1591+
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
1592+
return;
15871593
}
15881594
}
15891595

@@ -1592,14 +1598,16 @@ void CConnman::ThreadOpenConnections()
15921598

15931599
// Minimum time before next feeler connection (in microseconds).
15941600
int64_t nNextFeeler = PoissonNextSend(nStart*1000*1000, FEELER_INTERVAL);
1595-
while (true)
1601+
while (!interruptNet)
15961602
{
15971603
ProcessOneShot();
15981604

1599-
MilliSleep(500);
1605+
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
1606+
return;
16001607

16011608
CSemaphoreGrant grant(*semOutbound);
1602-
boost::this_thread::interruption_point();
1609+
if (interruptNet)
1610+
return;
16031611

16041612
// Add seed nodes if DNS seeds are all down (an infrastructure attack?).
16051613
if (addrman.size() == 0 && (GetTime() - nStart > 60)) {
@@ -1657,7 +1665,7 @@ void CConnman::ThreadOpenConnections()
16571665

16581666
int64_t nANow = GetAdjustedTime();
16591667
int nTries = 0;
1660-
while (true)
1668+
while (!interruptNet)
16611669
{
16621670
CAddrInfo addr = addrman.Select(fFeeler);
16631671

@@ -1700,7 +1708,8 @@ void CConnman::ThreadOpenConnections()
17001708
if (fFeeler) {
17011709
// Add small amount of random noise before connection to avoid synchronization.
17021710
int randsleep = GetRandInt(FEELER_SLEEP_WINDOW * 1000);
1703-
MilliSleep(randsleep);
1711+
if (!interruptNet.sleep_for(std::chrono::milliseconds(randsleep)))
1712+
return;
17041713
LogPrint("net", "Making feeler connection to %s\n", addrConnect.ToString());
17051714
}
17061715

@@ -1779,11 +1788,12 @@ void CConnman::ThreadOpenAddedConnections()
17791788
// OpenNetworkConnection can detect existing connections to that IP/port.
17801789
CService service(LookupNumeric(info.strAddedNode.c_str(), Params().GetDefaultPort()));
17811790
OpenNetworkConnection(CAddress(service, NODE_NONE), false, &grant, info.strAddedNode.c_str(), false);
1782-
MilliSleep(500);
1791+
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
1792+
return;
17831793
}
17841794
}
1785-
1786-
MilliSleep(120000); // Retry every 2 minutes
1795+
if (!interruptNet.sleep_for(std::chrono::minutes(2)))
1796+
return;
17871797
}
17881798
}
17891799

@@ -1793,7 +1803,9 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
17931803
//
17941804
// Initiate outbound network connection
17951805
//
1796-
boost::this_thread::interruption_point();
1806+
if (interruptNet) {
1807+
return false;
1808+
}
17971809
if (!fNetworkActive) {
17981810
return false;
17991811
}
@@ -1819,13 +1831,9 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
18191831
return true;
18201832
}
18211833

1822-
18231834
void CConnman::ThreadMessageHandler()
18241835
{
1825-
boost::mutex condition_mutex;
1826-
boost::unique_lock<boost::mutex> lock(condition_mutex);
1827-
1828-
while (true)
1836+
while (!flagInterruptMsgProc)
18291837
{
18301838
std::vector<CNode*> vNodesCopy;
18311839
{
@@ -1860,15 +1868,17 @@ void CConnman::ThreadMessageHandler()
18601868
}
18611869
}
18621870
}
1863-
boost::this_thread::interruption_point();
1871+
if (flagInterruptMsgProc)
1872+
return;
18641873

18651874
// Send messages
18661875
{
18671876
TRY_LOCK(pnode->cs_vSend, lockSend);
18681877
if (lockSend)
18691878
GetNodeSignals().SendMessages(pnode, *this);
18701879
}
1871-
boost::this_thread::interruption_point();
1880+
if (flagInterruptMsgProc)
1881+
return;
18721882
}
18731883

18741884
{
@@ -1877,8 +1887,10 @@ void CConnman::ThreadMessageHandler()
18771887
pnode->Release();
18781888
}
18791889

1880-
if (fSleep)
1881-
messageHandlerCondition.timed_wait(lock, boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100));
1890+
if (fSleep) {
1891+
std::unique_lock<std::mutex> lock(mutexMsgProc);
1892+
condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
1893+
}
18821894
}
18831895
}
18841896

@@ -2070,14 +2082,15 @@ CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In) : nSeed0(nSeed0In), nSe
20702082
nMaxOutbound = 0;
20712083
nBestHeight = 0;
20722084
clientInterface = NULL;
2085+
flagInterruptMsgProc = false;
20732086
}
20742087

20752088
NodeId CConnman::GetNewNodeId()
20762089
{
20772090
return nLastNodeId.fetch_add(1, std::memory_order_relaxed);
20782091
}
20792092

2080-
bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError, Options connOptions)
2093+
bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options connOptions)
20812094
{
20822095
nTotalBytesRecv = 0;
20832096
nTotalBytesSent = 0;
@@ -2144,24 +2157,26 @@ bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, st
21442157
//
21452158
// Start threads
21462159
//
2160+
interruptNet.reset();
2161+
flagInterruptMsgProc = false;
21472162

21482163
// Send and receive from sockets, accept connections
2149-
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "net", boost::function<void()>(boost::bind(&CConnman::ThreadSocketHandler, this))));
2164+
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));
21502165

21512166
if (!GetBoolArg("-dnsseed", true))
21522167
LogPrintf("DNS seeding disabled\n");
21532168
else
2154-
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "dnsseed", boost::function<void()>(boost::bind(&CConnman::ThreadDNSAddressSeed, this))));
2169+
threadDNSAddressSeed = std::thread(&TraceThread<std::function<void()> >, "dnsseed", std::function<void()>(std::bind(&CConnman::ThreadDNSAddressSeed, this)));
21552170

21562171
// Initiate outbound connections from -addnode
2157-
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "addcon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenAddedConnections, this))));
2172+
threadOpenAddedConnections = std::thread(&TraceThread<std::function<void()> >, "addcon", std::function<void()>(std::bind(&CConnman::ThreadOpenAddedConnections, this)));
21582173

21592174
// Initiate outbound connections unless connect=0
21602175
if (!mapMultiArgs.count("-connect") || mapMultiArgs.at("-connect").size() != 1 || mapMultiArgs.at("-connect")[0] != "0")
2161-
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "opencon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenConnections, this))));
2176+
threadOpenConnections = std::thread(&TraceThread<std::function<void()> >, "opencon", std::function<void()>(std::bind(&CConnman::ThreadOpenConnections, this)));
21622177

21632178
// Process messages
2164-
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "msghand", boost::function<void()>(boost::bind(&CConnman::ThreadMessageHandler, this))));
2179+
threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this)));
21652180

21662181
// Dump network addresses
21672182
scheduler.scheduleEvery(boost::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL);
@@ -2184,12 +2199,33 @@ class CNetCleanup
21842199
}
21852200
instance_of_cnetcleanup;
21862201

2187-
void CConnman::Stop()
2202+
void CConnman::Interrupt()
21882203
{
2189-
LogPrintf("%s\n",__func__);
2204+
{
2205+
std::lock_guard<std::mutex> lock(mutexMsgProc);
2206+
flagInterruptMsgProc = true;
2207+
}
2208+
condMsgProc.notify_all();
2209+
2210+
interruptNet();
2211+
21902212
if (semOutbound)
21912213
for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++)
21922214
semOutbound->post();
2215+
}
2216+
2217+
void CConnman::Stop()
2218+
{
2219+
if (threadMessageHandler.joinable())
2220+
threadMessageHandler.join();
2221+
if (threadOpenConnections.joinable())
2222+
threadOpenConnections.join();
2223+
if (threadOpenAddedConnections.joinable())
2224+
threadOpenAddedConnections.join();
2225+
if (threadDNSAddressSeed.joinable())
2226+
threadDNSAddressSeed.join();
2227+
if (threadSocketHandler.joinable())
2228+
threadSocketHandler.join();
21932229

21942230
if (fAddressesInitialized)
21952231
{
@@ -2232,6 +2268,7 @@ void CConnman::DeleteNode(CNode* pnode)
22322268

22332269
CConnman::~CConnman()
22342270
{
2271+
Interrupt();
22352272
Stop();
22362273
}
22372274

src/net.h

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,14 @@
1919
#include "streams.h"
2020
#include "sync.h"
2121
#include "uint256.h"
22+
#include "threadinterrupt.h"
2223

2324
#include <atomic>
2425
#include <deque>
2526
#include <stdint.h>
27+
#include <thread>
2628
#include <memory>
29+
#include <condition_variable>
2730

2831
#ifndef WIN32
2932
#include <arpa/inet.h>
@@ -142,8 +145,9 @@ class CConnman
142145
};
143146
CConnman(uint64_t seed0, uint64_t seed1);
144147
~CConnman();
145-
bool Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError, Options options);
148+
bool Start(CScheduler& scheduler, std::string& strNodeError, Options options);
146149
void Stop();
150+
void Interrupt();
147151
bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false);
148152
bool GetNetworkActive() const { return fNetworkActive; };
149153
void SetNetworkActive(bool active);
@@ -402,7 +406,6 @@ class CConnman
402406
std::list<CNode*> vNodesDisconnected;
403407
mutable CCriticalSection cs_vNodes;
404408
std::atomic<NodeId> nLastNodeId;
405-
boost::condition_variable messageHandlerCondition;
406409

407410
/** Services this instance offers */
408411
ServiceFlags nLocalServices;
@@ -419,6 +422,18 @@ class CConnman
419422

420423
/** SipHasher seeds for deterministic randomness */
421424
const uint64_t nSeed0, nSeed1;
425+
426+
std::condition_variable condMsgProc;
427+
std::mutex mutexMsgProc;
428+
std::atomic<bool> flagInterruptMsgProc;
429+
430+
CThreadInterrupt interruptNet;
431+
432+
std::thread threadDNSAddressSeed;
433+
std::thread threadSocketHandler;
434+
std::thread threadOpenAddedConnections;
435+
std::thread threadOpenConnections;
436+
std::thread threadMessageHandler;
422437
};
423438
extern std::unique_ptr<CConnman> g_connman;
424439
void Discover(boost::thread_group& threadGroup);

0 commit comments

Comments
 (0)