Skip to content

Commit 286752c

Browse files
authored
Merge pull request #658 from AkMo3/main
fix: add connection state for net stream and gracefully handle failure
2 parents 4e9fa87 + 390ac2e commit 286752c

File tree

5 files changed

+503
-14
lines changed

5 files changed

+503
-14
lines changed
Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
"""
2+
Enhanced NetStream Example for py-libp2p with State Management
3+
4+
This example demonstrates the new NetStream features including:
5+
- State tracking and transitions
6+
- Proper error handling and validation
7+
- Resource cleanup and event notifications
8+
- Thread-safe operations with Trio locks
9+
10+
Based on the standard echo demo but enhanced to show NetStream state management.
11+
"""
12+
13+
import argparse
14+
import random
15+
import secrets
16+
17+
import multiaddr
18+
import trio
19+
20+
from libp2p import (
21+
new_host,
22+
)
23+
from libp2p.crypto.secp256k1 import (
24+
create_new_key_pair,
25+
)
26+
from libp2p.custom_types import (
27+
TProtocol,
28+
)
29+
from libp2p.network.stream.exceptions import (
30+
StreamClosed,
31+
StreamEOF,
32+
StreamReset,
33+
)
34+
from libp2p.network.stream.net_stream import (
35+
NetStream,
36+
StreamState,
37+
)
38+
from libp2p.peer.peerinfo import (
39+
info_from_p2p_addr,
40+
)
41+
42+
PROTOCOL_ID = TProtocol("/echo/1.0.0")
43+
44+
45+
async def enhanced_echo_handler(stream: NetStream) -> None:
46+
"""
47+
Enhanced echo handler that demonstrates NetStream state management.
48+
"""
49+
print(f"New connection established: {stream}")
50+
print(f"Initial stream state: {await stream.state}")
51+
52+
try:
53+
# Verify stream is in expected initial state
54+
assert await stream.state == StreamState.OPEN
55+
assert await stream.is_readable()
56+
assert await stream.is_writable()
57+
print("✓ Stream initialized in OPEN state")
58+
59+
# Read incoming data with proper state checking
60+
print("Waiting for client data...")
61+
62+
while await stream.is_readable():
63+
try:
64+
# Read data from client
65+
data = await stream.read(1024)
66+
if not data:
67+
print("Received empty data, client may have closed")
68+
break
69+
70+
print(f"Received: {data.decode('utf-8').strip()}")
71+
72+
# Check if we can still write before echoing
73+
if await stream.is_writable():
74+
await stream.write(data)
75+
print(f"Echoed: {data.decode('utf-8').strip()}")
76+
else:
77+
print("Cannot echo - stream not writable")
78+
break
79+
80+
except StreamEOF:
81+
print("Client closed their write side (EOF)")
82+
break
83+
except StreamReset:
84+
print("Stream was reset by client")
85+
return
86+
except StreamClosed as e:
87+
print(f"Stream operation failed: {e}")
88+
break
89+
90+
# Demonstrate graceful closure
91+
current_state = await stream.state
92+
print(f"Current state before close: {current_state}")
93+
94+
if current_state not in [StreamState.CLOSE_BOTH, StreamState.RESET]:
95+
await stream.close()
96+
print("Server closed write side")
97+
98+
final_state = await stream.state
99+
print(f"Final stream state: {final_state}")
100+
101+
except Exception as e:
102+
print(f"Handler error: {e}")
103+
# Reset stream on unexpected errors
104+
if await stream.state not in [StreamState.RESET, StreamState.CLOSE_BOTH]:
105+
await stream.reset()
106+
print("Stream reset due to error")
107+
108+
109+
async def enhanced_client_demo(stream: NetStream) -> None:
110+
"""
111+
Enhanced client that demonstrates various NetStream state scenarios.
112+
"""
113+
print(f"Client stream established: {stream}")
114+
print(f"Initial state: {await stream.state}")
115+
116+
try:
117+
# Verify initial state
118+
assert await stream.state == StreamState.OPEN
119+
print("✓ Client stream in OPEN state")
120+
121+
# Scenario 1: Normal communication
122+
message = b"Hello from enhanced NetStream client!\n"
123+
124+
if await stream.is_writable():
125+
await stream.write(message)
126+
print(f"Sent: {message.decode('utf-8').strip()}")
127+
else:
128+
print("Cannot write - stream not writable")
129+
return
130+
131+
# Close write side to signal EOF to server
132+
await stream.close()
133+
print("Client closed write side")
134+
135+
# Verify state transition
136+
state_after_close = await stream.state
137+
print(f"State after close: {state_after_close}")
138+
assert state_after_close == StreamState.CLOSE_WRITE
139+
assert await stream.is_readable() # Should still be readable
140+
assert not await stream.is_writable() # Should not be writable
141+
142+
# Try to write (should fail)
143+
try:
144+
await stream.write(b"This should fail")
145+
print("ERROR: Write succeeded when it should have failed!")
146+
except StreamClosed as e:
147+
print(f"✓ Expected error when writing to closed stream: {e}")
148+
149+
# Read the echo response
150+
if await stream.is_readable():
151+
try:
152+
response = await stream.read()
153+
print(f"Received echo: {response.decode('utf-8').strip()}")
154+
except StreamEOF:
155+
print("Server closed their write side")
156+
except StreamReset:
157+
print("Stream was reset")
158+
159+
# Check final state
160+
final_state = await stream.state
161+
print(f"Final client state: {final_state}")
162+
163+
except Exception as e:
164+
print(f"Client error: {e}")
165+
# Reset on error
166+
await stream.reset()
167+
print("Client reset stream due to error")
168+
169+
170+
async def run_enhanced_demo(
171+
port: int, destination: str, seed: int | None = None
172+
) -> None:
173+
"""
174+
Run enhanced echo demo with NetStream state management.
175+
"""
176+
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
177+
178+
# Generate or use provided key
179+
if seed:
180+
random.seed(seed)
181+
secret_number = random.getrandbits(32 * 8)
182+
secret = secret_number.to_bytes(length=32, byteorder="big")
183+
else:
184+
secret = secrets.token_bytes(32)
185+
186+
host = new_host(key_pair=create_new_key_pair(secret))
187+
188+
async with host.run(listen_addrs=[listen_addr]):
189+
print(f"Host ID: {host.get_id().to_string()}")
190+
print("=" * 60)
191+
192+
if not destination: # Server mode
193+
print("🖥️ ENHANCED ECHO SERVER MODE")
194+
print("=" * 60)
195+
196+
# type: ignore: Stream is type of NetStream
197+
host.set_stream_handler(PROTOCOL_ID, enhanced_echo_handler)
198+
199+
print(
200+
"Run client from another console:\n"
201+
f"python3 example_net_stream.py "
202+
f"-d {host.get_addrs()[0]}\n"
203+
)
204+
print("Waiting for connections...")
205+
print("Press Ctrl+C to stop server")
206+
await trio.sleep_forever()
207+
208+
else: # Client mode
209+
print("📱 ENHANCED ECHO CLIENT MODE")
210+
print("=" * 60)
211+
212+
# Connect to server
213+
maddr = multiaddr.Multiaddr(destination)
214+
info = info_from_p2p_addr(maddr)
215+
await host.connect(info)
216+
print(f"Connected to server: {info.peer_id.pretty()}")
217+
218+
# Create stream and run enhanced demo
219+
stream = await host.new_stream(info.peer_id, [PROTOCOL_ID])
220+
if isinstance(stream, NetStream):
221+
await enhanced_client_demo(stream)
222+
223+
print("\n" + "=" * 60)
224+
print("CLIENT DEMO COMPLETE")
225+
226+
227+
def main() -> None:
228+
example_maddr = (
229+
"/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
230+
)
231+
232+
parser = argparse.ArgumentParser(
233+
formatter_class=argparse.RawDescriptionHelpFormatter
234+
)
235+
parser.add_argument("-p", "--port", default=0, type=int, help="source port number")
236+
parser.add_argument(
237+
"-d",
238+
"--destination",
239+
type=str,
240+
help=f"destination multiaddr string, e.g. {example_maddr}",
241+
)
242+
parser.add_argument(
243+
"-s",
244+
"--seed",
245+
type=int,
246+
help="seed for deterministic peer ID generation",
247+
)
248+
parser.add_argument(
249+
"--demo-states", action="store_true", help="run state transition demo only"
250+
)
251+
252+
args = parser.parse_args()
253+
254+
try:
255+
trio.run(run_enhanced_demo, args.port, args.destination, args.seed)
256+
except KeyboardInterrupt:
257+
print("\n👋 Demo interrupted by user")
258+
except Exception as e:
259+
print(f"❌ Demo failed: {e}")
260+
261+
262+
if __name__ == "__main__":
263+
main()

0 commit comments

Comments
 (0)