Skip to content

Commit b8c3b48

Browse files
committed
refactor: introduce WakeupPipe, move wakeup select pipe logic there
1 parent ed7d976 commit b8c3b48

File tree

5 files changed

+193
-94
lines changed

5 files changed

+193
-94
lines changed

src/Makefile.am

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,7 @@ BITCOIN_CORE_H = \
363363
util/ui_change_type.h \
364364
util/url.h \
365365
util/vector.h \
366+
util/wpipe.h \
366367
validation.h \
367368
validationinterface.h \
368369
versionbits.h \
@@ -797,6 +798,7 @@ libbitcoin_util_a_SOURCES = \
797798
util/thread.cpp \
798799
util/threadnames.cpp \
799800
util/tokenpipe.cpp \
801+
util/wpipe.cpp \
800802
$(BITCOIN_CORE_H)
801803

802804
if USE_LIBEVENT

src/net.cpp

Lines changed: 41 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include <util/thread.h>
3030
#include <util/time.h>
3131
#include <util/translation.h>
32+
#include <util/wpipe.h>
3233
#include <validation.h> // for fDIP0001ActiveAtTip
3334

3435
#include <masternode/meta.h>
@@ -119,7 +120,7 @@ static const uint64_t SELECT_TIMEOUT_MILLISECONDS = 50;
119120
// We are however still somewhat limited in how long we can sleep as there is periodic work (cleanup) to be done in
120121
// the socket handler thread
121122
static const uint64_t SELECT_TIMEOUT_MILLISECONDS = 500;
122-
#endif
123+
#endif /* USE_WAKEUP_PIPE */
123124

124125
const std::string NET_MESSAGE_COMMAND_OTHER = "*other*";
125126

@@ -1284,7 +1285,9 @@ void CConnman::CreateNodeFromAcceptedSocket(SOCKET hSocket,
12841285
LogPrint(BCLog::NET, "EdgeTriggeredEvents::RegisterEvents() failed\n");
12851286
}
12861287
}
1287-
WakeSelect();
1288+
if (m_wakeup_pipe) {
1289+
m_wakeup_pipe->Write();
1290+
}
12881291
}
12891292

12901293
// We received a new connection, harvest entropy from the time (and our peer count)
@@ -1569,14 +1572,14 @@ bool CConnman::GenerateSelectSet(const std::vector<CNode*>& nodes,
15691572
}
15701573
}
15711574

1572-
#ifdef USE_WAKEUP_PIPE
1573-
// We add a pipe to the read set so that the select() call can be woken up from the outside
1574-
// This is done when data is added to send buffers (vSendMsg) or when new peers are added
1575-
// This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to
1576-
// timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually
1577-
// run on Linux and friends.
1578-
recv_set.insert(wakeupPipe[0]);
1579-
#endif
1575+
if (m_wakeup_pipe) {
1576+
// We add a pipe to the read set so that the select() call can be woken up from the outside
1577+
// This is done when data is added to send buffers (vSendMsg) or when new peers are added
1578+
// This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to
1579+
// timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually
1580+
// run on Linux and friends.
1581+
recv_set.insert(m_wakeup_pipe->m_pipe[0]);
1582+
}
15801583

