diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index cebc438b3..b22fbcdbf 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -57,6 +57,7 @@ PROTOCOL_ID = TProtocol("/meshsub/1.0.0") PROTOCOL_ID_V11 = TProtocol("/meshsub/1.1.0") +PROTOCOL_ID_V12 = TProtocol("/meshsub/1.2.0") logger = logging.getLogger("libp2p.pubsub.gossipsub") @@ -94,6 +95,10 @@ class GossipSub(IPubsubRouter, Service): prune_back_off: int unsubscribe_back_off: int + # Gossipsub v1.2 features + dont_send_message_ids: dict[ID, set[bytes]] + max_idontwant_messages: int + def __init__( self, protocols: Sequence[TProtocol], @@ -112,6 +117,7 @@ def __init__( px_peers_count: int = 16, prune_back_off: int = 60, unsubscribe_back_off: int = 10, + max_idontwant_messages: int = 10, ) -> None: self.protocols = list(protocols) self.pubsub = None @@ -152,6 +158,10 @@ def __init__( self.prune_back_off = prune_back_off self.unsubscribe_back_off = unsubscribe_back_off + # Gossipsub v1.2 features + self.dont_send_message_ids = dict() + self.max_idontwant_messages = max_idontwant_messages + async def run(self) -> None: self.manager.run_daemon_task(self.heartbeat) if len(self.direct_peers) > 0: @@ -195,7 +205,12 @@ def add_peer(self, peer_id: ID, protocol_id: TProtocol | None) -> None: if protocol_id is None: raise ValueError("Protocol cannot be None") - if protocol_id not in (PROTOCOL_ID, floodsub.PROTOCOL_ID): + if protocol_id not in ( + PROTOCOL_ID, + PROTOCOL_ID_V11, + PROTOCOL_ID_V12, + floodsub.PROTOCOL_ID, + ): # We should never enter here. Becuase the `protocol_id` is registered by # your pubsub instance in multistream-select, but it is not the protocol # that gossipsub supports. In this case, probably we registered gossipsub @@ -203,6 +218,10 @@ def add_peer(self, peer_id: ID, protocol_id: TProtocol | None) -> None: raise ValueError(f"Protocol={protocol_id} is not supported.") self.peer_protocol[peer_id] = protocol_id + # Initialize IDONTWANT tracking for this peer + if peer_id not in self.dont_send_message_ids: + self.dont_send_message_ids[peer_id] = set() + def remove_peer(self, peer_id: ID) -> None: """ Notifies the router that a peer has been disconnected. @@ -218,6 +237,9 @@ def remove_peer(self, peer_id: ID) -> None: self.peer_protocol.pop(peer_id, None) + # Clean up IDONTWANT tracking for this peer + self.dont_send_message_ids.pop(peer_id, None) + async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None: """ Invoked to process control messages in the RPC envelope. @@ -241,20 +263,34 @@ async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None: if control_message.prune: for prune in control_message.prune: await self.handle_prune(prune, sender_peer_id) + if control_message.idontwant: + for idontwant in control_message.idontwant: + await self.handle_idontwant(idontwant, sender_peer_id) async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: """Invoked to forward a new message that has been validated.""" self.mcache.put(pubsub_msg) + # Get message ID for IDONTWANT + if self.pubsub is not None: + msg_id = self.pubsub._msg_id_constructor(pubsub_msg) + else: + # Fallback to default ID construction + msg_id = pubsub_msg.seqno + pubsub_msg.from_id + peers_gen = self._get_peers_to_send( pubsub_msg.topicIDs, msg_forwarder=msg_forwarder, origin=ID(pubsub_msg.from_id), + msg_id=msg_id, ) rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg]) logger.debug("publishing message %s", pubsub_msg) + # Send IDONTWANT to mesh peers about this message + await self._emit_idontwant_for_message(msg_id, pubsub_msg.topicIDs) + for peer_id in peers_gen: if self.pubsub is None: raise NoPubsubAttached @@ -269,7 +305,11 @@ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: self.time_since_last_publish[topic] = int(time.time()) def _get_peers_to_send( - self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID + self, + topic_ids: Iterable[str], + msg_forwarder: ID, + origin: ID, + msg_id: bytes | None = None, ) -> Iterable[ID]: """ Get the eligible peers to send the data to. @@ -322,7 +362,18 @@ def _get_peers_to_send( gossipsub_peers = fanout_peers send_to.update(gossipsub_peers) # Excludes `msg_forwarder` and `origin` - yield from send_to.difference([msg_forwarder, origin]) + filtered_peers = send_to.difference([msg_forwarder, origin]) + + # Filter out peers that have sent IDONTWANT for this message + if msg_id is not None: + filtered_peers = { + peer_id + for peer_id in filtered_peers + if peer_id not in self.dont_send_message_ids + or msg_id not in self.dont_send_message_ids[peer_id] + } + + yield from filtered_peers async def join(self, topic: str) -> None: """ @@ -483,6 +534,9 @@ async def heartbeat(self) -> None: self.mcache.shift() + # Prune old IDONTWANT entries to prevent memory leaks + self._prune_idontwant_entries() + await trio.sleep(self.heartbeat_interval) async def direct_connect_heartbeat(self) -> None: @@ -680,7 +734,8 @@ def _get_in_topic_gossipsub_peers_from_minus( gossipsub_peers_in_topic = { peer_id for peer_id in self.pubsub.peer_topics[topic] - if self.peer_protocol[peer_id] == PROTOCOL_ID + if self.peer_protocol[peer_id] + in (PROTOCOL_ID, PROTOCOL_ID_V11, PROTOCOL_ID_V12) } if backoff_check: # filter out peers that are in back off for this topic @@ -733,6 +788,18 @@ def _check_back_off(self, peer: ID, topic: str) -> bool: del self.back_off[topic][peer] return False + def _prune_idontwant_entries(self) -> None: + """ + Prune old IDONTWANT entries during heartbeat to prevent memory leaks. + + This method removes all IDONTWANT entries since they are only relevant + for the current heartbeat period. The specific message IDs that peers + don't want should be cleared regularly to prevent indefinite growth. + """ + # Clear all IDONTWANT entries for all peers + for peer_id in self.dont_send_message_ids: + self.dont_send_message_ids[peer_id].clear() + async def _do_px(self, px_peers: list[rpc_pb2.PeerInfo]) -> None: if len(px_peers) > self.px_peers_count: px_peers = px_peers[: self.px_peers_count] @@ -894,6 +961,7 @@ def pack_control_msgs( ihave_msgs: list[rpc_pb2.ControlIHave] | None, graft_msgs: list[rpc_pb2.ControlGraft] | None, prune_msgs: list[rpc_pb2.ControlPrune] | None, + idontwant_msgs: list[rpc_pb2.ControlIDontWant] | None = None, ) -> rpc_pb2.ControlMessage: control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage() if ihave_msgs: @@ -902,6 +970,8 @@ def pack_control_msgs( control_msg.graft.extend(graft_msgs) if prune_msgs: control_msg.prune.extend(prune_msgs) + if idontwant_msgs: + control_msg.idontwant.extend(idontwant_msgs) return control_msg async def emit_ihave(self, topic: str, msg_ids: Any, to_peer: ID) -> None: @@ -966,6 +1036,16 @@ async def emit_prune( await self.emit_control_message(control_msg, to_peer) + async def emit_idontwant(self, msg_ids: list[bytes], to_peer: ID) -> None: + """Emit idontwant message, sent to to_peer, for msg_ids.""" + idontwant_msg: rpc_pb2.ControlIDontWant = rpc_pb2.ControlIDontWant() + idontwant_msg.messageIDs.extend(msg_ids) + + control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage() + control_msg.idontwant.extend([idontwant_msg]) + + await self.emit_control_message(control_msg, to_peer) + async def emit_control_message( self, control_msg: rpc_pb2.ControlMessage, to_peer: ID ) -> None: @@ -985,3 +1065,59 @@ async def emit_control_message( # Write rpc to stream await self.pubsub.write_msg(peer_stream, packet) + + async def _emit_idontwant_for_message( + self, msg_id: bytes, topic_ids: Iterable[str] + ) -> None: + """ + Emit IDONTWANT message to mesh peers about a received message. + + :param msg_id: The message ID to notify peers about + :param topic_ids: The topics the message belongs to + """ + if self.pubsub is None: + return + + # Get all mesh peers for the topics in this message + mesh_peers: set[ID] = set() + for topic in topic_ids: + if topic in self.mesh: + mesh_peers.update(self.mesh[topic]) + + # Only send to peers that support gossipsub 1.2 + v12_peers = { + peer_id + for peer_id in mesh_peers + if self.peer_protocol.get(peer_id) == PROTOCOL_ID_V12 + } + + if not v12_peers: + return + + # Send IDONTWANT message to all v1.2 mesh peers + for peer_id in v12_peers: + await self.emit_idontwant([msg_id], peer_id) + + async def handle_idontwant( + self, idontwant_msg: rpc_pb2.ControlIDontWant, sender_peer_id: ID + ) -> None: + """ + Handle incoming IDONTWANT control message by adding message IDs + to the peer's dont_send_message_ids set. + + :param idontwant_msg: The IDONTWANT control message + :param sender_peer_id: ID of the peer who sent the message + """ + # Initialize set if peer not tracked + if sender_peer_id not in self.dont_send_message_ids: + self.dont_send_message_ids[sender_peer_id] = set() + + # Add all message IDs to the peer's dont_send set + for msg_id in idontwant_msg.messageIDs: + self.dont_send_message_ids[sender_peer_id].add(msg_id) + + logger.debug( + "Added %d message IDs to dont_send list for peer %s", + len(idontwant_msg.messageIDs), + sender_peer_id, + ) diff --git a/libp2p/pubsub/pb/rpc.proto b/libp2p/pubsub/pb/rpc.proto index 7abce0d66..618c50bd1 100644 --- a/libp2p/pubsub/pb/rpc.proto +++ b/libp2p/pubsub/pb/rpc.proto @@ -30,6 +30,7 @@ message ControlMessage { repeated ControlIWant iwant = 2; repeated ControlGraft graft = 3; repeated ControlPrune prune = 4; + repeated ControlIDontWant idontwant = 5; } message ControlIHave { @@ -51,6 +52,10 @@ message ControlPrune { optional uint64 backoff = 3; } +message ControlIDontWant { + repeated bytes messageIDs = 1; +} + message PeerInfo { optional bytes peerID = 1; optional bytes signedPeerRecord = 2; diff --git a/libp2p/pubsub/pb/rpc_pb2.py b/libp2p/pubsub/pb/rpc_pb2.py index 7941d655c..a2db2f577 100644 --- a/libp2p/pubsub/pb/rpc_pb2.py +++ b/libp2p/pubsub/pb/rpc_pb2.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! -# source: rpc.proto +# source: libp2p/pubsub/pb/rpc.proto """Generated protocol buffer code.""" from google.protobuf.internal import builder as _builder from google.protobuf import descriptor as _descriptor @@ -13,39 +13,41 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\trpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"T\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\"\n\x05peers\x18\x02 \x03(\x0b\x32\x13.pubsub.pb.PeerInfo\x12\x0f\n\x07\x62\x61\x63koff\x18\x03 \x01(\x04\"4\n\x08PeerInfo\x12\x0e\n\x06peerID\x18\x01 \x01(\x0c\x12\x18\n\x10signedPeerRecord\x18\x02 \x01(\x0c\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1alibp2p/pubsub/pb/rpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xe0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\x12.\n\tidontwant\x18\x05 \x03(\x0b\x32\x1b.pubsub.pb.ControlIDontWant\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"T\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\"\n\x05peers\x18\x02 \x03(\x0b\x32\x13.pubsub.pb.PeerInfo\x12\x0f\n\x07\x62\x61\x63koff\x18\x03 \x01(\x04\"&\n\x10\x43ontrolIDontWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\x0c\"4\n\x08PeerInfo\x12\x0e\n\x06peerID\x18\x01 \x01(\x0c\x12\x18\n\x10signedPeerRecord\x18\x02 \x01(\x0c\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02') -_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'rpc_pb2', globals()) -if _descriptor._USE_C_DESCRIPTORS == False: - - DESCRIPTOR._options = None - _RPC._serialized_start=25 - _RPC._serialized_end=205 - _RPC_SUBOPTS._serialized_start=160 - _RPC_SUBOPTS._serialized_end=205 - _MESSAGE._serialized_start=207 - _MESSAGE._serialized_end=312 - _CONTROLMESSAGE._serialized_start=315 - _CONTROLMESSAGE._serialized_end=491 - _CONTROLIHAVE._serialized_start=493 - _CONTROLIHAVE._serialized_end=544 - _CONTROLIWANT._serialized_start=546 - _CONTROLIWANT._serialized_end=580 - _CONTROLGRAFT._serialized_start=582 - _CONTROLGRAFT._serialized_end=613 - _CONTROLPRUNE._serialized_start=615 - _CONTROLPRUNE._serialized_end=699 - _PEERINFO._serialized_start=701 - _PEERINFO._serialized_end=753 - _TOPICDESCRIPTOR._serialized_start=756 - _TOPICDESCRIPTOR._serialized_end=1147 - _TOPICDESCRIPTOR_AUTHOPTS._serialized_start=889 - _TOPICDESCRIPTOR_AUTHOPTS._serialized_end=1013 - _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_start=975 - _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE._serialized_end=1013 - _TOPICDESCRIPTOR_ENCOPTS._serialized_start=1016 - _TOPICDESCRIPTOR_ENCOPTS._serialized_end=1147 - _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_start=1104 - _TOPICDESCRIPTOR_ENCOPTS_ENCMODE._serialized_end=1147 +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'libp2p.pubsub.pb.rpc_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_RPC']._serialized_start=42 + _globals['_RPC']._serialized_end=222 + _globals['_RPC_SUBOPTS']._serialized_start=177 + _globals['_RPC_SUBOPTS']._serialized_end=222 + _globals['_MESSAGE']._serialized_start=224 + _globals['_MESSAGE']._serialized_end=329 + _globals['_CONTROLMESSAGE']._serialized_start=332 + _globals['_CONTROLMESSAGE']._serialized_end=556 + _globals['_CONTROLIHAVE']._serialized_start=558 + _globals['_CONTROLIHAVE']._serialized_end=609 + _globals['_CONTROLIWANT']._serialized_start=611 + _globals['_CONTROLIWANT']._serialized_end=645 + _globals['_CONTROLGRAFT']._serialized_start=647 + _globals['_CONTROLGRAFT']._serialized_end=678 + _globals['_CONTROLPRUNE']._serialized_start=680 + _globals['_CONTROLPRUNE']._serialized_end=764 + _globals['_CONTROLIDONTWANT']._serialized_start=766 + _globals['_CONTROLIDONTWANT']._serialized_end=804 + _globals['_PEERINFO']._serialized_start=806 + _globals['_PEERINFO']._serialized_end=858 + _globals['_TOPICDESCRIPTOR']._serialized_start=861 + _globals['_TOPICDESCRIPTOR']._serialized_end=1252 + _globals['_TOPICDESCRIPTOR_AUTHOPTS']._serialized_start=994 + _globals['_TOPICDESCRIPTOR_AUTHOPTS']._serialized_end=1118 + _globals['_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE']._serialized_start=1080 + _globals['_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE']._serialized_end=1118 + _globals['_TOPICDESCRIPTOR_ENCOPTS']._serialized_start=1121 + _globals['_TOPICDESCRIPTOR_ENCOPTS']._serialized_end=1252 + _globals['_TOPICDESCRIPTOR_ENCOPTS_ENCMODE']._serialized_start=1209 + _globals['_TOPICDESCRIPTOR_ENCOPTS_ENCMODE']._serialized_end=1252 # @@protoc_insertion_point(module_scope) diff --git a/libp2p/pubsub/pb/rpc_pb2.pyi b/libp2p/pubsub/pb/rpc_pb2.pyi index 88738e2e4..c3bd0c30d 100644 --- a/libp2p/pubsub/pb/rpc_pb2.pyi +++ b/libp2p/pubsub/pb/rpc_pb2.pyi @@ -102,6 +102,7 @@ class ControlMessage(google.protobuf.message.Message): IWANT_FIELD_NUMBER: builtins.int GRAFT_FIELD_NUMBER: builtins.int PRUNE_FIELD_NUMBER: builtins.int + IDONTWANT_FIELD_NUMBER: builtins.int @property def ihave(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlIHave]: ... @property @@ -110,6 +111,8 @@ class ControlMessage(google.protobuf.message.Message): def graft(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlGraft]: ... @property def prune(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlPrune]: ... + @property + def idontwant(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlIDontWant]: ... def __init__( self, *, @@ -117,8 +120,9 @@ class ControlMessage(google.protobuf.message.Message): iwant: collections.abc.Iterable[global___ControlIWant] | None = ..., graft: collections.abc.Iterable[global___ControlGraft] | None = ..., prune: collections.abc.Iterable[global___ControlPrune] | None = ..., + idontwant: collections.abc.Iterable[global___ControlIDontWant] | None = ..., ) -> None: ... - def ClearField(self, field_name: typing.Literal["graft", b"graft", "ihave", b"ihave", "iwant", b"iwant", "prune", b"prune"]) -> None: ... + def ClearField(self, field_name: typing.Literal["graft", b"graft", "idontwant", b"idontwant", "ihave", b"ihave", "iwant", b"iwant", "prune", b"prune"]) -> None: ... global___ControlMessage = ControlMessage @@ -197,6 +201,22 @@ class ControlPrune(google.protobuf.message.Message): global___ControlPrune = ControlPrune +@typing.final +class ControlIDontWant(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MESSAGEIDS_FIELD_NUMBER: builtins.int + @property + def messageIDs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.bytes]: ... + def __init__( + self, + *, + messageIDs: collections.abc.Iterable[builtins.bytes] | None = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["messageIDs", b"messageIDs"]) -> None: ... + +global___ControlIDontWant = ControlIDontWant + @typing.final class PeerInfo(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor diff --git a/libp2p/tools/constants.py b/libp2p/tools/constants.py index f7d367e70..4529188f2 100644 --- a/libp2p/tools/constants.py +++ b/libp2p/tools/constants.py @@ -27,6 +27,7 @@ FLOODSUB_PROTOCOL_ID = floodsub.PROTOCOL_ID GOSSIPSUB_PROTOCOL_ID = gossipsub.PROTOCOL_ID GOSSIPSUB_PROTOCOL_ID_V1 = gossipsub.PROTOCOL_ID_V11 +GOSSIPSUB_PROTOCOL_ID_V12 = gossipsub.PROTOCOL_ID_V12 class GossipsubParams(NamedTuple): diff --git a/tests/core/pubsub/test_gossipsub_v12.py b/tests/core/pubsub/test_gossipsub_v12.py new file mode 100644 index 000000000..9e2f62397 --- /dev/null +++ b/tests/core/pubsub/test_gossipsub_v12.py @@ -0,0 +1,224 @@ +import pytest +import trio + +from libp2p.pubsub.gossipsub import ( + PROTOCOL_ID_V12, + GossipSub, +) +from libp2p.pubsub.pb import rpc_pb2 +from libp2p.tools.utils import ( + connect, +) +from tests.utils.factories import ( + IDFactory, + PubsubFactory, +) +from tests.utils.pubsub.utils import ( + one_to_all_connect, +) + + +@pytest.mark.trio +async def test_gossipsub_v12_protocol_support(): + """Test that GossipSub supports protocol v1.2.""" + async with PubsubFactory.create_batch_with_gossipsub( + 2, protocols=[PROTOCOL_ID_V12], max_idontwant_messages=5 + ) as pubsubs_gsub: + # Verify the protocol is registered correctly + for pubsub in pubsubs_gsub: + if isinstance(pubsub.router, GossipSub): + assert PROTOCOL_ID_V12 in pubsub.router.protocols + assert pubsub.router.max_idontwant_messages == 5 + + +@pytest.mark.trio +async def test_idontwant_data_structures(): + """Test IDONTWANT data structure initialization and cleanup.""" + async with PubsubFactory.create_batch_with_gossipsub( + 2, protocols=[PROTOCOL_ID_V12] + ) as pubsubs_gsub: + router = pubsubs_gsub[0].router + assert isinstance(router, GossipSub) + + # Initially empty + assert len(router.dont_send_message_ids) == 0 + + # Connect peers + await connect(pubsubs_gsub[0].host, pubsubs_gsub[1].host) + await trio.sleep(0.1) + + # Verify peer tracking is initialized + peer_id = pubsubs_gsub[1].host.get_id() + assert peer_id in router.dont_send_message_ids + assert isinstance(router.dont_send_message_ids[peer_id], set) + assert len(router.dont_send_message_ids[peer_id]) == 0 + + +@pytest.mark.trio +async def test_handle_idontwant_message(): + """Test handling of incoming IDONTWANT control messages.""" + async with PubsubFactory.create_batch_with_gossipsub( + 2, protocols=[PROTOCOL_ID_V12] + ) as pubsubs_gsub: + router = pubsubs_gsub[0].router + assert isinstance(router, GossipSub) + + # Connect peers + await connect(pubsubs_gsub[0].host, pubsubs_gsub[1].host) + await trio.sleep(0.1) + + sender_peer_id = pubsubs_gsub[1].host.get_id() + + # Create IDONTWANT message + msg_ids = [b"msg1", b"msg2", b"msg3"] + idontwant_msg = rpc_pb2.ControlIDontWant() + idontwant_msg.messageIDs.extend(msg_ids) + + # Handle the IDONTWANT message + await router.handle_idontwant(idontwant_msg, sender_peer_id) + + # Verify message IDs are stored + assert sender_peer_id in router.dont_send_message_ids + stored_ids = router.dont_send_message_ids[sender_peer_id] + for msg_id in msg_ids: + assert msg_id in stored_ids + + +@pytest.mark.trio +async def test_message_filtering_with_idontwant(): + """Test that messages are filtered based on IDONTWANT.""" + async with PubsubFactory.create_batch_with_gossipsub( + 3, + protocols=[PROTOCOL_ID_V12], + heartbeat_interval=100, # Disable heartbeat + ) as pubsubs_gsub: + topic = "test_topic" + + # Connect all peers + hosts = [pubsub.host for pubsub in pubsubs_gsub] + await one_to_all_connect(hosts, 0) + await trio.sleep(0.1) + + # Subscribe all to the topic + for pubsub in pubsubs_gsub: + await pubsub.subscribe(topic) + await trio.sleep(0.5) # Allow mesh to form + + router = pubsubs_gsub[0].router + assert isinstance(router, GossipSub) + + # Create a fake message + msg_id = b"test_msg_id" + msg_forwarder = pubsubs_gsub[1].host.get_id() + origin = pubsubs_gsub[2].host.get_id() + + # Add IDONTWANT for peer 1 + peer1_id = pubsubs_gsub[1].host.get_id() + router.dont_send_message_ids[peer1_id].add(msg_id) + + # Get peers to send to + peers_to_send = list( + router._get_peers_to_send([topic], msg_forwarder, origin, msg_id) + ) + + # Peer 1 should be filtered out due to IDONTWANT + assert peer1_id not in peers_to_send + + # But peer 2 should still be included + peer2_id = pubsubs_gsub[2].host.get_id() + # Note: peer2 is origin, so won't be in the list + # Let's test with peer 2 as forwarder instead + peers_to_send_alt = list( + router._get_peers_to_send([topic], peer2_id, origin, msg_id) + ) + + # Peer 1 should still be filtered out + assert peer1_id not in peers_to_send_alt + + +@pytest.mark.trio +async def test_idontwant_pruning(): + """Test that IDONTWANT entries are pruned during heartbeat.""" + async with PubsubFactory.create_batch_with_gossipsub( + 2, + protocols=[PROTOCOL_ID_V12], + heartbeat_interval=100, # Disable auto heartbeat + ) as pubsubs_gsub: + router = pubsubs_gsub[0].router + assert isinstance(router, GossipSub) + + # Connect peers + await connect(pubsubs_gsub[0].host, pubsubs_gsub[1].host) + await trio.sleep(0.1) + + peer_id = pubsubs_gsub[1].host.get_id() + + # Add some IDONTWANT entries + router.dont_send_message_ids[peer_id].add(b"msg1") + router.dont_send_message_ids[peer_id].add(b"msg2") + assert len(router.dont_send_message_ids[peer_id]) == 2 + + # Run pruning + router._prune_idontwant_entries() + + # Verify entries are cleared + assert len(router.dont_send_message_ids[peer_id]) == 0 + + +@pytest.mark.trio +async def test_emit_idontwant(): + """Test emitting IDONTWANT control messages.""" + async with PubsubFactory.create_batch_with_gossipsub( + 2, protocols=[PROTOCOL_ID_V12] + ) as pubsubs_gsub: + # Connect peers + await connect(pubsubs_gsub[0].host, pubsubs_gsub[1].host) + await trio.sleep(0.1) + + router = pubsubs_gsub[0].router + assert isinstance(router, GossipSub) + + peer_id = pubsubs_gsub[1].host.get_id() + msg_ids = [b"msg1", b"msg2"] + + # This should not raise an exception + await router.emit_idontwant(msg_ids, peer_id) + + # Note: Testing actual message delivery would require more complex + # test setup to capture the sent messages + + +@pytest.mark.trio +async def test_mixed_protocol_versions(): + """Test that IDONTWANT is only sent to v1.2 peers.""" + async with PubsubFactory.create_batch_with_gossipsub( + 1, protocols=[PROTOCOL_ID_V12] + ) as v12_pubsubs: + router = v12_pubsubs[0].router + assert isinstance(router, GossipSub) + + # Mock different protocol versions for testing + from libp2p.pubsub.gossipsub import PROTOCOL_ID_V11 + + peer_id_v11 = IDFactory() + peer_id_v12 = IDFactory() + + # Add peers with different protocol versions + router.add_peer(peer_id_v11, PROTOCOL_ID_V11) + router.add_peer(peer_id_v12, PROTOCOL_ID_V12) + + # Create mock mesh + topic = "test_topic" + router.mesh[topic] = {peer_id_v11, peer_id_v12} + + # Mock message ID + msg_id = b"test_message" + + # The emit function should handle protocol filtering internally + # This should not raise an exception - it should only attempt to + # send IDONTWANT to v1.2 peers + await router._emit_idontwant_for_message(msg_id, [topic]) + + # Verify that the method completed without error + # (In a real implementation, we'd verify the actual message sending, + # but that would require more complex mocking) diff --git a/tests/utils/factories.py b/tests/utils/factories.py index 75639e369..b45e445ed 100644 --- a/tests/utils/factories.py +++ b/tests/utils/factories.py @@ -447,6 +447,7 @@ class Meta: px_peers_count = GOSSIPSUB_PARAMS.px_peers_count prune_back_off = GOSSIPSUB_PARAMS.prune_back_off unsubscribe_back_off = GOSSIPSUB_PARAMS.unsubscribe_back_off + max_idontwant_messages = 10 class PubsubFactory(factory.Factory): @@ -576,6 +577,7 @@ async def create_batch_with_gossipsub( px_peers_count: int = GOSSIPSUB_PARAMS.px_peers_count, prune_back_off: int = GOSSIPSUB_PARAMS.prune_back_off, unsubscribe_back_off: int = GOSSIPSUB_PARAMS.unsubscribe_back_off, + max_idontwant_messages: int = 10, security_protocol: TProtocol | None = None, muxer_opt: TMuxerOptions | None = None, msg_id_constructor: None @@ -600,6 +602,7 @@ async def create_batch_with_gossipsub( px_peers_count=px_peers_count, prune_back_off=prune_back_off, unsubscribe_back_off=unsubscribe_back_off, + max_idontwant_messages=max_idontwant_messages, ) else: gossipsubs = GossipsubFactory.create_batch( @@ -618,6 +621,7 @@ async def create_batch_with_gossipsub( px_peers_count=px_peers_count, prune_back_off=prune_back_off, unsubscribe_back_off=unsubscribe_back_off, + max_idontwant_messages=max_idontwant_messages, ) async with cls._create_batch_with_router(