enh/806-add-gossipsub-1.2-support #812

Draft: wants to merge 3 commits into base: main

144 changes: 140 additions & 4 deletions libp2p/pubsub/gossipsub.py
@@ -57,6 +57,7 @@

PROTOCOL_ID = TProtocol("/meshsub/1.0.0")
PROTOCOL_ID_V11 = TProtocol("/meshsub/1.1.0")
PROTOCOL_ID_V12 = TProtocol("/meshsub/1.2.0")

logger = logging.getLogger("libp2p.pubsub.gossipsub")

@@ -94,6 +95,10 @@ class GossipSub(IPubsubRouter, Service):
prune_back_off: int
unsubscribe_back_off: int

# Gossipsub v1.2 features
dont_send_message_ids: dict[ID, set[bytes]]
max_idontwant_messages: int

def __init__(
self,
protocols: Sequence[TProtocol],
@@ -112,6 +117,7 @@ def __init__(
px_peers_count: int = 16,
prune_back_off: int = 60,
unsubscribe_back_off: int = 10,
max_idontwant_messages: int = 10,
) -> None:
self.protocols = list(protocols)
self.pubsub = None
@@ -152,6 +158,10 @@ def __init__(
self.prune_back_off = prune_back_off
self.unsubscribe_back_off = unsubscribe_back_off

# Gossipsub v1.2 features
self.dont_send_message_ids = dict()
self.max_idontwant_messages = max_idontwant_messages

async def run(self) -> None:
self.manager.run_daemon_task(self.heartbeat)
if len(self.direct_peers) > 0:
@@ -195,14 +205,23 @@ def add_peer(self, peer_id: ID, protocol_id: TProtocol | None) -> None:
if protocol_id is None:
raise ValueError("Protocol cannot be None")

if protocol_id not in (PROTOCOL_ID, floodsub.PROTOCOL_ID):
if protocol_id not in (
PROTOCOL_ID,
PROTOCOL_ID_V11,
PROTOCOL_ID_V12,
floodsub.PROTOCOL_ID,
):
# We should never reach here: the `protocol_id` was registered for
# this pubsub instance in multistream-select, yet it is not a protocol
# that gossipsub supports. Most likely gossipsub was registered under
# the wrong `protocol_id` or protocol version in multistream-select.
raise ValueError(f"Protocol={protocol_id} is not supported.")
self.peer_protocol[peer_id] = protocol_id

# Initialize IDONTWANT tracking for this peer
if peer_id not in self.dont_send_message_ids:
self.dont_send_message_ids[peer_id] = set()

def remove_peer(self, peer_id: ID) -> None:
"""
Notifies the router that a peer has been disconnected.
@@ -218,6 +237,9 @@ def remove_peer(self, peer_id: ID) -> None:

self.peer_protocol.pop(peer_id, None)

# Clean up IDONTWANT tracking for this peer
self.dont_send_message_ids.pop(peer_id, None)

async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None:
"""
Invoked to process control messages in the RPC envelope.
@@ -241,20 +263,34 @@ async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None:
if control_message.prune:
for prune in control_message.prune:
await self.handle_prune(prune, sender_peer_id)
if control_message.idontwant:
for idontwant in control_message.idontwant:
await self.handle_idontwant(idontwant, sender_peer_id)

async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
"""Invoked to forward a new message that has been validated."""
self.mcache.put(pubsub_msg)

# Get message ID for IDONTWANT
if self.pubsub is not None:
msg_id = self.pubsub._msg_id_constructor(pubsub_msg)
else:
# Fallback to default ID construction
msg_id = pubsub_msg.seqno + pubsub_msg.from_id

peers_gen = self._get_peers_to_send(
pubsub_msg.topicIDs,
msg_forwarder=msg_forwarder,
origin=ID(pubsub_msg.from_id),
msg_id=msg_id,
)
rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg])

logger.debug("publishing message %s", pubsub_msg)

# Send IDONTWANT to mesh peers about this message
await self._emit_idontwant_for_message(msg_id, pubsub_msg.topicIDs)

for peer_id in peers_gen:
if self.pubsub is None:
raise NoPubsubAttached
@@ -269,7 +305,11 @@ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
self.time_since_last_publish[topic] = int(time.time())

def _get_peers_to_send(
self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID
self,
topic_ids: Iterable[str],
msg_forwarder: ID,
origin: ID,
msg_id: bytes | None = None,
) -> Iterable[ID]:
"""
Get the eligible peers to send the data to.
@@ -322,7 +362,18 @@ def _get_peers_to_send(
gossipsub_peers = fanout_peers
send_to.update(gossipsub_peers)
# Excludes `msg_forwarder` and `origin`
yield from send_to.difference([msg_forwarder, origin])
filtered_peers = send_to.difference([msg_forwarder, origin])

# Filter out peers that have sent IDONTWANT for this message
if msg_id is not None:
filtered_peers = {
peer_id
for peer_id in filtered_peers
if peer_id not in self.dont_send_message_ids
or msg_id not in self.dont_send_message_ids[peer_id]
}

yield from filtered_peers

async def join(self, topic: str) -> None:
"""
@@ -483,6 +534,9 @@ async def heartbeat(self) -> None:

self.mcache.shift()

# Prune old IDONTWANT entries to prevent memory leaks
self._prune_idontwant_entries()

await trio.sleep(self.heartbeat_interval)

async def direct_connect_heartbeat(self) -> None:
@@ -680,7 +734,8 @@ def _get_in_topic_gossipsub_peers_from_minus(
gossipsub_peers_in_topic = {
peer_id
for peer_id in self.pubsub.peer_topics[topic]
if self.peer_protocol[peer_id] == PROTOCOL_ID
if self.peer_protocol[peer_id]
in (PROTOCOL_ID, PROTOCOL_ID_V11, PROTOCOL_ID_V12)
}
if backoff_check:
# filter out peers that are in back off for this topic
@@ -733,6 +788,18 @@ def _check_back_off(self, peer: ID, topic: str) -> bool:
del self.back_off[topic][peer]
return False

def _prune_idontwant_entries(self) -> None:
"""
Prune old IDONTWANT entries during heartbeat to prevent memory leaks.

This method removes all IDONTWANT entries since they are only relevant
for the current heartbeat period. The specific message IDs that peers
don't want should be cleared regularly to prevent indefinite growth.
"""
# Clear all IDONTWANT entries for all peers
for peer_id in self.dont_send_message_ids:
self.dont_send_message_ids[peer_id].clear()

async def _do_px(self, px_peers: list[rpc_pb2.PeerInfo]) -> None:
if len(px_peers) > self.px_peers_count:
px_peers = px_peers[: self.px_peers_count]
@@ -894,6 +961,7 @@ def pack_control_msgs(
ihave_msgs: list[rpc_pb2.ControlIHave] | None,
graft_msgs: list[rpc_pb2.ControlGraft] | None,
prune_msgs: list[rpc_pb2.ControlPrune] | None,
idontwant_msgs: list[rpc_pb2.ControlIDontWant] | None = None,
) -> rpc_pb2.ControlMessage:
control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
if ihave_msgs:
@@ -902,6 +970,8 @@
control_msg.graft.extend(graft_msgs)
if prune_msgs:
control_msg.prune.extend(prune_msgs)
if idontwant_msgs:
control_msg.idontwant.extend(idontwant_msgs)
return control_msg

async def emit_ihave(self, topic: str, msg_ids: Any, to_peer: ID) -> None:
Expand Down Expand Up @@ -966,6 +1036,16 @@ async def emit_prune(

await self.emit_control_message(control_msg, to_peer)

async def emit_idontwant(self, msg_ids: list[bytes], to_peer: ID) -> None:
"""Emit idontwant message, sent to to_peer, for msg_ids."""
idontwant_msg: rpc_pb2.ControlIDontWant = rpc_pb2.ControlIDontWant()
idontwant_msg.messageIDs.extend(msg_ids)

control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
control_msg.idontwant.extend([idontwant_msg])

await self.emit_control_message(control_msg, to_peer)

async def emit_control_message(
self, control_msg: rpc_pb2.ControlMessage, to_peer: ID
) -> None:
@@ -985,3 +1065,59 @@ async def emit_control_message(

# Write rpc to stream
await self.pubsub.write_msg(peer_stream, packet)

async def _emit_idontwant_for_message(
self, msg_id: bytes, topic_ids: Iterable[str]
) -> None:
"""
Emit IDONTWANT message to mesh peers about a received message.

:param msg_id: The message ID to notify peers about
:param topic_ids: The topics the message belongs to
"""
if self.pubsub is None:
return

# Get all mesh peers for the topics in this message
mesh_peers: set[ID] = set()
for topic in topic_ids:
if topic in self.mesh:
mesh_peers.update(self.mesh[topic])

# Only send to peers that support gossipsub 1.2
v12_peers = {
peer_id
for peer_id in mesh_peers
if self.peer_protocol.get(peer_id) == PROTOCOL_ID_V12
}

if not v12_peers:
return

# Send IDONTWANT message to all v1.2 mesh peers
for peer_id in v12_peers:
await self.emit_idontwant([msg_id], peer_id)

async def handle_idontwant(
self, idontwant_msg: rpc_pb2.ControlIDontWant, sender_peer_id: ID
) -> None:
"""
Handle incoming IDONTWANT control message by adding message IDs
to the peer's dont_send_message_ids set.

:param idontwant_msg: The IDONTWANT control message
:param sender_peer_id: ID of the peer who sent the message
"""
# Initialize set if peer not tracked
if sender_peer_id not in self.dont_send_message_ids:
self.dont_send_message_ids[sender_peer_id] = set()

# Add all message IDs to the peer's dont_send set
for msg_id in idontwant_msg.messageIDs:
self.dont_send_message_ids[sender_peer_id].add(msg_id)

logger.debug(
"Added %d message IDs to dont_send list for peer %s",
len(idontwant_msg.messageIDs),
sender_peer_id,
)
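
For reviewers who want to exercise the new IDONTWANT plumbing locally, here is a minimal, non-authoritative sketch. Only protocols, max_idontwant_messages, add_peer, handle_idontwant, and dont_send_message_ids come from this diff; the degree arguments and the placeholder peer ID bytes are assumptions for illustration and may not match the actual constructor signature.

# Sketch only: degree/degree_low/degree_high are assumed from the existing
# constructor and may differ from the real signature.
import trio

from libp2p.peer.id import ID
from libp2p.pubsub.gossipsub import (
    PROTOCOL_ID,
    PROTOCOL_ID_V12,
    GossipSub,
)
from libp2p.pubsub.pb import rpc_pb2


async def demo() -> None:
    gossipsub = GossipSub(
        protocols=[PROTOCOL_ID, PROTOCOL_ID_V12],
        degree=6,  # assumed existing mesh parameters
        degree_low=4,
        degree_high=12,
        max_idontwant_messages=10,  # new v1.2 knob introduced by this PR
    )

    # Pretend a v1.2 peer connected and told us it does not want a message.
    peer = ID(b"\x12\x20" + b"\x01" * 32)  # placeholder peer ID bytes
    gossipsub.add_peer(peer, PROTOCOL_ID_V12)

    idontwant = rpc_pb2.ControlIDontWant(messageIDs=[b"some-msg-id"])
    await gossipsub.handle_idontwant(idontwant, peer)

    # The message ID is now tracked, so _get_peers_to_send() skips this peer.
    assert b"some-msg-id" in gossipsub.dont_send_message_ids[peer]


trio.run(demo)
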
5 changes: 5 additions & 0 deletions libp2p/pubsub/pb/rpc.proto
@@ -30,6 +30,7 @@ message ControlMessage {
repeated ControlIWant iwant = 2;
repeated ControlGraft graft = 3;
repeated ControlPrune prune = 4;
repeated ControlIDontWant idontwant = 5;
}

message ControlIHave {
@@ -51,6 +52,10 @@ message ControlPrune {
optional uint64 backoff = 3;
}

message ControlIDontWant {
repeated bytes messageIDs = 1;
}

message PeerInfo {
optional bytes peerID = 1;
optional bytes signedPeerRecord = 2;
72 changes: 37 additions & 35 deletions libp2p/pubsub/pb/rpc_pb2.py

Some generated files are not rendered by default.
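
Since the regenerated rpc_pb2.py is not rendered, the following sketch shows how the new ControlIDontWant entry would be assembled into an RPC envelope with the regenerated classes. The message IDs are placeholders; the module path libp2p.pubsub.pb.rpc_pb2 matches the imports used in gossipsub.py above.

from libp2p.pubsub.pb import rpc_pb2

# Build an IDONTWANT entry with placeholder message IDs.
idontwant = rpc_pb2.ControlIDontWant()
idontwant.messageIDs.extend([b"msg-id-1", b"msg-id-2"])

# Embed it in the control envelope, mirroring emit_idontwant() above.
control = rpc_pb2.ControlMessage()
control.idontwant.extend([idontwant])

rpc = rpc_pb2.RPC()
rpc.control.CopyFrom(control)
packet = rpc.SerializeToString()  # bytes written to the peer's stream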