15811584
return !recv_set.empty() || !send_set.empty() || !error_set.empty();
15821585
}
@@ -1594,9 +1597,8 @@ void CConnman::SocketEventsKqueue(std::set<SOCKET>& recv_set,
15941597
timeout.tv_sec = only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS / 1000;
15951598
timeout.tv_nsec = (only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS % 1000) * 1000 * 1000;
15961599

1597-
wakeupSelectNeeded = true;
1598-
int n = kevent(Assert(m_edge_trig_events)->m_fd, nullptr, 0, events, maxEvents, &timeout);
1599-
wakeupSelectNeeded = false;
1600+
int n{-1};
1601+
ToggleWakeupPipe([&](){n = kevent(Assert(m_edge_trig_events)->m_fd, nullptr, 0, events, maxEvents, &timeout);});
16001602
if (n == -1) {
16011603
LogPrintf("kevent wait error\n");
16021604
} else if (n > 0) {
@@ -1628,9 +1630,8 @@ void CConnman::SocketEventsEpoll(std::set<SOCKET>& recv_set,
16281630
const size_t maxEvents = 64;
16291631
epoll_event events[maxEvents];
16301632

1631-
wakeupSelectNeeded = true;
1632-
int n = epoll_wait(Assert(m_edge_trig_events)->m_fd, events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);
1633-
wakeupSelectNeeded = false;
1633+
int n{-1};
1634+
ToggleWakeupPipe([&](){n = epoll_wait(Assert(m_edge_trig_events)->m_fd, events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);});
16341635
for (int i = 0; i < n; i++) {
16351636
auto& e = events[i];
16361637
if((e.events & EPOLLERR) || (e.events & EPOLLHUP)) {
@@ -1685,9 +1686,8 @@ void CConnman::SocketEventsPoll(const std::vector<CNode*>& nodes,
16851686
vpollfds.push_back(std::move(it.second));
16861687
}
16871688

1688-
wakeupSelectNeeded = true;
1689-
int r = poll(vpollfds.data(), vpollfds.size(), only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);
1690-
wakeupSelectNeeded = false;
1689+
int r{-1};
1690+
ToggleWakeupPipe([&](){r = poll(vpollfds.data(), vpollfds.size(), only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);});
16911691
if (r < 0) {
16921692
return;
16931693
}
@@ -1744,9 +1744,8 @@ void CConnman::SocketEventsSelect(const std::vector<CNode*>& nodes,
17441744
hSocketMax = std::max(hSocketMax, hSocket);
17451745
}
17461746

1747-
wakeupSelectNeeded = true;
1748-
int nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, &fdsetError, &timeout);
1749-
wakeupSelectNeeded = false;
1747+
int nSelect{-1};
1748+
ToggleWakeupPipe([&](){nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, &fdsetError, &timeout);});
17501749
if (interruptNet)
17511750
return;
17521751

@@ -1849,18 +1848,10 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync)
18491848
// empty sets.
18501849
SocketEvents(snap.Nodes(), recv_set, send_set, error_set, only_poll);
18511850

1852-
#ifdef USE_WAKEUP_PIPE
1853-
// drain the wakeup pipe
1854-
if (recv_set.count(wakeupPipe[0])) {
1855-
char buf[128];
1856-
while (true) {
1857-
int r = read(wakeupPipe[0], buf, sizeof(buf));
1858-
if (r <= 0) {
1859-
break;
1860-
}
1861-
}
1851+
// Drain the wakeup pipe
1852+
if (m_wakeup_pipe && recv_set.count(m_wakeup_pipe->m_pipe[0])) {
1853+
m_wakeup_pipe->Drain();
18621854
}
1863-
#endif
18641855

18651856
// Service (send/receive) each of the already connected nodes.
18661857
SocketHandlerConnected(recv_set, send_set, error_set);
@@ -2138,22 +2129,6 @@ void CConnman::WakeMessageHandler()
21382129
condMsgProc.notify_one();
21392130
}
21402131

2141-
void CConnman::WakeSelect()
2142-
{
2143-
#ifdef USE_WAKEUP_PIPE
2144-
if (wakeupPipe[1] == -1) {
2145-
return;
2146-
}
2147-
2148-
char buf{0};
2149-
if (write(wakeupPipe[1], &buf, sizeof(buf)) != 1) {
2150-
LogPrint(BCLog::NET, "write to wakeupPipe failed\n");
2151-
}
2152-
#endif
2153-
2154-
wakeupSelectNeeded = false;
2155-
}
2156-
21572132
void CConnman::ThreadDNSAddressSeed()
21582133
{
21592134
FastRandomContext rng;
@@ -2993,7 +2968,9 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
29932968
LogPrint(BCLog::NET, "EdgeTriggeredEvents::RegisterEvents() failed\n");
29942969
}
29952970
}
2996-
WakeSelect();
2971+
if (m_wakeup_pipe) {
2972+
m_wakeup_pipe->Write();
2973+
}
29972974
}
29982975
}
29992976

@@ -3373,23 +3350,13 @@ bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_met
33733350
}
33743351

