-
Notifications
You must be signed in to change notification settings - Fork 175
Add WebSocket transport support #781
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
GautamBytes
wants to merge
22
commits into
libp2p:main
Choose a base branch
from
GautamBytes:add-ws-transport
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 4 commits
Commits
Show all changes
22 commits
Select commit
Hold shift + click to select a range
1874183
added WebSocket transport support
GautamBytes 227a5c6
small tweak
GautamBytes fa0b64d
Merge branch 'main' into add-ws-transport
GautamBytes 4fb7132
Prevent crash in JS interop test
GautamBytes b6c3637
Merge branch 'main' into add-ws-transport
seetadev 7469238
Merge branch 'main' into add-ws-transport
seetadev 8a21435
Merge branch 'main' into add-ws-transport
GautamBytes 1997777
Fix IPv6 host bracketing in WebSocket transport
GautamBytes 53a16d0
Merge branch 'add-ws-transport' of https://github.com/GautamBytes/py-…
GautamBytes 65faa21
Merge branch 'main' into add-ws-transport
seetadev 7a1aa54
Merge branch 'main' into add-ws-transport
seetadev 651bf0f
Merge branch 'main' into add-ws-transport
seetadev a6f8569
Merge upstream/main into add-ws-transport
acul71 64107b4
feat: implement WebSocket transport with transport registry system - …
acul71 19c1f5e
Merge branch 'main' into add-ws-transport
seetadev 167dfdc
Merge branch 'main' into add-ws-transport
seetadev fe4c17e
Fix typecheck errors and improve WebSocket transport implementation
acul71 a5b0db1
Merge remote-tracking branch 'origin/add-ws-transport' into add-ws-tr…
acul71 9573ab5
Merge branch 'main' into add-ws-transport
seetadev f1872bb
Merge branch 'main' into add-ws-transport
seetadev f3cf06c
Merge branch 'main' into add-ws-transport
seetadev 3baf886
Merge branch 'main' into add-ws-transport
seetadev File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from .tcp.tcp import TCP | ||
from .websocket.transport import WebsocketTransport | ||
|
||
__all__ = [ | ||
"TCP", | ||
"WebsocketTransport", | ||
] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
from trio.abc import Stream | ||
|
||
from libp2p.io.abc import ReadWriteCloser | ||
from libp2p.io.exceptions import IOException | ||
|
||
|
||
class P2PWebSocketConnection(ReadWriteCloser): | ||
""" | ||
Wraps a raw trio.abc.Stream from an established websocket connection. | ||
This bypasses message-framing issues and provides the raw stream | ||
that libp2p protocols expect. | ||
""" | ||
|
||
_stream: Stream | ||
|
||
def __init__(self, stream: Stream): | ||
self._stream = stream | ||
|
||
async def write(self, data: bytes) -> None: | ||
try: | ||
await self._stream.send_all(data) | ||
except Exception as e: | ||
raise IOException from e | ||
|
||
async def read(self, n: int | None = None) -> bytes: | ||
""" | ||
Read up to n bytes (if n is given), else read up to 64KiB. | ||
""" | ||
try: | ||
if n is None: | ||
# read a reasonable chunk | ||
return await self._stream.receive_some(2**16) | ||
return await self._stream.receive_some(n) | ||
except Exception as e: | ||
raise IOException from e | ||
|
||
async def close(self) -> None: | ||
await self._stream.aclose() | ||
|
||
def get_remote_address(self) -> tuple[str, int] | None: | ||
sock = getattr(self._stream, "socket", None) | ||
if sock: | ||
try: | ||
addr = sock.getpeername() | ||
if isinstance(addr, tuple) and len(addr) >= 2: | ||
return str(addr[0]), int(addr[1]) | ||
except OSError: | ||
return None | ||
return None |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
import logging | ||
import socket | ||
from typing import Any | ||
|
||
from multiaddr import Multiaddr | ||
import trio | ||
from trio_typing import TaskStatus | ||
from trio_websocket import serve_websocket | ||
|
||
from libp2p.abc import IListener | ||
from libp2p.custom_types import THandler | ||
from libp2p.network.connection.raw_connection import RawConnection | ||
|
||
from .connection import P2PWebSocketConnection | ||
|
||
logger = logging.getLogger("libp2p.transport.websocket.listener") | ||
|
||
|
||
class WebsocketListener(IListener): | ||
""" | ||
Listen on /ip4/.../tcp/.../ws addresses, handshake WS, wrap into RawConnection. | ||
""" | ||
|
||
def __init__(self, handler: THandler) -> None: | ||
self._handler = handler | ||
self._server = None | ||
|
||
async def listen(self, maddr: Multiaddr, nursery: trio.Nursery) -> bool: | ||
addr_str = str(maddr) | ||
if addr_str.endswith("/wss"): | ||
raise NotImplementedError("/wss (TLS) not yet supported") | ||
|
||
host = ( | ||
maddr.value_for_protocol("ip4") | ||
or maddr.value_for_protocol("ip6") | ||
or maddr.value_for_protocol("dns") | ||
or maddr.value_for_protocol("dns4") | ||
or maddr.value_for_protocol("dns6") | ||
or "0.0.0.0" | ||
) | ||
port = int(maddr.value_for_protocol("tcp")) | ||
|
||
async def serve( | ||
task_status: TaskStatus[Any] = trio.TASK_STATUS_IGNORED, | ||
) -> None: | ||
# positional ssl_context=None | ||
self._server = await serve_websocket( | ||
self._handle_connection, host, port, None | ||
) | ||
task_status.started() | ||
await self._server.wait_closed() | ||
|
||
await nursery.start(serve) | ||
return True | ||
|
||
async def _handle_connection(self, websocket: Any) -> None: | ||
try: | ||
# use raw transport_stream | ||
conn = P2PWebSocketConnection(websocket.stream) | ||
raw = RawConnection(conn, initiator=False) | ||
await self._handler(raw) | ||
except Exception as e: | ||
logger.debug("WebSocket connection error: %s", e) | ||
|
||
def get_addrs(self) -> tuple[Multiaddr, ...]: | ||
if not self._server or not self._server.sockets: | ||
return () | ||
addrs = [] | ||
for sock in self._server.sockets: | ||
host, port = sock.getsockname()[:2] | ||
if sock.family == socket.AF_INET6: | ||
addr = Multiaddr(f"/ip6/{host}/tcp/{port}/ws") | ||
else: | ||
addr = Multiaddr(f"/ip4/{host}/tcp/{port}/ws") | ||
addrs.append(addr) | ||
return tuple(addrs) | ||
|
||
async def close(self) -> None: | ||
if self._server: | ||
self._server.close() | ||
await self._server.wait_closed() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
from multiaddr import Multiaddr | ||
from trio_websocket import open_websocket_url | ||
|
||
from libp2p.abc import IListener, ITransport | ||
from libp2p.custom_types import THandler | ||
from libp2p.network.connection.raw_connection import RawConnection | ||
from libp2p.transport.exceptions import OpenConnectionError | ||
|
||
from .connection import P2PWebSocketConnection | ||
from .listener import WebsocketListener | ||
|
||
|
||
class WebsocketTransport(ITransport): | ||
""" | ||
Libp2p WebSocket transport: dial and listen on /ip4/.../tcp/.../ws | ||
""" | ||
|
||
async def dial(self, maddr: Multiaddr) -> RawConnection: | ||
text = str(maddr) | ||
if text.endswith("/wss"): | ||
raise NotImplementedError("/wss (TLS) not yet supported") | ||
if not text.endswith("/ws"): | ||
raise ValueError(f"WebsocketTransport only supports /ws, got {maddr}") | ||
|
||
host = ( | ||
maddr.value_for_protocol("ip4") | ||
or maddr.value_for_protocol("ip6") | ||
or maddr.value_for_protocol("dns") | ||
or maddr.value_for_protocol("dns4") | ||
or maddr.value_for_protocol("dns6") | ||
) | ||
if host is None: | ||
raise ValueError(f"No host protocol found in {maddr}") | ||
|
||
port = int(maddr.value_for_protocol("tcp")) | ||
uri = f"ws://{host}:{port}" | ||
|
||
try: | ||
async with open_websocket_url(uri, ssl_context=None) as ws: | ||
conn = P2PWebSocketConnection(ws.stream) # type: ignore[attr-defined] | ||
return RawConnection(conn, initiator=True) | ||
except Exception as e: | ||
raise OpenConnectionError(f"Failed to dial WebSocket {maddr}: {e}") from e | ||
|
||
def create_listener(self, handler: THandler) -> IListener: # type: ignore[override] | ||
""" | ||
The type checker is incorrectly reporting this as an inconsistent override. | ||
""" | ||
return WebsocketListener(handler) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
{ | ||
"name": "src", | ||
"version": "1.0.0", | ||
"main": "ping.js", | ||
"scripts": { | ||
"test": "echo \"Error: no test specified\" && exit 1" | ||
}, | ||
"keywords": [], | ||
"author": "", | ||
"license": "ISC", | ||
"description": "", | ||
"dependencies": { | ||
"@libp2p/ping": "^2.0.36", | ||
"@libp2p/websockets": "^9.2.18", | ||
"libp2p": "^2.9.0", | ||
"multiaddr": "^10.0.1" | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
import { createLibp2p } from 'libp2p' | ||
import { webSockets } from '@libp2p/websockets' | ||
import { ping } from '@libp2p/ping' | ||
import { plaintext } from '@libp2p/insecure' | ||
import { mplex } from '@libp2p/mplex' | ||
|
||
async function main() { | ||
const node = await createLibp2p({ | ||
transports: [ webSockets() ], | ||
connectionEncryption: [ plaintext() ], | ||
streamMuxers: [ mplex() ], | ||
services: { | ||
// installs /ipfs/ping/1.0.0 handler | ||
ping: ping() | ||
}, | ||
addresses: { | ||
listen: ['/ip4/127.0.0.1/tcp/0/ws'] | ||
} | ||
}) | ||
|
||
await node.start() | ||
|
||
console.log(node.peerId.toString()) | ||
for (const addr of node.getMultiaddrs()) { | ||
console.log(addr.toString()) | ||
} | ||
|
||
// Keep the process alive | ||
await new Promise(() => {}) | ||
} | ||
|
||
main().catch(err => { | ||
console.error(err) | ||
process.exit(1) | ||
}) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
import os | ||
import signal | ||
import subprocess | ||
|
||
import pytest | ||
from multiaddr import Multiaddr | ||
import trio | ||
from trio.lowlevel import open_process | ||
|
||
from libp2p.crypto.secp256k1 import create_new_key_pair | ||
from libp2p.custom_types import TProtocol | ||
from libp2p.host.basic_host import BasicHost | ||
from libp2p.network.swarm import Swarm | ||
from libp2p.peer.id import ID | ||
from libp2p.peer.peerinfo import PeerInfo | ||
from libp2p.peer.peerstore import PeerStore | ||
from libp2p.security.insecure.transport import InsecureTransport | ||
from libp2p.stream_muxer.mplex.mplex import MPLEX_PROTOCOL_ID, Mplex | ||
from libp2p.transport.upgrader import TransportUpgrader | ||
from libp2p.transport.websocket.transport import WebsocketTransport | ||
|
||
PLAINTEXT_PROTOCOL_ID = "/plaintext/1.0.0" | ||
|
||
|
||
@pytest.mark.trio | ||
async def test_ping_with_js_node(): | ||
# Path to the JS node script | ||
js_node_dir = os.path.join(os.path.dirname(__file__), "js_libp2p", "js_node", "src") | ||
script_name = "./ws_ping_node.mjs" | ||
|
||
# Launch the JS libp2p node (long-running) | ||
proc = await open_process( | ||
["node", script_name], | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.PIPE, | ||
cwd=js_node_dir, | ||
) | ||
try: | ||
# Read first two lines (PeerID and multiaddr) | ||
buffer = b"" | ||
with trio.fail_after(10): | ||
while buffer.count(b"\n") < 2: | ||
chunk = await proc.stdout.receive_some(1024) # type: ignore | ||
if not chunk: | ||
break | ||
buffer += chunk | ||
|
||
# Split and filter out any empty lines | ||
lines = [line for line in buffer.decode().splitlines() if line.strip()] | ||
if len(lines) < 2: | ||
stderr_output = "" | ||
if proc.stderr is not None: | ||
stderr_output = (await proc.stderr.receive_some(2048)).decode() | ||
pytest.fail( | ||
"JS node did not produce expected PeerID and multiaddr.\n" | ||
f"Stdout: {buffer.decode()!r}\n" | ||
f"Stderr: {stderr_output!r}" | ||
) | ||
peer_id_line, addr_line = lines[0], lines[1] | ||
peer_id = ID.from_base58(peer_id_line) | ||
maddr = Multiaddr(addr_line) | ||
|
||
# Set up Python host | ||
key_pair = create_new_key_pair() | ||
py_peer_id = ID.from_pubkey(key_pair.public_key) | ||
peer_store = PeerStore() | ||
peer_store.add_key_pair(py_peer_id, key_pair) | ||
|
||
upgrader = TransportUpgrader( | ||
secure_transports_by_protocol={ | ||
TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair) | ||
}, | ||
muxer_transports_by_protocol={TProtocol(MPLEX_PROTOCOL_ID): Mplex}, | ||
) | ||
transport = WebsocketTransport() | ||
swarm = Swarm(py_peer_id, peer_store, upgrader, transport) | ||
host = BasicHost(swarm) | ||
|
||
# Connect to JS node | ||
peer_info = PeerInfo(peer_id, [maddr]) | ||
await host.connect(peer_info) | ||
assert host.get_network().connections.get(peer_id) is not None | ||
await trio.sleep(0.1) | ||
|
||
# Ping protocol | ||
stream = await host.new_stream(peer_id, [TProtocol("/ipfs/ping/1.0.0")]) | ||
await stream.write(b"ping") | ||
data = await stream.read(4) | ||
assert data == b"pong" | ||
|
||
# Cleanup | ||
await host.close() | ||
finally: | ||
proc.send_signal(signal.SIGTERM) | ||
await trio.sleep(0) |
Empty file.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are we leaving support for ipv6 for later on?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the catch!
I'll update the dial method to properly enclose IPv6 hosts in brackets.