 )
 from libp2p.peer.peerinfo import (
     PeerInfo,
+    peer_info_from_bytes,
+    peer_info_to_bytes,
 )
 from libp2p.peer.peerstore import (
     PERMANENT_ADDR_TTL,
@@ -92,6 +94,12 @@ class GossipSub(IPubsubRouter, Service):
     direct_connect_initial_delay: float
     direct_connect_interval: int

+    do_px: bool
+    px_peers_count: int
+    back_off: dict[str, dict[ID, int]]
+    prune_back_off: int
+    unsubscribe_back_off: int
+
     def __init__(
         self,
         protocols: Sequence[TProtocol],
@@ -106,6 +114,10 @@ def __init__(
         heartbeat_interval: int = 120,
         direct_connect_initial_delay: float = 0.1,
         direct_connect_interval: int = 300,
+        do_px: bool = False,
+        px_peers_count: int = 16,
+        prune_back_off: int = 60,
+        unsubscribe_back_off: int = 10,
     ) -> None:
         self.protocols = list(protocols)
         self.pubsub = None
@@ -140,6 +152,12 @@ def __init__(
         self.direct_connect_initial_delay = direct_connect_initial_delay
         self.time_since_last_publish = {}

+        self.do_px = do_px
+        self.px_peers_count = px_peers_count
+        self.back_off = dict()
+        self.prune_back_off = prune_back_off
+        self.unsubscribe_back_off = unsubscribe_back_off
+
     async def run(self) -> None:
         self.manager.run_daemon_task(self.heartbeat)
         if len(self.direct_peers) > 0:
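For orientation, all five new knobs are plain constructor arguments. A minimal construction sketch, not taken from this PR: the degree values and the `PROTOCOL_ID` import are assumptions, and only the last four keywords are new in this diff:

```python
from libp2p.pubsub.gossipsub import PROTOCOL_ID, GossipSub

# Hypothetical router construction; the defaults shown match the values added above.
router = GossipSub(
    protocols=[PROTOCOL_ID],
    degree=6,                 # D: target mesh size (illustrative value)
    degree_low=4,             # D_low: grafting threshold (illustrative)
    degree_high=12,           # D_high: pruning threshold (illustrative)
    do_px=True,               # offer peer exchange when emitting PRUNE
    px_peers_count=16,        # cap on PX records sent or accepted
    prune_back_off=60,        # seconds a pruned peer is barred from re-GRAFTing
    unsubscribe_back_off=10,  # shorter bar when the PRUNE came from leave()
)
```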
@@ -333,15 +351,22 @@ async def join(self, topic: str) -> None:
         self.mesh[topic] = set()

         topic_in_fanout: bool = topic in self.fanout
-        fanout_peers: set[ID] = self.fanout[topic] if topic_in_fanout else set()
+        fanout_peers: set[ID] = set()
+
+        if topic_in_fanout:
+            for peer in self.fanout[topic]:
+                if self._check_back_off(peer, topic):
+                    continue
+                fanout_peers.add(peer)
+
         fanout_size = len(fanout_peers)
         if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree):
             # There are less than D peers (let this number be x)
             # in the fanout for a topic (or the topic is not in the fanout).
             # Selects the remaining number of peers (D-x) from peers.gossipsub[topic].
-            if topic in self.pubsub.peer_topics:
+            if self.pubsub is not None and topic in self.pubsub.peer_topics:
                 selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
-                    topic, self.degree - fanout_size, fanout_peers
+                    topic, self.degree - fanout_size, fanout_peers, True
                 )
                 # Combine fanout peers with selected peers
                 fanout_peers.update(selected_peers)
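The back off store consulted here is a nested dict, topic to peer to expiry timestamp in unix seconds. A self-contained sketch of the same filtering rule that `join()` now applies to fanout peers, with strings standing in for peer IDs:

```python
import time

now = int(time.time())
# back_off: topic -> peer -> timestamp until which the peer is barred
back_off = {"news": {"peer_a": now + 60, "peer_b": now - 5}}
fanout = {"news": {"peer_a", "peer_b", "peer_c"}}

# Same rule as the loop above: keep only peers whose back off has expired.
fanout_peers = {
    peer
    for peer in fanout["news"]
    if back_off.get("news", {}).get(peer, 0) <= now
}
print(fanout_peers)  # {'peer_b', 'peer_c'}; peer_a is still barred
```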
@@ -368,7 +393,8 @@ async def leave(self, topic: str) -> None:
             return
         # Notify the peers in mesh[topic] with a PRUNE(topic) message
         for peer in self.mesh[topic]:
-            await self.emit_prune(topic, peer)
+            await self.emit_prune(topic, peer, self.do_px, True)
+            self._add_back_off(peer, topic, True)

         # Forget mesh[topic]
         self.mesh.pop(topic, None)
@@ -504,7 +530,7 @@ def mesh_heartbeat(
             if num_mesh_peers_in_topic < self.degree_low:
                 # Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic]  # noqa: E501
                 selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
-                    topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic]
+                    topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic], True
                 )

                 for peer in selected_peers:
@@ -567,9 +593,7 @@ def _handle_topic_heartbeat(
             if len(in_topic_peers) < self.degree:
                 # Select additional peers from peers.gossipsub[topic]
                 selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
-                    topic,
-                    self.degree - len(in_topic_peers),
-                    in_topic_peers,
+                    topic, self.degree - len(in_topic_peers), in_topic_peers, True
                 )
                 # Add the selected peers
                 in_topic_peers.update(selected_peers)
@@ -580,7 +604,7 @@ def _handle_topic_heartbeat(
             if msg_ids:
                 # Select D peers from peers.gossipsub[topic] excluding current peers
                 peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
-                    topic, self.degree, current_peers
+                    topic, self.degree, current_peers, True
                 )
                 msg_id_strs = [str(msg_id) for msg_id in msg_ids]
                 for peer in peers_to_emit_ihave_to:
@@ -654,7 +678,11 @@ def select_from_minus(
         return selection

     def _get_in_topic_gossipsub_peers_from_minus(
-        self, topic: str, num_to_select: int, minus: Iterable[ID]
+        self,
+        topic: str,
+        num_to_select: int,
+        minus: Iterable[ID],
+        backoff_check: bool = False,
     ) -> list[ID]:
         if self.pubsub is None:
             raise NoPubsubAttached
@@ -663,8 +691,88 @@ def _get_in_topic_gossipsub_peers_from_minus(
             for peer_id in self.pubsub.peer_topics[topic]
             if self.peer_protocol[peer_id] == PROTOCOL_ID
         }
+        if backoff_check:
+            # filter out peers that are in back off for this topic
+            gossipsub_peers_in_topic = {
+                peer_id
+                for peer_id in gossipsub_peers_in_topic
+                if self._check_back_off(peer_id, topic) is False
+            }
         return self.select_from_minus(num_to_select, gossipsub_peers_in_topic, minus)

+    def _add_back_off(
+        self, peer: ID, topic: str, is_unsubscribe: bool, backoff_duration: int = 0
+    ) -> None:
+        """
+        Add a back off entry for a peer in a topic.
+
+        :param peer: peer to add the back off for
+        :param topic: topic to add the back off for
+        :param is_unsubscribe: whether this back off comes from an unsubscribe
+        :param backoff_duration: duration of the back off in seconds; if 0, the
+            default for the operation is used
+        """
+        if topic not in self.back_off:
+            self.back_off[topic] = dict()
+
+        backoff_till = int(time.time())
+        if backoff_duration > 0:
+            backoff_till += backoff_duration
+        else:
+            if is_unsubscribe:
+                backoff_till += self.unsubscribe_back_off
+            else:
+                backoff_till += self.prune_back_off
+
+        if peer not in self.back_off[topic]:
+            self.back_off[topic][peer] = backoff_till
+        else:
+            self.back_off[topic][peer] = max(self.back_off[topic][peer], backoff_till)
+
+    def _check_back_off(self, peer: ID, topic: str) -> bool:
+        """
+        Check whether a peer is backed off for a topic, cleaning up expired entries.
+
+        :param peer: peer to check
+        :param topic: topic to check
+        :return: True if the peer is in back off, False otherwise
+        """
+        if topic not in self.back_off or peer not in self.back_off[topic]:
+            return False
+        if self.back_off[topic].get(peer, 0) > int(time.time()):
+            return True
+        else:
+            del self.back_off[topic][peer]
+            return False
+
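Together these two helpers form a per-(topic, peer) cooldown in which the longest bar wins. A usage sketch on a minimal stand-in object; `BackOffDemo` is hypothetical, and borrowing the unbound methods assumes a py-libp2p build that already contains this change:

```python
from libp2p.pubsub.gossipsub import GossipSub

class BackOffDemo:
    """Hypothetical carrier of just the state the back off helpers touch."""

    prune_back_off = 60
    unsubscribe_back_off = 10

    def __init__(self) -> None:
        self.back_off: dict[str, dict[str, int]] = {}

    # Borrow the implementations under discussion so the trace matches the diff.
    _add_back_off = GossipSub._add_back_off
    _check_back_off = GossipSub._check_back_off

demo = BackOffDemo()
demo._add_back_off("peer_a", "news", is_unsubscribe=True)   # bars peer_a for ~10 s
demo._add_back_off("peer_a", "news", is_unsubscribe=False)  # extends to ~60 s (max wins)
assert demo._check_back_off("peer_a", "news") is True       # still barred
assert demo._check_back_off("peer_b", "news") is False      # never barred
```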
+    async def _do_px(self, px_peers: list[rpc_pb2.PeerInfo]) -> None:
+        if len(px_peers) > self.px_peers_count:
+            px_peers = px_peers[: self.px_peers_count]
+
+        for peer in px_peers:
+            peer_id: ID = ID(peer.peerID)
+
+            if self.pubsub and peer_id in self.pubsub.peers:
+                continue
+
+            try:
+                peer_info = peer_info_from_bytes(peer.signedPeerRecord)
+                try:
+                    if self.pubsub is None:
+                        raise NoPubsubAttached
+                    await self.pubsub.host.connect(peer_info)
+                except Exception as e:
+                    logger.warning(
+                        "failed to connect to px peer %s: %s",
+                        peer_id,
+                        e,
+                    )
+                    continue
+            except Exception as e:
+                logger.warning(
+                    "failed to parse peer info from px peer %s: %s",
+                    peer_id,
+                    e,
+                )
+                continue
+
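On the wire, each PX entry is an `rpc_pb2.PeerInfo` carrying just two byte fields; `emit_prune` (further down) fills them and `_do_px` (above) unpacks them. A small sketch of that round trip; the base58 peer ID is an arbitrary example and the signed record bytes are left as a placeholder:

```python
from libp2p.peer.id import ID
from libp2p.pubsub.pb import rpc_pb2

# Sender side: pack the raw ID bytes plus the serialized peer info.
record = rpc_pb2.PeerInfo()
record.peerID = ID.from_base58("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N").to_bytes()
record.signedPeerRecord = b""  # placeholder; the diff uses peer_info_to_bytes(peer_info)

# Receiver side (_do_px): recover the ID before deciding whether to dial.
peer_id = ID(record.peerID)
print(peer_id)
```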
     # RPC handlers

     async def handle_ihave(
@@ -757,24 +865,46 @@ async def handle_graft(
                 logger.warning(
                     "GRAFT: ignoring request from direct peer %s", sender_peer_id
                 )
-                await self.emit_prune(topic, sender_peer_id)
+                await self.emit_prune(topic, sender_peer_id, False, False)
                 return

+            if self._check_back_off(sender_peer_id, topic):
+                logger.warning(
+                    "GRAFT: ignoring request from %s, back off until %d",
+                    sender_peer_id,
+                    self.back_off[topic][sender_peer_id],
+                )
+                self._add_back_off(sender_peer_id, topic, False)
+                await self.emit_prune(topic, sender_peer_id, False, False)
+                return
+
             if sender_peer_id not in self.mesh[topic]:
                 self.mesh[topic].add(sender_peer_id)
         else:
             # Respond with PRUNE if not subscribed to the topic
-            await self.emit_prune(topic, sender_peer_id)
+            await self.emit_prune(topic, sender_peer_id, self.do_px, False)

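One consequence of the new GRAFT-side check deserves a note: a peer that re-GRAFTs while barred has its back off refreshed by the `_add_back_off` call, not merely enforced, so an offender cannot simply wait out the original timer. A hypothetical timeline with the defaults used in this diff:

```python
# t=0  : we PRUNE peer P from topic T   -> back_off[T][P] = 0 + 60
# t=30 : P sends GRAFT(T) anyway        -> _check_back_off(P, T) is True;
#        we log a warning, refresh the bar (max(60, 30 + 60) = 90),
#        and answer with PRUNE carrying no PX (do_px=False for offenders)
# t=95 : P sends GRAFT(T) again         -> bar expired; P is added to mesh[T]
```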
     async def handle_prune(
         self, prune_msg: rpc_pb2.ControlPrune, sender_peer_id: ID
     ) -> None:
         topic: str = prune_msg.topicID
+        backoff_till: int = prune_msg.backoff
+        px_peers: list[rpc_pb2.PeerInfo] = []
+        for peer in prune_msg.peers:
+            px_peers.append(peer)

         # Remove peer from mesh for topic
         if topic in self.mesh:
+            if backoff_till > 0:
+                self._add_back_off(sender_peer_id, topic, False, backoff_till)
+            else:
+                self._add_back_off(sender_peer_id, topic, False)
+
             self.mesh[topic].discard(sender_peer_id)

+        if px_peers:
+            await self._do_px(px_peers)
+
     # RPC emitters

     def pack_control_msgs(
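Seen from the receiver, `handle_prune` reads three fields of `ControlPrune`, the same message that `emit_prune` below constructs. A minimal sketch of the message shape, using only the field names this diff relies on:

```python
from libp2p.pubsub.pb import rpc_pb2

# A PRUNE as a remote peer might send it: topic, back off hint, optional PX records.
prune = rpc_pb2.ControlPrune()
prune.topicID = "news"
prune.backoff = 60                      # ask the receiver to back off for 60 s
prune.peers.append(rpc_pb2.PeerInfo())  # zero or more PX candidates

print(prune.backoff, len(prune.peers))  # 60 1
```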
@@ -823,15 +953,36 @@ async def emit_graft(self, topic: str, id: ID) -> None:

         await self.emit_control_message(control_msg, id)

-    async def emit_prune(self, topic: str, id: ID) -> None:
+    async def emit_prune(
+        self, topic: str, to_peer: ID, do_px: bool, is_unsubscribe: bool
+    ) -> None:
         """Emit prune message, sent to to_peer, for topic."""
         prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune()
         prune_msg.topicID = topic

+        back_off_duration = self.prune_back_off
+        if is_unsubscribe:
+            back_off_duration = self.unsubscribe_back_off
+
+        prune_msg.backoff = back_off_duration
+
+        if do_px:
+            exchange_peers = self._get_in_topic_gossipsub_peers_from_minus(
+                topic, self.px_peers_count, [to_peer]
+            )
+            for peer in exchange_peers:
+                if self.pubsub is None:
+                    raise NoPubsubAttached
+                peer_info = self.pubsub.host.get_peerstore().peer_info(peer)
+                signed_peer_record: rpc_pb2.PeerInfo = rpc_pb2.PeerInfo()
+                signed_peer_record.peerID = peer.to_bytes()
+                signed_peer_record.signedPeerRecord = peer_info_to_bytes(peer_info)
+                prune_msg.peers.append(signed_peer_record)
+
         control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
         control_msg.prune.extend([prune_msg])

-        await self.emit_control_message(control_msg, id)
+        await self.emit_control_message(control_msg, to_peer)

     async def emit_control_message(
         self, control_msg: rpc_pb2.ControlMessage, to_peer: ID