Skip to content

Commit b427291

Browse files
authored
Merge pull request #52 from zixuanzh/libp2p-new-node
End to end messaging bugfixes
2 parents cdfd302 + 6e3857c commit b427291

File tree

11 files changed

+127
-124
lines changed

11 files changed

+127
-124
lines changed

host/basic_host.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ class BasicHost(IHost):
1111
def __init__(self, _network):
1212
self.network = _network
1313
self.peerstore = self.network.peerstore
14-
# self.stream_handlers = {}
1514

1615
def get_id(self):
1716
"""
@@ -49,13 +48,13 @@ def set_stream_handler(self, protocol_id, stream_handler):
4948

5049
# protocol_id can be a list of protocol_ids
5150
# stream will decide which protocol_id to run on
52-
def new_stream(self, peer_id, protocol_id):
51+
async def new_stream(self, peer_id, protocol_id):
5352
"""
5453
:param peer_id: peer_id that host is connecting
5554
:param proto_id: protocol id that stream runs on
5655
:return: true if successful
5756
"""
5857
# TODO: host should return a mux stream not a raw stream
59-
stream = self.network.new_stream(peer_id)
58+
stream = await self.network.new_stream(peer_id, protocol_id)
6059
stream.set_protocol(protocol_id)
6160
return stream

libp2p/libp2p.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
class Libp2p(object):
99

1010
def __init__(self, idOpt = None, \
11-
transportOpt = ["/ip4/127.0.0.1/tcp/10000"], \
11+
transportOpt = ["/ip4/127.0.0.1/tcp/8001"], \
1212
muxerOpt = ["mplex/6.7.0"], \
1313
secOpt = ["secio"], \
1414
peerstore = PeerStore()):
@@ -25,16 +25,15 @@ def __init__(self, idOpt = None, \
2525
self.secOpt = secOpt
2626
self.peerstore = peerstore
2727

28-
def new_node(self):
28+
async def new_node(self):
2929

3030
upgrader = TransportUpgrader(self.secOpt, self.transportOpt)
3131
swarm = Swarm(self.idOpt, self.peerstore, upgrader)
3232
tcp = TCP()
3333
swarm.add_transport(tcp)
34-
swarm.listen(self.transportOpts)
34+
await swarm.listen(self.transportOpt[0])
3535
host = BasicHost(swarm)
3636

37-
# TODO MuxedConnection currently contains all muxing logic
37+
# TODO MuxedConnection currently contains all muxing logic (move to a Muxer)
3838
# TODO routing unimplemented
39-
4039
return host

muxer/mplex/muxed_connection.py

Lines changed: 33 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ def __init__(self, conn, initiator):
1717
self.initiator = initiator
1818
self.buffers = {}
1919
self.streams = {}
20+
self.stream_queue = asyncio.Queue()
2021

21-
self.add_incoming_task()
22+
asyncio.ensure_future(self.handle_incoming())
2223

2324
def close(self):
2425
"""
@@ -33,7 +34,12 @@ def is_closed(self):
3334
"""
3435
pass
3536

36-
def read_buffer(self, stream_id):
37+
async def read_buffer(self, stream_id):
38+
# Empty buffer or nonexistent stream
39+
# TODO: propagate up timeout exception and catch
40+
if stream_id not in self.buffers or not self.buffers[stream_id]:
41+
await self.handle_incoming()
42+
3743
data = self.buffers[stream_id]
3844
self.buffers[stream_id] = bytearray()
3945
return data
@@ -43,37 +49,22 @@ def open_stream(self, protocol_id, stream_id, peer_id, multi_addr):
4349
creates a new muxed_stream
4450
:return: a new stream
4551
"""
46-
stream = MuxedStream(peer_id, multi_addr, self)
52+
stream = MuxedStream(stream_id, multi_addr, self)
4753
self.streams[stream_id] = stream
48-
self.buffers[stream_id] = bytearray()
4954
return stream
5055

51-
def accept_stream(self):
56+
async def accept_stream(self):
5257
"""
5358
accepts a muxed stream opened by the other end
5459
:return: the accepted stream
5560
"""
56-
data = bytearray()
57-
while True:
58-
chunk = self.raw_conn.reader.read(100)
59-
if not chunk:
60-
break
61-
data += chunk
62-
header, end_index = decode_uvarint(data, 0)
63-
length, end_index = decode_uvarint(data, end_index)
64-
message = data[end_index, end_index + length]
65-
66-
flag = header & 0x07
67-
stream_id = header >> 3
68-
6961
# TODO update to pull out protocol_id from message
7062
protocol_id = "/echo/1.0.0"
71-
63+
stream_id = await self.stream_queue.get()
7264
stream = MuxedStream(stream_id, False, self)
73-
7465
return stream, stream_id, protocol_id
7566

