Skip to content

Commit 9e217f5

Browse files
committed
Merge #19572: ZMQ: Create "sequence" notifier, enabling client-side mempool tracking
759d94e Update zmq notification documentation and sample consumer (Gregory Sanders) 68c3c7e Add functional tests for zmq sequence topic and mempool sequence logic (Gregory Sanders) e76fc2b Add 'sequence' zmq publisher to track all block (dis)connects, mempool deltas (Gregory Sanders) 1b615e6 zmq test: Actually make reorg occur (Gregory Sanders) Pull request description: This PR creates a new ZMQ notifier that gives a "total hash history" of block (dis)connection, mempool addition/substraction, all in one pipeline. It also exposes a "mempool sequence number" to both this notifier and `getrawmempool` results, which allows the consumer to use the results together without confusion about ordering of results and without excessive `getrawmempool` polling. See the functional test `interfaces_zmq.py::test_mempool_sync` which shows the proposed user flow for the client-side tracking of mempool contents and confirmations. Inspired by bitcoin/bitcoin#19462 (comment) Alternative to bitcoin/bitcoin#19462 due to noted deficiencies in current zmq notification streams. Also fixes a legacy zmq test that didn't actually trigger a reorg because of identical blocks being generated on each side of the split(oops) ACKs for top commit: laanwj: Code review ACK 759d94e Tree-SHA512: 9daf0d7d996190f3a68ff40340a687519323d7a6c51dcb26be457fbc013217ea7b62fbd0700b74b654433d2e370704feb61e5584399290692464fcfcb72ce3b7
2 parents 8219893 + 759d94e commit 9e217f5

22 files changed

+552
-63
lines changed

contrib/zmq/zmq_sub.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
-zmqpubrawtx=tcp://127.0.0.1:28332 \
1212
-zmqpubrawblock=tcp://127.0.0.1:28332 \
1313
-zmqpubhashtx=tcp://127.0.0.1:28332 \
14-
-zmqpubhashblock=tcp://127.0.0.1:28332
14+
-zmqpubhashblock=tcp://127.0.0.1:28332 \
15+
-zmqpubsequence=tcp://127.0.0.1:28332
1516
1617
We use the asyncio library here. `self.handle()` installs itself as a
1718
future at the end of the function. Since it never returns with the event
@@ -47,16 +48,14 @@ def __init__(self):
4748
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
4849
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")
4950
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx")
51+
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "sequence")
5052
self.zmqSubSocket.connect("tcp://127.0.0.1:%i" % port)
5153

5254
async def handle(self) :
53-
msg = await self.zmqSubSocket.recv_multipart()
54-
topic = msg[0]
55-
body = msg[1]
55+
topic, body, seq = await self.zmqSubSocket.recv_multipart()
5656
sequence = "Unknown"
57-
if len(msg[-1]) == 4:
58-
msgSequence = struct.unpack('<I', msg[-1])[-1]
59-
sequence = str(msgSequence)
57+
if len(seq) == 4:
58+
sequence = str(struct.unpack('<I', seq)[-1])
6059
if topic == b"hashblock":
6160
print('- HASH BLOCK ('+sequence+') -')
6261
print(binascii.hexlify(body))
@@ -69,6 +68,12 @@ async def handle(self) :
6968
elif topic == b"rawtx":
7069
print('- RAW TX ('+sequence+') -')
7170
print(binascii.hexlify(body))
71+
elif topic == b"sequence":
72+
hash = binascii.hexlify(body[:32])
73+
label = chr(body[32])
74+
mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
75+
print('- SEQUENCE ('+sequence+') -')
76+
print(hash, label, mempool_sequence)
7277
# schedule ourselves to receive the next message
7378
asyncio.ensure_future(self.handle())
7479

doc/zmq.md

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ Currently, the following notifications are supported:
6363
-zmqpubhashblock=address
6464
-zmqpubrawblock=address
6565
-zmqpubrawtx=address
66+
-zmqpubsequence=address
6667

6768
The socket type is PUB and the address must be a valid ZeroMQ socket
6869
address. The same address can be used in more than one notification.
@@ -74,6 +75,7 @@ The option to set the PUB socket's outbound message high water mark
7475
-zmqpubhashblockhwm=n
7576
-zmqpubrawblockhwm=n
7677
-zmqpubrawtxhwm=n
78+
-zmqpubsequencehwm=address
7779

