Skip to content

Commit c59abe2

Browse files
committed
Use semaphores instead of condition variables
1 parent 2692ed3 commit c59abe2

File tree

3 files changed

+91
-47
lines changed

3 files changed

+91
-47
lines changed

src/net.cpp

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ void ThreadOpenAddedConnections2(void* parg);
3535
void ThreadMapPort2(void* parg);
3636
#endif
3737
void ThreadDNSAddressSeed2(void* parg);
38-
bool OpenNetworkConnection(const CAddress& addrConnect, const char *strDest = NULL, bool fOneShot = false);
38+
bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOutbound = NULL, const char *strDest = NULL, bool fOneShot = false);
3939

4040

4141

@@ -66,10 +66,7 @@ CCriticalSection cs_vOneShots;
6666
set<CNetAddr> setservAddNodeAddresses;
6767
CCriticalSection cs_setservAddNodeAddresses;
6868

69-
static CWaitableCriticalSection csOutbound;
70-
static int nOutbound = 0;
71-
static CConditionVariable condOutbound;
72-
69+
static CSemaphore *semOutbound = NULL;
7370

7471
void AddOneShot(string strDest)
7572
{
@@ -463,10 +460,6 @@ CNode* ConnectNode(CAddress addrConnect, const char *pszDest, int64 nTimeout)
463460
LOCK(cs_vNodes);
464461
vNodes.push_back(pnode);
465462
}
466-
{
467-
WAITABLE_LOCK(csOutbound);
468-
nOutbound++;
469-
}
470463

471464
pnode->nTimeConnected = GetTime();
472465
return pnode;
@@ -612,14 +605,8 @@ void ThreadSocketHandler2(void* parg)
612605
// remove from vNodes
613606
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
614607

615-
if (!pnode->fInbound)
616-
{
617-
WAITABLE_LOCK(csOutbound);
618-
nOutbound--;
619-
620-
// Connection slot(s) were removed, notify connection creator(s)
621-
NOTIFY(condOutbound);
622-
}
608+
// release outbound grant (if any)
609+
pnode->grantOutbound.Release();
623610

624611
// close socket and cleanup
625612
pnode->CloseSocketDisconnect();
@@ -1295,8 +1282,11 @@ void static ProcessOneShot()
12951282
vOneShots.pop_front();
12961283
}
12971284
CAddress addr;
1298-
if (!OpenNetworkConnection(addr, strDest.c_str(), true))
1299-
AddOneShot(strDest);
1285+
CSemaphoreGrant grant(*semOutbound, true);
1286+
if (grant) {
1287+
if (!OpenNetworkConnection(addr, &grant, strDest.c_str(), true))
1288+
AddOneShot(strDest);
1289+
}
13001290
}
13011291

13021292
void ThreadOpenConnections2(void* parg)
@@ -1312,7 +1302,7 @@ void ThreadOpenConnections2(void* parg)
13121302
BOOST_FOREACH(string strAddr, mapMultiArgs["-connect"])
13131303
{
13141304
CAddress addr;
1315-
OpenNetworkConnection(addr, strAddr.c_str());
1305+
OpenNetworkConnection(addr, NULL, strAddr.c_str());
13161306
for (int i = 0; i < 10 && i < nLoop; i++)
13171307
{
13181308
Sleep(500);
@@ -1335,13 +1325,9 @@ void ThreadOpenConnections2(void* parg)
13351325
if (fShutdown)
13361326
return;
13371327

1338-
// Limit outbound connections
1339-
int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, (int)GetArg("-maxconnections", 125));
1328+
13401329
vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
1341-
{
1342-
WAITABLE_LOCK(csOutbound);
1343-
WAIT(condOutbound, fShutdown || nOutbound < nMaxOutbound);
1344-
}
1330+
CSemaphoreGrant grant(*semOutbound);
13451331
vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
13461332
if (fShutdown)
13471333
return;
@@ -1374,11 +1360,15 @@ void ThreadOpenConnections2(void* parg)
13741360

13751361
// Only connect to one address per a.b.?.? range.
13761362
// Do this here so we don't have to critsect vNodes inside mapAddresses critsect.
1363+
int nOutbound = 0;
13771364
set<vector<unsigned char> > setConnected;
13781365
{
13791366
LOCK(cs_vNodes);
1380-
BOOST_FOREACH(CNode* pnode, vNodes)
1367+
BOOST_FOREACH(CNode* pnode, vNodes) {
13811368
setConnected.insert(pnode->addr.GetGroup());
1369+
if (!pnode->fInbound)
1370+
nOutbound++;
1371+
}
13821372
}
13831373

13841374
int64 nANow = GetAdjustedTime();
@@ -1408,7 +1398,7 @@ void ThreadOpenConnections2(void* parg)
14081398
}
14091399