76-
def send_message(self, flag, data, stream_id):
67+
async def send_message(self, flag, data, stream_id):
7768
"""
7869
sends a message over the connection
7970
:param header: header to use
@@ -86,7 +77,8 @@ def send_message(self, flag, data, stream_id):
8677
header = encode_uvarint(header)
8778
data_length = encode_uvarint(len(data))
8879
_bytes = header + data_length + data
89-
return self.write_to_stream(_bytes)
80+
81+
return await self.write_to_stream(_bytes)
9082

9183
async def write_to_stream(self, _bytes):
9284
self.raw_conn.writer.write(_bytes)
@@ -95,25 +87,23 @@ async def write_to_stream(self, _bytes):
9587

9688
async def handle_incoming(self):
9789
data = bytearray()
98-
while True:
99-
chunk = self.raw_conn.reader.read(100)
100-
if not chunk:
101-
break
90+
try:
91+
chunk = await asyncio.wait_for(self.raw_conn.reader.read(1024), timeout=5)
10292
data += chunk
103-
header, end_index = decode_uvarint(data, 0)
104-
length, end_index = decode_uvarint(data, end_index)
105-
message = data[end_index, end_index + length]
106-
107-
# Deal with other types of messages
108-
flag = header & 0x07
109-
stream_id = header >> 3
110-
111-
self.buffers[stream_id] = self.buffers[stream_id] + message
112-
# Read header
113-
# Read message length
114-
# Read message into corresponding buffer
115-
116-
def add_incoming_task(self):
117-
loop = asyncio.get_event_loop()
118-
handle_incoming_task = loop.create_task(self.handle_incoming())
119-
handle_incoming_task.add_done_callback(self.add_incoming_task)
93+
94+
header, end_index = decode_uvarint(data, 0)
95+
length, end_index = decode_uvarint(data, end_index)
96+
97+
message = data[end_index:end_index + length + 1]
98+
99+
# Deal with other types of messages
100+
flag = header & 0x07
101+
stream_id = header >> 3
102+
103+
if stream_id not in self.buffers:
104+
self.buffers[stream_id] = message
105+
await self.stream_queue.put(stream_id)
106+
else:
107+
self.buffers[stream_id] = self.buffers[stream_id] + message
108+
except asyncio.TimeoutError:
109+
print('timeout!')

muxer/mplex/muxed_stream.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,19 @@ def get_flag(self, action):
3636
else:
3737
return HEADER_TAGS[action] - 1
3838

39-
def read(self):
39+
async def read(self):
4040
"""
4141
read messages associated with stream from buffer til end of file
4242
:return: bytes of input
4343
"""
44-
return self.muxed_conn.read_buffer(self.id)
44+
return await self.muxed_conn.read_buffer(self.id)
4545

46-
def write(self, data):
46+
async def write(self, data):
4747
"""
4848
write to stream
4949
:return: number of bytes written
5050
"""
51-
return self.muxed_conn.send_message(self.get_flag("MESSAGE"), data, self.id)
51+
return await self.muxed_conn.send_message(self.get_flag("MESSAGE"), data, self.id)
5252

5353
def close(self):
5454
"""

muxer/mplex/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,4 @@ def decode_uvarint(buff, index):
2222
break
2323
index += 1
2424

25-
return result, index
25+
return result, index + 1

network/multiaddr.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ class MultiAddr:
22

33
# Validates input string and constructs internal representation.
44
def __init__(self, addr):
5+
self.protocol_map = dict()
6+
57
# Empty multiaddrs are valid.
68
if not addr:
79
self.protocol_map = dict()

network/stream/net_stream.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,19 @@ def set_protocol(self, protocol_id):
1919
"""
2020
self.protocol_id = protocol_id
2121

22-
def read(self):
22+
async def read(self):
2323
"""
2424
read from stream
2525
:return: bytes of input until EOF
2626
"""
27-
return self.muxed_stream.read()
27+
return await self.muxed_stream.read()
2828

29-
def write(self, bytes):
29+
async def write(self, bytes):
3030
"""
3131
write to stream
3232
:return: number of bytes written
3333
"""
34-
return self.muxed_stream.write(bytes)
34+
return await self.muxed_stream.write(bytes)
3535

3636
def close(self):
3737
"""

