Skip to content

Commit 83b42b7

Browse files
authored
Merge branch 'main' into limit_concurrency
2 parents 88db4ce + 975ea1b commit 83b42b7

File tree

7 files changed

+176
-64
lines changed

7 files changed

+176
-64
lines changed

libp2p/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@
8484
# Multiplexer options
8585
MUXER_YAMUX = "YAMUX"
8686
MUXER_MPLEX = "MPLEX"
87+
DEFAULT_NEGOTIATE_TIMEOUT = 5
88+
8789

8890

8991
def set_default_muxer(muxer_name: Literal["YAMUX", "MPLEX"]) -> None:
@@ -249,6 +251,7 @@ def new_host(
249251
muxer_preference: Literal["YAMUX", "MPLEX"] | None = None,
250252
listen_addrs: Sequence[multiaddr.Multiaddr] | None = None,
251253
enable_mDNS: bool = False,
254+
negotiate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
252255
) -> IHost:
253256
"""
254257
Create a new libp2p host based on the given parameters.
@@ -274,6 +277,6 @@ def new_host(
274277

275278
if disc_opt is not None:
276279
return RoutedHost(swarm, disc_opt, enable_mDNS)
277-
return BasicHost(swarm, enable_mDNS)
280+
return BasicHost(network=swarm,enable_mDNS=enable_mDNS , negotitate_timeout=negotiate_timeout)
278281

279282
__version__ = __version("libp2p")

libp2p/host/basic_host.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171

7272

7373
logger = logging.getLogger("libp2p.network.basic_host")
74+
DEFAULT_NEGOTIATE_TIMEOUT = 5
7475

7576

7677
class BasicHost(IHost):
@@ -92,10 +93,12 @@ def __init__(
9293
network: INetworkService,
9394
enable_mDNS: bool = False,
9495
default_protocols: Optional["OrderedDict[TProtocol, StreamHandlerFn]"] = None,
96+
negotitate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
9597
) -> None:
9698
self._network = network
9799
self._network.set_stream_handler(self._swarm_stream_handler)
98100
self.peerstore = self._network.peerstore
101+
self.negotiate_timeout = negotitate_timeout
99102
# Protocol muxing
100103
default_protocols = default_protocols or get_default_protocols(self)
101104
self.multiselect = Multiselect(dict(default_protocols.items()))
@@ -189,7 +192,10 @@ def set_stream_handler(
189192
self.multiselect.add_handler(protocol_id, stream_handler)
190193

191194
async def new_stream(
192-
self, peer_id: ID, protocol_ids: Sequence[TProtocol]
195+
self,
196+
peer_id: ID,
197+
protocol_ids: Sequence[TProtocol],
198+
negotitate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
193199
) -> INetStream:
194200
"""
195201
:param peer_id: peer_id that host is connecting
@@ -201,7 +207,9 @@ async def new_stream(
201207
# Perform protocol muxing to determine protocol to use
202208
try:
203209
selected_protocol = await self.multiselect_client.select_one_of(
204-
list(protocol_ids), MultiselectCommunicator(net_stream)
210+
list(protocol_ids),
211+
MultiselectCommunicator(net_stream),
212+
negotitate_timeout,
205213
)
206214
except MultiselectClientError as error:
207215
logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error)
@@ -211,7 +219,12 @@ async def new_stream(
211219
net_stream.set_protocol(selected_protocol)
212220
return net_stream
213221

214-
async def send_command(self, peer_id: ID, command: str) -> list[str]:
222+
async def send_command(
223+
self,
224+
peer_id: ID,
225+
command: str,
226+
response_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
227+
) -> list[str]:
215228
"""
216229
Send a multistream-select command to the specified peer and return
217230
the response.
@@ -225,7 +238,7 @@ async def send_command(self, peer_id: ID, command: str) -> list[str]:
225238

226239
try:
227240
response = await self.multiselect_client.query_multistream_command(
228-
MultiselectCommunicator(new_stream), command
241+
MultiselectCommunicator(new_stream), command, response_timeout
229242
)
230243
except MultiselectClientError as error:
231244
logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error)
@@ -264,7 +277,7 @@ async def _swarm_stream_handler(self, net_stream: INetStream) -> None:
264277
# Perform protocol muxing to determine protocol to use
265278
try:
266279
protocol, handler = await self.multiselect.negotiate(
267-
MultiselectCommunicator(net_stream)
280+
MultiselectCommunicator(net_stream), self.negotiate_timeout
268281
)
269282
except MultiselectError as error:
270283
peer_id = net_stream.muxed_conn.peer_id

libp2p/protocol_muxer/multiselect.py

Lines changed: 41 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import trio
2+
13
from libp2p.abc import (
24
IMultiselectCommunicator,
35
IMultiselectMuxer,
@@ -14,6 +16,7 @@
1416

1517
MULTISELECT_PROTOCOL_ID = "/multistream/1.0.0"
1618
PROTOCOL_NOT_FOUND_MSG = "na"
19+
DEFAULT_NEGOTIATE_TIMEOUT = 5
1720

1821

1922
class Multiselect(IMultiselectMuxer):
@@ -47,47 +50,56 @@ def add_handler(
4750

4851
# FIXME: Make TProtocol Optional[TProtocol] to keep types consistent
4952
async def negotiate(
50-
self, communicator: IMultiselectCommunicator
53+
self,
54+
communicator: IMultiselectCommunicator,
55+
negotiate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
5156
) -> tuple[TProtocol, StreamHandlerFn | None]:
5257
"""
5358
Negotiate performs protocol selection.
5459
5560
:param stream: stream to negotiate on
61+
:param negotiate_timeout: timeout for negotiation
5662
:return: selected protocol name, handler function
5763
:raise MultiselectError: raised when negotiation failed
5864
"""
59-
await self.handshake(communicator)
60-
61-
while True:
62-
try:
63-
command = await communicator.read()
64-
except MultiselectCommunicatorError as error:
65-
raise MultiselectError() from error
66-
67-
if command == "ls":
68-
supported_protocols = [p for p in self.handlers.keys() if p is not None]
69-
response = "\n".join(supported_protocols) + "\n"
70-
71-
try:
72-
await communicator.write(response)
73-
except MultiselectCommunicatorError as error:
74-
raise MultiselectError() from error
75-
76-
else:
77-
protocol = TProtocol(command)
78-
if protocol in self.handlers:
65+
try:
66+
with trio.fail_after(negotiate_timeout):
67+
await self.handshake(communicator)
68+
69+
while True:
7970
try:
80-
await communicator.write(protocol)
71+
command = await communicator.read()
8172
except MultiselectCommunicatorError as error:
8273
raise MultiselectError() from error
8374

84-
return protocol, self.handlers[protocol]
85-
try:
86-
await communicator.write(PROTOCOL_NOT_FOUND_MSG)
87-
except MultiselectCommunicatorError as error:
88-
raise MultiselectError() from error
89-
90-
raise MultiselectError("Negotiation failed: no matching protocol")
75+
if command == "ls":
76+
supported_protocols = [
77+
p for p in self.handlers.keys() if p is not None
78+
]
79+
response = "\n".join(supported_protocols) + "\n"
80+
81+
try:
82+
await communicator.write(response)
83+
except MultiselectCommunicatorError as error:
84+
raise MultiselectError() from error
85+
86+
else:
87+
protocol = TProtocol(command)
88+
if protocol in self.handlers:
89+
try:
90+
await communicator.write(protocol)
91+
except MultiselectCommunicatorError as error:
92+
raise MultiselectError() from error
93+
94+
return protocol, self.handlers[protocol]
95+
try:
96+
await communicator.write(PROTOCOL_NOT_FOUND_MSG)
97+
except MultiselectCommunicatorError as error:
98+
raise MultiselectError() from error
99+
100+
raise MultiselectError("Negotiation failed: no matching protocol")
101+
except trio.TooSlowError:
102+
raise MultiselectError("handshake read timeout")
91103

92104
async def handshake(self, communicator: IMultiselectCommunicator) -> None:
93105
"""

libp2p/protocol_muxer/multiselect_client.py

Lines changed: 50 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
Sequence,
33
)
44

