Skip to content

Commit 80c686d

Browse files
authored
Merge branch 'main' into feature/bootstrap
2 parents dcb199a + 0679efb commit 80c686d

File tree

12 files changed

+1554
-127
lines changed

12 files changed

+1554
-127
lines changed

libp2p/abc.py

Lines changed: 572 additions & 54 deletions
Large diffs are not rendered by default.

libp2p/peer/peerdata.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,13 @@
1818
PublicKey,
1919
)
2020

21+
"""
22+
Latency EWMA Smoothing governs the deacy of the EWMA (the speed at which
23+
is changes). This must be a normalized (0-1) value.
24+
1 is 100% change, 0 is no change.
25+
"""
26+
LATENCY_EWMA_SMOOTHING = 0.1
27+
2128

2229
class PeerData(IPeerData):
2330
pubkey: PublicKey | None
@@ -27,6 +34,7 @@ class PeerData(IPeerData):
2734
addrs: list[Multiaddr]
2835
last_identified: int
2936
ttl: int # Keep ttl=0 by default for always valid
37+
latmap: float
3038

3139
def __init__(self) -> None:
3240
self.pubkey = None
@@ -36,6 +44,9 @@ def __init__(self) -> None:
3644
self.addrs = []
3745
self.last_identified = int(time.time())
3846
self.ttl = 0
47+
self.latmap = 0
48+
49+
# --------PROTO-BOOK--------
3950

4051
def get_protocols(self) -> list[str]:
4152
"""
@@ -55,6 +66,37 @@ def set_protocols(self, protocols: Sequence[str]) -> None:
5566
"""
5667
self.protocols = list(protocols)
5768

69+
def remove_protocols(self, protocols: Sequence[str]) -> None:
70+
"""
71+
:param protocols: protocols to remove
72+
"""
73+
for protocol in protocols:
74+
if protocol in self.protocols:
75+
self.protocols.remove(protocol)
76+
77+
def supports_protocols(self, protocols: Sequence[str]) -> list[str]:
78+
"""
79+
:param protocols: protocols to check from
80+
:return: all supported protocols in the given list
81+
"""
82+
return [proto for proto in protocols if proto in self.protocols]
83+
84+
def first_supported_protocol(self, protocols: Sequence[str]) -> str:
85+
"""
86+
:param protocols: protocols to check from
87+
:return: first supported protocol in the given list
88+
"""
89+
for protocol in protocols:
90+
if protocol in self.protocols:
91+
return protocol
92+
93+
return "None supported"
94+
95+
def clear_protocol_data(self) -> None:
96+
"""Clear all protocols"""
97+
self.protocols = []
98+
99+
# -------ADDR-BOOK---------
58100
def add_addrs(self, addrs: Sequence[Multiaddr]) -> None:
59101
"""
60102
:param addrs: multiaddresses to add
@@ -73,6 +115,7 @@ def clear_addrs(self) -> None:
73115
"""Clear all addresses."""
74116
self.addrs = []
75117

118+
# -------METADATA-----------
76119
def put_metadata(self, key: str, val: Any) -> None:
77120
"""
78121
:param key: key in KV pair
@@ -90,6 +133,11 @@ def get_metadata(self, key: str) -> Any:
90133
return self.metadata[key]
91134
raise PeerDataError("key not found")
92135

136+
def clear_metadata(self) -> None:
137+
"""Clears metadata."""
138+
self.metadata = {}
139+
140+
# -------KEY-BOOK---------------
93141
def add_pubkey(self, pubkey: PublicKey) -> None:
94142
"""
95143
:param pubkey:
@@ -120,9 +168,41 @@ def get_privkey(self) -> PrivateKey:
120168
raise PeerDataError("private key not found")
121169
return self.privkey
122170

171+
def clear_keydata(self) -> None:
172+
"""Clears keydata"""
173+
self.pubkey = None
174+
self.privkey = None
175+
176+
# ----------METRICS--------------
177+
def record_latency(self, new_latency: float) -> None:
178+
"""
179+
Records a new latency measurement for the given peer
180+
using Exponentially Weighted Moving Average (EWMA)
181+
:param new_latency: the new latency value
182+
"""
183+
s = LATENCY_EWMA_SMOOTHING
184+
if s > 1 or s < 0:
185+
s = 0.1
186+
187+
if self.latmap == 0:
188+
self.latmap = new_latency
189+
else:
190+
prev = self.latmap
191+
updated = ((1.0 - s) * prev) + (s * new_latency)
192+
self.latmap = updated
193+
194+
def latency_EWMA(self) -> float:
195+
"""Returns the latency EWMA value"""
196+
return self.latmap
197+
198+
def clear_metrics(self) -> None:
199+
"""Clear the latency metrics"""
200+
self.latmap = 0
201+
123202
def update_last_identified(self) -> None:
124203
self.last_identified = int(time.time())
125204

205+
# ----------TTL------------------
126206
def get_last_identified(self) -> int:
127207
"""
128208
:return: last identified timestamp

