Skip to content

Commit b26e833

Browse files
committed
updated as per the suggestions in #815
1 parent d99b67e commit b26e833

File tree

5 files changed

+65
-44
lines changed

5 files changed

+65
-44
lines changed

libp2p/pubsub/floodsub.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from libp2p.peer.id import (
1616
ID,
1717
)
18-
from libp2p.pubsub.utils import env_to_send_in_RPC
18+
from libp2p.peer.peerstore import env_to_send_in_RPC
1919

2020
from .exceptions import (
2121
PubsubRouterError,
@@ -106,7 +106,7 @@ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
106106

107107
# Add the senderRecord of the peer in the RPC msg
108108
if isinstance(self.pubsub, Pubsub):
109-
envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host)
109+
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
110110
rpc_msg.senderRecord = envelope_bytes
111111

112112
logger.debug("publishing message %s", pubsub_msg)

libp2p/pubsub/gossipsub.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,12 @@
3434
)
3535
from libp2p.peer.peerstore import (
3636
PERMANENT_ADDR_TTL,
37+
env_to_send_in_RPC,
3738
)
3839
from libp2p.pubsub import (
3940
floodsub,
4041
)
41-
from libp2p.pubsub.utils import env_to_send_in_RPC, maybe_consume_signed_record
42+
from libp2p.pubsub.utils import maybe_consume_signed_record
4243
from libp2p.tools.async_service import (
4344
Service,
4445
)
@@ -229,7 +230,7 @@ async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None:
229230
"""
230231
# Process the senderRecord if sent
231232
if isinstance(self.pubsub, Pubsub):
232-
if not maybe_consume_signed_record(rpc, self.pubsub.host):
233+
if not maybe_consume_signed_record(rpc, self.pubsub.host, sender_peer_id):
233234
logger.error("Received an invalid-signed-record, ignoring the message")
234235
return
235236

@@ -262,7 +263,7 @@ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
262263

263264
# Add the senderRecord of the peer in the RPC msg
264265
if isinstance(self.pubsub, Pubsub):
265-
envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host)
266+
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
266267
rpc_msg.senderRecord = envelope_bytes
267268

268269
logger.debug("publishing message %s", pubsub_msg)
@@ -834,7 +835,7 @@ async def handle_iwant(
834835
# to the iwant control msg, so we will send a freshly created senderRecord
835836
# with the RPC msg
836837
if isinstance(self.pubsub, Pubsub):
837-
envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host)
838+
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
838839
packet.senderRecord = envelope_bytes
839840

840841
packet.publish.extend(msgs_to_forward)
@@ -995,7 +996,7 @@ async def emit_control_message(
995996

996997
# Add the sender's peer-record in the RPC msg
997998
if isinstance(self.pubsub, Pubsub):
998-
envelope_bytes, bool = env_to_send_in_RPC(self.pubsub.host)
999+
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
9991000
packet.senderRecord = envelope_bytes
10001001

10011002
packet.control.CopyFrom(control_msg)

libp2p/pubsub/pubsub.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@
5656
from libp2p.peer.peerdata import (
5757
PeerDataError,
5858
)
59-
from libp2p.pubsub.utils import env_to_send_in_RPC, maybe_consume_signed_record
59+
from libp2p.peer.peerstore import env_to_send_in_RPC
60+
from libp2p.pubsub.utils import maybe_consume_signed_record
6061
from libp2p.tools.async_service import (
6162
Service,
6263
)
@@ -249,7 +250,7 @@ def get_hello_packet(self) -> rpc_pb2.RPC:
249250
[rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)]
250251
)
251252
# Add the sender's signedRecord in the RPC message
252-
envelope_bytes, bool = env_to_send_in_RPC(self.host)
253+
envelope_bytes, _ = env_to_send_in_RPC(self.host)
253254
packet.senderRecord = envelope_bytes
254255

255256
return packet
@@ -270,7 +271,7 @@ async def continuously_read_stream(self, stream: INetStream) -> None:
270271
rpc_incoming.ParseFromString(incoming)
271272

272273
# Process the sender's signed-record if sent
273-
if not maybe_consume_signed_record(rpc_incoming, self.host):
274+
if not maybe_consume_signed_record(rpc_incoming, self.host, peer_id):
274275
logger.error(
275276
"Received an invalid-signed-record, ignoring the incoming msg"
276277
)
@@ -586,7 +587,7 @@ async def subscribe(self, topic_id: str) -> ISubscriptionAPI:
586587
)
587588

588589
# Add the senderRecord of the peer in the RPC msg
589-
envelope_bytes, bool = env_to_send_in_RPC(self.host)
590+
envelope_bytes, _ = env_to_send_in_RPC(self.host)
590591
packet.senderRecord = envelope_bytes
591592
# Send out subscribe message to all peers
592593
await self.message_all_peers(packet.SerializeToString())
@@ -621,7 +622,7 @@ async def unsubscribe(self, topic_id: str) -> None:
621622
[rpc_pb2.RPC.SubOpts(subscribe=False, topicid=topic_id)]
622623
)
623624
# Add the senderRecord of the peer in the RPC msg
624-
envelope_bytes, bool = env_to_send_in_RPC(self.host)
625+
envelope_bytes, _ = env_to_send_in_RPC(self.host)
625626
packet.senderRecord = envelope_bytes
626627

627628
# Send out unsubscribe message to all peers

libp2p/pubsub/utils.py

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,50 +2,49 @@
22

33
from libp2p.abc import IHost
44
from libp2p.peer.envelope import consume_envelope
5-
from libp2p.peer.peerstore import create_signed_peer_record
5+
from libp2p.peer.id import ID
66
from libp2p.pubsub.pb.rpc_pb2 import RPC
77

88
logger = logging.getLogger("pubsub-example.utils")
99

1010

11-
def maybe_consume_signed_record(msg: RPC, host: IHost) -> bool:
11+
def maybe_consume_signed_record(msg: RPC, host: IHost, peer_id: ID) -> bool:
12+
"""
13+
Attempt to parse and store a signed-peer-record (Envelope) received during
14+
PubSub communication. If the record is invalid, the peer-id does not match, or
15+
updating the peerstore fails, the function logs an error and returns False.
16+
17+
Parameters
18+
----------
19+
msg : RPC
20+
The protobuf message received during PubSub communication.
21+
host : IHost
22+
The local host instance, providing access to the peerstore for storing
23+
verified peer records.
24+
peer_id : ID | None, optional
25+
The expected peer ID for record validation. If provided, the peer ID
26+
inside the record must match this value.
27+
28+
Returns
29+
-------
30+
bool
31+
True if a valid signed peer record was successfully consumed and stored,
32+
False otherwise.
33+
34+
"""
1235
if msg.HasField("senderRecord"):
1336
try:
1437
# Convert the signed-peer-record(Envelope) from
1538
# protobuf bytes
16-
envelope, _ = consume_envelope(msg.senderRecord, "libp2p-peer-record")
39+
envelope, record = consume_envelope(msg.senderRecord, "libp2p-peer-record")
40+
if not record.peer_id == peer_id:
41+
return False
42+
1743
# Use the default TTL of 2 hours (7200 seconds)
1844
if not host.get_peerstore().consume_peer_record(envelope, 7200):
1945
logger.error("Updating the certified-addr-book was unsuccessful")
46+
return False
2047
except Exception as e:
2148
logger.error("Error updating the certified addr book for peer: %s", e)
2249
return False
2350
return True
24-
25-
26-
def env_to_send_in_RPC(host: IHost) -> tuple[bytes, bool]:
27-
listen_addrs_set = {addr for addr in host.get_addrs()}
28-
local_env = host.get_peerstore().get_local_record()
29-
30-
if local_env is None:
31-
# No cached SPR yet -> create one
32-
return issue_and_cache_local_record(host), True
33-
else:
34-
record_addrs_set = local_env._env_addrs_set()
35-
if record_addrs_set == listen_addrs_set:
36-
# Perfect match -> reuse the cached envelope
37-
return local_env.marshal_envelope(), False
38-
else:
39-
# Addresses changed -> issue a new SPR and cache it
40-
return issue_and_cache_local_record(host), True
41-
42-
43-
def issue_and_cache_local_record(host: IHost) -> bytes:
44-
env = create_signed_peer_record(
45-
host.get_id(),
46-
host.get_addrs(),
47-
host.get_private_key(),
48-
)
49-
# Cache it for next time
50-
host.get_peerstore().set_local_record(env)
51-
return env.marshal_envelope()

tests/core/pubsub/test_pubsub.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@
1919
from libp2p.network.stream.exceptions import (
2020
StreamEOF,
2121
)
22-
from libp2p.peer.envelope import Envelope
22+
from libp2p.peer.envelope import Envelope, seal_record
2323
from libp2p.peer.id import (
2424
ID,
2525
)
26+
from libp2p.peer.peer_record import PeerRecord
2627
from libp2p.pubsub.pb import (
2728
rpc_pb2,
2829
)
@@ -170,6 +171,8 @@ async def test_peer_subscribe_fail_upon_invald_record_transfer():
170171

171172
# Corrupt host_a's local peer record
172173
envelope = pubsubs_fsub[0].host.get_peerstore().get_local_record()
174+
if envelope is not None:
175+
true_record = envelope.record()
173176
key_pair = create_new_key_pair()
174177

175178
if envelope is not None:
@@ -183,6 +186,23 @@ async def test_peer_subscribe_fail_upon_invald_record_transfer():
183186
TESTING_TOPIC, set()
184187
)
185188

189+
# Create a corrupt envelope with correct signature but false peer-id
190+
false_record = PeerRecord(
191+
ID.from_pubkey(key_pair.public_key), true_record.addrs
192+
)
193+
false_envelope = seal_record(
194+
false_record, pubsubs_fsub[0].host.get_private_key()
195+
)
196+
197+
pubsubs_fsub[0].host.get_peerstore().set_local_record(false_envelope)
198+
199+
await pubsubs_fsub[0].subscribe(TESTING_TOPIC)
200+
# Yeild to let 0 notify 1
201+
await trio.sleep(1)
202+
assert pubsubs_fsub[0].my_id not in pubsubs_fsub[1].peer_topics.get(
203+
TESTING_TOPIC, set()
204+
)
205+
186206

187207
@pytest.mark.trio
188208
async def test_get_hello_packet():

0 commit comments

Comments
 (0)