5+
import trio
6+
57
from libp2p.abc import (
68
IMultiselectClient,
79
IMultiselectCommunicator,
@@ -17,6 +19,7 @@
1719

1820
MULTISELECT_PROTOCOL_ID = "/multistream/1.0.0"
1921
PROTOCOL_NOT_FOUND_MSG = "na"
22+
DEFAULT_NEGOTIATE_TIMEOUT = 5
2023

2124

2225
class MultiselectClient(IMultiselectClient):
@@ -40,14 +43,18 @@ async def handshake(self, communicator: IMultiselectCommunicator) -> None:
4043

4144
try:
4245
handshake_contents = await communicator.read()
46+
4347
except MultiselectCommunicatorError as error:
4448
raise MultiselectClientError() from error
4549

4650
if not is_valid_handshake(handshake_contents):
4751
raise MultiselectClientError("multiselect protocol ID mismatch")
4852

4953
async def select_one_of(
50-
self, protocols: Sequence[TProtocol], communicator: IMultiselectCommunicator
54+
self,
55+
protocols: Sequence[TProtocol],
56+
communicator: IMultiselectCommunicator,
57+
negotitate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
5158
) -> TProtocol:
5259
"""
5360
For each protocol, send message to multiselect selecting protocol and
@@ -56,49 +63,65 @@ async def select_one_of(
5663
5764
:param protocol: protocol to select
5865
:param communicator: communicator to use to communicate with counterparty
66+
:param negotiate_timeout: timeout for negotiation
5967
:return: selected protocol
6068
:raise MultiselectClientError: raised when protocol negotiation failed
6169
"""
62-
await self.handshake(communicator)
63-
64-
for protocol in protocols:
65-
try:
66-
selected_protocol = await self.try_select(communicator, protocol)
67-
return selected_protocol
68-
except MultiselectClientError:
69-
pass
70-
71-
raise MultiselectClientError("protocols not supported")
70+
try:
71+
with trio.fail_after(negotitate_timeout):
72+
await self.handshake(communicator)
73+
74+
for protocol in protocols:
75+
try:
76+
selected_protocol = await self.try_select(
77+
communicator, protocol
78+
)
79+
return selected_protocol
80+
except MultiselectClientError:
81+
pass
82+
83+
raise MultiselectClientError("protocols not supported")
84+
except trio.TooSlowError:
85+
raise MultiselectClientError("response timed out")
7286

7387
async def query_multistream_command(
74-
self, communicator: IMultiselectCommunicator, command: str
88+
self,
89+
communicator: IMultiselectCommunicator,
90+
command: str,
91+
response_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
7592
) -> list[str]:
7693
"""
7794
Send a multistream-select command over the given communicator and return
7895
parsed response.
7996
8097
:param communicator: communicator to use to communicate with counterparty
8198
:param command: supported multistream-select command(e.g., ls)
99+
:param negotiate_timeout: timeout for negotiation
82100
:raise MultiselectClientError: If the communicator fails to process data.
83101
:return: list of strings representing the response from peer.
84102
"""
85-
await self.handshake(communicator)
103+
try:
104+
with trio.fail_after(response_timeout):
105+
await self.handshake(communicator)
86106

87-
if command == "ls":
88-
try:
89-
await communicator.write("ls")
90-
except MultiselectCommunicatorError as error:
91-
raise MultiselectClientError() from error
92-
else:
93-
raise ValueError("Command not supported")
107+
if command == "ls":
108+
try:
109+
await communicator.write("ls")
110+
except MultiselectCommunicatorError as error:
111+
raise MultiselectClientError() from error
112+
else:
113+
raise ValueError("Command not supported")
94114

95-
try:
96-
response = await communicator.read()
97-
response_list = response.strip().splitlines()
98-
except MultiselectCommunicatorError as error:
99-
raise MultiselectClientError() from error
115+
try:
116+
response = await communicator.read()
117+
response_list = response.strip().splitlines()
118+
119+
except MultiselectCommunicatorError as error:
120+
raise MultiselectClientError() from error
100121

101-
return response_list
122+
return response_list
123+
except trio.TooSlowError:
124+
raise MultiselectClientError("command response timed out")
102125

103126
async def try_select(
104127
self, communicator: IMultiselectCommunicator, protocol: TProtocol
@@ -118,6 +141,7 @@ async def try_select(
118141

119142
try:
120143
response = await communicator.read()
144+
121145
except MultiselectCommunicatorError as error:
122146
raise MultiselectClientError() from error
123147

libp2p/stream_muxer/muxer_multistream.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@
3131
Yamux,
3232
)
3333

34-
# FIXME: add negotiate timeout to `MuxerMultistream`
35-
DEFAULT_NEGOTIATE_TIMEOUT = 60
36-
3734

3835
class MuxerMultistream:
3936
"""

newsfragments/696.bugfix.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Add timeout wrappers in:
2+
1. multiselect.py: `negotiate` function
3+
2. multiselect_client.py: `select_one_of` , `query_multistream_command` functions
4+
to prevent indefinite hangs when a remote peer does not respond.

0 commit comments

Comments
 (0)