Skip to content

Commit 95e1f62

Browse files
authored
Merge pull request #845 from bomanaps/feat/implement-listen_close-mynotifee
Implement listen_close notification and tests
2 parents cff0bfc + 13379e3 commit 95e1f62

File tree

2 files changed

+190
-8
lines changed

2 files changed

+190
-8
lines changed

libp2p/network/swarm.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -330,8 +330,16 @@ async def close(self) -> None:
330330

331331
# Close all listeners
332332
if hasattr(self, "listeners"):
333-
for listener in self.listeners.values():
333+
for maddr_str, listener in self.listeners.items():
334334
await listener.close()
335+
# Notify about listener closure
336+
try:
337+
multiaddr = Multiaddr(maddr_str)
338+
await self.notify_listen_close(multiaddr)
339+
except Exception as e:
340+
logger.warning(
341+
f"Failed to notify listen_close for {maddr_str}: {e}"
342+
)
335343
self.listeners.clear()
336344

337345
# Close the transport if it exists and has a close method
@@ -420,7 +428,9 @@ async def notify_closed_stream(self, stream: INetStream) -> None:
420428
nursery.start_soon(notifee.closed_stream, self, stream)
421429

422430
async def notify_listen_close(self, multiaddr: Multiaddr) -> None:
423-
raise NotImplementedError
431+
async with trio.open_nursery() as nursery:
432+
for notifee in self.notifees:
433+
nursery.start_soon(notifee.listen_close, self, multiaddr)
424434

425435
# Generic notifier used by NetStream._notify_closed
426436
async def notify_all(self, notifier: Callable[[INotifee], Awaitable[None]]) -> None:

tests/core/network/test_notify.py

