
Commit 73ebd27

added isolated_topics_test and stress_test
1 parent fbee0ba commit 73ebd27

File tree

1 file changed: +109 -0 lines changed


tests/core/pubsub/test_gossipsub_px_and_backoff.py

Lines changed: 109 additions & 0 deletions
@@ -163,3 +163,112 @@ async def test_peer_exchange():
         # Wait for gsub0 to graft host_2 into its mesh via PX
         await trio.sleep(1)
         assert host_2.get_id() in gsub0.mesh[topic]
+
+
+@pytest.mark.trio
+async def test_topics_are_isolated():
+    async with PubsubFactory.create_batch_with_gossipsub(
+        2, heartbeat_interval=0.5, prune_back_off=2
+    ) as pubsubs:
+        gsub0 = pubsubs[0].router
+        gsub1 = pubsubs[1].router
+        assert isinstance(gsub0, GossipSub)
+        assert isinstance(gsub1, GossipSub)
+        host_0 = pubsubs[0].host
+        host_1 = pubsubs[1].host
+
+        topic1 = "test_prune_backoff"
+        topic2 = "test_prune_backoff2"
+
+        # connect hosts
+        await connect(host_0, host_1)
+        await trio.sleep(0.5)
+
+        # both peers join both the topics
+        await gsub0.join(topic1)
+        await gsub1.join(topic1)
+        await gsub0.join(topic2)
+        await gsub1.join(topic2)
+        await gsub0.emit_graft(topic1, host_1.get_id())
+        await trio.sleep(0.5)
+
+        # ensure topic1 for peer is registered in mesh
+        assert host_0.get_id() in gsub1.mesh[topic1]
+
+        # prune topic1 for host_1 from gsub0's mesh
+        await gsub0.emit_prune(topic1, host_1.get_id(), False, False)
+        await trio.sleep(0.5)
+
+        # topic1 for host_0 should not be in gsub1's mesh
+        assert host_0.get_id() not in gsub1.mesh[topic1]
+
+        # try to regraft topic1 and graft new topic2
+        await gsub0.emit_graft(topic1, host_1.get_id())
+        await gsub0.emit_graft(topic2, host_1.get_id())
+        await trio.sleep(0.5)
+        assert host_0.get_id() not in gsub1.mesh[topic1], (
+            "peer should be backoffed and not re-added"
+        )
+        assert host_0.get_id() in gsub1.mesh[topic2], (
+            "peer should be able to join a different topic"
+        )
+
+
+@pytest.mark.trio
+async def test_stress_churn():
+    NUM_PEERS = 5
+    CHURN_CYCLES = 30
+    TOPIC = "stress_churn_topic"
+    PRUNE_BACKOFF = 1
+    HEARTBEAT_INTERVAL = 0.2
+
+    async with PubsubFactory.create_batch_with_gossipsub(
+        NUM_PEERS,
+        heartbeat_interval=HEARTBEAT_INTERVAL,
+        prune_back_off=PRUNE_BACKOFF,
+    ) as pubsubs:
+        routers: list[GossipSub] = []
+        for ps in pubsubs:
+            assert isinstance(ps.router, GossipSub)
+            routers.append(ps.router)
+        hosts = [ps.host for ps in pubsubs]
+
+        # fully connect all peers
+        for i in range(NUM_PEERS):
+            for j in range(i + 1, NUM_PEERS):
+                await connect(hosts[i], hosts[j])
+        await trio.sleep(1)
+
+        # all peers join the topic
+        for router in routers:
+            await router.join(TOPIC)
+        await trio.sleep(1)
+
+        # rapid join/prune cycles
+        for cycle in range(CHURN_CYCLES):
+            for i, router in enumerate(routers):
+                # prune all other peers from this router's mesh
+                for j, peer_host in enumerate(hosts):
+                    if i != j:
+                        await router.emit_prune(TOPIC, peer_host.get_id(), False, False)
+            await trio.sleep(0.1)
+            for i, router in enumerate(routers):
+                # graft all other peers back
+                for j, peer_host in enumerate(hosts):
+                    if i != j:
+                        await router.emit_graft(TOPIC, peer_host.get_id())
+            await trio.sleep(0.1)
+
+        # wait for backoff entries to expire and cleanup
+        await trio.sleep(PRUNE_BACKOFF * 2)
+
+        # check that the backoff table is not unbounded
+        for router in routers:
+            # backoff is a dict: topic -> peer -> expiry
+            backoff = getattr(router, "back_off", None)
+            assert backoff is not None, "router missing backoff table"
+            # only a small number of entries should remain (ideally 0)
+            total_entries = sum(len(peers) for peers in backoff.values())
+            assert total_entries < NUM_PEERS * 2, (
+                f"backoff table grew too large: {total_entries} entries"
+            )
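
For context, below is a minimal, self-contained sketch of the per-topic backoff bookkeeping these tests exercise. The names here (BackoffTable, prune, is_backed_off, cleanup) are illustrative only and are not py-libp2p's actual GossipSub internals; the sketch only assumes what test_stress_churn inspects, namely a back_off mapping of topic -> peer -> expiry whose expired entries get cleaned up so the table stays bounded.

# Hypothetical sketch of the backoff table these tests assert against.
# Not py-libp2p's real implementation; names and structure are assumptions.
import time


class BackoffTable:
    def __init__(self, prune_back_off: float) -> None:
        self.prune_back_off = prune_back_off
        # topic -> peer_id -> expiry timestamp
        self.back_off: dict[str, dict[str, float]] = {}

    def prune(self, topic: str, peer_id: str) -> None:
        # Pruning starts a backoff window for this topic only, so grafts on
        # other topics are unaffected (what test_topics_are_isolated checks).
        self.back_off.setdefault(topic, {})[peer_id] = (
            time.monotonic() + self.prune_back_off
        )

    def is_backed_off(self, topic: str, peer_id: str) -> bool:
        expiry = self.back_off.get(topic, {}).get(peer_id)
        return expiry is not None and expiry > time.monotonic()

    def cleanup(self) -> None:
        # Drop expired entries so the table stays bounded under churn
        # (the property test_stress_churn asserts after PRUNE_BACKOFF * 2).
        now = time.monotonic()
        for topic in list(self.back_off):
            peers = self.back_off[topic]
            for peer_id in [p for p, exp in peers.items() if exp <= now]:
                del peers[peer_id]
            if not peers:
                del self.back_off[topic]


if __name__ == "__main__":
    table = BackoffTable(prune_back_off=2.0)
    table.prune("test_prune_backoff", "peer_a")
    assert table.is_backed_off("test_prune_backoff", "peer_a")
    # The same peer is not backed off on a different topic.
    assert not table.is_backed_off("test_prune_backoff2", "peer_a")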
