
Commit 788b4cf

added complete back_off implementation

1 parent b78468c

2 files changed: +95 -27 lines

libp2p/pubsub/gossipsub.py

Lines changed: 93 additions & 25 deletions
@@ -93,7 +93,8 @@ class GossipSub(IPubsubRouter, Service):
     direct_connect_interval: int

     do_px: bool
-    back_off: int
+    back_off: dict[str, dict[ID, int]]
+    prune_back_off: int
     unsubscribe_back_off: int

     def __init__(
@@ -111,7 +112,7 @@ def __init__(
         direct_connect_initial_delay: float = 0.1,
         direct_connect_interval: int = 300,
         do_px: bool = False,
-        back_off: int = 60,
+        prune_back_off: int = 60,
         unsubscribe_back_off: int = 10,
     ) -> None:
         self.protocols = list(protocols)
@@ -148,7 +149,8 @@ def __init__(
         self.time_since_last_publish = {}

         self.do_px = do_px
-        self.back_off = back_off
+        self.back_off = dict()
+        self.prune_back_off = prune_back_off
         self.unsubscribe_back_off = unsubscribe_back_off

     async def run(self) -> None:
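The data-model change above is the heart of the commit: the single integer `back_off` becomes a nested map from topic to peer to a unix-time expiry, with separate default windows for PRUNE evictions (`prune_back_off`, 60 s) and voluntary unsubscribes (`unsubscribe_back_off`, 10 s). A minimal illustration of the shape of that state (plain `str` peer IDs stand in for `ID`; this is not code from the commit):

```python
import time

# topic -> peer -> unix timestamp until which the peer is backed off
back_off: dict[str, dict[str, int]] = {}

# After pruning peer "peerA" from topic "blocks" with the 60 s default:
back_off.setdefault("blocks", {})["peerA"] = int(time.time()) + 60

# The peer stays excluded while its expiry lies in the future:
assert back_off["blocks"]["peerA"] > int(time.time())
```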
@@ -345,15 +347,21 @@ async def join(self, topic: str) -> None:
         self.mesh[topic] = set()

         topic_in_fanout: bool = topic in self.fanout
-        fanout_peers: set[ID] = self.fanout[topic] if topic_in_fanout else set()
+        fanout_peers: set[ID] = set()
+
+        for peer in self.fanout[topic]:
+            if self._check_back_off(peer, topic):
+                continue
+            fanout_peers.add(peer)
+
         fanout_size = len(fanout_peers)
         if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree):
             # There are less than D peers (let this number be x)
             # in the fanout for a topic (or the topic is not in the fanout).
             # Selects the remaining number of peers (D-x) from peers.gossipsub[topic].
-            if topic in self.pubsub.peer_topics:
+            if self.pubsub is not None and topic in self.pubsub.peer_topics:
                 selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
-                    topic, self.degree - fanout_size, fanout_peers
+                    topic, self.degree - fanout_size, fanout_peers, True
                 )
                 # Combine fanout peers with selected peers
                 fanout_peers.update(selected_peers)
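`join` now screens every fanout candidate through `_check_back_off` instead of adopting `self.fanout[topic]` wholesale, and the trailing `True` asks `_get_in_topic_gossipsub_peers_from_minus` to apply the same filter to gossipsub peers. A standalone sketch of that filtering step (names are illustrative, not the library API):

```python
import time

def filter_backed_off(
    candidates: set[str], topic_back_off: dict[str, int]
) -> set[str]:
    """Keep only candidates whose back-off window, if any, has expired."""
    now = int(time.time())
    return {peer for peer in candidates if topic_back_off.get(peer, 0) <= now}

now = int(time.time())
# "peerA" is backed off for another 30 s; "peerB" has no entry and passes.
assert filter_backed_off({"peerA", "peerB"}, {"peerA": now + 30}) == {"peerB"}
```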
@@ -380,7 +388,8 @@ async def leave(self, topic: str) -> None:
             return
         # Notify the peers in mesh[topic] with a PRUNE(topic) message
         for peer in self.mesh[topic]:
-            await self.emit_prune(topic, peer, do_px=self.do_px, is_unsubscribe=True)
+            await self.emit_prune(topic, peer, self.do_px, True)
+            self._add_back_off(peer, topic, True)

         # Forget mesh[topic]
         self.mesh.pop(topic, None)
@@ -516,7 +525,7 @@ def mesh_heartbeat(
             if num_mesh_peers_in_topic < self.degree_low:
                 # Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic] # noqa: E501
                 selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
-                    topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic]
+                    topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic], True
                 )

                 for peer in selected_peers:
@@ -579,9 +588,7 @@ def _handle_topic_heartbeat(
         if len(in_topic_peers) < self.degree:
             # Select additional peers from peers.gossipsub[topic]
             selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
-                topic,
-                self.degree - len(in_topic_peers),
-                in_topic_peers,
+                topic, self.degree - len(in_topic_peers), in_topic_peers, True
             )
             # Add the selected peers
             in_topic_peers.update(selected_peers)
@@ -592,7 +599,7 @@ def _handle_topic_heartbeat(
         if msg_ids:
             # Select D peers from peers.gossipsub[topic] excluding current peers
             peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
-                topic, self.degree, current_peers
+                topic, self.degree, current_peers, True
             )
             msg_id_strs = [str(msg_id) for msg_id in msg_ids]
             for peer in peers_to_emit_ihave_to:
@@ -666,7 +673,11 @@ def select_from_minus(
         return selection

     def _get_in_topic_gossipsub_peers_from_minus(
-        self, topic: str, num_to_select: int, minus: Iterable[ID]
+        self,
+        topic: str,
+        num_to_select: int,
+        minus: Iterable[ID],
+        backoff_check: bool = False,
     ) -> list[ID]:
         if self.pubsub is None:
             raise NoPubsubAttached
@@ -675,8 +686,57 @@ def _get_in_topic_gossipsub_peers_from_minus(
             for peer_id in self.pubsub.peer_topics[topic]
             if self.peer_protocol[peer_id] == PROTOCOL_ID
         }
+        if backoff_check:
+            # filter out peers that are in back off for this topic
+            gossipsub_peers_in_topic = {
+                peer_id
+                for peer_id in gossipsub_peers_in_topic
+                if self._check_back_off(peer_id, topic) is False
+            }
         return self.select_from_minus(num_to_select, gossipsub_peers_in_topic, minus)

+    def _add_back_off(
+        self, peer: ID, topic: str, is_unsubscribe: bool, backoff_duration: int = 0
+    ) -> None:
+        """
+        Add back off for a peer in a topic.
+        :param peer: peer to add back off for
+        :param topic: topic to add back off for
+        :param is_unsubscribe: whether this is an unsubscribe operation
+        :param backoff_duration: duration of back off in seconds, if 0, use default
+        """
+        if topic not in self.back_off:
+            self.back_off[topic] = dict()
+
+        backoff_till = int(time.time())
+        if backoff_duration > 0:
+            backoff_till += backoff_duration
+        else:
+            if is_unsubscribe:
+                backoff_till += self.unsubscribe_back_off
+            else:
+                backoff_till += self.prune_back_off
+
+        if peer not in self.back_off[topic]:
+            self.back_off[topic][peer] = backoff_till
+        else:
+            self.back_off[topic][peer] = max(self.back_off[topic][peer], backoff_till)
+
+    def _check_back_off(self, peer: ID, topic: str) -> bool:
+        """
+        Check if a peer is in back off for a topic and cleanup expired back off entries.
+        :param peer: peer to check
+        :param topic: topic to check
+        :return: True if the peer is in back off, False otherwise
+        """
+        if topic not in self.back_off:
+            return False
+        if self.back_off[topic].get(peer, 0) > int(time.time()):
+            return True
+        else:
+            del self.back_off[topic][peer]
+            return False
+
     # RPC handlers

     async def handle_ihave(
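Together these two helpers form a lazy-expiry map: `_add_back_off` never shortens an existing window (it keeps the `max` of the old and new expiry), and `_check_back_off` deletes an entry the first time it is consulted after expiring. One edge worth noting: when the topic exists but the peer has no entry, `self.back_off[topic].get(peer, 0)` returns the default and falls through to the `else` branch, whose `del` would raise `KeyError`. A guarded standalone equivalent (my reading of the intent, not the committed code) looks like this:

```python
import time

def check_back_off(
    back_off: dict[str, dict[str, int]], peer: str, topic: str
) -> bool:
    """Return True while `peer` is backed off for `topic`; drop expired entries."""
    per_topic = back_off.get(topic)
    if per_topic is None or peer not in per_topic:
        return False
    if per_topic[peer] > int(time.time()):
        return True
    del per_topic[peer]  # window expired: clean up lazily
    return False
```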
@@ -762,36 +822,45 @@ async def handle_graft(
     ) -> None:
         topic: str = graft_msg.topicID

-        # TODO: complete the remaining logic
-        self.do_px
-
         # Add peer to mesh for topic
         if topic in self.mesh:
             for direct_peer in self.direct_peers:
                 if direct_peer == sender_peer_id:
                     logger.warning(
                         "GRAFT: ignoring request from direct peer %s", sender_peer_id
                     )
-                    await self.emit_prune(
-                        topic, sender_peer_id, do_px=self.do_px, is_unsubscribe=False
-                    )
+                    await self.emit_prune(topic, sender_peer_id, False, False)
                     return

+            if self._check_back_off(sender_peer_id, topic):
+                logger.warning(
+                    "GRAFT: ignoring request from %s, back off until %d",
+                    sender_peer_id,
+                    self.back_off[topic][sender_peer_id],
+                )
+                self._add_back_off(sender_peer_id, topic, False)
+                await self.emit_prune(topic, sender_peer_id, False, False)
+                return
+
             if sender_peer_id not in self.mesh[topic]:
                 self.mesh[topic].add(sender_peer_id)
         else:
             # Respond with PRUNE if not subscribed to the topic
             await self.emit_prune(topic, sender_peer_id, self.do_px, False)

     async def handle_prune(
         self, prune_msg: rpc_pb2.ControlPrune, sender_peer_id: ID
     ) -> None:
         topic: str = prune_msg.topicID
+        backoff_till: int = prune_msg.backoff

         # Remove peer from mesh for topic
         if topic in self.mesh:
+            if backoff_till > 0:
+                self._add_back_off(sender_peer_id, topic, False, backoff_till)
+            else:
+                self._add_back_off(sender_peer_id, topic, False)
+
             self.mesh[topic].discard(sender_peer_id)

     # RPC emitters
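On the receive side, a GRAFT that arrives during an active back-off window is treated as misbehavior: the window is refreshed and the peer is re-PRUNEd. A PRUNE, in turn, honors the sender-advertised `backoff` duration when it is positive and falls back to the local default otherwise. A compact sketch of the PRUNE-side bookkeeping (standalone helpers under the same assumptions as the sketches above, not the library API):

```python
import time

def on_prune(
    back_off: dict[str, dict[str, int]],
    mesh: dict[str, set[str]],
    topic: str,
    peer: str,
    advertised_backoff: int,
    prune_back_off: int = 60,
) -> None:
    """Record the window a pruning peer asked for, then drop it from the mesh."""
    duration = advertised_backoff if advertised_backoff > 0 else prune_back_off
    expiry = int(time.time()) + duration
    per_topic = back_off.setdefault(topic, {})
    per_topic[peer] = max(per_topic.get(peer, 0), expiry)  # never shorten a window
    mesh.get(topic, set()).discard(peer)
```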
@@ -845,12 +914,11 @@ async def emit_graft(self, topic: str, id: ID) -> None:
     async def emit_prune(
         self, topic: str, to_peer: ID, do_px: bool, is_unsubscribe: bool
     ) -> None:
-    async def emit_prune(self, topic: str, id: ID) -> None:
         """Emit graft message, sent to to_peer, for topic."""
         prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune()
         prune_msg.topicID = topic

-        back_off_duration = self.back_off
+        back_off_duration = self.prune_back_off
         if is_unsubscribe:
             back_off_duration = self.unsubscribe_back_off

@@ -862,7 +930,7 @@ async def emit_prune(self, topic: str, id: ID) -> None:
         control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
         control_msg.prune.extend([prune_msg])

-        await self.emit_control_message(control_msg, id)
+        await self.emit_control_message(control_msg, to_peer)

     async def emit_control_message(
         self, control_msg: rpc_pb2.ControlMessage, to_peer: ID
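`emit_prune` now derives the advertised window from the two config knobs: the short `unsubscribe_back_off` (10 s default) when leaving a topic, the longer `prune_back_off` (60 s default) otherwise. The hunks shown end before the field assignment, but since `handle_prune` reads `prune_msg.backoff`, the chosen duration presumably travels in that field; the selection itself reduces to:

```python
def choose_back_off_duration(
    is_unsubscribe: bool, prune_back_off: int = 60, unsubscribe_back_off: int = 10
) -> int:
    """Shorter window for voluntary unsubscribes, longer one for PRUNE evictions."""
    return unsubscribe_back_off if is_unsubscribe else prune_back_off

assert choose_back_off_duration(True) == 10
assert choose_back_off_duration(False) == 60
```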

tests/core/pubsub/test_gossipsub.py

Lines changed: 2 additions & 2 deletions
@@ -134,7 +134,7 @@ async def test_handle_graft(monkeypatch):
     # check if it is called in `handle_graft`
     event_emit_prune = trio.Event()

-    async def emit_prune(topic, sender_peer_id):
+    async def emit_prune(topic, sender_peer_id, do_px, is_unsubscribe):
         event_emit_prune.set()
         await trio.lowlevel.checkpoint()

@@ -193,7 +193,7 @@ async def test_handle_prune():

     # alice emit prune message to bob, alice should be removed
     # from bob's mesh peer
-    await gossipsubs[index_alice].emit_prune(topic, id_bob)
+    await gossipsubs[index_alice].emit_prune(topic, id_bob, False, False)
     # `emit_prune` does not remove bob from alice's mesh peers
     assert id_bob in gossipsubs[index_alice].mesh[topic]

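The test changes only track the widened `emit_prune` signature; the back-off window itself is not exercised here. A hypothetical follow-up test (not part of this commit) could pin down the `max()` rule from `_add_back_off`, i.e. that a later, shorter back-off never shortens an existing window:

```python
import time

def test_back_off_never_shortens_sketch() -> None:
    # Standalone restatement of _add_back_off's max() rule, not the library API.
    now = int(time.time())
    back_off = {"topic": {"peer": now + 60}}  # existing 60 s window
    new_expiry = now + 10                     # later request for a 10 s window
    back_off["topic"]["peer"] = max(back_off["topic"]["peer"], new_expiry)
    assert back_off["topic"]["peer"] == now + 60
```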