2
2
# Copyright (c) 2015-2016 The Bitcoin Core developers
3
3
# Distributed under the MIT software license, see the accompanying
4
4
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
- """Test the ZMQ API ."""
5
+ """Test the ZMQ notification interface ."""
6
6
import configparser
7
7
import os
8
8
import struct
13
13
hash256 ,
14
14
)
15
15
16
+ class ZMQSubscriber :
17
+ def __init__ (self , socket , topic ):
18
+ self .sequence = 0
19
+ self .socket = socket
20
+ self .topic = topic
21
+
22
+ import zmq
23
+ self .socket .setsockopt (zmq .SUBSCRIBE , self .topic )
24
+
25
+ def receive (self ):
26
+ topic , body , seq = self .socket .recv_multipart ()
27
+ # Topic should match the subscriber topic.
28
+ assert_equal (topic , self .topic )
29
+ # Sequence should be incremental.
30
+ assert_equal (struct .unpack ('<I' , seq )[- 1 ], self .sequence )
31
+ self .sequence += 1
32
+ return body
33
+
34
+
16
35
class ZMQTest (BitcoinTestFramework ):
17
36
def set_test_params (self ):
18
37
self .num_nodes = 2
@@ -24,130 +43,79 @@ def setup_nodes(self):
24
43
except ImportError :
25
44
raise SkipTest ("python3-zmq module not available." )
26
45
27
- # Check that bitcoin has been built with ZMQ enabled
46
+ # Check that bitcoin has been built with ZMQ enabled.
28
47
config = configparser .ConfigParser ()
29
48
if not self .options .configfile :
30
- self .options .configfile = os .path .dirname (__file__ ) + "/ ../config.ini"
49
+ self .options .configfile = os .path .abspath ( os . path . join ( os . path . dirname (__file__ ), " ../config.ini"))
31
50
config .read_file (open (self .options .configfile ))
32
51
33
52
if not config ["components" ].getboolean ("ENABLE_ZMQ" ):
34
53
raise SkipTest ("bitcoind has not been built with zmq enabled." )
35
54
36
- self .zmqContext = zmq .Context ()
37
- self .zmqSubSocket = self .zmqContext .socket (zmq .SUB )
38
- self .zmqSubSocket .set (zmq .RCVTIMEO , 60000 )
39
- self .zmqSubSocket .setsockopt (zmq .SUBSCRIBE , b"hashblock" )
40
- self .zmqSubSocket .setsockopt (zmq .SUBSCRIBE , b"hashtx" )
41
- self .zmqSubSocket .setsockopt (zmq .SUBSCRIBE , b"rawblock" )
42
- self .zmqSubSocket .setsockopt (zmq .SUBSCRIBE , b"rawtx" )
43
- ip_address = "tcp://127.0.0.1:28332"
44
- self .zmqSubSocket .connect (ip_address )
45
- self .extra_args = [['-zmqpubhashblock=%s' % ip_address , '-zmqpubhashtx=%s' % ip_address ,
46
- '-zmqpubrawblock=%s' % ip_address , '-zmqpubrawtx=%s' % ip_address ], []]
55
+ # Initialize ZMQ context and socket.
56
+ # All messages are received in the same socket which means
57
+ # that this test fails if the publishing order changes.
58
+ # Note that the publishing order is not defined in the documentation and
59
+ # is subject to change.
60
+ address = "tcp://127.0.0.1:28332"
61
+ self .zmq_context = zmq .Context ()
62
+ socket = self .zmq_context .socket (zmq .SUB )
63
+ socket .set (zmq .RCVTIMEO , 60000 )
64
+ socket .connect (address )
65
+
66
+ # Subscribe to all available topics.
67
+ self .hashblock = ZMQSubscriber (socket , b"hashblock" )
68
+ self .hashtx = ZMQSubscriber (socket , b"hashtx" )
69
+ self .rawblock = ZMQSubscriber (socket , b"rawblock" )
70
+ self .rawtx = ZMQSubscriber (socket , b"rawtx" )
71
+
72
+ self .extra_args = [["-zmqpub%s=%s" % (sub .topic .decode (), address ) for sub in [self .hashblock , self .hashtx , self .rawblock , self .rawtx ]], []]
47
73
self .add_nodes (self .num_nodes , self .extra_args )
48
74
self .start_nodes ()
49
75
50
76
def run_test (self ):
51
77
try :
52
78
self ._zmq_test ()
53
79
finally :
54
- # Destroy the zmq context
55
- self .log .debug ("Destroying zmq context" )
56
- self .zmqContext .destroy (linger = None )
80
+ # Destroy the ZMQ context.
81
+ self .log .debug ("Destroying ZMQ context" )
82
+ self .zmq_context .destroy (linger = None )
57
83
58
84
def _zmq_test (self ):
59
- genhashes = self .nodes [0 ].generate (1 )
85
+ num_blocks = 5
86
+ self .log .info ("Generate %(n)d blocks (and %(n)d coinbase txes)" % {"n" : num_blocks })
87
+ genhashes = self .nodes [0 ].generate (num_blocks )
60
88
self .sync_all ()
61
89
62
- self .log .info ("Wait for tx" )
63
- msg = self .zmqSubSocket .recv_multipart ()
64
- topic = msg [0 ]
65
- assert_equal (topic , b"hashtx" )
66
- txhash = msg [1 ]
67
- msgSequence = struct .unpack ('<I' , msg [- 1 ])[- 1 ]
68
- assert_equal (msgSequence , 0 ) # must be sequence 0 on hashtx
69
-
70
- # rawtx
71
- msg = self .zmqSubSocket .recv_multipart ()
72
- topic = msg [0 ]
73
- assert_equal (topic , b"rawtx" )
74
- body = msg [1 ]
75
- msgSequence = struct .unpack ('<I' , msg [- 1 ])[- 1 ]
76
- assert_equal (msgSequence , 0 ) # must be sequence 0 on rawtx
77
-
78
- # Check that the rawtx hashes to the hashtx
79
- assert_equal (hash256 (body ), txhash )
80
-
81
- self .log .info ("Wait for block" )
82
- msg = self .zmqSubSocket .recv_multipart ()
83
- topic = msg [0 ]
84
- assert_equal (topic , b"hashblock" )
85
- body = msg [1 ]
86
- msgSequence = struct .unpack ('<I' , msg [- 1 ])[- 1 ]
87
- assert_equal (msgSequence , 0 ) # must be sequence 0 on hashblock
88
- blkhash = bytes_to_hex_str (body )
89
- assert_equal (genhashes [0 ], blkhash ) # blockhash from generate must be equal to the hash received over zmq
90
-
91
- # rawblock
92
- msg = self .zmqSubSocket .recv_multipart ()
93
- topic = msg [0 ]
94
- assert_equal (topic , b"rawblock" )
95
- body = msg [1 ]
96
- msgSequence = struct .unpack ('<I' , msg [- 1 ])[- 1 ]
97
- assert_equal (msgSequence , 0 ) #must be sequence 0 on rawblock
98
-
99
- # Check the hash of the rawblock's header matches generate
100
- assert_equal (genhashes [0 ], bytes_to_hex_str (hash256 (body [:80 ])))
101
-
102
- self .log .info ("Generate 10 blocks (and 10 coinbase txes)" )
103
- n = 10
104
- genhashes = self .nodes [1 ].generate (n )
105
- self .sync_all ()
90
+ for x in range (num_blocks ):
91
+ # Should receive the coinbase txid.
92
+ txid = self .hashtx .receive ()
93
+
94
+ # Should receive the coinbase raw transaction.
95
+ hex = self .rawtx .receive ()
96
+ assert_equal (hash256 (hex ), txid )
106
97
107
- zmqHashes = []
108
- zmqRawHashed = []
109
- blockcount = 0
110
- for x in range (n * 4 ):
111
- msg = self .zmqSubSocket .recv_multipart ()
112
- topic = msg [0 ]
113
- body = msg [1 ]
114
- if topic == b"hashblock" :
115
- zmqHashes .append (bytes_to_hex_str (body ))
116
- msgSequence = struct .unpack ('<I' , msg [- 1 ])[- 1 ]
117
- assert_equal (msgSequence , blockcount + 1 )
118
- blockcount += 1
119
- if topic == b"rawblock" :
120
- zmqRawHashed .append (bytes_to_hex_str (hash256 (body [:80 ])))
121
- msgSequence = struct .unpack ('<I' , msg [- 1 ])[- 1 ]
122
- assert_equal (msgSequence , blockcount )
123
-
124
- for x in range (n ):
125
- assert_equal (genhashes [x ], zmqHashes [x ]) # blockhash from generate must be equal to the hash received over zmq
126
- assert_equal (genhashes [x ], zmqRawHashed [x ])
98
+ # Should receive the generated block hash.
99
+ hash = bytes_to_hex_str (self .hashblock .receive ())
100
+ assert_equal (genhashes [x ], hash )
101
+ # The block should only have the coinbase txid.
102
+ assert_equal ([bytes_to_hex_str (txid )], self .nodes [1 ].getblock (hash )["tx" ])
103
+
104
+ # Should receive the generated raw block.
105
+ block = self .rawblock .receive ()
106
+ assert_equal (genhashes [x ], bytes_to_hex_str (hash256 (block [:80 ])))
127
107
128
108
self .log .info ("Wait for tx from second node" )
129
- # test tx from a second node
130
- hashRPC = self .nodes [1 ].sendtoaddress (self .nodes [0 ].getnewaddress (), 1.0 )
109
+ payment_txid = self .nodes [1 ].sendtoaddress (self .nodes [0 ].getnewaddress (), 1.0 )
131
110
self .sync_all ()
132
111
133
- # now we should receive a zmq msg because the tx was broadcast
134
- msg = self .zmqSubSocket .recv_multipart ()
135
- topic = msg [0 ]
136
- assert_equal (topic , b"hashtx" )
137
- body = msg [1 ]
138
- hashZMQ = bytes_to_hex_str (body )
139
- msgSequence = struct .unpack ('<I' , msg [- 1 ])[- 1 ]
140
- assert_equal (msgSequence , blockcount + 1 )
141
-
142
- msg = self .zmqSubSocket .recv_multipart ()
143
- topic = msg [0 ]
144
- assert_equal (topic , b"rawtx" )
145
- body = msg [1 ]
146
- hashedZMQ = bytes_to_hex_str (hash256 (body ))
147
- msgSequence = struct .unpack ('<I' , msg [- 1 ])[- 1 ]
148
- assert_equal (msgSequence , blockcount + 1 )
149
- assert_equal (hashRPC , hashZMQ ) # txid from sendtoaddress must be equal to the hash received over zmq
150
- assert_equal (hashRPC , hashedZMQ )
112
+ # Should receive the broadcasted txid.
113
+ txid = self .hashtx .receive ()
114
+ assert_equal (payment_txid , bytes_to_hex_str (txid ))
115
+
116
+ # Should receive the broadcasted raw transaction.
117
+ hex = self .rawtx .receive ()
118
+ assert_equal (payment_txid , bytes_to_hex_str (hash256 (hex )))
151
119
152
120
if __name__ == '__main__' :
153
121
ZMQTest ().main ()
0 commit comments