Skip to content

Commit 3a4a372

Browse files
author
MarcoFalke
committed
Merge #19090: refactor: Misc scheduler cleanups
fa8337f clang-format scheduler (MarcoFalke) fa3d41b doc: Switch scheduler to doxygen comments (MarcoFalke) fac43f9 scheduler: Replace stop(true) with StopWhenDrained() (MarcoFalke) fa9cca0 doc: Remove unused documentation about unimplemented features (MarcoFalke) fab2950 doc: Switch boost::thread to std::thread in scheduler (MarcoFalke) fa98196 test: Remove unused scheduler.h include from the common setup (MarcoFalke) fa609c4 scheduler: Remove unused REVERSE_LOCK (MarcoFalke) Pull request description: This accumulates a bunch of cleanup that was long overdue, but I haven't yet gotten around to address. Specifically, but not limited to: * Remove unused code, documentation and includes * Upgrade to doxygen documentation Please refer to the individual commits for more details. ACKs for top commit: jnewbery: utACK fa8337f Tree-SHA512: 0c825ad9767e2697a3ef1ec1be13fdc2b18eeb7493ad0be5b65cc9f209391e78b17ee66e35e094c5e171c12b0f1624f287a110f6bddaf3024b708877afa8552e
2 parents ead6d68 + fa8337f commit 3a4a372

File tree

5 files changed

+72
-73
lines changed

5 files changed

+72
-73
lines changed

src/scheduler.cpp

Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,6 @@ void CScheduler::serviceQueue()
3030
// is called.
3131
while (!shouldStop()) {
3232
try {
33-
if (!shouldStop() && taskQueue.empty()) {
34-
REVERSE_LOCK(lock);
35-
}
3633
while (!shouldStop() && taskQueue.empty()) {
3734
// Wait until there is something to do.
3835
newTaskScheduled.wait(lock);
@@ -71,18 +68,6 @@ void CScheduler::serviceQueue()
7168
newTaskScheduled.notify_one();
7269
}
7370

74-
void CScheduler::stop(bool drain)
75-
{
76-
{
77-
LOCK(newTaskMutex);
78-
if (drain)
79-
stopWhenEmpty = true;
80-
else
81-
stopRequested = true;
82-
}
83-
newTaskScheduled.notify_all();
84-
}
85-
8671
void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t)
8772
{
8873
{
@@ -125,8 +110,8 @@ void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds
125110
scheduleFromNow([=] { Repeat(*this, f, delta); }, delta);
126111
}
127112

128-
size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point &first,
129-
std::chrono::system_clock::time_point &last) const
113+
size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point& first,
114+
std::chrono::system_clock::time_point& last) const
130115
{
131116
LOCK(newTaskMutex);
132117
size_t result = taskQueue.size();
@@ -137,13 +122,15 @@ size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point &first,
137122
return result;
138123
}
139124

