Skip to content

Commit 927a1d7

Browse files
committed
Merge #10286: Call wallet notify callbacks in scheduler thread (without cs_main)
89f0312 Remove redundant pwallet nullptr check (Matt Corallo) c4784b5 Add a dev notes document describing the new wallet RPC blocking (Matt Corallo) 3ea8b75 Give ZMQ consistent order with UpdatedBlockTip on scheduler thread (Matt Corallo) cb06edf Fix wallet RPC race by waiting for callbacks in sendrawtransaction (Matt Corallo) e545ded Also call other wallet notify callbacks in scheduler thread (Matt Corallo) 17220d6 Use callbacks to cache whether wallet transactions are in mempool (Matt Corallo) 5d67a78 Add calls to CWallet::BlockUntilSyncedToCurrentChain() in RPCs (Matt Corallo) 5ee3172 Add CWallet::BlockUntilSyncedToCurrentChain() (Matt Corallo) 0b2f42d Add CallFunctionInQueue to wait on validation interface queue drain (Matt Corallo) 2b4b345 Add ability to assert a lock is not held in DEBUG_LOCKORDER (Matt Corallo) 0343676 Call TransactionRemovedFromMempool in the CScheduler thread (Matt Corallo) a7d3936 Add a CValidationInterface::TransactionRemovedFromMempool (Matt Corallo) Pull request description: Based on #10179, this effectively reverts #9583, regaining most of the original speedups of #7946. This concludes the work of #9725, #10178, and #10179. See individual commit messages for more information. Tree-SHA512: eead4809b0a75d1fb33b0765174ff52c972e45040635e38cf3686cef310859c1e6b3c00e7186cbd17374c6ae547bfbd6c1718fe36f26c76ba8a8b052d6ed7bc9
2 parents aca77a4 + 89f0312 commit 927a1d7

File tree

12 files changed

+354
-28
lines changed

12 files changed

+354
-28
lines changed

doc/developer-notes.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -675,3 +675,16 @@ A few guidelines for introducing and reviewing new RPC interfaces:
675675

676676
- *Rationale*: If a RPC response is not a JSON object then it is harder to avoid API breakage if
677677
new data in the response is needed.
678+
679+
- Wallet RPCs call BlockUntilSyncedToCurrentChain to maintain consistency with
680+
`getblockchaininfo`'s state immediately prior to the call's execution. Wallet
681+
RPCs whose behavior does *not* depend on the current chainstate may omit this
682+
call.
683+
684+
- *Rationale*: In previous versions of Bitcoin Core, the wallet was always
685+
in-sync with the chainstate (by virtue of them all being updated in the
686+
same cs_main lock). In order to maintain the behavior that wallet RPCs
687+
return results as of at least the highest best-known block an RPC
688+
client may be aware of prior to entering a wallet RPC call, we must block
689+
until the wallet is caught up to the chainstate as of the RPC call's entry.
690+
This also makes the API much easier for RPC clients to reason about.

src/init.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,7 @@ void Shutdown()
261261
#endif
262262
UnregisterAllValidationInterfaces();
263263
GetMainSignals().UnregisterBackgroundSignalScheduler();
264+
GetMainSignals().UnregisterWithMempoolSignals(mempool);
264265
#ifdef ENABLE_WALLET
265266
CloseWallets();
266267
#endif
@@ -1236,6 +1237,7 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
12361237
threadGroup.create_thread(boost::bind(&TraceThread<CScheduler::Function>, "scheduler", serviceLoop));
12371238

12381239
GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);
1240+
GetMainSignals().RegisterWithMempoolSignals(mempool);
12391241

