Skip to content

Commit f7757fa

Browse files
committed
docs: add documentation and examples for new NetStream state management
1 parent 5bc4d01 commit f7757fa

File tree

4 files changed

+340
-8
lines changed

4 files changed

+340
-8
lines changed
Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
"""
2+
Enhanced NetStream Example for py-libp2p with State Management
3+
4+
This example demonstrates the new NetStream features including:
5+
- State tracking and transitions
6+
- Proper error handling and validation
7+
- Resource cleanup and event notifications
8+
- Thread-safe operations with Trio locks
9+
10+
Based on the standard echo demo but enhanced to show NetStream state management.
11+
"""
12+
13+
import argparse
14+
import random
15+
import secrets
16+
17+
import multiaddr
18+
import trio
19+
20+
from libp2p import (
21+
new_host,
22+
)
23+
from libp2p.crypto.secp256k1 import (
24+
create_new_key_pair,
25+
)
26+
from libp2p.custom_types import (
27+
TProtocol,
28+
)
29+
from libp2p.network.stream.exceptions import (
30+
StreamClosed,
31+
StreamEOF,
32+
StreamReset,
33+
)
34+
from libp2p.network.stream.net_stream import (
35+
NetStream,
36+
StreamState,
37+
)
38+
from libp2p.peer.peerinfo import (
39+
info_from_p2p_addr,
40+
)
41+
42+
PROTOCOL_ID = TProtocol("/echo/1.0.0")
43+
44+
45+
async def enhanced_echo_handler(stream: NetStream) -> None:
46+
"""
47+
Enhanced echo handler that demonstrates NetStream state management.
48+
"""
49+
print(f"New connection established: {stream}")
50+
print(f"Initial stream state: {await stream.state}")
51+
52+
try:
53+
# Verify stream is in expected initial state
54+
assert await stream.state == StreamState.OPEN
55+
assert await stream.is_readable()
56+
assert await stream.is_writable()
57+
print("✓ Stream initialized in OPEN state")
58+
59+
# Read incoming data with proper state checking
60+
print("Waiting for client data...")
61+
62+
while await stream.is_readable():
63+
try:
64+
# Read data from client
65+
data = await stream.read(1024)
66+
if not data:
67+
print("Received empty data, client may have closed")
68+
break
69+
70+
print(f"Received: {data.decode('utf-8').strip()}")
71+
72+
# Check if we can still write before echoing
73+
if await stream.is_writable():
74+
await stream.write(data)
75+
print(f"Echoed: {data.decode('utf-8').strip()}")
76+
else:
77+
print("Cannot echo - stream not writable")
78+
break
79+
80+
except StreamEOF:
81+
print("Client closed their write side (EOF)")
82+
break
83+
except StreamReset:
84+
print("Stream was reset by client")
85+
return
86+
except StreamClosed as e:
87+
print(f"Stream operation failed: {e}")
88+
break
89+
90+
# Demonstrate graceful closure
91+
current_state = await stream.state
92+
print(f"Current state before close: {current_state}")
93+
94+
if current_state not in [StreamState.CLOSE_BOTH, StreamState.RESET]:
95+
await stream.close()
96+
print("Server closed write side")
97+
98+
final_state = await stream.state
99+
print(f"Final stream state: {final_state}")
100+
101+
except Exception as e:
102+
print(f"Handler error: {e}")
103+
# Reset stream on unexpected errors
104+
if await stream.state not in [StreamState.RESET, StreamState.CLOSE_BOTH]:
105+
await stream.reset()
106+
print("Stream reset due to error")
107+
108+
109+
async def enhanced_client_demo(stream: NetStream) -> None:
110+
"""
111+
Enhanced client that demonstrates various NetStream state scenarios.
112+
"""
113+
print(f"Client stream established: {stream}")
114+
print(f"Initial state: {await stream.state}")
115+
116+
try:
117+
# Verify initial state
118+
assert await stream.state == StreamState.OPEN
119+
print("✓ Client stream in OPEN state")
120+
121+
# Scenario 1: Normal communication
122+
message = b"Hello from enhanced NetStream client!\n"
123+
124+
if await stream.is_writable():
125+
await stream.write(message)
126+
print(f"Sent: {message.decode('utf-8').strip()}")
127+
else:
128+
print("Cannot write - stream not writable")
129+
return
130+
131+
# Close write side to signal EOF to server
132+
await stream.close()
133+
print("Client closed write side")
134+
135+
# Verify state transition
136+
state_after_close = await stream.state
137+
print(f"State after close: {state_after_close}")
138+
assert state_after_close == StreamState.CLOSE_WRITE
139+
assert await stream.is_readable() # Should still be readable
140+
assert not await stream.is_writable() # Should not be writable
141+
142+
# Try to write (should fail)
143+
try:
144+
await stream.write(b"This should fail")
145+
print("ERROR: Write succeeded when it should have failed!")
146+
except StreamClosed as e:
147+
print(f"✓ Expected error when writing to closed stream: {e}")
148+
149+
# Read the echo response
150+
if await stream.is_readable():
151+
try:
152+
response = await stream.read()
153+
print(f"Received echo: {response.decode('utf-8').strip()}")
154+
except StreamEOF:
155+
print("Server closed their write side")
156+
except StreamReset:
157+
print("Stream was reset")
158+
159+
# Check final state
160+
final_state = await stream.state
161+
print(f"Final client state: {final_state}")
162+
163+
except Exception as e:
164+
print(f"Client error: {e}")
165+
# Reset on error
166+
await stream.reset()
167+
print("Client reset stream due to error")
168+
169+
170+
async def run_enhanced_demo(
171+
port: int, destination: str, seed: int | None = None
172+
) -> None:
173+
"""
174+
Run enhanced echo demo with NetStream state management.
175+
"""
176+
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
177+
178+
# Generate or use provided key
179+
if seed:
180+
random.seed(seed)
181+
secret_number = random.getrandbits(32 * 8)
182+
secret = secret_number.to_bytes(length=32, byteorder="big")
183+
else:
184+
secret = secrets.token_bytes(32)
185+
186+
host = new_host(key_pair=create_new_key_pair(secret))
187+
188+
async with host.run(listen_addrs=[listen_addr]):
189+
print(f"Host ID: {host.get_id().to_string()}")
190+
print("=" * 60)
191+
192+
if not destination: # Server mode
193+
print("🖥️ ENHANCED ECHO SERVER MODE")
194+
print("=" * 60)
195+
196+
# Set the enhanced stream handler
197+
host.set_stream_handler(PROTOCOL_ID, enhanced_echo_handler)
198+
199+
print(
200+
"Run client from another console:\n"
201+
f"python3 example_net_stream.py "
202+
f"-d {host.get_addrs()[0]}\n"
203+
)
204+
print("Waiting for connections...")
205+
print("Press Ctrl+C to stop server")
206+
await trio.sleep_forever()
207+
208+
else: # Client mode
209+
print("📱 ENHANCED ECHO CLIENT MODE")
210+
print("=" * 60)
211+
212+
# Connect to server
213+
maddr = multiaddr.Multiaddr(destination)
214+
info = info_from_p2p_addr(maddr)
215+
await host.connect(info)
216+
print(f"Connected to server: {info.peer_id.pretty()}")
217+
218+
# Create stream and run enhanced demo
219+
stream = await host.new_stream(info.peer_id, [PROTOCOL_ID])
220+
await enhanced_client_demo(stream)
221+
222+
print("\n" + "=" * 60)
223+
print("CLIENT DEMO COMPLETE")
224+
225+
226+
def main() -> None:
227+
example_maddr = (
228+
"/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
229+
)
230+
231+
parser = argparse.ArgumentParser(
232+
formatter_class=argparse.RawDescriptionHelpFormatter
233+
)
234+
parser.add_argument("-p", "--port", default=0, type=int, help="source port number")
235+
parser.add_argument(
236+
"-d",
237+
"--destination",
238+
type=str,
239+
help=f"destination multiaddr string, e.g. {example_maddr}",
240+
)
241+
parser.add_argument(
242+
"-s",
243+
"--seed",
244+
type=int,
245+
help="seed for deterministic peer ID generation",
246+
)
247+
parser.add_argument(
248+
"--demo-states", action="store_true", help="run state transition demo only"
249+
)
250+
251+
args = parser.parse_args()
252+
253+
try:
254+
trio.run(run_enhanced_demo, args.port, args.destination, args.seed)
255+
except KeyboardInterrupt:
256+
print("\n👋 Demo interrupted by user")
257+
except Exception as e:
258+
print(f"❌ Demo failed: {e}")
259+
260+
261+
if __name__ == "__main__":
262+
main()

