Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
128d837
feat: Replace mplex with yamux as default multiplexer in py-libp2p
paschal533 Apr 6, 2025
debf65b
Merge branch 'feature/replace-mplex-with-yamux' of https://github.com…
paschal533 Apr 6, 2025
a398fd3
Retain Mplex alongside Yamux in new_swarm with messaging that Yamux i…
paschal533 Apr 6, 2025
50b5297
moved !BBHII to a constant YAMUX_HEADER_FORMAT at the top of yamux.py…
paschal533 Apr 8, 2025
1224981
renamed the news fragment to 534.feature.rst and updated the description
paschal533 Apr 8, 2025
051ac92
renamed the news fragment to 534.feature.rst and updated the description
paschal533 Apr 8, 2025
ba1203f
added a docstring to clarify that Yamux does not support deadlines na…
paschal533 Apr 8, 2025
74816d0
Remove the __main__ block entirely from test_yamux.py
paschal533 Apr 8, 2025
d4fef3d
Replaced the print statements in test_yamux.py with logging.debug
paschal533 Apr 8, 2025
03653df
Added a comment linking to the spec for clarity
paschal533 Apr 8, 2025
d596c79
Merge branch 'libp2p:main' into feature/replace-mplex-with-yamux
paschal533 Apr 13, 2025
7a83298
Raise NotImplementedError in YamuxStream.set_deadline per review
paschal533 Apr 13, 2025
fb28ef9
Add muxed_conn to YamuxStream and test deadline NotImplementedError
paschal533 Apr 13, 2025
78ff27d
Fix Yamux implementation to meet libp2p spec
paschal533 Apr 16, 2025
b176eb1
Fix None handling in YamuxStream.read and Yamux.read_stream
paschal533 Apr 19, 2025
860a11a
Fix test_connected_peers.py to correctly handle peer connections
paschal533 Apr 23, 2025
1bebdfa
fix: Ensure StreamReset is raised on read after local reset in yamux
paschal533 Apr 23, 2025
992565a
fix: Map MuxedStreamError to StreamClosed in NetStream.write for Yamux
paschal533 Apr 24, 2025
5f94e26
fix: Raise MuxedStreamReset in Yamux.read_stream for closed streams
paschal533 Apr 24, 2025
4c48ec0
fix: Correct Yamux stream read behavior for NetStream tests
paschal533 Apr 24, 2025
aed8605
fix: Correct Yamux stream read behavior for NetStream tests
paschal533 Apr 24, 2025
c0a0f04
Merge branch 'feature/replace-mplex-with-yamux' of https://github.com…
paschal533 Apr 30, 2025
18f0d07
fix: raise StreamEOF when reading from closed stream with empty buffer
paschal533 May 1, 2025
3f9247e
fix: prioritize returning buffered data even after stream reset
paschal533 May 1, 2025
1d42c13
fix: prioritize returning buffered data even after stream reset
paschal533 May 1, 2025
3dd13a4
Merge branch 'feature/replace-mplex-with-yamux' of https://github.com…
paschal533 May 1, 2025
c1993ba
fix: Ensure test_net_stream_read_after_remote_closed_and_reset passes…
paschal533 May 5, 2025
fa6343a
Merge branch 'libp2p:main' into feature/replace-mplex-with-yamux
paschal533 May 5, 2025
0d8a41d
fix: Add __init__.py to yamux module to fix documentation build
paschal533 May 6, 2025
0de5de2
fix: Add __init__.py to yamux module to fix documentation build
paschal533 May 6, 2025
a4874f7
Merge branch 'feature/replace-mplex-with-yamux' of https://github.com…
paschal533 May 6, 2025
575e6c1
fix: Add libp2p.stream_muxer.yamux to libp2p.stream_muxer.rst toctree
paschal533 May 6, 2025
dd51625
fix: Correct title underline length in libp2p.stream_muxer.yamux.rst
paschal533 May 6, 2025
9efce53
fix: Add a = so that is matches the libp2p.stream\_muxer.yamux length
paschal533 May 6, 2025
e4e1efc
Merge branch 'feature/replace-mplex-with-yamux' of https://github.com…
paschal533 May 6, 2025
f17c2da
fix(tests): Resolve race condition in network notification test
paschal533 May 6, 2025
93eea37
fix: fixing failing tests and examples with yamux and noise
acul71 May 8, 2025
c848399
refactor: remove debug logging and improve x25519 tests
acul71 May 8, 2025
1c0fd20
fix: Add functionality for users to choose between Yamux and Mplex
paschal533 May 14, 2025
7fe6567
fix: increased trio sleep to 0.1 sec for slow environment
paschal533 May 14, 2025
61afc6c
feat: Add test for switching between Yamux and mplex
paschal533 May 14, 2025
5f02564
refactor: move host fixtures to interop tests
acul71 May 18, 2025
b36015a
merge: integrate muxer selection and noise transport
acul71 May 18, 2025
718a27c
chore: Update __init__.py removing unused import
acul71 May 19, 2025
3b93d5b
lint: fix import order
acul71 May 19, 2025
02c8d77
Merge pull request #5 from acul71/feature/replace-mplex-with-yamux
paschal533 May 19, 2025
d8b23a1
fix: Resolve conftest.py conflict by removing trio test support
paschal533 May 19, 2025
10afeaf
Merge branch 'feature/replace-mplex-with-yamux' into feature/remove-d…
paschal533 May 20, 2025
3fb4ed1
fix: Resolve test skipping by keeping trio test support
paschal533 May 20, 2025
f65d835
Merge branch 'libp2p:main' into feature/replace-mplex-with-yamux
paschal533 May 20, 2025
d206d84
Merge branch 'feature/replace-mplex-with-yamux' of https://github.com…
paschal533 May 20, 2025
be5018b
Fix: add a newline at end of the file
paschal533 May 20, 2025
0880fb4
Merge pull request #3 from acul71/feature/remove-debug-logging
paschal533 May 20, 2025
64d278b
delete old interop, turn on with placeholders, add py312 and py313 to…
pacrob May 8, 2025
bf01bd7
interop: initial commit
lla-dane May 18, 2025
ac32b6c
fix: handshake_time unit
lla-dane May 18, 2025
7980573
update readme
lla-dane May 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/libp2p.stream_muxer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Subpackages
:maxdepth: 4

