Skip to content

Commit 4b18607

Browse files
paschal533acul71
andauthored
Feat: Adding Yamux as default multiplexer, keeping Mplex as fallback (#538)
* feat: Replace mplex with yamux as default multiplexer in py-libp2p * Retain Mplex alongside Yamux in new_swarm with messaging that Yamux is preferred * moved !BBHII to a constant YAMUX_HEADER_FORMAT at the top of yamux.py with a comment explaining its structure * renamed the news fragment to 534.feature.rst and updated the description * renamed the news fragment to 534.feature.rst and updated the description * added a docstring to clarify that Yamux does not support deadlines natively * Remove the __main__ block entirely from test_yamux.py * Replaced the print statements in test_yamux.py with logging.debug * Added a comment linking to the spec for clarity * Raise NotImplementedError in YamuxStream.set_deadline per review * Add muxed_conn to YamuxStream and test deadline NotImplementedError * Fix Yamux implementation to meet libp2p spec * Fix None handling in YamuxStream.read and Yamux.read_stream * Fix test_connected_peers.py to correctly handle peer connections * fix: Ensure StreamReset is raised on read after local reset in yamux * fix: Map MuxedStreamError to StreamClosed in NetStream.write for Yamux * fix: Raise MuxedStreamReset in Yamux.read_stream for closed streams * fix: Correct Yamux stream read behavior for NetStream tests Fixed est_net_stream_read_after_remote_closed by updating NetStream.read to raise StreamEOF when the stream is remotely closed and no data is available, aligning with test expectations and Fixed est_net_stream_read_until_eof by modifying YamuxStream.read to block until the stream is closed ( ecv_closed=True) for =-1 reads, ensuring data is only returned after remote closure. * fix: Correct Yamux stream read behavior for NetStream tests Fixed est_net_stream_read_after_remote_closed by updating NetStream.read to raise StreamEOF when the stream is remotely closed and no data is available, aligning with test expectations and Fixed est_net_stream_read_until_eof by modifying YamuxStream.read to block until the stream is closed ( ecv_closed=True) for =-1 reads, ensuring data is only returned after remote closure. * fix: raise StreamEOF when reading from closed stream with empty buffer * fix: prioritize returning buffered data even after stream reset * fix: prioritize returning buffered data even after stream reset * fix: Ensure test_net_stream_read_after_remote_closed_and_reset passes in full suite * fix: Add __init__.py to yamux module to fix documentation build * fix: Add __init__.py to yamux module to fix documentation build * fix: Add libp2p.stream_muxer.yamux to libp2p.stream_muxer.rst toctree * fix: Correct title underline length in libp2p.stream_muxer.yamux.rst * fix: Add a = so that is matches the libp2p.stream\_muxer.yamux length * fix(tests): Resolve race condition in network notification test * fix: fixing failing tests and examples with yamux and noise * refactor: remove debug logging and improve x25519 tests * fix: Add functionality for users to choose between Yamux and Mplex * fix: increased trio sleep to 0.1 sec for slow environment * feat: Add test for switching between Yamux and mplex * refactor: move host fixtures to interop tests * chore: Update __init__.py removing unused import removed unused ```python import os import logging ``` * lint: fix import order * fix: Resolve conftest.py conflict by removing trio test support * fix: Resolve test skipping by keeping trio test support * Fix: add a newline at end of the file --------- Co-authored-by: acul71 <[email protected]> Co-authored-by: acul71 <[email protected]>
1 parent 18c6f52 commit 4b18607

29 files changed

+2216
-102
lines changed

docs/libp2p.stream_muxer.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ Subpackages
88
:maxdepth: 4
99

1010
libp2p.stream_muxer.mplex
11+
libp2p.stream_muxer.yamux
1112

1213
Submodules
1314
----------

docs/libp2p.stream_muxer.yamux.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
libp2p.stream\_muxer.yamux
2+
==========================
3+
4+
.. automodule:: libp2p.stream_muxer.yamux
5+
:members:
6+
:undoc-members:
7+
:show-inheritance:

examples/identify_push/identify_push_demo.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,52 @@ async def main() -> None:
8585
logger.info("Host 2 connected to Host 1")
8686
print("Host 2 successfully connected to Host 1")
8787

88+
# Run the identify protocol from host_2 to host_1
89+
# (so Host 1 learns Host 2's address)
90+
from libp2p.identity.identify.identify import ID as IDENTIFY_PROTOCOL_ID
91+
92+
stream = await host_2.new_stream(host_1.get_id(), (IDENTIFY_PROTOCOL_ID,))
93+
response = await stream.read()
94+
await stream.close()
95+
96+
# Run the identify protocol from host_1 to host_2
97+
# (so Host 2 learns Host 1's address)
98+
stream = await host_1.new_stream(host_2.get_id(), (IDENTIFY_PROTOCOL_ID,))
99+
response = await stream.read()
100+
await stream.close()
101+
102+
# --- NEW CODE: Update Host 1's peerstore with Host 2's addresses ---
103+
from libp2p.identity.identify.pb.identify_pb2 import (
104+
Identify,
105+
)
106+
107+
identify_msg = Identify()
108+
identify_msg.ParseFromString(response)
109+
peerstore_1 = host_1.get_peerstore()
110+
peer_id_2 = host_2.get_id()
111+
for addr_bytes in identify_msg.listen_addrs:
112+
maddr = multiaddr.Multiaddr(addr_bytes)
113+
# TTL can be any positive int
114+
peerstore_1.add_addr(
115+
peer_id_2,
116+
maddr,
117+
ttl=3600,
118+
)
119+
# --- END NEW CODE ---
120+
121+
# Now Host 1's peerstore should have Host 2's address
122+
peerstore_1 = host_1.get_peerstore()
123+
peer_id_2 = host_2.get_id()
124+
addrs_1_for_2 = peerstore_1.addrs(peer_id_2)
125+
logger.info(
126+
f"[DEBUG] Host 1 peerstore addresses for Host 2 before push: "
127+
f"{addrs_1_for_2}"
128+
)
129+
print(
130+
f"[DEBUG] Host 1 peerstore addresses for Host 2 before push: "
131+
f"{addrs_1_for_2}"
132+
)
133+
88134
# Push identify information from host_1 to host_2
89135
logger.info("Host 1 pushing identify information to Host 2")
90136
print("\nHost 1 pushing identify information to Host 2...")
@@ -104,6 +150,9 @@ async def main() -> None:
104150
logger.error(f"Error during identify push: {str(e)}")
105151
print(f"\nError during identify push: {str(e)}")
106152

153+
# Add this at the end of your async with block:
154+
await trio.sleep(0.5) # Give background tasks time to finish
155+
107156

108157
if __name__ == "__main__":
109158
trio.run(main)

libp2p/__init__.py

Lines changed: 139 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,29 @@
1+
from collections.abc import (
2+
Mapping,
3+
)
14
from importlib.metadata import version as __version
5+
from typing import (
6+
Literal,
7+
Optional,
8+
Type,
9+
cast,
10+
)
211

312
from libp2p.abc import (
413
IHost,
14+
IMuxedConn,
515
INetworkService,
616
IPeerRouting,
717
IPeerStore,
18+
ISecureTransport,
819
)
920
from libp2p.crypto.keys import (
1021
KeyPair,
1122
)
1223
from libp2p.crypto.rsa import (
1324
create_new_key_pair,
1425
)
26+
from libp2p.crypto.x25519 import create_new_key_pair as create_new_x25519_key_pair
1527
from libp2p.custom_types import (
1628
TMuxerOptions,
1729
TProtocol,
@@ -36,11 +48,17 @@
3648
PLAINTEXT_PROTOCOL_ID,
3749
InsecureTransport,
3850
)
51+
from libp2p.security.noise.transport import PROTOCOL_ID as NOISE_PROTOCOL_ID
52+
from libp2p.security.noise.transport import Transport as NoiseTransport
3953
import libp2p.security.secio.transport as secio
4054
from libp2p.stream_muxer.mplex.mplex import (
4155
MPLEX_PROTOCOL_ID,
4256
Mplex,
4357
)
58+
from libp2p.stream_muxer.yamux.yamux import (
59+
Yamux,
60+
)
61+
from libp2p.stream_muxer.yamux.yamux import PROTOCOL_ID as YAMUX_PROTOCOL_ID
4462
from libp2p.transport.tcp.tcp import (
4563
TCP,
4664
)
@@ -54,6 +72,60 @@
5472
# Initialize logging configuration
5573
setup_logging()
5674

75+
# Default multiplexer choice
76+
DEFAULT_MUXER = "YAMUX"
77+
78+
# Multiplexer options
79+
MUXER_YAMUX = "YAMUX"
80+
MUXER_MPLEX = "MPLEX"
81+
82+
83+
def set_default_muxer(muxer_name: Literal["YAMUX", "MPLEX"]) -> None:
84+
"""
85+
Set the default multiplexer protocol to use.
86+
87+
:param muxer_name: Either "YAMUX" or "MPLEX"
88+
:raise ValueError: If an unsupported muxer name is provided
89+
"""
90+
global DEFAULT_MUXER
91+
muxer_upper = muxer_name.upper()
92+
if muxer_upper not in [MUXER_YAMUX, MUXER_MPLEX]:
93+
raise ValueError(f"Unknown muxer: {muxer_name}. Use 'YAMUX' or 'MPLEX'.")
94+
DEFAULT_MUXER = muxer_upper
95+
96+
97+
def get_default_muxer() -> str:
98+
"""
99+
Returns the currently selected default muxer.
100+
101+
:return: Either "YAMUX" or "MPLEX"
102+
"""
103+
return DEFAULT_MUXER
104+
105+
106+
def create_yamux_muxer_option() -> TMuxerOptions:
107+
"""
108+
Returns muxer options with Yamux as the primary choice.
109+
110+
:return: Muxer options with Yamux first
111+
"""
112+
return {
113+
TProtocol(YAMUX_PROTOCOL_ID): Yamux, # Primary choice
114+
TProtocol(MPLEX_PROTOCOL_ID): Mplex, # Fallback for compatibility
115+
}
116+
117+
118+
def create_mplex_muxer_option() -> TMuxerOptions:
119+
"""
120+
Returns muxer options with Mplex as the primary choice.
121+
122+
:return: Muxer options with Mplex first
123+
"""
124+
return {
125+
TProtocol(MPLEX_PROTOCOL_ID): Mplex, # Primary choice
126+
TProtocol(YAMUX_PROTOCOL_ID): Yamux, # Fallback
127+
}
128+
57129

58130
def generate_new_rsa_identity() -> KeyPair:
59131
return create_new_key_pair()
@@ -64,11 +136,24 @@ def generate_peer_id_from(key_pair: KeyPair) -> ID:
64136
return ID.from_pubkey(public_key)
65137

66138

139+
def get_default_muxer_options() -> TMuxerOptions:
140+
"""
141+
Returns the default muxer options based on the current default muxer setting.
142+
143+
:return: Muxer options with the preferred muxer first
144+
"""
145+
if DEFAULT_MUXER == "MPLEX":
146+
return create_mplex_muxer_option()
147+
else: # YAMUX is default
148+
return create_yamux_muxer_option()
149+
150+
67151
def new_swarm(
68-
key_pair: KeyPair = None,
69-
muxer_opt: TMuxerOptions = None,
70-
sec_opt: TSecurityOptions = None,
71-
peerstore_opt: IPeerStore = None,
152+
key_pair: Optional[KeyPair] = None,
153+
muxer_opt: Optional[TMuxerOptions] = None,
154+
sec_opt: Optional[TSecurityOptions] = None,
155+
peerstore_opt: Optional[IPeerStore] = None,
156+
muxer_preference: Optional[Literal["YAMUX", "MPLEX"]] = None,
72157
) -> INetworkService:
73158
"""
74159
Create a swarm instance based on the parameters.
@@ -77,7 +162,13 @@ def new_swarm(
77162
:param muxer_opt: optional choice of stream muxer
78163
:param sec_opt: optional choice of security upgrade
79164
:param peerstore_opt: optional peerstore
165+
:param muxer_preference: optional explicit muxer preference
80166
:return: return a default swarm instance
167+
168+
Note: Yamux (/yamux/1.0.0) is the preferred stream multiplexer
169+
due to its improved performance and features.
170+
Mplex (/mplex/6.7.0) is retained for backward compatibility
171+
but may be deprecated in the future.
81172
"""
82173
if key_pair is None:
83174
key_pair = generate_new_rsa_identity()
@@ -87,13 +178,41 @@ def new_swarm(
87178
# TODO: Parse `listen_addrs` to determine transport
88179
transport = TCP()
89180

90-
muxer_transports_by_protocol = muxer_opt or {MPLEX_PROTOCOL_ID: Mplex}
91-
security_transports_by_protocol = sec_opt or {
92-
TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair),
181+
# Generate X25519 keypair for Noise
182+
noise_key_pair = create_new_x25519_key_pair()
183+
184+
# Default security transports (using Noise as primary)
185+
secure_transports_by_protocol: Mapping[TProtocol, ISecureTransport] = sec_opt or {
186+
NOISE_PROTOCOL_ID: NoiseTransport(
187+
key_pair, noise_privkey=noise_key_pair.private_key
188+
),
93189
TProtocol(secio.ID): secio.Transport(key_pair),
190+
TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair),
94191
}
192+
193+
# Use given muxer preference if provided, otherwise use global default
194+
if muxer_preference is not None:
195+
temp_pref = muxer_preference.upper()
196+
if temp_pref not in [MUXER_YAMUX, MUXER_MPLEX]:
197+
raise ValueError(
198+
f"Unknown muxer: {muxer_preference}. Use 'YAMUX' or 'MPLEX'."
199+
)
200+
active_preference = temp_pref
201+
else:
202+
active_preference = DEFAULT_MUXER
203+
204+
# Use provided muxer options if given, otherwise create based on preference
205+
if muxer_opt is not None:
206+
muxer_transports_by_protocol = muxer_opt
207+
else:
208+
if active_preference == MUXER_MPLEX:
209+
muxer_transports_by_protocol = create_mplex_muxer_option()
210+
else: # YAMUX is default
211+
muxer_transports_by_protocol = create_yamux_muxer_option()
212+
95213
upgrader = TransportUpgrader(
96-
security_transports_by_protocol, muxer_transports_by_protocol
214+
secure_transports_by_protocol=secure_transports_by_protocol,
215+
muxer_transports_by_protocol=muxer_transports_by_protocol,
97216
)
98217

99218
peerstore = peerstore_opt or PeerStore()
@@ -104,11 +223,12 @@ def new_swarm(
104223

105224

106225
def new_host(
107-
key_pair: KeyPair = None,
108-
muxer_opt: TMuxerOptions = None,
109-
sec_opt: TSecurityOptions = None,
110-
peerstore_opt: IPeerStore = None,
111-
disc_opt: IPeerRouting = None,
226+
key_pair: Optional[KeyPair] = None,
227+
muxer_opt: Optional[TMuxerOptions] = None,
228+
sec_opt: Optional[TSecurityOptions] = None,
229+
peerstore_opt: Optional[IPeerStore] = None,
230+
disc_opt: Optional[IPeerRouting] = None,
231+
muxer_preference: Optional[Literal["YAMUX", "MPLEX"]] = None,
112232
) -> IHost:
113233
"""
114234
Create a new libp2p host based on the given parameters.
@@ -118,20 +238,20 @@ def new_host(
118238
:param sec_opt: optional choice of security upgrade
119239
:param peerstore_opt: optional peerstore
120240
:param disc_opt: optional discovery
241+
:param muxer_preference: optional explicit muxer preference
121242
:return: return a host instance
122243
"""
123244
swarm = new_swarm(
124245
key_pair=key_pair,
125246
muxer_opt=muxer_opt,
126247
sec_opt=sec_opt,
127248
peerstore_opt=peerstore_opt,
249+
muxer_preference=muxer_preference,
128250
)
129-
host: IHost
130-
if disc_opt:
131-
host = RoutedHost(swarm, disc_opt)
132-
else:
133-
host = BasicHost(swarm)
134-
return host
251+
252+
if disc_opt is not None:
253+
return RoutedHost(swarm, disc_opt)
254+
return BasicHost(swarm)
135255

136256

137257
__version__ = __version("libp2p")

0 commit comments

Comments
 (0)