Skip to content

Feat/fix mplex stream muxer issues #742

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ downloads/
wheels/
MANIFEST
pip-wheel-metadata
.ruff_cache

# Installer logs
pip-log.txt
Expand Down
199 changes: 99 additions & 100 deletions libp2p/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1485,6 +1485,103 @@ async def listen_close(self, network: "INetwork", multiaddr: Multiaddr) -> None:
"""


class IMultiselectCommunicator(ABC):
"""
Communicator helper for multiselect.

Ensures that both the client and multistream module follow the same
multistream protocol.
"""

@abstractmethod
async def write(self, msg_str: str) -> None:
"""
Write a message to the stream.

Parameters
----------
msg_str : str
The message string to write.

"""

@abstractmethod
async def read(self) -> str:
"""
Read a message from the stream until EOF.

Returns
-------
str
The message read from the stream.

"""


# -------------------------- multiselect_muxer interface.py --------------------------


class IMultiselectMuxer(ABC):
"""
Multiselect module for protocol negotiation.

Responsible for responding to a multiselect client by selecting a protocol
and its corresponding handler for communication.
"""

handlers: dict[TProtocol | None, StreamHandlerFn | None]

@abstractmethod
def add_handler(self, protocol: TProtocol, handler: StreamHandlerFn) -> None:
"""
Store a handler for the specified protocol.

Parameters
----------
protocol : TProtocol
The protocol name.
handler : StreamHandlerFn
The handler function associated with the protocol.

"""

def get_protocols(self) -> tuple[TProtocol | None, ...]:
"""
Retrieve the protocols for which handlers have been registered.

Returns
-------
tuple[TProtocol, ...]
A tuple of registered protocol names.

"""
return tuple(self.handlers.keys())

@abstractmethod
async def negotiate(
self, communicator: IMultiselectCommunicator
) -> tuple[TProtocol | None, StreamHandlerFn | None]:
"""
Negotiate a protocol selection with a multiselect client.

Parameters
----------
communicator : IMultiselectCommunicator
The communicator used to negotiate the protocol.

Returns
-------
tuple[TProtocol, StreamHandlerFn]
A tuple containing the selected protocol and its handler.

Raises
------
Exception
If negotiation fails.

"""


# -------------------------- host interface.py --------------------------


Expand Down Expand Up @@ -1545,15 +1642,14 @@ def get_network(self) -> INetworkService:

"""

# FIXME: Replace with correct return type
@abstractmethod
def get_mux(self) -> Any:
def get_mux(self) -> IMultiselectMuxer:
"""
Retrieve the muxer instance for the host.

Returns
-------
Any
IMultiselectMuxer
The muxer instance of the host.

"""
Expand Down Expand Up @@ -2016,39 +2112,6 @@ def is_expired(self) -> bool:
# ------------------ multiselect_communicator interface.py ------------------
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this line should move along with the class definition

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @pacrob Will handle that as I clock in tommorrow.



class IMultiselectCommunicator(ABC):
"""
Communicator helper for multiselect.

Ensures that both the client and multistream module follow the same
multistream protocol.
"""

@abstractmethod
async def write(self, msg_str: str) -> None:
"""
Write a message to the stream.

Parameters
----------
msg_str : str
The message string to write.

"""

@abstractmethod
async def read(self) -> str:
"""
Read a message from the stream until EOF.

Returns
-------
str
The message read from the stream.

"""


# -------------------------- multiselect_client interface.py --------------------------


Expand Down Expand Up @@ -2131,70 +2194,6 @@ async def try_select(
"""


# -------------------------- multiselect_muxer interface.py --------------------------


class IMultiselectMuxer(ABC):
"""
Multiselect module for protocol negotiation.

Responsible for responding to a multiselect client by selecting a protocol
and its corresponding handler for communication.
"""

handlers: dict[TProtocol | None, StreamHandlerFn | None]

@abstractmethod
def add_handler(self, protocol: TProtocol, handler: StreamHandlerFn) -> None:
"""
Store a handler for the specified protocol.

Parameters
----------
protocol : TProtocol
The protocol name.
handler : StreamHandlerFn
The handler function associated with the protocol.

"""

def get_protocols(self) -> tuple[TProtocol | None, ...]:
"""
Retrieve the protocols for which handlers have been registered.

Returns
-------
tuple[TProtocol, ...]
A tuple of registered protocol names.

"""
return tuple(self.handlers.keys())

@abstractmethod
async def negotiate(
self, communicator: IMultiselectCommunicator
) -> tuple[TProtocol | None, StreamHandlerFn | None]:
"""
Negotiate a protocol selection with a multiselect client.

Parameters
----------
communicator : IMultiselectCommunicator
The communicator used to negotiate the protocol.

Returns
-------
tuple[TProtocol, StreamHandlerFn]
A tuple containing the selected protocol and its handler.

Raises
------
Exception
If negotiation fails.

"""


# -------------------------- routing interface.py --------------------------


Expand Down
23 changes: 22 additions & 1 deletion libp2p/host/basic_host.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from libp2p.abc import (
IHost,
IMultiselectMuxer,
INetConn,
INetStream,
INetworkService,
Expand Down Expand Up @@ -130,7 +131,7 @@ def get_peerstore(self) -> IPeerStore:
"""
return self.peerstore

def get_mux(self) -> Multiselect:
def get_mux(self) -> IMultiselectMuxer:
"""
:return: mux instance of host
"""
Expand Down Expand Up @@ -276,6 +277,7 @@ async def close(self) -> None:
async def _swarm_stream_handler(self, net_stream: INetStream) -> None:
# Perform protocol muxing to determine protocol to use
try:
# The protocol returned here can now be None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# The protocol returned here can now be None

protocol, handler = await self.multiselect.negotiate(
MultiselectCommunicator(net_stream), self.negotiate_timeout
)
Expand All @@ -286,6 +288,25 @@ async def _swarm_stream_handler(self, net_stream: INetStream) -> None:
)
await net_stream.reset()
return