libp2p.stream_muxer.mplex
libp2p.stream_muxer.yamux

Submodules
----------
Expand Down
7 changes: 7 additions & 0 deletions docs/libp2p.stream_muxer.yamux.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
libp2p.stream\_muxer.yamux
==========================

.. automodule:: libp2p.stream_muxer.yamux
:members:
:undoc-members:
:show-inheritance:
49 changes: 49 additions & 0 deletions examples/identify_push/identify_push_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,52 @@ async def main() -> None:
logger.info("Host 2 connected to Host 1")
print("Host 2 successfully connected to Host 1")

# Run the identify protocol from host_2 to host_1
# (so Host 1 learns Host 2's address)
from libp2p.identity.identify.identify import ID as IDENTIFY_PROTOCOL_ID

stream = await host_2.new_stream(host_1.get_id(), (IDENTIFY_PROTOCOL_ID,))
response = await stream.read()
await stream.close()

# Run the identify protocol from host_1 to host_2
# (so Host 2 learns Host 1's address)
stream = await host_1.new_stream(host_2.get_id(), (IDENTIFY_PROTOCOL_ID,))
response = await stream.read()
await stream.close()

# --- NEW CODE: Update Host 1's peerstore with Host 2's addresses ---
from libp2p.identity.identify.pb.identify_pb2 import (
Identify,
)

identify_msg = Identify()
identify_msg.ParseFromString(response)
peerstore_1 = host_1.get_peerstore()
peer_id_2 = host_2.get_id()
for addr_bytes in identify_msg.listen_addrs:
maddr = multiaddr.Multiaddr(addr_bytes)
# TTL can be any positive int
peerstore_1.add_addr(
peer_id_2,
maddr,
ttl=3600,
)
# --- END NEW CODE ---

