5
5
#include < scheduler.h>
6
6
7
7
#include < random.h>
8
- #include < reverselock.h>
9
8
10
9
#include < assert.h>
11
10
#include < utility>
@@ -20,18 +19,9 @@ CScheduler::~CScheduler()
20
19
}
21
20
22
21
23
- #if BOOST_VERSION < 105000
24
- static boost::system_time toPosixTime (const boost::chrono::system_clock::time_point& t)
25
- {
26
- // Creating the posix_time using from_time_t loses sub-second precision. So rather than exporting the time_point to time_t,
27
- // start with a posix_time at the epoch (0) and add the milliseconds that have passed since then.
28
- return boost::posix_time::from_time_t (0 ) + boost::posix_time::milliseconds (boost::chrono::duration_cast<boost::chrono::milliseconds>(t.time_since_epoch ()).count ());
29
- }
30
- #endif
31
-
32
22
void CScheduler::serviceQueue ()
33
23
{
34
- boost::unique_lock<boost::mutex> lock (newTaskMutex);
24
+ WAIT_LOCK (newTaskMutex, lock );
35
25
++nThreadsServicingQueue;
36
26
37
27
// newTaskMutex is locked throughout this loop EXCEPT
@@ -40,7 +30,7 @@ void CScheduler::serviceQueue()
40
30
while (!shouldStop ()) {
41
31
try {
42
32
if (!shouldStop () && taskQueue.empty ()) {
43
- reverse_lock<boost::unique_lock<boost::mutex> > rlock (lock);
33
+ REVERSE_LOCK (lock);
44
34
}
45
35
while (!shouldStop () && taskQueue.empty ()) {
46
36
// Wait until there is something to do.
@@ -50,21 +40,13 @@ void CScheduler::serviceQueue()
50
40
// Wait until either there is a new task, or until
51
41
// the time of the first item on the queue:
52
42
53
- // wait_until needs boost 1.50 or later; older versions have timed_wait:
54
- #if BOOST_VERSION < 105000
55
- while (!shouldStop () && !taskQueue.empty () &&
56
- newTaskScheduled.timed_wait (lock, toPosixTime (taskQueue.begin ()->first ))) {
57
- // Keep waiting until timeout
58
- }
59
- #else
60
- // Some boost versions have a conflicting overload of wait_until that returns void.
61
- // Explicitly use a template here to avoid hitting that overload.
62
43
while (!shouldStop () && !taskQueue.empty ()) {
63
- boost ::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin ()->first ;
64
- if (newTaskScheduled.wait_until <> (lock, timeToWaitFor) == boost ::cv_status::timeout)
44
+ std ::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin ()->first ;
45
+ if (newTaskScheduled.wait_until (lock, timeToWaitFor) == std ::cv_status::timeout) {
65
46
break ; // Exit loop after timeout, it means we reached the time of the event
47
+ }
66
48
}
67
- # endif
49
+
68
50
// If there are multiple threads, the queue can empty while we're waiting (another
69
51
// thread may service the task we were waiting on).
70
52
if (shouldStop () || taskQueue.empty ())
@@ -76,7 +58,7 @@ void CScheduler::serviceQueue()
76
58
{
77
59
// Unlock before calling f, so it can reschedule itself or another task
78
60
// without deadlocking:
79
- reverse_lock<boost::unique_lock<boost::mutex> > rlock (lock);
61
+ REVERSE_LOCK (lock);
80
62
f ();
81
63
}
82
64
} catch (...) {
@@ -91,7 +73,7 @@ void CScheduler::serviceQueue()
91
73
void CScheduler::stop (bool drain)
92
74
{
93
75
{
94
- boost::unique_lock<boost::mutex> lock (newTaskMutex);
76
+ LOCK (newTaskMutex);
95
77
if (drain)
96
78
stopWhenEmpty = true ;
97
79
else
@@ -100,29 +82,29 @@ void CScheduler::stop(bool drain)
100
82
newTaskScheduled.notify_all ();
101
83
}
102
84
103
- void CScheduler::schedule (CScheduler::Function f, boost ::chrono::system_clock::time_point t)
85
+ void CScheduler::schedule (CScheduler::Function f, std ::chrono::system_clock::time_point t)
104
86
{
105
87
{
106
- boost::unique_lock<boost::mutex> lock (newTaskMutex);
88
+ LOCK (newTaskMutex);
107
89
taskQueue.insert (std::make_pair (t, f));
108
90
}
109
91
newTaskScheduled.notify_one ();
110
92
}
111
93
112
94
void CScheduler::scheduleFromNow (CScheduler::Function f, int64_t deltaMilliSeconds)
113
95
{
114
- schedule (f, boost ::chrono::system_clock::now () + boost ::chrono::milliseconds (deltaMilliSeconds));
96
+ schedule (f, std ::chrono::system_clock::now () + std ::chrono::milliseconds (deltaMilliSeconds));
115
97
}
116
98
117
- void CScheduler::MockForward (boost ::chrono::seconds delta_seconds)
99
+ void CScheduler::MockForward (std ::chrono::seconds delta_seconds)
118
100
{
119
- assert (delta_seconds.count () > 0 && delta_seconds < boost ::chrono::hours{1 });
101
+ assert (delta_seconds.count () > 0 && delta_seconds < std ::chrono::hours{1 });
120
102
121
103
{
122
- boost::unique_lock<boost::mutex> lock (newTaskMutex);
104
+ LOCK (newTaskMutex);
123
105
124
106
// use temp_queue to maintain updated schedule
125
- std::multimap<boost ::chrono::system_clock::time_point, Function> temp_queue;
107
+ std::multimap<std ::chrono::system_clock::time_point, Function> temp_queue;
126
108
127
109
for (const auto & element : taskQueue) {
128
110
temp_queue.emplace_hint (temp_queue.cend (), element.first - delta_seconds, element.second );
@@ -147,10 +129,10 @@ void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaMilliSeconds
147
129
scheduleFromNow (std::bind (&Repeat, this , f, deltaMilliSeconds), deltaMilliSeconds);
148
130
}
149
131
150
- size_t CScheduler::getQueueInfo (boost ::chrono::system_clock::time_point &first,
151
- boost ::chrono::system_clock::time_point &last) const
132
+ size_t CScheduler::getQueueInfo (std ::chrono::system_clock::time_point &first,
133
+ std ::chrono::system_clock::time_point &last) const
152
134
{
153
- boost::unique_lock<boost::mutex> lock (newTaskMutex);
135
+ LOCK (newTaskMutex);
154
136
size_t result = taskQueue.size ();
155
137
if (!taskQueue.empty ()) {
156
138
first = taskQueue.begin ()->first ;
@@ -160,7 +142,7 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
160
142
}
161
143
162
144
bool CScheduler::AreThreadsServicingQueue () const {
163
- boost::unique_lock<boost::mutex> lock (newTaskMutex);
145
+ LOCK (newTaskMutex);
164
146
return nThreadsServicingQueue;
165
147
}
166
148
@@ -174,7 +156,7 @@ void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
174
156
if (m_are_callbacks_running) return ;
175
157
if (m_callbacks_pending.empty ()) return ;
176
158
}
177
- m_pscheduler->schedule (std::bind (&SingleThreadedSchedulerClient::ProcessQueue, this ));
159
+ m_pscheduler->schedule (std::bind (&SingleThreadedSchedulerClient::ProcessQueue, this ), std::chrono::system_clock::now () );
178
160
}
179
161
180
162
void SingleThreadedSchedulerClient::ProcessQueue () {
0 commit comments