Skip to content

Commit 3516a31

Browse files
committed
Merge #18234: refactor: Replace boost::mutex,condition_var,chrono with std equivalents in scheduler
70a6b52 lint-cppcheck: Remove -DHAVE_WORKING_BOOST_SLEEP_FOR (Anthony Towns) 294937b scheduler_tests: re-enable mockforward test (Anthony Towns) cea19f6 Drop unused reverselock.h (Anthony Towns) d0ebd93 scheduler: switch from boost to std (Anthony Towns) b9c4260 sync.h: add REVERSE_LOCK (Anthony Towns) 306f71b scheduler: don't rely on boost interrupt on shutdown (Anthony Towns) Pull request description: Replacing boost functionality with C++11 stuff. Motivated by #18227, but should stand alone. Changing from `boost::condition_var` to `std::condition_var` means `threadGroup.interrupt_all` isn't enough to interrupt `serviceQueue` anymore, so that means calling `stop()` before `join_all()` is needed. And the existing reverselock.h code doesn't work with sync.h's DebugLock code (because the reversed lock won't be removed from `g_lockstack` which then leads to incorrect potential deadlock warnings), so I've replaced that with a dedicated class and macro that's aware of our debug lock behaviour. Fixes #16027, Fixes #14200, Fixes #18227 ACKs for top commit: laanwj: ACK 70a6b52 Tree-SHA512: d1da13adeabcf9186d114e2dad9a4fdbe2e440f7afbccde0c13dfbaf464efcd850b69d3371c5bf8b179d7ceb9d81f4af3cc22960b90834e41eaaf6d52ef7d331
2 parents 45cdcd4 + 70a6b52 commit 3516a31

14 files changed

+151
-117
lines changed

src/Makefile.am

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,6 @@ BITCOIN_CORE_H = \
178178
random.h \
179179
randomenv.h \
180180
reverse_iterator.h \
181-
reverselock.h \
182181
rpc/blockchain.h \
183182
rpc/client.h \
184183
rpc/protocol.h \

src/init.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ void Shutdown(NodeContext& node)
206206

207207
// After everything has been shut down, but before things get flushed, stop the
208208
// CScheduler/checkqueue threadGroup
209+
if (node.scheduler) node.scheduler->stop();
209210
threadGroup.interrupt_all();
210211
threadGroup.join_all();
211212

src/reverselock.h

Lines changed: 0 additions & 34 deletions
This file was deleted.

src/rpc/misc.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ static UniValue mockscheduler(const JSONRPCRequest& request)
393393
// protect against null pointer dereference
394394
CHECK_NONFATAL(g_rpc_node);
395395
CHECK_NONFATAL(g_rpc_node->scheduler);
396-
g_rpc_node->scheduler->MockForward(boost::chrono::seconds(delta_seconds));
396+
g_rpc_node->scheduler->MockForward(std::chrono::seconds(delta_seconds));
397397

398398
return NullUniValue;
399399
}

src/scheduler.cpp

Lines changed: 20 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
#include <scheduler.h>
66

77
#include <random.h>
8-
#include <reverselock.h>
98

109
#include <assert.h>
1110
#include <utility>
@@ -20,18 +19,9 @@ CScheduler::~CScheduler()
2019
}
2120

2221