7880
The high water mark value must be an integer greater than or equal to 0.
7981

@@ -87,7 +89,15 @@ Each PUB notification has a topic and body, where the header
8789
corresponds to the notification type. For instance, for the
8890
notification `-zmqpubhashtx` the topic is `hashtx` (no null
8991
terminator) and the body is the transaction hash (32
90-
bytes).
92+
bytes) for all but `sequence` topic. For `sequence`, the body
93+
is structured as the following based on the type of message:
94+
95+
<32-byte hash>C : Blockhash connected
96+
<32-byte hash>D : Blockhash disconnected
97+
<32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
98+
<32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
99+
100+
Where the 8-byte uints correspond to the mempool sequence number.
91101

92102
These options can also be provided in bitcoin.conf.
93103

@@ -124,13 +134,20 @@ No authentication or authorization is done on connecting clients; it
124134
is assumed that the ZeroMQ port is exposed only to trusted entities,
125135
using other means such as firewalling.
126136

127-
Note that when the block chain tip changes, a reorganisation may occur
128-
and just the tip will be notified. It is up to the subscriber to
129-
retrieve the chain from the last known block to the new tip. Also note
130-
that no notification occurs if the tip was in the active chain - this
131-
is the case after calling invalidateblock RPC.
137+
Note that for `*block` topics, when the block chain tip changes,
138+
a reorganisation may occur and just the tip will be notified.
139+
It is up to the subscriber to retrieve the chain from the last known
140+
block to the new tip. Also note that no notification will occur if the tip
141+
was in the active chain--as would be the case after calling invalidateblock RPC.
142+
In contrast, the `sequence` topic publishes all block connections and
143+
disconnections.
132144

133145
There are several possibilities that ZMQ notification can get lost
134146
during transmission depending on the communication type you are
135147
using. Bitcoind appends an up-counting sequence number to each
136148
notification which allows listeners to detect lost notifications.
149+
150+
The `sequence` topic refers specifically to the mempool sequence
151+
number, which is also published along with all mempool events. This
152+
is a different sequence value than in ZMQ itself in order to allow a total
153+
ordering of mempool events to be constructed.