33753352
#ifdef USE_WAKEUP_PIPE
3376-
if (pipe(wakeupPipe) != 0) {
3377-
wakeupPipe[0] = wakeupPipe[1] = -1;
3378-
LogPrint(BCLog::NET, "pipe() for wakeupPipe failed\n");
3379-
} else {
3380-
int fFlags = fcntl(wakeupPipe[0], F_GETFL, 0);
3381-
if (fcntl(wakeupPipe[0], F_SETFL, fFlags | O_NONBLOCK) == -1) {
3382-
LogPrint(BCLog::NET, "fcntl for O_NONBLOCK on wakeupPipe failed\n");
3383-
}
3384-
fFlags = fcntl(wakeupPipe[1], F_GETFL, 0);
3385-
if (fcntl(wakeupPipe[1], F_SETFL, fFlags | O_NONBLOCK) == -1) {
3386-
LogPrint(BCLog::NET, "fcntl for O_NONBLOCK on wakeupPipe failed\n");
3387-
}
3388-
if (m_edge_trig_events && !m_edge_trig_events->RegisterPipe(wakeupPipe[0])) {
3389-
LogPrint(BCLog::NET, "EdgeTriggeredEvents::RegisterPipe() failed\n");
3390-
}
3353+
m_wakeup_pipe = std::make_unique<WakeupPipe>(m_edge_trig_events.get());
3354+
if (!m_wakeup_pipe->IsValid()) {
3355+
/* We log the error but do not halt initialization */
3356+
LogPrintf("Unable to initialize WakeupPipe instance\n");
3357+
m_wakeup_pipe.reset();
33913358
}
3392-
#endif
3359+
#endif /* USE_WAKEUP_PIPE */
33933360

33943361
// Send and receive from sockets, accept connections
33953362
threadSocketHandler = std::thread(&util::TraceThread, "net", [this, &mn_sync] { ThreadSocketHandler(mn_sync); });
@@ -3555,21 +3522,12 @@ void CConnman::StopNodes()
35553522
vhListenSocket.clear();
35563523
semOutbound.reset();
35573524
semAddnode.reset();
3558-
3559-
if (m_edge_trig_events) {
3560-
#ifdef USE_WAKEUP_PIPE
3561-
if (!m_edge_trig_events->UnregisterPipe(wakeupPipe[0])) {
3562-
LogPrintf("EdgeTriggeredEvents::UnregisterPipe() failed\n");
3563-
}
3564-
#endif
3565-
m_edge_trig_events.reset();
3566-
}
3567-
3568-
#ifdef USE_WAKEUP_PIPE
3569-
if (wakeupPipe[0] != -1) close(wakeupPipe[0]);
3570-
if (wakeupPipe[1] != -1) close(wakeupPipe[1]);
3571-
wakeupPipe[0] = wakeupPipe[1] = -1;
3572-
#endif
3525+
/**
3526+
* m_wakeup_pipe must be reset *before* m_edge_trig_events as it may
3527+
* attempt to call EdgeTriggeredEvents::UnregisterPipe() in its destructor
3528+
*/
3529+
m_wakeup_pipe.reset();
3530+
m_edge_trig_events.reset();
35733531
}
35743532

35753533
void CConnman::DeleteNode(CNode* pnode)
@@ -4082,8 +4040,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
40824040
}
40834041

40844042
// wake up select() call in case there was no pending data before (so it was not selecting this socket for sending)
4085-
if (!hasPendingData && wakeupSelectNeeded)
4086-
WakeSelect();
4043+
if (!hasPendingData && (m_wakeup_pipe && m_wakeup_pipe->m_need_wakeup.load()))
4044+
m_wakeup_pipe->Write();
40874045
}
40884046
}
40894047

src/net.h

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include <util/check.h>
3232
#include <util/edge.h>
3333
#include <util/system.h>
34+
#include <util/wpipe.h>
3435
#include <consensus/params.h>
3536

3637
#include <atomic>
@@ -45,10 +46,6 @@
4546
#include <queue>
4647
#include <vector>
4748

48-
#ifndef WIN32
49-
#define USE_WAKEUP_PIPE
50-
#endif
51-
5249
class CConnman;
5350
class CDeterministicMNList;
5451
class CDeterministicMNManager;
@@ -1168,7 +1165,6 @@ friend class CNode;
11681165
unsigned int GetReceiveFloodSize() const;
11691166

11701167
void WakeMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
1171-
void WakeSelect() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
11721168

11731169
/** Attempts to obfuscate tx time through exponentially distributed emitting.
11741170
Works assuming that a single interval is used.
@@ -1505,14 +1501,19 @@ friend class CNode;
15051501
*/
15061502
std::unique_ptr<i2p::sam::Session> m_i2p_sam_session;
15071503

