Skip to content

Commit e76fc2b

Browse files
committed
Add 'sequence' zmq publisher to track all block (dis)connects, mempool deltas
Using the zmq notifications to avoid excessive mempool polling can be difficult given the current notifications available. It announces all transactions being added to mempool or included in blocks, but announces no evictions and gives no indication if the transaction is in the mempool or a block. Block notifications for zmq are also substandard, in that it only announces block tips, while all block transactions are still announced. This commit adds a unified stream which can be used to closely track mempool: 1) getrawmempool to fill out mempool knowledge 2) if txhash is announced, add or remove from set based on add/remove flag 3) if blockhash is announced, get block txn list, remove from those transactions local view of mempool 4) if we drop a sequence number, go to (1) The mempool sequence number starts at the value 1, and increments each time a transaction enters the mempool, or is evicted from the mempool for any reason, including block inclusion. The mempool sequence number is published via ZMQ for any transaction-related notification. These features allow for ZMQ/RPC consumer to track mempool state in a more exacting way, without unnecesarily polling getrawmempool. See interface_zmq.py::test_mempool_sync for example usage.
1 parent 1b615e6 commit e76fc2b

19 files changed

+206
-41
lines changed

src/init.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,19 +488,23 @@ void SetupServerArgs(NodeContext& node)
488488
argsman.AddArg("-zmqpubhashtx=<address>", "Enable publish hash transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
489489
argsman.AddArg("-zmqpubrawblock=<address>", "Enable publish raw block in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
490490
argsman.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
491+
argsman.AddArg("-zmqpubsequence=<address>", "Enable publish hash block and tx sequence in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
491492
argsman.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
492493
argsman.AddArg("-zmqpubhashtxhwm=<n>", strprintf("Set publish hash transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
493494
argsman.AddArg("-zmqpubrawblockhwm=<n>", strprintf("Set publish raw block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
494495
argsman.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
496+
argsman.AddArg("-zmqpubsequencehwm=<n>", strprintf("Set publish hash sequence message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
495497
#else
496498
hidden_args.emplace_back("-zmqpubhashblock=<address>");
497499
hidden_args.emplace_back("-zmqpubhashtx=<address>");
498500
hidden_args.emplace_back("-zmqpubrawblock=<address>");
499501
hidden_args.emplace_back("-zmqpubrawtx=<address>");
502+
hidden_args.emplace_back("-zmqpubsequence=<n>");
500503
hidden_args.emplace_back("-zmqpubhashblockhwm=<n>");
501504
hidden_args.emplace_back("-zmqpubhashtxhwm=<n>");
502505
hidden_args.emplace_back("-zmqpubrawblockhwm=<n>");
503506
hidden_args.emplace_back("-zmqpubrawtxhwm=<n>");
507+
hidden_args.emplace_back("-zmqpubsequencehwm=<n>");
504508
#endif
505509

506510
argsman.AddArg("-checkblocks=<n>", strprintf("How many blocks to check at startup (default: %u, 0 = all)", DEFAULT_CHECKBLOCKS), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);

src/interfaces/chain.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,13 @@ class NotificationsProxy : public CValidationInterface
5959
explicit NotificationsProxy(std::shared_ptr<Chain::Notifications> notifications)
6060
: m_notifications(std::move(notifications)) {}
6161
virtual ~NotificationsProxy() = default;
62-
void TransactionAddedToMempool(const CTransactionRef& tx) override
62+
void TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) override
6363
{
64-
m_notifications->transactionAddedToMempool(tx);
64+
m_notifications->transactionAddedToMempool(tx, mempool_sequence);
6565
}
66-
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override
66+
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
6767
{
68-
m_notifications->transactionRemovedFromMempool(tx, reason);
68+
m_notifications->transactionRemovedFromMempool(tx, reason, mempool_sequence);
6969
}
7070
void BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* index) override
7171
{
@@ -405,7 +405,7 @@ class ChainImpl : public Chain
405405
if (!m_node.mempool) return;
406406
LOCK2(::cs_main, m_node.mempool->cs);
407407
for (const CTxMemPoolEntry& entry : m_node.mempool->mapTx) {
408-
notifications.transactionAddedToMempool(entry.GetSharedTx());
408+
notifications.transactionAddedToMempool(entry.GetSharedTx(), 0 /* mempool_sequence */);
409409
}
410410
}
411411
NodeContext& m_node;

src/interfaces/chain.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,8 @@ class Chain
242242
{
243243
public:
244244
virtual ~Notifications() {}
245-
virtual void transactionAddedToMempool(const CTransactionRef& tx) {}
246-
virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {}
245+
virtual void transactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {}
246+
virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {}
247247
virtual void blockConnected(const CBlock& block, int height) {}
248248
virtual void blockDisconnected(const CBlock& block, int height) {}
249249
virtual void updatedBlockTip() {}

src/rpc/blockchain.cpp

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -500,9 +500,12 @@ static void entryToJSON(const CTxMemPool& pool, UniValue& info, const CTxMemPool
500500
info.pushKV("unbroadcast", pool.IsUnbroadcastTx(tx.GetHash()));
501501
}
502502

503-
UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose)
503+
UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose, bool include_mempool_sequence)
504504
{
505505
if (verbose) {
506+
if (include_mempool_sequence) {
507+
throw JSONRPCError(RPC_INVALID_PARAMETER, "Verbose results cannot contain mempool sequence values.");
508+
}
506509
LOCK(pool.cs);
507510
UniValue o(UniValue::VOBJ);
508511
for (const CTxMemPoolEntry& e : pool.mapTx) {
@@ -516,14 +519,25 @@ UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose)
516519
}
517520
return o;
518521
} else {
522+
uint64_t mempool_sequence;
519523
std::vector<uint256> vtxid;
520-
pool.queryHashes(vtxid);
521-
524+
{
525+
LOCK(pool.cs);
526+
pool.queryHashes(vtxid);
527+
mempool_sequence = pool.GetSequence();
528+
}
522529
UniValue a(UniValue::VARR);
523530
for (const uint256& hash : vtxid)
524531
a.push_back(hash.ToString());
525532

526-
return a;
533+
if (!include_mempool_sequence) {
534+
return a;
535+
} else {
536+
UniValue o(UniValue::VOBJ);
537+
o.pushKV("txids", a);
538+
o.pushKV("mempool_sequence", mempool_sequence);
539+
return o;
540+
}
527541
}
528542
}
529543

@@ -534,6 +548,7 @@ static RPCHelpMan getrawmempool()
534548
"\nHint: use getmempoolentry to fetch a specific transaction from the mempool.\n",
535549
{
536550
{"verbose", RPCArg::Type::BOOL, /* default */ "false", "True for a json object, false for array of transaction ids"},
551+
{"mempool_sequence", RPCArg::Type::BOOL, /* default */ "false", "If verbose=false, returns a json object with transaction list and mempool sequence number attached."},
537552
},
538553
{
539554
RPCResult{"for verbose = false",
@@ -546,6 +561,15 @@ static RPCHelpMan getrawmempool()
546561
{
547562
{RPCResult::Type::OBJ, "transactionid", "", MempoolEntryDescription()},
548563
}},
564+
RPCResult{"for verbose = false and mempool_sequence = true",
565+
RPCResult::Type::OBJ, "", "",
566+
{
567+
{RPCResult::Type::ARR, "txids", "",
568+
{
569+
{RPCResult::Type::STR_HEX, "", "The transaction id"},
570+
}},
571+
{RPCResult::Type::NUM, "mempool_sequence", "The mempool sequence value."},
572+
}},
549573
},
550574
RPCExamples{
551575
HelpExampleCli("getrawmempool", "true")
@@ -557,7 +581,12 @@ static RPCHelpMan getrawmempool()
557581
if (!request.params[0].isNull())
558582
fVerbose = request.params[0].get_bool();
559583

560-
return MempoolToJSON(EnsureMemPool(request.context), fVerbose);
584+
bool include_mempool_sequence = false;
585+
if (!request.params[1].isNull()) {
586+
include_mempool_sequence = request.params[1].get_bool();
587+
}
588+
589+
return MempoolToJSON(EnsureMemPool(request.context), fVerbose, include_mempool_sequence);
561590
},
562591
};
563592
}
@@ -2451,7 +2480,7 @@ static const CRPCCommand commands[] =
24512480
{ "blockchain", "getmempooldescendants", &getmempooldescendants, {"txid","verbose"} },
24522481
{ "blockchain", "getmempoolentry", &getmempoolentry, {"txid"} },
24532482
{ "blockchain", "getmempoolinfo", &getmempoolinfo, {} },
2454-
{ "blockchain", "getrawmempool", &getrawmempool, {"verbose"} },
2483+
{ "blockchain", "getrawmempool", &getrawmempool, {"verbose", "mempool_sequence"} },
24552484
{ "blockchain", "gettxout", &gettxout, {"txid","n","include_mempool"} },
24562485
{ "blockchain", "gettxoutsetinfo", &gettxoutsetinfo, {"hash_type"} },
24572486
{ "blockchain", "pruneblockchain", &pruneblockchain, {"height"} },

src/rpc/blockchain.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ UniValue blockToJSON(const CBlock& block, const CBlockIndex* tip, const CBlockIn
4343
UniValue MempoolInfoToJSON(const CTxMemPool& pool);
4444

4545
/** Mempool to JSON */
46-
UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose = false);
46+
UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose = false, bool include_mempool_sequence = false);
4747

4848
/** Block header to JSON */
4949
UniValue blockheaderToJSON(const CBlockIndex* tip, const CBlockIndex* blockindex) LOCKS_EXCLUDED(cs_main);

src/rpc/client.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ static const CRPCConvertParam vRPCConvertParams[] =
142142
{ "pruneblockchain", 0, "height" },
143143
{ "keypoolrefill", 0, "newsize" },
144144
{ "getrawmempool", 0, "verbose" },
145+
{ "getrawmempool", 1, "mempool_sequence" },
145146
{ "estimatesmartfee", 0, "conf_target" },
146147
{ "estimaterawfee", 0, "conf_target" },
147148
{ "estimaterawfee", 1, "threshold" },

src/txmempool.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,12 +409,16 @@ void CTxMemPool::addUnchecked(const CTxMemPoolEntry &entry, setEntries &setAnces
409409

410410
void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason)
411411
{
412+
// We increment mempool sequence value no matter removal reason
413+
// even if not directly reported below.
414+
uint64_t mempool_sequence = GetAndIncrementSequence();
415+
412416
if (reason != MemPoolRemovalReason::BLOCK) {
413417
// Notify clients that a transaction has been removed from the mempool
414418
// for any reason except being included in a block. Clients interested
415419
// in transactions included in blocks can subscribe to the BlockConnected
416420
// notification.
417-
GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason);
421+
GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence);
418422
}
419423

420424
const uint256 hash = it->GetTx().GetHash();

src/txmempool.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,11 @@ class CTxMemPool
501501
mutable uint64_t m_epoch;
502502
mutable bool m_has_epoch_guard;
503503

504+
// In-memory counter for external mempool tracking purposes.
505+
// This number is incremented once every time a transaction
506+
// is added or removed from the mempool for any reason.
507+
mutable uint64_t m_sequence_number{1};
508+
504509
void trackPackageRemoved(const CFeeRate& rate) EXCLUSIVE_LOCKS_REQUIRED(cs);
505510

506511
bool m_is_loaded GUARDED_BY(cs){false};
@@ -776,6 +781,15 @@ class CTxMemPool
776781
return m_unbroadcast_txids.count(txid) != 0;
777782
}
778783

