Skip to content

Commit 934f49a

Browse files
authored
Merge branch 'main' into fix_multiselect_negotiate_type
2 parents fc6b290 + 970b535 commit 934f49a

File tree

9 files changed

+373
-356
lines changed

9 files changed

+373
-356
lines changed

libp2p/pubsub/floodsub.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from libp2p.peer.id import (
1616
ID,
1717
)
18+
from libp2p.peer.peerstore import env_to_send_in_RPC
1819

1920
from .exceptions import (
2021
PubsubRouterError,
@@ -103,6 +104,11 @@ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
103104
)
104105
rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg])
105106

107+
# Add the senderRecord of the peer in the RPC msg
108+
if isinstance(self.pubsub, Pubsub):
109+
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
110+
rpc_msg.senderRecord = envelope_bytes
111+
106112
logger.debug("publishing message %s", pubsub_msg)
107113

108114
if self.pubsub is None:

libp2p/pubsub/gossipsub.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@
3434
)
3535
from libp2p.peer.peerstore import (
3636
PERMANENT_ADDR_TTL,
37+
env_to_send_in_RPC,
3738
)
3839
from libp2p.pubsub import (
3940
floodsub,
4041
)
42+
from libp2p.pubsub.utils import maybe_consume_signed_record
4143
from libp2p.tools.async_service import (
4244
Service,
4345
)
@@ -226,6 +228,12 @@ async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None:
226228
:param rpc: RPC message
227229
:param sender_peer_id: id of the peer who sent the message
228230
"""
231+
# Process the senderRecord if sent
232+
if isinstance(self.pubsub, Pubsub):
233+
if not maybe_consume_signed_record(rpc, self.pubsub.host, sender_peer_id):
234+
logger.error("Received an invalid-signed-record, ignoring the message")
235+
return
236+
229237
control_message = rpc.control
230238

231239
# Relay each rpc control message to the appropriate handler
@@ -253,6 +261,11 @@ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
253261
)
254262
rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg])
255263

264+
# Add the senderRecord of the peer in the RPC msg
265+
if isinstance(self.pubsub, Pubsub):
266+
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
267+
rpc_msg.senderRecord = envelope_bytes
268+
256269
logger.debug("publishing message %s", pubsub_msg)
257270

258271
for peer_id in peers_gen:
@@ -818,6 +831,13 @@ async def handle_iwant(
818831
# 1) Package these messages into a single packet
819832
packet: rpc_pb2.RPC = rpc_pb2.RPC()
820833

834+
# Here the an RPC message is being created and published in response
835+
# to the iwant control msg, so we will send a freshly created senderRecord
836+
# with the RPC msg
837+
if isinstance(self.pubsub, Pubsub):
838+
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
839+
packet.senderRecord = envelope_bytes
840+
821841
packet.publish.extend(msgs_to_forward)
822842

823843
if self.pubsub is None:
@@ -973,6 +993,12 @@ async def emit_control_message(
973993
raise NoPubsubAttached
974994
# Add control message to packet
975995
packet: rpc_pb2.RPC = rpc_pb2.RPC()
996+
997+
# Add the sender's peer-record in the RPC msg
998+
if isinstance(self.pubsub, Pubsub):
999+
envelope_bytes, _ = env_to_send_in_RPC(self.pubsub.host)
1000+
packet.senderRecord = envelope_bytes
1001+
9761002
packet.control.CopyFrom(control_msg)
9771003

9781004
# Get stream for peer from pubsub

libp2p/pubsub/pb/rpc.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ message RPC {
1414
}
1515

1616
optional ControlMessage control = 3;
17+
optional bytes senderRecord = 4;
1718
}
1819

1920
message Message {

libp2p/pubsub/pb/rpc_pb2.py

Lines changed: 34 additions & 33 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)