Skip to content

Commit d9ae1ce

Browse files
committed
Merge #9289: net: drop boost::thread_group
67ee4ec net: misc header cleanups (Cory Fields) 8b3159e net: make proxy receives interruptible (Cory Fields) 5cb0fce net: remove thread_interrupted catch (Cory Fields) d3d7056 net: make net processing interruptible (Cory Fields) 0985052 net: make net interruptible (Cory Fields) 799df91 net: add CThreadInterrupt and InterruptibleSleep (Cory Fields) 7325b15 net: a few small cleanups before replacing boost threads (Cory Fields)
2 parents c0ddd32 + 67ee4ec commit d9ae1ce

File tree

11 files changed

+222
-81
lines changed

11 files changed

+222
-81
lines changed

src/Makefile.am

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ BITCOIN_CORE_H = \
138138
support/lockedpool.h \
139139
sync.h \
140140
threadsafety.h \
141+
threadinterrupt.h \
141142
timedata.h \
142143
torcontrol.h \
143144
txdb.h \
@@ -327,6 +328,7 @@ libbitcoin_util_a_SOURCES = \
327328
rpc/protocol.cpp \
328329
support/cleanse.cpp \
329330
sync.cpp \
331+
threadinterrupt.cpp \
330332
util.cpp \
331333
utilmoneystr.cpp \
332334
utilstrencodings.cpp \

src/init.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ void Interrupt(boost::thread_group& threadGroup)
176176
InterruptRPC();
177177
InterruptREST();
178178
InterruptTorControl();
179+
if (g_connman)
180+
g_connman->Interrupt();
179181
threadGroup.interrupt_all();
180182
}
181183

@@ -1572,7 +1574,7 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
15721574
connOptions.nMaxOutboundTimeframe = nMaxOutboundTimeframe;
15731575
connOptions.nMaxOutboundLimit = nMaxOutboundLimit;
15741576

1575-
if(!connman.Start(threadGroup, scheduler, strNodeError, connOptions))
1577+
if (!connman.Start(scheduler, strNodeError, connOptions))
15761578
return InitError(strNodeError);
15771579

15781580
// ********************************************************* Step 12: finished

src/net.cpp

Lines changed: 77 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@
3535
#include <miniupnpc/upnperrors.h>
3636
#endif
3737

38-
#include <boost/filesystem.hpp>
39-
#include <boost/thread.hpp>
4038

4139
#include <math.h>
4240

@@ -1042,7 +1040,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
10421040
void CConnman::ThreadSocketHandler()
10431041
{
10441042
unsigned int nPrevNodeCount = 0;
1045-
while (true)
1043+
while (!interruptNet)
10461044
{
10471045
//
10481046
// Disconnect nodes
@@ -1180,7 +1178,8 @@ void CConnman::ThreadSocketHandler()
11801178

11811179
int nSelect = select(have_fds ? hSocketMax + 1 : 0,
11821180
&fdsetRecv, &fdsetSend, &fdsetError, &timeout);
1183-
boost::this_thread::interruption_point();
1181+
if (interruptNet)
1182+
return;
11841183

11851184
if (nSelect == SOCKET_ERROR)
11861185
{
@@ -1193,7 +1192,8 @@ void CConnman::ThreadSocketHandler()
11931192
}
11941193
FD_ZERO(&fdsetSend);
11951194
FD_ZERO(&fdsetError);
1196-
MilliSleep(timeout.tv_usec/1000);
1195+
if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000)))
1196+
return;
11971197
}
11981198

11991199
//
@@ -1219,7 +1219,8 @@ void CConnman::ThreadSocketHandler()
12191219
}
12201220
BOOST_FOREACH(CNode* pnode, vNodesCopy)
12211221
{
1222-
boost::this_thread::interruption_point();
1222+
if (interruptNet)
1223+
return;
12231224

12241225
//
12251226
// Receive
@@ -1241,7 +1242,7 @@ void CConnman::ThreadSocketHandler()
12411242
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
12421243
pnode->CloseSocketDisconnect();
12431244
if(notify)
1244-
messageHandlerCondition.notify_one();
1245+
condMsgProc.notify_one();
12451246
pnode->nLastRecv = GetTime();
12461247
pnode->nRecvBytes += nBytes;
12471248
RecordBytesRecv(nBytes);
@@ -1469,7 +1470,8 @@ void CConnman::ThreadDNSAddressSeed()
14691470
// less influence on the network topology, and reduces traffic to the seeds.
14701471
if ((addrman.size() > 0) &&
14711472
(!GetBoolArg("-forcednsseed", DEFAULT_FORCEDNSSEED))) {
1472-
MilliSleep(11 * 1000);
1473+
if (!interruptNet.sleep_for(std::chrono::seconds(11)))
1474+
return;
14731475

14741476
LOCK(cs_vNodes);
14751477
int nRelevant = 0;
@@ -1580,10 +1582,12 @@ void CConnman::ThreadOpenConnections()
15801582
OpenNetworkConnection(addr, false, NULL, strAddr.c_str());
15811583
for (int i = 0; i < 10 && i < nLoop; i++)
15821584
{
1583-
MilliSleep(500);
1585+
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
1586+
return;
15841587
}
15851588
}
1586-
MilliSleep(500);
1589+
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
1590+
return;
15871591
}
15881592
}
15891593

