Skip to content

Commit 21ed30a

Browse files
committed
Merge #10179: Give CValidationInterface Support for calling notifications on the CScheduler Thread
1f668b6 Expose if CScheduler is being serviced, assert its not in EmptyQueue (Matt Corallo) 3192975 Flush CValidationInterface callbacks prior to destruction (Matt Corallo) 08096bb Support more than one CScheduler thread for serial clients (Matt Corallo) 2fbf2db Add default arg to CScheduler to schedule() a callback now (Matt Corallo) cda1429 Give CMainSignals a reference to the global scheduler (Matt Corallo) 3a19fed Make ValidationInterface signals-type-agnostic (Matt Corallo) ff6a834 Use TestingSetup to DRY qt rpcnestedtests (Matt Corallo) Tree-SHA512: fab91e34e30b080ed4d0a6d8c1214910e383c45440676e37be61d0bde6ae98d61e8903d22b846e95ba4e73a6ce788798350266feba246d8a2ab357e8523e4ac5
2 parents 9edda0c + 1f668b6 commit 21ed30a

File tree

8 files changed

+274
-73
lines changed

8 files changed

+274
-73
lines changed

src/init.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,19 @@ void Shutdown()
215215
fFeeEstimatesInitialized = false;
216216
}
217217

218+
// FlushStateToDisk generates a SetBestChain callback, which we should avoid missing
219+
FlushStateToDisk();
220+
221+
// After there are no more peers/RPC left to give us new data which may generate
222+
// CValidationInterface callbacks, flush them...
223+
GetMainSignals().FlushBackgroundCallbacks();
224+
225+
// Any future callbacks will be dropped. This should absolutely be safe - if
226+
// missing a callback results in an unrecoverable situation, unclean shutdown
227+
// would too. The only reason to do the above flushes is to let the wallet catch
228+
// up with our current chain to avoid any strange pruning edge cases and make
229+
// next startup faster by avoiding rescan.
230+
218231
{
219232
LOCK(cs_main);
220233
if (pcoinsTip != NULL) {
@@ -251,6 +264,7 @@ void Shutdown()
251264
}
252265
#endif
253266
UnregisterAllValidationInterfaces();
267+
GetMainSignals().UnregisterBackgroundSignalScheduler();
254268
#ifdef ENABLE_WALLET
255269
for (CWalletRef pwallet : vpwallets) {
256270
delete pwallet;
@@ -1203,6 +1217,8 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
12031217
CScheduler::Function serviceLoop = boost::bind(&CScheduler::serviceQueue, &scheduler);
12041218
threadGroup.create_thread(boost::bind(&TraceThread<CScheduler::Function>, "scheduler", serviceLoop));
12051219

1220+
GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);
1221+
12061222
/* Start the RPC server already. It will be started in "warmup" mode
12071223
* and not really process calls already (but it will signify connections
12081224
* that the server is there and will be ready later). Warmup mode will

src/qt/test/rpcnestedtests.cpp

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "rpc/server.h"
1313
#include "rpcconsole.h"
1414
#include "test/testutil.h"
15+
#include "test/test_bitcoin.h"
1516
#include "univalue.h"
1617
#include "util.h"
1718

@@ -35,24 +36,15 @@ void RPCNestedTests::rpcNestedTests()
3536
{
3637
// do some test setup
3738
// could be moved to a more generic place when we add more tests on QT level
38-
const CChainParams& chainparams = Params();
39-
RegisterAllCoreRPCCommands(tableRPC);
4039
tableRPC.appendCommand("rpcNestedTest", &vRPCCommands[0]);
4140
ClearDatadirCache();
4241
std::string path = QDir::tempPath().toStdString() + "/" + strprintf("test_bitcoin_qt_%lu_%i", (unsigned long)GetTime(), (int)(GetRand(100000)));
4342
QDir dir(QString::fromStdString(path));
4443
dir.mkpath(".");
4544
ForceSetArg("-datadir", path);
4645
//mempool.setSanityCheck(1.0);
47-
pblocktree = new CBlockTreeDB(1 << 20, true);
48-
pcoinsdbview = new CCoinsViewDB(1 << 23, true);
49-
pcoinsTip = new CCoinsViewCache(pcoinsdbview);
50-
InitBlockIndex(chainparams);
51-
{
52-
CValidationState state;
53-
bool ok = ActivateBestChain(state, chainparams);
54-
QVERIFY(ok);
55-
}
46+
47+
TestingSetup test;
5648

5749
SetRPCWarmupFinished();
5850

@@ -145,13 +137,5 @@ void RPCNestedTests::rpcNestedTests()
145137
QVERIFY_EXCEPTION_THROWN(RPCConsole::RPCExecuteCommandLine(result, "rpcNestedTest(abc,,)"), std::runtime_error); //don't tollerate empty arguments when using ,
146138
#endif
147139

148-
UnloadBlockIndex();
149-
delete pcoinsTip;
150-
pcoinsTip = nullptr;
151-
delete pcoinsdbview;
152-
pcoinsdbview = nullptr;
153-
delete pblocktree;
154-
pblocktree = nullptr;
155-
156140
fs::remove_all(fs::path(path));
157141
}

src/scheduler.cpp

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,3 +139,69 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
139139
}
140140
return result;
141141
}
142+
143+
bool CScheduler::AreThreadsServicingQueue() const {
144+
return nThreadsServicingQueue;
145+
}
146+
147+
148+
void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
149+
{
150+
LOCK(m_cs_callbacks_pending);
151+
// Try to avoid scheduling too many copies here, but if we
152+
// accidentally have two ProcessQueue's scheduled at once its
153+
// not a big deal.
154+
if (m_are_callbacks_running) return;
155+
if (m_callbacks_pending.empty()) return;
156+
}
157+
m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this));
158+
}
159+
160+
void SingleThreadedSchedulerClient::ProcessQueue() {
161+
std::function<void (void)> callback;
162+
{
163+
LOCK(m_cs_callbacks_pending);
164+
if (m_are_callbacks_running) return;
165+
if (m_callbacks_pending.empty()) return;
166+
m_are_callbacks_running = true;
167+
168+
callback = std::move(m_callbacks_pending.front());
169+
m_callbacks_pending.pop_front();
170+
}
171+
172+
// RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
173+
// to ensure both happen safely even if callback() throws.
174+
struct RAIICallbacksRunning {
175+
SingleThreadedSchedulerClient* instance;
176+
RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
177+
~RAIICallbacksRunning() {
178+
{
179+
LOCK(instance->m_cs_callbacks_pending);
180+
instance->m_are_callbacks_running = false;
181+
}
182+
instance->MaybeScheduleProcessQueue();
183+
}
184+
} raiicallbacksrunning(this);
185+
186+
callback();
187+
}
188+
189+
void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void (void)> func) {
190+
assert(m_pscheduler);
191+
192+
{
193+
LOCK(m_cs_callbacks_pending);
194+
m_callbacks_pending.emplace_back(std::move(func));
195+
}
196+
MaybeScheduleProcessQueue();
197+
}
198+
199+
void SingleThreadedSchedulerClient::EmptyQueue() {
200+
assert(!m_pscheduler->AreThreadsServicingQueue());
201+
bool should_continue = true;
202+
while (should_continue) {
203+
ProcessQueue();
204+
LOCK(m_cs_callbacks_pending);
205+
should_continue = !m_callbacks_pending.empty();
206+
}
207+
}

src/scheduler.h

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
#include <boost/thread.hpp>
1515
#include <map>
1616

17+
#include "sync.h"
18+
1719
//
1820
// Simple class for background tasks that should be run
1921
// periodically or once "after a while"
@@ -41,7 +43,7 @@ class CScheduler
4143
typedef std::function<void(void)> Function;
4244

4345
// Call func at/after time t
44-
void schedule(Function f, boost::chrono::system_clock::time_point t);
46+
void schedule(Function f, boost::chrono::system_clock::time_point t=boost::chrono::system_clock::now());
4547

4648
// Convenience method: call f once deltaSeconds from now
4749
void scheduleFromNow(Function f, int64_t deltaMilliSeconds);
@@ -69,6 +71,9 @@ class CScheduler
6971
size_t getQueueInfo(boost::chrono::system_clock::time_point &first,
7072
boost::chrono::system_clock::time_point &last) const;
7173

74+
// Returns true if there are threads actively running in serviceQueue()
75+
bool AreThreadsServicingQueue() const;
76+
7277
private:
7378
std::multimap<boost::chrono::system_clock::time_point, Function> taskQueue;
7479
boost::condition_variable newTaskScheduled;
@@ -79,4 +84,30 @@ class CScheduler
7984
bool shouldStop() { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
8085
};
8186

87+
/**
88+
* Class used by CScheduler clients which may schedule multiple jobs
89+
* which are required to be run serially. Does not require such jobs
90+
* to be executed on the same thread, but no two jobs will be executed
91+
* at the same time.
92+
*/
93+
class SingleThreadedSchedulerClient {
94+
private:
95+
CScheduler *m_pscheduler;
96+
97+
CCriticalSection m_cs_callbacks_pending;
98+
std::list<std::function<void (void)>> m_callbacks_pending;
99+
bool m_are_callbacks_running = false;
100+
101+
void MaybeScheduleProcessQueue();
102+
void ProcessQueue();
103+
104+
public:
105+
SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {}
106+
void AddToProcessQueue(std::function<void (void)> func);
107+
108+
// Processes all remaining queue members on the calling thread, blocking until queue is empty
109+
// Must be called after the CScheduler has no remaining processing threads!
110+
void EmptyQueue();
111+
};
112+
82113
#endif

src/test/test_bitcoin.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(cha
6262
pathTemp = GetTempPath() / strprintf("test_bitcoin_%lu_%i", (unsigned long)GetTime(), (int)(InsecureRandRange(100000)));
6363
fs::create_directories(pathTemp);
6464
ForceSetArg("-datadir", pathTemp.string());
65+
66+
// Note that because we don't bother running a scheduler thread here,
67+
// callbacks via CValidationInterface are unreliable, but that's OK,
68+
// our unit tests aren't testing multiple parts of the code at once.
69+
GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);
70+
6571
mempool.setSanityCheck(1.0);
6672
pblocktree = new CBlockTreeDB(1 << 20, true);
6773
pcoinsdbview = new CCoinsViewDB(1 << 23, true);
@@ -88,6 +94,8 @@ TestingSetup::~TestingSetup()
8894
UnregisterNodeSignals(GetNodeSignals());
8995
threadGroup.interrupt_all();
9096
threadGroup.join_all();
97+
GetMainSignals().FlushBackgroundCallbacks();
98+
GetMainSignals().UnregisterBackgroundSignalScheduler();
9199
UnloadBlockIndex();
92100
delete pcoinsTip;
93101
delete pcoinsdbview;

src/test/test_bitcoin.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include "key.h"
1111
#include "pubkey.h"
1212
#include "random.h"
13+
#include "scheduler.h"
1314
#include "txdb.h"
1415
#include "txmempool.h"
1516

@@ -53,6 +54,7 @@ struct TestingSetup: public BasicTestingSetup {
5354
fs::path pathTemp;
5455
boost::thread_group threadGroup;
5556
CConnman* connman;
57+
CScheduler scheduler;
5658

5759
TestingSetup(const std::string& chainName = CBaseChainParams::MAIN);
5860
~TestingSetup();

0 commit comments

Comments
 (0)