diff --git a/libp2p/io/exceptions.py b/libp2p/io/exceptions.py index 1ba3da224..a8500d3c8 100644 --- a/libp2p/io/exceptions.py +++ b/libp2p/io/exceptions.py @@ -10,6 +10,21 @@ class IOException(BaseLibp2pError): class IncompleteReadError(IOException): """Fewer bytes were read than requested.""" + def __init__( + self, + message: str, + expected_bytes: int = 0, + received_bytes: int = 0, + ) -> None: + super().__init__(message) + self.expected_bytes = expected_bytes + self.received_bytes = received_bytes + + @property + def is_clean_close(self) -> bool: + """Returns True if this represents a clean connection closure.""" + return self.received_bytes == 0 + class MsgioException(IOException): pass diff --git a/libp2p/io/utils.py b/libp2p/io/utils.py index 1bb98d1e0..1e8ddbbbd 100644 --- a/libp2p/io/utils.py +++ b/libp2p/io/utils.py @@ -73,5 +73,7 @@ async def read_exactly( raise IncompleteReadError( f"Connection closed during read operation: expected {n} bytes but " - f"received {len(buffer)} bytes{context_info}" + f"received {len(buffer)} bytes{context_info}", + expected_bytes=n, + received_bytes=len(buffer), ) diff --git a/libp2p/stream_muxer/yamux/yamux.py b/libp2p/stream_muxer/yamux/yamux.py index 83e2bc32c..a489758a6 100644 --- a/libp2p/stream_muxer/yamux/yamux.py +++ b/libp2p/stream_muxer/yamux/yamux.py @@ -647,22 +647,36 @@ async def handle_incoming(self) -> None: try: header = await read_exactly(self.secured_conn, HEADER_SIZE) except IncompleteReadError as e: - # Get transport context for better debugging - transport_type = "unknown" - try: - if hasattr(self.secured_conn, "conn_state"): - conn_state_method = getattr(self.secured_conn, "conn_state") - if callable(conn_state_method): - state = conn_state_method() - if isinstance(state, dict): - transport_type = state.get("transport", "unknown") - except Exception: - pass - - logger.error( - f"Yamux connection closed during header read for peer " - f"{self.peer_id}: {e}. Transport: {transport_type}." - ) + # Check if this is a clean connection closure (0 bytes received) + # This happens when the peer closes the connection gracefully + # after completing their operations (e.g., after ping/pong) + if e.is_clean_close: + # Clean connection closure - this is normal when peer + # disconnects after completing protocol exchange + logger.info( + f"Yamux connection closed cleanly by peer {self.peer_id}" + ) + else: + # Unexpected partial read - log as warning + transport_type = "unknown" + try: + if hasattr(self.secured_conn, "conn_state"): + conn_state_method = getattr( + self.secured_conn, "conn_state" + ) + if callable(conn_state_method): + state = conn_state_method() + if isinstance(state, dict): + transport_type = state.get( + "transport", "unknown" + ) + except Exception: + pass + logger.warning( + f"Yamux connection closed unexpectedly for peer " + f"{self.peer_id}: {e}. Transport: {transport_type}." + ) + self.event_shutting_down.set() await self._cleanup_on_error() break @@ -927,26 +941,20 @@ async def handle_incoming(self) -> None: self.stream_events[stream_id].set() except Exception as e: # Special handling for expected IncompleteReadError on stream close - # This occurs when the connection closes while reading the header - # (12 bytes) + # This occurs when the connection closes while reading if isinstance(e, IncompleteReadError): - details = getattr(e, "args", [{}])[0] - if ( - isinstance(details, dict) - and details.get("requested_count") == HEADER_SIZE - and details.get("received_count") == 0 - ): + if e.is_clean_close: logger.info( - f"Stream closed cleanly for peer {self.peer_id}" - + f" (IncompleteReadError: {details})" + f"Yamux connection closed cleanly for peer {self.peer_id}" ) self.event_shutting_down.set() await self._cleanup_on_error() break else: - logger.error( - f"Error in handle_incoming for peer {self.peer_id}: " - + f"{type(e).__name__}: {str(e)}" + # Partial read - log as warning, not error + logger.warning( + f"Incomplete read in handle_incoming for peer " + f"{self.peer_id}: {e}" ) else: # Handle RawConnError with more nuance diff --git a/newsfragments/1084.bugfix.rst b/newsfragments/1084.bugfix.rst new file mode 100644 index 000000000..c6419b8ad --- /dev/null +++ b/newsfragments/1084.bugfix.rst @@ -0,0 +1 @@ +Fixed yamux listener incorrectly logging errors when peers close connections gracefully after completing protocol exchanges. Clean connection closures (0 bytes received) are now logged at INFO level instead of ERROR level. diff --git a/tests/core/stream_muxer/test_yamux.py b/tests/core/stream_muxer/test_yamux.py index e1d24cde9..fec0d7683 100644 --- a/tests/core/stream_muxer/test_yamux.py +++ b/tests/core/stream_muxer/test_yamux.py @@ -773,3 +773,45 @@ async def test_yamux_syn_with_large_data(yamux_pair): assert received == test_data assert len(received) == 1024 logging.debug("test_yamux_syn_with_large_data complete") + + +@pytest.mark.trio +async def test_incomplete_read_error_clean_close_detection(): + """ + Test that IncompleteReadError correctly identifies clean connection closures. + + This verifies the fix for issue #1084 where yamux listener incorrectly + logged clean peer disconnections as errors. Clean closures (0 bytes received) + should be detected via the is_clean_close property. + """ + from libp2p.io.exceptions import IncompleteReadError + + # Test clean closure (0 bytes received) + clean_error = IncompleteReadError( + "Connection closed during read operation: expected 2 bytes but " + "received 0 bytes", + expected_bytes=2, + received_bytes=0, + ) + assert clean_error.is_clean_close, "Should detect clean closure (0 bytes)" + assert clean_error.expected_bytes == 2 + assert clean_error.received_bytes == 0 + + # Test partial read (not clean closure) + partial_error = IncompleteReadError( + "Connection closed during read operation: expected 12 bytes but " + "received 5 bytes", + expected_bytes=12, + received_bytes=5, + ) + assert not partial_error.is_clean_close, "Partial read should not be clean closure" + assert partial_error.expected_bytes == 12 + assert partial_error.received_bytes == 5 + + # Test default values (backward compatibility) + legacy_error = IncompleteReadError("Some error message") + assert legacy_error.is_clean_close, "Default 0 bytes should be clean closure" + assert legacy_error.expected_bytes == 0 + assert legacy_error.received_bytes == 0 + + logging.debug("test_incomplete_read_error_clean_close_detection complete")