@@ -1592,14 +1596,16 @@ void CConnman::ThreadOpenConnections()
15921596

15931597
// Minimum time before next feeler connection (in microseconds).
15941598
int64_t nNextFeeler = PoissonNextSend(nStart*1000*1000, FEELER_INTERVAL);
1595-
while (true)
1599+
while (!interruptNet)
15961600
{
15971601
ProcessOneShot();
15981602

1599-
MilliSleep(500);
1603+
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
1604+
return;
16001605

16011606
CSemaphoreGrant grant(*semOutbound);
1602-
boost::this_thread::interruption_point();
1607+
if (interruptNet)
1608+
return;
16031609

16041610
// Add seed nodes if DNS seeds are all down (an infrastructure attack?).
16051611
if (addrman.size() == 0 && (GetTime() - nStart > 60)) {
@@ -1657,7 +1663,7 @@ void CConnman::ThreadOpenConnections()
16571663

16581664
int64_t nANow = GetAdjustedTime();
16591665
int nTries = 0;
1660-
while (true)
1666+
while (!interruptNet)
16611667
{
16621668
CAddrInfo addr = addrman.Select(fFeeler);
16631669

@@ -1700,7 +1706,8 @@ void CConnman::ThreadOpenConnections()
17001706
if (fFeeler) {
17011707
// Add small amount of random noise before connection to avoid synchronization.
17021708
int randsleep = GetRandInt(FEELER_SLEEP_WINDOW * 1000);
1703-
MilliSleep(randsleep);
1709+
if (!interruptNet.sleep_for(std::chrono::milliseconds(randsleep)))
1710+
return;
17041711
LogPrint("net", "Making feeler connection to %s\n", addrConnect.ToString());
17051712
}
17061713

@@ -1779,11 +1786,12 @@ void CConnman::ThreadOpenAddedConnections()
17791786
// OpenNetworkConnection can detect existing connections to that IP/port.
17801787
CService service(LookupNumeric(info.strAddedNode.c_str(), Params().GetDefaultPort()));
17811788
OpenNetworkConnection(CAddress(service, NODE_NONE), false, &grant, info.strAddedNode.c_str(), false);
1782-
MilliSleep(500);
1789+
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
1790+
return;
17831791
}
17841792
}
1785-
1786-
MilliSleep(120000); // Retry every 2 minutes
1793+
if (!interruptNet.sleep_for(std::chrono::minutes(2)))
1794+
return;
17871795
}
17881796
}
17891797

@@ -1793,7 +1801,9 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
17931801
//
17941802
// Initiate outbound network connection
17951803
//
1796-
boost::this_thread::interruption_point();
1804+
if (interruptNet) {
1805+
return false;
1806+
}
17971807
if (!fNetworkActive) {
17981808
return false;
17991809
}
@@ -1806,7 +1816,6 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
18061816
return false;
18071817

18081818
CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure);
1809-
boost::this_thread::interruption_point();
18101819

18111820
if (!pnode)
18121821
return false;
@@ -1820,13 +1829,9 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
18201829
return true;
18211830
}
18221831

1823-
18241832
void CConnman::ThreadMessageHandler()
18251833
{
1826-
boost::mutex condition_mutex;
1827-
boost::unique_lock<boost::mutex> lock(condition_mutex);
1828-
1829-
while (true)
1834+
while (!flagInterruptMsgProc)
18301835
{
18311836
std::vector<CNode*> vNodesCopy;
18321837
{
@@ -1849,7 +1854,7 @@ void CConnman::ThreadMessageHandler()
18491854
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
18501855
if (lockRecv)
18511856
{
1852-
if (!GetNodeSignals().ProcessMessages(pnode, *this))
1857+
if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc))
18531858
pnode->CloseSocketDisconnect();
18541859

18551860
if (pnode->nSendSize < GetSendBufferSize())
@@ -1861,15 +1866,17 @@ void CConnman::ThreadMessageHandler()
18611866
}
18621867
}
18631868
}
1864-
boost::this_thread::interruption_point();
1869+
if (flagInterruptMsgProc)
1870+
return;
18651871

18661872
// Send messages
18671873
{
18681874
TRY_LOCK(pnode->cs_vSend, lockSend);
18691875
if (lockSend)
1870-
GetNodeSignals().SendMessages(pnode, *this);
1876+
GetNodeSignals().SendMessages(pnode, *this, flagInterruptMsgProc);
18711877
}
1872-
boost::this_thread::interruption_point();
1878+
if (flagInterruptMsgProc)
1879+
return;
18731880
}
18741881

