Skip to content

Commit 92c9ba7

Browse files
authored
Merge pull request #689 from guha-rahul/write_msg_pubsub
feat: Implement WriteMsg method for efficient RPC message writing
2 parents 520e555 + ac51d87 commit 92c9ba7

File tree

4 files changed

+49
-43
lines changed

4 files changed

+49
-43
lines changed

libp2p/pubsub/floodsub.py

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,9 @@
1212
from libp2p.custom_types import (
1313
TProtocol,
1414
)
15-
from libp2p.network.stream.exceptions import (
16-
StreamClosed,
17-
)
1815
from libp2p.peer.id import (
1916
ID,
2017
)
21-
from libp2p.utils import (
22-
encode_varint_prefixed,
23-
)
2418

2519
from .exceptions import (
2620
PubsubRouterError,
@@ -120,13 +114,7 @@ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
120114
if peer_id not in pubsub.peers:
121115
continue
122116
stream = pubsub.peers[peer_id]
123-
# FIXME: We should add a `WriteMsg` similar to write delimited messages.
124-
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
125-
try:
126-
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
127-
except StreamClosed:
128-
logger.debug("Fail to publish message to %s: stream closed", peer_id)
129-
pubsub._handle_dead_peer(peer_id)
117+
await pubsub.write_msg(stream, rpc_msg)
130118

131119
async def join(self, topic: str) -> None:
132120
"""

libp2p/pubsub/gossipsub.py

Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,6 @@
2424
from libp2p.custom_types import (
2525
TProtocol,
2626
)
27-
from libp2p.network.stream.exceptions import (
28-
StreamClosed,
29-
)
3027
from libp2p.peer.id import (
3128
ID,
3229
)
@@ -44,9 +41,6 @@
4441
from libp2p.tools.async_service import (
4542
Service,
4643
)
47-
from libp2p.utils import (
48-
encode_varint_prefixed,
49-
)
5044

5145
from .exceptions import (
5246
NoPubsubAttached,
@@ -267,13 +261,10 @@ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
267261
if peer_id not in self.pubsub.peers:
268262
continue
269263
stream = self.pubsub.peers[peer_id]
270-
# FIXME: We should add a `WriteMsg` similar to write delimited messages.
271-
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
272-
try:
273-
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
274-
except StreamClosed:
275-
logger.debug("Fail to publish message to %s: stream closed", peer_id)
276-
self.pubsub._handle_dead_peer(peer_id)
264+
265+
# TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages.
266+
await self.pubsub.write_msg(stream, rpc_msg)
267+
277268
for topic in pubsub_msg.topicIDs:
278269
self.time_since_last_publish[topic] = int(time.time())
279270

@@ -829,8 +820,6 @@ async def handle_iwant(
829820

830821
packet.publish.extend(msgs_to_forward)
831822

832-
# 2) Serialize that packet
833-
rpc_msg: bytes = packet.SerializeToString()
834823
if self.pubsub is None:
835824
raise NoPubsubAttached
836825

@@ -844,14 +833,7 @@ async def handle_iwant(
844833
peer_stream = self.pubsub.peers[sender_peer_id]
845834

846835
# 4) And write the packet to the stream
847-
try:
848-
await peer_stream.write(encode_varint_prefixed(rpc_msg))
849-
except StreamClosed:
850-
logger.debug(
851-
"Fail to responed to iwant request from %s: stream closed",
852-
sender_peer_id,
853-
)
854-
self.pubsub._handle_dead_peer(sender_peer_id)
836+
await self.pubsub.write_msg(peer_stream, packet)
855837

856838
async def handle_graft(
857839
self, graft_msg: rpc_pb2.ControlGraft, sender_peer_id: ID
@@ -993,8 +975,6 @@ async def emit_control_message(
993975
packet: rpc_pb2.RPC = rpc_pb2.RPC()
994976
packet.control.CopyFrom(control_msg)
995977

996-
rpc_msg: bytes = packet.SerializeToString()
997-
998978
# Get stream for peer from pubsub
999979
if to_peer not in self.pubsub.peers:
1000980
logger.debug(
@@ -1004,8 +984,4 @@ async def emit_control_message(
1004984
peer_stream = self.pubsub.peers[to_peer]
1005985

1006986
# Write rpc to stream
1007-
try:
1008-
await peer_stream.write(encode_varint_prefixed(rpc_msg))
1009-
except StreamClosed:
1010-
logger.debug("Fail to emit control message to %s: stream closed", to_peer)
1011-
self.pubsub._handle_dead_peer(to_peer)
987+
await self.pubsub.write_msg(peer_stream, packet)

libp2p/pubsub/pubsub.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
encode_varint_prefixed,
6767
read_varint_prefixed_bytes,
6868
)
69+
from libp2p.utils.varint import encode_uvarint
6970

7071
from .pb import (
7172
rpc_pb2,
@@ -778,3 +779,43 @@ def _mark_msg_seen(self, msg: rpc_pb2.Message) -> None:
778779

779780
def _is_subscribed_to_msg(self, msg: rpc_pb2.Message) -> bool:
780781
return any(topic in self.topic_ids for topic in msg.topicIDs)
782+
783+
async def write_msg(self, stream: INetStream, rpc_msg: rpc_pb2.RPC) -> bool:
784+
"""
785+
Write an RPC message to a stream with proper error handling.
786+
787+
Implements WriteMsg similar to go-msgio which is used in go-libp2p
788+
Ref: https://github.com/libp2p/go-msgio/blob/master/protoio/uvarint_writer.go#L56
789+
790+
791+
:param stream: stream to write the message to
792+
:param rpc_msg: RPC message to write
793+
:return: True if successful, False if stream was closed
794+
"""
795+
try:
796+
# Calculate message size first
797+
msg_bytes = rpc_msg.SerializeToString()
798+
msg_size = len(msg_bytes)
799+
800+
# Calculate varint size and allocate exact buffer size needed
801+
802+
varint_bytes = encode_uvarint(msg_size)
803+
varint_size = len(varint_bytes)
804+
805+
# Allocate buffer with exact size (like Go's pool.Get())
806+
buf = bytearray(varint_size + msg_size)
807+
808+
# Write varint length prefix to buffer (like Go's binary.PutUvarint())
809+
buf[:varint_size] = varint_bytes
810+
811+
# Write serialized message after varint (like Go's rpc.MarshalTo())
812+
buf[varint_size:] = msg_bytes
813+
814+
# Single write operation (like Go's s.Write(buf))
815+
await stream.write(bytes(buf))
816+
return True
817+
except StreamClosed:
818+
peer_id = stream.muxed_conn.peer_id
819+
logger.debug("Fail to write message to %s: stream closed", peer_id)
820+
self._handle_dead_peer(peer_id)
821+
return False

newsfragments/687.feature.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Optimized pubsub message writing by implementing a write_msg() method that uses pre-allocated buffers and single write operations, improving performance by eliminating separate varint prefix encoding and write operations in FloodSub and GossipSub.

0 commit comments

Comments
 (0)