Skip to content

Commit 3c60937

Browse files
committed
Merge pull request #6146
f501054 More robust CScheduler unit test (Gavin Andresen)
2 parents ec82d8c + f501054 commit 3c60937

File tree

3 files changed

+70
-18
lines changed

3 files changed

+70
-18
lines changed

src/scheduler.cpp

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
#include <boost/bind.hpp>
99
#include <utility>
1010

11-
CScheduler::CScheduler() : nThreadsServicingQueue(0)
11+
CScheduler::CScheduler() : nThreadsServicingQueue(0), stopRequested(false), stopWhenEmpty(false)
1212
{
1313
}
1414

@@ -29,32 +29,37 @@ void CScheduler::serviceQueue()
2929
{
3030
boost::unique_lock<boost::mutex> lock(newTaskMutex);
3131
++nThreadsServicingQueue;
32+
stopRequested = false;
33+
stopWhenEmpty = false;
3234

3335
// newTaskMutex is locked throughout this loop EXCEPT
3436
// when the thread is waiting or when the user's function
3537
// is called.
36-
while (1) {
38+
while (!shouldStop()) {
3739
try {
38-
while (taskQueue.empty()) {
40+
while (!shouldStop() && taskQueue.empty()) {
3941
// Wait until there is something to do.
4042
newTaskScheduled.wait(lock);
4143
}
42-
// Wait until either there is a new task, or until
43-
// the time of the first item on the queue:
44+
45+
// Wait until either there is a new task, or until
46+
// the time of the first item on the queue:
4447

4548
// wait_until needs boost 1.50 or later; older versions have timed_wait:
4649
#if BOOST_VERSION < 105000
47-
while (!taskQueue.empty() && newTaskScheduled.timed_wait(lock, toPosixTime(taskQueue.begin()->first))) {
50+
while (!shouldStop() && !taskQueue.empty() &&
51+
newTaskScheduled.timed_wait(lock, toPosixTime(taskQueue.begin()->first))) {
4852
// Keep waiting until timeout
4953
}
5054
#else
51-
while (!taskQueue.empty() && newTaskScheduled.wait_until(lock, taskQueue.begin()->first) != boost::cv_status::timeout) {
55+
while (!shouldStop() && !taskQueue.empty() &&
56+
newTaskScheduled.wait_until(lock, taskQueue.begin()->first) != boost::cv_status::timeout) {
5257
// Keep waiting until timeout
5358
}
5459
#endif
5560
// If there are multiple threads, the queue can empty while we're waiting (another
5661
// thread may service the task we were waiting on).
57-
if (taskQueue.empty())
62+
if (shouldStop() || taskQueue.empty())
5863
continue;
5964

6065
Function f = taskQueue.begin()->second;
@@ -70,6 +75,19 @@ void CScheduler::serviceQueue()
7075
throw;
7176
}
7277
}
78+
--nThreadsServicingQueue;
79+
}
80+
81+
void CScheduler::stop(bool drain)
82+
{
83+
{
84+
boost::unique_lock<boost::mutex> lock(newTaskMutex);
85+
if (drain)
86+
stopWhenEmpty = true;
87+
else
88+
stopRequested = true;
89+
}
90+
newTaskScheduled.notify_all();
7391
}
7492

7593
void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t)
@@ -96,3 +114,15 @@ void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaSeconds)
96114
{
97115
scheduleFromNow(boost::bind(&Repeat, this, f, deltaSeconds), deltaSeconds);
98116
}
117+
118+
size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
119+
boost::chrono::system_clock::time_point &last) const
120+
{
121+
boost::unique_lock<boost::mutex> lock(newTaskMutex);
122+
size_t result = taskQueue.size();
123+
if (!taskQueue.empty()) {
124+
first = taskQueue.begin()->first;
125+
last = taskQueue.rbegin()->first;
126+
}
127+
return result;
128+
}

src/scheduler.h

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,24 @@ class CScheduler
6060
// and interrupted using boost::interrupt_thread
6161
void serviceQueue();
6262

63+
// Tell any threads running serviceQueue to stop as soon as they're
64+
// done servicing whatever task they're currently servicing (drain=false)
65+
// or when there is no work left to be done (drain=true)
66+
void stop(bool drain=false);
67+
68+
// Returns number of tasks waiting to be serviced,
69+
// and first and last task times
70+
size_t getQueueInfo(boost::chrono::system_clock::time_point &first,
71+
boost::chrono::system_clock::time_point &last) const;
72+
6373
private:
6474
std::multimap<boost::chrono::system_clock::time_point, Function> taskQueue;
6575
boost::condition_variable newTaskScheduled;
66-
boost::mutex newTaskMutex;
76+
mutable boost::mutex newTaskMutex;
6777
int nThreadsServicingQueue;
78+
bool stopRequested;
79+
bool stopWhenEmpty;
80+
bool shouldStop() { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
6881
};
6982

7083
#endif

src/test/scheduler_tests.cpp

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ static void MicroSleep(uint64_t n)
4242

4343
BOOST_AUTO_TEST_CASE(manythreads)
4444
{
45+
seed_insecure_rand(false);
46+
4547
// Stress test: hundreds of microsecond-scheduled tasks,
4648
// serviced by 10 threads.
4749
//
@@ -54,10 +56,6 @@ BOOST_AUTO_TEST_CASE(manythreads)
5456
// counters should sum to the number of initial tasks performed.
5557
CScheduler microTasks;
5658

57-
boost::thread_group microThreads;
58-
for (int i = 0; i < 5; i++)
59-
microThreads.create_thread(boost::bind(&CScheduler::serviceQueue, &microTasks));
60-
6159
boost::mutex counterMutex[10];
6260
int counter[10] = { 0 };
6361
boost::random::mt19937 rng(insecure_rand());
@@ -67,6 +65,9 @@ BOOST_AUTO_TEST_CASE(manythreads)
6765

6866
boost::chrono::system_clock::time_point start = boost::chrono::system_clock::now();
6967
boost::chrono::system_clock::time_point now = start;
68+
boost::chrono::system_clock::time_point first, last;
69+
size_t nTasks = microTasks.getQueueInfo(first, last);
70+
BOOST_CHECK(nTasks == 0);
7071

7172
for (int i = 0; i < 100; i++) {
7273
boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng));
@@ -77,9 +78,19 @@ BOOST_AUTO_TEST_CASE(manythreads)
7778
randomDelta(rng), tReschedule);
7879
microTasks.schedule(f, t);
7980
}
81+
nTasks = microTasks.getQueueInfo(first, last);
82+
BOOST_CHECK(nTasks == 100);
83+
BOOST_CHECK(first < last);
84+
BOOST_CHECK(last > now);
85+
86+
// As soon as these are created they will start running and servicing the queue
87+
boost::thread_group microThreads;
88+
for (int i = 0; i < 5; i++)
89+
microThreads.create_thread(boost::bind(&CScheduler::serviceQueue, &microTasks));
8090

8191
MicroSleep(600);
8292
now = boost::chrono::system_clock::now();
93+
8394
// More threads and more tasks:
8495
for (int i = 0; i < 5; i++)
8596
microThreads.create_thread(boost::bind(&CScheduler::serviceQueue, &microTasks));
@@ -93,11 +104,9 @@ BOOST_AUTO_TEST_CASE(manythreads)
93104
microTasks.schedule(f, t);
94105
}
95106

96-
// All 2,000 tasks should be finished within 2 milliseconds. Sleep a bit longer.
97-
MicroSleep(2100);
98-
99-
microThreads.interrupt_all();
100-
microThreads.join_all();
107+
// Drain the task queue then exit threads
108+
microTasks.stop(true);
109+
microThreads.join_all(); // ... wait until all the threads are done
101110

102111
int counterSum = 0;
103112
for (int i = 0; i < 10; i++) {

0 commit comments

Comments
 (0)