Skip to content

Commit 3c9d9d2

Browse files
author
MarcoFalke
committed
Merge #21008: test: fix zmq test flakiness, improve speed
ef21fb7 zmq test: speedup test by whitelisting peers (immediate tx relay) (Sebastian Falbesoner) 5c65463 zmq test: fix flakiness by using more robust sync method (Sebastian Falbesoner) 8666033 zmq test: accept arbitrary sequence start number in ZMQSubscriber (Sebastian Falbesoner) 6014d6e zmq test: dedup message reception handling in ZMQSubscriber (Sebastian Falbesoner) Pull request description: Fixes #20934 by using the "sync up" method described in bitcoin/bitcoin#20538 (comment). After improving robustness with this approach (commits 1-3), it turned out that there were still some fails, but those were unrelated to zmq: Out of 500 runs, 3 times `sync_mempool()` or `sync_blocks()` timed out, which can happen because the trickle relay time has no upper bound -- hence in rare cases, it takes longer than 60s. This is fixed by enabling immediate tx relay on node1 (commit 4), which as a nice side-effect also gives us a rough 2x speedup for the test. For further details, also see the explanations in the commit messages. There is no guarantee that the test is still not flaky, but it would help if potential reviewers would run the following script locally and report how many runs failed (feel free to do less than 1000 runs, as this takes quite a long if ran with `--valgrind`): ``` #!/bin/sh OUTPUT_FILE=./zmq_results echo ===== repeated zmq test ===== > $OUTPUT_FILE for i in `seq 1000`; do echo ------------------------ echo ----- test run $i ----- echo ------------------------ echo --- $i --- >> $OUTPUT_FILE ./test/functional/interface_zmq.py --valgrind if [ $? -ne 0 ]; then echo "FAILED. /o\\" >> $OUTPUT_FILE else echo "PASSED. \\o/" >> $OUTPUT_FILE fi done echo Failed test runs: grep FAILED $OUTPUT_FILE | wc -l ``` ACKs for top commit: jonatack: Light ACK ef21fb7 with the caveat that I was unable to make the test fail with valgrind both here and on master, so I can't vouch that it actually fixes the CI flakiness. The test does run ~2x faster with this. Tree-SHA512: 7a1e7592fbbd98e69e1e1294486b91253e589c72b3c6bbb7f587028ec07cca59b7d984e4ebf256c4bc3e8a529ec77d31842f3dd874038aea0b684abfea50306a
2 parents 9bbf08b + ef21fb7 commit 3c9d9d2

File tree

1 file changed

+54
-22
lines changed

1 file changed

+54
-22
lines changed

test/functional/interface_zmq.py

Lines changed: 54 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,28 +27,31 @@ def hash256_reversed(byte_str):
2727

2828
class ZMQSubscriber:
2929
def __init__(self, socket, topic):
30-
self.sequence = 0
30+
self.sequence = None # no sequence number received yet
3131
self.socket = socket
3232
self.topic = topic
3333

3434
self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
3535

36-
def receive(self):
36+
# Receive message from publisher and verify that topic and sequence match
37+
def _receive_from_publisher_and_check(self):
3738
topic, body, seq = self.socket.recv_multipart()
3839
# Topic should match the subscriber topic.
3940
assert_equal(topic, self.topic)
4041
# Sequence should be incremental.
41-
assert_equal(struct.unpack('<I', seq)[-1], self.sequence)
42+
received_seq = struct.unpack('<I', seq)[-1]
43+
if self.sequence is None:
44+
self.sequence = received_seq
45+
else:
46+
assert_equal(received_seq, self.sequence)
4247
self.sequence += 1
4348
return body
4449

50+
def receive(self):
51+
return self._receive_from_publisher_and_check()
52+
4553
def receive_sequence(self):
46-
topic, body, seq = self.socket.recv_multipart()
47-
# Topic should match the subscriber topic.
48-
assert_equal(topic, self.topic)
49-
# Sequence should be incremental.
50-
assert_equal(struct.unpack('<I', seq)[-1], self.sequence)
51-
self.sequence += 1
54+
body = self._receive_from_publisher_and_check()
5255
hash = body[:32].hex()
5356
label = chr(body[32])
5457
mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
@@ -64,6 +67,9 @@ def set_test_params(self):
6467
self.num_nodes = 2
6568
if self.is_wallet_compiled():
6669
self.requires_wallet = True
70+
# This test isn't testing txn relay/timing, so set whitelist on the
71+
# peers for instant txn relay. This speeds up the test run time 2-3x.
72+
self.extra_args = [["[email protected]"]] * self.num_nodes
6773

