@@ -92,6 +92,10 @@ class GossipSub(IPubsubRouter, Service):
92
92
direct_connect_initial_delay : float
93
93
direct_connect_interval : int
94
94
95
+ do_px : bool
96
+ back_off : int
97
+ unsubscribe_back_off : int
98
+
95
99
def __init__ (
96
100
self ,
97
101
protocols : Sequence [TProtocol ],
@@ -106,6 +110,9 @@ def __init__(
106
110
heartbeat_interval : int = 120 ,
107
111
direct_connect_initial_delay : float = 0.1 ,
108
112
direct_connect_interval : int = 300 ,
113
+ do_px : bool = False ,
114
+ back_off : int = 60 ,
115
+ unsubscribe_back_off : int = 10 ,
109
116
) -> None :
110
117
self .protocols = list (protocols )
111
118
self .pubsub = None
@@ -140,6 +147,10 @@ def __init__(
140
147
self .direct_connect_initial_delay = direct_connect_initial_delay
141
148
self .time_since_last_publish = {}
142
149
150
+ self .do_px = do_px
151
+ self .back_off = back_off
152
+ self .unsubscribe_back_off = unsubscribe_back_off
153
+
143
154
async def run (self ) -> None :
144
155
self .manager .run_daemon_task (self .heartbeat )
145
156
if len (self .direct_peers ) > 0 :
@@ -369,7 +380,7 @@ async def leave(self, topic: str) -> None:
369
380
return
370
381
# Notify the peers in mesh[topic] with a PRUNE(topic) message
371
382
for peer in self .mesh [topic ]:
372
- await self .emit_prune (topic , peer )
383
+ await self .emit_prune (topic , peer , do_px = self . do_px , is_unsubscribe = True )
373
384
374
385
# Forget mesh[topic]
375
386
self .mesh .pop (topic , None )
@@ -751,21 +762,28 @@ async def handle_graft(
751
762
) -> None :
752
763
topic : str = graft_msg .topicID
753
764
765
+ # TODO: complete the remaining logic
766
+ self .do_px
767
+
754
768
# Add peer to mesh for topic
755
769
if topic in self .mesh :
756
770
for direct_peer in self .direct_peers :
757
771
if direct_peer == sender_peer_id :
758
772
logger .warning (
759
773
"GRAFT: ignoring request from direct peer %s" , sender_peer_id
760
774
)
761
- await self .emit_prune (topic , sender_peer_id )
775
+ await self .emit_prune (
776
+ topic , sender_peer_id , do_px = self .do_px , is_unsubscribe = False
777
+ )
762
778
return
763
779
764
780
if sender_peer_id not in self .mesh [topic ]:
765
781
self .mesh [topic ].add (sender_peer_id )
766
782
else :
767
783
# Respond with PRUNE if not subscribed to the topic
768
- await self .emit_prune (topic , sender_peer_id )
784
+ await self .emit_prune (
785
+ topic , sender_peer_id , do_px = self .do_px , is_unsubscribe = False
786
+ )
769
787
770
788
async def handle_prune (
771
789
self , prune_msg : rpc_pb2 .ControlPrune , sender_peer_id : ID
@@ -824,11 +842,23 @@ async def emit_graft(self, topic: str, id: ID) -> None:
824
842
825
843
await self .emit_control_message (control_msg , id )
826
844
845
+ async def emit_prune (
846
+ self , topic : str , to_peer : ID , do_px : bool , is_unsubscribe : bool
847
+ ) -> None :
827
848
async def emit_prune (self , topic : str , id : ID ) -> None :
828
849
"""Emit graft message, sent to to_peer, for topic."""
829
850
prune_msg : rpc_pb2 .ControlPrune = rpc_pb2 .ControlPrune ()
830
851
prune_msg .topicID = topic
831
852
853
+ back_off_duration = self .back_off
854
+ if is_unsubscribe :
855
+ back_off_duration = self .unsubscribe_back_off
856
+
857
+ prune_msg .backoff = back_off_duration
858
+
859
+ # TODO: add peers once peerstore changes are complete
860
+ # prune_msg.peers =
861
+
832
862
control_msg : rpc_pb2 .ControlMessage = rpc_pb2 .ControlMessage ()
833
863
control_msg .prune .extend ([prune_msg ])
834
864
0 commit comments