Skip to content

Commit 68c3c7e

Browse files
committed
Add functional tests for zmq sequence topic and mempool sequence logic
1 parent e76fc2b commit 68c3c7e

File tree

1 file changed

+308
-6
lines changed

1 file changed

+308
-6
lines changed

test/functional/interface_zmq.py

Lines changed: 308 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,23 @@
66
import struct
77

88
from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE, ADDRESS_BCRT1_P2WSH_OP_TRUE
9+
from test_framework.blocktools import create_block, create_coinbase, add_witness_commitment
910
from test_framework.test_framework import BitcoinTestFramework
10-
from test_framework.messages import CTransaction, hash256
11-
from test_framework.util import assert_equal, connect_nodes
11+
from test_framework.messages import CTransaction, hash256, FromHex
12+
from test_framework.util import (
13+
assert_equal,
14+
connect_nodes,
15+
assert_raises_rpc_error,
16+
)
1217
from io import BytesIO
1318
from time import sleep
1419

20+
# Test may be skipped and not have zmq installed
21+
try:
22+
import zmq
23+
except ImportError:
24+
pass
25+
1526
def hash256_reversed(byte_str):
1627
return hash256(byte_str)[::-1]
1728

@@ -21,7 +32,6 @@ def __init__(self, socket, topic):
2132
self.socket = socket
2233
self.topic = topic
2334

24-
import zmq
2535
self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
2636

2737
def receive(self):
@@ -33,6 +43,22 @@ def receive(self):
3343
self.sequence += 1
3444
return body
3545

46+
def receive_sequence(self):
47+
topic, body, seq = self.socket.recv_multipart()
48+
# Topic should match the subscriber topic.
49+
assert_equal(topic, self.topic)
50+
# Sequence should be incremental.
51+
assert_equal(struct.unpack('<I', seq)[-1], self.sequence)
52+
self.sequence += 1
53+
hash = body[:32].hex()
54+
label = chr(body[32])
55+
mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
56+
if mempool_sequence is not None:
57+
assert label == "A" or label == "R"
58+
else:
59+
assert label == "D" or label == "C"
60+
return (hash, label, mempool_sequence)
61+
3662

3763
class ZMQTest (BitcoinTestFramework):
3864
def set_test_params(self):
@@ -43,18 +69,18 @@ def skip_test_if_missing_module(self):
4369
self.skip_if_no_bitcoind_zmq()
4470

4571
def run_test(self):
46-
import zmq
4772
self.ctx = zmq.Context()
4873
try:
4974
self.test_basic()
75+
self.test_sequence()
76+
self.test_mempool_sync()
5077
self.test_reorg()
5178
finally:
5279
# Destroy the ZMQ context.
5380
self.log.debug("Destroying ZMQ context")
5481
self.ctx.destroy(linger=None)
5582

5683
def test_basic(self):
57-
import zmq
5884

5985
# Invalid zmq arguments don't take down the node, see #17185.
6086
self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])
@@ -146,7 +172,6 @@ def test_reorg(self):
146172
self.log.info("Skipping reorg test because wallet is disabled")
147173
return
148174

149-
import zmq
150175
address = 'tcp://127.0.0.1:28333'
151176

152177
services = [b"hashblock", b"hashtx"]
@@ -204,5 +229,282 @@ def test_reorg(self):
204229
# And the current tip
205230
assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[0])["tx"][0])
206231

