Skip to content

Commit 16be6fa

Browse files
Merge branch 'main' into feature/bootstrap
2 parents cbb1e26 + 975ea1b commit 16be6fa

File tree

7 files changed

+176
-64
lines changed

7 files changed

+176
-64
lines changed

libp2p/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@
8484
# Multiplexer options
8585
MUXER_YAMUX = "YAMUX"
8686
MUXER_MPLEX = "MPLEX"
87+
DEFAULT_NEGOTIATE_TIMEOUT = 5
88+
8789

8890

8991
def set_default_muxer(muxer_name: Literal["YAMUX", "MPLEX"]) -> None:
@@ -250,6 +252,7 @@ def new_host(
250252
listen_addrs: Sequence[multiaddr.Multiaddr] | None = None,
251253
enable_mDNS: bool = False,
252254
bootstrap: list[str] | None = None,
255+
negotiate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
253256
) -> IHost:
254257
"""
255258
Create a new libp2p host based on the given parameters.
@@ -276,6 +279,6 @@ def new_host(
276279

277280
if disc_opt is not None:
278281
return RoutedHost(swarm, disc_opt, enable_mDNS, bootstrap)
279-
return BasicHost(swarm, enable_mDNS, bootstrap)
282+
return BasicHost(network=swarm,enable_mDNS=enable_mDNS , bootstrap=bootstrap, negotitate_timeout=negotiate_timeout)
280283

281284
__version__ = __version("libp2p")

libp2p/host/basic_host.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272

7373

7474
logger = logging.getLogger("libp2p.network.basic_host")
75+
DEFAULT_NEGOTIATE_TIMEOUT = 5
7576

7677

7778
class BasicHost(IHost):
@@ -94,10 +95,12 @@ def __init__(
9495
enable_mDNS: bool = False,
9596
bootstrap: list[str] | None = None,
9697
default_protocols: Optional["OrderedDict[TProtocol, StreamHandlerFn]"] = None,
98+
negotitate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
9799
) -> None:
98100
self._network = network
99101
self._network.set_stream_handler(self._swarm_stream_handler)
100102
self.peerstore = self._network.peerstore
103+
self.negotiate_timeout = negotitate_timeout
101104
# Protocol muxing
102105
default_protocols = default_protocols or get_default_protocols(self)
103106
self.multiselect = Multiselect(dict(default_protocols.items()))
@@ -198,7 +201,10 @@ def set_stream_handler(
198201
self.multiselect.add_handler(protocol_id, stream_handler)
199202

200203
async def new_stream(
201-
self, peer_id: ID, protocol_ids: Sequence[TProtocol]
204+
self,
205+
peer_id: ID,
206+
protocol_ids: Sequence[TProtocol],
207+
negotitate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
202208
) -> INetStream:
203209
"""
204210
:param peer_id: peer_id that host is connecting
@@ -210,7 +216,9 @@ async def new_stream(
210216
# Perform protocol muxing to determine protocol to use
211217
try:
212218
selected_protocol = await self.multiselect_client.select_one_of(
213-
list(protocol_ids), MultiselectCommunicator(net_stream)
219+
list(protocol_ids),
220+
MultiselectCommunicator(net_stream),
221+
negotitate_timeout,
214222
)
215223
except MultiselectClientError as error:
216224
logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error)
@@ -220,7 +228,12 @@ async def new_stream(
220228
net_stream.set_protocol(selected_protocol)
221229
return net_stream
222230

223-
async def send_command(self, peer_id: ID, command: str) -> list[str]:
231+
async def send_command(
232+
self,
233+
peer_id: ID,
234+
command: str,
235+
response_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
236+
) -> list[str]:
224237
"""
225238
Send a multistream-select command to the specified peer and return
226239
the response.
@@ -234,7 +247,7 @@ async def send_command(self, peer_id: ID, command: str) -> list[str]:
234247

235248
try:
236249
response = await self.multiselect_client.query_multistream_command(
237-
MultiselectCommunicator(new_stream), command
250+
MultiselectCommunicator(new_stream), command, response_timeout
238251
)
239252
except MultiselectClientError as error:
240253
logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error)
@@ -273,7 +286,7 @@ async def _swarm_stream_handler(self, net_stream: INetStream) -> None:
273286
# Perform protocol muxing to determine protocol to use
274287
try:
275288
protocol, handler = await self.multiselect.negotiate(
276-
MultiselectCommunicator(net_stream)
289+
MultiselectCommunicator(net_stream), self.negotiate_timeout
277290
)
278291
except MultiselectError as error:
279292
peer_id = net_stream.muxed_conn.peer_id

libp2p/protocol_muxer/multiselect.py

Lines changed: 41 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import trio
2+
13
from libp2p.abc import (
24
IMultiselectCommunicator,
35
IMultiselectMuxer,
@@ -14,6 +16,7 @@
1416

1517
MULTISELECT_PROTOCOL_ID = "/multistream/1.0.0"
1618
PROTOCOL_NOT_FOUND_MSG = "na"
19+
DEFAULT_NEGOTIATE_TIMEOUT = 5
1720

1821

1922
class Multiselect(IMultiselectMuxer):
@@ -47,47 +50,56 @@ def add_handler(
4750

4851
# FIXME: Make TProtocol Optional[TProtocol] to keep types consistent
4952
async def negotiate(
50-
self, communicator: IMultiselectCommunicator
53+
self,
54+
communicator: IMultiselectCommunicator,
55+
negotiate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
5156
) -> tuple[TProtocol, StreamHandlerFn | None]:
5257
"""
5358
Negotiate performs protocol selection.
5459
5560
:param stream: stream to negotiate on
61+
:param negotiate_timeout: timeout for negotiation
5662
:return: selected protocol name, handler function
5763
:raise MultiselectError: raised when negotiation failed
5864
"""
59-
await self.handshake(communicator)
60-
61-
while True:
62-
try:
63-
command = await communicator.read()
64-
except MultiselectCommunicatorError as error:
65-
raise MultiselectError() from error
66-
67-
if command == "ls":
68-
supported_protocols = [p for p in self.handlers.keys() if p is not None]
69-
response = "\n".join(supported_protocols) + "\n"
70-
71-
try:
72-
await communicator.write(response)
73-
except MultiselectCommunicatorError as error:
74-
raise MultiselectError() from error
75-
76-
else:
77-
protocol = TProtocol(command)
78-
if protocol in self.handlers:
65+
try:
66+
with trio.fail_after(negotiate_timeout):
67+
await self.handshake(communicator)
68+
69+
while True:
7970
try:
80-
await communicator.write(protocol)
71+
command = await communicator.read()
8172
except MultiselectCommunicatorError as error:
8273
raise MultiselectError() from error
8374

84-
return protocol, self.handlers[protocol]
85-
try:
86-
await communicator.write(PROTOCOL_NOT_FOUND_MSG)
87-
except MultiselectCommunicatorError as error:
88-
raise MultiselectError() from error
89-
90-
raise MultiselectError("Negotiation failed: no matching protocol")
75+
if command == "ls":
76+
supported_protocols = [
77+
p for p in self.handlers.keys() if p is not None
78+
]
79+
response = "\n".join(supported_protocols) + "\n"
80+
81+
try:
82+
await communicator.write(response)
83+
except MultiselectCommunicatorError as error:
84+
raise MultiselectError() from error
85+
86+
else:
87+
protocol = TProtocol(command)
88+
if protocol in self.handlers:
89+
try:
90+
await communicator.write(protocol)
91+
except MultiselectCommunicatorError as error:
92+
raise MultiselectError() from error
93+
94+
return protocol, self.handlers[protocol]
95+
try:
96+
await communicator.write(PROTOCOL_NOT_FOUND_MSG)
97+
except MultiselectCommunicatorError as error:
98+
raise MultiselectError() from error
99+
100+
raise MultiselectError("Negotiation failed: no matching protocol")
101+
except trio.TooSlowError:
102+
raise MultiselectError("handshake read timeout")
91103

92104
async def handshake(self, communicator: IMultiselectCommunicator) -> None:
93105
"""

libp2p/protocol_muxer/multiselect_client.py

Lines changed: 50 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
Sequence,
33
)
44