784+
/** Guards this internal counter for external reporting */
785+
uint64_t GetAndIncrementSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) {
786+
return m_sequence_number++;
787+
}
788+
789+
uint64_t GetSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) {
790+
return m_sequence_number;
791+
}
792+
779793
private:
780794
/** UpdateForDescendants is used by UpdateTransactionsFromBlock to update
781795
* the descendants for a single transaction that has been added to the

src/validation.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1058,7 +1058,7 @@ bool MemPoolAccept::AcceptSingleTransaction(const CTransactionRef& ptx, ATMPArgs
10581058

10591059
if (!Finalize(args, workspace)) return false;
10601060

1061-
GetMainSignals().TransactionAddedToMempool(ptx);
1061+
GetMainSignals().TransactionAddedToMempool(ptx, m_pool.GetAndIncrementSequence());
10621062

10631063
return true;
10641064
}

src/validationinterface.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -199,18 +199,18 @@ void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockInd
199199
fInitialDownload);
200200
}
201201

202-
void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx) {
203-
auto event = [tx, this] {
204-
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx); });
202+
void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {
203+
auto event = [tx, mempool_sequence, this] {
204+
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, mempool_sequence); });
205205
};
206206
ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s wtxid=%s", __func__,
207207
tx->GetHash().ToString(),
208208
tx->GetWitnessHash().ToString());
209209
}
210210

211-
void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {
212-
auto event = [tx, reason, this] {
213-
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason); });
211+
void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {
212+
auto event = [tx, reason, mempool_sequence, this] {
213+
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason, mempool_sequence); });
214214
};
215215
ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s wtxid=%s", __func__,
216216
tx->GetHash().ToString(),

0 commit comments

Comments
 (0)