# Now Host 1's peerstore should have Host 2's address
peerstore_1 = host_1.get_peerstore()
peer_id_2 = host_2.get_id()
addrs_1_for_2 = peerstore_1.addrs(peer_id_2)
logger.info(
f"[DEBUG] Host 1 peerstore addresses for Host 2 before push: "
f"{addrs_1_for_2}"
)
print(
f"[DEBUG] Host 1 peerstore addresses for Host 2 before push: "
f"{addrs_1_for_2}"
)

# Push identify information from host_1 to host_2
logger.info("Host 1 pushing identify information to Host 2")
print("\nHost 1 pushing identify information to Host 2...")
Expand All @@ -104,6 +150,9 @@ async def main() -> None:
logger.error(f"Error during identify push: {str(e)}")
print(f"\nError during identify push: {str(e)}")

# Add this at the end of your async with block:
await trio.sleep(0.5) # Give background tasks time to finish


if __name__ == "__main__":
trio.run(main)
Expand Down
19 changes: 19 additions & 0 deletions interop/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
These commands are to be run in `./interop/exec`

## Redis

```bash
docker run -p 6379:6379 -it redis:latest
```

## Listener

```bash
transport=tcp ip=0.0.0.0 is_dialer=false redis_addr=6379 test_timeout_seconds=180 security=insecure muxer=mplex python3 native_ping.py
```

## Dialer

```bash
transport=tcp ip=0.0.0.0 is_dialer=true port=8001 redis_addr=6379 port=8001 test_timeout_seconds=180 security=insecure muxer=mplex python3 native_ping.py
```
Empty file added interop/__init__.py
Empty file.
73 changes: 73 additions & 0 deletions interop/arch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from dataclasses import (
dataclass,
)

import multiaddr
import redis
import trio

from libp2p import (
new_host,
)
from libp2p.crypto.keys import (
KeyPair,
)
from libp2p.crypto.rsa import (
create_new_key_pair,
)
from libp2p.custom_types import (
TProtocol,
)
from libp2p.security.insecure.transport import (
PLAINTEXT_PROTOCOL_ID,
InsecureTransport,
)
import libp2p.security.secio.transport as secio
from libp2p.stream_muxer.mplex.mplex import (
MPLEX_PROTOCOL_ID,
Mplex,
)


def generate_new_rsa_identity() -> KeyPair:
return create_new_key_pair()


async def build_host(transport: str, ip: str, port: str, sec_protocol: str, muxer: str):
match (sec_protocol, muxer):
case ("insecure", "mplex"):
key_pair = create_new_key_pair()
host = new_host(
key_pair,
{MPLEX_PROTOCOL_ID: Mplex},
{
TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair),
TProtocol(secio.ID): secio.Transport(key_pair),
},
)
muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}")
return (host, muladdr)
case _:
raise ValueError("Protocols not supported")


@dataclass
class RedisClient:
client: redis.Redis

def blpop(self, key: str, timeout: float) -> list[str]:
result = self.client.blpop([key], timeout)
return [result[1]] if result else []

def rpush(self, key: str, value: str) -> None:
self.client.rpush(key, value)


async def main():
client = RedisClient(redis.Redis(host="localhost", port=6379, db=0))
client.rpush("test", "hello")
print(client.blpop("test", timeout=5))


if __name__ == "__main__":
trio.run(main)
57 changes: 57 additions & 0 deletions interop/exec/config/mod.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from dataclasses import (
dataclass,
)
import os
from typing import (
Optional,
)


def str_to_bool(val: str) -> bool:
return val.lower() in ("true", "1")


class ConfigError(Exception):
"""Raised when the required environment variables are missing or invalid"""


@dataclass
class Config:
transport: str
sec_protocol: Optional[str]
muxer: Optional[str]
ip: str
is_dialer: bool
test_timeout: int
redis_addr: str
port: str

@classmethod
def from_env(cls) -> "Config":
try:
transport = os.environ["transport"]
ip = os.environ["ip"]
except KeyError as e:
raise ConfigError(f"{e.args[0]} env variable not set") from None