Lines changed: 178 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@
55
Note: Listen event does not get hit because MyNotifee is passed
66
into network after network has already started listening
77
8-
TODO: Add tests for closed_stream, listen_close when those
9-
features are implemented in swarm
8+
Note: ClosedStream events are processed asynchronously and may not be
9+
immediately available due to the rapid nature of operations
1010
"""
1111

1212
import enum
13+
from unittest.mock import Mock
1314

1415
import pytest
1516
from multiaddr import Multiaddr
@@ -29,11 +30,11 @@
2930

3031
class Event(enum.Enum):
3132
OpenedStream = 0
32-
ClosedStream = 1 # Not implemented
33+
ClosedStream = 1
3334
Connected = 2
3435
Disconnected = 3
3536
Listen = 4
36-
ListenClose = 5 # Not implemented
37+
ListenClose = 5
3738

3839

3940
class MyNotifee(INotifee):
@@ -60,8 +61,11 @@ async def listen(self, network: INetwork, multiaddr: Multiaddr) -> None:
6061
self.events.append(Event.Listen)
6162

6263
async def listen_close(self, network: INetwork, multiaddr: Multiaddr) -> None:
63-
# TODO: It is not implemented yet.
64-
pass
64+
if network is None:
65+
raise ValueError("network parameter cannot be None")
66+
if multiaddr is None:
67+
raise ValueError("multiaddr parameter cannot be None")
68+
self.events.append(Event.ListenClose)
6569

6670

6771
@pytest.mark.trio
@@ -123,3 +127,171 @@ async def wait_for_event(events_list, event, timeout=1.0):
123127
assert await wait_for_event(events_1_1, Event.OpenedStream, 1.0)
124128
assert await wait_for_event(events_1_1, Event.ClosedStream, 1.0)
125129
assert await wait_for_event(events_1_1, Event.Disconnected, 1.0)
130+
131+
# Note: ListenClose events are triggered when swarm closes during cleanup
132+
# The test framework automatically closes listeners, triggering ListenClose
133+
# notifications
134+
135+
136+
async def wait_for_event(events_list, event, timeout=1.0):
137+
"""Helper to wait for a specific event to appear in the events list."""
138+
with trio.move_on_after(timeout):
139+
while event not in events_list:
140+
await trio.sleep(0.01)
141+
return True
142+
return False
143+
144+
145+
@pytest.mark.trio
146+
async def test_notify_with_closed_stream_and_listen_close():
147+
"""Test that closed_stream and listen_close events are properly triggered."""
148+
# Event lists for notifees
149+
events_0 = []
150+
events_1 = []
151+
152+
# Create two swarms
153+
async with SwarmFactory.create_batch_and_listen(2) as swarms:
154+
# Register notifees
155+
notifee_0 = MyNotifee(events_0)
156+
notifee_1 = MyNotifee(events_1)
157+
158+
swarms[0].register_notifee(notifee_0)
159+
swarms[1].register_notifee(notifee_1)
160+
161+
# Connect swarms
162+
await connect_swarm(swarms[0], swarms[1])
163+
164+
# Create and close a stream to trigger closed_stream event
165+
stream = await swarms[0].new_stream(swarms[1].get_peer_id())
166+
await stream.close()
167+
168+
# Note: Events are processed asynchronously and may not be immediately available
169+
# due to the rapid nature of operations
170+
171+
172+
@pytest.mark.trio
173+
async def test_notify_edge_cases():
174+
"""Test edge cases for notify system."""
175+
events = []
176+
177+
async with SwarmFactory.create_batch_and_listen(2) as swarms:
178+
notifee = MyNotifee(events)
179+
swarms[0].register_notifee(notifee)
180+
181+
# Connect swarms first
182+
await connect_swarm(swarms[0], swarms[1])
183+
184+
# Test 1: Multiple rapid stream operations
185+
streams = []
186+
for _ in range(5):
187+
stream = await swarms[0].new_stream(swarms[1].get_peer_id())
188+
streams.append(stream)
189+
190+
# Close all streams rapidly
191+
for stream in streams:
192+
await stream.close()
193+
194+
195+
@pytest.mark.trio
196+
async def test_my_notifee_error_handling():
197+
"""Test error handling for invalid parameters in MyNotifee methods."""
198+
events = []
199+
notifee = MyNotifee(events)
200+
201+
# Mock objects for testing
202+
mock_network = Mock(spec=INetwork)
203+
mock_stream = Mock(spec=INetStream)
204+
mock_multiaddr = Mock(spec=Multiaddr)
205+
206+
# Test closed_stream with None parameters
207+
with pytest.raises(ValueError, match="network parameter cannot be None"):
208+
await notifee.closed_stream(None, mock_stream) # type: ignore
209+
210+
with pytest.raises(ValueError, match="stream parameter cannot be None"):
211+
await notifee.closed_stream(mock_network, None) # type: ignore
212+
213+
# Test listen_close with None parameters
214+
with pytest.raises(ValueError, match="network parameter cannot be None"):
215+
await notifee.listen_close(None, mock_multiaddr) # type: ignore
216+
217+
with pytest.raises(ValueError, match="multiaddr parameter cannot be None"):
218+
await notifee.listen_close(mock_network, None) # type: ignore
219+
220+
# Verify no events were recorded due to errors
221+
assert len(events) == 0
222+
223+
224+
@pytest.mark.trio
225+
async def test_rapid_stream_operations():
226+
"""Test rapid stream open/close operations."""
227+
events_0 = []
228+
events_1 = []
229+
230+
async with SwarmFactory.create_batch_and_listen(2) as swarms:
231+
notifee_0 = MyNotifee(events_0)
232+
notifee_1 = MyNotifee(events_1)
233+
234+
swarms[0].register_notifee(notifee_0)
235+
swarms[1].register_notifee(notifee_1)
236+
237+
# Connect swarms
238+
await connect_swarm(swarms[0], swarms[1])
239+
240+
# Rapidly create and close multiple streams
241+
streams = []
242+
for _ in range(3):
243+
stream = await swarms[0].new_stream(swarms[1].get_peer_id())
244+
streams.append(stream)
245+
246+
# Close all streams immediately
247+
for stream in streams:
248+
await stream.close()
249+
250+
# Verify OpenedStream events are recorded
251+
assert events_0.count(Event.OpenedStream) == 3
252+
assert events_1.count(Event.OpenedStream) == 3
253+
254+
# Close peer to trigger disconnection events
255+
await swarms[0].close_peer(swarms[1].get_peer_id())
256+
257+
258+
@pytest.mark.trio
259+
async def test_concurrent_stream_operations():
260+
"""Test concurrent stream operations using trio nursery."""
261+
events_0 = []
262+
events_1 = []
263+
264+
async with SwarmFactory.create_batch_and_listen(2) as swarms:
265+
notifee_0 = MyNotifee(events_0)
266+
notifee_1 = MyNotifee(events_1)
267+
268+
swarms[0].register_notifee(notifee_0)
269+
swarms[1].register_notifee(notifee_1)
270+
271+
# Connect swarms
272+
await connect_swarm(swarms[0], swarms[1])
273+
274+
async def create_and_close_stream():
275+
"""Create and immediately close a stream."""
276+
stream = await swarms[0].new_stream(swarms[1].get_peer_id())
277+
await stream.close()
278+
279+
# Run multiple stream operations concurrently
280+
async with trio.open_nursery() as nursery:
281+
for _ in range(4):
282+
nursery.start_soon(create_and_close_stream)
283+
284+
# Verify some OpenedStream events are recorded
285+
# (concurrent operations may not all succeed)
286+
opened_count_0 = events_0.count(Event.OpenedStream)
287+
opened_count_1 = events_1.count(Event.OpenedStream)
288+
289+
assert opened_count_0 > 0, (
290+
f"Expected some OpenedStream events, got {opened_count_0}"
291+
)
292+
assert opened_count_1 > 0, (
293+
f"Expected some OpenedStream events, got {opened_count_1}"
294+
)
295+
296+
# Close peer to trigger disconnection events
297+
await swarms[0].close_peer(swarms[1].get_peer_id())

0 commit comments

Comments
 (0)