network/swarm.py

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,20 @@ def set_stream_handler(self, protocol_id, stream_handler):
2222
"""
2323
self.stream_handlers[protocol_id] = stream_handler
2424

25-
def new_stream(self, peer_id, protocol_id):
25+
async def new_stream(self, peer_id, protocol_id):
2626
"""
2727
:param peer_id: peer_id of destination
2828
:param protocol_id: protocol id
2929
:return: net stream instance
3030
"""
31-
muxed_conn = None
31+
# Get peer info from peer store
32+
addrs = self.peerstore.addrs(peer_id)
33+
34+
if not addrs:
35+
raise SwarmException("No known addresses to peer")
36+
37+
multiaddr = addrs[0]
38+
3239
if peer_id in self.connections:
3340
"""
3441
If muxed connection already exists for peer_id,
@@ -37,14 +44,8 @@ def new_stream(self, peer_id, protocol_id):
3744
"""
3845
muxed_conn = self.connections[peer_id]
3946
else:
40-
# Get peer info from peer store
41-
addrs = self.peerstore.addrs(peer_id)
42-
4347
# Transport dials peer (gets back a raw conn)
44-
if not addrs:
45-
raise SwarmException("No known addresses to peer")
46-
first_addr = addrs[0]
47-
raw_conn = self.transport.dial(first_addr)
48+
raw_conn = await self.transport.dial(MultiAddr(multiaddr))
4849

4950
# Use upgrader to upgrade raw conn to muxed conn
5051
muxed_conn = self.upgrader.upgrade_connection(raw_conn, True)
@@ -54,56 +55,59 @@ def new_stream(self, peer_id, protocol_id):
5455

5556
# Use muxed conn to open stream, which returns
5657
# a muxed stream
57-
stream_id = str(uuid.uuid4())
58-
muxed_stream = muxed_conn.open_stream(protocol_id, stream_id, peer_id, first_addr)
58+
# TODO: use better stream IDs
59+
stream_id = (uuid.uuid4().int & (1<<64)-1) >> 3
60+
muxed_stream = muxed_conn.open_stream(protocol_id, stream_id, peer_id, multiaddr)
5961

6062
# Create a net stream
6163
net_stream = NetStream(muxed_stream)
6264

6365
return net_stream
6466

65-
def listen(self, *args):
67+
async def listen(self, *args):
6668
"""
6769
:param *args: one or many multiaddrs to start listening on
6870
:return: true if at least one success
6971
"""
70-
71-
# For each multiaddr in args
72-
# Check if a listener for multiaddr exists already
73-
# If listener already exists, continue
74-
# Otherwise, do the following:
75-
# Pass multiaddr into conn handler
76-
# Have conn handler delegate to stream handler
77-
# Call listener listen with the multiaddr
78-
# Map multiaddr to listener
72+
"""
73+
For each multiaddr in args
74+
Check if a listener for multiaddr exists already
75+
If listener already exists, continue
76+
Otherwise:
77+
Capture multiaddr in conn handler
78+
Have conn handler delegate to stream handler
79+
Call listener listen with the multiaddr
80+
Map multiaddr to listener
81+
"""
7982
for multiaddr_str in args:
8083
if multiaddr_str in self.listeners:
8184
return True
8285

8386
multiaddr = MultiAddr(multiaddr_str)
8487
multiaddr_dict = multiaddr.to_options()
8588

86-
def conn_handler(reader, writer):
89+
async def conn_handler(reader, writer):
8790
# Upgrade reader/write to a net_stream and pass to appropriate stream handler (using multiaddr)
88-
raw_conn = RawConnection(multiaddr_dict.host, multiaddr_dict.port, reader, writer)
91+
raw_conn = RawConnection(multiaddr_dict['host'], multiaddr_dict['port'], reader, writer)
8992
muxed_conn = self.upgrader.upgrade_connection(raw_conn, False)
9093

91-
muxed_stream, stream_id, protocol_id = muxed_conn.accept_stream()
94+
muxed_stream, stream_id, protocol_id = await muxed_conn.accept_stream()
9295
net_stream = NetStream(muxed_stream)
9396
net_stream.set_protocol(protocol_id)
9497

9598
# Give to stream handler
9699
# TODO: handle case when stream handler is set
97-
self.stream_handlers[protocol_id](net_stream)
100+
# TODO: handle case of multiple protocols over same raw connection
101+
await self.stream_handlers[protocol_id](net_stream)
98102

99103
try:
100104
# Success
101105
listener = self.transport.create_listener(conn_handler)
102-
listener.listen(multiaddr)
106+
await listener.listen(multiaddr)
103107
return True
104108
except IOError:
105109
# Failed. Continue looping.
106-
print("Failed to connect to: " + multiaddr)
110+
print("Failed to connect to: " + str(multiaddr))
107111

108112
# No multiaddr succeeded
109113
return False

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ asyncio
22
pylint
33
pytest
44
pycryptodome
5+
pytest-asyncio

0 commit comments

Comments
 (0)