Skip to content

Commit b80817b

Browse files
authored
Merge pull request #855 from bomanaps/tests/notifee-coverage
Add listener lifecycle tests
2 parents bc1b1ed + 79f3a17 commit b80817b

File tree

4 files changed

+249
-0
lines changed

4 files changed

+249
-0
lines changed

newsfragments/855.internal.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Improved PubsubNotifee integration tests and added failure scenario coverage.
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import pytest
2+
from multiaddr import Multiaddr
3+
import trio
4+
5+
from libp2p.abc import (
6+
INetConn,
7+
INetStream,
8+
INetwork,
9+
INotifee,
10+
)
11+
from libp2p.tools.utils import connect_swarm
12+
from tests.utils.factories import SwarmFactory
13+
14+
15+
class CountingNotifee(INotifee):
16+
def __init__(self, event: trio.Event) -> None:
17+
self._event = event
18+
19+
async def opened_stream(self, network: INetwork, stream: INetStream) -> None:
20+
pass
21+
22+
async def closed_stream(self, network: INetwork, stream: INetStream) -> None:
23+
pass
24+
25+
async def connected(self, network: INetwork, conn: INetConn) -> None:
26+
self._event.set()
27+
28+
async def disconnected(self, network: INetwork, conn: INetConn) -> None:
29+
pass
30+
31+
async def listen(self, network: INetwork, multiaddr: Multiaddr) -> None:
32+
pass
33+
34+
async def listen_close(self, network: INetwork, multiaddr: Multiaddr) -> None:
35+
pass
36+
37+
38+
class SlowNotifee(INotifee):
39+
async def opened_stream(self, network: INetwork, stream: INetStream) -> None:
40+
pass
41+
42+
async def closed_stream(self, network: INetwork, stream: INetStream) -> None:
43+
pass
44+
45+
async def connected(self, network: INetwork, conn: INetConn) -> None:
46+
await trio.sleep(0.5)
47+
48+
async def disconnected(self, network: INetwork, conn: INetConn) -> None:
49+
pass
50+
51+
async def listen(self, network: INetwork, multiaddr: Multiaddr) -> None:
52+
pass
53+
54+
async def listen_close(self, network: INetwork, multiaddr: Multiaddr) -> None:
55+
pass
56+
57+
58+
@pytest.mark.trio
59+
async def test_many_notifees_receive_connected_quickly() -> None:
60+
async with SwarmFactory.create_batch_and_listen(2) as swarms:
61+
count = 200
62+
events = [trio.Event() for _ in range(count)]
63+
for ev in events:
64+
swarms[0].register_notifee(CountingNotifee(ev))
65+
await connect_swarm(swarms[0], swarms[1])
66+
with trio.fail_after(1.5):
67+
for ev in events:
68+
await ev.wait()
69+
70+
71+
@pytest.mark.trio
72+
async def test_slow_notifee_does_not_block_others() -> None:
73+
async with SwarmFactory.create_batch_and_listen(2) as swarms:
74+
fast_events = [trio.Event() for _ in range(20)]
75+
for ev in fast_events:
76+
swarms[0].register_notifee(CountingNotifee(ev))
77+
swarms[0].register_notifee(SlowNotifee())
78+
await connect_swarm(swarms[0], swarms[1])
79+
# Fast notifees should complete quickly despite one slow notifee
80+
with trio.fail_after(0.3):
81+
for ev in fast_events:
82+
await ev.wait()
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import enum
2+
3+
import pytest
4+
from multiaddr import Multiaddr
5+
import trio
6+
7+
from libp2p.abc import (
8+
INetConn,
9+
INetStream,
10+
INetwork,
11+
INotifee,
12+
)
13+
from libp2p.tools.async_service import background_trio_service
14+
from libp2p.tools.constants import LISTEN_MADDR
15+
from tests.utils.factories import SwarmFactory
16+
17+
18+
class Event(enum.Enum):
19+
Listen = 0
20+
ListenClose = 1
21+
22+
23+
class MyNotifee(INotifee):
24+
def __init__(self, events: list[Event]):
25+
self.events = events
26+
27+
async def opened_stream(self, network: INetwork, stream: INetStream) -> None:
28+
pass
29+
30+
async def closed_stream(self, network: INetwork, stream: INetStream) -> None:
31+
pass
32+
33+
async def connected(self, network: INetwork, conn: INetConn) -> None:
34+
pass
35+
36+
async def disconnected(self, network: INetwork, conn: INetConn) -> None:
37+
pass
38+
39+
async def listen(self, network: INetwork, multiaddr: Multiaddr) -> None:
40+
self.events.append(Event.Listen)
41+
42+
async def listen_close(self, network: INetwork, multiaddr: Multiaddr) -> None:
43+
self.events.append(Event.ListenClose)
44+
45+
46+
async def wait_for_event(
47+
events_list: list[Event], event: Event, timeout: float = 1.0
48+
) -> bool:
49+
with trio.move_on_after(timeout):
50+
while event not in events_list:
51+
await trio.sleep(0.01)
52+
return True
53+
return False
54+
55+
56+
@pytest.mark.trio
57+
async def test_listen_emitted_when_registered_before_listen():
58+
events: list[Event] = []
59+
swarm = SwarmFactory.build()
60+
swarm.register_notifee(MyNotifee(events))
61+
async with background_trio_service(swarm):
62+
# Start listening now; notifee was registered beforehand
63+
assert await swarm.listen(LISTEN_MADDR)
64+
assert await wait_for_event(events, Event.Listen)
65+
66+
67+
@pytest.mark.trio
68+
async def test_single_listener_close_emits_listen_close():
69+
events: list[Event] = []
70+
swarm = SwarmFactory.build()
71+
swarm.register_notifee(MyNotifee(events))
72+
async with background_trio_service(swarm):
73+
assert await swarm.listen(LISTEN_MADDR)
74+
# Explicitly notify listen_close (close path via manager doesn't emit it)
75+
await swarm.notify_listen_close(LISTEN_MADDR)
76+
assert await wait_for_event(events, Event.ListenClose)
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
from typing import cast
2+
3+
import pytest
4+
import trio
5+
6+
from libp2p.tools.utils import connect
7+
from tests.utils.factories import PubsubFactory
8+
9+
10+
@pytest.mark.trio
11+
async def test_connected_enqueues_and_adds_peer():
12+
async with PubsubFactory.create_batch_with_floodsub(2) as (p0, p1):
13+
await connect(p0.host, p1.host)
14+
await p0.wait_until_ready()
15+
# Wait until peer is added via queue processing
16+
with trio.fail_after(1.0):
17+
while p1.my_id not in p0.peers:
18+
await trio.sleep(0.01)
19+
assert p1.my_id in p0.peers
20+
21+
22+
@pytest.mark.trio
23+
async def test_disconnected_enqueues_and_removes_peer():
24+
async with PubsubFactory.create_batch_with_floodsub(2) as (p0, p1):
25+
await connect(p0.host, p1.host)
26+
await p0.wait_until_ready()
27+
# Ensure present first
28+
with trio.fail_after(1.0):
29+
while p1.my_id not in p0.peers:
30+
await trio.sleep(0.01)
31+
# Now disconnect and expect removal via dead peer queue
32+
await p0.host.get_network().close_peer(p1.host.get_id())
33+
with trio.fail_after(1.0):
34+
while p1.my_id in p0.peers:
35+
await trio.sleep(0.01)
36+
assert p1.my_id not in p0.peers
37+
38+
39+
@pytest.mark.trio
40+
async def test_channel_closed_is_swallowed_in_notifee(monkeypatch) -> None:
41+
# Ensure PubsubNotifee catches BrokenResourceError from its send channel
42+
async with PubsubFactory.create_batch_with_floodsub(2) as (p0, p1):
43+
# Find the PubsubNotifee registered on the network
44+
from libp2p.pubsub.pubsub_notifee import PubsubNotifee
45+
46+
network = p0.host.get_network()
47+
notifees = getattr(network, "notifees", [])
48+
target = None
49+
for nf in notifees:
50+
if isinstance(nf, cast(type, PubsubNotifee)):
51+
target = nf
52+
break
53+
assert target is not None, "PubsubNotifee not found on network"
54+
55+
async def failing_send(_peer_id): # type: ignore[no-redef]
56+
raise trio.BrokenResourceError
57+
58+
# Make initiator queue send fail; PubsubNotifee should swallow
59+
monkeypatch.setattr(target.initiator_peers_queue, "send", failing_send)
60+
61+
# Connect peers; if exceptions are swallowed, service stays running
62+
await connect(p0.host, p1.host)
63+
await p0.wait_until_ready()
64+
assert True
65+
66+
67+
@pytest.mark.trio
68+
async def test_duplicate_connection_does_not_duplicate_peer_state():
69+
async with PubsubFactory.create_batch_with_floodsub(2) as (p0, p1):
70+
await connect(p0.host, p1.host)
71+
await p0.wait_until_ready()
72+
with trio.fail_after(1.0):
73+
while p1.my_id not in p0.peers:
74+
await trio.sleep(0.01)
75+
# Connect again should not add duplicates
76+
await connect(p0.host, p1.host)
77+
await trio.sleep(0.1)
78+
assert list(p0.peers.keys()).count(p1.my_id) == 1
79+
80+
81+
@pytest.mark.trio
82+
async def test_blacklist_blocks_peer_added_by_notifee():
83+
async with PubsubFactory.create_batch_with_floodsub(2) as (p0, p1):
84+
# Blacklist before connecting
85+
p0.add_to_blacklist(p1.my_id)
86+
await connect(p0.host, p1.host)
87+
await p0.wait_until_ready()
88+
# Give handler a chance to run
89+
await trio.sleep(0.1)
90+
assert p1.my_id not in p0.peers

0 commit comments

Comments
 (0)