Skip to content

Commit 4a53fc3

Browse files
Direct Peers : Gossipsub V1.1 (#594)
* added basic structure for direct peers * added direct connect heartbeat * added logic to reject GRAFT from direct peers * added invocation of direct_connect_heartbeat * updated _get_peers_to_send to include direct peers * fixed failing gossipsub core and demo tests * fixed failing test_examples.py * add tests for peer management * fix lint * update tests * fixed direct_peers type and peer_records test * fixed failing gossipsub direct peers test * added reject graft test * updated reconnection test * added newsfragment * improved reject graft test * updated default value for direct peers * renamed direct_connect_init_delay parameter * reverted back to direct_connect_initial_delay param name --------- Co-authored-by: Khwahish Patel <[email protected]>
1 parent 5b40e25 commit 4a53fc3

File tree

8 files changed

+260
-2
lines changed

8 files changed

+260
-2
lines changed

examples/pubsub/pubsub.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ async def run(topic: str, destination: Optional[str], port: Optional[int]) -> No
136136
degree=3, # Number of peers to maintain in mesh
137137
degree_low=2, # Lower bound for mesh peers
138138
degree_high=4, # Upper bound for mesh peers
139+
direct_peers=None, # Direct peers
139140
time_to_live=60, # TTL for message cache in seconds
140141
gossip_window=2, # Smaller window for faster gossip
141142
gossip_history=5, # Keep more history

libp2p/peer/peerstore.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from collections.abc import (
55
Sequence,
66
)
7+
import sys
78
from typing import (
89
Any,
910
)
@@ -32,6 +33,8 @@
3233
PeerInfo,
3334
)
3435

36+
PERMANENT_ADDR_TTL = sys.maxsize
37+
3538

3639
class PeerStore(IPeerStore):
3740
peer_data_map: dict[ID, PeerData]

libp2p/pubsub/gossipsub.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@
2929
from libp2p.peer.id import (
3030
ID,
3131
)
32+
from libp2p.peer.peerinfo import (
33+
PeerInfo,
34+
)
35+
from libp2p.peer.peerstore import (
36+
PERMANENT_ADDR_TTL,
37+
)
3238
from libp2p.pubsub import (
3339
floodsub,
3440
)
@@ -82,17 +88,24 @@ class GossipSub(IPubsubRouter, Service):
8288
heartbeat_initial_delay: float
8389
heartbeat_interval: int
8490

