diff --git a/Makefile b/Makefile index 0d8ca81a2..a7b319fb4 100644 --- a/Makefile +++ b/Makefile @@ -46,7 +46,7 @@ typecheck: pre-commit run mypy-local --all-files && pre-commit run pyrefly-local --all-files test: - python -m pytest tests -n auto + python -m pytest tests -n auto --timeout=300 pr: clean fix lint typecheck test diff --git a/docs/conf.py b/docs/conf.py index 446252f1f..bd37d6ac1 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -290,7 +290,7 @@ ] # Prevent autodoc from trying to import module from tests.factories -autodoc_mock_imports = ["tests.factories"] +autodoc_mock_imports = ["tests.factories", "redis"] # Documents to append as an appendix to all manuals. # texinfo_appendices = [] diff --git a/docs/index.rst b/docs/index.rst index 3031f067a..66192aed1 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -18,6 +18,7 @@ The Python implementation of the libp2p networking stack Examples API + Interop .. toctree:: :maxdepth: 1 diff --git a/docs/interop.rst b/docs/interop.rst new file mode 100644 index 000000000..6485fc972 --- /dev/null +++ b/docs/interop.rst @@ -0,0 +1,29 @@ +interop package +=============== + +Submodules +---------- + +interop.arch module +------------------- + +.. automodule:: interop.arch + :members: + :show-inheritance: + :undoc-members: + +interop.lib module +------------------ + +.. automodule:: interop.lib + :members: + :show-inheritance: + :undoc-members: + +Module contents +--------------- + +.. automodule:: interop + :members: + :show-inheritance: + :undoc-members: diff --git a/examples/ping/ping.py b/examples/ping/ping.py index d1a5daae4..83e6e239d 100644 --- a/examples/ping/ping.py +++ b/examples/ping/ping.py @@ -1,4 +1,5 @@ import argparse +import logging import multiaddr import trio @@ -55,8 +56,8 @@ async def send_ping(stream: INetStream) -> None: async def run(port: int, destination: str) -> None: - listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") - host = new_host(listen_addrs=[listen_addr]) + listen_addr = multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/{port}") + host = new_host() async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery: # Start the peer-store cleanup task @@ -106,8 +107,20 @@ def main() -> None: type=str, help=f"destination multiaddr string, e.g. {example_maddr}", ) + parser.add_argument( + "-v", "--verbose", action="store_true", help="enable verbose logging" + ) + args = parser.parse_args() + if args.verbose: + # Enable even more detailed logging + logging.getLogger("libp2p").setLevel(logging.DEBUG) + logging.getLogger("libp2p.network").setLevel(logging.DEBUG) + logging.getLogger("libp2p.transport").setLevel(logging.DEBUG) + logging.getLogger("libp2p.security").setLevel(logging.DEBUG) + logging.getLogger("libp2p.stream_muxer").setLevel(logging.DEBUG) + try: trio.run(run, *(args.port, args.destination)) except KeyboardInterrupt: diff --git a/interop/README.md b/interop/README.md new file mode 100644 index 000000000..ce03d2d5f --- /dev/null +++ b/interop/README.md @@ -0,0 +1,25 @@ +These commands are to be run in `./interop/exec` + +## Redis + +```bash +docker run -p 6379:6379 -it redis:latest +``` + +## Listener + +```bash +transport=tcp ip=127.0.0.1 redis_addr=6379 port=8001 test_timeout_seconds=180 security=noise muxer=yamux is_dialer=false python3 interop/exec/native_ping.py +``` + +## Dialer + +```bash +transport=tcp ip=127.0.0.1 redis_addr=6379 port=8001 test_timeout_seconds=180 security=noise muxer=yamux is_dialer=true python3 interop/exec/native_ping.py +``` + +## From the Rust-side (Listener) + +```bash +RUST_LOG=debug redis_addr=localhost:6379 ip="0.0.0.0" transport=tcp security=noise muxer=yamux is_dialer="false" cargo run --bin native_ping +``` diff --git a/interop/__init__.py b/interop/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/interop/arch.py b/interop/arch.py new file mode 100644 index 000000000..d28a860d6 --- /dev/null +++ b/interop/arch.py @@ -0,0 +1,157 @@ +from dataclasses import ( + dataclass, +) +import logging + +from cryptography.hazmat.primitives.asymmetric import ( + x25519, +) +import multiaddr +import redis +import trio + +from libp2p import ( + new_host, +) +from libp2p.crypto.keys import ( + KeyPair, +) +from libp2p.crypto.rsa import ( + create_new_key_pair, +) +from libp2p.custom_types import ( + TProtocol, +) +from libp2p.security.insecure.transport import ( + PLAINTEXT_PROTOCOL_ID, + InsecureTransport, +) +from libp2p.security.noise.transport import ( + Transport as NoiseTransport, +) +import libp2p.security.secio.transport as secio +from libp2p.stream_muxer.mplex.mplex import ( + MPLEX_PROTOCOL_ID, + Mplex, +) +from libp2p.stream_muxer.yamux.yamux import ( + PROTOCOL_ID as YAMUX_PROTOCOL_ID, + Yamux, +) + +# Configure detailed logging +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(levelname)s - %(message)s", + handlers=[ + logging.StreamHandler(), + logging.FileHandler("ping_debug.log", mode="w", encoding="utf-8"), + ], +) + + +def generate_new_rsa_identity() -> KeyPair: + return create_new_key_pair() + + +def create_noise_keypair(): + """Create a Noise protocol keypair for secure communication""" + try: + x25519_private_key = x25519.X25519PrivateKey.generate() + + class NoisePrivateKey: + def __init__(self, key): + self._key = key + + def to_bytes(self): + return self._key.private_bytes_raw() + + def public_key(self): + return NoisePublicKey(self._key.public_key()) + + def get_public_key(self): + return NoisePublicKey(self._key.public_key()) + + class NoisePublicKey: + def __init__(self, key): + self._key = key + + def to_bytes(self): + return self._key.public_bytes_raw() + + return NoisePrivateKey(x25519_private_key) + except Exception as e: + logging.error(f"Failed to create Noise keypair: {e}") + return None + + +async def build_host(transport: str, ip: str, port: str, sec_protocol: str, muxer: str): + match (sec_protocol, muxer): + case ("insecure", "mplex"): + key_pair = create_new_key_pair() + host = new_host( + key_pair, + {TProtocol(MPLEX_PROTOCOL_ID): Mplex}, + { + TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair), + TProtocol(secio.ID): secio.Transport(key_pair), + }, + ) + muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}") + return (host, muladdr) + case ("insecure", "yamux"): + key_pair = create_new_key_pair() + host = new_host( + key_pair, + {TProtocol(YAMUX_PROTOCOL_ID): Yamux}, + { + TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair), + TProtocol(secio.ID): secio.Transport(key_pair), + }, + ) + muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}") + return (host, muladdr) + case ("noise", "yamux"): + key_pair = generate_new_rsa_identity() + logging.debug("Generated RSA keypair") + + noise_privkey = create_noise_keypair() + if not noise_privkey: + print("[ERROR] Failed to create Noise keypair") + return 1 + logging.debug("Generated Noise keypair") + + noise_transport = NoiseTransport(key_pair, noise_privkey) + logging.debug(f"Noise transport initialized: {noise_transport}") + sec_opt = {TProtocol("/noise"): noise_transport} + muxer_opt = {TProtocol(YAMUX_PROTOCOL_ID): Yamux} + + logging.info(f"Using muxer: {muxer_opt}") + + host = new_host(key_pair=key_pair, sec_opt=sec_opt, muxer_opt=muxer_opt) + muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}") + return (host, muladdr) + case _: + raise ValueError("Protocols not supported") + + +@dataclass +class RedisClient: + client: redis.Redis + + def brpop(self, key: str, timeout: float) -> list[str]: + result = self.client.brpop([key], timeout) + return [result[1]] if result else [] + + def rpush(self, key: str, value: str) -> None: + self.client.rpush(key, value) + + +async def main(): + client = RedisClient(redis.Redis(host="localhost", port=6379, db=0)) + client.rpush("test", "hello") + print(client.blpop("test", timeout=5)) + + +if __name__ == "__main__": + trio.run(main) diff --git a/interop/exec/config/mod.py b/interop/exec/config/mod.py new file mode 100644 index 000000000..aa1e93044 --- /dev/null +++ b/interop/exec/config/mod.py @@ -0,0 +1,54 @@ +from dataclasses import ( + dataclass, +) +import os + + +def str_to_bool(val: str) -> bool: + return val.lower() in ("true", "1") + + +class ConfigError(Exception): + """Raised when the required environment variables are missing or invalid""" + + +@dataclass +class Config: + transport: str + sec_protocol: str | None + muxer: str | None + ip: str + is_dialer: bool + test_timeout: int + redis_addr: str + port: str + + @classmethod + def from_env(cls) -> "Config": + try: + transport = os.environ["transport"] + ip = os.environ["ip"] + except KeyError as e: + raise ConfigError(f"{e.args[0]} env variable not set") from None + + try: + is_dialer = str_to_bool(os.environ.get("is_dialer", "true")) + test_timeout = int(os.environ.get("test_timeout", "180")) + except ValueError as e: + raise ConfigError(f"Invalid value in env: {e}") from None + + redis_addr = os.environ.get("redis_addr", 6379) + sec_protocol = os.environ.get("security") + muxer = os.environ.get("muxer") + port = os.environ.get("port", "8000") + + return cls( + transport=transport, + sec_protocol=sec_protocol, + muxer=muxer, + ip=ip, + is_dialer=is_dialer, + test_timeout=test_timeout, + redis_addr=redis_addr, + port=port, + ) diff --git a/interop/exec/native_ping.py b/interop/exec/native_ping.py new file mode 100644 index 000000000..3578d0c60 --- /dev/null +++ b/interop/exec/native_ping.py @@ -0,0 +1,33 @@ +import trio + +from interop.exec.config.mod import ( + Config, + ConfigError, +) +from interop.lib import ( + run_test, +) + + +async def main() -> None: + try: + config = Config.from_env() + except ConfigError as e: + print(f"Config error: {e}") + return + + # Uncomment and implement when ready + _ = await run_test( + config.transport, + config.ip, + config.port, + config.is_dialer, + config.test_timeout, + config.redis_addr, + config.sec_protocol, + config.muxer, + ) + + +if __name__ == "__main__": + trio.run(main) diff --git a/interop/lib.py b/interop/lib.py new file mode 100644 index 000000000..abd8275aa --- /dev/null +++ b/interop/lib.py @@ -0,0 +1,245 @@ +import logging + +import multiaddr +import redis +import trio + +from interop.arch import ( + RedisClient, + build_host, +) +from libp2p.custom_types import ( + TProtocol, +) +from libp2p.network.stream.net_stream import ( + INetStream, +) +from libp2p.peer.peerinfo import ( + info_from_p2p_addr, +) + +# Configure detailed logging +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(levelname)s - %(message)s", + handlers=[ + logging.StreamHandler(), + logging.FileHandler("ping_debug.log", mode="w", encoding="utf-8"), + ], +) + +PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0") +PING_LENGTH = 32 +RESP_TIMEOUT = 60 + + +async def handle_ping(stream: INetStream) -> None: + """Handle incoming ping requests from rust-libp2p clients""" + peer_id = stream.muxed_conn.peer_id + print(f"[INFO] New ping stream opened by {peer_id}") + logging.info(f"Ping handler called for peer {peer_id}") + + ping_count = 0 + + try: + while True: + try: + print(f"[INFO] Wailting for ping data from {peer_id}...") + logging.debug(f"Stream state: {stream}") + data = await stream.read(PING_LENGTH) + + if not data: + print( + f"[INFO] No data received,conneciton likely closed by {peer_id}" + ) + logging.debug("No data received, stream closed") + break + + if len(data) == 0: + print(f"[INFO] Empty data received, connection closed by {peer_id}") + logging.debug("Empty data received") + break + + ping_count += 1 + print( + f"[PING {ping_count}] Received ping from {peer_id}:" + f" {len(data)} bytes" + ) + logging.debug(f"Ping data: {data.hex()}") + + # Echo the data back (this is what ping protocol does) + await stream.write(data) + print(f"[PING {ping_count}] Echoed ping back to {peer_id}") + + except Exception as e: + print(f"[ERROR] Error in ping loop with {peer_id}: {e}") + logging.exception("Ping loop error") + break + + except Exception as e: + print(f"[ERROR] Error handling ping from {peer_id}: {e}") + logging.exception("Ping handler error") + finally: + try: + print(f"[INFO] Closing ping stream with {peer_id}") + await stream.close() + except Exception as e: + logging.debug(f"Error closing stream: {e}") + + print(f"[INFO] Ping session completed with {peer_id} ({ping_count} pings)") + + +async def send_ping(stream: INetStream, count: int = 1) -> None: + """Send a sequence of pings compatible with rust-libp2p.""" + peer_id = stream.muxed_conn.peer_id + print(f"[INFO] Starting ping sequence to {peer_id} ({count} pings)") + + import os + import time + + rtts = [] + + for i in range(1, count + 1): + try: + # Generate random 32-byte payload as per ping protcol spec + payload = os.urandom(PING_LENGTH) + print(f"[PING {i}/{count}] Sending ping to {peer_id}") + logging.debug(f"Sending payload: {payload.hex()}") + start_time = time.time() + + await stream.write(payload) + + with trio.fail_after(RESP_TIMEOUT): + response = await stream.read(PING_LENGTH) + + end_time = time.time() + rtt = (end_time - start_time) * 1000 + + if ( + response + and len(response) >= PING_LENGTH + and response[:PING_LENGTH] == payload + ): + rtts.append(rtt) + print(f"[PING {i}] Successful RTT: {rtt:.2f}ms") + else: + print(f"[ERROR] Ping {i} failed: response mismatch or incomplete") + if response: + logging.debug(f"Expecte: {payload.hex()}") + logging.debug(f"Received: {response.hex()}") + + if i < count: + await trio.sleep(1) + + except trio.TooSlowError: + print(f"[ERROR] Ping {i} timed out after {RESP_TIMEOUT}s") + except Exception as e: + print(f"[ERROR] Ping {i} failed: {e}") + logging.exception(f"Ping {i} error") + + # Print statistics + if rtts: + avg_rtt = sum(rtts) / len(rtts) + min_rtt = min(rtts) + max_rtt = max(rtts) # Fixed typo: was max_rtts + success_count = len(rtts) + loss_rate = ((count - success_count) / count) * 100 + + print("\n[STATS] Ping Statistics:") + print( + f" Packets: Sent={count}, Received={success_count}," + f" Lost={count - success_count}" + ) + print(f" Loss rate: {loss_rate:.1f}%") + print(f" RTT: min={min_rtt:.2f}ms, avg={avg_rtt:.2f}ms, max={max_rtt:.2f}ms") + else: + print(f"\n[STATS] All pings failed ({count} attempts)") + + +async def run_test( + transport, ip, port, is_dialer, test_timeout, redis_addr, sec_protocol, muxer +): + redis_client = RedisClient( + redis.Redis(host="localhost", port=int(redis_addr), db=0) + ) + (host, listen_addr) = await build_host(transport, ip, port, sec_protocol, muxer) + async with host.run(listen_addrs=[listen_addr]): + if not is_dialer: + print("[INFO] Starting py-libp2p ping server...") + + print(f"[INFO] Registering ping handler for protocol: {PING_PROTOCOL_ID}") + host.set_stream_handler(PING_PROTOCOL_ID, handle_ping) + + # Also register alternative protocol IDs for better compatibilty + alt_protcols = [ + TProtocol("/ping/1.0.0"), + TProtocol("/libp2p/ping/1.0.0"), + ] + + for alt_proto in alt_protcols: + print(f"[INFO] Also registering handler for: {alt_proto}") + host.set_stream_handler(alt_proto, handle_ping) + + print("[INFO] Server started successfully!") + print(f"[INFO] Peer ID: {host.get_id()}") + print(f"[INFO] Listening: /ip4/{ip}/tcp/{port}") + print(f"[INFO] Primary Protocol: {PING_PROTOCOL_ID}") + + ma = f"{listen_addr}/p2p/{host.get_id().pretty()}" + redis_client.rpush("listenerAddr", ma) + + print("[INFO] Pushed address to Redis database") + await trio.sleep_forever() + else: + print("[INFO] Starting py-libp2p ping client...") + + print("[INFO] Fetching remote address from Redis database...") + redis_addr = redis_client.brpop("listenerAddr", timeout=5) + destination = redis_addr[0].decode() + maddr = multiaddr.Multiaddr(destination) + info = info_from_p2p_addr(maddr) + target_peer_id = info.peer_id + + print(f"[INFO] Our Peer ID: {host.get_id()}") + print(f"[INFO] Target: {destination}") + print(f"[INFO] Target Peer ID: {target_peer_id}") + print("[INFO] Connecting to peer...") + + await host.connect(info) + print("[INFO] Connection established!") + + # Try protocols in order of preference + # Start with the standard libp2p ping protocol + protocols_to_try = [ + PING_PROTOCOL_ID, # /ipfs/ping/1.0.0 - standard protocol + TProtocol("/ping/1.0.0"), # Alternative + TProtocol("/libp2p/ping/1.0.0"), # Another alternative + ] + + stream = None + + for proto in protocols_to_try: + try: + print(f"[INFO] Trying to open stream with protocol: {proto}") + stream = await host.new_stream(target_peer_id, [proto]) + print(f"[INFO] Stream opened with protocol: {proto}") + break + except Exception as e: + print(f"[ERROR] Failed to open stream with {proto}: {e}") + logging.debug(f"Protocol {proto} failed: {e}") + continue + + if not stream: + print("[ERROR] Failed to open stream with any ping protocol") + print("[ERROR] Ensure the target peer supports one of these protocols") + for proto in protocols_to_try: + print(f"[ERROR] - {proto}") + return 1 + + await send_ping(stream) + + await stream.close() + print("[INFO] Stream closed successfully") + + print("\n[INFO] Client stopped") + return 0 diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index a0311bd89..26e7471c8 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -221,7 +221,6 @@ async def new_stream( :return: stream: new stream created """ net_stream = await self._network.new_stream(peer_id) - # Perform protocol muxing to determine protocol to use try: selected_protocol = await self.multiselect_client.select_one_of( diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 67d462797..1991017a2 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -227,6 +227,7 @@ async def new_stream(self, peer_id: ID) -> INetStream: swarm_conn = await self.dial_peer(peer_id) net_stream = await swarm_conn.new_stream() + logger.debug("successfully opened a stream to peer %s", peer_id) return net_stream diff --git a/libp2p/stream_muxer/yamux/yamux.py b/libp2p/stream_muxer/yamux/yamux.py index b2711e1a8..3d4952667 100644 --- a/libp2p/stream_muxer/yamux/yamux.py +++ b/libp2p/stream_muxer/yamux/yamux.py @@ -33,6 +33,9 @@ from libp2p.io.exceptions import ( IncompleteReadError, ) +from libp2p.io.utils import ( + read_exactly, +) from libp2p.network.connection.exceptions import ( RawConnError, ) @@ -103,44 +106,25 @@ async def write(self, data: bytes) -> None: sent = 0 logger.debug(f"Stream {self.stream_id}: Starts writing {total_len} bytes ") while sent < total_len: - # Wait for available window with timeout - timeout = False - async with self.window_lock: - if self.send_window == 0: - logger.debug( - f"Stream {self.stream_id}: Window is zero, waiting for update" - ) - # Release lock and wait with timeout - self.window_lock.release() - # To avoid re-acquiring the lock immediately, - with trio.move_on_after(5.0) as cancel_scope: - while self.send_window == 0 and not self.closed: - await trio.sleep(0.01) - # If we timed out, cancel the scope - timeout = cancel_scope.cancelled_caught - # Re-acquire lock - await self.window_lock.acquire() - - # If we timed out waiting for window update, raise an error - if timeout: - raise MuxedStreamError( - "Timed out waiting for window update after 5 seconds." - ) + # Wait for the available window + while self.send_window == 0 and not self.closed: + await trio.sleep(0.01) - if self.closed: - raise MuxedStreamError("Stream is closed") + if self.closed: + raise MuxedStreamError("Stream is closed") - # Calculate how much we can send now + # Calculate how much we can send now + async with self.window_lock: to_send = min(self.send_window, total_len - sent) chunk = data[sent : sent + to_send] self.send_window -= to_send - # Send the data - header = struct.pack( - YAMUX_HEADER_FORMAT, 0, TYPE_DATA, 0, self.stream_id, len(chunk) - ) - await self.conn.secured_conn.write(header + chunk) - sent += to_send + # Send the data + header = struct.pack( + YAMUX_HEADER_FORMAT, 0, TYPE_DATA, 0, self.stream_id, len(chunk) + ) + await self.conn.secured_conn.write(header + chunk) + sent += to_send async def send_window_update(self, increment: int, skip_lock: bool = False) -> None: """ @@ -397,7 +381,6 @@ async def close(self, error_code: int = GO_AWAY_NORMAL) -> None: else: if self.on_close is not None: await self.on_close() - await trio.sleep(0.1) @property def is_closed(self) -> bool: @@ -518,81 +501,80 @@ async def read_stream(self, stream_id: int, n: int = -1) -> bytes: # Wait for data if stream is still open logger.debug(f"Waiting for data on stream {self.peer_id}:{stream_id}") try: - await self.stream_events[stream_id].wait() - self.stream_events[stream_id] = trio.Event() + await self._wait_for_stream_event(stream_id) except KeyError: raise MuxedStreamEOF("Stream was removed") # This line should never be reached, but satisfies the type checker raise MuxedStreamEOF("Unexpected end of read_stream") + async def _wait_for_stream_event(self, stream_id: int) -> None: + async with self.streams_lock: + if stream_id not in self.stream_events or self.event_shutting_down.is_set(): + return + event = self.stream_events[stream_id] + + try: + await event.wait() + except trio.Cancelled: + raise + + async with self.streams_lock: + if ( + stream_id in self.stream_events + and not self.event_shutting_down.is_set() + ): + self.stream_events[stream_id] = trio.Event() + async def handle_incoming(self) -> None: while not self.event_shutting_down.is_set(): try: - header = await self.secured_conn.read(HEADER_SIZE) - if not header or len(header) < HEADER_SIZE: - logger.debug( - f"Connection closed orincomplete header for peer {self.peer_id}" + try: + header = await read_exactly(self.secured_conn, HEADER_SIZE) + except IncompleteReadError: + logging.debug( + f"Connection closed or incomplete header for peer " + f"{self.peer_id}" ) self.event_shutting_down.set() await self._cleanup_on_error() break + + # Debug: log raw header bytes + logging.debug(f"Raw header bytes: {header.hex()}") + version, typ, flags, stream_id, length = struct.unpack( YAMUX_HEADER_FORMAT, header ) logger.debug( f"Received header for peer {self.peer_id}:" - f"type={typ}, flags={flags}, stream_id={stream_id}," - f"length={length}" + f"version={version}, type={typ}, flags={flags}, " + f"stream_id={stream_id}, length={length}" ) - if (typ == TYPE_DATA or typ == TYPE_WINDOW_UPDATE) and flags & FLAG_SYN: - async with self.streams_lock: - if stream_id not in self.streams: - stream = YamuxStream(stream_id, self, False) - self.streams[stream_id] = stream - self.stream_buffers[stream_id] = bytearray() - self.stream_events[stream_id] = trio.Event() - ack_header = struct.pack( - YAMUX_HEADER_FORMAT, - 0, - TYPE_DATA, - FLAG_ACK, - stream_id, - 0, - ) - await self.secured_conn.write(ack_header) - logger.debug( - f"Sending stream {stream_id}" - f"to channel for peer {self.peer_id}" - ) - await self.new_stream_send_channel.send(stream) - else: - rst_header = struct.pack( - YAMUX_HEADER_FORMAT, - 0, - TYPE_DATA, - FLAG_RST, - stream_id, - 0, - ) - await self.secured_conn.write(rst_header) - elif typ == TYPE_DATA and flags & FLAG_RST: - async with self.streams_lock: - if stream_id in self.streams: - logger.debug( - f"Resetting stream {stream_id} for peer {self.peer_id}" - ) - self.streams[stream_id].closed = True - self.streams[stream_id].reset_received = True - self.stream_events[stream_id].set() - elif typ == TYPE_DATA and flags & FLAG_ACK: - async with self.streams_lock: - if stream_id in self.streams: - logger.debug( - f"Received ACK for stream" - f"{stream_id} for peer {self.peer_id}" - ) - elif typ == TYPE_GO_AWAY: + + data = b"" + if typ == TYPE_DATA and length > 0: + try: + data = await read_exactly(self.secured_conn, length) + # Ensure data is never None + if data is None: + data = b"" + logging.debug( + f"Read {len(data)} bytes of data for stream {stream_id}" + ) + except Exception as e: + logging.error(f"Error reading data for stream {stream_id}: {e}") + data = b"" # Ensure data is never None + # Mark stream as closed on read error + async with self.streams_lock: + if stream_id in self.streams: + self.streams[stream_id].recv_closed = True + if self.streams[stream_id].send_closed: + self.streams[stream_id].closed = True + if stream_id in self.stream_events: + self.stream_events[stream_id].set() + + if typ == TYPE_GO_AWAY: error_code = length if error_code == GO_AWAY_NORMAL: logger.debug( @@ -630,32 +612,6 @@ async def handle_incoming(self) -> None: f"Received ping response with value" f"{length} for peer {self.peer_id}" ) - elif typ == TYPE_DATA: - try: - data = ( - await self.secured_conn.read(length) if length > 0 else b"" - ) - async with self.streams_lock: - if stream_id in self.streams: - self.stream_buffers[stream_id].extend(data) - self.stream_events[stream_id].set() - if flags & FLAG_FIN: - logger.debug( - f"Received FIN for stream {self.peer_id}:" - f"{stream_id}, marking recv_closed" - ) - self.streams[stream_id].recv_closed = True - if self.streams[stream_id].send_closed: - self.streams[stream_id].closed = True - except Exception as e: - logger.error(f"Error reading data for stream {stream_id}: {e}") - # Mark stream as closed on read error - async with self.streams_lock: - if stream_id in self.streams: - self.streams[stream_id].recv_closed = True - if self.streams[stream_id].send_closed: - self.streams[stream_id].closed = True - self.stream_events[stream_id].set() elif typ == TYPE_WINDOW_UPDATE: increment = length async with self.streams_lock: @@ -668,6 +624,85 @@ async def handle_incoming(self) -> None: f" increment: {increment}" ) stream.send_window += increment + elif typ == TYPE_DATA: + async with self.streams_lock: + if stream_id in self.streams: + # Store data - ensure data is not None before extending + if data is not None and len(data) > 0: + self.stream_buffers[stream_id].extend(data) + if stream_id in self.stream_events: + self.stream_events[stream_id].set() + # Handle flags + if flags & FLAG_SYN: + logging.debug( + f"Received late SYN for stream {stream_id} " + f"for peer {self.peer_id}" + ) + if flags & FLAG_ACK: + logging.debug( + f"Received ACK for stream {stream_id} " + f"for peer {self.peer_id}" + ) + if flags & FLAG_FIN: + logging.debug( + f"Received FIN for stream {self.peer_id}:" + f"{stream_id}, marking recv_closed" + ) + self.streams[stream_id].recv_closed = True + if self.streams[stream_id].send_closed: + self.streams[stream_id].closed = True + if stream_id in self.stream_events: + self.stream_events[stream_id].set() + if flags & FLAG_RST: + logging.debug( + f"Resetting stream {stream_id} " + f"for peer {self.peer_id}" + ) + self.streams[stream_id].closed = True + self.streams[stream_id].reset_received = True + if stream_id in self.stream_events: + self.stream_events[stream_id].set() + else: + if flags & FLAG_SYN: + if stream_id not in self.streams: + stream = YamuxStream(stream_id, self, False) + self.streams[stream_id] = stream + # Initialize stream buffer + buffer = bytearray() + if data is not None and len(data) > 0: + buffer.extend(data) + self.stream_buffers[stream_id] = buffer + self.stream_events[stream_id] = trio.Event() + self.stream_events[stream_id].set() + ack_header = struct.pack( + YAMUX_HEADER_FORMAT, + 0, + TYPE_DATA, + FLAG_ACK, + stream_id, + 0, + ) + await self.secured_conn.write(ack_header) + logging.debug( + f"Sending stream {stream_id}" + f"to channel for peer {self.peer_id}" + ) + await self.new_stream_send_channel.send(stream) + else: + rst_header = struct.pack( + YAMUX_HEADER_FORMAT, + 0, + TYPE_DATA, + FLAG_RST, + stream_id, + 0, + ) + await self.secured_conn.write(rst_header) + else: + logging.warning( + f"Received data for unknown stream {stream_id} " + f"from peer {self.peer_id} (length={length})" + ) except Exception as e: # Special handling for expected IncompleteReadError on stream close if isinstance(e, IncompleteReadError): @@ -677,7 +712,7 @@ async def handle_incoming(self) -> None: and details.get("requested_count") == 2 and details.get("received_count") == 0 ): - logger.info( + logging.info( f"Stream closed cleanly for peer {self.peer_id}" + f" (IncompleteReadError: {details})" ) @@ -685,40 +720,21 @@ async def handle_incoming(self) -> None: await self._cleanup_on_error() break else: - logger.error( + logging.error( f"Error in handle_incoming for peer {self.peer_id}: " - + f"{type(e).__name__}: {str(e)}" + f"{type(e).__name__}: {str(e)}" ) else: - # Handle RawConnError with more nuance - if isinstance(e, RawConnError): - error_msg = str(e) - # If RawConnError is empty, it's likely normal cleanup - if not error_msg.strip(): - logger.info( - f"RawConnError (empty) during cleanup for peer " - f"{self.peer_id} (normal connection shutdown)" - ) - else: - # Log non-empty RawConnError as warning - logger.warning( - f"RawConnError during connection handling for peer " - f"{self.peer_id}: {error_msg}" - ) - else: - # Log all other errors normally - logger.error( - f"Error in handle_incoming for peer {self.peer_id}: " - + f"{type(e).__name__}: {str(e)}" - ) + logging.error( + f"Error in handle_incoming for peer {self.peer_id}: " + f"{type(e).__name__}: {str(e)}" + ) # Don't crash the whole connection for temporary errors if self.event_shutting_down.is_set() or isinstance( e, (RawConnError, OSError) ): await self._cleanup_on_error() break - # For other errors, log and continue - await trio.sleep(0.01) async def _cleanup_on_error(self) -> None: # Set shutdown flag first to prevent other operations @@ -760,6 +776,6 @@ async def _cleanup_on_error(self) -> None: except Exception as callback_error: logger.error(f"Error in on_close callback: {callback_error}") - # Cancel nursery tasks + # Cancel nursery tasks if available if self._nursery: self._nursery.cancel_scope.cancel() diff --git a/pyproject.toml b/pyproject.toml index 7f08697e4..0574fd830 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,7 @@ dependencies = [ "trio-typing>=0.0.4", "trio>=0.26.0", "fastecdsa==2.3.2; sys_platform != 'win32'", + "cryptography>=42.0.0; sys_platform == 'win32'", # Alternative for Windows "zeroconf (>=0.147.0,<0.148.0)", ] classifiers = [ @@ -69,6 +70,7 @@ dev = [ "tox>=4.0.0", "twine", "wheel", + "redis", "setuptools>=42", "sphinx>=6.0.0", "sphinx_rtd_theme>=1.0.0", @@ -80,12 +82,14 @@ dev = [ "factory-boy>=2.12.0,<3.0.0", "ruff>=0.11.10", "pyrefly (>=0.17.1,<0.18.0)", + "pytest-timeout" ] docs = [ "sphinx>=6.0.0", "sphinx_rtd_theme>=1.0.0", "towncrier>=24,<25", "tomli; python_version < '3.11'", + "redis" ] test = [ "p2pclient==0.2.0", @@ -93,6 +97,7 @@ test = [ "pytest-xdist>=2.4.0", "pytest-trio>=0.5.2", "factory-boy>=2.12.0,<3.0.0", + "pytest-timeout", ] [tool.setuptools] diff --git a/tests/core/stream_muxer/test_yamux.py b/tests/core/stream_muxer/test_yamux.py index 444288518..7198549f7 100644 --- a/tests/core/stream_muxer/test_yamux.py +++ b/tests/core/stream_muxer/test_yamux.py @@ -51,6 +51,11 @@ async def read(self, n: int | None = None) -> bytes: data = await self.receive_stream.receive_some(n) logging.debug(f"Read {len(data)} bytes") return data + # Raise IncompleteReadError on timeout to simulate connection closed + logging.debug("Read timed out after 2 seconds, raising IncompleteReadError") + from libp2p.io.exceptions import IncompleteReadError + + raise IncompleteReadError({"requested_count": n, "received_count": 0}) async def close(self) -> None: logging.debug("Closing stream") diff --git a/tests/core/stream_muxer/test_yamux_interleaving.py b/tests/core/stream_muxer/test_yamux_interleaving.py index 1ce629523..89f4b7b87 100644 --- a/tests/core/stream_muxer/test_yamux_interleaving.py +++ b/tests/core/stream_muxer/test_yamux_interleaving.py @@ -21,6 +21,9 @@ YamuxStream, ) +# Configure logger for this test module +logger = logging.getLogger(__name__) + class TrioStreamAdapter(IRawConnection): """Adapter to make trio memory streams work with libp2p.""" @@ -31,21 +34,26 @@ def __init__(self, send_stream, receive_stream, is_initiator=False): self.is_initiator = is_initiator async def write(self, data: bytes) -> None: - logging.debug(f"Attempting to write {len(data)} bytes") + logger.debug(f"Attempting to write {len(data)} bytes") with trio.move_on_after(2): await self.send_stream.send_all(data) async def read(self, n: int | None = None) -> bytes: if n is None or n <= 0: raise ValueError("Reading unbounded or zero bytes not supported") - logging.debug(f"Attempting to read {n} bytes") + logger.debug(f"Attempting to read {n} bytes") with trio.move_on_after(2): data = await self.receive_stream.receive_some(n) - logging.debug(f"Read {len(data)} bytes") + logger.debug(f"Read {len(data)} bytes") return data + # Raise IncompleteReadError on timeout to simulate connection closed + logger.debug("Read timed out after 2 seconds, raising IncompleteReadError") + from libp2p.io.exceptions import IncompleteReadError + + raise IncompleteReadError({"requested_count": n, "received_count": 0}) async def close(self) -> None: - logging.debug("Closing stream") + logger.debug("Closing stream") await self.send_stream.aclose() await self.receive_stream.aclose() @@ -67,7 +75,7 @@ def peer_id(key_pair): @pytest.fixture async def secure_conn_pair(key_pair, peer_id): """Create a pair of secure connections for testing.""" - logging.debug("Setting up secure_conn_pair") + logger.debug("Setting up secure_conn_pair") client_send, server_receive = memory_stream_pair() server_send, client_receive = memory_stream_pair() @@ -79,13 +87,13 @@ async def secure_conn_pair(key_pair, peer_id): async def run_outbound(nursery_results): with trio.move_on_after(5): client_conn = await insecure_transport.secure_outbound(client_rw, peer_id) - logging.debug("Outbound handshake complete") + logger.debug("Outbound handshake complete") nursery_results["client"] = client_conn async def run_inbound(nursery_results): with trio.move_on_after(5): server_conn = await insecure_transport.secure_inbound(server_rw) - logging.debug("Inbound handshake complete") + logger.debug("Inbound handshake complete") nursery_results["server"] = server_conn nursery_results = {} @@ -100,14 +108,14 @@ async def run_inbound(nursery_results): if client_conn is None or server_conn is None: raise RuntimeError("Handshake failed: client_conn or server_conn is None") - logging.debug("secure_conn_pair setup complete") + logger.debug("secure_conn_pair setup complete") return client_conn, server_conn @pytest.fixture async def yamux_pair(secure_conn_pair, peer_id): """Create a pair of Yamux multiplexers for testing.""" - logging.debug("Setting up yamux_pair") + logger.debug("Setting up yamux_pair") client_conn, server_conn = secure_conn_pair client_yamux = Yamux(client_conn, peer_id, is_initiator=True) server_yamux = Yamux(server_conn, peer_id, is_initiator=False) @@ -116,9 +124,9 @@ async def yamux_pair(secure_conn_pair, peer_id): nursery.start_soon(client_yamux.start) nursery.start_soon(server_yamux.start) await trio.sleep(0.1) - logging.debug("yamux_pair started") + logger.debug("yamux_pair started") yield client_yamux, server_yamux - logging.debug("yamux_pair cleanup") + logger.debug("yamux_pair cleanup") @pytest.mark.trio diff --git a/tests/core/stream_muxer/test_yamux_interleaving_EOF.py b/tests/core/stream_muxer/test_yamux_interleaving_EOF.py index 23d2c2b4c..5ea2a94ac 100644 --- a/tests/core/stream_muxer/test_yamux_interleaving_EOF.py +++ b/tests/core/stream_muxer/test_yamux_interleaving_EOF.py @@ -44,6 +44,11 @@ async def read(self, n: int | None = None) -> bytes: data = await self.receive_stream.receive_some(n) logging.debug(f"Read {len(data)} bytes") return data + # Raise IncompleteReadError on timeout to simulate connection closed + logging.debug("Read timed out after 2 seconds, raising IncompleteReadError") + from libp2p.io.exceptions import IncompleteReadError + + raise IncompleteReadError({"requested_count": n, "received_count": 0}) async def close(self) -> None: logging.debug("Closing stream") diff --git a/tox.ini b/tox.ini index 44f74bab5..85d4df60b 100644 --- a/tox.ini +++ b/tox.ini @@ -63,13 +63,14 @@ deps= wheel build[virtualenv] allowlist_externals= - bash.exe + cmd.exe commands= python --version python -m pip install --upgrade pip - bash.exe -c "rm -rf build dist" + cmd.exe /c "if exist build rd /s /q build" + cmd.exe /c "if exist dist rd /s /q dist" python -m build - bash.exe -c 'python -m pip install --upgrade "$(ls dist/libp2p-*-py3-none-any.whl)" --progress-bar off' + cmd.exe /c "for %i in (dist\libp2p-*-py3-none-any.whl) do python -m pip install --upgrade "%i" --progress-bar off" python -c "import libp2p" skip_install=true