6874
def skip_test_if_missing_module(self):
6975
self.skip_if_no_py3_zmq()
@@ -84,23 +90,46 @@ def run_test(self):
8490

8591
# Restart node with the specified zmq notifications enabled, subscribe to
8692
# all of them and return the corresponding ZMQSubscriber objects.
87-
def setup_zmq_test(self, services, recv_timeout=60, connect_nodes=False):
93+
def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True):
8894
subscribers = []
8995
for topic, address in services:
9096
socket = self.ctx.socket(zmq.SUB)
91-
socket.set(zmq.RCVTIMEO, recv_timeout*1000)
9297
subscribers.append(ZMQSubscriber(socket, topic.encode()))
9398

94-
self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services])
95-
96-
if connect_nodes:
97-
self.connect_nodes(0, 1)
99+
self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services] +
100+
self.extra_args[0])
98101

99102
for i, sub in enumerate(subscribers):
100103
sub.socket.connect(services[i][1])
101104

102-
# Relax so that the subscribers are ready before publishing zmq messages
103-
sleep(0.2)
105+
# Ensure that all zmq publisher notification interfaces are ready by
106+
# running the following "sync up" procedure:
107+
# 1. Generate a block on the node
108+
# 2. Try to receive a notification on all subscribers
109+
# 3. If all subscribers get a message within the timeout (1 second),
110+
# we are done, otherwise repeat starting from step 1
111+
for sub in subscribers:
112+
sub.socket.set(zmq.RCVTIMEO, 1000)
113+
while True:
114+
self.nodes[0].generate(1)
115+
recv_failed = False
116+
for sub in subscribers:
117+
try:
118+
sub.receive()
119+
except zmq.error.Again:
120+
self.log.debug("Didn't receive sync-up notification, trying again.")
121+
recv_failed = True
122+
if not recv_failed:
123+
self.log.debug("ZMQ sync-up completed, all subscribers are ready.")
124+
break
125+
126+
# set subscriber's desired timeout for the test
127+
for sub in subscribers:
128+
sub.socket.set(zmq.RCVTIMEO, recv_timeout*1000)
129+
130+
self.connect_nodes(0, 1)
131+
if sync_blocks:
132+
self.sync_blocks()
104133

105134
return subscribers
106135

@@ -110,9 +139,7 @@ def test_basic(self):
110139
self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])
111140

112141
address = 'tcp://127.0.0.1:28332'
113-
subs = self.setup_zmq_test(
114-
[(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]],
115-
connect_nodes=True)
142+
subs = self.setup_zmq_test([(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]])
116143

117144
hashblock = subs[0]
118145
hashtx = subs[1]
@@ -189,6 +216,7 @@ def test_reorg(self):
189216
hashblock, hashtx = self.setup_zmq_test(
190217
[(topic, address) for topic in ["hashblock", "hashtx"]],
191218
recv_timeout=2) # 2 second timeout to check end of notifications
219+
self.disconnect_nodes(0, 1)
192220

193221
# Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
194222
payment_txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
@@ -237,6 +265,7 @@ def test_sequence(self):
237265
"""
238266
self.log.info("Testing 'sequence' publisher")
239267
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])
268+
self.disconnect_nodes(0, 1)
240269

241270
# Mempool sequence number starts at 1
242271
seq_num = 1
@@ -387,7 +416,7 @@ def test_mempool_sync(self):
387416
return
388417

389418
self.log.info("Testing 'mempool sync' usage of sequence notifier")
390-
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")], connect_nodes=True)
419+
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])
391420

392421
# In-memory counter, should always start at 1
393422
next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
@@ -487,10 +516,13 @@ def test_mempool_sync(self):
487516

488517
def test_multiple_interfaces(self):
489518
# Set up two subscribers with different addresses
519+
# (note that after the reorg test, syncing would fail due to different
520+
# chain lengths on node0 and node1; for this test we only need node0, so
521+
# we can disable syncing blocks on the setup)
490522
subscribers = self.setup_zmq_test([
491523
("hashblock", "tcp://127.0.0.1:28334"),
492524
("hashblock", "tcp://127.0.0.1:28335"),
493-
])
525+
], sync_blocks=False)
494526

495527
# Generate 1 block in nodes[0] and receive all notifications
496528
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)

0 commit comments

Comments
 (0)