232+
def test_sequence(self):
233+
"""
234+
Sequence zmq notifications give every blockhash and txhash in order
235+
of processing, regardless of IBD, re-orgs, etc.
236+
Format of messages:
237+
<32-byte hash>C : Blockhash connected
238+
<32-byte hash>D : Blockhash disconnected
239+
<32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
240+
<32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
241+
"""
242+
self.log.info("Testing 'sequence' publisher")
243+
address = 'tcp://127.0.0.1:28333'
244+
socket = self.ctx.socket(zmq.SUB)
245+
socket.set(zmq.RCVTIMEO, 60000)
246+
seq = ZMQSubscriber(socket, b'sequence')
247+
248+
self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)])
249+
socket.connect(address)
250+
# Relax so that the subscriber is ready before publishing zmq messages
251+
sleep(0.2)
252+
253+
# Mempool sequence number starts at 1
254+
seq_num = 1
255+
256+
# Generate 1 block in nodes[0] and receive all notifications
257+
dc_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0]
258+
259+
# Note: We are not notified of any block transactions, coinbase or mined
260+
assert_equal((self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence())
261+
262+
# Generate 2 blocks in nodes[1] to a different address to ensure a chain split
263+
self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_P2WSH_OP_TRUE)
264+
265+
# nodes[0] will reorg chain after connecting back nodes[1]
266+
connect_nodes(self.nodes[0], 1)
267+
268+
# Then we receive all block (dis)connect notifications for the 2 block reorg
269+
assert_equal((dc_block, "D", None), seq.receive_sequence())
270+
block_count = self.nodes[1].getblockcount()
271+
assert_equal((self.nodes[1].getblockhash(block_count-1), "C", None), seq.receive_sequence())
272+
assert_equal((self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence())
273+
274+
# Rest of test requires wallet functionality
275+
if self.is_wallet_compiled():
276+
self.log.info("Wait for tx from second node")
277+
payment_txid = self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=5.0, replaceable=True)
278+
self.sync_all()
279+
self.log.info("Testing sequence notifications with mempool sequence values")
280+
281+
# Should receive the broadcasted txid.
282+
assert_equal((payment_txid, "A", seq_num), seq.receive_sequence())
283+
seq_num += 1
284+
285+
self.log.info("Testing RBF notification")
286+
# Replace it to test eviction/addition notification
287+
rbf_info = self.nodes[1].bumpfee(payment_txid)
288+
self.sync_all()
289+
assert_equal((payment_txid, "R", seq_num), seq.receive_sequence())
290+
seq_num += 1
291+
assert_equal((rbf_info["txid"], "A", seq_num), seq.receive_sequence())
292+
seq_num += 1
293+
294+
# Doesn't get published when mined, make a block and tx to "flush" the possibility
295+
# though the mempool sequence number does go up by the number of transactions
296+
# removed from the mempool by the block mining it.
297+
mempool_size = len(self.nodes[0].getrawmempool())
298+
c_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0]
299+
self.sync_all()
300+
# Make sure the number of mined transactions matches the number of txs out of mempool
301+
mempool_size_delta = mempool_size - len(self.nodes[0].getrawmempool())
302+
assert_equal(len(self.nodes[0].getblock(c_block)["tx"])-1, mempool_size_delta)
303+
seq_num += mempool_size_delta
304+
payment_txid_2 = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
305+
self.sync_all()
306+
assert_equal((c_block, "C", None), seq.receive_sequence())
307+
assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence())
308+
seq_num += 1
309+
310+
# Spot check getrawmempool results that they only show up when asked for
311+
assert type(self.nodes[0].getrawmempool()) is list
312+
assert type(self.nodes[0].getrawmempool(mempool_sequence=False)) is list
313+
assert "mempool_sequence" not in self.nodes[0].getrawmempool(verbose=True)
314+
assert_raises_rpc_error(-8, "Verbose results cannot contain mempool sequence values.", self.nodes[0].getrawmempool, True, True)
315+
assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], seq_num)
316+
317+
self.log.info("Testing reorg notifications")
318+
# Manually invalidate the last block to test mempool re-entry
319+
# N.B. This part could be made more lenient in exact ordering
320+
# since it greatly depends on inner-workings of blocks/mempool
321+
# during "deep" re-orgs. Probably should "re-construct"
322+
# blockchain/mempool state from notifications instead.
323+
block_count = self.nodes[0].getblockcount()
324+
best_hash = self.nodes[0].getbestblockhash()
325+
self.nodes[0].invalidateblock(best_hash)
326+
sleep(2) # Bit of room to make sure transaction things happened
327+
328+
# Make sure getrawmempool mempool_sequence results aren't "queued" but immediately reflective
329+
# of the time they were gathered.
330+
assert self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] > seq_num
331+
332+
assert_equal((best_hash, "D", None), seq.receive_sequence())
333+
assert_equal((rbf_info["txid"], "A", seq_num), seq.receive_sequence())
334+
seq_num += 1
335+
336+
# Other things may happen but aren't wallet-deterministic so we don't test for them currently
337+
self.nodes[0].reconsiderblock(best_hash)
338+
self.nodes[1].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
339+
self.sync_all()
340+
341+
self.log.info("Evict mempool transaction by block conflict")
342+
orig_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0, replaceable=True)
343+
344+
# More to be simply mined
345+
more_tx = []
346+
for _ in range(5):
347+
more_tx.append(self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 0.1))
348+
349+
raw_tx = self.nodes[0].getrawtransaction(orig_txid)
350+
bump_info = self.nodes[0].bumpfee(orig_txid)
351+
# Mine the pre-bump tx
352+
block = create_block(int(self.nodes[0].getbestblockhash(), 16), create_coinbase(self.nodes[0].getblockcount()+1))
353+
tx = FromHex(CTransaction(), raw_tx)
354+
block.vtx.append(tx)
355+
for txid in more_tx:
356+
tx = FromHex(CTransaction(), self.nodes[0].getrawtransaction(txid))
357+
block.vtx.append(tx)
358+
add_witness_commitment(block)
359+
block.solve()
360+
assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None)
361+
tip = self.nodes[0].getbestblockhash()
362+
assert_equal(int(tip, 16), block.sha256)
363+
orig_txid_2 = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0, replaceable=True)
364+
365+
# Flush old notifications until evicted tx original entry
366+
(hash_str, label, mempool_seq) = seq.receive_sequence()
367+
while hash_str != orig_txid:
368+
(hash_str, label, mempool_seq) = seq.receive_sequence()
369+
mempool_seq += 1
370+
371+
# Added original tx
372+
assert_equal(label, "A")
373+
# More transactions to be simply mined
374+
for i in range(len(more_tx)):
375+
assert_equal((more_tx[i], "A", mempool_seq), seq.receive_sequence())
376+
mempool_seq += 1
377+
# Bumped by rbf
378+
assert_equal((orig_txid, "R", mempool_seq), seq.receive_sequence())
379+
mempool_seq += 1
380+
assert_equal((bump_info["txid"], "A", mempool_seq), seq.receive_sequence())
381+
mempool_seq += 1
382+
# Conflict announced first, then block
383+
assert_equal((bump_info["txid"], "R", mempool_seq), seq.receive_sequence())
384+
mempool_seq += 1
385+
assert_equal((tip, "C", None), seq.receive_sequence())
386+
mempool_seq += len(more_tx)
387+
# Last tx
388+
assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence())
389+
mempool_seq += 1
390+
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
391+
self.sync_all() # want to make sure we didn't break "consensus" for other tests
392+
393+
def test_mempool_sync(self):
394+
"""
395+
Use sequence notification plus getrawmempool sequence results to "sync mempool"
396+
"""
397+
if not self.is_wallet_compiled():
398+
self.log.info("Skipping mempool sync test")
399+
return
400+
401+
self.log.info("Testing 'mempool sync' usage of sequence notifier")
402+
address = 'tcp://127.0.0.1:28333'
403+
socket = self.ctx.socket(zmq.SUB)
404+
socket.set(zmq.RCVTIMEO, 60000)
405+
seq = ZMQSubscriber(socket, b'sequence')
406+
407+
self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)])
408+
connect_nodes(self.nodes[0], 1)
409+
socket.connect(address)
410+
# Relax so that the subscriber is ready before publishing zmq messages
411+
sleep(0.2)
412+
413+
# In-memory counter, should always start at 1
414+
next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
415+
assert_equal(next_mempool_seq, 1)
416+
417+
# Some transactions have been happening but we aren't consuming zmq notifications yet
418+
# or we lost a ZMQ message somehow and want to start over
419+
txids = []
420+
num_txs = 5
421+
for _ in range(num_txs):
422+
txids.append(self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0, replaceable=True))
423+
self.sync_all()
424+
425+
# 1) Consume backlog until we get a mempool sequence number
426+
(hash_str, label, zmq_mem_seq) = seq.receive_sequence()
427+
while zmq_mem_seq is None:
428+
(hash_str, label, zmq_mem_seq) = seq.receive_sequence()
429+
430+
assert label == "A" or label == "R"
431+
assert hash_str is not None
432+
433+
# 2) We need to "seed" our view of the mempool
434+
mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
435+
mempool_view = set(mempool_snapshot["txids"])
436+
get_raw_seq = mempool_snapshot["mempool_sequence"]
437+
assert_equal(get_raw_seq, 6)
438+
# Snapshot may be too old compared to zmq message we read off latest
439+
while zmq_mem_seq >= get_raw_seq:
440+
sleep(2)
441+
mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
442+
mempool_view = set(mempool_snapshot["txids"])
443+
get_raw_seq = mempool_snapshot["mempool_sequence"]
444+
445+
# Things continue to happen in the "interim" while waiting for snapshot results
446+
# We have node 0 do all these to avoid p2p races with RBF announcements
447+
for _ in range(num_txs):
448+
txids.append(self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1, replaceable=True))
449+
self.nodes[0].bumpfee(txids[-1])
450+
self.sync_all()
451+
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
452+
final_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1, replaceable=True)
453+
454+
# 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot
455+
while True:
456+
if zmq_mem_seq == get_raw_seq - 1:
457+
break
458+
(hash_str, label, mempool_sequence) = seq.receive_sequence()
459+
if mempool_sequence is not None:
460+
zmq_mem_seq = mempool_sequence
461+
if zmq_mem_seq > get_raw_seq:
462+
raise Exception("We somehow jumped mempool sequence numbers! zmq_mem_seq: {} > get_raw_seq: {}".format(zmq_mem_seq, get_raw_seq))
463+
464+
# 4) Moving forward, we apply the delta to our local view
465+
# remaining txs(5) + 1 rbf(A+R) + 1 block connect + 1 final tx
466+
expected_sequence = get_raw_seq
467+
r_gap = 0
468+
for _ in range(num_txs + 2 + 1 + 1):
469+
(hash_str, label, mempool_sequence) = seq.receive_sequence()
470+
if mempool_sequence is not None:
471+
if mempool_sequence != expected_sequence:
472+
# Detected "R" gap, means this a conflict eviction, and mempool tx are being evicted before its
473+
# position in the incoming block message "C"
474+
if label == "R":
475+
assert mempool_sequence > expected_sequence
476+
r_gap += mempool_sequence - expected_sequence
477+
else:
478+
raise Exception("WARNING: txhash has unexpected mempool sequence value: {} vs expected {}".format(mempool_sequence, expected_sequence))
479+
if label == "A":
480+
assert hash_str not in mempool_view
481+
mempool_view.add(hash_str)
482+
expected_sequence = mempool_sequence + 1
483+
elif label == "R":
484+
assert hash_str in mempool_view
485+
mempool_view.remove(hash_str)
486+
expected_sequence = mempool_sequence + 1
487+
elif label == "C":
488+
# (Attempt to) remove all txids from known block connects
489+
block_txids = self.nodes[0].getblock(hash_str)["tx"][1:]
490+
for txid in block_txids:
491+
if txid in mempool_view:
492+
expected_sequence += 1
493+
mempool_view.remove(txid)
494+
expected_sequence -= r_gap
495+
r_gap = 0
496+
elif label == "D":
497+
# Not useful for mempool tracking per se
498+
continue
499+
else:
500+
raise Exception("Unexpected ZMQ sequence label!")
501+
502+
assert_equal(self.nodes[0].getrawmempool(), [final_txid])
503+
assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], expected_sequence)
504+
505+
# 5) If you miss a zmq/mempool sequence number, go back to step (2)
506+
507+
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
508+
207509
if __name__ == '__main__':
208510
ZMQTest().main()

0 commit comments

Comments
 (0)