try:
is_dialer = str_to_bool(os.environ.get("is_dialer", "true"))
test_timeout = int(os.environ.get("test_timeout", "180"))
except ValueError as e:
raise ConfigError(f"Invalid value in env: {e}") from None

redis_addr = os.environ.get("redis_addr", 6379)
sec_protocol = os.environ.get("security")
muxer = os.environ.get("muxer")
port = os.environ.get("port", "8000")

return cls(
transport=transport,
sec_protocol=sec_protocol,
muxer=muxer,
ip=ip,
is_dialer=is_dialer,
test_timeout=test_timeout,
redis_addr=redis_addr,
port=port,
)
33 changes: 33 additions & 0 deletions interop/exec/native_ping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import trio

from interop.exec.config.mod import (
Config,
ConfigError,
)
from interop.lib import (
run_test,
)


async def main() -> None:
try:
config = Config.from_env()
except ConfigError as e:
print(f"Config error: {e}")
return

# Uncomment and implement when ready
_ = await run_test(
config.transport,
config.ip,
config.port,
config.is_dialer,
config.test_timeout,
config.redis_addr,
config.sec_protocol,
config.muxer,
)


if __name__ == "__main__":
trio.run(main)
112 changes: 112 additions & 0 deletions interop/lib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from dataclasses import (
dataclass,
)
import json
import time

from loguru import (
logger,
)
import multiaddr
import redis
import trio

from interop.arch import (
RedisClient,
build_host,
)
from libp2p.custom_types import (
TProtocol,
)
from libp2p.network.stream.net_stream import (
INetStream,
)
from libp2p.peer.peerinfo import (
info_from_p2p_addr,
)

PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0")
PING_LENGTH = 32
RESP_TIMEOUT = 60


async def handle_ping(stream: INetStream) -> None:
while True:
try:
payload = await stream.read(PING_LENGTH)
peer_id = stream.muxed_conn.peer_id
if payload is not None:
print(f"received ping from {peer_id}")

await stream.write(payload)
print(f"responded with pong to {peer_id}")

except Exception:
await stream.reset()
break


async def send_ping(stream: INetStream) -> None:
try:
payload = b"\x01" * PING_LENGTH
print(f"sending ping to {stream.muxed_conn.peer_id}")

await stream.write(payload)

with trio.fail_after(RESP_TIMEOUT):
response = await stream.read(PING_LENGTH)

if response == payload:
print(f"received pong from {stream.muxed_conn.peer_id}")

except Exception as e:
print(f"error occurred: {e}")


async def run_test(
transport, ip, port, is_dialer, test_timeout, redis_addr, sec_protocol, muxer
):
logger.info("Starting run_test")

redis_client = RedisClient(
redis.Redis(host="localhost", port=int(redis_addr), db=0)
)
(host, listen_addr) = await build_host(transport, ip, port, sec_protocol, muxer)
logger.info(f"Running ping test local_peer={host.get_id()}")

async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
if not is_dialer:
host.set_stream_handler(PING_PROTOCOL_ID, handle_ping)
ma = f"{listen_addr}/p2p/{host.get_id().pretty()}"
redis_client.rpush("listenerAddr", ma)

logger.info(f"Test instance, listening: {ma}")
else:
redis_addr = redis_client.blpop("listenerAddr", timeout=5)
destination = redis_addr[0].decode()
maddr = multiaddr.Multiaddr(destination)
info = info_from_p2p_addr(maddr)

handshake_start = time.perf_counter()

await host.connect(info)
stream = await host.new_stream(info.peer_id, [PING_PROTOCOL_ID])

logger.info("Remote conection established")
nursery.start_soon(send_ping, stream)

handshake_plus_ping = (time.perf_counter() - handshake_start) * 1000.0

logger.info(f"handshake time: {handshake_plus_ping:.2f}ms")
return

await trio.sleep_forever()


@dataclass
class Report:
handshake_plus_one_rtt_millis: float
ping_rtt_millis: float

def gen_report(self):
return json.dumps(self.__dict__)
Loading