Skip to content

Commit 08096bb

Browse files
committed
Support more than one CScheduler thread for serial clients
This will be used by CValidationInterface soon. This requires a bit of work as we need to ensure that most of our callbacks happen in-order (to avoid synchronization issues in wallet) - we keep our own internal queue and push things onto it, scheduling a queue-draining function immediately upon new callbacks.
1 parent 2fbf2db commit 08096bb

File tree

4 files changed

+90
-10
lines changed

4 files changed

+90
-10
lines changed

src/scheduler.cpp

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,3 +139,55 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
139139
}
140140
return result;
141141
}
142+
143+
144+
void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
145+
{
146+
LOCK(m_cs_callbacks_pending);
147+
// Try to avoid scheduling too many copies here, but if we
148+
// accidentally have two ProcessQueue's scheduled at once its
149+
// not a big deal.
150+
if (m_are_callbacks_running) return;
151+
if (m_callbacks_pending.empty()) return;
152+
}
153+
m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this));
154+
}
155+
156+
void SingleThreadedSchedulerClient::ProcessQueue() {
157+
std::function<void (void)> callback;
158+
{
159+
LOCK(m_cs_callbacks_pending);
160+
if (m_are_callbacks_running) return;
161+
if (m_callbacks_pending.empty()) return;
162+
m_are_callbacks_running = true;
163+
164+
callback = std::move(m_callbacks_pending.front());
165+
m_callbacks_pending.pop_front();
166+
}
167+
168+
// RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
169+
// to ensure both happen safely even if callback() throws.
170+
struct RAIICallbacksRunning {
171+
SingleThreadedSchedulerClient* instance;
172+
RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
173+
~RAIICallbacksRunning() {
174+
{
175+
LOCK(instance->m_cs_callbacks_pending);
176+
instance->m_are_callbacks_running = false;
177+
}
178+
instance->MaybeScheduleProcessQueue();
179+
}
180+
} raiicallbacksrunning(this);
181+
182+
callback();
183+
}
184+
185+
void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void (void)> func) {
186+
assert(m_pscheduler);
187+
188+
{
189+
LOCK(m_cs_callbacks_pending);
190+
m_callbacks_pending.emplace_back(std::move(func));
191+
}
192+
MaybeScheduleProcessQueue();
193+
}

src/scheduler.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
#include <boost/thread.hpp>
1515
#include <map>
1616

17+
#include "sync.h"
18+
1719
//
1820
// Simple class for background tasks that should be run
1921
// periodically or once "after a while"
@@ -79,4 +81,26 @@ class CScheduler
7981
bool shouldStop() { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
8082
};
8183

84+
/**
85+
* Class used by CScheduler clients which may schedule multiple jobs
86+
* which are required to be run serially. Does not require such jobs
87+
* to be executed on the same thread, but no two jobs will be executed
88+
* at the same time.
89+
*/
90+
class SingleThreadedSchedulerClient {
91+
private:
92+
CScheduler *m_pscheduler;
93+
94+
CCriticalSection m_cs_callbacks_pending;
95+
std::list<std::function<void (void)>> m_callbacks_pending;
96+
bool m_are_callbacks_running = false;
97+
98+
void MaybeScheduleProcessQueue();
99+
void ProcessQueue();
100+
101+
public:
102+
SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {}
103+
void AddToProcessQueue(std::function<void (void)> func);
104+
};
105+
82106
#endif

src/validationinterface.cpp

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@
66
#include "validationinterface.h"
77
#include "init.h"
88
#include "scheduler.h"
9+
#include "sync.h"
10+
#include "util.h"
11+
12+
#include <list>
13+
#include <atomic>
914

1015
#include <boost/signals2/signal.hpp>
1116

@@ -20,22 +25,23 @@ struct MainSignalsInstance {
2025
boost::signals2::signal<void (const CBlock&, const CValidationState&)> BlockChecked;
2126
boost::signals2::signal<void (const CBlockIndex *, const std::shared_ptr<const CBlock>&)> NewPoWValidBlock;
2227

23-
CScheduler *m_scheduler = NULL;
28+
// We are not allowed to assume the scheduler only runs in one thread,
29+
// but must ensure all callbacks happen in-order, so we end up creating
30+
// our own queue here :(
31+
SingleThreadedSchedulerClient m_schedulerClient;
32+
33+
MainSignalsInstance(CScheduler *pscheduler) : m_schedulerClient(pscheduler) {}
2434
};
2535

2636
static CMainSignals g_signals;
2737

28-
CMainSignals::CMainSignals() {
29-
m_internals.reset(new MainSignalsInstance());
30-
}
31-
3238
void CMainSignals::RegisterBackgroundSignalScheduler(CScheduler& scheduler) {
33-
assert(!m_internals->m_scheduler);
34-
m_internals->m_scheduler = &scheduler;
39+
assert(!m_internals);
40+
m_internals.reset(new MainSignalsInstance(&scheduler));
3541
}
3642

3743
void CMainSignals::UnregisterBackgroundSignalScheduler() {
38-
m_internals->m_scheduler = NULL;
44+
m_internals.reset(nullptr);
3945
}
4046

4147
CMainSignals& GetMainSignals()

src/validationinterface.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,6 @@ class CMainSignals {
7575
friend void ::UnregisterAllValidationInterfaces();
7676

7777
public:
78-
CMainSignals();
79-
8078
/** Register a CScheduler to give callbacks which should run in the background (may only be called once) */
8179
void RegisterBackgroundSignalScheduler(CScheduler& scheduler);
8280
/** Unregister a CScheduler to give callbacks which should run in the background - these callbacks will now be dropped! */

0 commit comments

Comments
 (0)