Skip to content

Commit ea6eef6

Browse files
committed
test px and backoff
1 parent 24e7320 commit ea6eef6

File tree

3 files changed

+119
-2
lines changed

3 files changed

+119
-2
lines changed

libp2p/pubsub/gossipsub.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -735,7 +735,7 @@ def _check_back_off(self, peer: ID, topic: str) -> bool:
735735
:param topic: topic to check
736736
:return: True if the peer is in back off, False otherwise
737737
"""
738-
if topic not in self.back_off:
738+
if topic not in self.back_off or peer not in self.back_off[topic]:
739739
return False
740740
if self.back_off[topic].get(peer, 0) > int(time.time()):
741741
return True

tests/core/pubsub/test_gossipsub.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,9 @@ async def test_fanout():
292292
@pytest.mark.trio
293293
@pytest.mark.slow
294294
async def test_fanout_maintenance():
295-
async with PubsubFactory.create_batch_with_gossipsub(10) as pubsubs_gsub:
295+
async with PubsubFactory.create_batch_with_gossipsub(
296+
10, unsubscribe_back_off=1
297+
) as pubsubs_gsub:
296298
hosts = [pubsub.host for pubsub in pubsubs_gsub]
297299
num_msgs = 5
298300

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
import pytest
2+
import trio
3+
4+
from libp2p.pubsub.gossipsub import (
5+
GossipSub,
6+
)
7+
from libp2p.tools.utils import (
8+
connect,
9+
)
10+
from tests.utils.factories import (
11+
PubsubFactory,
12+
)
13+
14+
15+
@pytest.mark.trio
16+
async def test_prune_backoff():
17+
async with PubsubFactory.create_batch_with_gossipsub(
18+
2, heartbeat_interval=0.5, prune_back_off=2
19+
) as pubsubs:
20+
gsub0 = pubsubs[0].router
21+
gsub1 = pubsubs[1].router
22+
assert isinstance(gsub0, GossipSub)
23+
assert isinstance(gsub1, GossipSub)
24+
host_0 = pubsubs[0].host
25+
host_1 = pubsubs[1].host
26+
27+
topic = "test_prune_backoff"
28+
29+
# connect hosts
30+
await connect(host_0, host_1)
31+
await trio.sleep(0.5)
32+
33+
# both join the topic
34+
await gsub0.join(topic)
35+
await gsub1.join(topic)
36+
await gsub0.emit_graft(topic, host_1.get_id())
37+
await trio.sleep(0.5)
38+
39+
# ensure peer is registered in mesh
40+
assert host_0.get_id() in gsub1.mesh[topic]
41+
42+
# prune host_1 from gsub0's mesh
43+
await gsub0.emit_prune(topic, host_1.get_id(), False, False)
44+
await trio.sleep(0.5)
45+
46+
# host_0 should not be in gsub1's mesh
47+
assert host_0.get_id() not in gsub1.mesh[topic]
48+
49+
# try to graft again immediately (should be rejected due to backoff)
50+
await gsub0.emit_graft(topic, host_1.get_id())
51+
await trio.sleep(0.5)
52+
assert host_0.get_id() not in gsub1.mesh[topic], (
53+
"peer should be backoffed and not re-added"
54+
)
55+
56+
# try to graft again (should succeed after backoff)
57+
await trio.sleep(2)
58+
await gsub0.emit_graft(topic, host_1.get_id())
59+
await trio.sleep(1)
60+
assert host_0.get_id() in gsub1.mesh[topic], (
61+
"peer should be able to rejoin after backoff"
62+
)
63+
64+
65+
@pytest.mark.trio
66+
async def test_unsubscribe_backoff():
67+
async with PubsubFactory.create_batch_with_gossipsub(
68+
2, heartbeat_interval=1, prune_back_off=1, unsubscribe_back_off=2
69+
) as pubsubs:
70+
gsub0 = pubsubs[0].router
71+
gsub1 = pubsubs[1].router
72+
assert isinstance(gsub0, GossipSub)
73+
assert isinstance(gsub1, GossipSub)
74+
host_0 = pubsubs[0].host
75+
host_1 = pubsubs[1].host
76+
77+
topic = "test_unsubscribe_backoff"
78+
79+
# connect hosts
80+
await connect(host_0, host_1)
81+
await trio.sleep(0.5)
82+
83+
# both join the topic
84+
await gsub0.join(topic)
85+
await gsub1.join(topic)
86+
await gsub0.emit_graft(topic, host_1.get_id())
87+
await trio.sleep(0.5)
88+
89+
# ensure peer is registered in mesh
90+
assert host_0.get_id() in gsub1.mesh[topic]
91+
92+
# host_1 unsubscribes from the topic
93+
await gsub1.leave(topic)
94+
await trio.sleep(0.5)
95+
assert topic not in gsub1.mesh
96+
97+
# host_1 resubscribes to the topic
98+
await gsub1.join(topic)
99+
await trio.sleep(0.5)
100+
assert topic in gsub1.mesh
101+
102+
# try to graft again immediately (should be rejected due to backoff)
103+
await gsub0.emit_graft(topic, host_1.get_id())
104+
await trio.sleep(0.5)
105+
assert host_0.get_id() not in gsub1.mesh[topic], (
106+
"peer should be backoffed and not re-added"
107+
)
108+
109+
# try to graft again (should succeed after backoff)
110+
await trio.sleep(1)
111+
await gsub0.emit_graft(topic, host_1.get_id())
112+
await trio.sleep(1)
113+
assert host_0.get_id() in gsub1.mesh[topic], (
114+
"peer should be able to rejoin after backoff"
115+
)

0 commit comments

Comments
 (0)