Skip to content

Commit 3455690

Browse files
guha-rahullla-daneseetadev
committed
feat: add interop btw rust and pylibp2p (#2)
Co-authored-by: lla-dane <[email protected]> Co-authored-by: Manu Sheel Gupta <[email protected]>
1 parent b5b1375 commit 3455690

File tree

8 files changed

+160
-22
lines changed

8 files changed

+160
-22
lines changed

docs/conf.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@
290290
]
291291

292292
# Prevent autodoc from trying to import module from tests.factories
293-
autodoc_mock_imports = ["tests.factories"]
293+
autodoc_mock_imports = ["tests.factories", "redis"]
294294

295295
# Documents to append as an appendix to all manuals.
296296
# texinfo_appendices = []

docs/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ The Python implementation of the libp2p networking stack
1818

1919
Examples <examples>
2020
API <libp2p>
21+
Interop <interop>
2122

2223
.. toctree::
2324
:maxdepth: 1

docs/interop.rst

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
interop package
2+
===============
3+
4+
Submodules
5+
----------
6+
7+
interop.arch module
8+
-------------------
9+
10+
.. automodule:: interop.arch
11+
:members:
12+
:show-inheritance:
13+
:undoc-members:
14+
15+
interop.lib module
16+
------------------
17+
18+
.. automodule:: interop.lib
19+
:members:
20+
:show-inheritance:
21+
:undoc-members:
22+
23+
Module contents
24+
---------------
25+
26+
.. automodule:: interop
27+
:members:
28+
:show-inheritance:
29+
:undoc-members:

examples/ping/ping.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import argparse
2+
import logging
23

34
import multiaddr
45
import trio
@@ -55,8 +56,8 @@ async def send_ping(stream: INetStream) -> None:
5556

5657

5758
async def run(port: int, destination: str) -> None:
58-
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
59-
host = new_host(listen_addrs=[listen_addr])
59+
listen_addr = multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/{port}")
60+
host = new_host()
6061

6162
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
6263
if not destination:
@@ -103,8 +104,20 @@ def main() -> None:
103104
type=str,
104105
help=f"destination multiaddr string, e.g. {example_maddr}",
105106
)
107+
parser.add_argument(
108+
"-v", "--verbose", action="store_true", help="enable verbose logging"
109+
)
110+
106111
args = parser.parse_args()
107112

113+
if args.verbose:
114+
# Enable even more detailed logging
115+
logging.getLogger("libp2p").setLevel(logging.DEBUG)
116+
logging.getLogger("libp2p.network").setLevel(logging.DEBUG)
117+
logging.getLogger("libp2p.transport").setLevel(logging.DEBUG)
118+
logging.getLogger("libp2p.security").setLevel(logging.DEBUG)
119+
logging.getLogger("libp2p.stream_muxer").setLevel(logging.DEBUG)
120+
108121
try:
109122
trio.run(run, *(args.port, args.destination))
110123
except KeyboardInterrupt:

libp2p/stream_muxer/yamux/yamux.py

Lines changed: 101 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@
3333
from libp2p.io.exceptions import (
3434
IncompleteReadError,
3535
)
36+
from libp2p.io.utils import (
37+
read_exactly,
38+
)
3639
from libp2p.network.connection.exceptions import (
3740
RawConnError,
3841
)
@@ -127,20 +130,22 @@ async def write(self, data: bytes) -> None:
127130
"Timed out waiting for window update after 5 seconds."
128131
)
129132

130-
if self.closed:
131-
raise MuxedStreamError("Stream is closed")
133+
if self.closed:
134+
raise MuxedStreamError("Stream is closed")
135+
132136

133-
# Calculate how much we can send now
137+
# Calculate how much we can send now
138+
async with self.window_lock:
134139
to_send = min(self.send_window, total_len - sent)
135140
chunk = data[sent : sent + to_send]
136141
self.send_window -= to_send
137142

138-
# Send the data
139-
header = struct.pack(
140-
YAMUX_HEADER_FORMAT, 0, TYPE_DATA, 0, self.stream_id, len(chunk)
141-
)
142-
await self.conn.secured_conn.write(header + chunk)
143-
sent += to_send
143+
# Send the data
144+
header = struct.pack(
145+
YAMUX_HEADER_FORMAT, 0, TYPE_DATA, 0, self.stream_id, len(chunk)
146+
)
147+
await self.conn.secured_conn.write(header + chunk)
148+
sent += to_send
144149