libp2p/network/stream/net_stream.py

Lines changed: 76 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,67 @@ class StreamState(Enum):
3939

4040

4141
class NetStream(INetStream):
42-
"""Class representing NetStream Handler"""
42+
"""
43+
Summary
44+
_______
45+
A Network stream implementation.
46+
47+
NetStream wraps a muxed stream and provides proper state tracking, resource cleanup,
48+
and event notification capabilities.
49+
50+
State Machine
51+
_____________
52+
53+
.. code:: markdown
54+
55+
[CREATED] → OPEN → CLOSE_READ → CLOSE_BOTH → [CLEANUP]
56+
↓ ↗ ↗
57+
CLOSE_WRITE → ← ↗
58+
↓ ↗
59+
RESET → → → → → → → →
60+
61+
State Transitions
62+
_________________
63+
- OPEN → CLOSE_READ: EOF encountered during read()
64+
- OPEN → CLOSE_WRITE: Explicit close() call
65+
- OPEN → RESET: reset() call or critical stream error
66+
- CLOSE_READ → CLOSE_BOTH: Explicit close() call
67+
- CLOSE_WRITE → CLOSE_BOTH: EOF encountered during read()
68+
- Any state → RESET: reset() call
69+
70+
Terminal States (trigger cleanup)
71+
_________________________________
72+
- CLOSE_BOTH: Stream fully closed, triggers resource cleanup
73+
- RESET: Stream reset/terminated, triggers resource cleanup
74+
75+
Operation Validity by State
76+
___________________________
77+
OPEN: read() ✓ write() ✓ close() ✓ reset() ✓
78+
CLOSE_READ: read() ✗ write() ✓ close() ✓ reset() ✓
79+
CLOSE_WRITE: read() ✓ write() ✗ close() ✓ reset() ✓
80+
CLOSE_BOTH: read() ✗ write() ✗ close() ✓ reset() ✓
81+
RESET: read() ✗ write() ✗ close() ✓ reset() ✓
82+
83+
Cleanup Process (triggered by CLOSE_BOTH or RESET)
84+
__________________________________________________
85+
1. Remove stream from SwarmConn
86+
2. Notify all listeners with ClosedStream event
87+
3. Decrement reference counter
88+
4. Background cleanup via nursery (if provided)
89+
90+
Thread Safety
91+
_____________
92+
All state operations are protected by trio.Lock() for safe concurrent access.
93+
State checks and modifications are atomic operations.
94+
95+
Example: See :file:`examples/doc-examples/example_net_stream.py`
96+
97+
:param muxed_stream (IMuxedStream): The underlying muxed stream
98+
:param nursery (Optional[trio.Nursery]): Nursery for background cleanup tasks
99+
:raises StreamClosed: When attempting invalid operations on closed streams
100+
:raises StreamEOF: When EOF is encountered during read operations
101+
:raises StreamReset: When the underlying stream has been reset
102+
"""
43103

