Skip to content

Commit f8ec326

Browse files
authored
Fix yamux listener graceful handling of peer connection closure (#1114)
* Fix yamux listener graceful handling of peer connection closure * add test cases and newsfragments
1 parent 35889ad commit f8ec326

File tree

5 files changed

+98
-30
lines changed

5 files changed

+98
-30
lines changed

libp2p/io/exceptions.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,21 @@ class IOException(BaseLibp2pError):
1010
class IncompleteReadError(IOException):
1111
"""Fewer bytes were read than requested."""
1212

13+
def __init__(
14+
self,
15+
message: str,
16+
expected_bytes: int = 0,
17+
received_bytes: int = 0,
18+
) -> None:
19+
super().__init__(message)
20+
self.expected_bytes = expected_bytes
21+
self.received_bytes = received_bytes
22+
23+
@property
24+
def is_clean_close(self) -> bool:
25+
"""Returns True if this represents a clean connection closure."""
26+
return self.received_bytes == 0
27+
1328

1429
class MsgioException(IOException):
1530
pass

libp2p/io/utils.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,5 +73,7 @@ async def read_exactly(
7373

7474
raise IncompleteReadError(
7575
f"Connection closed during read operation: expected {n} bytes but "
76-
f"received {len(buffer)} bytes{context_info}"
76+
f"received {len(buffer)} bytes{context_info}",
77+
expected_bytes=n,
78+
received_bytes=len(buffer),
7779
)

libp2p/stream_muxer/yamux/yamux.py

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -647,22 +647,36 @@ async def handle_incoming(self) -> None:
647647
try:
648648
header = await read_exactly(self.secured_conn, HEADER_SIZE)
649649
except IncompleteReadError as e:
650-
# Get transport context for better debugging
651-
transport_type = "unknown"
652-
try:
653-
if hasattr(self.secured_conn, "conn_state"):
654-
conn_state_method = getattr(self.secured_conn, "conn_state")
655-
if callable(conn_state_method):
656-
state = conn_state_method()
657-
if isinstance(state, dict):
658-
transport_type = state.get("transport", "unknown")
659-
except Exception:
660-
pass
661-
662-
logger.error(
663-
f"Yamux connection closed during header read for peer "
664-
f"{self.peer_id}: {e}. Transport: {transport_type}."
665-
)
650+
# Check if this is a clean connection closure (0 bytes received)
651+
# This happens when the peer closes the connection gracefully
652+
# after completing their operations (e.g., after ping/pong)
653+
if e.is_clean_close:
654+
# Clean connection closure - this is normal when peer
655+
# disconnects after completing protocol exchange
656+
logger.info(
657+
f"Yamux connection closed cleanly by peer {self.peer_id}"
658+
)
659+
else:
660+
# Unexpected partial read - log as warning
661+
transport_type = "unknown"
662+
try:
663+
if hasattr(self.secured_conn, "conn_state"):
664+
conn_state_method = getattr(
665+
self.secured_conn, "conn_state"
666+
)
667+
if callable(conn_state_method):
668+
state = conn_state_method()
669+
if isinstance(state, dict):
670+
transport_type = state.get(
671+
"transport", "unknown"
672+
)
673+
except Exception:
674+
pass
675+
logger.warning(
676+
f"Yamux connection closed unexpectedly for peer "
677+
f"{self.peer_id}: {e}. Transport: {transport_type}."
678+
)
679+
666680
self.event_shutting_down.set()
667681
await self._cleanup_on_error()
668682
break
@@ -927,26 +941,20 @@ async def handle_incoming(self) -> None:
927941
self.stream_events[stream_id].set()
928942
except Exception as e:
929943
# Special handling for expected IncompleteReadError on stream close
930-
# This occurs when the connection closes while reading the header
931-
# (12 bytes)
944+
# This occurs when the connection closes while reading
932945
if isinstance(e, IncompleteReadError):
933-
details = getattr(e, "args", [{}])[0]
934-
if (
935-
isinstance(details, dict)
936-
and details.get("requested_count") == HEADER_SIZE
937-
and details.get("received_count") == 0
938-
):
946+
if e.is_clean_close:
939947
logger.info(
940-
f"Stream closed cleanly for peer {self.peer_id}"
941-
+ f" (IncompleteReadError: {details})"
948+
f"Yamux connection closed cleanly for peer {self.peer_id}"
942949
)
943950
self.event_shutting_down.set()
944951
await self._cleanup_on_error()
945952
break
946953
else:
947-
logger.error(
948-
f"Error in handle_incoming for peer {self.peer_id}: "
949-
+ f"{type(e).__name__}: {str(e)}"
954+
# Partial read - log as warning, not error
955+
logger.warning(
956+
f"Incomplete read in handle_incoming for peer "
957+
f"{self.peer_id}: {e}"
950958
)
951959
else:
952960
# Handle RawConnError with more nuance

newsfragments/1084.bugfix.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed yamux listener incorrectly logging errors when peers close connections gracefully after completing protocol exchanges. Clean connection closures (0 bytes received) are now logged at INFO level instead of ERROR level.

tests/core/stream_muxer/test_yamux.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -773,3 +773,45 @@ async def test_yamux_syn_with_large_data(yamux_pair):
773773
assert received == test_data
774774
assert len(received) == 1024
775775
logging.debug("test_yamux_syn_with_large_data complete")
776+
777+
778+
@pytest.mark.trio
779+
async def test_incomplete_read_error_clean_close_detection():
780+
"""
781+
Test that IncompleteReadError correctly identifies clean connection closures.
782+
783+
This verifies the fix for issue #1084 where yamux listener incorrectly
784+
logged clean peer disconnections as errors. Clean closures (0 bytes received)
785+
should be detected via the is_clean_close property.
786+
"""
787+
from libp2p.io.exceptions import IncompleteReadError
788+
789+
# Test clean closure (0 bytes received)
790+
clean_error = IncompleteReadError(
791+
"Connection closed during read operation: expected 2 bytes but "
792+
"received 0 bytes",
793+
expected_bytes=2,
794+
received_bytes=0,
795+
)
796+
assert clean_error.is_clean_close, "Should detect clean closure (0 bytes)"
797+
assert clean_error.expected_bytes == 2
798+
assert clean_error.received_bytes == 0
799+
800+
# Test partial read (not clean closure)
801+
partial_error = IncompleteReadError(
802+
"Connection closed during read operation: expected 12 bytes but "
803+
"received 5 bytes",
804+
expected_bytes=12,
805+
received_bytes=5,
806+
)
807+
assert not partial_error.is_clean_close, "Partial read should not be clean closure"
808+
assert partial_error.expected_bytes == 12
809+
assert partial_error.received_bytes == 5
810+
811+
# Test default values (backward compatibility)
812+
legacy_error = IncompleteReadError("Some error message")
813+
assert legacy_error.is_clean_close, "Default 0 bytes should be clean closure"
814+
assert legacy_error.expected_bytes == 0
815+
assert legacy_error.received_bytes == 0
816+
817+
logging.debug("test_incomplete_read_error_clean_close_detection complete")

0 commit comments

Comments
 (0)