-
Notifications
You must be signed in to change notification settings - Fork 174
Feat/fix mplex stream muxer issues #742
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
7f2c635
ca9bed4
8fd2d65
2fde49e
9595851
f8f93cc
eb3653e
881e5cb
d888938
80b0f12
750991b
5d5e33c
92b39fe
ec4eb83
e0f5dcd
aff17fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ downloads/ | |
wheels/ | ||
MANIFEST | ||
pip-wheel-metadata | ||
.ruff_cache | ||
|
||
# Installer logs | ||
pip-log.txt | ||
|
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -16,6 +16,7 @@ | |||
|
||||
from libp2p.abc import ( | ||||
IHost, | ||||
IMultiselectMuxer, | ||||
INetConn, | ||||
INetStream, | ||||
INetworkService, | ||||
|
@@ -130,7 +131,7 @@ def get_peerstore(self) -> IPeerStore: | |||
""" | ||||
return self.peerstore | ||||
|
||||
def get_mux(self) -> Multiselect: | ||||
def get_mux(self) -> IMultiselectMuxer: | ||||
""" | ||||
:return: mux instance of host | ||||
""" | ||||
|
@@ -274,8 +275,13 @@ async def close(self) -> None: | |||
|
||||
# Reference: `BasicHost.newStreamHandler` in Go. | ||||
async def _swarm_stream_handler(self, net_stream: INetStream) -> None: | ||||
""" | ||||
Handles incoming network streams by performing protocol negotiation | ||||
and dispatching to the appropriate handler. | ||||
""" | ||||
# Perform protocol muxing to determine protocol to use | ||||
try: | ||||
# The protocol returned here can now be None | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||
protocol, handler = await self.multiselect.negotiate( | ||||
MultiselectCommunicator(net_stream), self.negotiate_timeout | ||||
) | ||||
|
@@ -286,6 +292,24 @@ async def _swarm_stream_handler(self, net_stream: INetStream) -> None: | |||
) | ||||
await net_stream.reset() | ||||
return | ||||
|
||||
# Handle case where protocol is None | ||||
if protocol is None: | ||||
peer_id = net_stream.muxed_conn.peer_id | ||||
logger.debug( | ||||
"No protocol selected by peer %s during negotiation. Resetting stream.", | ||||
peer_id, | ||||
) | ||||
await net_stream.reset() | ||||
# The BasicHost analysis suggested raising StreamFailure here. | ||||
# However, the current structure of _swarm_stream_handler | ||||
# just returns on failure, so let's maintain that pattern | ||||
# for now, unless further analysis suggests a raise is better. | ||||
# For strict adherence to the analysis, it might be: | ||||
# raise StreamFailure(f"No protocol selected from peer {peer_id}") | ||||
# But the 'return' is consistent with the `except` block's handling. | ||||
return | ||||
|
||||
net_stream.set_protocol(protocol) | ||||
if handler is None: | ||||
logger.debug( | ||||
|
@@ -322,4 +346,4 @@ def get_peer_connection_info(self, peer_id: ID) -> INetConn | None: | |||
:param peer_id: ID of the peer to get info for | ||||
:return: Connection object if peer is connected, None otherwise | ||||
""" | ||||
return self._network.connections.get(peer_id) | ||||
return self._network.connections.get(peer_id) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,16 +48,15 @@ def add_handler( | |
""" | ||
self.handlers[protocol] = handler | ||
|
||
# FIXME: Make TProtocol Optional[TProtocol] to keep types consistent | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You've removed the |
||
async def negotiate( | ||
self, | ||
communicator: IMultiselectCommunicator, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While you're here, please update the docstring to match the arguments (specifically There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. Will do. |
||
negotiate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT, | ||
) -> tuple[TProtocol, StreamHandlerFn | None]: | ||
) -> tuple[TProtocol | None, StreamHandlerFn | None]: | ||
""" | ||
Negotiate performs protocol selection. | ||
Negotiate performs protocol selection with a multiselect client. | ||
|
||
:param stream: stream to negotiate on | ||
:param communicator: The communicator used to negotiate the protocol. | ||
:param negotiate_timeout: timeout for negotiation | ||
:return: selected protocol name, handler function | ||
:raise MultiselectError: raised when negotiation failed | ||
|
@@ -144,4 +143,4 @@ def is_valid_handshake(handshake_contents: str) -> bool: | |
:param handshake_contents: contents of handshake message | ||
:return: true if handshake is complete, false otherwise | ||
""" | ||
return handshake_contents == MULTISELECT_PROTOCOL_ID | ||
return handshake_contents == MULTISELECT_PROTOCOL_ID |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this line should move along with the class definition
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @pacrob Will handle that as I clock in tommorrow.