diff --git a/examples/ping/ping.py b/examples/ping/ping.py index 535d8473f..1985487b8 100644 --- a/examples/ping/ping.py +++ b/examples/ping/ping.py @@ -1,9 +1,14 @@ import argparse +import logging +from cryptography.hazmat.primitives.asymmetric import ( + x25519, +) import multiaddr import trio from libp2p import ( + generate_new_rsa_identity, new_host, ) from libp2p.custom_types import ( @@ -15,107 +20,409 @@ from libp2p.peer.peerinfo import ( info_from_p2p_addr, ) +from libp2p.security.noise.transport import Transport as NoiseTransport +from libp2p.stream_muxer.yamux.yamux import ( + Yamux, +) +from libp2p.stream_muxer.yamux.yamux import PROTOCOL_ID as YAMUX_PROTOCOL_ID +# Configure detailed logging +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(levelname)s - %(message)s", + handlers=[ + logging.StreamHandler(), + logging.FileHandler("ping_debug.log", mode="w", encoding="utf-8"), + ], +) + +# Standard libp2p ping protocol - this is what rust-libp2p uses by default PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0") PING_LENGTH = 32 RESP_TIMEOUT = 60 async def handle_ping(stream: INetStream) -> None: - while True: + """Handle incoming ping requests from rust-libp2p clients""" + peer_id = stream.muxed_conn.peer_id + print(f"[INFO] New ping stream opened by {peer_id}") + logging.info(f"Ping handler called for peer {peer_id}") + + ping_count = 0 + + try: + while True: + try: + print(f"[INFO] Waiting for ping data from {peer_id}...") + logging.debug(f"Stream state: {stream}") + data = await stream.read(PING_LENGTH) + + if not data: + print( + f"[INFO] No data received," + f" connection likely closed by {peer_id}" + ) + logging.debug("No data received, stream closed") + break + + if len(data) == 0: + print(f"[INFO] Empty data received, connection closed by {peer_id}") + logging.debug("Empty data received") + break + + ping_count += 1 + print( + f"[PING {ping_count}] Received ping from {peer_id}:" + f" {len(data)} bytes" + ) + logging.debug(f"Ping data: {data.hex()}") + + # Echo the data back (this is what ping protocol does) + await stream.write(data) + print(f"[PING {ping_count}] Echoed ping back to {peer_id}") + + except Exception as e: + print(f"[ERROR] Error in ping loop with {peer_id}: {e}") + logging.exception("Ping loop error") + break + + except Exception as e: + print(f"[ERROR] Error handling ping from {peer_id}: {e}") + logging.exception("Ping handler error") + finally: try: - payload = await stream.read(PING_LENGTH) - peer_id = stream.muxed_conn.peer_id - if payload is not None: - print(f"received ping from {peer_id}") + print(f"[INFO] Closing ping stream with {peer_id}") + await stream.close() + except Exception as e: + logging.debug(f"Error closing stream: {e}") + + print(f"[INFO] Ping session completed with {peer_id} ({ping_count} pings)") - await stream.write(payload) - print(f"responded with pong to {peer_id}") - except Exception: - await stream.reset() - break +async def send_ping_sequence(stream: INetStream, count: int = 5) -> None: + """Send a sequence of pings compatible with rust-libp2p.""" + peer_id = stream.muxed_conn.peer_id + print(f"[INFO] Starting ping sequence to {peer_id} ({count} pings)") + + import os + import time + + rtts = [] + + for i in range(1, count + 1): + try: + # Generate random 32-byte payload as per ping protocol spec + payload = os.urandom(PING_LENGTH) + print(f"[PING {i}/{count}] Sending ping to {peer_id}") + logging.debug(f"Sending payload: {payload.hex()}") + start_time = time.time() + await stream.write(payload) -async def send_ping(stream: INetStream) -> None: + with trio.fail_after(RESP_TIMEOUT): + response = await stream.read(PING_LENGTH) + + end_time = time.time() + rtt = (end_time - start_time) * 1000 + + if ( + response + and len(response) >= PING_LENGTH + and response[:PING_LENGTH] == payload + ): + rtts.append(rtt) + print(f"[PING {i}] Successful! RTT: {rtt:.2f}ms") + else: + print(f"[ERROR] Ping {i} failed: response mismatch or incomplete") + if response: + logging.debug(f"Expected: {payload.hex()}") + logging.debug(f"Received: {response.hex()}") + + if i < count: + await trio.sleep(1) + + except trio.TooSlowError: + print(f"[ERROR] Ping {i} timed out after {RESP_TIMEOUT}s") + except Exception as e: + print(f"[ERROR] Ping {i} failed: {e}") + logging.exception(f"Ping {i} error") + + # Print statistics + if rtts: + avg_rtt = sum(rtts) / len(rtts) + min_rtt = min(rtts) + max_rtt = max(rtts) # Fixed typo: was max_rtts + success_count = len(rtts) + loss_rate = ((count - success_count) / count) * 100 + + print("\n[STATS] Ping Statistics:") + print( + f" Packets: Sent={count}, Received={success_count}," + f" Lost={count - success_count}" + ) + print(f" Loss rate: {loss_rate:.1f}%") + print( + f" RTT: min={min_rtt:.2f}ms, avg={avg_rtt:.2f}ms," f" max={max_rtt:.2f}ms" + ) + else: + print(f"\n[STATS] All pings failed ({count} attempts)") + + +def create_noise_keypair(): + """Create a Noise protocol keypair for secure communication""" try: - payload = b"\x01" * PING_LENGTH - print(f"sending ping to {stream.muxed_conn.peer_id}") + x25519_private_key = x25519.X25519PrivateKey.generate() + + class NoisePrivateKey: + def __init__(self, key): + self._key = key - await stream.write(payload) + def to_bytes(self): + return self._key.private_bytes_raw() - with trio.fail_after(RESP_TIMEOUT): - response = await stream.read(PING_LENGTH) + def public_key(self): + return NoisePublicKey(self._key.public_key()) - if response == payload: - print(f"received pong from {stream.muxed_conn.peer_id}") + def get_public_key(self): + return NoisePublicKey(self._key.public_key()) + class NoisePublicKey: + def __init__(self, key): + self._key = key + + def to_bytes(self): + return self._key.public_bytes_raw() + + return NoisePrivateKey(x25519_private_key) except Exception as e: - print(f"error occurred : {e}") + logging.error(f"Failed to create Noise keypair: {e}") + return None -async def run(port: int, destination: str) -> None: - localhost_ip = "127.0.0.1" +async def run_server(port: int) -> None: + """Run ping server that accepts connections from rust-libp2p clients.""" listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") - host = new_host() - async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery: - if not destination: - host.set_stream_handler(PING_PROTOCOL_ID, handle_ping) + key_pair = generate_new_rsa_identity() + logging.debug("Generated RSA keypair") + + noise_privkey = create_noise_keypair() + if not noise_privkey: + print("[ERROR] Failed to create Noise keypair") + return + logging.debug("Generated Noise keypair") + + noise_transport = NoiseTransport(key_pair, noise_privkey=noise_privkey) + logging.debug(f"Noise transport initialized: {noise_transport}") + sec_opt = {TProtocol("/noise"): noise_transport} + muxer_opt = {TProtocol(YAMUX_PROTOCOL_ID): Yamux} + + logging.info(f"Using muxer: {muxer_opt}") + + host = new_host(key_pair=key_pair, sec_opt=sec_opt, muxer_opt=muxer_opt) + + print("[INFO] Starting py-libp2p ping server...") + + async with host.run(listen_addrs=[listen_addr]): + print(f"[INFO] Registering ping handler for protocol: {PING_PROTOCOL_ID}") + host.set_stream_handler(PING_PROTOCOL_ID, handle_ping) + + # Also register alternative protocol IDs for better compatibility + alt_protocols = [ + TProtocol("/ping/1.0.0"), + TProtocol("/libp2p/ping/1.0.0"), + ] - print( - "Run this from the same folder in another console:\n\n" - f"ping-demo -p {int(port) + 1} " - f"-d /ip4/{localhost_ip}/tcp/{port}/p2p/{host.get_id().pretty()}\n" - ) - print("Waiting for incoming connection...") + for alt_proto in alt_protocols: + print(f"[INFO] Also registering handler for: {alt_proto}") + host.set_stream_handler(alt_proto, handle_ping) - else: + print("[INFO] Server started successfully!") + print(f"[INFO] Peer ID: {host.get_id()}") + print(f"[INFO] Listening: /ip4/0.0.0.0/tcp/{port}") + print(f"[INFO] Primary Protocol: {PING_PROTOCOL_ID}") + print("[INFO] Security: Noise encryption") + print("[INFO] Muxer: Yamux stream multiplexing") + + print("\n[INFO] Registered protocols:") + print(f" - {PING_PROTOCOL_ID}") + for proto in alt_protocols: + print(f" - {proto}") + + peer_id = host.get_id() + print("\n[TEST] Test with rust-libp2p:") + print(f" cargo run -- /ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}") + + print("\n[TEST] Test with py-libp2p:") + print(f" python ping.py client /ip4/127.0.0.1/tcp/{port}/p2p/{peer_id}") + + print("\n[INFO] Waiting for connections...") + print("Press Ctrl+C to exit") + + await trio.sleep_forever() + + +async def run_client(destination: str, count: int = 5) -> None: + """Run ping client to test connectivity with another peer.""" + listen_addr = multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/0") + + key_pair = generate_new_rsa_identity() + logging.debug("Generated RSA keypair") + + noise_privkey = create_noise_keypair() + if not noise_privkey: + print("[ERROR] Failed to create Noise keypair") + return 1 + logging.debug("Generated Noise keypair") + + noise_transport = NoiseTransport(key_pair, noise_privkey=noise_privkey) + logging.debug(f"Noise transport initialized: {noise_transport}") + sec_opt = {TProtocol("/noise"): noise_transport} + muxer_opt = {TProtocol(YAMUX_PROTOCOL_ID): Yamux} + + logging.info(f"Using muxer: {muxer_opt}") + + host = new_host(key_pair=key_pair, sec_opt=sec_opt, muxer_opt=muxer_opt) + + print("[INFO] Starting py-libp2p ping client...") + + async with host.run(listen_addrs=[listen_addr]): + print(f"[INFO] Our Peer ID: {host.get_id()}") + print(f"[INFO] Target: {destination}") + print("[INFO] Security: Noise encryption") + print("[INFO] Muxer: Yamux stream multiplexing") + + try: maddr = multiaddr.Multiaddr(destination) info = info_from_p2p_addr(maddr) + target_peer_id = info.peer_id + + print(f"[INFO] Target Peer ID: {target_peer_id}") + print("[INFO] Connecting to peer...") + await host.connect(info) - stream = await host.new_stream(info.peer_id, [PING_PROTOCOL_ID]) + print("[INFO] Connection established!") - nursery.start_soon(send_ping, stream) + # Try protocols in order of preference + # Start with the standard libp2p ping protocol + protocols_to_try = [ + PING_PROTOCOL_ID, # /ipfs/ping/1.0.0 - standard protocol + TProtocol("/ping/1.0.0"), # Alternative + TProtocol("/libp2p/ping/1.0.0"), # Another alternative + ] - return + stream = None - await trio.sleep_forever() + for proto in protocols_to_try: + try: + print(f"[INFO] Trying to open stream with protocol: {proto}") + stream = await host.new_stream(target_peer_id, [proto]) + print(f"[INFO] Stream opened with protocol: {proto}") + break + except Exception as e: + print(f"[ERROR] Failed to open stream with {proto}: {e}") + logging.debug(f"Protocol {proto} failed: {e}") + continue + + if not stream: + print("[ERROR] Failed to open stream with any ping protocol") + print("[ERROR] Ensure the target peer supports one of these protocols:") + for proto in protocols_to_try: + print(f"[ERROR] - {proto}") + return 1 + + await send_ping_sequence(stream, count) + + await stream.close() + print("[INFO] Stream closed successfully") + + except Exception as e: + print(f"[ERROR] Client error: {e}") + logging.exception("Client error") + import traceback + + traceback.print_exc() + return 1 + + print("\n[INFO] Client stopped") + return 0 def main() -> None: + """Main function with argument parsing.""" description = """ - This program demonstrates a simple p2p ping application using libp2p. - To use it, first run 'python ping.py -p ', where is the port number. - Then, run another instance with 'python ping.py -p -d ', - where is the multiaddress of the previous listener host. + py-libp2p ping tool for interoperability testing with rust-libp2p. + Uses Noise encryption and Yamux multiplexing for compatibility. + + Server mode: Listens for ping requests from rust-libp2p or py-libp2p clients. + Client mode: Sends ping requests to rust-libp2p or py-libp2p servers. + + The tool implements the standard libp2p ping protocol (/ipfs/ping/1.0.0) + which exchanges 32-byte random payloads and measures round-trip time. """ example_maddr = ( "/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q" ) - parser = argparse.ArgumentParser(description=description) + parser = argparse.ArgumentParser( + description=description, + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=f""" +Examples: + python ping.py server # Start server on port 8000 + python ping.py server --port 9000 # Start server on port 9000 + python ping.py client {example_maddr} + python ping.py client {example_maddr} --count 10 - parser.add_argument( - "-p", "--port", default=8000, type=int, help="source port number" +Protocols supported: + - /ipfs/ping/1.0.0 (primary, rust-libp2p default) + - /ping/1.0.0 (alternative) + - /libp2p/ping/1.0.0 (alternative) + """, ) - parser.add_argument( - "-d", - "--destination", - type=str, - help=f"destination multiaddr string, e.g. {example_maddr}", + + subparsers = parser.add_subparsers(dest="mode", help="Operation mode") + + server_parser = subparsers.add_parser("server", help="Run as ping server") + server_parser.add_argument( + "--port", "-p", type=int, default=8000, help="Port to listen on (default: 8000)" + ) + + client_parser = subparsers.add_parser("client", help="Run as ping client") + client_parser.add_argument("destination", help="Target peer multiaddr") + client_parser.add_argument( + "--count", + "-c", + type=int, + default=5, + help="Number of pings to send (default: 5)", ) + args = parser.parse_args() - if not args.port: - raise RuntimeError("failed to determine local port") + if not args.mode: + parser.print_help() + return 1 try: - trio.run(run, *(args.port, args.destination)) + if args.mode == "server": + trio.run(run_server, args.port) + elif args.mode == "client": + return trio.run(run_client, args.destination, args.count) except KeyboardInterrupt: - pass + print("\n[INFO] Goodbye!") + return 0 + except Exception as e: + print(f"[ERROR] Fatal error: {e}") + logging.exception("Fatal error") + import traceback + + traceback.print_exc() + return 1 + + return 0 if __name__ == "__main__": - main() + exit(main()) diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index 240143217..9ed93304c 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -12,6 +12,7 @@ ) import multiaddr +import trio from libp2p.abc import ( IHost, @@ -34,6 +35,9 @@ from libp2p.host.exceptions import ( StreamFailure, ) +from libp2p.network.stream.net_stream import ( + NetStream, +) from libp2p.peer.id import ( ID, ) @@ -171,26 +175,128 @@ def set_stream_handler( :param stream_handler: a stream handler function """ self.multiselect.add_handler(protocol_id, stream_handler) + + async def _is_py_libp2p_peer(self, peer_id: ID) -> bool: + """ + Simplified detection using connection patterns + """ + peer_id_str = str(peer_id) + + if not hasattr(self, '_peer_type_cache'): + self._peer_type_cache = {} + self._rust_peer_attempts = {} + + # Check if we already know this peer type + if peer_id_str in self._peer_type_cache: + cached_result = self._peer_type_cache[peer_id_str] + + # If it's a rust peer, handle the initial instability + if not cached_result: # rust peer + if peer_id_str not in self._rust_peer_attempts: + self._rust_peer_attempts[peer_id_str] = 0 + self._rust_peer_attempts[peer_id_str] += 1 + + # For first 4 attempts with rust peers, keep using rust logic + if self._rust_peer_attempts[peer_id_str] <= 4: + logger.debug(f"Rust peer {peer_id}, attempt {self._rust_peer_attempts[peer_id_str]}") + + return cached_result + + # First time seeing this peer - detect type + try: + connection = self._network.connections.get(peer_id) + if not connection: + return False + + # Quick detection: try to accept a stream immediately + muxed_conn = connection.muxed_conn + + with trio.move_on_after(0.1): + try: + test_stream = await muxed_conn.accept_stream() + await test_stream.close() + # If we can accept immediately, it's rust + self._peer_type_cache[peer_id_str] = False + logger.debug(f"Detected {peer_id} as rust-libp2p (immediate stream)") + return False + except Exception: + pass + + # If no immediate stream, likely py-libp2p + self._peer_type_cache[peer_id_str] = True + logger.debug(f"Detected {peer_id} as py-libp2p (no immediate stream)") + return True + + except Exception as e: + logger.debug(f"Detection failed for {peer_id}: {e}, assuming py-libp2p") + self._peer_type_cache[peer_id_str] = True + return True async def new_stream( self, peer_id: ID, protocol_ids: Sequence[TProtocol] - ) -> INetStream: + ) -> NetStream: """ - :param peer_id: peer_id that host is connecting + Create a new stream to the peer_id with one of the given protocol_ids. + :param peer_id: peer_id to connect to :param protocol_ids: available protocol ids to use for stream :return: stream: new stream created """ - net_stream = await self._network.new_stream(peer_id) - - # Perform protocol muxing to determine protocol to use - try: - selected_protocol = await self.multiselect_client.select_one_of( - list(protocol_ids), MultiselectCommunicator(net_stream) - ) - except MultiselectClientError as error: - logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error) - await net_stream.reset() - raise StreamFailure(f"failed to open a stream to peer {peer_id}") from error + # Check if the peer is using py-libp2p (based on peer ID or connection context) + is_py_libp2p_peer = await self._is_py_libp2p_peer(peer_id) + + if is_py_libp2p_peer: + # Use single-stream model for py-libp2p to py-libp2p + net_stream = await self._network.new_stream(peer_id) + try: + selected_protocol = await self.multiselect_client.select_one_of( + list(protocol_ids), MultiselectCommunicator(net_stream) + ) + except MultiselectClientError as error: + logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error) + await net_stream.reset() + raise StreamFailure(f"failed to open a stream to peer {peer_id}") from error + else: + # Use dual-stream model for rust-libp2p compatibility + outgoing_stream = None + incoming_stream = None + net_stream = None + + try: + outgoing_stream = await self._network.new_stream(peer_id) + connection = self._network.connections[peer_id] + incoming_stream = await connection.muxed_conn.accept_stream() + net_stream = NetStream(incoming_stream) + + # Perform protocol negotiation on the incoming stream + with trio.fail_after(10): # 10 second timeout for protocol negotiation + selected_protocol = await self.multiselect_client.select_one_of( + list(protocol_ids), MultiselectCommunicator(net_stream) + ) + + # Only close the outgoing stream after successful negotiation + await outgoing_stream.close() + + except (MultiselectClientError, Exception) as error: + logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error) + + # Clean up all streams on error + if net_stream: + try: + await net_stream.reset() + except: + pass + if incoming_stream and incoming_stream != net_stream: + try: + await incoming_stream.reset() + except: + pass + if outgoing_stream: + try: + await outgoing_stream.reset() + except: + pass + + raise StreamFailure(f"failed to open a stream to peer {peer_id}") from error net_stream.set_protocol(selected_protocol) return net_stream diff --git a/libp2p/io/utils.py b/libp2p/io/utils.py index 8f873ea08..b9fd135ee 100644 --- a/libp2p/io/utils.py +++ b/libp2p/io/utils.py @@ -16,10 +16,28 @@ async def read_exactly( """ data = await reader.read(n) + # If we get 0 bytes on first read and requested > 0, it's likely a closed stream + if len(data) == 0 and n > 0: + raise IncompleteReadError( + {"requested_count": n, "received_count": 0, "received_data": data} + ) + for _ in range(retry_count): if len(data) < n: remaining = n - len(data) - data += await reader.read(remaining) + chunk = await reader.read(remaining) + # If we get no more data, the stream is closed + if len(chunk) == 0: + raise IncompleteReadError( + { + "requested_count": n, + "received_count": len(data), + "received_data": data, + } + ) + data += chunk else: return data - raise IncompleteReadError({"requested_count": n, "received_count": len(data)}) + raise IncompleteReadError( + {"requested_count": n, "received_count": len(data), "received_data": data} + ) diff --git a/libp2p/security/noise/io.py b/libp2p/security/noise/io.py index f9a0260be..c85d9d583 100644 --- a/libp2p/security/noise/io.py +++ b/libp2p/security/noise/io.py @@ -58,11 +58,25 @@ async def write_msg(self, data: bytes, prefix_encoded: bool = False) -> None: await self.read_writer.write_msg(data_encrypted) async def read_msg(self, prefix_encoded: bool = False) -> bytes: - noise_msg_encrypted = await self.read_writer.read_msg() - if prefix_encoded: - return self.decrypt(noise_msg_encrypted[len(self.prefix) :]) - else: - return self.decrypt(noise_msg_encrypted) + try: + noise_msg_encrypted = await self.read_writer.read_msg() + if prefix_encoded: + return self.decrypt(noise_msg_encrypted[len(self.prefix) :]) + else: + return self.decrypt(noise_msg_encrypted) + except Exception as e: + # Handle stream closure gracefully for interoperability + from libp2p.io.exceptions import ( + IncompleteReadError, + ) + + if isinstance(e, IncompleteReadError): + # Re-raise with context about Noise layer + details = getattr(e, "args", [{}])[0] + if isinstance(details, dict): + details["noise_layer"] = True + raise e + raise async def close(self) -> None: await self.read_writer.close() diff --git a/libp2p/stream_muxer/yamux/yamux.py b/libp2p/stream_muxer/yamux/yamux.py index 200d986c4..97e23c56b 100644 --- a/libp2p/stream_muxer/yamux/yamux.py +++ b/libp2p/stream_muxer/yamux/yamux.py @@ -29,6 +29,9 @@ from libp2p.io.exceptions import ( IncompleteReadError, ) +from libp2p.io.utils import ( + read_exactly, +) from libp2p.network.connection.exceptions import ( RawConnError, ) @@ -455,23 +458,34 @@ async def read_stream(self, stream_id: int, n: int = -1) -> bytes: async def handle_incoming(self) -> None: while not self.event_shutting_down.is_set(): try: - header = await self.secured_conn.read(HEADER_SIZE) - if not header or len(header) < HEADER_SIZE: + # Try to read the Yamux header (12 bytes) - ensure we get exactly + # HEADER_SIZE bytes + try: + header = await read_exactly(self.secured_conn, HEADER_SIZE) + except IncompleteReadError as e: + details = getattr(e, "args", [{}])[0] if e.args else {} + received = ( + details.get("received_count", 0) + if isinstance(details, dict) + else 0 + ) + requested = ( + details.get("requested_count", 0) + if isinstance(details, dict) + else 0 + ) logging.debug( - f"Connection closed or" - f"incomplete header for peer {self.peer_id}" + f"Connection closed or incomplete header for peer " + f"{self.peer_id} (got {received}/{requested} bytes)" ) self.event_shutting_down.set() await self._cleanup_on_error() break + + # Parse the Yamux header version, typ, flags, stream_id, length = struct.unpack( YAMUX_HEADER_FORMAT, header ) - logging.debug( - f"Received header for peer {self.peer_id}:" - f"type={typ}, flags={flags}, stream_id={stream_id}," - f"length={length}" - ) if typ == TYPE_DATA and flags & FLAG_SYN: async with self.streams_lock: if stream_id not in self.streams: @@ -503,6 +517,48 @@ async def handle_incoming(self) -> None: 0, ) await self.secured_conn.write(rst_header) + + # Read the data payload for SYN+DATA frames + if length > 0: + try: + data = await read_exactly(self.secured_conn, length) + except IncompleteReadError as e: + details = getattr(e, "args", [{}])[0] if e.args else {} + received = ( + details.get("received_count", 0) + if isinstance(details, dict) + else 0 + ) + logging.warning( + f"Expected {length} bytes but got {received} bytes " + f"for SYN+DATA frame from peer {self.peer_id} " + f"stream {stream_id}" + ) + data = b"" # Use empty data to avoid further errors + except Exception as e: + logging.error( + f"Error reading SYN+DATA payload for stream " + f"{stream_id}: {e}" + ) + data = b"" + + async with self.streams_lock: + if stream_id in self.streams: + self.stream_buffers[stream_id].extend(data) + self.stream_events[stream_id].set() + if flags & FLAG_FIN: + logging.debug( + f"Received FIN for SYN stream {self.peer_id}:" + f"{stream_id}, marking recv_closed" + ) + self.streams[stream_id].recv_closed = True + if self.streams[stream_id].send_closed: + self.streams[stream_id].closed = True + else: + logging.warning( + f"Received SYN+DATA for unknown stream {stream_id} " + f"from peer {self.peer_id} (length={length})" + ) elif typ == TYPE_DATA and flags & FLAG_RST: async with self.streams_lock: if stream_id in self.streams: @@ -560,9 +616,37 @@ async def handle_incoming(self) -> None: ) elif typ == TYPE_DATA: try: - data = ( - await self.secured_conn.read(length) if length > 0 else b"" - ) + if length > 0: + try: + data = await read_exactly(self.secured_conn, length) + except IncompleteReadError as e: + details = getattr(e, "args", [{}])[0] if e.args else {} + received = ( + details.get("received_count", 0) + if isinstance(details, dict) + else 0 + ) + received_data = ( + details.get("received_data", b"") + if isinstance(details, dict) + else b"" + ) + logging.warning( + f"Expected {length} bytes but got {received} bytes " + f"for peer {self.peer_id} stream {stream_id}" + ) + # For partial reads, continue with what we got + # or empty data + if received == 0: + logging.error( + f"No data received when expecting " + f"{length} bytes" + ) + continue + data = received_data + else: + data = b"" + async with self.streams_lock: if stream_id in self.streams: self.stream_buffers[stream_id].extend(data) @@ -575,6 +659,11 @@ async def handle_incoming(self) -> None: self.streams[stream_id].recv_closed = True if self.streams[stream_id].send_closed: self.streams[stream_id].closed = True + else: + logging.warning( + f"Received data for unknown stream {stream_id} " + f"from peer {self.peer_id} (length={length})" + ) except Exception as e: logging.error(f"Error reading data for stream {stream_id}: {e}") # Mark stream as closed on read error @@ -591,9 +680,9 @@ async def handle_incoming(self) -> None: stream = self.streams[stream_id] async with stream.window_lock: logging.debug( - f"Received window update for stream" - f"{self.peer_id}:{stream_id}," - f" increment: {increment}" + f"Received window update for stream " + f"{self.peer_id}:{stream_id}, " + f"increment: {increment}" ) stream.send_window += increment except Exception as e: @@ -615,12 +704,12 @@ async def handle_incoming(self) -> None: else: logging.error( f"Error in handle_incoming for peer {self.peer_id}: " - + f"{type(e).__name__}: {str(e)}" + f"{type(e).__name__}: {str(e)}" ) else: logging.error( f"Error in handle_incoming for peer {self.peer_id}: " - + f"{type(e).__name__}: {str(e)}" + f"{type(e).__name__}: {str(e)}" ) # Don't crash the whole connection for temporary errors if self.event_shutting_down.is_set() or isinstance(