140-
bool CScheduler::AreThreadsServicingQueue() const {
125+
bool CScheduler::AreThreadsServicingQueue() const
126+
{
141127
LOCK(newTaskMutex);
142128
return nThreadsServicingQueue;
143129
}
144130

145131

146-
void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
132+
void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue()
133+
{
147134
{
148135
LOCK(m_cs_callbacks_pending);
149136
// Try to avoid scheduling too many copies here, but if we
@@ -155,8 +142,9 @@ void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
155142
m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now());
156143
}
157144

158-
void SingleThreadedSchedulerClient::ProcessQueue() {
159-
std::function<void ()> callback;
145+
void SingleThreadedSchedulerClient::ProcessQueue()
146+
{
147+
std::function<void()> callback;
160148
{
161149
LOCK(m_cs_callbacks_pending);
162150
if (m_are_callbacks_running) return;
@@ -172,7 +160,8 @@ void SingleThreadedSchedulerClient::ProcessQueue() {
172160
struct RAIICallbacksRunning {
173161
SingleThreadedSchedulerClient* instance;
174162
explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
175-
~RAIICallbacksRunning() {
163+
~RAIICallbacksRunning()
164+
{
176165
{
177166
LOCK(instance->m_cs_callbacks_pending);
178167
instance->m_are_callbacks_running = false;
@@ -184,7 +173,8 @@ void SingleThreadedSchedulerClient::ProcessQueue() {
184173
callback();
185174
}
186175

187-
void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void ()> func) {
176+
void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func)
177+
{
188178
assert(m_pscheduler);
189179

190180
{
@@ -194,7 +184,8 @@ void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void ()> fun
194184
MaybeScheduleProcessQueue();
195185
}
196186

197-
void SingleThreadedSchedulerClient::EmptyQueue() {
187+
void SingleThreadedSchedulerClient::EmptyQueue()
188+
{
198189
assert(!m_pscheduler->AreThreadsServicingQueue());
199190
bool should_continue = true;
200191
while (should_continue) {
@@ -204,7 +195,8 @@ void SingleThreadedSchedulerClient::EmptyQueue() {
204195
}
205196
}
206197

207-
size_t SingleThreadedSchedulerClient::CallbacksPending() {
198+
size_t SingleThreadedSchedulerClient::CallbacksPending()
199+
{
208200
LOCK(m_cs_callbacks_pending);
209201
return m_callbacks_pending.size();
210202
}

src/scheduler.h

Lines changed: 51 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -5,36 +5,30 @@
55
#ifndef BITCOIN_SCHEDULER_H
66
#define BITCOIN_SCHEDULER_H
77

8-
//
9-
// NOTE:
10-
// boost::thread should be ported to std::thread
11-
// when we support C++11.
12-
//
138
#include <condition_variable>
149
#include <functional>
1510
#include <list>
1611
#include <map>
1712

1813
#include <sync.h>
1914

20-
//
21-
// Simple class for background tasks that should be run
22-
// periodically or once "after a while"
23-
//
24-
// Usage:
25-
//
26-
// CScheduler* s = new CScheduler();
27-
// s->scheduleFromNow(doSomething, std::chrono::milliseconds{11}); // Assuming a: void doSomething() { }
28-
// s->scheduleFromNow([=] { this->func(argument); }, std::chrono::milliseconds{3});
29-
// boost::thread* t = new boost::thread(std::bind(CScheduler::serviceQueue, s));
30-
//
31-
// ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue:
32-
// s->stop();
33-
// t->join();
34-
// delete t;
35-
// delete s; // Must be done after thread is interrupted/joined.
36-
//
37-
15+
/**
16+
* Simple class for background tasks that should be run
17+
* periodically or once "after a while"
18+
*
19+
* Usage:
20+
*
21+
* CScheduler* s = new CScheduler();
22+
* s->scheduleFromNow(doSomething, std::chrono::milliseconds{11}); // Assuming a: void doSomething() { }
23+
* s->scheduleFromNow([=] { this->func(argument); }, std::chrono::milliseconds{3});
24+
* std::thread* t = new std::thread([&] { s->serviceQueue(); });
25+
*
26+
* ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue:
27+
* s->stop();
28+
* t->join();
29+
* delete t;
30+
* delete s; // Must be done after thread is interrupted/joined.
31+
*/
3832
class CScheduler
3933
{
4034
public:
@@ -43,7 +37,7 @@ class CScheduler
4337

4438
typedef std::function<void()> Function;
4539

46-
// Call func at/after time t
40+
/** Call func at/after time t */
4741
void schedule(Function f, std::chrono::system_clock::time_point t);
4842

4943
/** Call f once after the delta has passed */
@@ -67,23 +61,33 @@ class CScheduler
6761
*/
6862
void MockForward(std::chrono::seconds delta_seconds);
6963

70-
// To keep things as simple as possible, there is no unschedule.
71-
72-
// Services the queue 'forever'. Should be run in a thread,
73-
// and interrupted using boost::interrupt_thread
64+
/**
65+
* Services the queue 'forever'. Should be run in a thread,
66+
* and interrupted using boost::interrupt_thread
67+
*/
7468
void serviceQueue();
7569

76-
// Tell any threads running serviceQueue to stop as soon as they're
77-
// done servicing whatever task they're currently servicing (drain=false)
78-
// or when there is no work left to be done (drain=true)
79-
void stop(bool drain=false);
70+
/** Tell any threads running serviceQueue to stop as soon as the current task is done */
71+
void stop()
72+
{
73+
WITH_LOCK(newTaskMutex, stopRequested = true);
74+
newTaskScheduled.notify_all();
75+
}
76+
/** Tell any threads running serviceQueue to stop when there is no work left to be done */
77+
void StopWhenDrained()
78+
{
79+
WITH_LOCK(newTaskMutex, stopWhenEmpty = true);
80+
newTaskScheduled.notify_all();
81+
}
8082

81-
// Returns number of tasks waiting to be serviced,
82-
// and first and last task times
83-
size_t getQueueInfo(std::chrono::system_clock::time_point &first,
84-
std::chrono::system_clock::time_point &last) const;
83+
/**
84+
* Returns number of tasks waiting to be serviced,
85+
* and first and last task times
86+
*/
87+
size_t getQueueInfo(std::chrono::system_clock::time_point& first,
88+
std::chrono::system_clock::time_point& last) const;
8589

86-
// Returns true if there are threads actively running in serviceQueue()
90+
/** Returns true if there are threads actively running in serviceQueue() */
8791
bool AreThreadsServicingQueue() const;
8892

8993
private:
@@ -106,30 +110,33 @@ class CScheduler
106110
* B() will be able to observe all of the effects of callback A() which executed
107111
* before it.
108112
*/
109-
class SingleThreadedSchedulerClient {
113+
class SingleThreadedSchedulerClient
114+
{
110115
private:
111-
CScheduler *m_pscheduler;
116+
CScheduler* m_pscheduler;
112117

113118
RecursiveMutex m_cs_callbacks_pending;
114-
std::list<std::function<void ()>> m_callbacks_pending GUARDED_BY(m_cs_callbacks_pending);
119+
std::list<std::function<void()>> m_callbacks_pending GUARDED_BY(m_cs_callbacks_pending);
115120
bool m_are_callbacks_running GUARDED_BY(m_cs_callbacks_pending) = false;
116121

117122
void MaybeScheduleProcessQueue();
118123
void ProcessQueue();
119124

120125
public:
121-
explicit SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {}
126+
explicit SingleThreadedSchedulerClient(CScheduler* pschedulerIn) : m_pscheduler(pschedulerIn) {}
122127

123128
/**
124129
* Add a callback to be executed. Callbacks are executed serially
125130
* and memory is release-acquire consistent between callback executions.
126131
* Practically, this means that callbacks can behave as if they are executed
127132
* in order by a single thread.
128133
*/
129-
void AddToProcessQueue(std::function<void ()> func);
134+
void AddToProcessQueue(std::function<void()> func);
130135

131-
// Processes all remaining queue members on the calling thread, blocking until queue is empty
132-
// Must be called after the CScheduler has no remaining processing threads!
136+
/**
137+
* Processes all remaining queue members on the calling thread, blocking until queue is empty
138+
* Must be called after the CScheduler has no remaining processing threads!
139+
*/
133140
void EmptyQueue();
134141

135142
size_t CallbacksPending();

src/test/scheduler_tests.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ BOOST_AUTO_TEST_CASE(manythreads)
8989
}
9090

9191
// Drain the task queue then exit threads
92-
microTasks.stop(true);
92+
microTasks.StopWhenDrained();
9393
microThreads.join_all(); // ... wait until all the threads are done
9494

9595
int counterSum = 0;
@@ -155,7 +155,7 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
155155
}
156156

157157
// finish up
158-
scheduler.stop(true);
158+
scheduler.StopWhenDrained();
159159
threads.join_all();
160160

161161
BOOST_CHECK_EQUAL(counter1, 100);
@@ -186,7 +186,7 @@ BOOST_AUTO_TEST_CASE(mockforward)
186186
scheduler.MockForward(std::chrono::minutes{5});
187187

188188
// ensure scheduler has chance to process all tasks queued for before 1 ms from now.
189-
scheduler.scheduleFromNow([&scheduler] { scheduler.stop(false); }, std::chrono::milliseconds{1});
189+
scheduler.scheduleFromNow([&scheduler] { scheduler.stop(); }, std::chrono::milliseconds{1});
190190
scheduler_thread.join();
191191

192192
// check that the queue only has one job remaining

src/test/util/setup_common.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <rpc/blockchain.h>
2020
#include <rpc/register.h>
2121
#include <rpc/server.h>
22+
#include <scheduler.h>
2223
#include <script/sigcache.h>
2324
#include <streams.h>
2425
#include <txdb.h>

src/test/util/setup_common.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
#include <node/context.h>
1212
#include <pubkey.h>
1313
#include <random.h>
14-
#include <scheduler.h>
1514
#include <txmempool.h>
1615
#include <util/string.h>
1716

0 commit comments

Comments
 (0)