Skip to content

Commit e3a296d

Browse files
authored
Merge pull request #1069 from sumanjeet0012/fix/pubsub-protocol-negotiation-crash-910
Add exception handling to prevent pubsub service crashes.
2 parents 761e0bb + 9d4417a commit e3a296d

File tree

3 files changed

+40
-2
lines changed

3 files changed

+40
-2
lines changed

libp2p/pubsub/pubsub.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,17 @@ async def _handle_new_peer(self, peer_id: ID) -> None:
481481

482482
logger.debug("added new peer %s", peer_id)
483483

484+
async def _handle_new_peer_safe(self, peer_id: ID) -> None:
485+
"""
486+
Safely handle new peer with exception handling.
487+
This wrapper ensures that any exceptions during peer negotiation
488+
don't crash the entire pubsub service.
489+
"""
490+
try:
491+
await self._handle_new_peer(peer_id)
492+
except Exception as error:
493+
logger.info(f"Protocol negotiation failed for peer {peer_id}: {error}")
494+
484495
def _handle_dead_peer(self, peer_id: ID) -> None:
485496
if peer_id not in self.peers:
486497
return
@@ -503,8 +514,8 @@ async def handle_peer_queue(self) -> None:
503514
async with self.peer_receive_channel:
504515
self.event_handle_peer_queue_started.set()
505516
async for peer_id in self.peer_receive_channel:
506-
# Add Peer
507-
self.manager.run_task(self._handle_new_peer, peer_id)
517+
# Add Peer - wrap in exception handler to prevent service crash
518+
self.manager.run_task(self._handle_new_peer_safe, peer_id)
508519

509520
async def handle_dead_peer_queue(self) -> None:
510521
"""

newsfragments/910.bugfix.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed pubsub service crashes when protocol negotiation fails by adding proper exception handling.

tests/core/pubsub/test_pubsub.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1277,3 +1277,29 @@ def custom_msg_id_constructor(msg):
12771277
expected_custom_id = msg.data + msg.from_id
12781278

12791279
assert custom_msg_id == expected_custom_id
1280+
1281+
1282+
@pytest.mark.trio
1283+
async def test_handle_peer_queue_exception_handling():
1284+
"""Test that _handle_new_peer_safe gracefully handles exceptions."""
1285+
async with PubsubFactory.create_batch_with_floodsub(1) as pubsubs_fsub:
1286+
pubsub = pubsubs_fsub[0]
1287+
1288+
original_handle_new_peer = pubsub._handle_new_peer
1289+
1290+
async def mock_handle_new_peer(peer_id):
1291+
raise Exception("Protocol negotiation failed")
1292+
1293+
pubsub._handle_new_peer = mock_handle_new_peer
1294+
1295+
test_peer = IDFactory()
1296+
1297+
# Directly call the safe wrapper that's used by handle_peer_queue
1298+
await pubsub._handle_new_peer_safe(test_peer)
1299+
1300+
# The key test: service should still be running despite the exception
1301+
assert pubsub.manager.is_running, (
1302+
"Pubsub service should continue running even when peer negotiation fails"
1303+
)
1304+
1305+
pubsub._handle_new_peer = original_handle_new_peer

0 commit comments

Comments
 (0)