Skip to content

Commit 61a640e

Browse files
committed
Merge #9485: ZMQ example using python3 and asyncio
b471daf Adddress nits, use asyncio signal handling, create_task (Bob McElrath) 4bb7d1b Add python version checks and 3.4 example (Bob McElrath) 5406d51 Rewrite to not use Polling wrapper for asyncio, link to python2.7 example (Bob McElrath) 5ea5368 ZMQ example using python3 and asyncio (Bob McElrath)
2 parents 5f0556d + b471daf commit 61a640e

File tree

2 files changed

+158
-28
lines changed

2 files changed

+158
-28
lines changed

contrib/zmq/zmq_sub.py

Lines changed: 69 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,84 @@
1-
#!/usr/bin/env python2
1+
#!/usr/bin/env python3
22
# Copyright (c) 2014-2016 The Bitcoin Core developers
33
# Distributed under the MIT software license, see the accompanying
44
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
55

6+
"""
7+
ZMQ example using python3's asyncio
8+
9+
Bitcoin should be started with the command line arguments:
10+
bitcoind -testnet -daemon \
11+
-zmqpubhashblock=tcp://127.0.0.1:28332 \
12+
-zmqpubrawtx=tcp://127.0.0.1:28332 \
13+
-zmqpubhashtx=tcp://127.0.0.1:28332 \
14+
-zmqpubhashblock=tcp://127.0.0.1:28332
15+
16+
We use the asyncio library here. `self.handle()` installs itself as a
17+
future at the end of the function. Since it never returns with the event
18+
loop having an empty stack of futures, this creates an infinite loop. An
19+
alternative is to wrap the contents of `handle` inside `while True`.
20+
21+
A blocking example using python 2.7 can be obtained from the git history:
22+
https://github.com/bitcoin/bitcoin/blob/37a7fe9e440b83e2364d5498931253937abe9294/contrib/zmq/zmq_sub.py
23+
"""
24+
625
import binascii
26+
import asyncio
727
import zmq
28+
import zmq.asyncio
29+
import signal
830
import struct
31+
import sys
32+
33+
if not (sys.version_info.major >= 3 and sys.version_info.minor >= 5):
34+
print("This example only works with Python 3.5 and greater")
35+
exit(1)
936

1037
port = 28332
1138

12-
zmqContext = zmq.Context()
13-
zmqSubSocket = zmqContext.socket(zmq.SUB)
14-
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashblock")
15-
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashtx")
16-
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawblock")
17-
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawtx")
18-
zmqSubSocket.connect("tcp://127.0.0.1:%i" % port)
19-
20-
try:
21-
while True:
22-
msg = zmqSubSocket.recv_multipart()
23-
topic = str(msg[0])
39+
class ZMQHandler():
40+
def __init__(self):
41+
self.loop = zmq.asyncio.install()
42+
self.zmqContext = zmq.asyncio.Context()
43+
44+
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
45+
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
46+
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
47+
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")
48+
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx")
49+
self.zmqSubSocket.connect("tcp://127.0.0.1:%i" % port)
50+
51+
async def handle(self) :
52+
msg = await self.zmqSubSocket.recv_multipart()
53+
topic = msg[0]
2454
body = msg[1]
2555
sequence = "Unknown"
2656
if len(msg[-1]) == 4:
2757
msgSequence = struct.unpack('<I', msg[-1])[-1]
2858
sequence = str(msgSequence)
29-
if topic == "hashblock":
30-
print '- HASH BLOCK ('+sequence+') -'
31-
print binascii.hexlify(body)
32-
elif topic == "hashtx":
33-
print '- HASH TX ('+sequence+') -'
34-
print binascii.hexlify(body)
35-
elif topic == "rawblock":
36-
print '- RAW BLOCK HEADER ('+sequence+') -'
37-
print binascii.hexlify(body[:80])
38-
elif topic == "rawtx":
39-
print '- RAW TX ('+sequence+') -'
40-
print binascii.hexlify(body)
41-
42-
except KeyboardInterrupt:
43-
zmqContext.destroy()
59+
if topic == b"hashblock":
60+
print('- HASH BLOCK ('+sequence+') -')
61+
print(binascii.hexlify(body))
62+
elif topic == b"hashtx":
63+
print('- HASH TX ('+sequence+') -')
64+
print(binascii.hexlify(body))
65+
elif topic == b"rawblock":
66+
print('- RAW BLOCK HEADER ('+sequence+') -')
67+
print(binascii.hexlify(body[:80]))
68+
elif topic == b"rawtx":
69+
print('- RAW TX ('+sequence+') -')
70+
print(binascii.hexlify(body))
71+
# schedule ourselves to receive the next message
72+
asyncio.ensure_future(self.handle())
73+
74+
def start(self):
75+
self.loop.add_signal_handler(signal.SIGINT, self.stop)
76+
self.loop.create_task(self.handle())
77+
self.loop.run_forever()
78+
79+
def stop(self):
80+
self.loop.stop()
81+
self.zmqContext.destroy()
82+
83+
daemon = ZMQHandler()
84+
daemon.start()