libp2p/peer/peerstore.py

Lines changed: 122 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
defaultdict,
33
)
44
from collections.abc import (
5+
AsyncIterable,
56
Sequence,
67
)
78
from typing import (
@@ -11,6 +12,8 @@
1112
from multiaddr import (
1213
Multiaddr,
1314
)
15+
import trio
16+
from trio import MemoryReceiveChannel, MemorySendChannel
1417

1518
from libp2p.abc import (
1619
IPeerStore,
@@ -40,6 +43,7 @@ class PeerStore(IPeerStore):
4043

4144
def __init__(self) -> None:
4245
self.peer_data_map = defaultdict(PeerData)
46+
self.addr_update_channels: dict[ID, MemorySendChannel[Multiaddr]] = {}
4347

4448
def peer_info(self, peer_id: ID) -> PeerInfo:
4549
"""
@@ -53,6 +57,29 @@ def peer_info(self, peer_id: ID) -> PeerInfo:
5357
return PeerInfo(peer_id, peer_data.get_addrs())
5458
raise PeerStoreError("peer ID not found")
5559

60+
def peer_ids(self) -> list[ID]:
61+
"""
62+
:return: all of the peer IDs stored in peer store
63+
"""
64+
return list(self.peer_data_map.keys())
65+
66+
def clear_peerdata(self, peer_id: ID) -> None:
67+
"""Clears the peer data of the peer"""
68+
69+
def valid_peer_ids(self) -> list[ID]:
70+
"""
71+
:return: all of the valid peer IDs stored in peer store
72+
"""
73+
valid_peer_ids: list[ID] = []
74+
for peer_id, peer_data in self.peer_data_map.items():
75+
if not peer_data.is_expired():
76+
valid_peer_ids.append(peer_id)
77+
else:
78+
peer_data.clear_addrs()
79+
return valid_peer_ids
80+
81+
# --------PROTO-BOOK--------
82+
5683
def get_protocols(self, peer_id: ID) -> list[str]:
5784
"""
5885
:param peer_id: peer ID to get protocols for
@@ -79,23 +106,31 @@ def set_protocols(self, peer_id: ID, protocols: Sequence[str]) -> None:
79106
peer_data = self.peer_data_map[peer_id]
80107
peer_data.set_protocols(list(protocols))
81108

82-
def peer_ids(self) -> list[ID]:
109+
def remove_protocols(self, peer_id: ID, protocols: Sequence[str]) -> None:
83110
"""
84-
:return: all of the peer IDs stored in peer store
111+
:param peer_id: peer ID to get info for
112+
:param protocols: unsupported protocols to remove
85113
"""
86-
return list(self.peer_data_map.keys())
114+
peer_data = self.peer_data_map[peer_id]
115+
peer_data.remove_protocols(protocols)
87116

88-
def valid_peer_ids(self) -> list[ID]:
117+
def supports_protocols(self, peer_id: ID, protocols: Sequence[str]) -> list[str]:
89118
"""
90-
:return: all of the valid peer IDs stored in peer store
119+
:return: all of the peer IDs stored in peer store
91120
"""
92-
valid_peer_ids: list[ID] = []
93-
for peer_id, peer_data in self.peer_data_map.items():
94-
if not peer_data.is_expired():
95-
valid_peer_ids.append(peer_id)
96-
else:
97-
peer_data.clear_addrs()
98-
return valid_peer_ids
121+
peer_data = self.peer_data_map[peer_id]
122+
return peer_data.supports_protocols(protocols)
123+
124+
def first_supported_protocol(self, peer_id: ID, protocols: Sequence[str]) -> str:
125+
peer_data = self.peer_data_map[peer_id]
126+
return peer_data.first_supported_protocol(protocols)
127+
128+
def clear_protocol_data(self, peer_id: ID) -> None:
129+
"""Clears prtocoldata"""
130+
peer_data = self.peer_data_map[peer_id]
131+
peer_data.clear_protocol_data()
132+
133+
# ------METADATA---------
99134

100135
def get(self, peer_id: ID, key: str) -> Any:
101136
"""
@@ -121,6 +156,13 @@ def put(self, peer_id: ID, key: str, val: Any) -> None:
121156
peer_data = self.peer_data_map[peer_id]
122157
peer_data.put_metadata(key, val)
123158

159+
def clear_metadata(self, peer_id: ID) -> None:
160+
"""Clears metadata"""
161+
peer_data = self.peer_data_map[peer_id]
162+
peer_data.clear_metadata()
163+
164+
# -------ADDR-BOOK--------
165+
124166
def add_addr(self, peer_id: ID, addr: Multiaddr, ttl: int = 0) -> None:
125167
"""
126168
:param peer_id: peer ID to add address for
@@ -140,6 +182,13 @@ def add_addrs(self, peer_id: ID, addrs: Sequence[Multiaddr], ttl: int = 0) -> No
140182
peer_data.set_ttl(ttl)
141183
peer_data.update_last_identified()
142184

185+
if peer_id in self.addr_update_channels:
186+
for addr in addrs:
187+
try:
188+
self.addr_update_channels[peer_id].send_nowait(addr)
189+
except trio.WouldBlock:
190+
pass # Or consider logging / dropping / replacing stream
191+
143192
def addrs(self, peer_id: ID) -> list[Multiaddr]:
144193
"""
145194
:param peer_id: peer ID to get addrs for
@@ -165,7 +214,7 @@ def clear_addrs(self, peer_id: ID) -> None:
165214

166215
def peers_with_addrs(self) -> list[ID]:
167216
"""
168-
:return: all of the peer IDs which has addrs stored in peer store
217+
:return: all of the peer IDs which has addrsfloat stored in peer store
169218
"""
170219
# Add all peers with addrs at least 1 to output
171220
output: list[ID] = []
@@ -179,6 +228,27 @@ def peers_with_addrs(self) -> list[ID]:
179228
peer_data.clear_addrs()
180229
return output
181230

231+
async def addr_stream(self, peer_id: ID) -> AsyncIterable[Multiaddr]:
232+
"""
233+
Returns an async stream of newly added addresses for the given peer.
234+
235+
This function allows consumers to subscribe to address updates for a peer
236+
and receive each new address as it is added via `add_addr` or `add_addrs`.
237+
238+
:param peer_id: The ID of the peer to monitor address updates for.
239+
:return: An async iterator yielding Multiaddr instances as they are added.
240+
"""
241+
send: MemorySendChannel[Multiaddr]
242+
receive: MemoryReceiveChannel[Multiaddr]
243+
244+
send, receive = trio.open_memory_channel(0)
245+
self.addr_update_channels[peer_id] = send
246+
247+
async for addr in receive:
248+
yield addr
249+
250+
# -------KEY-BOOK---------
251+
182252
def add_pubkey(self, peer_id: ID, pubkey: PublicKey) -> None:
183253
"""
184254
:param peer_id: peer ID to add public key for
@@ -239,6 +309,45 @@ def add_key_pair(self, peer_id: ID, key_pair: KeyPair) -> None:
239309
self.add_pubkey(peer_id, key_pair.public_key)
240310
self.add_privkey(peer_id, key_pair.private_key)
241311

312+
def peer_with_keys(self) -> list[ID]:
313+
"""Returns the peer_ids for which keys are stored"""
314+
return [
315+
peer_id
316+
for peer_id, pdata in self.peer_data_map.items()
317+
if pdata.pubkey is not None
318+
]
319+
320+
def clear_keydata(self, peer_id: ID) -> None:
321+
"""Clears the keys of the peer"""
322+
peer_data = self.peer_data_map[peer_id]
323+
peer_data.clear_keydata()
324+
325+
# --------METRICS--------
326+
327+
def record_latency(self, peer_id: ID, RTT: float) -> None:
328+
"""
329+
Records a new latency measurement for the given peer
330+
using Exponentially Weighted Moving Average (EWMA)
331+
332+
:param peer_id: peer ID to get private key for
333+
:param RTT: the new latency value (round trip time)
334+
"""
335+
peer_data = self.peer_data_map[peer_id]
336+
peer_data.record_latency(RTT)
337+
338+
def latency_EWMA(self, peer_id: ID) -> float:
339+
"""
340+
:param peer_id: peer ID to get private key for
341+
:return: The latency EWMA value for that peer
342+
"""
343+
peer_data = self.peer_data_map[peer_id]
344+
return peer_data.latency_EWMA()
345+
346+
def clear_metrics(self, peer_id: ID) -> None:
347+
"""Clear the latency metrics"""
348+
peer_data = self.peer_data_map[peer_id]
349+
peer_data.clear_metrics()
350+
242351

243352
class PeerStoreError(KeyError):
244353
"""Raised when peer ID is not found in peer store."""

0 commit comments

Comments
 (0)