Skip to content

Commit af4100c

Browse files
author
MarcoFalke
committed
Merge #16404: qa: Test ZMQ notification after chain reorg
abdfc5e qa: Test ZMQ notification after chain reorg (João Barbosa) aa2622a qa: Refactor ZMQ test (João Barbosa) 6bc1ff9 doc: Add note regarding ZMQ block notification (João Barbosa) Pull request description: Top commit has no ACKs. Tree-SHA512: b93237adc8c84b3aa72ccc28097090eabcb006cf408083218bebf6fec703bd0de2ded80b6879e77096872e14ba9402a6d3f923b146a54d4c4e41dcb862c3e765
2 parents db67101 + abdfc5e commit af4100c

File tree

2 files changed

+59
-39
lines changed

2 files changed

+59
-39
lines changed

doc/zmq.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,9 @@ using other means such as firewalling.
111111

112112
Note that when the block chain tip changes, a reorganisation may occur
113113
and just the tip will be notified. It is up to the subscriber to
114-
retrieve the chain from the last known block to the new tip.
114+
retrieve the chain from the last known block to the new tip. Also note
115+
that no notification occurs if the tip was in the active chain - this
116+
is the case after calling invalidateblock RPC.
115117

116118
There are several possibilities that ZMQ notification can get lost
117119
during transmission depending on the communication type you are

test/functional/interface_zmq.py

Lines changed: 56 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,9 @@
88
from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE
99
from test_framework.test_framework import BitcoinTestFramework
1010
from test_framework.messages import CTransaction, hash256
11-
from test_framework.util import assert_equal
11+
from test_framework.util import assert_equal, connect_nodes
1212
from io import BytesIO
1313

14-
ADDRESS = "tcp://127.0.0.1:28332"
15-
1614
def hash256_reversed(byte_str):
1715
return hash256(byte_str)[::-1]
1816

@@ -43,66 +41,62 @@ def skip_test_if_missing_module(self):
4341
self.skip_if_no_py3_zmq()
4442
self.skip_if_no_bitcoind_zmq()
4543

46-
def setup_nodes(self):
44+
def run_test(self):
4745
import zmq
46+
self.ctx = zmq.Context()
47+
try:
48+
self.test_basic()
49+
self.test_reorg()
50+
finally:
51+
# Destroy the ZMQ context.
52+
self.log.debug("Destroying ZMQ context")
53+
self.ctx.destroy(linger=None)
4854

49-
# Initialize ZMQ context and socket.
55+
def test_basic(self):
5056
# All messages are received in the same socket which means
5157
# that this test fails if the publishing order changes.
5258
# Note that the publishing order is not defined in the documentation and
5359
# is subject to change.
54-
self.zmq_context = zmq.Context()
55-
socket = self.zmq_context.socket(zmq.SUB)
60+
import zmq
61+
address = 'tcp://127.0.0.1:28332'
62+
socket = self.ctx.socket(zmq.SUB)
5663
socket.set(zmq.RCVTIMEO, 60000)
57-
socket.connect(ADDRESS)
64+
socket.connect(address)
5865

5966
# Subscribe to all available topics.
60-
self.hashblock = ZMQSubscriber(socket, b"hashblock")
61-
self.hashtx = ZMQSubscriber(socket, b"hashtx")
62-
self.rawblock = ZMQSubscriber(socket, b"rawblock")
63-
self.rawtx = ZMQSubscriber(socket, b"rawtx")
64-
65-
self.extra_args = [
66-
["-zmqpub%s=%s" % (sub.topic.decode(), ADDRESS) for sub in [self.hashblock, self.hashtx, self.rawblock, self.rawtx]],
67-
[],
68-
]
69-
self.add_nodes(self.num_nodes, self.extra_args)
70-
self.start_nodes()
71-
self.import_deterministic_coinbase_privkeys()
67+
hashblock = ZMQSubscriber(socket, b"hashblock")
68+
hashtx = ZMQSubscriber(socket, b"hashtx")
69+
rawblock = ZMQSubscriber(socket, b"rawblock")
70+
rawtx = ZMQSubscriber(socket, b"rawtx")
7271