18751882
{
@@ -1878,8 +1885,10 @@ void CConnman::ThreadMessageHandler()
18781885
pnode->Release();
18791886
}
18801887

1881-
if (fSleep)
1882-
messageHandlerCondition.timed_wait(lock, boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100));
1888+
if (fSleep) {
1889+
std::unique_lock<std::mutex> lock(mutexMsgProc);
1890+
condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
1891+
}
18831892
}
18841893
}
18851894

@@ -2071,14 +2080,15 @@ CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In) : nSeed0(nSeed0In), nSe
20712080
nMaxOutbound = 0;
20722081
nBestHeight = 0;
20732082
clientInterface = NULL;
2083+
flagInterruptMsgProc = false;
20742084
}
20752085

20762086
NodeId CConnman::GetNewNodeId()
20772087
{
20782088
return nLastNodeId.fetch_add(1, std::memory_order_relaxed);
20792089
}
20802090

2081-
bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError, Options connOptions)
2091+
bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options connOptions)
20822092
{
20832093
nTotalBytesRecv = 0;
20842094
nTotalBytesSent = 0;
@@ -2145,24 +2155,27 @@ bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, st
21452155
//
21462156
// Start threads
21472157
//
2158+
InterruptSocks5(false);
2159+
interruptNet.reset();
2160+
flagInterruptMsgProc = false;
2161+
2162+
// Send and receive from sockets, accept connections
2163+
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));
21482164

21492165
if (!GetBoolArg("-dnsseed", true))
21502166
LogPrintf("DNS seeding disabled\n");
21512167
else
2152-
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "dnsseed", boost::function<void()>(boost::bind(&CConnman::ThreadDNSAddressSeed, this))));
2153-
2154-
// Send and receive from sockets, accept connections
2155-
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "net", boost::function<void()>(boost::bind(&CConnman::ThreadSocketHandler, this))));
2168+
threadDNSAddressSeed = std::thread(&TraceThread<std::function<void()> >, "dnsseed", std::function<void()>(std::bind(&CConnman::ThreadDNSAddressSeed, this)));
21562169

21572170
// Initiate outbound connections from -addnode
2158-
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "addcon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenAddedConnections, this))));
2171+
threadOpenAddedConnections = std::thread(&TraceThread<std::function<void()> >, "addcon", std::function<void()>(std::bind(&CConnman::ThreadOpenAddedConnections, this)));
21592172

21602173
// Initiate outbound connections unless connect=0
21612174
if (!mapMultiArgs.count("-connect") || mapMultiArgs.at("-connect").size() != 1 || mapMultiArgs.at("-connect")[0] != "0")
2162-
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "opencon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenConnections, this))));
2175+
threadOpenConnections = std::thread(&TraceThread<std::function<void()> >, "opencon", std::function<void()>(std::bind(&CConnman::ThreadOpenConnections, this)));
21632176

21642177
// Process messages
2165-
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "msghand", boost::function<void()>(boost::bind(&CConnman::ThreadMessageHandler, this))));
2178+
threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this)));
21662179

21672180
// Dump network addresses
21682181
scheduler.scheduleEvery(boost::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL);
@@ -2185,12 +2198,34 @@ class CNetCleanup
21852198
}
21862199
instance_of_cnetcleanup;
21872200

2188-
void CConnman::Stop()
2201+
void CConnman::Interrupt()
21892202
{
2190-
LogPrintf("%s\n",__func__);
2203+
{
2204+
std::lock_guard<std::mutex> lock(mutexMsgProc);
2205+
flagInterruptMsgProc = true;
2206+
}
2207+
condMsgProc.notify_all();
2208+
2209+
interruptNet();
2210+
InterruptSocks5(true);
2211+
21912212
if (semOutbound)
21922213
for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++)
21932214
semOutbound->post();
2215+
}
2216+
2217+
void CConnman::Stop()
2218+
{
2219+
if (threadMessageHandler.joinable())
2220+
threadMessageHandler.join();
2221+
if (threadOpenConnections.joinable())
2222+
threadOpenConnections.join();
2223+
if (threadOpenAddedConnections.joinable())
2224+
threadOpenAddedConnections.join();
2225+
if (threadDNSAddressSeed.joinable())
2226+
threadDNSAddressSeed.join();
2227+
if (threadSocketHandler.joinable())
2228+
threadSocketHandler.join();
21942229

21952230
if (fAddressesInitialized)
21962231
{
@@ -2233,6 +2268,7 @@ void CConnman::DeleteNode(CNode* pnode)
22332268

22342269
CConnman::~CConnman()
22352270
{
2271+
Interrupt();
22362272
Stop();
22372273
}
22382274

0 commit comments

Comments
 (0)