)
from libp2p.peer.peerinfo import (
    PeerInfo,
+    peer_info_from_bytes,
+    peer_info_to_bytes,
)
from libp2p.peer.peerstore import (
    PERMANENT_ADDR_TTL,
@@ -92,6 +94,12 @@ class GossipSub(IPubsubRouter, Service):
    direct_connect_initial_delay: float
    direct_connect_interval: int

+    do_px: bool
+    px_peers_count: int
+    back_off: dict[str, dict[ID, int]]
+    prune_back_off: int
+    unsubscribe_back_off: int
+
    def __init__(
        self,
        protocols: Sequence[TProtocol],
@@ -106,6 +114,10 @@ def __init__(
        heartbeat_interval: int = 120,
        direct_connect_initial_delay: float = 0.1,
        direct_connect_interval: int = 300,
+        do_px: bool = False,
+        px_peers_count: int = 16,
+        prune_back_off: int = 60,
+        unsubscribe_back_off: int = 10,
    ) -> None:
        self.protocols = list(protocols)
        self.pubsub = None
@@ -140,6 +152,12 @@ def __init__(
        self.direct_connect_initial_delay = direct_connect_initial_delay
        self.time_since_last_publish = {}

+        self.do_px = do_px
+        self.px_peers_count = px_peers_count
+        self.back_off = dict()
+        self.prune_back_off = prune_back_off
+        self.unsubscribe_back_off = unsubscribe_back_off
+
    async def run(self) -> None:
        self.manager.run_daemon_task(self.heartbeat)
        if len(self.direct_peers) > 0:
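For reference, the new knobs are plain constructor arguments. A minimal usage sketch, assuming the pre-existing `protocols`/`degree*` parameters of this class; the values shown are illustrative, not recommendations:

```python
# Hypothetical construction with PX and back off configured.
router = GossipSub(
    protocols=[PROTOCOL_ID],
    degree=6,
    degree_low=4,
    degree_high=12,
    do_px=True,               # attach PX records to PRUNEs we send
    px_peers_count=16,        # cap on peers exchanged per PRUNE
    prune_back_off=60,        # seconds before a pruned peer may re-GRAFT
    unsubscribe_back_off=10,  # shorter window for PRUNEs caused by leave()
)
```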
@@ -251,7 +269,6 @@ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
            stream = self.pubsub.peers[peer_id]
            # FIXME: We should add a `WriteMsg` similar to write delimited messages.
            # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
-            # TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages.
            try:
                await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
            except StreamClosed:
@@ -334,15 +351,22 @@ async def join(self, topic: str) -> None:
            self.mesh[topic] = set()

        topic_in_fanout: bool = topic in self.fanout
-        fanout_peers: set[ID] = self.fanout[topic] if topic_in_fanout else set()
+        fanout_peers: set[ID] = set()
+
+        if topic_in_fanout:
+            # Carry over fanout peers, skipping any that are backing off
+            for peer in self.fanout[topic]:
+                if self._check_back_off(peer, topic):
+                    continue
+                fanout_peers.add(peer)
+
        fanout_size = len(fanout_peers)
        if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree):
            # There are less than D peers (let this number be x)
            # in the fanout for a topic (or the topic is not in the fanout).
            # Selects the remaining number of peers (D-x) from peers.gossipsub[topic].
-            if topic in self.pubsub.peer_topics:
+            if self.pubsub is not None and topic in self.pubsub.peer_topics:
                selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
-                    topic, self.degree - fanout_size, fanout_peers
+                    topic, self.degree - fanout_size, fanout_peers, True
                )
                # Combine fanout peers with selected peers
                fanout_peers.update(selected_peers)
@@ -369,7 +393,8 @@ async def leave(self, topic: str) -> None:
            return
        # Notify the peers in mesh[topic] with a PRUNE(topic) message
        for peer in self.mesh[topic]:
-            await self.emit_prune(topic, peer)
+            await self.emit_prune(topic, peer, self.do_px, True)
+            self._add_back_off(peer, topic, True)

        # Forget mesh[topic]
        self.mesh.pop(topic, None)
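The `True` flag marks these PRUNEs as unsubscribe-driven, so the shorter `unsubscribe_back_off` window applies on both sides. A sketch of the intended timeline, assuming trio for sleeping and the defaults from this diff:

```python
await router.leave("blocks")  # PRUNE mesh peers; back off = unsubscribe_back_off (10s)
await router.join("blocks")   # peers still backing off are skipped when rebuilding the mesh
await trio.sleep(10)          # once the window lapses, they become eligible again
```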
@@ -459,8 +484,8 @@ async def heartbeat(self) -> None:
            self.fanout_heartbeat()
            # Get the peers to send IHAVE to
            peers_to_gossip = self.gossip_heartbeat()
-            # Pack GRAFT, PRUNE and IHAVE for the same peer into one control message and
-            # send it
+            # Pack (piggyback) GRAFT, PRUNE, and IHAVE for the same peer into
+            # one control message and send it
            await self._emit_control_msgs(
                peers_to_graft, peers_to_prune, peers_to_gossip
            )
@@ -505,7 +530,7 @@ def mesh_heartbeat(
            if num_mesh_peers_in_topic < self.degree_low:
                # Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic]  # noqa: E501
                selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
-                    topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic]
+                    topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic], True
                )

                for peer in selected_peers:
@@ -568,9 +593,7 @@ def _handle_topic_heartbeat(
            if len(in_topic_peers) < self.degree:
                # Select additional peers from peers.gossipsub[topic]
                selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
-                    topic,
-                    self.degree - len(in_topic_peers),
-                    in_topic_peers,
+                    topic, self.degree - len(in_topic_peers), in_topic_peers, True
                )
                # Add the selected peers
                in_topic_peers.update(selected_peers)
@@ -581,7 +604,7 @@ def _handle_topic_heartbeat(
        if msg_ids:
            # Select D peers from peers.gossipsub[topic] excluding current peers
            peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
-                topic, self.degree, current_peers
+                topic, self.degree, current_peers, True
            )
            msg_id_strs = [str(msg_id) for msg_id in msg_ids]
            for peer in peers_to_emit_ihave_to:
@@ -655,7 +678,11 @@ def select_from_minus(
        return selection

    def _get_in_topic_gossipsub_peers_from_minus(
-        self, topic: str, num_to_select: int, minus: Iterable[ID]
+        self,
+        topic: str,
+        num_to_select: int,
+        minus: Iterable[ID],
+        backoff_check: bool = False,
    ) -> list[ID]:
        if self.pubsub is None:
            raise NoPubsubAttached
@@ -664,8 +691,88 @@ def _get_in_topic_gossipsub_peers_from_minus(
            for peer_id in self.pubsub.peer_topics[topic]
            if self.peer_protocol[peer_id] == PROTOCOL_ID
        }
+        if backoff_check:
+            # Filter out peers that are in back off for this topic
+            gossipsub_peers_in_topic = {
+                peer_id
+                for peer_id in gossipsub_peers_in_topic
+                if not self._check_back_off(peer_id, topic)
+            }
        return self.select_from_minus(num_to_select, gossipsub_peers_in_topic, minus)

+    def _add_back_off(
+        self, peer: ID, topic: str, is_unsubscribe: bool, backoff_duration: int = 0
+    ) -> None:
+        """
+        Add back off for a peer in a topic.
+
+        :param peer: peer to add back off for
+        :param topic: topic to add back off for
+        :param is_unsubscribe: whether this back off comes from an unsubscribe
+        :param backoff_duration: back off duration in seconds; if 0, use the default
+        """
+        if topic not in self.back_off:
+            self.back_off[topic] = dict()
+
+        backoff_till = int(time.time())
+        if backoff_duration > 0:
+            backoff_till += backoff_duration
+        elif is_unsubscribe:
+            backoff_till += self.unsubscribe_back_off
+        else:
+            backoff_till += self.prune_back_off
+
+        # Keep the later expiry; never shorten an existing back off window
+        self.back_off[topic][peer] = max(self.back_off[topic].get(peer, 0), backoff_till)
+
+    def _check_back_off(self, peer: ID, topic: str) -> bool:
+        """
+        Check if a peer is in back off for a topic and clean up expired entries.
+
+        :param peer: peer to check
+        :param topic: topic to check
+        :return: True if the peer is in back off, False otherwise
+        """
+        if topic not in self.back_off or peer not in self.back_off[topic]:
+            return False
+        if self.back_off[topic][peer] > int(time.time()):
+            return True
+        # The back off has expired; drop the stale entry
+        del self.back_off[topic][peer]
+        return False
+
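A small sketch of how these two helpers interact (names from this diff; `router` and `peer` are hypothetical instances):

```python
router._add_back_off(peer, "blocks", is_unsubscribe=False)  # expires in prune_back_off (60s)
assert router._check_back_off(peer, "blocks")               # still inside the window

# An explicit duration can only extend the window, never shorten it, thanks to max():
router._add_back_off(peer, "blocks", False, backoff_duration=120)

# Once expired, _check_back_off returns False and deletes the stale entry.
```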
+    async def _do_px(self, px_peers: list[rpc_pb2.PeerInfo]) -> None:
+        """Try to connect to peers learned via peer exchange (PX) on PRUNE."""
+        if len(px_peers) > self.px_peers_count:
+            px_peers = px_peers[: self.px_peers_count]
+
+        for peer in px_peers:
+            peer_id: ID = ID(peer.peerID)
+
+            # Skip peers we are already connected to
+            if self.pubsub and peer_id in self.pubsub.peers:
+                continue
+
+            try:
+                peer_info = peer_info_from_bytes(peer.signedPeerRecord)
+            except Exception as e:
+                logger.warning(
+                    "failed to parse peer info from px peer %s: %s", peer_id, e
+                )
+                continue
+
+            try:
+                if self.pubsub is None:
+                    raise NoPubsubAttached
+                await self.pubsub.host.connect(peer_info)
+            except Exception as e:
+                logger.warning("failed to connect to px peer %s: %s", peer_id, e)
+                continue
+
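The parse step assumes PX records round-trip through the new peerinfo helpers imported above; roughly, with `peer_info` being a `PeerInfo` taken from the peerstore:

```python
record = rpc_pb2.PeerInfo()
record.peerID = peer_id.to_bytes()
record.signedPeerRecord = peer_info_to_bytes(peer_info)   # sender side (see emit_prune)
restored = peer_info_from_bytes(record.signedPeerRecord)  # receiver side (_do_px)
assert restored.peer_id == peer_info.peer_id
```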
    # RPC handlers

    async def handle_ihave(
@@ -758,24 +865,46 @@ async def handle_graft(
                logger.warning(
                    "GRAFT: ignoring request from direct peer %s", sender_peer_id
                )
-                await self.emit_prune(topic, sender_peer_id)
+                await self.emit_prune(topic, sender_peer_id, False, False)
                return

+            if self._check_back_off(sender_peer_id, topic):
+                logger.warning(
+                    "GRAFT: ignoring request from %s, back off until %d",
+                    sender_peer_id,
+                    self.back_off[topic][sender_peer_id],
+                )
+                # Penalize the early GRAFT by refreshing the back off window
+                self._add_back_off(sender_peer_id, topic, False)
+                await self.emit_prune(topic, sender_peer_id, False, False)
+                return
+
            if sender_peer_id not in self.mesh[topic]:
                self.mesh[topic].add(sender_peer_id)
        else:
            # Respond with PRUNE if not subscribed to the topic
-            await self.emit_prune(topic, sender_peer_id)
+            await self.emit_prune(topic, sender_peer_id, self.do_px, False)
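This mirrors the gossipsub v1.1 rule that a peer GRAFTing before its back off expires is penalized rather than admitted: the window is refreshed and the peer is PRUNEd again. A test-style sketch with hypothetical `router` and `peer` objects:

```python
router.mesh["blocks"] = set()                 # we are subscribed to the topic
router._add_back_off(peer, "blocks", False)   # peer was pruned recently
graft = rpc_pb2.ControlGraft(topicID="blocks")
await router.handle_graft(graft, peer)        # warns, refreshes back off, PRUNEs
assert peer not in router.mesh["blocks"]
```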
    async def handle_prune(
        self, prune_msg: rpc_pb2.ControlPrune, sender_peer_id: ID
    ) -> None:
        topic: str = prune_msg.topicID
+        backoff_till: int = prune_msg.backoff
+        px_peers: list[rpc_pb2.PeerInfo] = list(prune_msg.peers)

        # Remove peer from mesh for topic
        if topic in self.mesh:
+            # Honor the back off advertised by the pruning peer, if any
+            if backoff_till > 0:
+                self._add_back_off(sender_peer_id, topic, False, backoff_till)
+            else:
+                self._add_back_off(sender_peer_id, topic, False)
+
            self.mesh[topic].discard(sender_peer_id)

+        if px_peers:
+            await self._do_px(px_peers)
+
    # RPC emitters

    def pack_control_msgs(
@@ -824,15 +953,36 @@ async def emit_graft(self, topic: str, id: ID) -> None:

        await self.emit_control_message(control_msg, id)

-    async def emit_prune(self, topic: str, id: ID) -> None:
+    async def emit_prune(
+        self, topic: str, to_peer: ID, do_px: bool, is_unsubscribe: bool
+    ) -> None:
        """Emit prune message, sent to to_peer, for topic."""
        prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune()
        prune_msg.topicID = topic

+        back_off_duration = self.prune_back_off
+        if is_unsubscribe:
+            back_off_duration = self.unsubscribe_back_off
+
+        prune_msg.backoff = back_off_duration
+
+        if do_px:
+            if self.pubsub is None:
+                raise NoPubsubAttached
+            # Exchange up to px_peers_count other peers in the topic so the
+            # pruned peer can re-mesh elsewhere
+            exchange_peers = self._get_in_topic_gossipsub_peers_from_minus(
+                topic, self.px_peers_count, [to_peer]
+            )
+            for peer in exchange_peers:
+                peer_info = self.pubsub.host.get_peerstore().peer_info(peer)
+                signed_peer_record: rpc_pb2.PeerInfo = rpc_pb2.PeerInfo()
+                signed_peer_record.peerID = peer.to_bytes()
+                signed_peer_record.signedPeerRecord = peer_info_to_bytes(peer_info)
+                prune_msg.peers.append(signed_peer_record)
+
        control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
        control_msg.prune.extend([prune_msg])

-        await self.emit_control_message(control_msg, id)
+        await self.emit_control_message(control_msg, to_peer)
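On the wire this yields a v1.1-style PRUNE carrying both the back off hint and the PX records. A minimal sketch of the resulting message, using the protobuf field names from `rpc_pb2` as above (values illustrative):

```python
control = rpc_pb2.ControlMessage()
prune = control.prune.add()        # same result as building ControlPrune and extend()
prune.topicID = "blocks"
prune.backoff = 60                 # prune_back_off, or 10 on unsubscribe
px = prune.peers.add()
px.peerID = other_peer_id.to_bytes()
px.signedPeerRecord = peer_info_to_bytes(other_peer_info)
```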
    async def emit_control_message(
        self, control_msg: rpc_pb2.ControlMessage, to_peer: ID