Merged
51 commits
75b6a10
init
guha-rahul Jun 23, 2025
6bfbdce
Merge branch 'main' into 679_voucher
seetadev Jun 23, 2025
7a9b021
Merge branch 'main' into 679_voucher
seetadev Jun 24, 2025
20f252a
fix test and typeerror
guha-rahul Jun 26, 2025
d391f73
Merge branch 'main' into 679_voucher
seetadev Jun 26, 2025
0d44632
some fixes and logging
guha-rahul Jun 30, 2025
51a405e
Merge branch 'main' into 679_voucher
guha-rahul Jun 30, 2025
65bb710
Merge branch 'main' into 679_voucher
seetadev Jun 30, 2025
60d88fb
add newsfragment
guha-rahul Jul 1, 2025
0fcfbc5
Merge branch 'main' into 679_voucher
seetadev Jul 2, 2025
d3e311e
Merge branch 'main' into 679_voucher
seetadev Jul 3, 2025
cafc298
Merge branch 'main' into 679_voucher
seetadev Jul 5, 2025
15ce97b
Merge branch 'main' into 679_voucher
seetadev Jul 5, 2025
54e39c8
add multihop protection and add test
guha-rahul Jul 6, 2025
7284168
Merge branch 'main' into 679_voucher
guha-rahul Jul 7, 2025
85a5bb1
Merge branch 'main' into 679_voucher
guha-rahul Jul 10, 2025
8037071
Merge branch 'main' into 679_voucher
seetadev Jul 12, 2025
774a7f3
Merge branch 'main' into 679_voucher
seetadev Jul 15, 2025
a0904fe
Merge branch 'main' into 679_voucher
seetadev Jul 16, 2025
f1019a5
Merge branch 'main' into 679_voucher
seetadev Jul 17, 2025
474965a
Merge branch 'main' into 679_voucher
seetadev Jul 21, 2025
8137069
Merge branch 'main' into 679_voucher
seetadev Jul 21, 2025
0bcba4c
Merge branch 'main' into 679_voucher
seetadev Jul 26, 2025
4714054
Merge branch 'main' into 679_voucher
seetadev Jul 26, 2025
82f3390
Merge branch 'main' into 679_voucher
seetadev Aug 9, 2025
670d630
Merge branch 'main' into 679_voucher
seetadev Aug 10, 2025
5aeb690
Merge branch 'main' into 679_voucher
seetadev Aug 12, 2025
9d9da91
Merge branch 'main' into 679_voucher
seetadev Aug 18, 2025
862e28a
Merge branch 'main' into 679_voucher
seetadev Aug 25, 2025
5633d7b
Merge branch 'main' into 679_voucher
seetadev Aug 25, 2025
bc3fc57
Merge branch 'main' into 679_voucher
seetadev Aug 28, 2025
b639315
Merge branch 'main' into 679_voucher
seetadev Sep 1, 2025
a231987
Merge branch 'main' into 679_voucher
seetadev Sep 4, 2025
4c643c5
Merge branch 'main' into 679_voucher
seetadev Sep 5, 2025
45ea250
Merge branch 'main' into 679_voucher
seetadev Sep 5, 2025
5cecc74
Merge branch 'main' into 679_voucher
seetadev Sep 15, 2025
372790b
Merge branch 'main' into 679_voucher
seetadev Sep 21, 2025
2c8c59f
Merge branch 'main' of https://github.com/guha-rahul/py-libp2p into 6…
guha-rahul Oct 20, 2025
6659c7b
Merge branch 'main' into 679_voucher
seetadev Oct 20, 2025
e0538f0
Merge branch 'main' into 679_voucher
guha-rahul Dec 1, 2025
9e5d47b
Merge branch 'main' into 679_voucher
seetadev Dec 1, 2025
c43d58f
Merge branch 'main' into 679_voucher
seetadev Dec 1, 2025
8de2ced
fix: Fix NetStream peer ID access and make senderRecord validation no…
Winter-Soren Dec 28, 2025
b51cf0b
Merge branch 'main' into 679_voucher
Winter-Soren Dec 29, 2025
850c6d1
fix: RelayResourceManager constructor signature and type issues after…
Winter-Soren Dec 29, 2025
b0da1d1
fix: test_circuit_v2_verify_reservation by adding mock host for signa…
Winter-Soren Dec 29, 2025
5fe9a39
Merge branch 'main' into 679_voucher
seetadev Jan 1, 2026
5971aaa
Merge branch 'main' into 679_voucher
seetadev Jan 5, 2026
93530b8
Merge branch 'main' into 679_voucher
Winter-Soren Jan 7, 2026
d63c449
fix: increase wait time in test_handle_prune to accommodate 3s heartb…
Winter-Soren Jan 7, 2026
f0ba32a
Merge branch '679_voucher' of https://github.com/guha-rahul/py-libp2p…
Winter-Soren Jan 7, 2026
114 changes: 94 additions & 20 deletions libp2p/relay/circuit_v2/protocol.py
@@ -162,9 +162,7 @@ def __init__(
self.read_timeout = read_timeout
self.write_timeout = write_timeout
self.close_timeout = close_timeout
self.resource_manager = RelayResourceManager(
self.limits, self.host.get_peerstore()
)
self.resource_manager = RelayResourceManager(self.limits, self.host)
self._active_relays: dict[ID, tuple[INetStream, INetStream | None]] = {}
self.event_started = trio.Event()

@@ -193,13 +191,20 @@ async def run(self, *, task_status: Any = trio.TASK_STATUS_IGNORED) -> None:
await self._close_stream(dst_stream)
self._active_relays.clear()

# Unregister protocol handlers
# Unregister protocol handlers - safely handle missing method
if self.allow_hop:
try:
# Cast host to extended interface with remove_stream_handler
host_with_handlers = cast(IHostWithStreamHandlers, self.host)
host_with_handlers.remove_stream_handler(PROTOCOL_ID)
host_with_handlers.remove_stream_handler(STOP_PROTOCOL_ID)
# Try to unregister handlers - some host implementations
# may not have this method
self.host.remove_stream_handler(PROTOCOL_ID) # type: ignore
self.host.remove_stream_handler(STOP_PROTOCOL_ID) # type: ignore
except AttributeError:
# Host does not support remove_stream_handler - handlers will be
# garbage collected
logger.debug(
"Host does not support remove_stream_handler, "
"handlers will be garbage collected"
)
except Exception as e:
logger.error("Error unregistering stream handlers: %s", str(e))

@@ -589,6 +594,38 @@ async def _handle_reserve(self, stream: INetStream, msg: HopMessage) -> None:
status_code = StatusCode.OK
status_msg_text = "Reservation accepted"

# Get the reservation object to access its voucher and sign it
reservation_obj = self.resource_manager._reservations.get(peer_id)
if not reservation_obj:
raise ValueError(f"Failed to create reservation for peer {peer_id}")

# Create the protobuf reservation with voucher and signature
reservation_obj.to_proto()

# Get the peer's addresses from the peerstore if available
addrs: list[bytes] = []
try:
# Try to get peer addresses from the host's peerstore
# Most host implementations have a peerstore attribute
peer_addrs = self.host.get_peerstore().addrs(peer_id)
# Convert addresses to bytes for the protocol buffer
addrs = [addr.to_bytes() for addr in peer_addrs]
logger.debug(
"Including %d addresses for peer %s in reservation response",
len(addrs),
peer_id,
)
except AttributeError:
# Host does not have peerstore or peerstore doesn't have addrs method
logger.debug("Host peerstore not available for address lookup")
except Exception as e:
logger.warning("Error getting peer addresses: %s", str(e))

# Add addresses to the reservation object
reservation_obj.addrs = addrs # type: ignore
# Note: pb_reservation.addrs not available in current protobuf definition
# pb_reservation.addrs.extend(addrs)

# Send reservation success response
with trio.fail_after(self.write_timeout):
status = create_status(code=status_code, message=status_msg_text)
@@ -646,6 +683,7 @@ async def _handle_connect(self, stream: INetStream, msg: HopMessage) -> None:
source_addr = stream.muxed_conn.peer_id
logger.debug("Handling CONNECT request for peer %s", peer_id)
dst_stream: INetStream | None = None
logger.debug("Handling connect request to peer %s", peer_id)
Contributor review comment:
Just a suggestion: can we do a self.resource_manager._reservations.get(peer_id) check at the start, and if it fails, directly reject the request with a NO_RESERVATION (= 204) status, similar to other implementations?
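A minimal sketch of that suggestion, assuming a StatusCode.NO_RESERVATION member (the 204 value mentioned above) exists alongside the other status codes, and reusing the _send_status helper and stream.reset() calls that appear later in this diff; the placement near the top of _handle_connect is hypothetical:

# Hypothetical early exit near the top of _handle_connect, after peer_id
# has been parsed from the HOP message and before the destination is dialed.
# Assumes StatusCode.NO_RESERVATION (204) is defined with the other codes.
if peer_id not in self.resource_manager._reservations:
    await self._send_status(
        stream,
        StatusCode.NO_RESERVATION,
        f"No active reservation for peer {peer_id}",
    )
    await stream.reset()
    return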


# Verify reservation if provided
if msg.HasField("reservation"):
@@ -695,6 +733,8 @@ async def _handle_connect(self, stream: INetStream, msg: HopMessage) -> None:
# Get relay's SPR to send in the STOP CONNECT message
relay_envelope_bytes, _ = env_to_send_in_RPC(self.host)

logger.debug("Connected to destination peer %s", peer_id)

# Send STOP CONNECT message
stop_msg = StopMessage(
type=StopMessage.CONNECT,
@@ -729,17 +769,29 @@ async def _handle_connect(self, stream: INetStream, msg: HopMessage) -> None:
status_msg = "No status provided"

if status_code != StatusCode.OK:
logger.warning(
"Destination rejected connection: %s (code %s)",
status_msg,
status_code,
)
raise ConnectionError(
f"Destination rejected connection: {status_msg}"
)

# Update active relays with destination stream
self._active_relays[peer_id] = (stream, dst_stream)
logger.debug("Connection established for peer %s", peer_id)

# Update reservation connection count
reservation = self.resource_manager._reservations.get(peer_id)
if reservation:
reservation.active_connections += 1
logger.debug(
"Updated active connections for peer %s: %d/%d",
peer_id,
reservation.active_connections,
reservation.limits.max_circuit_conns,
)

# Get destination peer's SPR to send to source
signed_envelope = self.host.get_peerstore().get_peer_record(peer_id)
@@ -769,12 +821,6 @@ async def _handle_connect(self, stream: INetStream, msg: HopMessage) -> None:
str(e),
relay_envelope,
)
if peer_id in self._active_relays:
del self._active_relays[peer_id]
# Clean up reservation connection count on failure
reservation = self.resource_manager._reservations.get(peer_id)
if reservation:
reservation.active_connections -= 1
await stream.reset()
if dst_stream:
await dst_stream.reset()
@@ -788,8 +834,6 @@ async def _handle_connect(self, stream: INetStream, msg: HopMessage) -> None:
"Internal error",
relay_envelope,
)
if peer_id in self._active_relays:
del self._active_relays[peer_id]
await stream.reset()
if dst_stream:
await dst_stream.reset()
Expand All @@ -806,22 +850,52 @@ async def _relay_data(
Parameters
----------
src_stream : INetStream
Source stream to read from
The source stream
dst_stream : INetStream
Destination stream to write to
The destination stream
peer_id : ID
ID of the peer being relayed
The peer ID for the reservation

"""
try:
# Get the reservation for tracking data usage
reservation = self.resource_manager._reservations.get(peer_id)
total_bytes = 0

while True:
# Read data with retries
data = await self._read_stream_with_retry(src_stream)
if not data:
logger.info("Source stream closed/reset")
break

# Write data with timeout
# Track data usage
bytes_transferred = len(data)
total_bytes += bytes_transferred

# Track data and check limits
if reservation and not self.resource_manager.track_data_transfer(
peer_id, bytes_transferred
):
logger.warning(
"Data transfer limit exceeded for peer %s: "
"current=%d, attempted=%d, limit=%d",
peer_id,
reservation.data_used,
bytes_transferred,
reservation.limits.data,
)
# Close the connection due to limit exceeded
await self._send_status(
src_stream,
StatusCode.RESOURCE_LIMIT_EXCEEDED,
"Data transfer limit exceeded",
)
await src_stream.reset()
await dst_stream.reset()
return

# Write data to destination
try:
with trio.fail_after(self.write_timeout):
await dst_stream.write(data)