91+
direct_peers: dict[ID, PeerInfo]
92+
direct_connect_initial_delay: float
93+
direct_connect_interval: int
94+
8595
def __init__(
8696
self,
8797
protocols: Sequence[TProtocol],
8898
degree: int,
8999
degree_low: int,
90100
degree_high: int,
101+
direct_peers: Sequence[PeerInfo] = None,
91102
time_to_live: int = 60,
92103
gossip_window: int = 3,
93104
gossip_history: int = 5,
94105
heartbeat_initial_delay: float = 0.1,
95106
heartbeat_interval: int = 120,
107+
direct_connect_initial_delay: float = 0.1,
108+
direct_connect_interval: int = 300,
96109
) -> None:
97110
self.protocols = list(protocols)
98111
self.pubsub = None
@@ -119,10 +132,19 @@ def __init__(
119132
self.heartbeat_initial_delay = heartbeat_initial_delay
120133
self.heartbeat_interval = heartbeat_interval
121134

135+
# Create direct peers
136+
self.direct_peers = dict()
137+
for direct_peer in direct_peers or []:
138+
self.direct_peers[direct_peer.peer_id] = direct_peer
139+
self.direct_connect_interval = direct_connect_interval
140+
self.direct_connect_initial_delay = direct_connect_initial_delay
141+
122142
async def run(self) -> None:
123143
if self.pubsub is None:
124144
raise NoPubsubAttached
125145
self.manager.run_daemon_task(self.heartbeat)
146+
if len(self.direct_peers) > 0:
147+
self.manager.run_daemon_task(self.direct_connect_heartbeat)
126148
await self.manager.wait_finished()
127149

128150
# Interface functions
@@ -142,6 +164,12 @@ def attach(self, pubsub: Pubsub) -> None:
142164
"""
143165
self.pubsub = pubsub
144166

167+
if len(self.direct_peers) > 0:
168+
for pi in self.direct_peers:
169+
self.pubsub.host.get_peerstore().add_addrs(
170+
pi, self.direct_peers[pi].addrs, PERMANENT_ADDR_TTL
171+
)
172+
145173
logger.debug("attached to pusub")
146174

147175
def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None:
@@ -241,6 +269,10 @@ def _get_peers_to_send(
241269
if topic not in self.pubsub.peer_topics:
242270
continue
243271

272+
# direct peers
273+
_direct_peers: set[ID] = {_peer for _peer in self.direct_peers}
274+
send_to.update(_direct_peers)
275+
244276
# floodsub peers
245277
floodsub_peers: set[ID] = {
246278
peer_id
@@ -425,6 +457,24 @@ async def heartbeat(self) -> None:
425457

426458
await trio.sleep(self.heartbeat_interval)
427459

460+
async def direct_connect_heartbeat(self) -> None:
461+
"""
462+
Connect to direct peers.
463+
"""
464+
await trio.sleep(self.direct_connect_initial_delay)
465+
while True:
466+
for direct_peer in self.direct_peers:
467+
if direct_peer not in self.pubsub.peers:
468+
try:
469+
await self.pubsub.host.connect(self.direct_peers[direct_peer])
470+
except Exception as e:
471+
logger.debug(
472+
"failed to connect to a direct peer %s: %s",
473+
direct_peer,
474+
e,
475+
)
476+
await trio.sleep(self.direct_connect_interval)
477+
428478
def mesh_heartbeat(
429479
self,
430480
) -> tuple[DefaultDict[ID, list[str]], DefaultDict[ID, list[str]]]:
@@ -654,6 +704,14 @@ async def handle_graft(
654704

655705
# Add peer to mesh for topic
656706
if topic in self.mesh:
707+
for direct_peer in self.direct_peers:
708+
if direct_peer == sender_peer_id:
709+
logger.warning(
710+
"GRAFT: ignoring request from direct peer %s", sender_peer_id
711+
)
712+
await self.emit_prune(topic, sender_peer_id)
713+
return
714+
657715
if sender_peer_id not in self.mesh[topic]:
658716
self.mesh[topic].add(sender_peer_id)
659717
else:

libp2p/tools/constants.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
1+
from collections.abc import (
2+
Sequence,
3+
)
14
from typing import (
25
NamedTuple,
36
)
47

58
import multiaddr
69

10+
from libp2p.peer.peerinfo import (
11+
PeerInfo,
12+
)
713
from libp2p.pubsub import (
814
floodsub,
915
gossipsub,
@@ -26,11 +32,14 @@ class GossipsubParams(NamedTuple):
2632
degree: int = 10
2733
degree_low: int = 9
2834
degree_high: int = 11
35+
direct_peers: Sequence[PeerInfo] = None
2936
time_to_live: int = 30
3037
gossip_window: int = 3
3138
gossip_history: int = 5
3239
heartbeat_initial_delay: float = 0.1
3340
heartbeat_interval: float = 0.5
41+
direct_connect_initial_delay: float = 0.1
42+
direct_connect_interval: int = 300
3443

3544

3645
GOSSIPSUB_PARAMS = GossipsubParams()

newsfragments/594.feature.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
added ``direct peers`` as part of gossipsub v1.1 upgrade.

tests/core/examples/test_examples.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,8 @@ async def ping_handler(stream):
209209

210210

211211
async def pubsub_demo(host_a, host_b):
212-
gossipsub_a = GossipSub([GOSSIPSUB_PROTOCOL_ID], 3, 2, 4, 0.1, 1)
213-
gossipsub_b = GossipSub([GOSSIPSUB_PROTOCOL_ID], 3, 2, 4, 0.1, 1)
212+
gossipsub_a = GossipSub([GOSSIPSUB_PROTOCOL_ID], 3, 2, 4, None, 0.1, 1)
213+
gossipsub_b = GossipSub([GOSSIPSUB_PROTOCOL_ID], 3, 2, 4, None, 0.1, 1)
214214
pubsub_a = Pubsub(host_a, gossipsub_a)
215215
pubsub_b = Pubsub(host_b, gossipsub_b)
216216
message_a_to_b = "Hello from A to B"
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
import pytest
2+
import trio
3+
4+
from libp2p.peer.peerinfo import (
5+
info_from_p2p_addr,
6+
)
7+
from libp2p.tools.utils import (
8+
connect,
9+
)
10+
from tests.utils.factories import (
11+
PubsubFactory,
12+
)
13+
14+
15+
@pytest.mark.trio
16+
async def test_attach_peer_records():
17+
"""Test that attach ensures existence of peer records in peer store."""
18+
# Create first host
19+
async with PubsubFactory.create_batch_with_gossipsub(1) as pubsubs_gsub_0:
20+
host_0 = pubsubs_gsub_0[0].host
21+
22+
# Create second host with first host as direct peer
23+
async with PubsubFactory.create_batch_with_gossipsub(
24+
1,
25+
direct_peers=[info_from_p2p_addr(host_0.get_addrs()[0])],
26+
) as pubsubs_gsub_1:
27+
host_1 = pubsubs_gsub_1[0].host
28+
29+
# Wait for heartbeat to allow mesh to connect
30+
await trio.sleep(2)
31+
32+
try:
33+
# Verify that peer records exist in peer store
34+
peer_store_0 = host_0.get_peerstore()
35+
peer_store_1 = host_1.get_peerstore()
36+
37+
# Check that each host has the other's peer record
38+
peer_ids_0 = peer_store_0.peer_ids()
39+
peer_ids_1 = peer_store_1.peer_ids()
40+
41+
print(f"Peer store 0 IDs: {peer_ids_0}")
42+
print(f"Peer store 1 IDs: {peer_ids_1}")
43+
print(f"Host 0 ID: {host_0.get_id()}")
44+
print(f"Host 1 ID: {host_1.get_id()}")
45+
46+
assert host_0.get_id() in peer_ids_1, "Peer 0 not found in peer store 1"
47+
48+
except Exception as e:
49+
print(f"Test failed with error: {e}")
50+
raise
51+
52+
53+
@pytest.mark.trio
54+
async def test_reject_graft():
55+
"""Test that graft requests are rejected if the sender is a direct peer."""
56+
# Create first host
57+
async with PubsubFactory.create_batch_with_gossipsub(
58+
1, heartbeat_interval=1, direct_connect_interval=2
59+
) as pubsubs_gsub_0:
60+
host_0 = pubsubs_gsub_0[0].host
61+
62+
# Create second host with first host as direct peer
63+
async with PubsubFactory.create_batch_with_gossipsub(
64+
1,
65+
heartbeat_interval=1,
66+
direct_peers=[info_from_p2p_addr(host_0.get_addrs()[0])],
67+
direct_connect_interval=2,
68+
) as pubsubs_gsub_1:
69+
host_1 = pubsubs_gsub_1[0].host
70+
71+
try:
72+
# Connect the hosts
73+
await connect(host_0, host_1)
74+
75+
# Wait 2 seconds for heartbeat to allow mesh to connect
76+
await trio.sleep(1)
77+
78+
topic = "test_reject_graft"
79+
80+
# Gossipsub 0 and 1 joins topic
81+
await pubsubs_gsub_0[0].router.join(topic)
82+
await pubsubs_gsub_1[0].router.join(topic)
83+
84+
# Pre-Graft assertions
85+
assert (
86+
topic in pubsubs_gsub_0[0].router.mesh
87+
), "topic not in mesh for gossipsub 0"
88+
assert (
89+
topic in pubsubs_gsub_1[0].router.mesh
90+
), "topic not in mesh for gossipsub 1"
91+
assert (
92+
host_1.get_id() not in pubsubs_gsub_0[0].router.mesh[topic]
93+
), "gossipsub 1 in mesh topic for gossipsub 0"
94+
assert (
95+
host_0.get_id() not in pubsubs_gsub_1[0].router.mesh[topic]
96+
), "gossipsub 0 in mesh topic for gossipsub 1"
97+
98+
# Gossipsub 1 emits a graft request to Gossipsub 0
99+
await pubsubs_gsub_0[0].router.emit_graft(topic, host_1.get_id())
100+
101+
await trio.sleep(1)
102+
103+
# Post-Graft assertions
104+
assert (
105+
host_1.get_id() not in pubsubs_gsub_0[0].router.mesh[topic]
106+
), "gossipsub 1 in mesh topic for gossipsub 0"
107+
assert (
108+
host_0.get_id() not in pubsubs_gsub_1[0].router.mesh[topic]
109+
), "gossipsub 0 in mesh topic for gossipsub 1"
110+
111+
except Exception as e:
112+
print(f"Test failed with error: {e}")
113+
raise
114+
115+
116+
@pytest.mark.trio
117+
async def test_heartbeat_reconnect():
118+
"""Test that heartbeat can reconnect with disconnected direct peers gracefully."""
119+
# Create first host
120+
async with PubsubFactory.create_batch_with_gossipsub(
121+
1, heartbeat_interval=1, direct_connect_interval=3
122+
) as pubsubs_gsub_0:
123+
host_0 = pubsubs_gsub_0[0].host
124+
125+
# Create second host with first host as direct peer
126+
async with PubsubFactory.create_batch_with_gossipsub(
127+
1,
128+
heartbeat_interval=1,
129+
direct_peers=[info_from_p2p_addr(host_0.get_addrs()[0])],
130+
direct_connect_interval=3,
131+
) as pubsubs_gsub_1:
132+
host_1 = pubsubs_gsub_1[0].host
133+
134+
# Connect the hosts
135+
await connect(host_0, host_1)
136+
137+
try:
138+
# Wait for initial connection and mesh setup
139+
await trio.sleep(1)
140+
141+
# Verify initial connection
142+
assert (
143+
host_1.get_id() in pubsubs_gsub_0[0].peers
144+
), "Initial connection not established for gossipsub 0"
145+
assert (
146+
host_0.get_id() in pubsubs_gsub_1[0].peers
147+
), "Initial connection not established for gossipsub 0"
148+
149+
# Simulate disconnection
150+
await host_0.disconnect(host_1.get_id())
151+
152+
# Wait for heartbeat to detect disconnection
153+
await trio.sleep(1)
154+
155+
# Verify that peers are removed after disconnection
156+
assert (
157+
host_0.get_id() not in pubsubs_gsub_1[0].peers
158+
), "Peer 0 still in gossipsub 1 after disconnection"
159+
160+
# Wait for heartbeat to reestablish connection
161+
await trio.sleep(2)
162+
163+
# Verify connection reestablishment
164+
assert (
165+
host_0.get_id() in pubsubs_gsub_1[0].peers
166+
), "Reconnection not established for gossipsub 0"
167+
168+
except Exception as e:
169+
print(f"Test failed with error: {e}")
170+
raise

0 commit comments

Comments
 (0)