Skip to content

Commit 5c65463

Browse files
committed
zmq test: fix flakiness by using more robust sync method
After connecting the subscriber sockets to the node, there is no guarantee that the node's zmq publisher interfaces are ready yet, which means that potentially the first expected notification messages could get lost and the test fails. Currently this is handled by just waiting for a short period of time (200ms), which works most of the time but is still problematic, as in some rare cases the setup time takes much longer, even in the range of multiple seconds. The solution in this commit approaches the problem by using a more robust method of syncing up, originally proposed by instagibbs: 1. Generate a block on the node 2. Try to receive a notification on all subscribers 3. If all subscribers get a message within the timeout (1 second), we are done, otherwise repeat starting from step 1
1 parent 8666033 commit 5c65463

File tree

1 file changed

+37
-12
lines changed

1 file changed

+37
-12
lines changed

test/functional/interface_zmq.py

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -87,23 +87,45 @@ def run_test(self):
8787

8888
# Restart node with the specified zmq notifications enabled, subscribe to
8989
# all of them and return the corresponding ZMQSubscriber objects.
90-
def setup_zmq_test(self, services, recv_timeout=60, connect_nodes=False):
90+
def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True):
9191
subscribers = []
9292
for topic, address in services:
9393
socket = self.ctx.socket(zmq.SUB)
94-
socket.set(zmq.RCVTIMEO, recv_timeout*1000)
9594
subscribers.append(ZMQSubscriber(socket, topic.encode()))
9695

9796
self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services])
9897

99-
if connect_nodes:
100-
self.connect_nodes(0, 1)
101-
10298
for i, sub in enumerate(subscribers):
10399
sub.socket.connect(services[i][1])
104100

105-
# Relax so that the subscribers are ready before publishing zmq messages
106-
sleep(0.2)
101+
# Ensure that all zmq publisher notification interfaces are ready by
102+
# running the following "sync up" procedure:
103+
# 1. Generate a block on the node
104+
# 2. Try to receive a notification on all subscribers
105+
# 3. If all subscribers get a message within the timeout (1 second),
106+
# we are done, otherwise repeat starting from step 1
107+
for sub in subscribers:
108+
sub.socket.set(zmq.RCVTIMEO, 1000)
109+
while True:
110+
self.nodes[0].generate(1)
111+
recv_failed = False
112+
for sub in subscribers:
113+
try:
114+
sub.receive()
115+
except zmq.error.Again:
116+
self.log.debug("Didn't receive sync-up notification, trying again.")
117+
recv_failed = True
118+
if not recv_failed:
119+
self.log.debug("ZMQ sync-up completed, all subscribers are ready.")
120+
break
121+
122+
# set subscriber's desired timeout for the test
123+
for sub in subscribers:
124+
sub.socket.set(zmq.RCVTIMEO, recv_timeout*1000)
125+
126+
self.connect_nodes(0, 1)
127+
if sync_blocks:
128+
self.sync_blocks()
107129

108130
return subscribers
109131

@@ -113,9 +135,7 @@ def test_basic(self):
113135
self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])
114136

115137
address = 'tcp://127.0.0.1:28332'
116-
subs = self.setup_zmq_test(
117-
[(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]],
118-
connect_nodes=True)
138+
subs = self.setup_zmq_test([(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]])
119139

120140
hashblock = subs[0]
121141
hashtx = subs[1]
@@ -192,6 +212,7 @@ def test_reorg(self):
192212
hashblock, hashtx = self.setup_zmq_test(
193213
[(topic, address) for topic in ["hashblock", "hashtx"]],
194214
recv_timeout=2) # 2 second timeout to check end of notifications
215+
self.disconnect_nodes(0, 1)
195216

196217
# Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
197218
payment_txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
@@ -240,6 +261,7 @@ def test_sequence(self):
240261
"""
241262
self.log.info("Testing 'sequence' publisher")
242263
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])
264+
self.disconnect_nodes(0, 1)
243265

244266
# Mempool sequence number starts at 1
245267
seq_num = 1
@@ -390,7 +412,7 @@ def test_mempool_sync(self):
390412
return
391413

392414
self.log.info("Testing 'mempool sync' usage of sequence notifier")
393-
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")], connect_nodes=True)
415+
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])
394416

395417
# In-memory counter, should always start at 1
396418
next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
@@ -490,10 +512,13 @@ def test_mempool_sync(self):
490512

491513
def test_multiple_interfaces(self):
492514
# Set up two subscribers with different addresses
515+
# (note that after the reorg test, syncing would fail due to different
516+
# chain lengths on node0 and node1; for this test we only need node0, so
517+
# we can disable syncing blocks on the setup)
493518
subscribers = self.setup_zmq_test([
494519
("hashblock", "tcp://127.0.0.1:28334"),
495520
("hashblock", "tcp://127.0.0.1:28335"),
496-
])
521+
], sync_blocks=False)
497522

498523
# Generate 1 block in nodes[0] and receive all notifications
499524
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)

0 commit comments

Comments
 (0)