1508-
#ifdef USE_WAKEUP_PIPE
1509-
/** a pipe which is added to select() calls to wakeup before the timeout */
1510-
int wakeupPipe[2]{-1,-1};
1511-
#endif
1512-
std::atomic<bool> wakeupSelectNeeded{false};
1513-
15141504
SocketEventsMode socketEventsMode;
15151505
std::unique_ptr<EdgeTriggeredEvents> m_edge_trig_events{nullptr};
1506+
std::unique_ptr<WakeupPipe> m_wakeup_pipe{nullptr};
1507+
1508+
template <typename Callable>
1509+
void ToggleWakeupPipe(Callable&& func)
1510+
{
1511+
if (m_wakeup_pipe) {
1512+
m_wakeup_pipe->Toggle(func);
1513+
} else {
1514+
func();
1515+
}
1516+
}
15161517

15171518
Mutex cs_sendable_receivable_nodes;
15181519
std::unordered_map<NodeId, CNode*> mapReceivableNodes GUARDED_BY(cs_sendable_receivable_nodes);

src/util/wpipe.cpp

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Copyright (c) 2020-2024 The Dash Core developers
2+
// Distributed under the MIT software license, see the accompanying
3+
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4+
5+
#include <util/wpipe.h>
6+
7+
#include <logging.h>
8+
#include <util/edge.h>
9+
10+
WakeupPipe::WakeupPipe(EdgeTriggeredEvents* edge_trig_events)
11+
: m_edge_trig_events{edge_trig_events}
12+
{
13+
#ifdef USE_WAKEUP_PIPE
14+
if (pipe(m_pipe.data()) != 0) {
15+
LogPrintf("Unable to initialize WakeupPipe, pipe() for m_pipe failed\n");
16+
return;
17+
}
18+
for (size_t idx = 0; idx < m_pipe.size(); idx++) {
19+
int flags = fcntl(m_pipe[idx], F_GETFL, 0);
20+
if (fcntl(m_pipe[idx], F_SETFL, flags | O_NONBLOCK) == -1) {
21+
LogPrintf("Unable to initialize WakeupPipe, fcntl for O_NONBLOCK on m_pipe[%d] failed\n", idx);
22+
return;
23+
}
24+
}
25+
if (edge_trig_events && !edge_trig_events->RegisterPipe(m_pipe[0])) {
26+
LogPrintf("Unable to initialize WakeupPipe, EdgeTriggeredEvents::RegisterPipe() failed\n");
27+
return;
28+
}
29+
m_valid = true;
30+
#else
31+
LogPrintf("Attempting to initialize WakeupPipe without support compiled in!\n");
32+
#endif /* USE_WAKEUP_PIPE */
33+
}
34+
35+
WakeupPipe::~WakeupPipe()
36+
{
37+
if (m_valid) {
38+
#ifdef USE_WAKEUP_PIPE
39+
if (m_edge_trig_events && !m_edge_trig_events->UnregisterPipe(m_pipe[0])) {
40+
LogPrintf("Destroying WakeupPipe instance, EdgeTriggeredEvents::UnregisterPipe() failed\n");
41+
}
42+
close(m_pipe[0]);
43+
close(m_pipe[1]);
44+
#else
45+
assert(false);
46+
#endif /* USE_WAKEUP_PIPE */
47+
}
48+
}
49+
50+
void WakeupPipe::Drain() const
51+
{
52+
#ifdef USE_WAKEUP_PIPE
53+
assert(m_valid && m_pipe[0] != -1);
54+
55+
int ret{0};
56+
std::array<uint8_t, 128> buf;
57+
do {
58+
ret = read(m_pipe[0], buf.data(), buf.size());
59+
} while (ret > 0);
60+
#else
61+
assert(false);
62+
#endif /* USE_WAKEUP_PIPE */
63+
}
64+
65+
void WakeupPipe::Write()
66+
{
67+
#ifdef USE_WAKEUP_PIPE
68+
assert(m_valid && m_pipe[1] != -1);
69+
70+
std::array<uint8_t, 1> buf;
71+
if (write(m_pipe[1], buf.data(), buf.size()) != 1) {
72+
LogPrintf("Write to m_pipe[1] failed\n");
73+
}
74+
75+
m_need_wakeup = false;
76+
#else
77+
assert(false);
78+
#endif /* USE_WAKEUP_PIPE */
79+
}

0 commit comments

Comments
 (0)