145150
async def send_window_update(self, increment: int, skip_lock: bool = False) -> None:
146151
"""
@@ -397,7 +402,6 @@ async def close(self, error_code: int = GO_AWAY_NORMAL) -> None:
397402
else:
398403
if self.on_close is not None:
399404
await self.on_close()
400-
await trio.sleep(0.1)
401405

402406
@property
403407
def is_closed(self) -> bool:
@@ -537,13 +541,17 @@ async def handle_incoming(self) -> None:
537541
self.event_shutting_down.set()
538542
await self._cleanup_on_error()
539543
break
544+
545+
# Debug: log raw header bytes
546+
logging.debug(f"Raw header bytes: {header.hex()}")
547+
540548
version, typ, flags, stream_id, length = struct.unpack(
541549
YAMUX_HEADER_FORMAT, header
542550
)
543551
logger.debug(
544552
f"Received header for peer {self.peer_id}:"
545-
f"type={typ}, flags={flags}, stream_id={stream_id},"
546-
f"length={length}"
553+
f"version={version}, type={typ}, flags={flags}, "
554+
f"stream_id={stream_id}, length={length}"
547555
)
548556
if (typ == TYPE_DATA or typ == TYPE_WINDOW_UPDATE) and flags & FLAG_SYN:
549557
async with self.streams_lock:
@@ -668,6 +676,85 @@ async def handle_incoming(self) -> None:
668676
f" increment: {increment}"
669677
)
670678
stream.send_window += increment
679+
elif typ == TYPE_DATA:
680+
async with self.streams_lock:
681+
if stream_id in self.streams:
682+
# Store data - ensure data is not None before extending
683+
if data is not None and len(data) > 0:
684+
self.stream_buffers[stream_id].extend(data)
685+
if stream_id in self.stream_events:
686+
self.stream_events[stream_id].set()
687+
# Handle flags
688+
if flags & FLAG_SYN:
689+
logging.debug(
690+
f"Received late SYN for stream {stream_id} "
691+
f"for peer {self.peer_id}"
692+
)
693+
if flags & FLAG_ACK:
694+
logging.debug(
695+
f"Received ACK for stream {stream_id} "
696+
f"for peer {self.peer_id}"
697+
)
698+
if flags & FLAG_FIN:
699+
logging.debug(
700+
f"Received FIN for stream {self.peer_id}:"
701+
f"{stream_id}, marking recv_closed"
702+
)
703+
self.streams[stream_id].recv_closed = True
704+
if self.streams[stream_id].send_closed:
705+
self.streams[stream_id].closed = True
706+
if stream_id in self.stream_events:
707+
self.stream_events[stream_id].set()
708+
if flags & FLAG_RST:
709+
logging.debug(
710+
f"Resetting stream {stream_id} "
711+
f"for peer {self.peer_id}"
712+
)
713+
self.streams[stream_id].closed = True
714+
self.streams[stream_id].reset_received = True
715+
if stream_id in self.stream_events:
716+
self.stream_events[stream_id].set()
717+
else:
718+
if flags & FLAG_SYN:
719+
if stream_id not in self.streams:
720+
stream = YamuxStream(stream_id, self, False)
721+
self.streams[stream_id] = stream
722+
# Initialize stream buffer
723+
buffer = bytearray()
724+
if data is not None and len(data) > 0:
725+
buffer.extend(data)
726+
self.stream_buffers[stream_id] = buffer
727+
self.stream_events[stream_id] = trio.Event()
728+
self.stream_events[stream_id].set()
729+
ack_header = struct.pack(
730+
YAMUX_HEADER_FORMAT,
731+
0,
732+
TYPE_DATA,
733+
FLAG_ACK,
734+
stream_id,
735+
0,
736+
)
737+
await self.secured_conn.write(ack_header)
738+
logging.debug(
739+
f"Sending stream {stream_id}"
740+
f"to channel for peer {self.peer_id}"
741+
)
742+
await self.new_stream_send_channel.send(stream)
743+
else:
744+
rst_header = struct.pack(
745+
YAMUX_HEADER_FORMAT,
746+
0,
747+
TYPE_DATA,
748+
FLAG_RST,
749+
stream_id,
750+
0,
751+
)
752+
await self.secured_conn.write(rst_header)
753+
else:
754+
logging.warning(
755+
f"Received data for unknown stream {stream_id} "
756+
f"from peer {self.peer_id} (length={length})"
757+
)
671758
except Exception as e:
672759
# Special handling for expected IncompleteReadError on stream close
673760
if isinstance(e, IncompleteReadError):
@@ -687,7 +774,7 @@ async def handle_incoming(self) -> None:
687774
else:
688775
logger.error(
689776
f"Error in handle_incoming for peer {self.peer_id}: "
690-
+ f"{type(e).__name__}: {str(e)}"
777+
f"{type(e).__name__}: {str(e)}"
691778
)
692779
else:
693780
# Handle RawConnError with more nuance
@@ -717,8 +804,6 @@ async def handle_incoming(self) -> None:
717804
):
718805
await self._cleanup_on_error()
719806
break
720-
# For other errors, log and continue
721-
await trio.sleep(0.01)
722807

723808
async def _cleanup_on_error(self) -> None:
724809
# Set shutdown flag first to prevent other operations
@@ -760,6 +845,6 @@ async def _cleanup_on_error(self) -> None:
760845
except Exception as callback_error:
761846
logger.error(f"Error in on_close callback: {callback_error}")
762847

763-
# Cancel nursery tasks
848+
# Cancel nursery tasks if available
764849
if self._nursery:
765850
self._nursery.cancel_scope.cancel()

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ dependencies = [
3131
"trio-typing>=0.0.4",
3232
"trio>=0.26.0",
3333
"fastecdsa==2.3.2; sys_platform != 'win32'",
34+
"cryptography>=42.0.0; sys_platform == 'win32'", # Alternative for Windows
3435
"zeroconf (>=0.147.0,<0.148.0)",
3536
]
3637
classifiers = [
@@ -79,6 +80,7 @@ dev = [
7980
"factory-boy>=2.12.0,<3.0.0",
8081
"ruff>=0.11.10",
8182
"pyrefly (>=0.17.1,<0.18.0)",
83+
"pytest-timeout",
8284
]
8385
docs = [
8486
"sphinx>=6.0.0",
@@ -93,6 +95,7 @@ test = [
9395
"pytest-xdist>=2.4.0",
9496
"pytest-trio>=0.5.2",
9597
"factory-boy>=2.12.0,<3.0.0",
98+
"pytest-timeout",
9699
]
97100

98101
[tool.setuptools]

tests/core/stream_muxer/test_yamux.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ async def read(self, n: int | None = None) -> bytes:
5151
data = await self.receive_stream.receive_some(n)
5252
logging.debug(f"Read {len(data)} bytes")
5353
return data
54+
# Raise IncompleteReadError on timeout to simulate connection closed
55+
logging.debug("Read timed out after 2 seconds, raising IncompleteReadError")
56+
from libp2p.io.exceptions import IncompleteReadError
57+
58+
raise IncompleteReadError({"requested_count": n, "received_count": 0})
5459

5560
async def close(self) -> None:
5661
logging.debug("Closing stream")

tox.ini

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,16 @@ skip_install=true
6262
deps=
6363
wheel
6464
build[virtualenv]
65+
platform = win32
6566
allowlist_externals=
66-
bash.exe
67+
cmd.exe
6768
commands=
6869
python --version
6970
python -m pip install --upgrade pip
70-
bash.exe -c "rm -rf build dist"
71+
cmd.exe /c "if exist build rd /s /q build"
72+
cmd.exe /c "if exist dist rd /s /q dist"
7173
python -m build
72-
bash.exe -c 'python -m pip install --upgrade "$(ls dist/libp2p-*-py3-none-any.whl)" --progress-bar off'
74+
cmd.exe /c "for %i in (dist\libp2p-*-py3-none-any.whl) do python -m pip install --upgrade "%i" --progress-bar off"
7375
python -c "import libp2p"
7476
skip_install=true
7577

0 commit comments

Comments
 (0)