14101400
if (addrConnect.IsValid())
1411-
OpenNetworkConnection(addrConnect);
1401+
OpenNetworkConnection(addrConnect, &grant);
14121402
}
14131403
}
14141404

@@ -1442,7 +1432,8 @@ void ThreadOpenAddedConnections2(void* parg)
14421432
while(!fShutdown) {
14431433
BOOST_FOREACH(string& strAddNode, mapMultiArgs["-addnode"]) {
14441434
CAddress addr;
1445-
OpenNetworkConnection(addr, strAddNode.c_str());
1435+
CSemaphoreGrant grant(*semOutbound);
1436+
OpenNetworkConnection(addr, &grant, strAddNode.c_str());
14461437
Sleep(500);
14471438
}
14481439
vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--;
@@ -1485,7 +1476,8 @@ void ThreadOpenAddedConnections2(void* parg)
14851476
}
14861477
BOOST_FOREACH(vector<CService>& vserv, vservConnectAddresses)
14871478
{
1488-
OpenNetworkConnection(CAddress(*(vserv.begin())));
1479+
CSemaphoreGrant grant(*semOutbound);
1480+
OpenNetworkConnection(CAddress(*(vserv.begin())), &grant);
14891481
Sleep(500);
14901482
if (fShutdown)
14911483
return;
@@ -1500,7 +1492,8 @@ void ThreadOpenAddedConnections2(void* parg)
15001492
}
15011493
}
15021494