73-
def run_test(self):
74-
try:
75-
self._zmq_test()
76-
finally:
77-
# Destroy the ZMQ context.
78-
self.log.debug("Destroying ZMQ context")
79-
self.zmq_context.destroy(linger=None)
72+
self.restart_node(0, ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [hashblock, hashtx, rawblock, rawtx]])
73+
connect_nodes(self.nodes[0], 1)
8074

81-
def _zmq_test(self):
8275
num_blocks = 5
8376
self.log.info("Generate %(n)d blocks (and %(n)d coinbase txes)" % {"n": num_blocks})
8477
genhashes = self.nodes[0].generatetoaddress(num_blocks, ADDRESS_BCRT1_UNSPENDABLE)
78+
8579
self.sync_all()
8680

8781
for x in range(num_blocks):
8882
# Should receive the coinbase txid.
89-
txid = self.hashtx.receive()
83+
txid = hashtx.receive()
9084

9185
# Should receive the coinbase raw transaction.
92-
hex = self.rawtx.receive()
86+
hex = rawtx.receive()
9387
tx = CTransaction()
9488
tx.deserialize(BytesIO(hex))
9589
tx.calc_sha256()
9690
assert_equal(tx.hash, txid.hex())
9791

9892
# Should receive the generated block hash.
99-
hash = self.hashblock.receive().hex()
93+
hash = hashblock.receive().hex()
10094
assert_equal(genhashes[x], hash)
10195
# The block should only have the coinbase txid.
10296
assert_equal([txid.hex()], self.nodes[1].getblock(hash)["tx"])
10397

10498
# Should receive the generated raw block.
105-
block = self.rawblock.receive()
99+
block = rawblock.receive()
106100
assert_equal(genhashes[x], hash256_reversed(block[:80]).hex())
107101

108102
if self.is_wallet_compiled():
@@ -111,23 +105,47 @@ def _zmq_test(self):
111105
self.sync_all()
112106

113107
# Should receive the broadcasted txid.
114-
txid = self.hashtx.receive()
108+
txid = hashtx.receive()
115109
assert_equal(payment_txid, txid.hex())
116110

117111
# Should receive the broadcasted raw transaction.
118-
hex = self.rawtx.receive()
112+
hex = rawtx.receive()
119113
assert_equal(payment_txid, hash256_reversed(hex).hex())
120114

121115

122116
self.log.info("Test the getzmqnotifications RPC")
123117
assert_equal(self.nodes[0].getzmqnotifications(), [
124-
{"type": "pubhashblock", "address": ADDRESS, "hwm": 1000},
125-
{"type": "pubhashtx", "address": ADDRESS, "hwm": 1000},
126-
{"type": "pubrawblock", "address": ADDRESS, "hwm": 1000},
127-
{"type": "pubrawtx", "address": ADDRESS, "hwm": 1000},
118+
{"type": "pubhashblock", "address": address, "hwm": 1000},
119+
{"type": "pubhashtx", "address": address, "hwm": 1000},
120+
{"type": "pubrawblock", "address": address, "hwm": 1000},
121+
{"type": "pubrawtx", "address": address, "hwm": 1000},
128122
])
129123

130124
assert_equal(self.nodes[1].getzmqnotifications(), [])
131125

126+
def test_reorg(self):
127+
import zmq
128+
address = 'tcp://127.0.0.1:28333'
129+
socket = self.ctx.socket(zmq.SUB)
130+
socket.set(zmq.RCVTIMEO, 60000)
131+
socket.connect(address)
132+
hashblock = ZMQSubscriber(socket, b'hashblock')
133+
134+
# Should only notify the tip if a reorg occurs
135+
self.restart_node(0, ['-zmqpub%s=%s' % (hashblock.topic.decode(), address)])
136+
137+
# Generate 1 block in nodes[0] and receive all notifications
138+
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
139+
assert_equal(self.nodes[0].getbestblockhash(), hashblock.receive().hex())
140+
141+
# Generate 2 blocks in nodes[1]
142+
self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_UNSPENDABLE)
143+
144+
# nodes[0] will reorg chain after connecting back nodes[1]
145+
connect_nodes(self.nodes[0], 1)
146+
147+
# Should receive nodes[1] tip
148+
assert_equal(self.nodes[1].getbestblockhash(), hashblock.receive().hex())
149+
132150
if __name__ == '__main__':
133151
ZMQTest().main()

0 commit comments

Comments
 (0)