Skip to content

Commit e796fdd

Browse files
committed
Merge #19507: Expand functional zmq transaction tests
7356292 Have zmq reorg test cover mempool txns (Gregory Sanders) a0f4f9c Add zmq test for transaction pub during reorg (Gregory Sanders) 2399a06 Add test case for mempool->block zmq notification (Gregory Sanders) e70512a Make ordering of zmq consumption irrelevant to functional test (Gregory Sanders) Pull request description: Tests written to better define what messages are sent when. Also did a bit of refactoring to make sure the exact notification channel ordering doesn't matter. Confusions below aside, I believe having these more descriptive tests helps describe what behavior we expect from ZMQ notificaitons. Remaining confusion: 1) Notification patterns seem to vary wildly with the inclusion of mempool transactions being reorg'ed. See difference between "Add zmq test for transaction pub during reorg" and "Have zmq reorg test cover mempool txns" commits for specifics. 2) Why does a reorg'ed transaction get announced 3 times? From what I understand it can get announced once for disconnected block, once for mempool entry. What's the third? It occurs a 4th time when included in a block(not added in test) ACKs for top commit: laanwj: code review ACK 7356292 promag: Code review ACK 7356292. Tree-SHA512: 573662429523fd6a1af23dd907117320bc68cb51a93fba9483c9a2160bdce51fb590fcd97bcd2b2751d543d5c1148efa4e22e1c3901144f882b990ed2b450038
2 parents 89a8299 + 7356292 commit e796fdd

File tree

1 file changed

+70
-22
lines changed

1 file changed

+70
-22
lines changed

test/functional/interface_zmq.py

Lines changed: 70 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -54,28 +54,31 @@ def run_test(self):
5454
self.ctx.destroy(linger=None)
5555

5656
def test_basic(self):
57-
# All messages are received in the same socket which means
58-
# that this test fails if the publishing order changes.
59-
# Note that the publishing order is not defined in the documentation and
60-
# is subject to change.
6157
import zmq
6258

6359
# Invalid zmq arguments don't take down the node, see #17185.
6460
self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])
6561

6662
address = 'tcp://127.0.0.1:28332'
67-
socket = self.ctx.socket(zmq.SUB)
68-
socket.set(zmq.RCVTIMEO, 60000)
63+
sockets = []
64+
subs = []
65+
services = [b"hashblock", b"hashtx", b"rawblock", b"rawtx"]
66+
for service in services:
67+
sockets.append(self.ctx.socket(zmq.SUB))
68+
sockets[-1].set(zmq.RCVTIMEO, 60000)
69+
subs.append(ZMQSubscriber(sockets[-1], service))
6970

7071
# Subscribe to all available topics.
71-
hashblock = ZMQSubscriber(socket, b"hashblock")
72-
hashtx = ZMQSubscriber(socket, b"hashtx")
73-
rawblock = ZMQSubscriber(socket, b"rawblock")
74-
rawtx = ZMQSubscriber(socket, b"rawtx")
72+
hashblock = subs[0]
73+
hashtx = subs[1]
74+
rawblock = subs[2]
75+
rawtx = subs[3]
7576

7677
self.restart_node(0, ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [hashblock, hashtx, rawblock, rawtx]])
7778
connect_nodes(self.nodes[0], 1)
78-
socket.connect(address)
79+
for socket in sockets:
80+
socket.connect(address)
81+
7982
# Relax so that the subscriber is ready before publishing zmq messages
8083
sleep(0.2)
8184

@@ -96,15 +99,16 @@ def test_basic(self):
9699
tx.calc_sha256()
97100
assert_equal(tx.hash, txid.hex())
98101

102+
# Should receive the generated raw block.
103+
block = rawblock.receive()
104+
assert_equal(genhashes[x], hash256_reversed(block[:80]).hex())
105+
99106
# Should receive the generated block hash.
100107
hash = hashblock.receive().hex()
101108
assert_equal(genhashes[x], hash)
102109
# The block should only have the coinbase txid.
103110
assert_equal([txid.hex()], self.nodes[1].getblock(hash)["tx"])
104111

105-
# Should receive the generated raw block.
106-
block = rawblock.receive()
107-
assert_equal(genhashes[x], hash256_reversed(block[:80]).hex())
108112