1503-
bool OpenNetworkConnection(const CAddress& addrConnect, const char *strDest, bool fOneShot)
1495+
// if succesful, this moves the passed grant to the constructed node
1496+
bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOutbound, const char *strDest, bool fOneShot)
15041497
{
15051498
//
15061499
// Initiate outbound network connection
@@ -1522,6 +1515,8 @@ bool OpenNetworkConnection(const CAddress& addrConnect, const char *strDest, boo
15221515
return false;
15231516
if (!pnode)
15241517
return false;
1518+
if (grantOutbound)
1519+
grantOutbound->MoveTo(pnode->grantOutbound);
15251520
pnode->fNetworkNode = true;
15261521
if (fOneShot)
15271522
pnode->fOneShot = true;
@@ -1770,6 +1765,12 @@ void StartNode(void* parg)
17701765
#endif
17711766
#endif
17721767

1768+
if (semOutbound == NULL) {
1769+
// initialize semaphore
1770+
int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, (int)GetArg("-maxconnections", 125));
1771+
semOutbound = new CSemaphore(nMaxOutbound);
1772+
}
1773+
17731774
if (pnodeLocalHost == NULL)
17741775
pnodeLocalHost = new CNode(INVALID_SOCKET, CAddress(CService("127.0.0.1", 0), nLocalServices));
17751776

@@ -1823,7 +1824,8 @@ bool StopNode()
18231824
fShutdown = true;
18241825
nTransactionsUpdated++;
18251826
int64 nStart = GetTime();
1826-
NOTIFY_ALL(condOutbound);
1827+
for (int i=0; i<MAX_OUTBOUND_CONNECTIONS; i++)
1828+
semOutbound->post();
18271829
do
18281830
{
18291831
int nThreadsRunning = 0;

src/net.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ class CNode
147147
bool fNetworkNode;
148148
bool fSuccessfullyConnected;
149149
bool fDisconnect;
150+
CSemaphoreGrant grantOutbound;
150151
protected:
151152
int nRefCount;
152153

src/util.h

Lines changed: 56 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ typedef int pid_t; /* define for windows compatiblity */
2323
#include <boost/filesystem/path.hpp>
2424
#include <boost/interprocess/sync/interprocess_recursive_mutex.hpp>
2525
#include <boost/interprocess/sync/scoped_lock.hpp>
26-
#include <boost/interprocess/sync/interprocess_condition.hpp>
26+
#include <boost/interprocess/sync/interprocess_semaphore.hpp>
2727
#include <boost/interprocess/sync/lock_options.hpp>
2828
#include <boost/date_time/gregorian/gregorian_types.hpp>
2929
#include <boost/date_time/posix_time/posix_time_types.hpp>
@@ -275,24 +275,10 @@ class CMutexLock
275275
};
276276

277277
typedef CMutexLock<CCriticalSection> CCriticalBlock;
278-
typedef CMutexLock<CWaitableCriticalSection> CWaitableCriticalBlock;
279-
typedef boost::interprocess::interprocess_condition CConditionVariable;
280-
281-
/** Wait for a given condition inside a WAITABLE_CRITICAL_BLOCK */
282-
#define WAIT(name,condition) \
283-
do { while(!(condition)) { (name).wait(waitablecriticalblock.GetLock()); } } while(0)
284-
285-
/** Notify waiting threads that a condition may hold now */
286-
#define NOTIFY(name) \
287-
do { (name).notify_one(); } while(0)
288-
289-
#define NOTIFY_ALL(name) \
290-
do { (name).notify_all(); } while(0)
291278

292279
#define LOCK(cs) CCriticalBlock criticalblock(cs, #cs, __FILE__, __LINE__)
293280
#define LOCK2(cs1,cs2) CCriticalBlock criticalblock1(cs1, #cs1, __FILE__, __LINE__),criticalblock2(cs2, #cs2, __FILE__, __LINE__)
294281
#define TRY_LOCK(cs,name) CCriticalBlock name(cs, #cs, __FILE__, __LINE__, true)
295-
#define WAITABLE_LOCK(cs) CWaitableCriticalBlock waitablecriticalblock(cs, #cs, __FILE__, __LINE__)
296282

297283
#define ENTER_CRITICAL_SECTION(cs) \
298284
{ \
@@ -306,6 +292,61 @@ typedef boost::interprocess::interprocess_condition CConditionVariable;
306292
LeaveCritical(); \
307293
}
308294

295+
typedef boost::interprocess::interprocess_semaphore CSemaphore;
296+
297+
/** RAII-style semaphore lock */
298+
class CSemaphoreGrant
299+
{
300+
private:
301+
CSemaphore *sem;
302+
bool fHaveGrant;
303+
304+
public:
305+
void Acquire() {
306+
if (fHaveGrant)
307+
return;
308+
sem->wait();
309+
fHaveGrant = true;
310+
}
311+
312+
void Release() {
313+
if (!fHaveGrant)
314+
return;
315+
sem->post();
316+
fHaveGrant = false;
317+
}
318+
319+
bool TryAcquire() {
320+
if (!fHaveGrant && sem->try_wait())
321+
fHaveGrant = true;
322+
return fHaveGrant;
323+
}
324+
325+
void MoveTo(CSemaphoreGrant &grant) {
326+
grant.Release();
327+
grant.sem = sem;
328+
grant.fHaveGrant = fHaveGrant;
329+
sem = NULL;
330+
fHaveGrant = false;
331+
}
332+
333+
CSemaphoreGrant() : sem(NULL), fHaveGrant(false) {}
334+
335+
CSemaphoreGrant(CSemaphore &sema, bool fTry = false) : sem(&sema), fHaveGrant(false) {
336+
if (fTry)
337+
TryAcquire();
338+
else
339+
Acquire();
340+
}
341+
342+
~CSemaphoreGrant() {
343+
Release();
344+
}
345+
346+
operator bool() {
347+
return fHaveGrant;
348+
}
349+
};
309350

310351
inline std::string i64tostr(int64 n)
311352
{

0 commit comments

Comments
 (0)