Skip to content

Commit c33ab32

Browse files
committed
init
1 parent d2825af commit c33ab32

File tree

3 files changed

+45
-43
lines changed

3 files changed

+45
-43
lines changed

libp2p/pubsub/floodsub.py

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,9 @@
1212
from libp2p.custom_types import (
1313
TProtocol,
1414
)
15-
from libp2p.network.stream.exceptions import (
16-
StreamClosed,
17-
)
1815
from libp2p.peer.id import (
1916
ID,
2017
)
21-
from libp2p.utils import (
22-
encode_varint_prefixed,
23-
)
2418

2519
from .exceptions import (
2620
PubsubRouterError,
@@ -120,13 +114,7 @@ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
120114
if peer_id not in pubsub.peers:
121115
continue
122116
stream = pubsub.peers[peer_id]
123-
# FIXME: We should add a `WriteMsg` similar to write delimited messages.
124-
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
125-
try:
126-
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
127-
except StreamClosed:
128-
logger.debug("Fail to publish message to %s: stream closed", peer_id)
129-
pubsub._handle_dead_peer(peer_id)
117+
await pubsub.write_msg(stream, rpc_msg)
130118

131119
async def join(self, topic: str) -> None:
132120
"""

libp2p/pubsub/gossipsub.py

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,6 @@
2424
from libp2p.custom_types import (
2525
TProtocol,
2626
)
27-
from libp2p.network.stream.exceptions import (
28-
StreamClosed,
29-
)
3027
from libp2p.peer.id import (
3128
ID,
3229
)
@@ -42,9 +39,6 @@
4239
from libp2p.tools.async_service import (
4340
Service,
4441
)
45-
from libp2p.utils import (
46-
encode_varint_prefixed,
47-
)
4842

4943
from .exceptions import (
5044
NoPubsubAttached,
@@ -249,14 +243,8 @@ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
249243
if peer_id not in self.pubsub.peers:
250244
continue
251245
stream = self.pubsub.peers[peer_id]
252-
# FIXME: We should add a `WriteMsg` similar to write delimited messages.
253-
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
254246
# TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages.
255-
try:
256-
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
257-
except StreamClosed:
258-
logger.debug("Fail to publish message to %s: stream closed", peer_id)
259-
self.pubsub._handle_dead_peer(peer_id)
247+
await self.pubsub.write_msg(stream, rpc_msg)
260248
for topic in pubsub_msg.topicIDs:
261249
self.time_since_last_publish[topic] = int(time.time())
262250

@@ -705,8 +693,6 @@ async def handle_iwant(
705693

706694
packet.publish.extend(msgs_to_forward)
707695

708-
# 2) Serialize that packet
709-
rpc_msg: bytes = packet.SerializeToString()
710696
if self.pubsub is None:
711697
raise NoPubsubAttached
712698

@@ -720,14 +706,7 @@ async def handle_iwant(
720706
peer_stream = self.pubsub.peers[sender_peer_id]
721707

722708
# 4) And write the packet to the stream
723-
try:
724-
await peer_stream.write(encode_varint_prefixed(rpc_msg))
725-
except StreamClosed:
726-
logger.debug(
727-
"Fail to responed to iwant request from %s: stream closed",
728-
sender_peer_id,
729-
)
730-
self.pubsub._handle_dead_peer(sender_peer_id)
709+
await self.pubsub.write_msg(peer_stream, packet)
731710

732711
async def handle_graft(
733712
self, graft_msg: rpc_pb2.ControlGraft, sender_peer_id: ID
@@ -826,8 +805,6 @@ async def emit_control_message(
826805
packet: rpc_pb2.RPC = rpc_pb2.RPC()
827806
packet.control.CopyFrom(control_msg)
828807

829-
rpc_msg: bytes = packet.SerializeToString()
830-
831808
# Get stream for peer from pubsub
832809
if to_peer not in self.pubsub.peers:
833810
logger.debug(
@@ -837,8 +814,4 @@ async def emit_control_message(
837814
peer_stream = self.pubsub.peers[to_peer]
838815

839816
# Write rpc to stream
840-
try:
841-
await peer_stream.write(encode_varint_prefixed(rpc_msg))
842-
except StreamClosed:
843-
logger.debug("Fail to emit control message to %s: stream closed", to_peer)
844-
self.pubsub._handle_dead_peer(to_peer)
817+
await self.pubsub.write_msg(peer_stream, packet)

libp2p/pubsub/pubsub.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
encode_varint_prefixed,
6767
read_varint_prefixed_bytes,
6868
)
69+
from libp2p.utils.varint import encode_uvarint
6970

7071
from .pb import (
7172
rpc_pb2,
@@ -773,3 +774,43 @@ def _mark_msg_seen(self, msg: rpc_pb2.Message) -> None:
773774

774775
def _is_subscribed_to_msg(self, msg: rpc_pb2.Message) -> bool:
775776
return any(topic in self.topic_ids for topic in msg.topicIDs)
777+
778+
async def write_msg(self, stream: INetStream, rpc_msg: rpc_pb2.RPC) -> bool:
779+
"""
780+
Write an RPC message to a stream with proper error handling.
781+
782+
Implements WriteMsg similar to go-libp2p-pubsub comm.go
783+
Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
784+
785+
786+
:param stream: stream to write the message to
787+
:param rpc_msg: RPC message to write
788+
:return: True if successful, False if stream was closed
789+
"""
790+
try:
791+
# Calculate message size first
792+
msg_bytes = rpc_msg.SerializeToString()
793+
msg_size = len(msg_bytes)
794+
795+
# Calculate varint size and allocate exact buffer size needed
796+
797+
varint_bytes = encode_uvarint(msg_size)
798+
varint_size = len(varint_bytes)
799+
800+
# Allocate buffer with exact size (like Go's pool.Get())
801+
buf = bytearray(varint_size + msg_size)
802+
803+
# Write varint length prefix to buffer (like Go's binary.PutUvarint())
804+
buf[:varint_size] = varint_bytes
805+
806+
# Write serialized message after varint (like Go's rpc.MarshalTo())
807+
buf[varint_size:] = msg_bytes
808+
809+
# Single write operation (like Go's s.Write(buf))
810+
await stream.write(bytes(buf))
811+
return True
812+
except StreamClosed:
813+
peer_id = stream.muxed_conn.peer_id
814+
logger.debug("Fail to write message to %s: stream closed", peer_id)
815+
self._handle_dead_peer(peer_id)
816+
return False

0 commit comments

Comments
 (0)