Skip to content

Commit 71ea513

Browse files
committed
Store /p2p-circuit/... addresses in the peer metadata to support reconnects and discovery
1 parent f76ad77 commit 71ea513

File tree

3 files changed

+405
-3
lines changed

3 files changed

+405
-3
lines changed

libp2p/relay/circuit_v2/resources.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,6 @@ def refresh_reservation(self, peer_id: ID) -> int:
316316
if self.has_reservation(peer_id):
317317
self.create_reservation(peer_id)
318318
return self.limits.duration
319-
319+
320320
return 0
321321

libp2p/relay/circuit_v2/transport.py

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,36 @@ async def dial_peer_info(
164164
If the connection cannot be established
165165
166166
"""
167+
# Prefer stored /p2p-circuit addrs from peerstore
168+
# Try first to read addresses from peerstore
169+
peer_store = self.host.get_peerstore()
170+
stored_addrs = peer_store.addrs(peer_info.peer_id)
171+
172+
# Get validated stored p2p-circuit addrs
173+
circuit_addrs = []
174+
for ma in stored_addrs:
175+
try:
176+
_, target_peer_id = self.parse_circuit_ma(ma)
177+
if target_peer_id == peer_info.peer_id:
178+
circuit_addrs.append(ma)
179+
except ValueError:
180+
continue
181+
182+
for ma in circuit_addrs:
183+
try:
184+
logger.debug(
185+
"Trying stored circuit multiaddr %s for peer %s",
186+
ma,
187+
peer_info.peer_id
188+
)
189+
conn = await self._dial_via_circuit_addr(ma, peer_info)
190+
if conn:
191+
logger.debug("Connected via stored circuit addr %s", ma)
192+
return conn
193+
logger.debug("Dial via %s returned None", ma)
194+
except Exception as e:
195+
logger.debug("Stored circuit addr failed (%s): %s", ma, e)
196+
167197
# If no specific relay is provided, try to find one
168198
if relay_peer_id is None:
169199
relay_peer_id = await self._select_relay(peer_info)
@@ -205,12 +235,155 @@ async def dial_peer_info(
205235
raise ConnectionError(f"Relay connection failed: {status_msg}")
206236

207237
# Create raw connection from stream
238+
self._store_multiaddrs(peer_info, relay_peer_id)
208239
return RawConnection(stream=relay_stream, initiator=True)
209240

210241
except Exception as e:
211242
await relay_stream.close()
212243
raise ConnectionError(f"Failed to establish relay connection: {str(e)}")
213244

245+
def parse_circuit_ma(
246+
self,
247+
ma: multiaddr.Multiaddr
248+
) -> tuple[multiaddr.Multiaddr, ID]:
249+
"""
250+
Parse a /p2p-circuit/p2p/<targetPeerID> path from a relay Multiaddr.
251+
252+
Returns:
253+
relay_ma: Multiaddr to the relay
254+
target_peer_id: ID of the target peer
255+
256+
Raises:
257+
ValueError: if the Multiaddr is not a valid circuit address
258+
259+
"""
260+
parts = ma.items()
261+
262+
if len(parts) < 2:
263+
raise ValueError(f"Invalid circuit Multiaddr, too short: {ma}")
264+
265+
proto_name, _ = parts[-2]
266+
if proto_name.name != "p2p-circuit":
267+
raise ValueError(f"Missing /p2p-circuit in Multiaddr: {ma}")
268+
269+
proto_name, val = parts[-1]
270+
if proto_name.name != "p2p":
271+
raise ValueError(f"Missing /p2p/<peerID> at the end: {ma}")
272+
273+
try:
274+
if isinstance(val, ID):
275+
target_peer_id = val
276+
else:
277+
target_peer_id = ID.from_base58(val)
278+
except Exception as e:
279+
raise ValueError(f"Invalid peer ID in circuit Multiaddr: {val}") from e
280+
281+
relay_parts = parts[:-2]
282+
relay_ma_str = "/".join(
283+
f"{p[0].name}/{p[1]}"
284+
for p in relay_parts
285+
if p[1] is not None
286+
)
287+
relay_ma = (
288+
multiaddr.Multiaddr(relay_ma_str)
289+
if relay_ma_str
290+
else multiaddr.Multiaddr("/")
291+
)
292+
293+
return relay_ma, target_peer_id
294+
295+
def _store_multiaddrs(self, peer_info: PeerInfo, relay_peer_id: ID) -> None:
296+
"""
297+
Store all /p2p-circuit addresses for a peer in the peerstore,
298+
based on the relay's addresses.
299+
"""
300+
try:
301+
relay_addrs = self.host.get_peerstore().addrs(relay_peer_id)
302+
if not relay_addrs:
303+
return
304+
305+
peer_store = self.host.get_peerstore()
306+
for relay_ma in relay_addrs:
307+
if not isinstance(relay_ma, multiaddr.Multiaddr):
308+
continue
309+
310+
# Construct /p2p-circuit address
311+
circuit_ma = (
312+
relay_ma
313+
.encapsulate(multiaddr.Multiaddr("/p2p-circuit"))
314+
.encapsulate(multiaddr.Multiaddr(f"/p2p/{peer_info.peer_id}"))
315+
)
316+
317+
peer_store.add_addrs(peer_info.peer_id, [circuit_ma], ttl=2**31-1)
318+
logger.debug(
319+
"Stored relay circuit multiaddr %s for peer %s",
320+
circuit_ma,
321+
peer_info.peer_id
322+
)
323+
324+
except Exception as e:
325+
logger.error(
326+
"Failed to store relay multiaddrs for peer %s: %s",
327+
peer_info.peer_id,
328+
e
329+
)
330+
331+
332+
async def _dial_via_circuit_addr(
333+
self,
334+
circuit_ma: multiaddr.Multiaddr,
335+
peer_info: PeerInfo
336+
) -> RawConnection:
337+
"""
338+
Dial using a stored /p2p-circuit multiaddr.
339+
340+
circuit_ma looks like: <relay-ma>/p2p-circuit/p2p/<target-peer-id>
341+
We extract the relay multiaddr (everything before /p2p-circuit), dial the relay,
342+
and issue a HOP CONNECT to the target peer.
343+
"""
344+
ma_str = str(circuit_ma)
345+
idx = ma_str.find("/p2p-circuit")
346+
if idx == -1:
347+
raise ConnectionError("Not a p2p-ciruit multiaddr")
348+
349+
relay_ma_str = ma_str[:idx] # everything before /p2p-circuit
350+
relay_ma = multiaddr.Multiaddr(relay_ma_str)
351+
relay_peer_id_str = relay_ma.value_for_protocol("p2p")
352+
if not relay_peer_id_str:
353+
raise ConnectionError("Relay multiaddr missing peer id")
354+
355+
relay_peer_id = ID.from_base58(relay_peer_id_str)
356+
357+
# open stream to the relay and request hop connect
358+
relay_stream = await self.host.new_stream(relay_peer_id, [PROTOCOL_ID])
359+
if not relay_stream:
360+
raise ConnectionError(f"Could not open stream to relay {relay_peer_id}")
361+
362+
try:
363+
hop_msg = HopMessage(
364+
type=HopMessage.CONNECT,
365+
peer=peer_info.peer_id.to_bytes(),
366+
)
367+
await relay_stream.write(hop_msg.SerializeToString())
368+
369+
resp_bytes = await relay_stream.read()
370+
resp = HopMessage()
371+
resp.ParseFromString(resp_bytes)
372+
373+
status_code = getattr(resp.status, "code", StatusCode.OK)
374+
status_msg = getattr(resp.status, "message", "Unknown error")
375+
376+
if status_code != StatusCode.OK:
377+
await relay_stream.close()
378+
raise ConnectionError(f"Relay connection failed: {status_msg}")
379+
380+
return RawConnection(stream=relay_stream, initiator=True)
381+
382+
except Exception:
383+
await relay_stream.close()
384+
raise
385+
386+
214387
async def _select_relay(self, peer_info: PeerInfo) -> ID | None:
215388
"""
216389
Select an appropriate relay for the given peer.

0 commit comments

Comments
 (0)