
Commit 460f502

Merge branch 'main' into write_msg_pubsub
2 parents: f12ca4e + 6a92fa2

File tree: 10 files changed (+554 / -50 lines)

libp2p/peer/peerinfo.py

Lines changed: 18 additions & 0 deletions

@@ -66,5 +66,23 @@ def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo:
     return PeerInfo(peer_id, [addr])


+def peer_info_to_bytes(peer_info: PeerInfo) -> bytes:
+    lines = [str(peer_info.peer_id)] + [str(addr) for addr in peer_info.addrs]
+    return "\n".join(lines).encode("utf-8")
+
+
+def peer_info_from_bytes(data: bytes) -> PeerInfo:
+    try:
+        lines = data.decode("utf-8").splitlines()
+        if not lines:
+            raise InvalidAddrError("no data to decode PeerInfo")
+
+        peer_id = ID.from_base58(lines[0])
+        addrs = [multiaddr.Multiaddr(addr_str) for addr_str in lines[1:]]
+        return PeerInfo(peer_id, addrs)
+    except Exception as e:
+        raise InvalidAddrError(f"failed to decode PeerInfo: {e}")
+
+
 class InvalidAddrError(ValueError):
     pass
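For context, these two helpers round-trip a PeerInfo through the newline-delimited byte format that the PX records below carry. A minimal sketch (the peer ID and multiaddr values are illustrative, not from this commit):

import multiaddr

from libp2p.peer.id import ID
from libp2p.peer.peerinfo import PeerInfo, peer_info_from_bytes, peer_info_to_bytes

# Illustrative values; any valid base58 peer ID and multiaddr will do.
peer_id = ID.from_base58("QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TQJzfeqnnn7hi1")
info = PeerInfo(peer_id, [multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/4001")])

data = peer_info_to_bytes(info)        # b"<base58 id>\n/ip4/127.0.0.1/tcp/4001"
restored = peer_info_from_bytes(data)  # raises InvalidAddrError on malformed input

assert str(restored.peer_id) == str(info.peer_id)
assert [str(a) for a in restored.addrs] == [str(a) for a in info.addrs]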

libp2p/pubsub/gossipsub.py

Lines changed: 169 additions & 16 deletions

@@ -29,6 +29,8 @@
 )
 from libp2p.peer.peerinfo import (
     PeerInfo,
+    peer_info_from_bytes,
+    peer_info_to_bytes,
 )
 from libp2p.peer.peerstore import (
     PERMANENT_ADDR_TTL,
@@ -86,6 +88,12 @@ class GossipSub(IPubsubRouter, Service):
     direct_connect_initial_delay: float
     direct_connect_interval: int

+    do_px: bool
+    px_peers_count: int
+    back_off: dict[str, dict[ID, int]]
+    prune_back_off: int
+    unsubscribe_back_off: int
+
     def __init__(
         self,
         protocols: Sequence[TProtocol],
@@ -100,6 +108,10 @@ def __init__(
         heartbeat_interval: int = 120,
         direct_connect_initial_delay: float = 0.1,
         direct_connect_interval: int = 300,
+        do_px: bool = False,
+        px_peers_count: int = 16,
+        prune_back_off: int = 60,
+        unsubscribe_back_off: int = 10,
     ) -> None:
         self.protocols = list(protocols)
         self.pubsub = None
@@ -134,6 +146,12 @@ def __init__(
         self.direct_connect_initial_delay = direct_connect_initial_delay
         self.time_since_last_publish = {}

+        self.do_px = do_px
+        self.px_peers_count = px_peers_count
+        self.back_off = dict()
+        self.prune_back_off = prune_back_off
+        self.unsubscribe_back_off = unsubscribe_back_off
+
     async def run(self) -> None:
         self.manager.run_daemon_task(self.heartbeat)
         if len(self.direct_peers) > 0:
@@ -243,8 +261,10 @@ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
             if peer_id not in self.pubsub.peers:
                 continue
             stream = self.pubsub.peers[peer_id]
+
             # TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages.
             await self.pubsub.write_msg(stream, rpc_msg)
+
         for topic in pubsub_msg.topicIDs:
             self.time_since_last_publish[topic] = int(time.time())

@@ -322,15 +342,22 @@ async def join(self, topic: str) -> None:
         self.mesh[topic] = set()

         topic_in_fanout: bool = topic in self.fanout
-        fanout_peers: set[ID] = self.fanout[topic] if topic_in_fanout else set()
+        fanout_peers: set[ID] = set()
+
+        if topic_in_fanout:
+            for peer in self.fanout[topic]:
+                if self._check_back_off(peer, topic):
+                    continue
+                fanout_peers.add(peer)
+
         fanout_size = len(fanout_peers)
         if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree):
             # There are less than D peers (let this number be x)
             # in the fanout for a topic (or the topic is not in the fanout).
             # Selects the remaining number of peers (D-x) from peers.gossipsub[topic].
-            if topic in self.pubsub.peer_topics:
+            if self.pubsub is not None and topic in self.pubsub.peer_topics:
                 selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
-                    topic, self.degree - fanout_size, fanout_peers
+                    topic, self.degree - fanout_size, fanout_peers, True
                 )
                 # Combine fanout peers with selected peers
                 fanout_peers.update(selected_peers)
@@ -357,7 +384,8 @@ async def leave(self, topic: str) -> None:
             return
         # Notify the peers in mesh[topic] with a PRUNE(topic) message
         for peer in self.mesh[topic]:
-            await self.emit_prune(topic, peer)
+            await self.emit_prune(topic, peer, self.do_px, True)
+            self._add_back_off(peer, topic, True)

         # Forget mesh[topic]
         self.mesh.pop(topic, None)
@@ -447,8 +475,8 @@ async def heartbeat(self) -> None:
             self.fanout_heartbeat()
             # Get the peers to send IHAVE to
             peers_to_gossip = self.gossip_heartbeat()
-            # Pack GRAFT, PRUNE and IHAVE for the same peer into one control message and
-            # send it
+            # Pack (piggyback) GRAFT, PRUNE and IHAVE for the same peer into
+            # one control message and send it
             await self._emit_control_msgs(
                 peers_to_graft, peers_to_prune, peers_to_gossip
             )
@@ -493,7 +521,7 @@ def mesh_heartbeat(
             if num_mesh_peers_in_topic < self.degree_low:
                 # Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic] # noqa: E501
                 selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
-                    topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic]
+                    topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic], True
                 )

                 for peer in selected_peers:
@@ -556,9 +584,7 @@ def _handle_topic_heartbeat(
         if len(in_topic_peers) < self.degree:
             # Select additional peers from peers.gossipsub[topic]
             selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
-                topic,
-                self.degree - len(in_topic_peers),
-                in_topic_peers,
+                topic, self.degree - len(in_topic_peers), in_topic_peers, True
             )
             # Add the selected peers
             in_topic_peers.update(selected_peers)
@@ -569,7 +595,7 @@
         if msg_ids:
             # Select D peers from peers.gossipsub[topic] excluding current peers
             peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
-                topic, self.degree, current_peers
+                topic, self.degree, current_peers, True
             )
             msg_id_strs = [str(msg_id) for msg_id in msg_ids]
             for peer in peers_to_emit_ihave_to:
@@ -643,7 +669,11 @@ def select_from_minus(
         return selection

     def _get_in_topic_gossipsub_peers_from_minus(
-        self, topic: str, num_to_select: int, minus: Iterable[ID]
+        self,
+        topic: str,
+        num_to_select: int,
+        minus: Iterable[ID],
+        backoff_check: bool = False,
     ) -> list[ID]:
         if self.pubsub is None:
             raise NoPubsubAttached
@@ -652,8 +682,88 @@
             for peer_id in self.pubsub.peer_topics[topic]
             if self.peer_protocol[peer_id] == PROTOCOL_ID
         }
+        if backoff_check:
+            # filter out peers that are in back off for this topic
+            gossipsub_peers_in_topic = {
+                peer_id
+                for peer_id in gossipsub_peers_in_topic
+                if self._check_back_off(peer_id, topic) is False
+            }
         return self.select_from_minus(num_to_select, gossipsub_peers_in_topic, minus)

+    def _add_back_off(
+        self, peer: ID, topic: str, is_unsubscribe: bool, backoff_duration: int = 0
+    ) -> None:
+        """
+        Add back off for a peer in a topic.
+        :param peer: peer to add back off for
+        :param topic: topic to add back off for
+        :param is_unsubscribe: whether this is an unsubscribe operation
+        :param backoff_duration: duration of back off in seconds, if 0, use default
+        """
+        if topic not in self.back_off:
+            self.back_off[topic] = dict()
+
+        backoff_till = int(time.time())
+        if backoff_duration > 0:
+            backoff_till += backoff_duration
+        else:
+            if is_unsubscribe:
+                backoff_till += self.unsubscribe_back_off
+            else:
+                backoff_till += self.prune_back_off
+
+        if peer not in self.back_off[topic]:
+            self.back_off[topic][peer] = backoff_till
+        else:
+            self.back_off[topic][peer] = max(self.back_off[topic][peer], backoff_till)
+
+    def _check_back_off(self, peer: ID, topic: str) -> bool:
+        """
+        Check if a peer is in back off for a topic and clean up expired entries.
+        :param peer: peer to check
+        :param topic: topic to check
+        :return: True if the peer is in back off, False otherwise
+        """
+        if topic not in self.back_off or peer not in self.back_off[topic]:
+            return False
+        if self.back_off[topic].get(peer, 0) > int(time.time()):
+            return True
+        else:
+            del self.back_off[topic][peer]
+            return False
+
+    async def _do_px(self, px_peers: list[rpc_pb2.PeerInfo]) -> None:
+        if len(px_peers) > self.px_peers_count:
+            px_peers = px_peers[: self.px_peers_count]
+
+        for peer in px_peers:
+            peer_id: ID = ID(peer.peerID)
+
+            if self.pubsub and peer_id in self.pubsub.peers:
+                continue
+
+            try:
+                peer_info = peer_info_from_bytes(peer.signedPeerRecord)
+                try:
+                    if self.pubsub is None:
+                        raise NoPubsubAttached
+                    await self.pubsub.host.connect(peer_info)
+                except Exception as e:
+                    logger.warning(
+                        "failed to connect to px peer %s: %s",
+                        peer_id,
+                        e,
+                    )
+                    continue
+            except Exception as e:
+                logger.warning(
+                    "failed to parse peer info from px peer %s: %s",
+                    peer_id,
+                    e,
+                )
+                continue
+
     # RPC handlers

     async def handle_ihave(
@@ -737,24 +847,46 @@ async def handle_graft(
             logger.warning(
                 "GRAFT: ignoring request from direct peer %s", sender_peer_id
             )
-            await self.emit_prune(topic, sender_peer_id)
+            await self.emit_prune(topic, sender_peer_id, False, False)
             return

+        if self._check_back_off(sender_peer_id, topic):
+            logger.warning(
+                "GRAFT: ignoring request from %s, back off until %d",
+                sender_peer_id,
+                self.back_off[topic][sender_peer_id],
+            )
+            self._add_back_off(sender_peer_id, topic, False)
+            await self.emit_prune(topic, sender_peer_id, False, False)
+            return
+
             if sender_peer_id not in self.mesh[topic]:
                 self.mesh[topic].add(sender_peer_id)
         else:
             # Respond with PRUNE if not subscribed to the topic
-            await self.emit_prune(topic, sender_peer_id)
+            await self.emit_prune(topic, sender_peer_id, self.do_px, False)

     async def handle_prune(
         self, prune_msg: rpc_pb2.ControlPrune, sender_peer_id: ID
     ) -> None:
         topic: str = prune_msg.topicID
+        backoff_till: int = prune_msg.backoff
+        px_peers: list[rpc_pb2.PeerInfo] = []
+        for peer in prune_msg.peers:
+            px_peers.append(peer)

         # Remove peer from mesh for topic
         if topic in self.mesh:
+            if backoff_till > 0:
+                self._add_back_off(sender_peer_id, topic, False, backoff_till)
+            else:
+                self._add_back_off(sender_peer_id, topic, False)
+
             self.mesh[topic].discard(sender_peer_id)

+        if px_peers:
+            await self._do_px(px_peers)
+
     # RPC emitters

     def pack_control_msgs(
@@ -803,15 +935,36 @@ async def emit_graft(self, topic: str, id: ID) -> None:

         await self.emit_control_message(control_msg, id)

-    async def emit_prune(self, topic: str, id: ID) -> None:
+    async def emit_prune(
+        self, topic: str, to_peer: ID, do_px: bool, is_unsubscribe: bool
+    ) -> None:
         """Emit prune message, sent to to_peer, for topic."""
         prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune()
         prune_msg.topicID = topic

+        back_off_duration = self.prune_back_off
+        if is_unsubscribe:
+            back_off_duration = self.unsubscribe_back_off
+
+        prune_msg.backoff = back_off_duration
+
+        if do_px:
+            exchange_peers = self._get_in_topic_gossipsub_peers_from_minus(
+                topic, self.px_peers_count, [to_peer]
+            )
+            for peer in exchange_peers:
+                if self.pubsub is None:
+                    raise NoPubsubAttached
+                peer_info = self.pubsub.host.get_peerstore().peer_info(peer)
+                signed_peer_record: rpc_pb2.PeerInfo = rpc_pb2.PeerInfo()
+                signed_peer_record.peerID = peer.to_bytes()
+                signed_peer_record.signedPeerRecord = peer_info_to_bytes(peer_info)
+                prune_msg.peers.append(signed_peer_record)
+
         control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
         control_msg.prune.extend([prune_msg])

-        await self.emit_control_message(control_msg, id)
+        await self.emit_control_message(control_msg, to_peer)

     async def emit_control_message(
         self, control_msg: rpc_pb2.ControlMessage, to_peer: ID
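Taken together, the new constructor knobs control both sides of pruning: whether we attach PX records to PRUNEs we send, and how long we refuse re-GRAFTs. A hedged sketch of a router with PX enabled (the degree values are illustrative and the other constructor parameters are assumptions based on the existing gossipsub module, not part of this diff):

from libp2p.pubsub.gossipsub import PROTOCOL_ID, GossipSub

router = GossipSub(
    protocols=[PROTOCOL_ID],
    degree=6,                 # illustrative mesh degree (D)
    degree_low=4,             # illustrative lower bound (D_low)
    degree_high=12,           # illustrative upper bound (D_high)
    do_px=True,               # attach PX records to the PRUNEs we send
    px_peers_count=16,        # cap on PX records sent and accepted
    prune_back_off=60,        # seconds a pruned peer must wait before re-GRAFTing
    unsubscribe_back_off=10,  # shorter back off when we leave() a topic
)

Note that _check_back_off doubles as garbage collection: expired entries are deleted on read, so back_off never needs a separate cleanup task, and _add_back_off keeps the later of two deadlines when entries collide.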

libp2p/pubsub/pb/rpc.proto

Lines changed: 7 additions & 0 deletions

@@ -47,6 +47,13 @@ message ControlGraft {

 message ControlPrune {
   optional string topicID = 1;
+  repeated PeerInfo peers = 2;
+  optional uint64 backoff = 3;
+}
+
+message PeerInfo {
+  optional bytes peerID = 1;
+  optional bytes signedPeerRecord = 2;
 }

 message TopicDescriptor {
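After regenerating the bindings, the new fields are set like any other proto2 members. A small sketch mirroring emit_prune above (the byte values are placeholders; the real code fills them from ID.to_bytes() and peer_info_to_bytes()):

from libp2p.pubsub.pb import rpc_pb2

prune = rpc_pb2.ControlPrune()
prune.topicID = "example-topic"
prune.backoff = 60  # seconds; matches the prune_back_off default above

record = rpc_pb2.PeerInfo()
record.peerID = b"placeholder-peer-id-bytes"      # ID.to_bytes() in real use
record.signedPeerRecord = b"placeholder-record"   # peer_info_to_bytes() output
prune.peers.append(record)

control = rpc_pb2.ControlMessage()
control.prune.extend([prune])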