23-
#if BOOST_VERSION < 105000
24-
static boost::system_time toPosixTime(const boost::chrono::system_clock::time_point& t)
25-
{
26-
// Creating the posix_time using from_time_t loses sub-second precision. So rather than exporting the time_point to time_t,
27-
// start with a posix_time at the epoch (0) and add the milliseconds that have passed since then.
28-
return boost::posix_time::from_time_t(0) + boost::posix_time::milliseconds(boost::chrono::duration_cast<boost::chrono::milliseconds>(t.time_since_epoch()).count());
29-
}
30-
#endif
31-
3222
void CScheduler::serviceQueue()
3323
{
34-
boost::unique_lock<boost::mutex> lock(newTaskMutex);
24+
WAIT_LOCK(newTaskMutex, lock);
3525
++nThreadsServicingQueue;
3626

3727
// newTaskMutex is locked throughout this loop EXCEPT
@@ -40,7 +30,7 @@ void CScheduler::serviceQueue()
4030
while (!shouldStop()) {
4131
try {
4232
if (!shouldStop() && taskQueue.empty()) {
43-
reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock);
33+
REVERSE_LOCK(lock);
4434
}
4535
while (!shouldStop() && taskQueue.empty()) {
4636
// Wait until there is something to do.
@@ -50,21 +40,13 @@ void CScheduler::serviceQueue()
5040
// Wait until either there is a new task, or until
5141
// the time of the first item on the queue:
5242

53-
// wait_until needs boost 1.50 or later; older versions have timed_wait:
54-
#if BOOST_VERSION < 105000
55-
while (!shouldStop() && !taskQueue.empty() &&
56-
newTaskScheduled.timed_wait(lock, toPosixTime(taskQueue.begin()->first))) {
57-
// Keep waiting until timeout
58-
}
59-
#else
60-
// Some boost versions have a conflicting overload of wait_until that returns void.
61-
// Explicitly use a template here to avoid hitting that overload.
6243
while (!shouldStop() && !taskQueue.empty()) {
63-
boost::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
64-
if (newTaskScheduled.wait_until<>(lock, timeToWaitFor) == boost::cv_status::timeout)
44+
std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
45+
if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
6546
break; // Exit loop after timeout, it means we reached the time of the event
47+
}
6648
}
67-
#endif
49+
6850
// If there are multiple threads, the queue can empty while we're waiting (another
6951
// thread may service the task we were waiting on).
7052
if (shouldStop() || taskQueue.empty())
@@ -76,7 +58,7 @@ void CScheduler::serviceQueue()
7658
{
7759
// Unlock before calling f, so it can reschedule itself or another task
7860
// without deadlocking:
79-
reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock);
61+
REVERSE_LOCK(lock);
8062
f();
8163
}
8264
} catch (...) {
@@ -91,7 +73,7 @@ void CScheduler::serviceQueue()
9173
void CScheduler::stop(bool drain)
9274
{
9375
{
94-
boost::unique_lock<boost::mutex> lock(newTaskMutex);
76+
LOCK(newTaskMutex);
9577
if (drain)
9678
stopWhenEmpty = true;
9779
else
@@ -100,29 +82,29 @@ void CScheduler::stop(bool drain)
10082
newTaskScheduled.notify_all();
10183
}
10284

103-
void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t)
85+
void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t)
10486
{
10587
{
106-
boost::unique_lock<boost::mutex> lock(newTaskMutex);
88+
LOCK(newTaskMutex);
10789
taskQueue.insert(std::make_pair(t, f));
10890
}
10991
newTaskScheduled.notify_one();
11092
}
11193

11294
void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaMilliSeconds)
11395
{
114-
schedule(f, boost::chrono::system_clock::now() + boost::chrono::milliseconds(deltaMilliSeconds));
96+
schedule(f, std::chrono::system_clock::now() + std::chrono::milliseconds(deltaMilliSeconds));
11597
}
11698

117-
void CScheduler::MockForward(boost::chrono::seconds delta_seconds)
99+
void CScheduler::MockForward(std::chrono::seconds delta_seconds)
118100
{
119-
assert(delta_seconds.count() > 0 && delta_seconds < boost::chrono::hours{1});
101+
assert(delta_seconds.count() > 0 && delta_seconds < std::chrono::hours{1});
120102

121103
{
122-
boost::unique_lock<boost::mutex> lock(newTaskMutex);
104+
LOCK(newTaskMutex);
123105

124106
// use temp_queue to maintain updated schedule
125-
std::multimap<boost::chrono::system_clock::time_point, Function> temp_queue;
107+
std::multimap<std::chrono::system_clock::time_point, Function> temp_queue;
126108

127109
for (const auto& element : taskQueue) {
128110
temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second);
@@ -147,10 +129,10 @@ void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaMilliSeconds
147129
scheduleFromNow(std::bind(&Repeat, this, f, deltaMilliSeconds), deltaMilliSeconds);
148130
}
149131

150-
size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
151-
boost::chrono::system_clock::time_point &last) const
132+
size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point &first,
133+
std::chrono::system_clock::time_point &last) const
152134
{
153-
boost::unique_lock<boost::mutex> lock(newTaskMutex);
135+
LOCK(newTaskMutex);
154136
size_t result = taskQueue.size();
155137
if (!taskQueue.empty()) {
156138
first = taskQueue.begin()->first;
@@ -160,7 +142,7 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
160142
}
161143

162144
bool CScheduler::AreThreadsServicingQueue() const {
163-
boost::unique_lock<boost::mutex> lock(newTaskMutex);
145+
LOCK(newTaskMutex);
164146
return nThreadsServicingQueue;
165147
}
166148

@@ -174,7 +156,7 @@ void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
174156
if (m_are_callbacks_running) return;
175157
if (m_callbacks_pending.empty()) return;
176158
}
177-
m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this));
159+
m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now());
178160
}
179161

