Skip to content

Commit 79094d7

Browse files
guha-rahulseetadev
andauthored
Optimize pubsub publishing to support multiple topics in single RPC message (#686)
* init * add newsfragment * lint --------- Co-authored-by: Manu Sheel Gupta <[email protected]>
1 parent 2ed2587 commit 79094d7

File tree

4 files changed

+18
-17
lines changed

4 files changed

+18
-17
lines changed

libp2p/abc.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2130,14 +2130,14 @@ async def unsubscribe(self, topic_id: str) -> None:
21302130
...
21312131

21322132
@abstractmethod
2133-
async def publish(self, topic_id: str, data: bytes) -> None:
2133+
async def publish(self, topic_id: str | list[str], data: bytes) -> None:
21342134
"""
2135-
Publish a message to a topic.
2135+
Publish a message to a topic or multiple topics.
21362136
21372137
Parameters
21382138
----------
2139-
topic_id : str
2140-
The identifier of the topic.
2139+
topic_id : str | list[str]
2140+
The identifier of the topic (str) or topics (list[str]).
21412141
data : bytes
21422142
The data to publish.
21432143

libp2p/pubsub/pubsub.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -620,16 +620,22 @@ async def message_all_peers(self, raw_msg: bytes) -> None:
620620
logger.debug("Fail to message peer %s: stream closed", peer_id)
621621
self._handle_dead_peer(peer_id)
622622

623-
async def publish(self, topic_id: str, data: bytes) -> None:
623+
async def publish(self, topic_id: str | list[str], data: bytes) -> None:
624624
"""
625-
Publish data to a topic.
625+
Publish data to a topic or multiple topics.
626626
627-
:param topic_id: topic which we are going to publish the data to
627+
:param topic_id: topic (str) or topics (list[str]) to publish the data to
628628
:param data: data which we are publishing
629629
"""
630+
# Handle both single topic (str) and multiple topics (list[str])
631+
if isinstance(topic_id, str):
632+
topic_ids = [topic_id]
633+
else:
634+
topic_ids = topic_id
635+
630636
msg = rpc_pb2.Message(
631637
data=data,
632-
topicIDs=[topic_id],
638+
topicIDs=topic_ids,
633639
# Origin is ourself.
634640
from_id=self.my_id.to_bytes(),
635641
seqno=self._next_seqno(),

newsfragments/685.feature.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Optimized pubsub publishing to send multiple topics in a single message instead of separate messages per topic.

tests/utils/pubsub/floodsub_integration_test_settings.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
# type: ignore
2-
# To add typing to this module, it's better to do it after refactoring test cases
3-
# into classes
4-
51
import pytest
62
import trio
73

@@ -151,7 +147,7 @@
151147
]
152148

153149
floodsub_protocol_pytest_params = [
154-
pytest.param(test_case, id=test_case["name"])
150+
pytest.param(test_case, id=str(test_case["name"]))
155151
for test_case in FLOODSUB_PROTOCOL_TEST_CASES
156152
]
157153

@@ -241,10 +237,8 @@ async def subscribe_node(node_id, topic):
241237
data = msg["data"]
242238
node_id = msg["node_id"]
243239

244-
# Publish message
245-
# TODO: Should be single RPC package with several topics
246-
for topic in topics:
247-
await pubsub_map[node_id].publish(topic, data)
240+
# Publish message - now uses single RPC package with several topics
241+
await pubsub_map[node_id].publish(topics, data)
248242

249243
# For each topic in topics, add (topic, node_id, data) tuple to
250244
# ordered test list

0 commit comments

Comments
 (0)