109113
if self.is_wallet_compiled():
110114
self.log.info("Wait for tx from second node")
@@ -119,6 +123,13 @@ def test_basic(self):
119123
hex = rawtx.receive()
120124
assert_equal(payment_txid, hash256_reversed(hex).hex())
121125

126+
# Mining the block with this tx should result in second notification
127+
# after coinbase tx notification
128+
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
129+
hashtx.receive()
130+
txid = hashtx.receive()
131+
assert_equal(payment_txid, txid.hex())
132+
122133

123134
self.log.info("Test the getzmqnotifications RPC")
124135
assert_equal(self.nodes[0].getzmqnotifications(), [
@@ -131,30 +142,67 @@ def test_basic(self):
131142
assert_equal(self.nodes[1].getzmqnotifications(), [])
132143

133144
def test_reorg(self):
145+
if not self.is_wallet_compiled():
146+
self.log.info("Skipping reorg test because wallet is disabled")
147+
return
148+
134149
import zmq
135150
address = 'tcp://127.0.0.1:28333'
136-
socket = self.ctx.socket(zmq.SUB)
137-
socket.set(zmq.RCVTIMEO, 60000)
138-
hashblock = ZMQSubscriber(socket, b'hashblock')
151+
152+
services = [b"hashblock", b"hashtx"]
153+
sockets = []
154+
subs = []
155+
for service in services:
156+
sockets.append(self.ctx.socket(zmq.SUB))
157+
# 2 second timeout to check end of notifications
158+
sockets[-1].set(zmq.RCVTIMEO, 2000)
159+
subs.append(ZMQSubscriber(sockets[-1], service))
160+
161+
# Subscribe to all available topics.
162+
hashblock = subs[0]
163+
hashtx = subs[1]
139164

140165
# Should only notify the tip if a reorg occurs
141-
self.restart_node(0, ['-zmqpub%s=%s' % (hashblock.topic.decode(), address)])
142-
socket.connect(address)
166+
self.restart_node(0, ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [hashblock, hashtx]])
167+
for socket in sockets:
168+
socket.connect(address)
143169
# Relax so that the subscriber is ready before publishing zmq messages
144170
sleep(0.2)
145171

146-
# Generate 1 block in nodes[0] and receive all notifications
147-
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
172+
# Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
173+
payment_txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
174+
disconnect_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0]
175+
disconnect_cb = self.nodes[0].getblock(disconnect_block)["tx"][0]
148176
assert_equal(self.nodes[0].getbestblockhash(), hashblock.receive().hex())
177+
assert_equal(hashtx.receive().hex(), payment_txid)
178+
assert_equal(hashtx.receive().hex(), disconnect_cb)
149179

150180
# Generate 2 blocks in nodes[1]
151-
self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_UNSPENDABLE)
181+
connect_blocks = self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_UNSPENDABLE)
152182

153183
# nodes[0] will reorg chain after connecting back nodes[1]
154184
connect_nodes(self.nodes[0], 1)
185+
self.sync_blocks() # tx in mempool valid but not advertised
155186

156187
# Should receive nodes[1] tip
157188
assert_equal(self.nodes[1].getbestblockhash(), hashblock.receive().hex())
158189

190+
# During reorg:
191+
# Get old payment transaction notification from disconnect and disconnected cb
192+
assert_equal(hashtx.receive().hex(), payment_txid)
193+
assert_equal(hashtx.receive().hex(), disconnect_cb)
194+
# And the payment transaction again due to mempool entry
195+
assert_equal(hashtx.receive().hex(), payment_txid)
196+
assert_equal(hashtx.receive().hex(), payment_txid)
197+
# And the new connected coinbases
198+
for i in [0, 1]:
199+
assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[i])["tx"][0])
200+
201+
# If we do a simple invalidate we announce the disconnected coinbase
202+
self.nodes[0].invalidateblock(connect_blocks[1])
203+
assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[1])["tx"][0])
204+
# And the current tip
205+
assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[0])["tx"][0])
206+
159207
if __name__ == '__main__':
160208
ZMQTest().main()

0 commit comments

Comments
 (0)