@@ -113,3 +113,53 @@ async def test_unsubscribe_backoff():
113
113
assert host_0 .get_id () in gsub1 .mesh [topic ], (
114
114
"peer should be able to rejoin after backoff"
115
115
)
116
+
117
+
118
+ @pytest .mark .trio
119
+ async def test_peer_exchange ():
120
+ async with PubsubFactory .create_batch_with_gossipsub (
121
+ 3 ,
122
+ heartbeat_interval = 0.5 ,
123
+ do_px = True ,
124
+ px_peers_count = 1 ,
125
+ ) as pubsubs :
126
+ gsub0 = pubsubs [0 ].router
127
+ gsub1 = pubsubs [1 ].router
128
+ gsub2 = pubsubs [2 ].router
129
+ assert isinstance (gsub0 , GossipSub )
130
+ assert isinstance (gsub1 , GossipSub )
131
+ assert isinstance (gsub2 , GossipSub )
132
+ host_0 = pubsubs [0 ].host
133
+ host_1 = pubsubs [1 ].host
134
+ host_2 = pubsubs [2 ].host
135
+
136
+ topic = "test_peer_exchange"
137
+
138
+ # connect hosts
139
+ await connect (host_1 , host_0 )
140
+ await connect (host_1 , host_2 )
141
+ await trio .sleep (0.5 )
142
+
143
+ # all join the topic and 0 <-> 1 and 1 <-> 2 graft
144
+ await pubsubs [1 ].subscribe (topic )
145
+ await pubsubs [0 ].subscribe (topic )
146
+ await pubsubs [2 ].subscribe (topic )
147
+ await gsub1 .emit_graft (topic , host_0 .get_id ())
148
+ await gsub1 .emit_graft (topic , host_2 .get_id ())
149
+ await gsub0 .emit_graft (topic , host_1 .get_id ())
150
+ await gsub2 .emit_graft (topic , host_1 .get_id ())
151
+ await trio .sleep (1 )
152
+
153
+ # ensure peer is registered in mesh
154
+ assert host_0 .get_id () in gsub1 .mesh [topic ]
155
+ assert host_2 .get_id () in gsub1 .mesh [topic ]
156
+ assert host_2 .get_id () not in gsub0 .mesh [topic ]
157
+
158
+ # host_1 unsubscribes from the topic
159
+ await gsub1 .leave (topic )
160
+ await trio .sleep (1 ) # Wait for heartbeat to update mesh
161
+ assert topic not in gsub1 .mesh
162
+
163
+ # Wait for gsub0 to graft host_2 into its mesh via PX
164
+ await trio .sleep (1 )
165
+ assert host_2 .get_id () in gsub0 .mesh [topic ]
0 commit comments