12401242
/* Start the RPC server already. It will be started in "warmup" mode
12411243
* and not really process calls already (but it will signify connections

src/rpc/rawtransaction.cpp

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include "init.h"
1212
#include "keystore.h"
1313
#include "validation.h"
14+
#include "validationinterface.h"
1415
#include "merkleblock.h"
1516
#include "net.h"
1617
#include "policy/policy.h"
@@ -30,6 +31,7 @@
3031
#include "wallet/wallet.h"
3132
#endif
3233

34+
#include <future>
3335
#include <stdint.h>
3436

3537
#include <univalue.h>
@@ -917,7 +919,9 @@ UniValue sendrawtransaction(const JSONRPCRequest& request)
917919
);
918920

919921
ObserveSafeMode();
920-
LOCK(cs_main);
922+
923+
std::promise<void> promise;
924+
921925
RPCTypeCheck(request.params, {UniValue::VSTR, UniValue::VBOOL});
922926

923927
// parse hex string from parameter
@@ -931,6 +935,8 @@ UniValue sendrawtransaction(const JSONRPCRequest& request)
931935
if (!request.params[1].isNull() && request.params[1].get_bool())
932936
nMaxRawTxFee = 0;
933937

938+
{ // cs_main scope
939+
LOCK(cs_main);
934940
CCoinsViewCache &view = *pcoinsTip;
935941
bool fHaveChain = false;
936942
for (size_t o = 0; !fHaveChain && o < tx->vout.size(); o++) {
@@ -952,10 +958,24 @@ UniValue sendrawtransaction(const JSONRPCRequest& request)
952958
}
953959
throw JSONRPCError(RPC_TRANSACTION_ERROR, state.GetRejectReason());
954960
}
961+
} else {
962+
// If wallet is enabled, ensure that the wallet has been made aware
963+
// of the new transaction prior to returning. This prevents a race
964+
// where a user might call sendrawtransaction with a transaction
965+
// to/from their wallet, immediately call some wallet RPC, and get
966+
// a stale result because callbacks have not yet been processed.
967+
CallFunctionInValidationInterfaceQueue([&promise] {
968+
promise.set_value();
969+
});
955970
}
956971
} else if (fHaveChain) {
957972
throw JSONRPCError(RPC_TRANSACTION_ALREADY_IN_CHAIN, "transaction already in block chain");
958973
}
974+
975+
} // cs_main
976+
977+
promise.get_future().wait();
978+
959979
if(!g_connman)
960980
throw JSONRPCError(RPC_CLIENT_P2P_DISABLED, "Error: Peer-to-peer functionality missing or disabled");
961981

@@ -964,6 +984,7 @@ UniValue sendrawtransaction(const JSONRPCRequest& request)
964984
{
965985
pnode->PushInventory(inv);
966986
});
987+
967988
return hashTx.GetHex();
968989
}
969990

src/sync.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,16 @@ void AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine,
155155
abort();
156156
}
157157

158+
void AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs)
159+
{
160+
for (const std::pair<void*, CLockLocation>& i : *lockstack) {
161+
if (i.first == cs) {
162+
fprintf(stderr, "Assertion failed: lock %s held in %s:%i; locks held:\n%s", pszName, pszFile, nLine, LocksHeld().c_str());
163+
abort();
164+
}
165+
}
166+
}
167+
158168
void DeleteLock(void* cs)
159169
{
160170
if (!lockdata.available) {

src/sync.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,17 @@ void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs
7777
void LeaveCritical();
7878
std::string LocksHeld();
7979
void AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs);
80+
void AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs);
8081
void DeleteLock(void* cs);
8182
#else
8283
void static inline EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs, bool fTry = false) {}
8384
void static inline LeaveCritical() {}
8485
void static inline AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) {}
86+
void static inline AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) {}
8587
void static inline DeleteLock(void* cs) {}
8688
#endif
8789
#define AssertLockHeld(cs) AssertLockHeldInternal(#cs, __FILE__, __LINE__, &cs)
90+
#define AssertLockNotHeld(cs) AssertLockNotHeldInternal(#cs, __FILE__, __LINE__, &cs)
8891

8992
/**
9093
* Wrapped mutex: supports recursive locking, but no waiting

src/txmempool.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,9 @@ class CTxMemPool
513513
// to track size/count of descendant transactions. First version of
514514
// addUnchecked can be used to have it call CalculateMemPoolAncestors(), and
515515
// then invoke the second version.
516+
// Note that addUnchecked is ONLY called from ATMP outside of tests
517+
// and any other callers may break wallet's in-mempool tracking (due to
518+
// lack of CValidationInterface::TransactionAddedToMempool callbacks).
516519
bool addUnchecked(const uint256& hash, const CTxMemPoolEntry &entry, bool validFeeEstimate = true);
517520
bool addUnchecked(const uint256& hash, const CTxMemPoolEntry &entry, setEntries &setAncestors, bool validFeeEstimate = true);
518521

src/validation.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2494,7 +2494,7 @@ bool ActivateBestChain(CValidationState &state, const CChainParams& chainparams,
24942494

24952495
for (const PerBlockConnectTrace& trace : connectTrace.GetBlocksConnected()) {
24962496
assert(trace.pblock && trace.pindex);
2497-
GetMainSignals().BlockConnected(trace.pblock, trace.pindex, *trace.conflictedTxs);
2497+
GetMainSignals().BlockConnected(trace.pblock, trace.pindex, trace.conflictedTxs);
24982498
}
24992499
}
25002500
// When we reach this point, we switched to a new tip (stored in pindexNewTip).

src/validationinterface.cpp

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "primitives/block.h"
1010
#include "scheduler.h"
1111
#include "sync.h"
12+
#include "txmempool.h"
1213
#include "util.h"
1314

1415
#include <list>
@@ -21,6 +22,7 @@ struct MainSignalsInstance {
2122
boost::signals2::signal<void (const CTransactionRef &)> TransactionAddedToMempool;
2223
boost::signals2::signal<void (const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex, const std::vector<CTransactionRef>&)> BlockConnected;
2324
boost::signals2::signal<void (const std::shared_ptr<const CBlock> &)> BlockDisconnected;
25+
boost::signals2::signal<void (const CTransactionRef &)> TransactionRemovedFromMempool;
2426
boost::signals2::signal<void (const CBlockLocator &)> SetBestChain;
2527
boost::signals2::signal<void (const uint256 &)> Inventory;
2628
boost::signals2::signal<void (int64_t nBestBlockTime, CConnman* connman)> Broadcast;
@@ -50,6 +52,14 @@ void CMainSignals::FlushBackgroundCallbacks() {
5052
m_internals->m_schedulerClient.EmptyQueue();
5153
}
5254

55+
void CMainSignals::RegisterWithMempoolSignals(CTxMemPool& pool) {
56+
pool.NotifyEntryRemoved.connect(boost::bind(&CMainSignals::MempoolEntryRemoved, this, _1, _2));
57+
}
58+
59+
void CMainSignals::UnregisterWithMempoolSignals(CTxMemPool& pool) {
60+
pool.NotifyEntryRemoved.disconnect(boost::bind(&CMainSignals::MempoolEntryRemoved, this, _1, _2));
61+
}
62+
5363
CMainSignals& GetMainSignals()
5464
{
5565
return g_signals;
@@ -60,6 +70,7 @@ void RegisterValidationInterface(CValidationInterface* pwalletIn) {
6070
g_signals.m_internals->TransactionAddedToMempool.connect(boost::bind(&CValidationInterface::TransactionAddedToMempool, pwalletIn, _1));
6171
g_signals.m_internals->BlockConnected.connect(boost::bind(&CValidationInterface::BlockConnected, pwalletIn, _1, _2, _3));
6272
g_signals.m_internals->BlockDisconnected.connect(boost::bind(&CValidationInterface::BlockDisconnected, pwalletIn, _1));
73+
g_signals.m_internals->TransactionRemovedFromMempool.connect(boost::bind(&CValidationInterface::TransactionRemovedFromMempool, pwalletIn, _1));
6374
g_signals.m_internals->SetBestChain.connect(boost::bind(&CValidationInterface::SetBestChain, pwalletIn, _1));
6475
g_signals.m_internals->Inventory.connect(boost::bind(&CValidationInterface::Inventory, pwalletIn, _1));
6576
g_signals.m_internals->Broadcast.connect(boost::bind(&CValidationInterface::ResendWalletTransactions, pwalletIn, _1, _2));
@@ -75,6 +86,7 @@ void UnregisterValidationInterface(CValidationInterface* pwalletIn) {
7586
g_signals.m_internals->TransactionAddedToMempool.disconnect(boost::bind(&CValidationInterface::TransactionAddedToMempool, pwalletIn, _1));
7687
g_signals.m_internals->BlockConnected.disconnect(boost::bind(&CValidationInterface::BlockConnected, pwalletIn, _1, _2, _3));
7788
g_signals.m_internals->BlockDisconnected.disconnect(boost::bind(&CValidationInterface::BlockDisconnected, pwalletIn, _1));
89+
g_signals.m_internals->TransactionRemovedFromMempool.disconnect(boost::bind(&CValidationInterface::TransactionRemovedFromMempool, pwalletIn, _1));
7890
g_signals.m_internals->UpdatedBlockTip.disconnect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1, _2, _3));
7991
g_signals.m_internals->NewPoWValidBlock.disconnect(boost::bind(&CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2));
8092
}
@@ -87,32 +99,57 @@ void UnregisterAllValidationInterfaces() {
8799
g_signals.m_internals->TransactionAddedToMempool.disconnect_all_slots();
88100
g_signals.m_internals->BlockConnected.disconnect_all_slots();
89101
g_signals.m_internals->BlockDisconnected.disconnect_all_slots();
102+
g_signals.m_internals->TransactionRemovedFromMempool.disconnect_all_slots();
90103
g_signals.m_internals->UpdatedBlockTip.disconnect_all_slots();
91104
g_signals.m_internals->NewPoWValidBlock.disconnect_all_slots();
92105
}
93106

107+
void CallFunctionInValidationInterfaceQueue(std::function<void ()> func) {
108+
g_signals.m_internals->m_schedulerClient.AddToProcessQueue(std::move(func));
109+
}
110+
111+
void CMainSignals::MempoolEntryRemoved(CTransactionRef ptx, MemPoolRemovalReason reason) {
112+
if (reason != MemPoolRemovalReason::BLOCK && reason != MemPoolRemovalReason::CONFLICT) {
113+
m_internals->m_schedulerClient.AddToProcessQueue([ptx, this] {
114+
m_internals->TransactionRemovedFromMempool(ptx);
115+
});
116+
}
117+
}
118+
94119
void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {
95-
m_internals->UpdatedBlockTip(pindexNew, pindexFork, fInitialDownload);
120+
m_internals->m_schedulerClient.AddToProcessQueue([pindexNew, pindexFork, fInitialDownload, this] {
121+
m_internals->UpdatedBlockTip(pindexNew, pindexFork, fInitialDownload);
122+
});
96123
}
97124

98125
void CMainSignals::TransactionAddedToMempool(const CTransactionRef &ptx) {
99-
m_internals->TransactionAddedToMempool(ptx);
126+
m_internals->m_schedulerClient.AddToProcessQueue([ptx, this] {
127+
m_internals->TransactionAddedToMempool(ptx);
128+
});
100129
}
101130

102-
void CMainSignals::BlockConnected(const std::shared_ptr<const CBlock> &pblock, const CBlockIndex *pindex, const std::vector<CTransactionRef>& vtxConflicted) {
103-
m_internals->BlockConnected(pblock, pindex, vtxConflicted);
131+
void CMainSignals::BlockConnected(const std::shared_ptr<const CBlock> &pblock, const CBlockIndex *pindex, const std::shared_ptr<const std::vector<CTransactionRef>>& pvtxConflicted) {
132+
m_internals->m_schedulerClient.AddToProcessQueue([pblock, pindex, pvtxConflicted, this] {
133+
m_internals->BlockConnected(pblock, pindex, *pvtxConflicted);
134+
});
104135
}
105136

106137
void CMainSignals::BlockDisconnected(const std::shared_ptr<const CBlock> &pblock) {
107-
m_internals->BlockDisconnected(pblock);
138+
m_internals->m_schedulerClient.AddToProcessQueue([pblock, this] {
139+
m_internals->BlockDisconnected(pblock);
140+
});
108141
}
109142

110143
void CMainSignals::SetBestChain(const CBlockLocator &locator) {
111-
m_internals->SetBestChain(locator);
144+
m_internals->m_schedulerClient.AddToProcessQueue([locator, this] {
145+
m_internals->SetBestChain(locator);
146+
});
112147
}
113148

114149
void CMainSignals::Inventory(const uint256 &hash) {
115-
m_internals->Inventory(hash);
150+
m_internals->m_schedulerClient.AddToProcessQueue([hash, this] {
151+
m_internals->Inventory(hash);
152+
});
116153
}
117154

118155
void CMainSignals::Broadcast(int64_t nBestBlockTime, CConnman* connman) {

src/validationinterface.h

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@
66
#ifndef BITCOIN_VALIDATIONINTERFACE_H
77
#define BITCOIN_VALIDATIONINTERFACE_H
88

9-
#include <memory>
10-
119
#include "primitives/transaction.h" // CTransaction(Ref)
1210

11+
#include <functional>
12+
#include <memory>
13+
1314
class CBlock;
1415
class CBlockIndex;
1516
struct CBlockLocator;
@@ -20,6 +21,8 @@ class CValidationInterface;
2021
class CValidationState;
2122
class uint256;
2223
class CScheduler;
24+
class CTxMemPool;
25+
enum class MemPoolRemovalReason;
2326

2427
// These functions dispatch to one or all registered wallets
2528

@@ -29,23 +32,66 @@ void RegisterValidationInterface(CValidationInterface* pwalletIn);
2932
void UnregisterValidationInterface(CValidationInterface* pwalletIn);
3033
/** Unregister all wallets from core */
3134
void UnregisterAllValidationInterfaces();
35+
/**
36+
* Pushes a function to callback onto the notification queue, guaranteeing any
37+
* callbacks generated prior to now are finished when the function is called.
38+
*
39+
* Be very careful blocking on func to be called if any locks are held -
40+
* validation interface clients may not be able to make progress as they often
41+
* wait for things like cs_main, so blocking until func is called with cs_main
42+
* will result in a deadlock (that DEBUG_LOCKORDER will miss).
43+
*/
44+
void CallFunctionInValidationInterfaceQueue(std::function<void ()> func);
3245