180162
void SingleThreadedSchedulerClient::ProcessQueue() {

src/scheduler.h

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@
77

88
//
99
// NOTE:
10-
// boost::thread / boost::chrono should be ported to std::thread / std::chrono
10+
// boost::thread should be ported to std::thread
1111
// when we support C++11.
1212
//
13-
#include <boost/chrono/chrono.hpp>
14-
#include <boost/thread.hpp>
13+
#include <condition_variable>
14+
#include <functional>
15+
#include <list>
1516
#include <map>
1617

1718
#include <sync.h>
@@ -27,8 +28,8 @@
2728
// s->scheduleFromNow(std::bind(Class::func, this, argument), 3);
2829
// boost::thread* t = new boost::thread(std::bind(CScheduler::serviceQueue, s));
2930
//
30-
// ... then at program shutdown, clean up the thread running serviceQueue:
31-
// t->interrupt();
31+
// ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue:
32+
// s->stop();
3233
// t->join();
3334
// delete t;
3435
// delete s; // Must be done after thread is interrupted/joined.
@@ -43,7 +44,7 @@ class CScheduler
4344
typedef std::function<void()> Function;
4445

4546
// Call func at/after time t
46-
void schedule(Function f, boost::chrono::system_clock::time_point t=boost::chrono::system_clock::now());
47+
void schedule(Function f, std::chrono::system_clock::time_point t);
4748

4849
// Convenience method: call f once deltaMilliSeconds from now
4950
void scheduleFromNow(Function f, int64_t deltaMilliSeconds);
@@ -60,7 +61,7 @@ class CScheduler
6061
* Iterates through items on taskQueue and reschedules them
6162
* to be delta_seconds sooner.
6263
*/
63-
void MockForward(boost::chrono::seconds delta_seconds);
64+
void MockForward(std::chrono::seconds delta_seconds);
6465

6566
// To keep things as simple as possible, there is no unschedule.
6667

@@ -75,20 +76,20 @@ class CScheduler
7576

7677
// Returns number of tasks waiting to be serviced,
7778
// and first and last task times
78-
size_t getQueueInfo(boost::chrono::system_clock::time_point &first,
79-
boost::chrono::system_clock::time_point &last) const;
79+
size_t getQueueInfo(std::chrono::system_clock::time_point &first,
80+
std::chrono::system_clock::time_point &last) const;
8081

8182
// Returns true if there are threads actively running in serviceQueue()
8283
bool AreThreadsServicingQueue() const;
8384

8485
private:
85-
std::multimap<boost::chrono::system_clock::time_point, Function> taskQueue;
86-
boost::condition_variable newTaskScheduled;
87-
mutable boost::mutex newTaskMutex;
88-
int nThreadsServicingQueue;
89-
bool stopRequested;
90-
bool stopWhenEmpty;
91-
bool shouldStop() const { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
86+
mutable Mutex newTaskMutex;
87+
std::condition_variable newTaskScheduled;
88+
std::multimap<std::chrono::system_clock::time_point, Function> taskQueue GUARDED_BY(newTaskMutex);
89+
int nThreadsServicingQueue GUARDED_BY(newTaskMutex);
90+
bool stopRequested GUARDED_BY(newTaskMutex);
91+
bool stopWhenEmpty GUARDED_BY(newTaskMutex);
92+
bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
9293
};
9394

9495
/**

src/sync.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
#include <util/strencodings.h>
1414
#include <util/threadnames.h>
1515

16-
16+
#include <system_error>
1717
#include <map>
1818
#include <set>
1919

@@ -60,6 +60,11 @@ struct CLockLocation {
6060
mutexName, sourceFile, itostr(sourceLine), (fTry ? " (TRY)" : ""), m_thread_name);
6161
}
6262

63+
std::string Name() const
64+
{
65+
return mutexName;
66+
}
67+
6368
private:
6469
bool fTry;
6570
std::string mutexName;
@@ -155,6 +160,18 @@ void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs
155160
push_lock(cs, CLockLocation(pszName, pszFile, nLine, fTry, util::ThreadGetInternalName()));
156161
}
157162

163+
void CheckLastCritical(void* cs, std::string& lockname, const char* guardname, const char* file, int line)
164+
{
165+
if (!g_lockstack.empty()) {
166+
const auto& lastlock = g_lockstack.back();
167+
if (lastlock.first == cs) {
168+
lockname = lastlock.second.Name();
169+
return;
170+
}
171+
}
172+
throw std::system_error(EPERM, std::generic_category(), strprintf("%s:%s %s was not most recent critical section locked", file, line, guardname));
173+
}
174+
158175
void LeaveCritical()
159176
{
160177
pop_lock();

src/sync.h

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ LEAVE_CRITICAL_SECTION(mutex); // no RAII
5050
#ifdef DEBUG_LOCKORDER
5151
void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs, bool fTry = false);
5252
void LeaveCritical();
53+
void CheckLastCritical(void* cs, std::string& lockname, const char* guardname, const char* file, int line);
5354
std::string LocksHeld();
5455
void AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) ASSERT_EXCLUSIVE_LOCK(cs);
5556
void AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs);
@@ -64,6 +65,7 @@ extern bool g_debug_lockorder_abort;
6465
#else
6566
void static inline EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs, bool fTry = false) {}
6667
void static inline LeaveCritical() {}
68+
void static inline CheckLastCritical(void* cs, std::string& lockname, const char* guardname, const char* file, int line) {}
6769
void static inline AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) ASSERT_EXCLUSIVE_LOCK(cs) {}
6870
void static inline AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) {}
6971
void static inline DeleteLock(void* cs) {}
@@ -171,8 +173,45 @@ class SCOPED_LOCKABLE UniqueLock : public Base
171173
{
172174
return Base::owns_lock();
173175
}
176+
177+
protected:
178+
// needed for reverse_lock
179+
UniqueLock() { }
180+
181+
public:
182+
/**
183+
* An RAII-style reverse lock. Unlocks on construction and locks on destruction.
184+
*/
185+
class reverse_lock {
186+
public:
187+
explicit reverse_lock(UniqueLock& _lock, const char* _guardname, const char* _file, int _line) : lock(_lock), file(_file), line(_line) {
188+
CheckLastCritical((void*)lock.mutex(), lockname, _guardname, _file, _line);
189+
lock.unlock();
190+
LeaveCritical();
191+
lock.swap(templock);
192+
}
193+
194+
~reverse_lock() {
195+
templock.swap(lock);
196+
EnterCritical(lockname.c_str(), file.c_str(), line, (void*)lock.mutex());
197+
lock.lock();
198+
}
199+
200+
private:
201+
reverse_lock(reverse_lock const&);
202+
reverse_lock& operator=(reverse_lock const&);
203+
204+
UniqueLock& lock;
205+
UniqueLock templock;
206+
std::string lockname;
207+
const std::string file;
208+
const int line;
209+
};
210+
friend class reverse_lock;
174211
};
175212

213+
#define REVERSE_LOCK(g) decltype(g)::reverse_lock PASTE2(revlock, __COUNTER__)(g, #g, __FILE__, __LINE__)
214+
176215
template<typename MutexArg>
177216
using DebugLock = UniqueLock<typename std::remove_reference<typename std::remove_pointer<MutexArg>::type>::type>;
178217

0 commit comments

Comments
 (0)