32
32
)
33
33
from libp2p .peer .peerinfo import (
34
34
PeerInfo ,
35
+ peer_info_from_bytes ,
36
+ peer_info_to_bytes ,
35
37
)
36
38
from libp2p .peer .peerstore import (
37
39
PERMANENT_ADDR_TTL ,
@@ -92,6 +94,12 @@ class GossipSub(IPubsubRouter, Service):
92
94
direct_connect_initial_delay : float
93
95
direct_connect_interval : int
94
96
97
+ do_px : bool
98
+ px_peers_count : int
99
+ back_off : dict [str , dict [ID , int ]]
100
+ prune_back_off : int
101
+ unsubscribe_back_off : int
102
+
95
103
def __init__ (
96
104
self ,
97
105
protocols : Sequence [TProtocol ],
@@ -106,6 +114,10 @@ def __init__(
106
114
heartbeat_interval : int = 120 ,
107
115
direct_connect_initial_delay : float = 0.1 ,
108
116
direct_connect_interval : int = 300 ,
117
+ do_px : bool = False ,
118
+ px_peers_count : int = 16 ,
119
+ prune_back_off : int = 60 ,
120
+ unsubscribe_back_off : int = 10 ,
109
121
) -> None :
110
122
self .protocols = list (protocols )
111
123
self .pubsub = None
@@ -140,6 +152,12 @@ def __init__(
140
152
self .direct_connect_initial_delay = direct_connect_initial_delay
141
153
self .time_since_last_publish = {}
142
154
155
+ self .do_px = do_px
156
+ self .px_peers_count = px_peers_count
157
+ self .back_off = dict ()
158
+ self .prune_back_off = prune_back_off
159
+ self .unsubscribe_back_off = unsubscribe_back_off
160
+
143
161
async def run (self ) -> None :
144
162
self .manager .run_daemon_task (self .heartbeat )
145
163
if len (self .direct_peers ) > 0 :
@@ -334,15 +352,22 @@ async def join(self, topic: str) -> None:
334
352
self .mesh [topic ] = set ()
335
353
336
354
topic_in_fanout : bool = topic in self .fanout
337
- fanout_peers : set [ID ] = self .fanout [topic ] if topic_in_fanout else set ()
355
+ fanout_peers : set [ID ] = set ()
356
+
357
+ if topic_in_fanout :
358
+ for peer in self .fanout [topic ]:
359
+ if self ._check_back_off (peer , topic ):
360
+ continue
361
+ fanout_peers .add (peer )
362
+
338
363
fanout_size = len (fanout_peers )
339
364
if not topic_in_fanout or (topic_in_fanout and fanout_size < self .degree ):
340
365
# There are less than D peers (let this number be x)
341
366
# in the fanout for a topic (or the topic is not in the fanout).
342
367
# Selects the remaining number of peers (D-x) from peers.gossipsub[topic].
343
- if topic in self .pubsub .peer_topics :
368
+ if self . pubsub is not None and topic in self .pubsub .peer_topics :
344
369
selected_peers = self ._get_in_topic_gossipsub_peers_from_minus (
345
- topic , self .degree - fanout_size , fanout_peers
370
+ topic , self .degree - fanout_size , fanout_peers , True
346
371
)
347
372
# Combine fanout peers with selected peers
348
373
fanout_peers .update (selected_peers )
@@ -369,7 +394,8 @@ async def leave(self, topic: str) -> None:
369
394
return
370
395
# Notify the peers in mesh[topic] with a PRUNE(topic) message
371
396
for peer in self .mesh [topic ]:
372
- await self .emit_prune (topic , peer )
397
+ await self .emit_prune (topic , peer , self .do_px , True )
398
+ self ._add_back_off (peer , topic , True )
373
399
374
400
# Forget mesh[topic]
375
401
self .mesh .pop (topic , None )
@@ -505,7 +531,7 @@ def mesh_heartbeat(
505
531
if num_mesh_peers_in_topic < self .degree_low :
506
532
# Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic] # noqa: E501
507
533
selected_peers = self ._get_in_topic_gossipsub_peers_from_minus (
508
- topic , self .degree - num_mesh_peers_in_topic , self .mesh [topic ]
534
+ topic , self .degree - num_mesh_peers_in_topic , self .mesh [topic ], True
509
535
)
510
536
511
537
for peer in selected_peers :
@@ -568,9 +594,7 @@ def _handle_topic_heartbeat(
568
594
if len (in_topic_peers ) < self .degree :
569
595
# Select additional peers from peers.gossipsub[topic]
570
596
selected_peers = self ._get_in_topic_gossipsub_peers_from_minus (
571
- topic ,
572
- self .degree - len (in_topic_peers ),
573
- in_topic_peers ,
597
+ topic , self .degree - len (in_topic_peers ), in_topic_peers , True
574
598
)
575
599
# Add the selected peers
576
600
in_topic_peers .update (selected_peers )
@@ -581,7 +605,7 @@ def _handle_topic_heartbeat(
581
605
if msg_ids :
582
606
# Select D peers from peers.gossipsub[topic] excluding current peers
583
607
peers_to_emit_ihave_to = self ._get_in_topic_gossipsub_peers_from_minus (
584
- topic , self .degree , current_peers
608
+ topic , self .degree , current_peers , True
585
609
)
586
610
msg_id_strs = [str (msg_id ) for msg_id in msg_ids ]
587
611
for peer in peers_to_emit_ihave_to :
@@ -655,7 +679,11 @@ def select_from_minus(
655
679
return selection
656
680
657
681
def _get_in_topic_gossipsub_peers_from_minus(
    self,
    topic: str,
    num_to_select: int,
    minus: Iterable[ID],
    backoff_check: bool = False,
) -> list[ID]:
    """
    Select up to ``num_to_select`` gossipsub peers subscribed to ``topic``,
    excluding the peers in ``minus``.

    :param topic: topic to select peers for
    :param num_to_select: maximum number of peers to select
    :param minus: peers to exclude from the selection
    :param backoff_check: when True, also exclude peers currently in back off
        for this topic
    :return: the selected peer ids
    :raise NoPubsubAttached: if no pubsub instance is attached to this router
    """
    if self.pubsub is None:
        raise NoPubsubAttached
    # Single pass: keep peers that speak this gossipsub protocol and,
    # when requested, are not backing off for the topic.  The short-circuit
    # preserves the original behavior of only consulting (and cleaning up)
    # the back off table when backoff_check is set.
    gossipsub_peers_in_topic = {
        peer_id
        for peer_id in self.pubsub.peer_topics[topic]
        if self.peer_protocol[peer_id] == PROTOCOL_ID
        and not (backoff_check and self._check_back_off(peer_id, topic))
    }
    return self.select_from_minus(num_to_select, gossipsub_peers_in_topic, minus)
668
703
704
+ def _add_back_off (
705
+ self , peer : ID , topic : str , is_unsubscribe : bool , backoff_duration : int = 0
706
+ ) -> None :
707
+ """
708
+ Add back off for a peer in a topic.
709
+ :param peer: peer to add back off for
710
+ :param topic: topic to add back off for
711
+ :param is_unsubscribe: whether this is an unsubscribe operation
712
+ :param backoff_duration: duration of back off in seconds, if 0, use default
713
+ """
714
+ if topic not in self .back_off :
715
+ self .back_off [topic ] = dict ()
716
+
717
+ backoff_till = int (time .time ())
718
+ if backoff_duration > 0 :
719
+ backoff_till += backoff_duration
720
+ else :
721
+ if is_unsubscribe :
722
+ backoff_till += self .unsubscribe_back_off
723
+ else :
724
+ backoff_till += self .prune_back_off
725
+
726
+ if peer not in self .back_off [topic ]:
727
+ self .back_off [topic ][peer ] = backoff_till
728
+ else :
729
+ self .back_off [topic ][peer ] = max (self .back_off [topic ][peer ], backoff_till )
730
+
731
+ def _check_back_off (self , peer : ID , topic : str ) -> bool :
732
+ """
733
+ Check if a peer is in back off for a topic and cleanup expired back off entries.
734
+ :param peer: peer to check
735
+ :param topic: topic to check
736
+ :return: True if the peer is in back off, False otherwise
737
+ """
738
+ if topic not in self .back_off or peer not in self .back_off [topic ]:
739
+ return False
740
+ if self .back_off [topic ].get (peer , 0 ) > int (time .time ()):
741
+ return True
742
+ else :
743
+ del self .back_off [topic ][peer ]
744
+ return False
745
+
746
async def _do_px(self, px_peers: list[rpc_pb2.PeerInfo]) -> None:
    """
    Perform peer exchange: try to connect to peers advertised in a PRUNE.

    :param px_peers: peer records received in the PRUNE message; only the
        first ``px_peers_count`` entries are considered
    :raise NoPubsubAttached: if no pubsub instance is attached to this router
    """
    if self.pubsub is None:
        # Checked once up front.  Previously this was raised inside the
        # per-peer try block, where the broad ``except Exception`` swallowed
        # it and mislogged it as a connect failure for every peer.
        raise NoPubsubAttached

    # Cap the number of peers we are willing to dial from a single PRUNE.
    for peer in px_peers[: self.px_peers_count]:
        peer_id = ID(peer.peerID)

        # Skip peers we already have a pubsub connection to.
        if peer_id in self.pubsub.peers:
            continue

        try:
            peer_info = peer_info_from_bytes(peer.signedPeerRecord)
        except Exception as e:
            logger.warning(
                "failed to parse peer info from px peer %s: %s",
                peer_id,
                e,
            )
            continue

        try:
            await self.pubsub.host.connect(peer_info)
        except Exception as e:
            logger.warning(
                "failed to connect to px peer %s: %s",
                peer_id,
                e,
            )
776
+
669
777
# RPC handlers
670
778
671
779
async def handle_ihave (
@@ -758,24 +866,46 @@ async def handle_graft(
758
866
logger .warning (
759
867
"GRAFT: ignoring request from direct peer %s" , sender_peer_id
760
868
)
761
- await self .emit_prune (topic , sender_peer_id )
869
+ await self .emit_prune (topic , sender_peer_id , False , False )
762
870
return
763
871
872
+ if self ._check_back_off (sender_peer_id , topic ):
873
+ logger .warning (
874
+ "GRAFT: ignoring request from %s, back off until %d" ,
875
+ sender_peer_id ,
876
+ self .back_off [topic ][sender_peer_id ],
877
+ )
878
+ self ._add_back_off (sender_peer_id , topic , False )
879
+ await self .emit_prune (topic , sender_peer_id , False , False )
880
+ return
881
+
764
882
if sender_peer_id not in self .mesh [topic ]:
765
883
self .mesh [topic ].add (sender_peer_id )
766
884
else :
767
885
# Respond with PRUNE if not subscribed to the topic
768
- await self .emit_prune (topic , sender_peer_id )
886
+ await self .emit_prune (topic , sender_peer_id , self . do_px , False )
769
887
770
888
async def handle_prune(
    self, prune_msg: rpc_pb2.ControlPrune, sender_peer_id: ID
) -> None:
    """
    Handle a PRUNE control message from ``sender_peer_id``.

    Removes the sender from the topic mesh, records the requested back off,
    and attempts peer exchange with any peers advertised in the message.

    :param prune_msg: the PRUNE control message
    :param sender_peer_id: id of the peer who sent the message
    """
    topic: str = prune_msg.topicID
    backoff_till: int = prune_msg.backoff
    px_peers: list[rpc_pb2.PeerInfo] = list(prune_msg.peers)

    # Remove peer from mesh for topic
    if topic in self.mesh:
        # _add_back_off treats a duration of 0 as "use the default prune
        # back off", so the advertised value can be passed through directly.
        self._add_back_off(sender_peer_id, topic, False, backoff_till)
        self.mesh[topic].discard(sender_peer_id)

    if px_peers:
        await self._do_px(px_peers)
908
+
779
909
# RPC emitters
780
910
781
911
def pack_control_msgs (
@@ -824,15 +954,36 @@ async def emit_graft(self, topic: str, id: ID) -> None:
824
954
825
955
await self .emit_control_message (control_msg , id )
826
956
827
async def emit_prune(
    self, topic: str, to_peer: ID, do_px: bool, is_unsubscribe: bool
) -> None:
    """
    Emit a PRUNE message, sent to ``to_peer``, for ``topic``.

    :param topic: topic being pruned
    :param to_peer: peer the PRUNE is sent to
    :param do_px: whether to include peer-exchange records in the message
    :param is_unsubscribe: whether the prune is caused by unsubscribing
        (advertises the shorter unsubscribe back off duration)
    :raise NoPubsubAttached: if peer exchange is requested but no pubsub
        instance is attached to this router
    """
    prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune()
    prune_msg.topicID = topic

    # Advertise how long the remote peer should back off before re-grafting.
    prune_msg.backoff = (
        self.unsubscribe_back_off if is_unsubscribe else self.prune_back_off
    )

    if do_px:
        if self.pubsub is None:
            # Hoisted out of the per-peer loop; the helper below would raise
            # the same error anyway before any peer was processed.
            raise NoPubsubAttached
        exchange_peers = self._get_in_topic_gossipsub_peers_from_minus(
            topic, self.px_peers_count, [to_peer]
        )
        for peer in exchange_peers:
            peer_info = self.pubsub.host.get_peerstore().peer_info(peer)
            signed_peer_record: rpc_pb2.PeerInfo = rpc_pb2.PeerInfo()
            signed_peer_record.peerID = peer.to_bytes()
            signed_peer_record.signedPeerRecord = peer_info_to_bytes(peer_info)
            prune_msg.peers.append(signed_peer_record)

    control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
    control_msg.prune.extend([prune_msg])

    await self.emit_control_message(control_msg, to_peer)
836
987
837
988
async def emit_control_message (
838
989
self , control_msg : rpc_pb2 .ControlMessage , to_peer : ID
0 commit comments