@@ -80,34 +80,43 @@ def run_test(self):
80
80
self .log .debug ("Destroying ZMQ context" )
81
81
self .ctx .destroy (linger = None )
82
82
83
+ # Restart node with the specified zmq notifications enabled, subscribe to
84
+ # all of them and return the corresponding ZMQSubscriber objects.
85
+ def setup_zmq_test (self , services , recv_timeout = 60 , connect_nodes = False ):
86
+ subscribers = []
87
+ for topic , address in services :
88
+ socket = self .ctx .socket (zmq .SUB )
89
+ socket .set (zmq .RCVTIMEO , recv_timeout * 1000 )
90
+ subscribers .append (ZMQSubscriber (socket , topic .encode ()))
91
+
92
+ self .restart_node (0 , ["-zmqpub%s=%s" % (topic , address ) for topic , address in services ])
93
+
94
+ if connect_nodes :
95
+ self .connect_nodes (0 , 1 )
96
+
97
+ for i , sub in enumerate (subscribers ):
98
+ sub .socket .connect (services [i ][1 ])
99
+
100
+ # Relax so that the subscribers are ready before publishing zmq messages
101
+ sleep (0.2 )
102
+
103
+ return subscribers
104
+
83
105
def test_basic (self ):
84
106
85
107
# Invalid zmq arguments don't take down the node, see #17185.
86
108
self .restart_node (0 , ["-zmqpubrawtx=foo" , "-zmqpubhashtx=bar" ])
87
109
88
110
address = 'tcp://127.0.0.1:28332'
89
- sockets = []
90
- subs = []
91
- services = [b"hashblock" , b"hashtx" , b"rawblock" , b"rawtx" ]
92
- for service in services :
93
- sockets .append (self .ctx .socket (zmq .SUB ))
94
- sockets [- 1 ].set (zmq .RCVTIMEO , 60000 )
95
- subs .append (ZMQSubscriber (sockets [- 1 ], service ))
96
-
97
- # Subscribe to all available topics.
111
+ subs = self .setup_zmq_test (
112
+ [(topic , address ) for topic in ["hashblock" , "hashtx" , "rawblock" , "rawtx" ]],
113
+ connect_nodes = True )
114
+
98
115
hashblock = subs [0 ]
99
116
hashtx = subs [1 ]
100
117
rawblock = subs [2 ]
101
118
rawtx = subs [3 ]
102
119
103
- self .restart_node (0 , ["-zmqpub%s=%s" % (sub .topic .decode (), address ) for sub in [hashblock , hashtx , rawblock , rawtx ]])
104
- self .connect_nodes (0 , 1 )
105
- for socket in sockets :
106
- socket .connect (address )
107
-
108
- # Relax so that the subscriber is ready before publishing zmq messages
109
- sleep (0.2 )
110
-
111
120
num_blocks = 5
112
121
self .log .info ("Generate %(n)d blocks (and %(n)d coinbase txes)" % {"n" : num_blocks })
113
122
genhashes = self .nodes [0 ].generatetoaddress (num_blocks , ADDRESS_BCRT1_UNSPENDABLE )
@@ -174,25 +183,10 @@ def test_reorg(self):
174
183
175
184
address = 'tcp://127.0.0.1:28333'
176
185
177
- services = [b"hashblock" , b"hashtx" ]
178
- sockets = []
179
- subs = []
180
- for service in services :
181
- sockets .append (self .ctx .socket (zmq .SUB ))
182
- # 2 second timeout to check end of notifications
183
- sockets [- 1 ].set (zmq .RCVTIMEO , 2000 )
184
- subs .append (ZMQSubscriber (sockets [- 1 ], service ))
185
-
186
- # Subscribe to all available topics.
187
- hashblock = subs [0 ]
188
- hashtx = subs [1 ]
189
-
190
186
# Should only notify the tip if a reorg occurs
191
- self .restart_node (0 , ["-zmqpub%s=%s" % (sub .topic .decode (), address ) for sub in [hashblock , hashtx ]])
192
- for socket in sockets :
193
- socket .connect (address )
194
- # Relax so that the subscriber is ready before publishing zmq messages
195
- sleep (0.2 )
187
+ hashblock , hashtx = self .setup_zmq_test (
188
+ [(topic , address ) for topic in ["hashblock" , "hashtx" ]],
189
+ recv_timeout = 2 ) # 2 second timeout to check end of notifications
196
190
197
191
# Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
198
192
payment_txid = self .nodes [0 ].sendtoaddress (self .nodes [0 ].getnewaddress (), 1.0 )
@@ -240,15 +234,7 @@ def test_sequence(self):
240
234
<32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
241
235
"""
242
236
self .log .info ("Testing 'sequence' publisher" )
243
- address = 'tcp://127.0.0.1:28333'
244
- socket = self .ctx .socket (zmq .SUB )
245
- socket .set (zmq .RCVTIMEO , 60000 )
246
- seq = ZMQSubscriber (socket , b'sequence' )
247
-
248
- self .restart_node (0 , ['-zmqpub%s=%s' % (seq .topic .decode (), address )])
249
- socket .connect (address )
250
- # Relax so that the subscriber is ready before publishing zmq messages
251
- sleep (0.2 )
237
+ [seq ] = self .setup_zmq_test ([("sequence" , "tcp://127.0.0.1:28333" )])
252
238
253
239
# Mempool sequence number starts at 1
254
240
seq_num = 1
@@ -399,16 +385,7 @@ def test_mempool_sync(self):
399
385
return
400
386
401
387
self .log .info ("Testing 'mempool sync' usage of sequence notifier" )
402
- address = 'tcp://127.0.0.1:28333'
403
- socket = self .ctx .socket (zmq .SUB )
404
- socket .set (zmq .RCVTIMEO , 60000 )
405
- seq = ZMQSubscriber (socket , b'sequence' )
406
-
407
- self .restart_node (0 , ['-zmqpub%s=%s' % (seq .topic .decode (), address )])
408
- self .connect_nodes (0 , 1 )
409
- socket .connect (address )
410
- # Relax so that the subscriber is ready before publishing zmq messages
411
- sleep (0.2 )
388
+ [seq ] = self .setup_zmq_test ([("sequence" , "tcp://127.0.0.1:28333" )], connect_nodes = True )
412
389
413
390
# In-memory counter, should always start at 1
414
391
next_mempool_seq = self .nodes [0 ].getrawmempool (mempool_sequence = True )["mempool_sequence" ]
@@ -508,26 +485,17 @@ def test_mempool_sync(self):
508
485
509
486
def test_multiple_interfaces (self ):
510
487
# Set up two subscribers with different addresses
511
- subscribers = []
512
- for i in range (2 ):
513
- address = 'tcp://127.0.0.1:%d' % (28334 + i )
514
- socket = self .ctx .socket (zmq .SUB )
515
- socket .set (zmq .RCVTIMEO , 60000 )
516
- hashblock = ZMQSubscriber (socket , b"hashblock" )
517
- socket .connect (address )
518
- subscribers .append ({'address' : address , 'hashblock' : hashblock })
519
-
520
- self .restart_node (0 , ['-zmqpub%s=%s' % (subscriber ['hashblock' ].topic .decode (), subscriber ['address' ]) for subscriber in subscribers ])
521
-
522
- # Relax so that the subscriber is ready before publishing zmq messages
523
- sleep (0.2 )
488
+ subscribers = self .setup_zmq_test ([
489
+ ("hashblock" , "tcp://127.0.0.1:28334" ),
490
+ ("hashblock" , "tcp://127.0.0.1:28335" ),
491
+ ])
524
492
525
493
# Generate 1 block in nodes[0] and receive all notifications
526
494
self .nodes [0 ].generatetoaddress (1 , ADDRESS_BCRT1_UNSPENDABLE )
527
495
528
496
# Should receive the same block hash on both subscribers
529
- assert_equal (self .nodes [0 ].getbestblockhash (), subscribers [0 ][ 'hashblock' ] .receive ().hex ())
530
- assert_equal (self .nodes [0 ].getbestblockhash (), subscribers [1 ][ 'hashblock' ] .receive ().hex ())
497
+ assert_equal (self .nodes [0 ].getbestblockhash (), subscribers [0 ].receive ().hex ())
498
+ assert_equal (self .nodes [0 ].getbestblockhash (), subscribers [1 ].receive ().hex ())
531
499
532
500
if __name__ == '__main__' :
533
501
ZMQTest ().main ()
0 commit comments