# --- NEW CODE: Handle case where protocol is None ---
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# --- NEW CODE: Handle case where protocol is None ---
# Handle case where protocol is None

if protocol is None:
peer_id = net_stream.muxed_conn.peer_id
logger.debug(
"No protocol selected by peer %s during negotiation. Resetting stream.",
peer_id,
)
await net_stream.reset()
# The BasicHost analysis suggested raising StreamFailure here.
# However, the current structure of _swarm_stream_handler
# just returns on failure, so let's maintain that pattern
# for now, unless further analysis suggests a raise is better.
# For strict adherence to the analysis, it might be:
# raise StreamFailure(f"No protocol selected from peer {peer_id}")
# But the 'return' is consistent with the `except` block's handling.
return
# --- END NEW CODE ---
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# --- END NEW CODE ---


net_stream.set_protocol(protocol)
if handler is None:
logger.debug(
Expand Down
1 change: 0 additions & 1 deletion libp2p/protocol_muxer/multiselect.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ def add_handler(
"""
self.handlers[protocol] = handler

# FIXME: Make TProtocol Optional[TProtocol] to keep types consistent
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You've removed the FIXME comment, but not updated the return type accordingly.

async def negotiate(
self,
communicator: IMultiselectCommunicator,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While you're here, please update the docstring to match the arguments (specifically communicator vs stream).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Will do.

Expand Down
49 changes: 32 additions & 17 deletions libp2p/security/security_multistream.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from libp2p.peer.id import (
ID,
)
from libp2p.protocol_muxer.exceptions import MultiselectError
from libp2p.protocol_muxer.multiselect import (
Multiselect,
)
Expand All @@ -26,6 +27,7 @@
from libp2p.protocol_muxer.multiselect_communicator import (
MultiselectCommunicator,
)
from libp2p.transport.exceptions import SecurityUpgradeFailure

"""
Represents a secured connection object, which includes a connection and details about
Expand Down Expand Up @@ -96,23 +98,36 @@ async def secure_outbound(self, conn: IRawConnection, peer_id: ID) -> ISecureCon
async def select_transport(
self, conn: IRawConnection, is_initiator: bool
) -> ISecureTransport:
"""
Select a transport that both us and the node on the other end of conn
support and agree on.

:param conn: conn to choose a transport over
:param is_initiator: true if we are the initiator, false otherwise
:return: selected secure transport
"""
protocol: TProtocol
# Note: protocol is TProtocol | None here due to negotiate's new type hint
protocol: TProtocol | None # <--- UPDATE TYPE HINT FOR 'protocol' VARIABLE
communicator = MultiselectCommunicator(conn)
if is_initiator:
# Select protocol if initiator
protocol = await self.multiselect_client.select_one_of(
list(self.transports.keys()), communicator

# Use a try-except block to catch MultiselectError from negotiate
try:
if is_initiator:
# Select protocol if initiator (multiselect_client.select_one_of should
# raise if no protocol)
protocol = await self.multiselect_client.select_one_of(
list(self.transports.keys()), communicator
)
else:
# Select protocol if non-initiator
# protocol can now be None if negotiate doesn't find a suitable one
protocol, _ = await self.multiselect.negotiate(communicator)
except MultiselectError as error:
# Catch errors from both select_one_of and negotiate, and re-raise as
# SecurityUpgradeFailure
raise SecurityUpgradeFailure(
"failed to negotiate security protocol"
) from error

# --- NEW CODE: Handle case where protocol is None after negotiation ---
if protocol is None:
raise SecurityUpgradeFailure(
"No security protocol selected during negotiation"
)
else:
# Select protocol if non-initiator
protocol, _ = await self.multiselect.negotiate(communicator)
# Return transport from protocol
# --- END NEW CODE ---

# protocol is guaranteed to be TProtocol here, so no TProtocol(protocol) cast
# needed
return self.transports[protocol]
Loading
Loading