Skip to content

Commit d1c3148

Browse files
committed
Implemented addr_stream in the peerstore
1 parent 51c08de commit d1c3148

File tree

2 files changed

+64
-0
lines changed

2 files changed

+64
-0
lines changed

libp2p/peer/peerstore.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
defaultdict,
33
)
44
from collections.abc import (
5+
AsyncIterable,
56
Sequence,
67
)
78
from typing import (
@@ -11,6 +12,8 @@
1112
from multiaddr import (
1213
Multiaddr,
1314
)
15+
import trio
16+
from trio import MemoryReceiveChannel, MemorySendChannel
1417

1518
from libp2p.abc import (
1619
IPeerStore,
@@ -40,6 +43,7 @@ class PeerStore(IPeerStore):
4043

4144
def __init__(self) -> None:
4245
self.peer_data_map = defaultdict(PeerData)
46+
self.addr_update_channels: dict[ID, MemorySendChannel[Multiaddr]] = {}
4347

4448
def peer_info(self, peer_id: ID) -> PeerInfo:
4549
"""
@@ -178,6 +182,13 @@ def add_addrs(self, peer_id: ID, addrs: Sequence[Multiaddr], ttl: int = 0) -> No
178182
peer_data.set_ttl(ttl)
179183
peer_data.update_last_identified()
180184

185+
if peer_id in self.addr_update_channels:
186+
for addr in addrs:
187+
try:
188+
self.addr_update_channels[peer_id].send_nowait(addr)
189+
except trio.WouldBlock:
190+
pass # Or consider logging / dropping / replacing stream
191+
181192
def addrs(self, peer_id: ID) -> list[Multiaddr]:
182193
"""
183194
:param peer_id: peer ID to get addrs for
@@ -217,6 +228,25 @@ def peers_with_addrs(self) -> list[ID]:
217228
peer_data.clear_addrs()
218229
return output
219230

231+
async def addr_stream(self, peer_id: ID) -> AsyncIterable[Multiaddr]:
232+
"""
233+
Returns an async stream of newly added addresses for the given peer.
234+
235+
This function allows consumers to subscribe to address updates for a peer
236+
and receive each new address as it is added via `add_addr` or `add_addrs`.
237+
238+
:param peer_id: The ID of the peer to monitor address updates for.
239+
:return: An async iterator yielding Multiaddr instances as they are added.
240+
"""
241+
send: MemorySendChannel[Multiaddr]
242+
receive: MemoryReceiveChannel[Multiaddr]
243+
244+
send, receive = trio.open_memory_channel(0)
245+
self.addr_update_channels[peer_id] = send
246+
247+
async for addr in receive:
248+
yield addr
249+
220250
# -------KEY-BOOK---------
221251

222252
def add_pubkey(self, peer_id: ID, pubkey: PublicKey) -> None:

tests/core/peer/test_peerstore.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import pytest
44
from multiaddr import Multiaddr
5+
import trio
56

67
from libp2p.peer.id import ID
78
from libp2p.peer.peerstore import (
@@ -89,3 +90,36 @@ def test_peers():
8990
store.add_addr(ID(b"peer3"), Multiaddr("/ip4/127.0.0.1/tcp/4001"), 10)
9091

9192
assert set(store.peer_ids()) == {ID(b"peer1"), ID(b"peer2"), ID(b"peer3")}
93+
94+
95+
@pytest.mark.trio
96+
async def test_addr_stream_yields_new_addrs():
97+
store = PeerStore()
98+
peer_id = ID(b"peer1")
99+
addr1 = Multiaddr("/ip4/127.0.0.1/tcp/4001")
100+
addr2 = Multiaddr("/ip4/127.0.0.1/tcp/4002")
101+
102+
# 🔧 Pre-initialize peer in peer_data_map
103+
# store.add_addr(peer_id, Multiaddr("/ip4/127.0.0.1/tcp/0"), ttl=1)
104+
105+
collected = []
106+
107+
async def consume_addrs():
108+
async for addr in store.addr_stream(peer_id):
109+
collected.append(addr)
110+
if len(collected) == 2:
111+
break
112+
113+
async with trio.open_nursery() as nursery:
114+
nursery.start_soon(consume_addrs)
115+
await trio.sleep(2) # Give time for the stream to start
116+
117+
store.add_addr(peer_id, addr1, ttl=10)
118+
await trio.sleep(0.2)
119+
store.add_addr(peer_id, addr2, ttl=10)
120+
await trio.sleep(0.2)
121+
122+
# After collecting expected addresses, cancel the stream
123+
nursery.cancel_scope.cancel()
124+
125+
assert collected == [addr1, addr2]

0 commit comments

Comments
 (0)