src/init.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,19 +488,23 @@ void SetupServerArgs(NodeContext& node)
488488
argsman.AddArg("-zmqpubhashtx=<address>", "Enable publish hash transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
489489
argsman.AddArg("-zmqpubrawblock=<address>", "Enable publish raw block in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
490490
argsman.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
491+
argsman.AddArg("-zmqpubsequence=<address>", "Enable publish hash block and tx sequence in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
491492
argsman.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
492493
argsman.AddArg("-zmqpubhashtxhwm=<n>", strprintf("Set publish hash transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
493494
argsman.AddArg("-zmqpubrawblockhwm=<n>", strprintf("Set publish raw block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
494495
argsman.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
496+
argsman.AddArg("-zmqpubsequencehwm=<n>", strprintf("Set publish hash sequence message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
495497
#else
496498
hidden_args.emplace_back("-zmqpubhashblock=<address>");
497499
hidden_args.emplace_back("-zmqpubhashtx=<address>");
498500
hidden_args.emplace_back("-zmqpubrawblock=<address>");
499501
hidden_args.emplace_back("-zmqpubrawtx=<address>");
502+
hidden_args.emplace_back("-zmqpubsequence=<n>");
500503
hidden_args.emplace_back("-zmqpubhashblockhwm=<n>");
501504
hidden_args.emplace_back("-zmqpubhashtxhwm=<n>");
502505
hidden_args.emplace_back("-zmqpubrawblockhwm=<n>");
503506
hidden_args.emplace_back("-zmqpubrawtxhwm=<n>");
507+
hidden_args.emplace_back("-zmqpubsequencehwm=<n>");
504508
#endif
505509

506510
argsman.AddArg("-checkblocks=<n>", strprintf("How many blocks to check at startup (default: %u, 0 = all)", DEFAULT_CHECKBLOCKS), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);

src/interfaces/chain.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,13 @@ class NotificationsProxy : public CValidationInterface
5959
explicit NotificationsProxy(std::shared_ptr<Chain::Notifications> notifications)
6060
: m_notifications(std::move(notifications)) {}
6161
virtual ~NotificationsProxy() = default;
62-
void TransactionAddedToMempool(const CTransactionRef& tx) override
62+
void TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) override
6363
{
64-
m_notifications->transactionAddedToMempool(tx);
64+
m_notifications->transactionAddedToMempool(tx, mempool_sequence);
6565
}
66-
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override
66+
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
6767
{
68-
m_notifications->transactionRemovedFromMempool(tx, reason);
68+
m_notifications->transactionRemovedFromMempool(tx, reason, mempool_sequence);
6969
}
7070
void BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* index) override
7171
{
@@ -405,7 +405,7 @@ class ChainImpl : public Chain
405405
if (!m_node.mempool) return;
406406
LOCK2(::cs_main, m_node.mempool->cs);
407407
for (const CTxMemPoolEntry& entry : m_node.mempool->mapTx) {
408-
notifications.transactionAddedToMempool(entry.GetSharedTx());
408+
notifications.transactionAddedToMempool(entry.GetSharedTx(), 0 /* mempool_sequence */);
409409
}
410410
}
411411
NodeContext& m_node;

src/interfaces/chain.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,8 @@ class Chain
242242
{
243243
public:
244244
virtual ~Notifications() {}
245-
virtual void transactionAddedToMempool(const CTransactionRef& tx) {}
246-
virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {}
245+
virtual void transactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {}
246+
virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {}
247247
virtual void blockConnected(const CBlock& block, int height) {}
248248
virtual void blockDisconnected(const CBlock& block, int height) {}
249249
virtual void updatedBlockTip() {}

src/rpc/blockchain.cpp

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -500,9 +500,12 @@ static void entryToJSON(const CTxMemPool& pool, UniValue& info, const CTxMemPool
500500
info.pushKV("unbroadcast", pool.IsUnbroadcastTx(tx.GetHash()));
501501
}
502502

503-
UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose)
503+
UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose, bool include_mempool_sequence)
504504
{
505505
if (verbose) {
506+
if (include_mempool_sequence) {
507+
throw JSONRPCError(RPC_INVALID_PARAMETER, "Verbose results cannot contain mempool sequence values.");
508+
}
506509
LOCK(pool.cs);
507510
UniValue o(UniValue::VOBJ);
508511
for (const CTxMemPoolEntry& e : pool.mapTx) {
@@ -516,14 +519,25 @@ UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose)
516519
}
517520
return o;
518521
} else {
522+
uint64_t mempool_sequence;
519523
std::vector<uint256> vtxid;
520-
pool.queryHashes(vtxid);
521-
524+
{
525+
LOCK(pool.cs);
526+
pool.queryHashes(vtxid);
527+
mempool_sequence = pool.GetSequence();
528+
}
522529
UniValue a(UniValue::VARR);
523530
for (const uint256& hash : vtxid)
524531
a.push_back(hash.ToString());
525532

526-
return a;
533+
if (!include_mempool_sequence) {
534+
return a;
535+
} else {
536+
UniValue o(UniValue::VOBJ);
537+
o.pushKV("txids", a);
538+
o.pushKV("mempool_sequence", mempool_sequence);
539+
return o;
540+
}
527541
}
528542
}
529543

@@ -534,6 +548,7 @@ static RPCHelpMan getrawmempool()
534548
"\nHint: use getmempoolentry to fetch a specific transaction from the mempool.\n",
535549
{
536550
{"verbose", RPCArg::Type::BOOL, /* default */ "false", "True for a json object, false for array of transaction ids"},
551+
{"mempool_sequence", RPCArg::Type::BOOL, /* default */ "false", "If verbose=false, returns a json object with transaction list and mempool sequence number attached."},
537552
},
538553
{
539554
RPCResult{"for verbose = false",
@@ -546,6 +561,15 @@ static RPCHelpMan getrawmempool()
546561
{
547562
{RPCResult::Type::OBJ, "transactionid", "", MempoolEntryDescription()},
548563
}},
564+
RPCResult{"for verbose = false and mempool_sequence = true",
565+
RPCResult::Type::OBJ, "", "",
566+
{
567+
{RPCResult::Type::ARR, "txids", "",
568+
{
569+
{RPCResult::Type::STR_HEX, "", "The transaction id"},
570+
}},
571+
{RPCResult::Type::NUM, "mempool_sequence", "The mempool sequence value."},
572+
}},
549573
},
550574
RPCExamples{
551575
HelpExampleCli("getrawmempool", "true")
@@ -557,7 +581,12 @@ static RPCHelpMan getrawmempool()
557581
if (!request.params[0].isNull())
558582
fVerbose = request.params[0].get_bool();
559583

560-
return MempoolToJSON(EnsureMemPool(request.context), fVerbose);
584+
bool include_mempool_sequence = false;
585+
if (!request.params[1].isNull()) {
586+
include_mempool_sequence = request.params[1].get_bool();
587+
}
588+
589+
return MempoolToJSON(EnsureMemPool(request.context), fVerbose, include_mempool_sequence);
561590
},
562591
};
563592
}
@@ -2451,7 +2480,7 @@ static const CRPCCommand commands[] =
24512480
{ "blockchain", "getmempooldescendants", &getmempooldescendants, {"txid","verbose"} },
24522481
{ "blockchain", "getmempoolentry", &getmempoolentry, {"txid"} },
24532482
{ "blockchain", "getmempoolinfo", &getmempoolinfo, {} },
2454-
{ "blockchain", "getrawmempool", &getrawmempool, {"verbose"} },
2483+
{ "blockchain", "getrawmempool", &getrawmempool, {"verbose", "mempool_sequence"} },
24552484
{ "blockchain", "gettxout", &gettxout, {"txid","n","include_mempool"} },
24562485
{ "blockchain", "gettxoutsetinfo", &gettxoutsetinfo, {"hash_type"} },
24572486
{ "blockchain", "pruneblockchain", &pruneblockchain, {"height"} },

src/rpc/blockchain.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ UniValue blockToJSON(const CBlock& block, const CBlockIndex* tip, const CBlockIn
4343
UniValue MempoolInfoToJSON(const CTxMemPool& pool);
4444

4545
/** Mempool to JSON */
46-
UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose = false);
46+
UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose = false, bool include_mempool_sequence = false);
4747

4848
/** Block header to JSON */
4949
UniValue blockheaderToJSON(const CBlockIndex* tip, const CBlockIndex* blockindex) LOCKS_EXCLUDED(cs_main);

src/rpc/client.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ static const CRPCConvertParam vRPCConvertParams[] =
142142
{ "pruneblockchain", 0, "height" },
143143
{ "keypoolrefill", 0, "newsize" },
144144
{ "getrawmempool", 0, "verbose" },
145+
{ "getrawmempool", 1, "mempool_sequence" },
145146
{ "estimatesmartfee", 0, "conf_target" },
146147
{ "estimaterawfee", 0, "conf_target" },
147148
{ "estimaterawfee", 1, "threshold" },

src/txmempool.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,12 +409,16 @@ void CTxMemPool::addUnchecked(const CTxMemPoolEntry &entry, setEntries &setAnces
409409

410410
void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason)
411411
{
412+
// We increment mempool sequence value no matter removal reason
413+
// even if not directly reported below.
414+
uint64_t mempool_sequence = GetAndIncrementSequence();
415+
412416
if (reason != MemPoolRemovalReason::BLOCK) {
413417
// Notify clients that a transaction has been removed from the mempool
414418
// for any reason except being included in a block. Clients interested
415419
// in transactions included in blocks can subscribe to the BlockConnected
416420
// notification.
417-
GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason);
421+
GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence);
418422
}
419423

420424
const uint256 hash = it->GetTx().GetHash();

src/txmempool.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,11 @@ class CTxMemPool
501501
mutable uint64_t m_epoch;
502502
mutable bool m_has_epoch_guard;
503503

504+
// In-memory counter for external mempool tracking purposes.
505+
// This number is incremented once every time a transaction
506+
// is added or removed from the mempool for any reason.
507+
mutable uint64_t m_sequence_number{1};
508+
504509
void trackPackageRemoved(const CFeeRate& rate) EXCLUSIVE_LOCKS_REQUIRED(cs);
505510

506511
bool m_is_loaded GUARDED_BY(cs){false};
@@ -776,6 +781,15 @@ class CTxMemPool
776781
return m_unbroadcast_txids.count(txid) != 0;
777782
}
778783

784+
/** Guards this internal counter for external reporting */
785+
uint64_t GetAndIncrementSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) {
786+
return m_sequence_number++;
787+
}
788+
789+
uint64_t GetSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) {
790+
return m_sequence_number;
791+
}
792+
779793
private:
780794
/** UpdateForDescendants is used by UpdateTransactionsFromBlock to update
781795
* the descendants for a single transaction that has been added to the

0 commit comments

Comments
 (0)