3346
class CValidationInterface {
3447
protected:
35-
/** Notifies listeners of updated block chain tip */
48+
/**
49+
* Notifies listeners of updated block chain tip
50+
*
51+
* Called on a background thread.
52+
*/
3653
virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {}
37-
/** Notifies listeners of a transaction having been added to mempool. */
54+
/**
55+
* Notifies listeners of a transaction having been added to mempool.
56+
*
57+
* Called on a background thread.
58+
*/
3859
virtual void TransactionAddedToMempool(const CTransactionRef &ptxn) {}
60+
/**
61+
* Notifies listeners of a transaction leaving mempool.
62+
*
63+
* This only fires for transactions which leave mempool because of expiry,
64+
* size limiting, reorg (changes in lock times/coinbase maturity), or
65+
* replacement. This does not include any transactions which are included
66+
* in BlockConnectedDisconnected either in block->vtx or in txnConflicted.
67+
*
68+
* Called on a background thread.
69+
*/
70+
virtual void TransactionRemovedFromMempool(const CTransactionRef &ptx) {}
3971
/**
4072
* Notifies listeners of a block being connected.
4173
* Provides a vector of transactions evicted from the mempool as a result.
74+
*
75+
* Called on a background thread.
4276
*/
4377
virtual void BlockConnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex *pindex, const std::vector<CTransactionRef> &txnConflicted) {}
44-
/** Notifies listeners of a block being disconnected */
78+
/**
79+
* Notifies listeners of a block being disconnected
80+
*
81+
* Called on a background thread.
82+
*/
4583
virtual void BlockDisconnected(const std::shared_ptr<const CBlock> &block) {}
46-
/** Notifies listeners of the new active block chain on-disk. */
84+
/**
85+
* Notifies listeners of the new active block chain on-disk.
86+
*
87+
* Called on a background thread.
88+
*/
4789
virtual void SetBestChain(const CBlockLocator &locator) {}
48-
/** Notifies listeners about an inventory item being seen on the network. */
90+
/**
91+
* Notifies listeners about an inventory item being seen on the network.
92+
*
93+
* Called on a background thread.
94+
*/
4995
virtual void Inventory(const uint256 &hash) {}
5096
/** Tells listeners to broadcast their data. */
5197
virtual void ResendWalletTransactions(int64_t nBestBlockTime, CConnman* connman) {}
@@ -73,6 +119,9 @@ class CMainSignals {
73119
friend void ::RegisterValidationInterface(CValidationInterface*);
74120
friend void ::UnregisterValidationInterface(CValidationInterface*);
75121
friend void ::UnregisterAllValidationInterfaces();
122+
friend void ::CallFunctionInValidationInterfaceQueue(std::function<void ()> func);
123+
124+
void MempoolEntryRemoved(CTransactionRef tx, MemPoolRemovalReason reason);
76125

77126
public:
78127
/** Register a CScheduler to give callbacks which should run in the background (may only be called once) */
@@ -82,9 +131,14 @@ class CMainSignals {
82131
/** Call any remaining callbacks on the calling thread */
83132
void FlushBackgroundCallbacks();
84133

134+
/** Register with mempool to call TransactionRemovedFromMempool callbacks */
135+
void RegisterWithMempoolSignals(CTxMemPool& pool);
136+
/** Unregister with mempool */
137+
void UnregisterWithMempoolSignals(CTxMemPool& pool);
138+
85139
void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload);
86140
void TransactionAddedToMempool(const CTransactionRef &);
87-
void BlockConnected(const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex, const std::vector<CTransactionRef> &);
141+
void BlockConnected(const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex, const std::shared_ptr<const std::vector<CTransactionRef>> &);
88142
void BlockDisconnected(const std::shared_ptr<const CBlock> &);
89143
void SetBestChain(const CBlockLocator &);
90144
void Inventory(const uint256 &);

0 commit comments

Comments
 (0)