Skip to content

Commit 5c11ac2

Browse files
authored
Merge pull request #815 from lla-dane/kad-record
Signed-Peer-Record support in KAD-DHT message transfer mechanism.
2 parents 292bd1a + c2c4228 commit 5c11ac2

File tree

17 files changed

+821
-163
lines changed

17 files changed

+821
-163
lines changed

libp2p/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
)
5050
from libp2p.peer.peerstore import (
5151
PeerStore,
52+
create_signed_peer_record,
5253
)
5354
from libp2p.security.insecure.transport import (
5455
PLAINTEXT_PROTOCOL_ID,
@@ -155,7 +156,6 @@ def get_default_muxer_options() -> TMuxerOptions:
155156
else: # YAMUX is default
156157
return create_yamux_muxer_option()
157158

158-
159159
def new_swarm(
160160
key_pair: KeyPair | None = None,
161161
muxer_opt: TMuxerOptions | None = None,

libp2p/abc.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -970,6 +970,14 @@ def peers_with_addrs(self) -> list[ID]:
970970

971971
# --------CERTIFIED-ADDR-BOOK----------
972972

973+
@abstractmethod
974+
def get_local_record(self) -> Optional["Envelope"]:
975+
"""Get the local-peer-record wrapped in Envelope"""
976+
977+
@abstractmethod
978+
def set_local_record(self, envelope: "Envelope") -> None:
979+
"""Set the local-peer-record wrapped in Envelope"""
980+
973981
@abstractmethod
974982
def consume_peer_record(self, envelope: "Envelope", ttl: int) -> bool:
975983
"""

libp2p/host/basic_host.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from libp2p.peer.peerinfo import (
4444
PeerInfo,
4545
)
46+
from libp2p.peer.peerstore import create_signed_peer_record
4647
from libp2p.protocol_muxer.exceptions import (
4748
MultiselectClientError,
4849
MultiselectError,
@@ -110,6 +111,14 @@ def __init__(
110111
if bootstrap:
111112
self.bootstrap = BootstrapDiscovery(network, bootstrap)
112113

114+
# Cache a signed-record if the local-node in the PeerStore
115+
envelope = create_signed_peer_record(
116+
self.get_id(),
117+
self.get_addrs(),
118+
self.get_private_key(),
119+
)
120+
self.get_peerstore().set_local_record(envelope)
121+
113122
def get_id(self) -> ID:
114123
"""
115124
:return: peer_id of host

libp2p/identity/identify/identify.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@
1515
from libp2p.network.stream.exceptions import (
1616
StreamClosed,
1717
)
18-
from libp2p.peer.envelope import seal_record
19-
from libp2p.peer.peer_record import PeerRecord
18+
from libp2p.peer.peerstore import env_to_send_in_RPC
2019
from libp2p.utils import (
2120
decode_varint_with_size,
2221
get_agent_version,
@@ -66,9 +65,7 @@ def _mk_identify_protobuf(
6665
protocols = tuple(str(p) for p in host.get_mux().get_protocols() if p is not None)
6766

6867
# Create a signed peer-record for the remote peer
69-
record = PeerRecord(host.get_id(), host.get_addrs())
70-
envelope = seal_record(record, host.get_private_key())
71-
protobuf = envelope.marshal_envelope()
68+
envelope_bytes, _ = env_to_send_in_RPC(host)
7269

7370
observed_addr = observed_multiaddr.to_bytes() if observed_multiaddr else b""
7471
return Identify(
@@ -78,7 +75,7 @@ def _mk_identify_protobuf(
7875
listen_addrs=map(_multiaddr_to_bytes, laddrs),
7976
observed_addr=observed_addr,
8077
protocols=protocols,
81-
signedPeerRecord=protobuf,
78+
signedPeerRecord=envelope_bytes,
8279
)
8380

8481

libp2p/kad_dht/kad_dht.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,18 @@
2222
IHost,
2323
)
2424
from libp2p.discovery.random_walk.rt_refresh_manager import RTRefreshManager
25+
from libp2p.kad_dht.utils import maybe_consume_signed_record
2526
from libp2p.network.stream.net_stream import (
2627
INetStream,
2728
)
29+
from libp2p.peer.envelope import Envelope
2830
from libp2p.peer.id import (
2931
ID,
3032
)
3133
from libp2p.peer.peerinfo import (
3234
PeerInfo,
3335
)
36+
from libp2p.peer.peerstore import env_to_send_in_RPC
3437
from libp2p.tools.async_service import (
3538
Service,
3639
)
@@ -234,6 +237,9 @@ async def handle_stream(self, stream: INetStream) -> None:
234237
await self.add_peer(peer_id)
235238
logger.debug(f"Added peer {peer_id} to routing table")
236239

240+
closer_peer_envelope: Envelope | None = None
241+
provider_peer_envelope: Envelope | None = None
242+
237243
try:
238244
# Read varint-prefixed length for the message
239245
length_prefix = b""
@@ -274,6 +280,14 @@ async def handle_stream(self, stream: INetStream) -> None:
274280
)
275281
logger.debug(f"Found {len(closest_peers)} peers close to target")
276282

283+
# Consume the source signed_peer_record if sent
284+
if not maybe_consume_signed_record(message, self.host, peer_id):
285+
logger.error(
286+
"Received an invalid-signed-record, dropping the stream"
287+
)
288+
await stream.close()
289+
return
290+
277291
# Build response message with protobuf
278292
response = Message()
279293
response.type = Message.MessageType.FIND_NODE
@@ -298,6 +312,21 @@ async def handle_stream(self, stream: INetStream) -> None:
298312
except Exception:
299313
pass
300314

315+
# Add the signed-peer-record for each peer in the peer-proto
316+
# if cached in the peerstore
317+
closer_peer_envelope = (
318+
self.host.get_peerstore().get_peer_record(peer)
319+
)
320+
321+
if closer_peer_envelope is not None:
322+
peer_proto.signedRecord = (
323+
closer_peer_envelope.marshal_envelope()
324+
)
325+
326+
# Create sender_signed_peer_record
327+
envelope_bytes, _ = env_to_send_in_RPC(self.host)
328+
response.senderRecord = envelope_bytes
329+
301330
# Serialize and send response
302331
response_bytes = response.SerializeToString()
303332
await stream.write(varint.encode(len(response_bytes)))
@@ -312,6 +341,14 @@ async def handle_stream(self, stream: INetStream) -> None:
312341
key = message.key
313342
logger.debug(f"Received ADD_PROVIDER for key {key.hex()}")
314343

344+
# Consume the source signed-peer-record if sent
345+
if not maybe_consume_signed_record(message, self.host, peer_id):
346+
logger.error(
347+
"Received an invalid-signed-record, dropping the stream"
348+
)
349+
await stream.close()
350+
return
351+
315352
# Extract provider information
316353
for provider_proto in message.providerPeers:
317354
try:
@@ -338,6 +375,17 @@ async def handle_stream(self, stream: INetStream) -> None:
338375
logger.debug(
339376
f"Added provider {provider_id} for key {key.hex()}"
340377
)
378+
379+
# Process the signed-records of provider if sent
380+
if not maybe_consume_signed_record(
381+
provider_proto, self.host
382+
):
383+
logger.error(
384+
"Received an invalid-signed-record,"
385+
"dropping the stream"
386+
)
387+
await stream.close()
388+
return
341389
except Exception as e:
342390
logger.warning(f"Failed to process provider info: {e}")
343391

@@ -346,6 +394,10 @@ async def handle_stream(self, stream: INetStream) -> None:
346394
response.type = Message.MessageType.ADD_PROVIDER
347395
response.key = key
348396

397+
# Add sender's signed-peer-record
398+
envelope_bytes, _ = env_to_send_in_RPC(self.host)
399+
response.senderRecord = envelope_bytes
400+
349401
response_bytes = response.SerializeToString()
350402
await stream.write(varint.encode(len(response_bytes)))
351403
await stream.write(response_bytes)
@@ -357,6 +409,14 @@ async def handle_stream(self, stream: INetStream) -> None:
357409
key = message.key
358410
logger.debug(f"Received GET_PROVIDERS request for key {key.hex()}")
359411

412+
# Consume the source signed_peer_record if sent
413+
if not maybe_consume_signed_record(message, self.host, peer_id):
414+
logger.error(
415+
"Received an invalid-signed-record, dropping the stream"
416+
)
417+
await stream.close()
418+
return
419+
360420
# Find providers for the key
361421
providers = self.provider_store.get_providers(key)
362422
logger.debug(
@@ -368,12 +428,28 @@ async def handle_stream(self, stream: INetStream) -> None:
368428
response.type = Message.MessageType.GET_PROVIDERS
369429
response.key = key
370430

431+
# Create sender_signed_peer_record for the response
432+
envelope_bytes, _ = env_to_send_in_RPC(self.host)
433+
response.senderRecord = envelope_bytes
434+
371435
# Add provider information to response
372436
for provider_info in providers:
373437
provider_proto = response.providerPeers.add()
374438
provider_proto.id = provider_info.peer_id.to_bytes()
375439
provider_proto.connection = Message.ConnectionType.CAN_CONNECT
376440

441+
# Add provider signed-records if cached
442+
provider_peer_envelope = (
443+
self.host.get_peerstore().get_peer_record(
444+
provider_info.peer_id
445+
)
446+
)
447+
448+
if provider_peer_envelope is not None:
449+
provider_proto.signedRecord = (
450+
provider_peer_envelope.marshal_envelope()
451+
)
452+
377453
# Add addresses if available
378454
for addr in provider_info.addrs:
379455
provider_proto.addrs.append(addr.to_bytes())
@@ -397,6 +473,16 @@ async def handle_stream(self, stream: INetStream) -> None:
397473
peer_proto.id = peer.to_bytes()
398474
peer_proto.connection = Message.ConnectionType.CAN_CONNECT
399475

476+
# Add the signed-records of closest_peers if cached
477+
closer_peer_envelope = (
478+
self.host.get_peerstore().get_peer_record(peer)
479+
)
480+
481+
if closer_peer_envelope is not None:
482+
peer_proto.signedRecord = (
483+
closer_peer_envelope.marshal_envelope()
484+
)
485+
400486
# Add addresses if available
401487
try:
402488
addrs = self.host.get_peerstore().addrs(peer)
@@ -417,6 +503,14 @@ async def handle_stream(self, stream: INetStream) -> None:
417503
key = message.key
418504
logger.debug(f"Received GET_VALUE request for key {key.hex()}")
419505

506+
# Consume the sender_signed_peer_record
507+
if not maybe_consume_signed_record(message, self.host, peer_id):
508+
logger.error(
509+
"Received an invalid-signed-record, dropping the stream"
510+
)
511+
await stream.close()
512+
return
513+
420514
value = self.value_store.get(key)
421515
if value:
422516
logger.debug(f"Found value for key {key.hex()}")
@@ -431,6 +525,10 @@ async def handle_stream(self, stream: INetStream) -> None:
431525
response.record.value = value
432526
response.record.timeReceived = str(time.time())
433527

528+
# Create sender_signed_peer_record
529+
envelope_bytes, _ = env_to_send_in_RPC(self.host)
530+
response.senderRecord = envelope_bytes
531+
434532
# Serialize and send response
435533
response_bytes = response.SerializeToString()
436534
await stream.write(varint.encode(len(response_bytes)))
@@ -444,6 +542,10 @@ async def handle_stream(self, stream: INetStream) -> None:
444542
response.type = Message.MessageType.GET_VALUE
445543
response.key = key
446544

545+
# Create sender_signed_peer_record for the response
546+
envelope_bytes, _ = env_to_send_in_RPC(self.host)
547+
response.senderRecord = envelope_bytes
548+
447549
# Add closest peers to key
448550
closest_peers = self.routing_table.find_local_closest_peers(
449551
key, 20
@@ -462,6 +564,16 @@ async def handle_stream(self, stream: INetStream) -> None:
462564
peer_proto.id = peer.to_bytes()
463565
peer_proto.connection = Message.ConnectionType.CAN_CONNECT
464566

567+
# Add signed-records of closer-peers if cached
568+
closer_peer_envelope = (
569+
self.host.get_peerstore().get_peer_record(peer)
570+
)
571+
572+
if closer_peer_envelope is not None:
573+
peer_proto.signedRecord = (
574+
closer_peer_envelope.marshal_envelope()
575+
)
576+
465577
# Add addresses if available
466578
try:
467579
addrs = self.host.get_peerstore().addrs(peer)
@@ -484,6 +596,15 @@ async def handle_stream(self, stream: INetStream) -> None:
484596
key = message.record.key
485597
value = message.record.value
486598
success = False
599+
600+
# Consume the source signed_peer_record if sent
601+
if not maybe_consume_signed_record(message, self.host, peer_id):
602+
logger.error(
603+
"Received an invalid-signed-record, dropping the stream"
604+
)
605+
await stream.close()
606+
return
607+
487608
try:
488609
if not (key and value):
489610
raise ValueError(
@@ -504,6 +625,12 @@ async def handle_stream(self, stream: INetStream) -> None:
504625
response.type = Message.MessageType.PUT_VALUE
505626
if success:
506627
response.key = key
628+
629+
# Create sender_signed_peer_record for the response
630+
envelope_bytes, _ = env_to_send_in_RPC(self.host)
631+
response.senderRecord = envelope_bytes
632+
633+
# Serialize and send response
507634
response_bytes = response.SerializeToString()
508635
await stream.write(varint.encode(len(response_bytes)))
509636
await stream.write(response_bytes)

libp2p/kad_dht/pb/kademlia.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ message Message {
2727
bytes id = 1;
2828
repeated bytes addrs = 2;
2929
ConnectionType connection = 3;
30+
optional bytes signedRecord = 4; // Envelope(PeerRecord) encoded
3031
}
3132

3233
MessageType type = 1;
@@ -35,4 +36,6 @@ message Message {
3536
Record record = 3;
3637
repeated Peer closerPeers = 8;
3738
repeated Peer providerPeers = 9;
39+
40+
optional bytes senderRecord = 11; // Envelope(PeerRecord) encoded
3841
}

0 commit comments

Comments
 (0)