contrib/zmq/zmq_sub3.4.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
#!/usr/bin/env python3
2+
# Copyright (c) 2014-2016 The Bitcoin Core developers
3+
# Distributed under the MIT software license, see the accompanying
4+
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
5+
6+
"""
7+
ZMQ example using python3's asyncio
8+
9+
Bitcoin should be started with the command line arguments:
10+
bitcoind -testnet -daemon \
11+
-zmqpubhashblock=tcp://127.0.0.1:28332 \
12+
-zmqpubrawtx=tcp://127.0.0.1:28332 \
13+
-zmqpubhashtx=tcp://127.0.0.1:28332 \
14+
-zmqpubhashblock=tcp://127.0.0.1:28332
15+
16+
We use the asyncio library here. `self.handle()` installs itself as a
17+
future at the end of the function. Since it never returns with the event
18+
loop having an empty stack of futures, this creates an infinite loop. An
19+
alternative is to wrap the contents of `handle` inside `while True`.
20+
21+
The `@asyncio.coroutine` decorator and the `yield from` syntax found here
22+
was introduced in python 3.4 and has been deprecated in favor of the `async`
23+
and `await` keywords respectively.
24+
25+
A blocking example using python 2.7 can be obtained from the git history:
26+
https://github.com/bitcoin/bitcoin/blob/37a7fe9e440b83e2364d5498931253937abe9294/contrib/zmq/zmq_sub.py
27+
"""
28+
29+
import binascii
30+
import asyncio
31+
import zmq
32+
import zmq.asyncio
33+
import signal
34+
import struct
35+
import sys
36+
37+
if not (sys.version_info.major >= 3 and sys.version_info.minor >= 4):
38+
print("This example only works with Python 3.4 and greater")
39+
exit(1)
40+
41+
port = 28332
42+
43+
class ZMQHandler():
44+
def __init__(self):
45+
self.loop = zmq.asyncio.install()
46+
self.zmqContext = zmq.asyncio.Context()
47+
48+
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
49+
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
50+
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
51+
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")
52+
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx")
53+
self.zmqSubSocket.connect("tcp://127.0.0.1:%i" % port)
54+
55+
@asyncio.coroutine
56+
def handle(self) :
57+
msg = yield from self.zmqSubSocket.recv_multipart()
58+
topic = msg[0]
59+
body = msg[1]
60+
sequence = "Unknown";
61+
if len(msg[-1]) == 4:
62+
msgSequence = struct.unpack('<I', msg[-1])[-1]
63+
sequence = str(msgSequence)
64+
if topic == b"hashblock":
65+
print('- HASH BLOCK ('+sequence+') -')
66+
print(binascii.hexlify(body))
67+
elif topic == b"hashtx":
68+
print('- HASH TX ('+sequence+') -')
69+
print(binascii.hexlify(body))
70+
elif topic == b"rawblock":
71+
print('- RAW BLOCK HEADER ('+sequence+') -')
72+
print(binascii.hexlify(body[:80]))
73+
elif topic == b"rawtx":
74+
print('- RAW TX ('+sequence+') -')
75+
print(binascii.hexlify(body))
76+
# schedule ourselves to receive the next message
77+
asyncio.ensure_future(self.handle())
78+
79+
def start(self):
80+
self.loop.add_signal_handler(signal.SIGINT, self.stop)
81+
self.loop.create_task(self.handle())
82+
self.loop.run_forever()
83+
84+
def stop(self):
85+
self.loop.stop()
86+
self.zmqContext.destroy()
87+
88+
daemon = ZMQHandler()
89+
daemon.start()

0 commit comments

Comments
 (0)