5+
import trio
6+
57
from libp2p.abc import (
68
IMultiselectClient,
79
IMultiselectCommunicator,
@@ -17,6 +19,7 @@
1719

1820
MULTISELECT_PROTOCOL_ID = "/multistream/1.0.0"
1921
PROTOCOL_NOT_FOUND_MSG = "na"
22+
DEFAULT_NEGOTIATE_TIMEOUT = 5
2023

2124

2225
class MultiselectClient(IMultiselectClient):
@@ -40,14 +43,18 @@ async def handshake(self, communicator: IMultiselectCommunicator) -> None:
4043

4144
try:
4245
handshake_contents = await communicator.read()
46+
4347
except MultiselectCommunicatorError as error:
4448
raise MultiselectClientError() from error
4549

4650
if not is_valid_handshake(handshake_contents):
4751
raise MultiselectClientError("multiselect protocol ID mismatch")
4852

4953
async def select_one_of(
50-
self, protocols: Sequence[TProtocol], communicator: IMultiselectCommunicator
54+
self,
55+
protocols: Sequence[TProtocol],
56+
communicator: IMultiselectCommunicator,
57+
negotitate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
5158
) -> TProtocol:
5259
"""
5360
For each protocol, send message to multiselect selecting protocol and
@@ -56,49 +63,65 @@ async def select_one_of(
5663
5764
:param protocol: protocol to select
5865
:param communicator: communicator to use to communicate with counterparty
66+
:param negotiate_timeout: timeout for negotiation
5967
:return: selected protocol
6068
:raise MultiselectClientError: raised when protocol negotiation failed
6169
"""
62-
await self.handshake(communicator)
63-
64-
for protocol in protocols:
65-
try:
66-
selected_protocol = await self.try_select(communicator, protocol)
67-
return selected_protocol
68-
except MultiselectClientError:
69-
pass
70-
71-
raise MultiselectClientError("protocols not supported")
70+
try:
71+
with trio.fail_after(negotitate_timeout):
72+
await self.handshake(communicator)
73+
74+
for protocol in protocols:
75+
try:
76+
selected_protocol = await self.try_select(
77+
communicator, protocol
78+
)
79+
return selected_protocol
80+
except MultiselectClientError:
81+
pass
82+
83+
raise MultiselectClientError("protocols not supported")
84+
except trio.TooSlowError:
85+
raise MultiselectClientError("response timed out")
7286

7387
async def query_multistream_command(
74-
self, communicator: IMultiselectCommunicator, command: str
88+
self,
89+
communicator: IMultiselectCommunicator,
90+
command: str,
91+
response_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
7592
) -> list[str]:
7693
"""
7794
Send a multistream-select command over the given communicator and return
7895
parsed response.
7996
8097
:param communicator: communicator to use to communicate with counterparty
8198
:param command: supported multistream-select command(e.g., ls)
99+
:param negotiate_timeout: timeout for negotiation
82100
:raise MultiselectClientError: If the communicator fails to process data.
83101
:return: list of strings representing the response from peer.
84102
"""
85-
await self.handshake(communicator)
103+
try:
104+
with trio.fail_after(response_timeout):
105+
await self.handshake(communicator)
86106

87-
if command == "ls":
88-
try:
89-
await communicator.write("ls")
90-
except MultiselectCommunicatorError as error:
91-
raise MultiselectClientError() from error
92-
else:
93-
raise ValueError("Command not supported")
107+
if command == "ls":
108+
try:
109+
await communicator.write("ls")
110+
except MultiselectCommunicatorError as error:
111+
raise MultiselectClientError() from error
112+
else:
113+
raise ValueError("Command not supported")
94114

95-
try:
96-
response = await communicator.read()
97-
response_list = response.strip().splitlines()
98-
except MultiselectCommunicatorError as error:
99-
raise MultiselectClientError() from error
115+
try:
116+
response = await communicator.read()
117+
response_list = response.strip().splitlines()
118+
119+
except MultiselectCommunicatorError as error:
120+
raise MultiselectClientError() from error
100121

101-
return response_list
122+
return response_list
123+
except trio.TooSlowError:
124+
raise MultiselectClientError("command response timed out")
102125

103126
async def try_select(
104127
self, communicator: IMultiselectCommunicator, protocol: TProtocol
@@ -118,6 +141,7 @@ async def try_select(
118141

119142
try:
120143
response = await communicator.read()
144+
121145
except MultiselectCommunicatorError as error:
122146
raise MultiselectClientError() from error
123147

libp2p/stream_muxer/muxer_multistream.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@
3131
Yamux,
3232
)
3333

34-
# FIXME: add negotiate timeout to `MuxerMultistream`
35-
DEFAULT_NEGOTIATE_TIMEOUT = 60
36-
3734

3835
class MuxerMultistream:
3936
"""

newsfragments/696.bugfix.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Add timeout wrappers in:
2+
1. multiselect.py: `negotiate` function
3+
2. multiselect_client.py: `select_one_of` , `query_multistream_command` functions
4+
to prevent indefinite hangs when a remote peer does not respond.

0 commit comments

Comments
 (0)