44104
muxed_stream: IMuxedStream
45105
protocol_id: Optional[TProtocol]
@@ -87,7 +147,10 @@ async def read(self, n: Optional[int] = None) -> bytes:
87147
Read from stream.
88148
89149
:param n: number of bytes to read
90-
:return: bytes of input
150+
:raises StreamClosed: If `NetStream` is closed for reading
151+
:raises StreamReset: If `NetStream` is reset
152+
:raises StreamEOF: If trying to read after reaching end of file
153+
:return: Bytes read from the stream
91154
"""
92155
async with self._state_lock:
93156
if self.__stream_state in [
@@ -126,6 +189,8 @@ async def write(self, data: bytes) -> None:
126189
Write to stream.
127190
128191
:param data: bytes to write
192+
:raises StreamClosed: If `NetStream` is closed for writing or reset
193+
:raises StreamClosed: If `StreamError` occurred while writing
129194
"""
130195
async with self._state_lock:
131196
if self.__stream_state in [
@@ -218,21 +283,24 @@ def get_remote_address(self) -> Optional[tuple[str, int]]:
218283
"""Delegate to the underlying muxed stream."""
219284
return self.muxed_stream.get_remote_address()
220285

221-
def is_closed(self) -> bool:
286+
async def is_closed(self) -> bool:
222287
"""Check if stream is closed."""
223-
return self.__stream_state in [StreamState.CLOSE_BOTH, StreamState.RESET]
288+
current_state = await self.state
289+
return current_state in [StreamState.CLOSE_BOTH, StreamState.RESET]
224290

225-
def is_readable(self) -> bool:
291+
async def is_readable(self) -> bool:
226292
"""Check if stream is readable."""
227-
return self.__stream_state not in [
293+
current_state = await self.state
294+
return current_state not in [
228295
StreamState.CLOSE_READ,
229296
StreamState.CLOSE_BOTH,
230297
StreamState.RESET,
231298
]
232299

233-
def is_writable(self) -> bool:
300+
async def is_writable(self) -> bool:
234301
"""Check if stream is writable."""
235-
return self.__stream_state not in [
302+
current_state = await self.state
303+
return current_state not in [
236304
StreamState.CLOSE_WRITE,
237305
StreamState.CLOSE_BOTH,
238306
StreamState.RESET,

newsfragments/300.breaking.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
The `NetStream.state` property is now async and requires `await`. Update any direct state access to use `await stream.state`.

newsfragments/300.bugfix.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Added proper state management and resource cleanup to `NetStream`, fixing memory leaks and improved error handling.

0 commit comments

Comments
 (0)