32
32
)
33
33
from libp2p .peer .peerinfo import (
34
34
PeerInfo ,
35
+ peer_info_from_bytes ,
36
+ peer_info_to_bytes ,
35
37
)
36
38
from libp2p .peer .peerstore import (
37
39
PERMANENT_ADDR_TTL ,
@@ -93,6 +95,7 @@ class GossipSub(IPubsubRouter, Service):
93
95
direct_connect_interval : int
94
96
95
97
do_px : bool
98
+ px_peers_count : int
96
99
back_off : dict [str , dict [ID , int ]]
97
100
prune_back_off : int
98
101
unsubscribe_back_off : int
@@ -112,6 +115,7 @@ def __init__(
112
115
direct_connect_initial_delay : float = 0.1 ,
113
116
direct_connect_interval : int = 300 ,
114
117
do_px : bool = False ,
118
+ px_peers_count : int = 16 ,
115
119
prune_back_off : int = 60 ,
116
120
unsubscribe_back_off : int = 10 ,
117
121
) -> None :
@@ -149,6 +153,7 @@ def __init__(
149
153
self .time_since_last_publish = {}
150
154
151
155
self .do_px = do_px
156
+ self .px_peers_count = px_peers_count
152
157
self .back_off = dict ()
153
158
self .prune_back_off = prune_back_off
154
159
self .unsubscribe_back_off = unsubscribe_back_off
@@ -737,6 +742,37 @@ def _check_back_off(self, peer: ID, topic: str) -> bool:
737
742
del self .back_off [topic ][peer ]
738
743
return False
739
744
745
+ async def _do_px (self , px_peers : list [rpc_pb2 .PeerInfo ]) -> None :
746
+ if len (px_peers ) > self .px_peers_count :
747
+ px_peers = px_peers [: self .px_peers_count ]
748
+
749
+ for peer in px_peers :
750
+ peer_id : ID = ID (peer .peerID )
751
+
752
+ if self .pubsub and peer_id in self .pubsub .peers :
753
+ continue
754
+
755
+ try :
756
+ peer_info = peer_info_from_bytes (peer .signedPeerRecord )
757
+ try :
758
+ if self .pubsub is None :
759
+ raise NoPubsubAttached
760
+ await self .pubsub .host .connect (peer_info )
761
+ except Exception as e :
762
+ logger .warning (
763
+ "failed to connect to px peer %s: %s" ,
764
+ peer_id ,
765
+ e ,
766
+ )
767
+ continue
768
+ except Exception as e :
769
+ logger .warning (
770
+ "failed to parse peer info from px peer %s: %s" ,
771
+ peer_id ,
772
+ e ,
773
+ )
774
+ continue
775
+
740
776
# RPC handlers
741
777
742
778
async def handle_ihave (
@@ -853,6 +889,9 @@ async def handle_prune(
853
889
) -> None :
854
890
topic : str = prune_msg .topicID
855
891
backoff_till : int = prune_msg .backoff
892
+ px_peers : list [rpc_pb2 .PeerInfo ] = []
893
+ for peer in prune_msg .peers :
894
+ px_peers .append (peer )
856
895
857
896
# Remove peer from mesh for topic
858
897
if topic in self .mesh :
@@ -863,6 +902,9 @@ async def handle_prune(
863
902
864
903
self .mesh [topic ].discard (sender_peer_id )
865
904
905
+ if px_peers :
906
+ await self ._do_px (px_peers )
907
+
866
908
# RPC emitters
867
909
868
910
def pack_control_msgs (
@@ -924,8 +966,18 @@ async def emit_prune(
924
966
925
967
prune_msg .backoff = back_off_duration
926
968
927
- # TODO: add peers once peerstore changes are complete
928
- # prune_msg.peers =
969
+ if do_px :
970
+ exchange_peers = self ._get_in_topic_gossipsub_peers_from_minus (
971
+ topic , self .px_peers_count , [to_peer ]
972
+ )
973
+ for peer in exchange_peers :
974
+ if self .pubsub is None :
975
+ raise NoPubsubAttached
976
+ peer_info = self .pubsub .host .get_peerstore ().peer_info (peer )
977
+ signed_peer_record : rpc_pb2 .PeerInfo = rpc_pb2 .PeerInfo ()
978
+ signed_peer_record .peerID = peer .to_bytes ()
979
+ signed_peer_record .signedPeerRecord = peer_info_to_bytes (peer_info )
980
+ prune_msg .peers .append (signed_peer_record )
929
981
930
982
control_msg : rpc_pb2 .ControlMessage = rpc_pb2 .ControlMessage ()
931
983
control_msg .prune .extend ([prune_msg ])
0 commit comments