-
Notifications
You must be signed in to change notification settings - Fork 174
Feat/fix mplex stream muxer issues #742
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…_message when it receives an unknown flag
…rom an unknown stream
… remote after stream was closed by them
… remote after stream was closed by them
…x-stream-muxer-issues
@@ -2016,39 +2112,6 @@ def is_expired(self) -> bool: | |||
# ------------------ multiselect_communicator interface.py ------------------ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this line should move along with the class definition
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @pacrob Will handle that as I clock in tommorrow.
libp2p/host/basic_host.py
Outdated
@@ -286,6 +288,25 @@ async def _swarm_stream_handler(self, net_stream: INetStream) -> None: | |||
) | |||
await net_stream.reset() | |||
return | |||
|
|||
# --- NEW CODE: Handle case where protocol is None --- |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# --- NEW CODE: Handle case where protocol is None --- | |
# Handle case where protocol is None |
libp2p/host/basic_host.py
Outdated
# raise StreamFailure(f"No protocol selected from peer {peer_id}") | ||
# But the 'return' is consistent with the `except` block's handling. | ||
return | ||
# --- END NEW CODE --- |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# --- END NEW CODE --- |
@@ -276,6 +277,7 @@ async def close(self) -> None: | |||
async def _swarm_stream_handler(self, net_stream: INetStream) -> None: | |||
# Perform protocol muxing to determine protocol to use | |||
try: | |||
# The protocol returned here can now be None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# The protocol returned here can now be None |
@@ -48,7 +48,6 @@ def add_handler( | |||
""" | |||
self.handlers[protocol] = handler | |||
|
|||
# FIXME: Make TProtocol Optional[TProtocol] to keep types consistent | |||
async def negotiate( | |||
self, | |||
communicator: IMultiselectCommunicator, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While you're here, please update the docstring to match the arguments (specifically communicator
vs stream
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Will do.
@@ -48,7 +48,6 @@ def add_handler( | |||
""" | |||
self.handlers[protocol] = handler | |||
|
|||
# FIXME: Make TProtocol Optional[TProtocol] to keep types consistent |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You've removed the FIXME
comment, but not updated the return type accordingly.
General note on code comments - they should briefly describe what is happening in the following code if it not clear. They should not describe the changes happening in a particular PR. You can add those as comments here on the PR page for things you think need additional clarity. |
@seetadev The other PR can be closed in favor of this one. |
My bad. Thanks for letting me know. Will follow this comment format going forward. |
@seetadev Changes have been made accordingly |
@Aldorax Are you still working on this PR or can be take over ? |
Good to hear. |
PR: Mplex Stream Muxer TODOs Resolution
This PR addresses several
TODO
s in the Mplex stream muxer implementation, significantly improving type safety, logging, and overall robustness.What was wrong?
The
mplex.py
implementation had four outstandingTODO
items that detracted from code quality and made debugging difficult:Return Type Inconsistency: The
send_message
function was type-hinted to return anint
, but its underlying call towrite_to_stream
returnedNone
. This led to type checker warnings and an unclear API contract.Missing Logging for Unknown Flags: The system would silently handle and reset streams that received messages with unknown header flags, providing no visibility into this protocol violation. This made diagnosing unexpected behavior challenging.
Missing Warning for Unknown Stream Messages: Messages received for a stream that wasn't recognized by the host were silently dropped, potentially hiding issues with peer behavior or connection state.
Missing Warning for Data After Stream Close: If a remote peer sent data after it had already closed its end of the stream, the data was silently ignored. This could obscure misbehaving peers or protocol errors.
How was it fixed?
This PR resolves all four issues by implementing direct and effective solutions:
Return Type Fixed: The
write_to_stream
method was updated to returnlen(_bytes)
, which represents the number of bytes successfully written. Consequently, the type hints for bothwrite_to_stream
andsend_message
were aligned to correctly return anint
, resolving the type inconsistency and removing the# type: ignore
comment.Logging Added:
logger.warning
calls were strategically added to the handlers for:Unknown flags: When messages with unrecognized header flags are received.
Messages to unknown streams: When data arrives for a stream ID that the local muxer does not recognize.
Data received after a stream was closed: When a peer attempts to send data on a stream that has already been locally closed.
This makes protocol violations visible in the logs, which is critical for debugging and understanding peer behavior.
Tests Added: A comprehensive suite of tests was added to validate the new behavior. This includes:
Runtime type checks for the
send_message
return value to ensure it consistently returns anint
.Logging verification tests using the
capsys
fixture to confirm that the correct warning messages are emitted under the specified error conditions (unknown flags, unknown streams, data after close).Summary of Approach
The approach was to systematically address each
TODO
item in the Mplex implementation. The core of the work involved aligning function signatures with their behavior for better type safety and injecting informative logging statements at key points where protocol violations can occur. This significantly enhances the debuggability and robustness of the stream muxer. A corresponding test suite was developed to lock in these fixes and prevent future regressions. The final step was a cleanup pass to ensure all code adhered to the project's linting and static analysis standards.To-Do
Clean up commit history
Add or update documentation related to these changes
Add entry to the release notes
Cute Animal Picture
The issue this PR #741