Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions libp2p/io/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,21 @@ class IOException(BaseLibp2pError):
class IncompleteReadError(IOException):
"""Fewer bytes were read than requested."""

def __init__(
self,
message: str,
expected_bytes: int = 0,
received_bytes: int = 0,
) -> None:
super().__init__(message)
self.expected_bytes = expected_bytes
self.received_bytes = received_bytes

@property
def is_clean_close(self) -> bool:
"""Returns True if this represents a clean connection closure."""
return self.received_bytes == 0


class MsgioException(IOException):
pass
Expand Down
4 changes: 3 additions & 1 deletion libp2p/io/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,7 @@ async def read_exactly(

raise IncompleteReadError(
f"Connection closed during read operation: expected {n} bytes but "
f"received {len(buffer)} bytes{context_info}"
f"received {len(buffer)} bytes{context_info}",
expected_bytes=n,
received_bytes=len(buffer),
)
66 changes: 37 additions & 29 deletions libp2p/stream_muxer/yamux/yamux.py
Original file line number Diff line number Diff line change
Expand Up @@ -647,22 +647,36 @@ async def handle_incoming(self) -> None:
try:
header = await read_exactly(self.secured_conn, HEADER_SIZE)
except IncompleteReadError as e:
# Get transport context for better debugging
transport_type = "unknown"
try:
if hasattr(self.secured_conn, "conn_state"):
conn_state_method = getattr(self.secured_conn, "conn_state")
if callable(conn_state_method):
state = conn_state_method()
if isinstance(state, dict):
transport_type = state.get("transport", "unknown")
except Exception:
pass

logger.error(
f"Yamux connection closed during header read for peer "
f"{self.peer_id}: {e}. Transport: {transport_type}."
)
# Check if this is a clean connection closure (0 bytes received)
# This happens when the peer closes the connection gracefully
# after completing their operations (e.g., after ping/pong)
if e.is_clean_close:
# Clean connection closure - this is normal when peer
# disconnects after completing protocol exchange
logger.info(
f"Yamux connection closed cleanly by peer {self.peer_id}"
)
else:
# Unexpected partial read - log as warning
transport_type = "unknown"
try:
if hasattr(self.secured_conn, "conn_state"):
conn_state_method = getattr(
self.secured_conn, "conn_state"
)
if callable(conn_state_method):
state = conn_state_method()
if isinstance(state, dict):
transport_type = state.get(
"transport", "unknown"
)
except Exception:
pass
logger.warning(
f"Yamux connection closed unexpectedly for peer "
f"{self.peer_id}: {e}. Transport: {transport_type}."
)

self.event_shutting_down.set()
await self._cleanup_on_error()
break
Expand Down Expand Up @@ -927,26 +941,20 @@ async def handle_incoming(self) -> None:
self.stream_events[stream_id].set()
except Exception as e:
# Special handling for expected IncompleteReadError on stream close
# This occurs when the connection closes while reading the header
# (12 bytes)
# This occurs when the connection closes while reading
if isinstance(e, IncompleteReadError):
details = getattr(e, "args", [{}])[0]
if (
isinstance(details, dict)
and details.get("requested_count") == HEADER_SIZE
and details.get("received_count") == 0
):
if e.is_clean_close:
logger.info(
f"Stream closed cleanly for peer {self.peer_id}"
+ f" (IncompleteReadError: {details})"
f"Yamux connection closed cleanly for peer {self.peer_id}"
)
self.event_shutting_down.set()
await self._cleanup_on_error()
break
else:
logger.error(
f"Error in handle_incoming for peer {self.peer_id}: "
+ f"{type(e).__name__}: {str(e)}"
# Partial read - log as warning, not error
logger.warning(
f"Incomplete read in handle_incoming for peer "
f"{self.peer_id}: {e}"
)
else:
# Handle RawConnError with more nuance
Expand Down
1 change: 1 addition & 0 deletions newsfragments/1084.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed yamux listener incorrectly logging errors when peers close connections gracefully after completing protocol exchanges. Clean connection closures (0 bytes received) are now logged at INFO level instead of ERROR level.
42 changes: 42 additions & 0 deletions tests/core/stream_muxer/test_yamux.py
Original file line number Diff line number Diff line change
Expand Up @@ -773,3 +773,45 @@ async def test_yamux_syn_with_large_data(yamux_pair):
assert received == test_data
assert len(received) == 1024
logging.debug("test_yamux_syn_with_large_data complete")


@pytest.mark.trio
async def test_incomplete_read_error_clean_close_detection():
"""
Test that IncompleteReadError correctly identifies clean connection closures.

This verifies the fix for issue #1084 where yamux listener incorrectly
logged clean peer disconnections as errors. Clean closures (0 bytes received)
should be detected via the is_clean_close property.
"""
from libp2p.io.exceptions import IncompleteReadError

# Test clean closure (0 bytes received)
clean_error = IncompleteReadError(
"Connection closed during read operation: expected 2 bytes but "
"received 0 bytes",
expected_bytes=2,
received_bytes=0,
)
assert clean_error.is_clean_close, "Should detect clean closure (0 bytes)"
assert clean_error.expected_bytes == 2
assert clean_error.received_bytes == 0

# Test partial read (not clean closure)
partial_error = IncompleteReadError(
"Connection closed during read operation: expected 12 bytes but "
"received 5 bytes",
expected_bytes=12,
received_bytes=5,
)
assert not partial_error.is_clean_close, "Partial read should not be clean closure"
assert partial_error.expected_bytes == 12
assert partial_error.received_bytes == 5

# Test default values (backward compatibility)
legacy_error = IncompleteReadError("Some error message")
assert legacy_error.is_clean_close, "Default 0 bytes should be clean closure"
assert legacy_error.expected_bytes == 0
assert legacy_error.received_bytes == 0

logging.debug("test_incomplete_read_error_clean_close_detection complete")
Loading