diff --git a/Cargo.lock b/Cargo.lock index 421ae65f53f..7177f505d1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -280,9 +280,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.88" +version = "0.1.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", @@ -500,6 +500,29 @@ dependencies = [ "generic-array", ] +[[package]] +name = "browser-to-browser-webrtc-example" +version = "0.1.0" +dependencies = [ + "console_error_panic_hook", + "futures", + "getrandom 0.2.15", + "js-sys", + "libp2p", + "libp2p-core 0.43.1", + "libp2p-identity", + "libp2p-relay", + "libp2p-swarm", + "libp2p-webrtc-utils 0.3.0", + "libp2p-webrtc-websys", + "libp2p-websocket-websys", + "tracing", + "tracing-wasm", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "browser-webrtc-example" version = "0.1.0" @@ -1465,7 +1488,7 @@ version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" dependencies = [ - "gloo-timers", + "gloo-timers 0.2.6", "send_wrapper 0.4.0", ] @@ -1608,6 +1631,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "group" version = "0.13.0" @@ -2227,7 +2262,7 @@ dependencies = [ "futures-timer", "libp2p", "libp2p-mplex", - "libp2p-noise", + "libp2p-noise 0.46.1", "libp2p-tls", "libp2p-webrtc", "libp2p-webrtc-websys", @@ -2424,7 +2459,7 @@ dependencies = [ "libp2p-allow-block-list", "libp2p-autonat", "libp2p-connection-limits", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-dcutr", "libp2p-dns", "libp2p-floodsub", @@ -2436,7 +2471,7 @@ dependencies = [ "libp2p-memory-connection-limits", "libp2p-metrics", "libp2p-mplex", - "libp2p-noise", + "libp2p-noise 0.46.1", "libp2p-ping", "libp2p-plaintext", "libp2p-pnet", @@ -2456,7 +2491,7 @@ dependencies = [ "libp2p-yamux", "multiaddr", "pin-project", - "rw-stream-sink", + "rw-stream-sink 0.4.0", "thiserror 2.0.12", "tokio", "tracing-subscriber", @@ -2466,7 +2501,7 @@ dependencies = [ name = "libp2p-allow-block-list" version = "0.6.0" dependencies = [ - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", "libp2p-swarm", "libp2p-swarm-derive", @@ -2484,14 +2519,14 @@ dependencies = [ "futures", "futures-bounded", "futures-timer", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identify", "libp2p-identity", "libp2p-request-response", "libp2p-swarm", "libp2p-swarm-test", "quick-protobuf", - "quick-protobuf-codec", + "quick-protobuf-codec 0.3.1", "rand 0.8.5", "rand_core 0.6.4", "thiserror 2.0.12", @@ -2505,7 +2540,7 @@ dependencies = [ name = "libp2p-connection-limits" version = "0.6.0" dependencies = [ - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identify", "libp2p-identity", "libp2p-ping", @@ -2517,6 +2552,34 @@ dependencies = [ "tokio", ] +[[package]] +name = "libp2p-core" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a61f26c83ed111104cd820fe9bc3aaabbac5f1652a1d213ed6e900b7918a1298" +dependencies = [ + "either", + "fnv", + "futures", + "futures-timer", + "libp2p-identity", + "multiaddr", + "multihash", + "multistream-select 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)", + "once_cell", + "parking_lot", + "pin-project", + "quick-protobuf", + "rand 0.8.5", + "rw-stream-sink 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec", + "thiserror 1.0.69", + "tracing", + "unsigned-varint 0.8.0", + "void", + "web-time 1.1.0", +] + [[package]] name = "libp2p-core" version = "0.43.1" @@ -2527,19 +2590,19 @@ dependencies = [ "futures-timer", "libp2p-identity", "libp2p-mplex", - "libp2p-noise", + "libp2p-noise 0.46.1", "multiaddr", "multihash", - "multistream-select", + "multistream-select 0.13.0", "parking_lot", "pin-project", "quick-protobuf", "rand 0.8.5", - "rw-stream-sink", + "rw-stream-sink 0.4.0", "thiserror 2.0.12", "tokio", "tracing", - "unsigned-varint", + "unsigned-varint 0.8.0", "web-time 1.1.0", ] @@ -2553,7 +2616,7 @@ dependencies = [ "futures-bounded", "futures-timer", "hashlink", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identify", "libp2p-identity", "libp2p-plaintext", @@ -2563,7 +2626,7 @@ dependencies = [ "libp2p-tcp", "libp2p-yamux", "quick-protobuf", - "quick-protobuf-codec", + "quick-protobuf-codec 0.3.1", "thiserror 2.0.12", "tokio", "tracing", @@ -2578,7 +2641,7 @@ dependencies = [ "async-trait", "futures", "hickory-resolver", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", "parking_lot", "smallvec", @@ -2596,11 +2659,11 @@ dependencies = [ "cuckoofilter", "fnv", "futures", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", "libp2p-swarm", "quick-protobuf", - "quick-protobuf-codec", + "quick-protobuf-codec 0.3.1", "rand 0.8.5", "smallvec", "thiserror 2.0.12", @@ -2623,13 +2686,13 @@ dependencies = [ "getrandom 0.2.15", "hashlink", "hex_fmt", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", "libp2p-swarm", "libp2p-swarm-test", "prometheus-client", "quick-protobuf", - "quick-protobuf-codec", + "quick-protobuf-codec 0.3.1", "quickcheck-ext", "rand 0.8.5", "regex", @@ -2650,12 +2713,12 @@ dependencies = [ "futures", "futures-bounded", "futures-timer", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", "libp2p-swarm", "libp2p-swarm-test", "quick-protobuf", - "quick-protobuf-codec", + "quick-protobuf-codec 0.3.1", "smallvec", "thiserror 2.0.12", "tokio", @@ -2701,15 +2764,15 @@ dependencies = [ "futures", "futures-bounded", "futures-timer", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identify", "libp2p-identity", - "libp2p-noise", + "libp2p-noise 0.46.1", "libp2p-swarm", "libp2p-swarm-test", "libp2p-yamux", "quick-protobuf", - "quick-protobuf-codec", + "quick-protobuf-codec 0.3.1", "quickcheck-ext", "rand 0.8.5", "serde", @@ -2730,7 +2793,7 @@ dependencies = [ "futures", "hickory-proto", "if-watch", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", "libp2p-swarm", "libp2p-swarm-test", @@ -2746,7 +2809,7 @@ dependencies = [ name = "libp2p-memory-connection-limits" version = "0.5.0" dependencies = [ - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identify", "libp2p-identity", "libp2p-swarm", @@ -2763,7 +2826,7 @@ name = "libp2p-metrics" version = "0.17.1" dependencies = [ "futures", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-dcutr", "libp2p-gossipsub", "libp2p-identify", @@ -2785,7 +2848,7 @@ dependencies = [ "bytes", "criterion", "futures", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", "libp2p-muxer-test-harness", "libp2p-plaintext", @@ -2798,7 +2861,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", - "unsigned-varint", + "unsigned-varint 0.8.0", ] [[package]] @@ -2808,10 +2871,36 @@ dependencies = [ "futures", "futures-timer", "futures_ringbuf", - "libp2p-core", + "libp2p-core 0.43.1", "tracing", ] +[[package]] +name = "libp2p-noise" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36b137cb1ae86ee39f8e5d6245a296518912014eaa87427d24e6ff58cfc1b28c" +dependencies = [ + "asynchronous-codec", + "bytes", + "curve25519-dalek", + "futures", + "libp2p-core 0.42.0", + "libp2p-identity", + "multiaddr", + "multihash", + "once_cell", + "quick-protobuf", + "rand 0.8.5", + "sha2", + "snow", + "static_assertions", + "thiserror 1.0.69", + "tracing", + "x25519-dalek", + "zeroize", +] + [[package]] name = "libp2p-noise" version = "0.46.1" @@ -2820,7 +2909,7 @@ dependencies = [ "bytes", "futures", "futures_ringbuf", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", "multiaddr", "multihash", @@ -2842,7 +2931,7 @@ version = "0.1.0" dependencies = [ "hashlink", "libp2p", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", "libp2p-swarm", "libp2p-swarm-test", @@ -2860,7 +2949,7 @@ dependencies = [ "futures-bounded", "futures-timer", "libp2p", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", "libp2p-swarm", "libp2p-swarm-test", @@ -2882,7 +2971,7 @@ version = "0.47.0" dependencies = [ "futures", "futures-timer", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", "libp2p-swarm", "libp2p-swarm-test", @@ -2901,10 +2990,10 @@ dependencies = [ "bytes", "futures", "futures_ringbuf", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", "quick-protobuf", - "quick-protobuf-codec", + "quick-protobuf-codec 0.3.1", "quickcheck-ext", "tracing", "tracing-subscriber", @@ -2915,9 +3004,9 @@ name = "libp2p-pnet" version = "0.26.0" dependencies = [ "futures", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", - "libp2p-noise", + "libp2p-noise 0.46.1", "libp2p-swarm", "libp2p-tcp", "libp2p-websocket", @@ -2938,10 +3027,10 @@ dependencies = [ "futures", "futures-timer", "if-watch", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", "libp2p-muxer-test-harness", - "libp2p-noise", + "libp2p-noise 0.46.1", "libp2p-tcp", "libp2p-tls", "libp2p-yamux", @@ -2967,7 +3056,7 @@ dependencies = [ "futures", "futures-bounded", "futures-timer", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", "libp2p-ping", "libp2p-plaintext", @@ -2975,7 +3064,7 @@ dependencies = [ "libp2p-swarm-test", "libp2p-yamux", "quick-protobuf", - "quick-protobuf-codec", + "quick-protobuf-codec 0.3.1", "quickcheck-ext", "rand 0.8.5", "static_assertions", @@ -2995,13 +3084,13 @@ dependencies = [ "bimap", "futures", "futures-timer", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", "libp2p-request-response", "libp2p-swarm", "libp2p-swarm-test", "quick-protobuf", - "quick-protobuf-codec", + "quick-protobuf-codec 0.3.1", "rand 0.8.5", "thiserror 2.0.12", "tokio", @@ -3020,7 +3109,7 @@ dependencies = [ "futures", "futures-bounded", "futures_ringbuf", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", "libp2p-swarm", "libp2p-swarm-test", @@ -3056,7 +3145,7 @@ name = "libp2p-stream" version = "0.4.0-alpha" dependencies = [ "futures", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", "libp2p-swarm", "libp2p-swarm-test", @@ -3077,7 +3166,7 @@ dependencies = [ "futures-timer", "getrandom 0.2.15", "hashlink", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identify", "libp2p-identity", "libp2p-kad", @@ -3086,7 +3175,7 @@ dependencies = [ "libp2p-swarm-derive", "libp2p-swarm-test", "libp2p-yamux", - "multistream-select", + "multistream-select 0.13.0", "quickcheck-ext", "rand 0.8.5", "smallvec", @@ -3114,7 +3203,7 @@ dependencies = [ "async-trait", "futures", "futures-timer", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", "libp2p-plaintext", "libp2p-swarm", @@ -3131,7 +3220,7 @@ dependencies = [ "futures-timer", "if-watch", "libc", - "libp2p-core", + "libp2p-core 0.43.1", "socket2 0.6.0", "tokio", "tracing", @@ -3145,7 +3234,7 @@ dependencies = [ "futures", "futures-rustls", "hex-literal", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", "libp2p-swarm", "libp2p-yamux", @@ -3164,7 +3253,7 @@ name = "libp2p-uds" version = "0.43.0" dependencies = [ "futures", - "libp2p-core", + "libp2p-core 0.43.1", "tempfile", "tokio", "tracing", @@ -3177,7 +3266,7 @@ dependencies = [ "futures", "futures-timer", "igd-next", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-swarm", "tokio", "tracing", @@ -3192,10 +3281,10 @@ dependencies = [ "futures-timer", "hex", "if-watch", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", - "libp2p-noise", - "libp2p-webrtc-utils", + "libp2p-noise 0.46.1", + "libp2p-webrtc-utils 0.4.1", "multihash", "quickcheck", "rand 0.8.5", @@ -3211,18 +3300,41 @@ dependencies = [ [[package]] name = "libp2p-webrtc-utils" -version = "0.4.0" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4da43128cff2ef91579b9f8c3f30991821d4d6b40db00ac4f17e3c1f75bfa019" +dependencies = [ + "asynchronous-codec", + "bytes", + "futures", + "hex", + "libp2p-core 0.42.0", + "libp2p-identity", + "libp2p-noise 0.45.0", + "quick-protobuf", + "quick-protobuf-codec 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.8.5", + "serde", + "sha2", + "thiserror 1.0.69", + "tinytemplate", + "tracing", +] + +[[package]] +name = "libp2p-webrtc-utils" +version = "0.4.1" dependencies = [ "asynchronous-codec", "bytes", "futures", "hex", "hex-literal", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", - "libp2p-noise", + "libp2p-noise 0.46.1", "quick-protobuf", - "quick-protobuf-codec", + "quick-protobuf-codec 0.3.1", "rand 0.8.5", "serde", "sha2", @@ -3232,22 +3344,37 @@ dependencies = [ [[package]] name = "libp2p-webrtc-websys" -version = "0.4.0" +version = "0.4.1" dependencies = [ + "async-trait", "bytes", "futures", "getrandom 0.2.15", + "gloo-timers 0.3.0", "hex", "js-sys", - "libp2p-core", + "libp2p-core 0.43.1", + "libp2p-identify", "libp2p-identity", - "libp2p-webrtc-utils", + "libp2p-noise 0.46.1", + "libp2p-ping", + "libp2p-relay", + "libp2p-swarm", + "libp2p-webrtc-utils 0.4.1", + "libp2p-websocket-websys", + "libp2p-yamux", + "multistream-select 0.13.0", + "oneshot", + "pin-project", + "quick-protobuf", "send_wrapper 0.6.0", + "serde_json", "thiserror 2.0.12", "tracing", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", + "web-time 1.1.0", ] [[package]] @@ -3257,14 +3384,14 @@ dependencies = [ "either", "futures", "futures-rustls", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-dns", "libp2p-identity", "libp2p-tcp", "parking_lot", "pin-project-lite", "rcgen", - "rw-stream-sink", + "rw-stream-sink 0.4.0", "soketto", "thiserror 2.0.12", "tokio", @@ -3280,9 +3407,9 @@ dependencies = [ "bytes", "futures", "js-sys", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", - "libp2p-noise", + "libp2p-noise 0.46.1", "libp2p-yamux", "send_wrapper 0.6.0", "thiserror 2.0.12", @@ -3297,9 +3424,9 @@ version = "0.5.1" dependencies = [ "futures", "js-sys", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", - "libp2p-noise", + "libp2p-noise 0.46.1", "multiaddr", "multibase", "multihash", @@ -3317,7 +3444,7 @@ version = "0.47.0" dependencies = [ "either", "futures", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-muxer-test-harness", "thiserror 2.0.12", "tokio", @@ -3536,7 +3663,7 @@ dependencies = [ "percent-encoding", "serde", "static_assertions", - "unsigned-varint", + "unsigned-varint 0.8.0", "url", ] @@ -3562,7 +3689,7 @@ dependencies = [ "quickcheck", "rand 0.8.5", "serde", - "unsigned-varint", + "unsigned-varint 0.8.0", ] [[package]] @@ -3574,13 +3701,27 @@ dependencies = [ "futures_ringbuf", "pin-project", "quickcheck-ext", - "rw-stream-sink", + "rw-stream-sink 0.4.0", "smallvec", "tokio", "tokio-util", "tracing", "tracing-subscriber", - "unsigned-varint", + "unsigned-varint 0.8.0", +] + +[[package]] +name = "multistream-select" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea0df8e5eec2298a62b326ee4f0d7fe1a6b90a09dfcf9df37b38f947a8c42f19" +dependencies = [ + "bytes", + "futures", + "log", + "pin-project", + "smallvec", + "unsigned-varint 0.7.2", ] [[package]] @@ -3792,6 +3933,12 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "oneshot" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ce411919553d3f9fa53a0880544cda985a112117a0444d5ff1e870a893d6ea" + [[package]] name = "oorandom" version = "11.1.5" @@ -4323,7 +4470,20 @@ dependencies = [ "quick-protobuf", "quickcheck-ext", "thiserror 2.0.12", - "unsigned-varint", + "unsigned-varint 0.8.0", +] + +[[package]] +name = "quick-protobuf-codec" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15a0580ab32b169745d7a39db2ba969226ca16738931be152a3209b409de2474" +dependencies = [ + "asynchronous-codec", + "bytes", + "quick-protobuf", + "thiserror 1.0.69", + "unsigned-varint 0.8.0", ] [[package]] @@ -4625,7 +4785,9 @@ dependencies = [ "clap", "futures", "libp2p", + "rand 0.8.5", "tokio", + "tracing", "tracing-subscriber", ] @@ -4948,6 +5110,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "rw-stream-sink" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8c9026ff5d2f23da5e45bbc283f156383001bfb09c4e44256d02c1a685fe9a1" +dependencies = [ + "futures", + "pin-project", + "static_assertions", +] + [[package]] name = "ryu" version = "1.0.20" @@ -6073,6 +6246,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "unsigned-varint" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6889a77d49f1f013504cec6bf97a2c730394adedaeb1deb5ea08949a50541105" + [[package]] name = "unsigned-varint" version = "0.8.0" @@ -6161,6 +6340,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "void" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" + [[package]] name = "waitgroup" version = "0.1.2" @@ -6572,9 +6757,9 @@ version = "0.1.0" dependencies = [ "futures", "getrandom 0.2.15", - "libp2p-core", + "libp2p-core 0.43.1", "libp2p-identity", - "libp2p-noise", + "libp2p-noise 0.46.1", "libp2p-webtransport-websys", "multiaddr", "multihash", diff --git a/Cargo.toml b/Cargo.toml index 36c148cffee..821d6c1e278 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = [ "core", "examples/autonat", "examples/autonatv2", + "examples/browser-to-browser-webrtc", "examples/browser-webrtc", "examples/chat", "examples/dcutr", @@ -110,8 +111,8 @@ libp2p-tls = { version = "0.6.2", path = "transports/tls" } libp2p-uds = { version = "0.43.0", path = "transports/uds" } libp2p-upnp = { version = "0.6.0", path = "protocols/upnp" } libp2p-webrtc = { version = "0.9.0-alpha.2", path = "transports/webrtc" } -libp2p-webrtc-utils = { version = "0.4.0", path = "misc/webrtc-utils" } -libp2p-webrtc-websys = { version = "0.4.0", path = "transports/webrtc-websys" } +libp2p-webrtc-utils = { version = "0.4.1", path = "misc/webrtc-utils" } +libp2p-webrtc-websys = { version = "0.4.1", path = "transports/webrtc-websys" } libp2p-websocket = { version = "0.45.2", path = "transports/websocket" } libp2p-websocket-websys = { version = "0.5.0", path = "transports/websocket-websys" } libp2p-webtransport-websys = { version = "0.5.1", path = "transports/webtransport-websys" } diff --git a/examples/browser-to-browser-webrtc/Cargo.toml b/examples/browser-to-browser-webrtc/Cargo.toml new file mode 100644 index 00000000000..09cfb808431 --- /dev/null +++ b/examples/browser-to-browser-webrtc/Cargo.toml @@ -0,0 +1,38 @@ +[package] +authors = ["Elijah Hampton "] +description = "Example use of WebRTC transport in a browser wasm environment" +edition.workspace = true +license = "MIT" +name = "browser-to-browser-webrtc-example" +publish = false +repository = "https://github.com/libp2p/rust-libp2p" +rust-version = { workspace = true } +version = "0.1.0" + +[package.metadata.release] +release = false + +[lib] +crate-type = ["cdylib"] + +[dependencies] +futures = { workspace = true } +console_error_panic_hook = "0.1" +tracing = { workspace = true } +libp2p-swarm = { workspace = true } +libp2p = { path = "../../libp2p", features = ["relay", "noise", "yamux", "ed25519", "macros", "identify", "ping", "wasm-bindgen"] } +libp2p-identity = { version = "0.2" } +libp2p-webrtc-utils = { version = "0.3"} +libp2p-relay = { workspace = true} +libp2p-core = { workspace = true} +libp2p-webrtc-websys = { workspace = true } +libp2p-websocket-websys = { workspace = true } +tracing-wasm = "0.2" +getrandom = { version = "0.2", features = ["js"] } +wasm-bindgen = "0.2" +wasm-bindgen-futures = "0.4" +web-sys = { version = "0.3", features = ["RtcPeerConnection", "RtcDataChannel", "RtcConfiguration", "Window", "Navigator", "MediaDevices"] } +js-sys = "0.3" + +[package.metadata.wasm-pack] +"wasm-pack" = ["cdylib"] diff --git a/examples/browser-to-browser-webrtc/src/lib.rs b/examples/browser-to-browser-webrtc/src/lib.rs new file mode 100644 index 00000000000..2bc23fb23c6 --- /dev/null +++ b/examples/browser-to-browser-webrtc/src/lib.rs @@ -0,0 +1,434 @@ +use std::{str::FromStr, sync::Arc}; + +use futures::{channel::mpsc, task::AtomicWaker, StreamExt}; +use js_sys::{Object, Reflect}; +use libp2p::{ + identify, identity::Keypair, multiaddr::Protocol, noise, ping, swarm::SwarmEvent, yamux, + Multiaddr, Swarm, Transport, +}; +use libp2p_core::{muxing::StreamMuxerBox, upgrade::Version}; +use libp2p_swarm::NetworkBehaviour; +use libp2p_webrtc_websys::browser::{ + self, Behaviour, SignalingConfig, Transport as BrowserWebrtcTransport, +}; +use wasm_bindgen::prelude::*; +use wasm_bindgen_futures::spawn_local; + +#[wasm_bindgen(start)] +pub fn start() { + console_error_panic_hook::set_once(); +} + +#[wasm_bindgen] +pub fn initialize() { + tracing_wasm::set_as_global_default_with_config( + tracing_wasm::WASMLayerConfigBuilder::new() + .set_max_level(tracing::Level::DEBUG) + .set_console_config(tracing_wasm::ConsoleConfig::ReportWithConsoleColor) + .build(), + ); +} + +#[wasm_bindgen] +pub struct BrowserTransport { + cmd_sender: mpsc::UnboundedSender, + event_receiver: std::sync::Arc>>, + peer_id: String, +} + +enum Command { + ListenOnRelay { addr: Multiaddr }, + DialPeer { addr: Multiaddr }, + ListenForWebRTC, +} + +#[derive(Debug, Clone)] +enum Event { + ReservationCreated { address: String }, + RelayConnectionEstablished { peer_id: String }, + WebRTCConnectionEstablished { peer_id: String }, + PingSuccess { peer_id: String, rtt_ms: f64 }, + Error { msg: String }, +} + +#[derive(NetworkBehaviour)] +pub struct WebRTCBehaviour { + relay: libp2p_relay::client::Behaviour, + identify: identify::Behaviour, + webrtc: Behaviour, + ping: ping::Behaviour, +} + +#[wasm_bindgen] +impl BrowserTransport { + #[wasm_bindgen] + pub fn new() -> Result { + let keypair = Keypair::generate_ed25519(); + let local_peer_id = keypair.public().to_peer_id(); + + let transport_waker = Arc::new(AtomicWaker::new()); + + // Create a relay transport and behaviour for the relay connection + let (relay_transport, relay_behaviour) = libp2p_relay::client::new(local_peer_id); + + let relay_transport_upgraded = relay_transport + .upgrade(Version::V1) + .authenticate(noise::Config::new(&keypair)?) + .multiplex(yamux::Config::default()) + .boxed(); + + // Create a websocket transport to facilitate connection to the relay server + let ws_transport = libp2p_websocket_websys::Transport::default() + .upgrade(Version::V1) + .authenticate(noise::Config::new(&keypair)?) + .multiplex(yamux::Config::default()) + .boxed(); + + let webrtc_config = libp2p_webrtc_websys::browser::Config { + keypair: keypair.clone(), + }; + + let stun_servers = ["stun:stun.l.google.com:19302"]; + + let signaling_config = SignalingConfig::new( + 3, // max retries + 100, /* max ice gathering + * attempts */ + std::time::Duration::from_millis(0), // signaling delay + std::time::Duration::from_millis(100), // connection check delay + 300, // max connection checks (30 seconds) + std::time::Duration::from_secs(10), // ICE gathering timeout + keypair.public().to_peer_id(), // The local peer's peer_id + Some(stun_servers.iter().map(ToString::to_string).collect()), // stun servers + ); + + let (webrtc_transport, webrtc_behaviour) = + BrowserWebrtcTransport::new(webrtc_config, signaling_config, transport_waker.clone()); + let webrtc_transport_boxed = webrtc_transport.boxed(); + + // A WebRTC behaviour facilitating coordination between the relay connection and webrtc + // signaling + let behaviour = WebRTCBehaviour { + relay: relay_behaviour, + webrtc: webrtc_behaviour, + identify: identify::Behaviour::new(identify::Config::new( + "browser-to-browser-webrtc/1.0.0".into(), + keypair.public(), + )), + ping: ping::Behaviour::new( + ping::Config::new().with_interval(std::time::Duration::from_secs(15)), + ), + }; + + // The final transport consisting of a webrtc, relay and websocket transport + let final_transport = webrtc_transport_boxed + .or_transport(relay_transport_upgraded) + .or_transport(ws_transport) + .map(|either_output, _| match either_output { + // WebRTC output + futures::future::Either::Left(futures::future::Either::Left(( + peer_id, + connection, + ))) => (peer_id, StreamMuxerBox::new(connection)), + // Relay output + futures::future::Either::Left(futures::future::Either::Right(output)) => output, + // WebSocket output + futures::future::Either::Right(output) => output, + }) + .boxed(); + + let mut swarm = Swarm::new( + final_transport, + behaviour, + local_peer_id, + libp2p::swarm::Config::with_executor(Box::new(|fut| { + wasm_bindgen_futures::spawn_local(fut); + })) + .with_idle_connection_timeout(std::time::Duration::from_secs(300)), + ); + + #[allow(clippy::disallowed_methods)] + let (cmd_sender, mut cmd_receiver) = mpsc::unbounded(); + #[allow(clippy::disallowed_methods)] + let (event_sender, event_receiver) = mpsc::unbounded(); + + spawn_local(async move { + let mut relay_address: Option = None; + let mut webrtc_listening = false; + + loop { + futures::select! { + cmd = cmd_receiver.next() => { + if let Some(cmd) = cmd { + match cmd { + Command::ListenOnRelay { addr } => { + relay_address = Some(addr.clone()); + // Build the circuit address for reservation + let circuit_addr = addr.with(Protocol::P2pCircuit); + + match swarm.listen_on(circuit_addr.clone()) { + Ok(listener_id) => { + tracing::info!("Swarm successfully listening on address {} with listener id {}.", circuit_addr, listener_id); + } + Err(e) => { + tracing::error!("Swarm failed to listen on address {}: {}", circuit_addr, e); + let _ = event_sender.unbounded_send(Event::Error { + msg: format!("{}", e) + }); + } + } + } + Command::DialPeer { addr } => { + let addr_str = addr.to_string(); + + if addr_str.contains("/p2p-circuit") && addr_str.contains("/webrtc") { + tracing::info!("Dialing peer: {}", addr); + + let relay_circuit_addr_str = addr_str.replace("/webrtc", ""); + tracing::info!("Transformed to relay circuit address: {}", relay_circuit_addr_str); + + match relay_circuit_addr_str.parse::() { + Ok(relay_circuit_addr) => { + tracing::info!("Dialing relay circuit: {}", relay_circuit_addr); + + if let Err(e) = swarm.dial(relay_circuit_addr.clone()) { + tracing::error!("Failed to dial relay circuit: {:?}", e); + let _ = event_sender.unbounded_send(Event::Error { msg: format!("Dial failed to {}", relay_circuit_addr) }); + continue; + } + + // Also dial the WebRTC address (creates Future waiting for connection) + let peer_id_str = addr_str.split("/p2p/").last().unwrap_or(""); + let webrtc_addr_str = format!("/webrtc/p2p/{}", peer_id_str); + + tracing::info!("Also dialing WebRTC address: {}", webrtc_addr_str); + + match webrtc_addr_str.parse::() { + Ok(webrtc_addr) => { + // This will create a Future that waits for the connection from behaviour + if let Err(e) = swarm.dial(webrtc_addr) { + tracing::error!("Failed to dial WebRTC: {:?}", e); + } + } + Err(e) => { + tracing::error!("Invalid WebRTC multiaddr: {:?}", e); + } + } + } + Err(e) => { + tracing::error!("Invalid relay circuit multiaddr: {:?}", e); + } + } + } + } + Command::ListenForWebRTC => { + if !webrtc_listening { + let webrtc_listen_addr = "/webrtc".parse::().unwrap(); + + match swarm.listen_on(webrtc_listen_addr.clone()) { + Ok(listener_id) => { + tracing::info!("WebRTC listener created with id: {}", listener_id); + webrtc_listening = true; + + } + Err(e) => { + tracing::error!("Failed to create WebRTC listener: {}", e); + } + } + } + } + } + } + } + + event = swarm.select_next_some() => { + match event { + SwarmEvent::ConnectionEstablished { peer_id, connection_id, endpoint, .. } => { + let remote_addr = endpoint.get_remote_address().to_string(); + + if remote_addr.contains("/webrtc") && !remote_addr.contains("/p2p-circuit") { + tracing::info!( + "Direct WebRTC connection established with: {} via {} (connection_id: {})", + peer_id, remote_addr, connection_id + ); + + let _ = event_sender.unbounded_send(Event::WebRTCConnectionEstablished { + peer_id: peer_id.to_string() + }); + } else if remote_addr.contains("/p2p-circuit") { + tracing::info!( + "Relay connection established with: {} via {} (connection_id: {})", + peer_id, remote_addr, connection_id + ); + + let _ = event_sender.unbounded_send(Event::RelayConnectionEstablished { + peer_id: peer_id.to_string() + }); + } + } + SwarmEvent::ConnectionClosed { peer_id, cause, endpoint, connection_id, .. } => { + tracing::info!("Connection on endpoint {:?} closed with cause: {:?}", endpoint, cause); + + let remaining = swarm.network_info().num_peers(); + + let still_connected = swarm.is_connected(&peer_id); + + tracing::info!( + "Connection {} closed to {} (still_connected: {}, total_peers: {})", + connection_id, + peer_id, + still_connected, + remaining + ); + + if !still_connected { + tracing::warn!("Peer {} fully disconnected - all connections closed", peer_id); + } + } + SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { + tracing::error!("Outgoing connection error to {:?}: {}", peer_id, error); + let _ = event_sender.unbounded_send(Event::Error { + msg: format!("Connection error: {}", error) + }); + } + SwarmEvent::NewListenAddr { address, .. } => { + tracing::info!("Listening on: {}", address); + if address.iter().any(|p| matches!(p, Protocol::P2pCircuit)) { + // Generate the complete WebRTC address + if let Some(relay_addr) = &relay_address { + let webrtc_addr = format!( + "{}/p2p-circuit/webrtc/p2p/{}", + relay_addr, + local_peer_id + ); + tracing::info!("WebRTC address: {}", webrtc_addr); + let _ = event_sender.unbounded_send(Event::ReservationCreated { + address: webrtc_addr, + }); + } + } + } + SwarmEvent::Behaviour(event) => { + match event { + WebRTCBehaviourEvent::Webrtc(webrtc_event) => { + match webrtc_event { + browser::SignalingEvent::NewWebRTCConnection { peer_id } => { + tracing::info!("WebRTC connection established with peer: {}", peer_id); + } + browser::SignalingEvent::WebRTCConnectionError { peer_id, error } => { + tracing::error!("Failed to establish WebRTC connection with {}: {:?}", peer_id, error); + let _ = event_sender.unbounded_send(Event::Error { + msg: format!("WebRTC error with {}: {}", peer_id, error) + }); + } + } + } + WebRTCBehaviourEvent::Ping(ping_event) => { + match ping_event { + ping::Event { peer, result: Ok(rtt), .. } => { + let rtt_ms = rtt.as_millis() as f64; + let _ = event_sender.unbounded_send(Event::PingSuccess { + peer_id: peer.to_string(), + rtt_ms, + }); + } + ping::Event { peer, result: Err(e), .. } => { + let _ = event_sender.unbounded_send(Event::Error { + msg: format!("Ping failed to {}: {}", peer, e) + }); + } + } + } + _ => {} + } + } + _ => {} + } + } + } + } + }); + + Ok(BrowserTransport { + cmd_sender, + event_receiver: std::sync::Arc::new(futures::lock::Mutex::new(event_receiver)), + peer_id: local_peer_id.to_string(), + }) + } + + #[wasm_bindgen] + pub async fn listen_on_relay(&self, relay_addr: &str) -> Result<(), JsValue> { + let addr = Multiaddr::from_str(relay_addr) + .map_err(|e| JsValue::from_str(&format!("Invalid relay addr: {}", e)))?; + + self.cmd_sender + .unbounded_send(Command::ListenOnRelay { addr }) + .map_err(|e| JsValue::from_str(&format!("Failed to send command: {}", e)))?; + + self.cmd_sender + .unbounded_send(Command::ListenForWebRTC) + .map_err(|e| JsValue::from_str(&format!("Failed to setup WebRTC listener: {}", e)))?; + + Ok(()) + } + + #[wasm_bindgen(getter)] + pub fn peer_id(&self) -> String { + self.peer_id.clone() + } + + #[wasm_bindgen] + pub async fn dial(&self, peer_addr: &str) -> Result<(), JsValue> { + let addr = Multiaddr::from_str(peer_addr) + .map_err(|e| JsValue::from_str(&format!("Invalid peer addr: {}", e)))?; + + self.cmd_sender + .unbounded_send(Command::DialPeer { addr }) + .map_err(|e| JsValue::from_str(&format!("Failed to send command: {}", e)))?; + + Ok(()) + } + + #[wasm_bindgen] + pub async fn next_event(&self) -> Result { + let mut receiver = self.event_receiver.lock().await; + + if let Some(event) = receiver.next().await { + let obj = Object::new(); + + match event { + Event::ReservationCreated { address } => { + Reflect::set(&obj, &"type".into(), &"reservationCreated".into())?; + Reflect::set(&obj, &"address".into(), &address.into())?; + } + Event::RelayConnectionEstablished { peer_id } => { + Reflect::set(&obj, &"type".into(), &"relayConnectionEstablished".into())?; + Reflect::set(&obj, &"peer_id".into(), &peer_id.into())?; + } + Event::WebRTCConnectionEstablished { peer_id } => { + Reflect::set(&obj, &"type".into(), &"webrtcConnectionEstablished".into())?; + Reflect::set(&obj, &"peer_id".into(), &peer_id.into())?; + } + Event::PingSuccess { peer_id, rtt_ms } => { + Reflect::set(&obj, &"type".into(), &"pingSuccess".into())?; + Reflect::set(&obj, &"peer_id".into(), &peer_id.into())?; + Reflect::set(&obj, &"rtt_ms".into(), &rtt_ms.into())?; + } + Event::Error { msg } => { + Reflect::set(&obj, &"type".into(), &"error".into())?; + Reflect::set(&obj, &"msg".into(), &msg.into())?; + } + } + + Ok(obj.into()) + } else { + Err(JsValue::from_str("No events available")) + } + } +} + +#[wasm_bindgen] +pub fn generate_peer_id() -> String { + let keypair = Keypair::generate_ed25519(); + keypair.public().to_peer_id().to_string() +} diff --git a/examples/browser-to-browser-webrtc/static/index.html b/examples/browser-to-browser-webrtc/static/index.html new file mode 100644 index 00000000000..702aa4d9e47 --- /dev/null +++ b/examples/browser-to-browser-webrtc/static/index.html @@ -0,0 +1,499 @@ + + + + + + + libp2p WebRTC Browser-to-Browser Demo + + + + +

libp2p WebRTC Browser-to-Browser Demo

+ +
+ + +
+ + +
+
+ Listener Instructions: +
    +
  1. Initialize the listener
  2. +
  3. Enter relay address and click "Listen on Relay"
  4. +
  5. Copy your address and share it with the dialer
  6. +
  7. Wait for connection (relay will establish first, then WebRTC)
  8. +
  9. Test relay independence by shutting down the relay server
  10. +
+
+ +
+ Listener Peer ID: Not initialized + Disconnected +
+ +
+ Connection Status: No connections +
+ +
+

Initialize

+ +
+ +
+

Listen on Relay

+ + +
+ +
+

Your Address

+ + +
+ +

Logs

+
+
+ + +
+
+ Dialer Instructions: +
    +
  1. Initialize the dialer
  2. +
  3. Get the listener's address from the other tab
  4. +
  5. Paste the address and click "Dial Peer"
  6. +
  7. Start chatting once WebRTC connection is established
  8. +
  9. Test relay independence by shutting down the relay server
  10. +
+
+ +
+ Dialer Peer ID: Not initialized + Disconnected +
+ +
+ Connection Status: No connections +
+ +
+

Initialize

+ +
+ +
+

Dial Peer

+ + +
+ +

Logs

+
+
+ + + + + diff --git a/examples/relay-server/Cargo.toml b/examples/relay-server/Cargo.toml index 1e0e2a780ba..e6dc2f6b632 100644 --- a/examples/relay-server/Cargo.toml +++ b/examples/relay-server/Cargo.toml @@ -12,8 +12,10 @@ release = false clap = { version = "4.5.6", features = ["derive"] } tokio = { version = "1.37.0", features = ["full"] } futures = { workspace = true } -libp2p = { path = "../../libp2p", features = ["tokio", "noise", "macros", "ping", "tcp", "identify", "yamux", "relay", "quic"] } +libp2p = { path = "../../libp2p", features = ["tokio", "noise", "macros", "ping", "tcp", "identify", "yamux", "relay", "quic", "websocket", "dns"] } tracing-subscriber = { workspace = true, features = ["env-filter"] } +tracing = "0.1" +rand = "0.8" [lints] workspace = true diff --git a/examples/relay-server/src/main.rs b/examples/relay-server/src/main.rs index c5742b8fe7f..a2bcbbb45e0 100644 --- a/examples/relay-server/src/main.rs +++ b/examples/relay-server/src/main.rs @@ -32,7 +32,7 @@ use libp2p::{ core::{multiaddr::Protocol, Multiaddr}, identify, identity, noise, ping, relay, swarm::{NetworkBehaviour, SwarmEvent}, - tcp, yamux, + tcp, yamux, PeerId, Swarm, }; use tracing_subscriber::EnvFilter; @@ -46,6 +46,8 @@ async fn main() -> Result<(), Box> { // Create a static known PeerId based on given secret let local_key: identity::Keypair = generate_ed25519(opt.secret_key_seed); + let local_peer_id = PeerId::from(local_key.public()); + tracing::info!(?local_peer_id, "Local peer id"); let mut swarm = libp2p::SwarmBuilder::with_existing_identity(local_key) .with_tokio() @@ -55,6 +57,9 @@ async fn main() -> Result<(), Box> { yamux::Config::default, )? .with_quic() + .with_dns()? + .with_websocket(noise::Config::new, yamux::Config::default) + .await? .with_behaviour(|key| Behaviour { relay: relay::Behaviour::new(key.public().to_peer_id(), Default::default()), ping: ping::Behaviour::new(ping::Config::new()), @@ -83,6 +88,8 @@ async fn main() -> Result<(), Box> { .with(Protocol::QuicV1); swarm.listen_on(listen_addr_quic)?; + listen_on_websocket(&mut swarm, &opt)?; + loop { match swarm.next().await.expect("Infinite Stream.") { SwarmEvent::Behaviour(event) => { @@ -96,12 +103,29 @@ async fn main() -> Result<(), Box> { println!("{event:?}") } - SwarmEvent::NewListenAddr { address, .. } => { + SwarmEvent::NewListenAddr { mut address, .. } => { + address.push(Protocol::P2p(local_peer_id)); println!("Listening on {address:?}"); } _ => {} } } + + fn listen_on_websocket(swarm: &mut Swarm, opt: &Opt) -> Result<(), Box> { + match opt.ws_port { + Some(port) => { + let address = Multiaddr::from(Ipv4Addr::UNSPECIFIED) + .with(Protocol::Tcp(port)) + .with(Protocol::Ws(std::borrow::Cow::Borrowed("/"))); + + swarm.listen_on(address.clone())?; + } + None => { + tracing::info!("Does not use websocket"); + } + } + Ok(()) + } } #[derive(NetworkBehaviour)] @@ -132,4 +156,8 @@ struct Opt { /// The port used to listen on all interfaces #[arg(long)] port: u16, + + /// The websocket port used to listen on all interfaces + #[arg(long)] + ws_port: Option, } diff --git a/misc/webrtc-utils/CHANGELOG.md b/misc/webrtc-utils/CHANGELOG.md index 992f8354bba..ae5f6ccb5b4 100644 --- a/misc/webrtc-utils/CHANGELOG.md +++ b/misc/webrtc-utils/CHANGELOG.md @@ -1,3 +1,6 @@ +## 0.4.1 +- Implements `Default` trait for `Fingerprint` type. + ## 0.4.0 diff --git a/misc/webrtc-utils/Cargo.toml b/misc/webrtc-utils/Cargo.toml index 8c6eaedd1e3..6b35965afc5 100644 --- a/misc/webrtc-utils/Cargo.toml +++ b/misc/webrtc-utils/Cargo.toml @@ -7,7 +7,7 @@ license = "MIT" name = "libp2p-webrtc-utils" repository = "https://github.com/libp2p/rust-libp2p" rust-version = { workspace = true } -version = "0.4.0" +version = "0.4.1" publish = true [dependencies] diff --git a/misc/webrtc-utils/src/transport.rs b/misc/webrtc-utils/src/transport.rs index 60b1934082f..91dd803893f 100644 --- a/misc/webrtc-utils/src/transport.rs +++ b/misc/webrtc-utils/src/transport.rs @@ -24,6 +24,7 @@ pub fn parse_webrtc_dial_addr(addr: &Multiaddr) -> Option<(SocketAddr, Fingerpri (port, fingerprint) } + _ => return None, }; diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index fde8a2a6807..861cf622d56 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -2,6 +2,8 @@ - reduce allocations by replacing `get_or_insert` with `get_or_insert_with` See [PR 6136](https://github.com/libp2p/rust-libp2p/pull/6136) +- Export StatusCode type for public use. +- Fixed early return for non-relayed multiaddrs in transport to avoid unecessary processing. ## 0.21.0 @@ -32,7 +34,7 @@ - Fix manual closure of relayed listener. See [PR 5491](https://github.com/libp2p/rust-libp2p/pull/5491) - Add resource limits to `CircuitReq` to be set - See [PR 5493](https://github.com/libp2p/rust-libp2p/pull/5493) + See [PR 5493](https://github.com/libp2p/rust-libp2p/pull/5493) ## 0.17.2 diff --git a/protocols/relay/src/priv_client/transport.rs b/protocols/relay/src/priv_client/transport.rs index c5c17fd5137..45c8d05a4d5 100644 --- a/protocols/relay/src/priv_client/transport.rs +++ b/protocols/relay/src/priv_client/transport.rs @@ -166,6 +166,7 @@ impl libp2p_core::Transport for Transport { is_closed: false, waker: None, }; + self.listeners.push(listener); Ok(()) } @@ -192,6 +193,11 @@ impl libp2p_core::Transport for Transport { return Err(TransportError::MultiaddrNotSupported(addr)); } + if !addr.is_relayed() { + // This is not a relay address at all, pass it to next transport + return Err(TransportError::MultiaddrNotSupported(addr)); + } + let RelayedMultiaddr { relay_peer_id, relay_addr, diff --git a/transports/webrtc-websys/CHANGELOG.md b/transports/webrtc-websys/CHANGELOG.md index 5a550a5e8b1..35ee596c581 100644 --- a/transports/webrtc-websys/CHANGELOG.md +++ b/transports/webrtc-websys/CHANGELOG.md @@ -1,3 +1,10 @@ +## 0.4.1 +- Implement browser-to-browser WebRTC connection establishment over circuit relay v2. +- Add `SignalingBehaviour` for coordinating WebRTC signaling between browser peers. +- Add `SignalingHandler` for managing WebRTC offer/answer exchange over relay connections. +- Add support for automatic WebRTC connection upgrade from relay connections. +- Add configurable signaling parameters (retry attempts, timeouts, ICE gathering). + ## 0.4.0 - Cut stable release. diff --git a/transports/webrtc-websys/Cargo.toml b/transports/webrtc-websys/Cargo.toml index e5596da90e4..651dfd2c8a3 100644 --- a/transports/webrtc-websys/Cargo.toml +++ b/transports/webrtc-websys/Cargo.toml @@ -8,7 +8,7 @@ license = "MIT" name = "libp2p-webrtc-websys" repository = "https://github.com/libp2p/rust-libp2p" rust-version = { workspace = true } -version = "0.4.0" +version = "0.4.1" publish = true [dependencies] @@ -18,14 +18,58 @@ getrandom = { workspace = true, features = ["js"] } hex = "0.4.3" js-sys = { version = "0.3" } libp2p-core = { workspace = true } +libp2p-identify = { workspace = true } +libp2p-ping = { workspace = true } libp2p-identity = { workspace = true } libp2p-webrtc-utils = { workspace = true } +libp2p-relay = { workspace = true } +libp2p-swarm = { workspace = true } +libp2p-yamux = { workspace = true } +libp2p-noise = { workspace = true } +libp2p-websocket-websys = { workspace = true } send_wrapper = { version = "0.6.0", features = ["futures"] } thiserror = { workspace = true } -tracing = { workspace = true } +tracing = { workspace = true, features = ["attributes"] } wasm-bindgen = { version = "0.2.90" } wasm-bindgen-futures = { version = "0.4.42" } -web-sys = { version = "0.3.70", features = ["Document", "Location", "MessageEvent", "Navigator", "RtcCertificate", "RtcConfiguration", "RtcDataChannel", "RtcDataChannelEvent", "RtcDataChannelInit", "RtcDataChannelState", "RtcDataChannelType", "RtcPeerConnection", "RtcSdpType", "RtcSessionDescription", "RtcSessionDescriptionInit", "Window"] } +web-sys = { version = "0.3.70", features = [ + "RtcPeerConnection", + "RtcConfiguration", + "RtcSessionDescription", + "RtcSessionDescriptionInit", + "RtcSdpType", + "RtcDataChannel", + "RtcDataChannelEvent", + "RtcDataChannelInit", + "RtcDataChannelState", + "RtcDataChannelType", + "RtcIceConnectionState", + "RtcIceGatheringState", + "RtcIceCandidate", + "RtcIceCandidateInit", + "RtcIceServer", + "RtcPeerConnectionIceEvent", + "Event", + "EventTarget", + "MessageEvent", + "Window", + "Document", + "Navigator", + "RtcCertificate", + "RtcPeerConnectionState", + "RtcSignalingState", + "Location", + "HtmlDocument", + "WebSocket" +]} +oneshot = "0.1.11" +multistream-select.workspace = true +pin-project = "1.1.10" +gloo-timers = { version = "0.3.0", features = ["futures"] } +serde_json = "1.0.140" +async-trait = "0.1.89" +quick-protobuf = "0.8.1" +web-time.workspace = true [lints] workspace = true diff --git a/transports/webrtc-websys/src/browser/behaviour.rs b/transports/webrtc-websys/src/browser/behaviour.rs new file mode 100644 index 00000000000..ed1cf63403e --- /dev/null +++ b/transports/webrtc-websys/src/browser/behaviour.rs @@ -0,0 +1,347 @@ +use std::{ + collections::{HashMap, VecDeque}, + sync::Arc, + task::Poll, + time::Duration, +}; + +use futures::{channel::oneshot, lock::Mutex, task::AtomicWaker}; +use libp2p_core::{multiaddr::Protocol, PeerId}; +use libp2p_swarm::{ConnectionId, NetworkBehaviour, NotifyHandler, ToSwarm}; +use tracing::info; +use web_time::Instant; + +use crate::browser::{ + handler::{FromBehaviourEvent, SignalingHandler, ToBehaviourEvent}, + transport::ConnectionRequest, +}; + +#[derive(Clone, Debug)] +pub struct SignalingConfig { + pub(crate) max_signaling_retries: u8, + pub(crate) max_ice_gathering_attempts: u8, + pub(crate) signaling_delay: Duration, + pub(crate) connection_establishment_delay_in_millis: Duration, + pub(crate) max_connection_establishment_checks: u32, + #[allow(dead_code)] + pub(crate) ice_gathering_timeout: Duration, + pub(crate) local_peer_id: PeerId, + pub(crate) stun_servers: Option>, +} + +impl SignalingConfig { + #[allow(clippy::too_many_arguments)] + pub fn new( + max_retries: u8, + max_ice_gathering_attempts: u8, + signaling_delay: Duration, + connection_establishment_delay_in_millis: Duration, + max_connection_establishment_checks: u32, + ice_gathering_timeout: Duration, + local_peer_id: PeerId, + stun_servers: Option>, + ) -> Self { + Self { + max_signaling_retries: max_retries, + max_ice_gathering_attempts, + signaling_delay, + connection_establishment_delay_in_millis, + max_connection_establishment_checks, + ice_gathering_timeout, + local_peer_id, + stun_servers, + } + } +} + +/// Signaling events returned to the swarm. +#[derive(Debug)] +pub enum SignalingEvent { + /// A new WebRTC connection has been established + NewWebRTCConnection { peer_id: PeerId }, + /// WebRTC connection establishment failed + WebRTCConnectionError { + peer_id: PeerId, + error: crate::Error, + }, +} + +/// State for tracking signaling with a specific peer +#[derive(Debug)] +struct PeerSignalingState { + discovered_at: Instant, + initiated: bool, + connection_id: ConnectionId, + is_relay_connection: bool, +} + +/// A [`Behaviour`] used to coordinate signaling between peers over a relay connection. +pub struct Behaviour { + /// Queued events to send to the swarm. + queued_events: VecDeque>, + /// Configuration parameters for the signaling process. + signaling_config: SignalingConfig, + /// Tracking state of peers involved in signaling. + peers: HashMap, + /// Pending dial requests from the transport + pending_dials: Arc>>>, + /// Established connections to inject + established_connections: Arc>>, + /// Track which connections are WebRTC connections + webrtc_connections: HashMap, + /// Established relay connections + established_relay_connections: HashMap, + transport_waker: Arc, +} + +impl Behaviour { + pub fn new( + config: SignalingConfig, + pending_dials: Arc>>>, + established_connections: Arc>>, + transport_waker: Arc, + ) -> Self { + Self { + queued_events: VecDeque::new(), + signaling_config: config, + peers: HashMap::new(), + pending_dials, + established_connections, + webrtc_connections: HashMap::new(), + established_relay_connections: HashMap::new(), + transport_waker, + } + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = SignalingHandler; + type ToSwarm = SignalingEvent; + + fn handle_established_inbound_connection( + &mut self, + connection_id: libp2p_swarm::ConnectionId, + peer: PeerId, + _local_addr: &libp2p_core::Multiaddr, + remote_addr: &libp2p_core::Multiaddr, + ) -> Result, libp2p_swarm::ConnectionDenied> { + let is_webrtc_conn = remote_addr.iter().any(|p| matches!(p, Protocol::WebRTC)); + + if is_webrtc_conn { + tracing::trace!("WebRTC connection established (inbound) with peer {}", peer); + self.webrtc_connections.insert(connection_id, peer); + + // Return a no-op handler for established WebRTC connections + Ok(SignalingHandler::new_established_webrtc( + self.signaling_config.local_peer_id, + peer, + )) + } else { + Ok(SignalingHandler::new( + self.signaling_config.local_peer_id, + peer, + self.signaling_config.clone(), + false, + )) + } + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: libp2p_swarm::ConnectionId, + peer: PeerId, + addr: &libp2p_core::Multiaddr, + _endpoint: libp2p_core::Endpoint, + _port_use: libp2p_core::transport::PortUse, + ) -> Result, libp2p_swarm::ConnectionDenied> { + let has_circuit = addr.iter().any(|p| matches!(p, Protocol::P2pCircuit)); + let has_webrtc = addr.iter().any(|p| matches!(p, Protocol::WebRTC)); + + if has_webrtc && !has_circuit { + tracing::trace!( + "Direct WebRTC connection established (outbound) with peer {}", + peer + ); + self.webrtc_connections.insert(connection_id, peer); + + // Return a no-op handler for established WebRTC connections + Ok(SignalingHandler::new_established_webrtc( + self.signaling_config.local_peer_id, + peer, + )) + } else if has_circuit { + info!( + "Outbound relay connection to peer {} - preparing for WebRTC signaling", + peer + ); + + Ok(SignalingHandler::new( + self.signaling_config.local_peer_id, + peer, + self.signaling_config.clone(), + false, + )) + } else { + tracing::info!( + "Other outbound connection to peer {} on addr {}", + peer, + addr + ); + Ok(SignalingHandler::new( + self.signaling_config.local_peer_id, + peer, + self.signaling_config.clone(), + true, + )) + } + } + + fn on_swarm_event(&mut self, event: libp2p_swarm::FromSwarm) { + match event { + libp2p_swarm::FromSwarm::ConnectionEstablished(connection_established) => { + let peer_id = connection_established.peer_id; + let connection_id = connection_established.connection_id; + let endpoint = connection_established.endpoint; + + if endpoint.is_relayed() && !self.webrtc_connections.contains_key(&connection_id) { + self.established_relay_connections + .insert(peer_id, connection_id); + + self.peers.insert( + peer_id, + PeerSignalingState { + discovered_at: Instant::now(), + initiated: false, + connection_id, + is_relay_connection: true, + }, + ); + } + } + libp2p_swarm::FromSwarm::ConnectionClosed(connection_closed) => { + let peer_id = connection_closed.peer_id; + let connection_id = connection_closed.connection_id; + + if self.webrtc_connections.remove(&connection_id).is_some() { + info!( + "WebRTC connection {} with peer {} closed", + connection_id, peer_id + ); + } else if self + .peers + .get(&peer_id) + .map(|state| state.connection_id == connection_id && state.is_relay_connection) + .unwrap_or(false) + { + tracing::debug!( + "Relay connection {} with peer {} closed (signaling state removed)", + connection_id, + peer_id + ); + + self.peers.remove(&peer_id); + } + } + _ => {} + } + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + _connection_id: libp2p_swarm::ConnectionId, + event: libp2p_swarm::THandlerOutEvent, + ) { + match event { + ToBehaviourEvent::WebRTCConnectionSuccess(connection) => { + let pending_dials = self.pending_dials.clone(); + let established_connections = self.established_connections.clone(); + let transport_waker = self.transport_waker.clone(); + + wasm_bindgen_futures::spawn_local(async move { + let mut pending = pending_dials.lock().await; + if let Some(sender) = pending.remove(&peer_id) { + let _ = sender.send(connection); + } else { + tracing::info!( + "No pending dial found, treating as inbound for peer {}", + peer_id + ); + + let mut established = established_connections.lock().await; + established.push_back(ConnectionRequest { + peer_id, + connection, + }); + + transport_waker.wake(); + } + }); + + self.peers.remove(&peer_id); + + self.queued_events.push_back(ToSwarm::GenerateEvent( + SignalingEvent::NewWebRTCConnection { peer_id }, + )); + } + ToBehaviourEvent::WebRTCConnectionFailure(error) => { + self.queued_events.push_back(ToSwarm::GenerateEvent( + SignalingEvent::WebRTCConnectionError { peer_id, error }, + )); + + self.peers.remove(&peer_id); + + // Notify any waiting dial + let pending_dials = self.pending_dials.clone(); + wasm_bindgen_futures::spawn_local(async move { + let mut pending = pending_dials.lock().await; + pending.remove(&peer_id); + }); + } + ToBehaviourEvent::SignalingRetry => { + if let Some(state) = self.peers.get_mut(&peer_id) { + state.initiated = false; + state.discovered_at = Instant::now(); + } + } + } + } + + fn poll( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll>> + { + if let Some(event) = self.queued_events.pop_front() { + return Poll::Ready(event); + } + + let now = Instant::now(); + let delay = self.signaling_config.signaling_delay; + + for (peer_id, state) in self.peers.iter_mut() { + tracing::trace!( + " Peer {}: initiated={}, is_relay={}, time_elapsed={:?}", + peer_id, + state.initiated, + state.is_relay_connection, + now.duration_since(state.discovered_at) + ); + + if !state.initiated + && state.is_relay_connection + && now.duration_since(state.discovered_at) >= delay + { + state.initiated = true; + + return Poll::Ready(ToSwarm::NotifyHandler { + peer_id: *peer_id, + handler: NotifyHandler::One(state.connection_id), + event: FromBehaviourEvent::InitiateSignaling, + }); + } + } + + Poll::Pending + } +} diff --git a/transports/webrtc-websys/src/browser/handler.rs b/transports/webrtc-websys/src/browser/handler.rs new file mode 100644 index 00000000000..eecf0db2bd2 --- /dev/null +++ b/transports/webrtc-websys/src/browser/handler.rs @@ -0,0 +1,412 @@ +use std::task::Poll; + +use futures::FutureExt; +use libp2p_core::{upgrade::ReadyUpgrade, PeerId}; +use libp2p_swarm::{ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, SubstreamProtocol}; +use tracing::instrument; + +use crate::browser::{ + protocol::signaling::ProtocolHandler, Signaling, SignalingConfig, SignalingStream, + SIGNALING_STREAM_PROTOCOL, +}; + +/// Events sent from the Handler to the Behaviour. +#[derive(Debug)] +pub enum ToBehaviourEvent { + WebRTCConnectionSuccess(crate::Connection), + WebRTCConnectionFailure(crate::Error), + SignalingRetry, +} + +/// Events sent from the Behaviour to the Handler. +#[derive(Debug)] +pub enum FromBehaviourEvent { + /// Start signaling with this peer + InitiateSignaling, +} + +/// The current status of the signaling process for this handler. +#[derive(Debug, PartialEq)] +pub(crate) enum SignalingStatus { + /// Relay connection has been established but no signaling attempts have been made + Idle, + /// Currently signaling (either as initiator or responder) + Negotiating, + /// Awaiting the initiator to start signaling + AwaitingInitiation, + /// Waiting before signaling retry + WaitingForRetry, + /// Signaling completed + Complete, + /// Signaling failed + Fail, +} + +#[derive(Debug, PartialEq)] +enum SignalingRole { + Initiator, + Responder, +} + +/// Defines a HandlerType for the various situations in which the SignalingHandler can exist. +#[derive(Debug)] +enum HandlerType { + /// Handler for signaling over relay + Signaling { + role: Option, + retry_count: u8, + signaling_config: SignalingConfig, + signaling_status: SignalingStatus, + signaling_result_receiver: + Option>>, + }, + /// Handler for established WebRTC connection + EstablishedWebRTC, + /// No-op handler + Noop, +} + +/// Handles connection, behaviour and signaling related events. +#[derive(Debug)] +pub struct SignalingHandler { + local_peer_id: PeerId, + peer: PeerId, + handler_type: HandlerType, +} + +impl SignalingHandler { + pub fn new( + local_peer_id: PeerId, + peer: PeerId, + config: SignalingConfig, + is_noop: bool, + ) -> Self { + let handler_type = if is_noop { + HandlerType::Noop + } else { + HandlerType::Signaling { + role: None, + retry_count: 0, + signaling_config: config, + signaling_status: SignalingStatus::Idle, + signaling_result_receiver: None, + } + }; + + Self { + local_peer_id, + peer, + handler_type, + } + } + + /// Create a handler for an already established WebRTC connection. + pub fn new_established_webrtc(local_peer_id: PeerId, peer: PeerId) -> Self { + Self { + local_peer_id, + peer, + handler_type: HandlerType::EstablishedWebRTC, + } + } + + /// Defines whether a peer should be the initiator of the signaling process. + fn should_be_initiator(&self, remote_peer: &PeerId) -> bool { + self.local_peer_id < *remote_peer + } + + /// Check if signaling should be retried based on the error and current state. + fn should_retry(&self, _error: &crate::Error) -> bool { + match &self.handler_type { + HandlerType::Signaling { + role, + retry_count, + signaling_config, + .. + } => { + *role == Some(SignalingRole::Initiator) + && *retry_count < signaling_config.max_signaling_retries + } + _ => false, + } + } + + /// Handle successful signaling completion. + fn handle_signaling_success( + &mut self, + connection: crate::Connection, + ) -> ConnectionHandlerEvent< + ::OutboundProtocol, + ::OutboundOpenInfo, + ::ToBehaviour, + > { + if let HandlerType::Signaling { + signaling_status, + signaling_result_receiver, + .. + } = &mut self.handler_type + { + *signaling_status = SignalingStatus::Complete; + *signaling_result_receiver = None; + } + + ConnectionHandlerEvent::NotifyBehaviour(ToBehaviourEvent::WebRTCConnectionSuccess( + connection, + )) + } + + /// Handle signaling failure. Signaling will be retried up to a configurable max number of + /// retries. + fn handle_signaling_failure( + &mut self, + error: crate::Error, + ) -> ConnectionHandlerEvent< + ::OutboundProtocol, + ::OutboundOpenInfo, + ::ToBehaviour, + > { + if self.should_retry(&error) { + if let HandlerType::Signaling { + retry_count, + signaling_config, + .. + } = &self.handler_type + { + tracing::info!( + "Retrying signaling attempt {} of {} with peer {}", + retry_count + 1, + signaling_config.max_signaling_retries, + self.peer + ); + } + + self.reset_for_retry(); + ConnectionHandlerEvent::NotifyBehaviour(ToBehaviourEvent::SignalingRetry) + } else { + tracing::error!("WebRTC signaling failed {:?}", error); + + if let HandlerType::Signaling { + signaling_status, .. + } = &mut self.handler_type + { + *signaling_status = SignalingStatus::Fail; + } + + ConnectionHandlerEvent::NotifyBehaviour(ToBehaviourEvent::WebRTCConnectionFailure( + error, + )) + } + } + + /// Reset handler state for another retry attempt. + fn reset_for_retry(&mut self) { + if let HandlerType::Signaling { + signaling_status, + retry_count, + role, + signaling_result_receiver, + .. + } = &mut self.handler_type + { + *signaling_status = SignalingStatus::WaitingForRetry; + *retry_count += 1; + *role = None; + *signaling_result_receiver = None; + } + } +} + +impl ConnectionHandler for SignalingHandler { + type FromBehaviour = FromBehaviourEvent; + type ToBehaviour = ToBehaviourEvent; + type InboundProtocol = ReadyUpgrade; + type OutboundProtocol = ReadyUpgrade; + type InboundOpenInfo = (); + type OutboundOpenInfo = (); + + fn listen_protocol( + &self, + ) -> libp2p_swarm::SubstreamProtocol { + SubstreamProtocol::new(ReadyUpgrade::new(SIGNALING_STREAM_PROTOCOL), ()) + } + + fn poll( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll< + libp2p_swarm::ConnectionHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::ToBehaviour, + >, + > { + match &mut self.handler_type { + HandlerType::Noop | HandlerType::EstablishedWebRTC => Poll::Pending, + HandlerType::Signaling { + signaling_status, + signaling_result_receiver, + .. + } => { + if *signaling_status == SignalingStatus::Complete { + return Poll::Pending; + } + + if let Some(mut receiver) = signaling_result_receiver.take() { + match receiver.poll_unpin(cx) { + Poll::Ready(Ok(Ok(connection))) => { + return Poll::Ready(self.handle_signaling_success(connection)); + } + Poll::Ready(Ok(Err(err))) => { + return Poll::Ready(self.handle_signaling_failure(err)); + } + Poll::Ready(Err(_)) => { + tracing::error!("Signaling result channel dropped"); + let error = crate::Error::Connection( + "Signaling channel dropped unexpectedly".to_string(), + ); + return Poll::Ready(self.handle_signaling_failure(error)); + } + Poll::Pending => { + *signaling_result_receiver = Some(receiver); + } + } + } + + // If the signaling status is AwaitingInitiation request to open a new substream + // on the relay connection and start negotiation, i.e. signaling. This will not work + // if the relay connection is closed immediately after the WebRTC connection is + // established. + // + // In order to ensure signaling can happen the relay server will need to stay alive + // until signaling is complete. + if *signaling_status == SignalingStatus::AwaitingInitiation { + *signaling_status = SignalingStatus::Negotiating; + return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new( + ReadyUpgrade::new(SIGNALING_STREAM_PROTOCOL), + (), + ), + }); + } + + Poll::Pending + } + } + } + + fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { + let should_be_initiator = self.should_be_initiator(&self.peer); + if let ( + HandlerType::Signaling { + signaling_status, .. + }, + FromBehaviourEvent::InitiateSignaling, + ) = (&mut self.handler_type, event) + { + if *signaling_status == SignalingStatus::Idle + || *signaling_status == SignalingStatus::WaitingForRetry + { + if should_be_initiator { + *signaling_status = SignalingStatus::AwaitingInitiation; + tracing::trace!("Signaling status updated to `AwaitingInitiation`"); + } else { + *signaling_status = SignalingStatus::Negotiating; + tracing::trace!("Signaling status updated to `Negotiating`"); + } + } + } + } + + #[instrument(skip(self))] + fn on_connection_event( + &mut self, + event: libp2p_swarm::handler::ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match &mut self.handler_type { + HandlerType::Noop | HandlerType::EstablishedWebRTC => return, + HandlerType::Signaling { + role, + signaling_status, + signaling_config, + signaling_result_receiver, + .. + } => match event { + libp2p_swarm::handler::ConnectionEvent::FullyNegotiatedInbound( + fully_negotiated_inbound, + ) => { + if role.is_some() { + return; + } + + *role = Some(SignalingRole::Responder); + *signaling_status = SignalingStatus::Negotiating; + + let substream = fully_negotiated_inbound.protocol; + let signaling_protocol = ProtocolHandler::new(signaling_config.clone()); + + let (tx, rx) = futures::channel::oneshot::channel(); + *signaling_result_receiver = Some(rx); + + wasm_bindgen_futures::spawn_local(async move { + let signaling_result = signaling_protocol + .signaling_as_responder(SignalingStream::new(substream)) + .await; + + let _ = tx.send(signaling_result); + }); + } + libp2p_swarm::handler::ConnectionEvent::FullyNegotiatedOutbound( + fully_negotiated_outbound, + ) => { + if role.is_some() { + return; + } + + *role = Some(SignalingRole::Initiator); + let substream = fully_negotiated_outbound.protocol; + let signaling_protocol = ProtocolHandler::new(signaling_config.clone()); + + let (tx, rx) = futures::channel::oneshot::channel(); + *signaling_result_receiver = Some(rx); + + wasm_bindgen_futures::spawn_local(async move { + let signaling_result = signaling_protocol + .signaling_as_initiator(SignalingStream::new(substream)) + .await; + + let _ = tx.send(signaling_result); + }); + } + libp2p_swarm::handler::ConnectionEvent::DialUpgradeError(error) => { + if *role == Some(SignalingRole::Initiator) + || *signaling_status == SignalingStatus::Negotiating + { + tracing::error!( + "Outbound signaling upgrade failed with peer {}: {:?}", + self.peer, + error + ); + *signaling_status = SignalingStatus::Fail; + } + } + libp2p_swarm::handler::ConnectionEvent::ListenUpgradeError(error) => { + if *role == Some(SignalingRole::Responder) + || *signaling_status == SignalingStatus::Negotiating + { + tracing::error!( + "Inbound signaling upgrade failed with peer {}: {:?}", + self.peer, + error + ); + *signaling_status = SignalingStatus::Fail; + } + } + _ => {} + }, + } + } +} diff --git a/transports/webrtc-websys/src/browser/mod.rs b/transports/webrtc-websys/src/browser/mod.rs new file mode 100644 index 00000000000..b86e1505fbe --- /dev/null +++ b/transports/webrtc-websys/src/browser/mod.rs @@ -0,0 +1,13 @@ +pub(crate) mod behaviour; +pub(crate) mod handler; +pub(crate) mod protocol; +pub(crate) mod stream; +pub(crate) mod transport; + +pub use behaviour::{Behaviour, SignalingConfig, SignalingEvent}; +pub use protocol::{ + ProtocolHandler, Signaling, SignalingProtocolUpgrade, SIGNALING_PROTOCOL_ID, + SIGNALING_STREAM_PROTOCOL, +}; +pub use stream::SignalingStream; +pub use transport::{Config, Transport}; diff --git a/transports/webrtc-websys/src/browser/protocol/mod.rs b/transports/webrtc-websys/src/browser/protocol/mod.rs new file mode 100644 index 00000000000..44e72ded6f6 --- /dev/null +++ b/transports/webrtc-websys/src/browser/protocol/mod.rs @@ -0,0 +1,10 @@ +pub(crate) mod proto; +pub(crate) mod signaling; +pub(crate) mod stream_protocol; +pub(crate) mod upgrade; + +pub use signaling::{ProtocolHandler, Signaling}; +pub use stream_protocol::{SIGNALING_PROTOCOL_ID, SIGNALING_STREAM_PROTOCOL}; +pub use upgrade::SignalingProtocolUpgrade; + +pub(crate) use self::signaling::ConnectionCallbacks; diff --git a/transports/webrtc-websys/src/browser/protocol/proto/message.proto b/transports/webrtc-websys/src/browser/protocol/proto/message.proto new file mode 100644 index 00000000000..db987a7b365 --- /dev/null +++ b/transports/webrtc-websys/src/browser/protocol/proto/message.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; +package signaling; + +message SignalingMessage { + enum Type { + SDP_OFFER = 0; + SDP_ANSWER = 1; + ICE_CANDIDATE = 2; + } + + Type type = 1; + string data = 2; +} \ No newline at end of file diff --git a/transports/webrtc-websys/src/browser/protocol/proto/mod.rs b/transports/webrtc-websys/src/browser/protocol/proto/mod.rs new file mode 100644 index 00000000000..316ba39f72b --- /dev/null +++ b/transports/webrtc-websys/src/browser/protocol/proto/mod.rs @@ -0,0 +1 @@ +pub(crate) mod signaling; diff --git a/transports/webrtc-websys/src/browser/protocol/proto/signaling.rs b/transports/webrtc-websys/src/browser/protocol/proto/signaling.rs new file mode 100644 index 00000000000..b77115f4414 --- /dev/null +++ b/transports/webrtc-websys/src/browser/protocol/proto/signaling.rs @@ -0,0 +1,90 @@ +// Automatically generated rust module for 'message.proto' file + +#![allow(non_snake_case)] +#![allow(non_upper_case_globals)] +#![allow(non_camel_case_types)] +#![allow(unused_imports)] +#![allow(unknown_lints)] +#![allow(clippy::all)] +#![cfg_attr(rustfmt, rustfmt_skip)] + + +use quick_protobuf::{MessageInfo, MessageRead, MessageWrite, BytesReader, Writer, WriterBackend, Result}; +use quick_protobuf::sizeofs::*; +use super::*; + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct SignalingMessage { + pub type_pb: signaling::mod_SignalingMessage::Type, + pub data: String, +} + +impl<'a> MessageRead<'a> for SignalingMessage { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(8) => msg.type_pb = r.read_enum(bytes)?, + Ok(18) => msg.data = r.read_string(bytes)?.to_owned(), + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl MessageWrite for SignalingMessage { + fn get_size(&self) -> usize { + 0 + + if self.type_pb == signaling::mod_SignalingMessage::Type::SDP_OFFER { 0 } else { 1 + sizeof_varint(*(&self.type_pb) as u64) } + + if self.data == String::default() { 0 } else { 1 + sizeof_len((&self.data).len()) } + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + if self.type_pb != signaling::mod_SignalingMessage::Type::SDP_OFFER { w.write_with_tag(8, |w| w.write_enum(*&self.type_pb as i32))?; } + if self.data != String::default() { w.write_with_tag(18, |w| w.write_string(&**&self.data))?; } + Ok(()) + } +} + +pub(crate) mod mod_SignalingMessage { + + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum Type { + SDP_OFFER = 0, + SDP_ANSWER = 1, + ICE_CANDIDATE = 2, +} + +impl Default for Type { + fn default() -> Self { + Type::SDP_OFFER + } +} + +impl From for Type { + fn from(i: i32) -> Self { + match i { + 0 => Type::SDP_OFFER, + 1 => Type::SDP_ANSWER, + 2 => Type::ICE_CANDIDATE, + _ => Self::default(), + } + } +} + +impl<'a> From<&'a str> for Type { + fn from(s: &'a str) -> Self { + match s { + "SDP_OFFER" => Type::SDP_OFFER, + "SDP_ANSWER" => Type::SDP_ANSWER, + "ICE_CANDIDATE" => Type::ICE_CANDIDATE, + _ => Self::default(), + } + } +} + +} diff --git a/transports/webrtc-websys/src/browser/protocol/signaling.rs b/transports/webrtc-websys/src/browser/protocol/signaling.rs new file mode 100644 index 00000000000..63eec021f38 --- /dev/null +++ b/transports/webrtc-websys/src/browser/protocol/signaling.rs @@ -0,0 +1,695 @@ +use std::{cell::RefCell, rc::Rc, sync::Arc, time::Duration}; + +use async_trait::async_trait; +use futures::{lock::Mutex, AsyncRead, AsyncWrite}; +use tracing::{info, instrument}; +use wasm_bindgen::{prelude::Closure, JsCast, JsValue}; +use wasm_bindgen_futures::JsFuture; +use web_sys::{ + RtcIceCandidate, RtcIceCandidateInit, RtcIceConnectionState, RtcIceGatheringState, + RtcPeerConnectionIceEvent, RtcPeerConnectionState, RtcSdpType, RtcSessionDescriptionInit, + RtcSignalingState, +}; + +use crate::{ + browser::{ + protocol::proto::signaling::{mod_SignalingMessage::Type, SignalingMessage}, + SignalingConfig, SignalingStream, + }, + connection::RtcPeerConnection, + error::Error, + Connection, +}; + +// Implementation of WebRTC signaling protocol. This implementation follows +// the specification here: https://github.com/libp2p/specs/blob/master/webrtc/webrtc.md. + +/// Connection states for ICE connection, ICE gathering, signaling +/// and the peer connection. +#[derive(Clone, Debug)] +struct ConnectionState { + pub(crate) ice_connection: RtcIceConnectionState, + pub(crate) ice_gathering: RtcIceGatheringState, + pub(crate) signaling: RtcSignalingState, + pub(crate) peer_connection: RtcPeerConnectionState, +} + +/// Callbacks for ICE connection, ICE gathering, peer connection signaling +/// and ice candidate retrieval. +#[derive(Debug)] +pub(crate) struct ConnectionCallbacks { + _ice_connection_callback: Closure, + _ice_gathering_callback: Closure, + _peer_connection_callback: Closure, + _signaling_callback: Closure, +} + +/// Collected ICE candidates during the gathering phase. +#[derive(Debug, Clone)] +struct IceCandidateCollection { + candidates: Vec, + gathering_complete: bool, +} + +impl IceCandidateCollection { + fn new() -> Self { + Self { + candidates: Vec::new(), + gathering_complete: false, + } + } + + /// Adds an ice candidate to the candidates collection. + fn add_candidate(&mut self, candidate: Option) { + match candidate { + Some(candidate) => { + tracing::trace!("Collected ICE candidate: {}", candidate.candidate()); + self.candidates.push(candidate); + } + None => { + tracing::info!("ICE gathering complete"); + self.gathering_complete = true; + } + } + } + + /// Removes and returns candidates from the candidates vector. + fn drain_candidates(&mut self) -> Vec { + self.candidates.drain(..).collect() + } +} + +/// Helper function to safely convert ice connection state +fn safe_ice_connection_state_from_js(js_val: JsValue) -> RtcIceConnectionState { + if let Some(state_str) = js_val.as_string() { + match state_str.as_str() { + "new" => RtcIceConnectionState::New, + "checking" => RtcIceConnectionState::Checking, + "connected" => RtcIceConnectionState::Connected, + "completed" => RtcIceConnectionState::Completed, + "failed" => RtcIceConnectionState::Failed, + "disconnected" => RtcIceConnectionState::Disconnected, + "closed" => RtcIceConnectionState::Closed, + _ => { + tracing::warn!( + "Unknown ICE connection state: '{}', defaulting to New", + state_str + ); + RtcIceConnectionState::New + } + } + } else { + tracing::warn!("ICE connection state is not a string: {:?}", js_val); + RtcIceConnectionState::New + } +} + +/// Helper function to safely convert ice gathering state +fn safe_ice_gathering_state_from_js(js_val: JsValue) -> RtcIceGatheringState { + if let Some(state_str) = js_val.as_string() { + match state_str.as_str() { + "new" => RtcIceGatheringState::New, + "gathering" => RtcIceGatheringState::Gathering, + "complete" => RtcIceGatheringState::Complete, + _ => { + tracing::warn!( + "Unknown ICE gathering state: '{}', defaulting to New", + state_str + ); + RtcIceGatheringState::New + } + } + } else { + tracing::warn!("ICE gathering state is not a string: {:?}", js_val); + RtcIceGatheringState::New + } +} + +/// Helper function to safely convert peer connection state +fn safe_peer_connection_state_from_js(js_val: JsValue) -> RtcPeerConnectionState { + if let Some(state_str) = js_val.as_string() { + match state_str.as_str() { + "new" => RtcPeerConnectionState::New, + "connecting" => RtcPeerConnectionState::Connecting, + "connected" => RtcPeerConnectionState::Connected, + "disconnected" => RtcPeerConnectionState::Disconnected, + "failed" => RtcPeerConnectionState::Failed, + "closed" => RtcPeerConnectionState::Closed, + _ => { + tracing::warn!( + "Unknown peer connection state: '{}', defaulting to New", + state_str + ); + RtcPeerConnectionState::New + } + } + } else { + tracing::warn!("Peer connection state is not a string: {:?}", js_val); + RtcPeerConnectionState::New + } +} + +/// Helper function to safely convert signaling state +fn safe_signaling_state_from_js(js_val: JsValue) -> RtcSignalingState { + if let Some(state_str) = js_val.as_string() { + match state_str.as_str() { + "stable" => RtcSignalingState::Stable, + "have-local-offer" => RtcSignalingState::HaveLocalOffer, + "have-remote-offer" => RtcSignalingState::HaveRemoteOffer, + "have-local-pranswer" => RtcSignalingState::HaveLocalPranswer, + "have-remote-pranswer" => RtcSignalingState::HaveRemotePranswer, + "closed" => RtcSignalingState::Closed, + _ => { + tracing::warn!( + "Unknown signaling state: '{}', defaulting to Closed", + state_str + ); + RtcSignalingState::Closed + } + } + } else { + tracing::warn!("Signaling state is not a string: {:?}", js_val); + RtcSignalingState::Closed + } +} + +#[async_trait(?Send)] +pub trait Signaling { + /// Performs WebRTC signaling as the initiator. + async fn signaling_as_initiator( + &self, + stream: SignalingStream, + ) -> Result; + + /// Performs WebRTC signaling as the responder. + async fn signaling_as_responder( + &self, + stream: SignalingStream, + ) -> Result; +} + +#[derive(Debug)] +pub struct ProtocolHandler { + states: send_wrapper::SendWrapper>>, + config: SignalingConfig, +} + +impl ProtocolHandler { + pub fn new(config: SignalingConfig) -> Self { + Self { + states: send_wrapper::SendWrapper::new(Rc::new(RefCell::new(ConnectionState { + ice_connection: RtcIceConnectionState::New, + ice_gathering: RtcIceGatheringState::New, + signaling: RtcSignalingState::Closed, + peer_connection: RtcPeerConnectionState::Closed, + }))), + config, + } + } +} + +impl ProtocolHandler { + /// Sets up the peer connection state callbacks including ICE connection, ICE gathering, + /// peer connection and signaling. + fn setup_peer_connection_state_callbacks( + &self, + connection: &web_sys::RtcPeerConnection, + ) -> ConnectionCallbacks { + tracing::trace!("Setting up peer connection state callbacks"); + + // Setup callbacks for state management + // ICE connection state callback + let states = self.states.clone(); + let ice_connection_callback = Closure::wrap(Box::new(move |event: web_sys::Event| { + if let Some(target) = event.target() { + if let Some(pc) = target.dyn_ref::() { + let state_js = pc.ice_connection_state(); + let state = safe_ice_connection_state_from_js(state_js.into()); + + tracing::debug!("ICE connection state changed to: {:?}", state); + states.borrow_mut().ice_connection = state; + } + } + }) as Box); + + connection + .set_oniceconnectionstatechange(Some(ice_connection_callback.as_ref().unchecked_ref())); + + // ICE gathering state callback + let states = self.states.clone(); + let ice_gathering_callback = Closure::wrap(Box::new(move |event: web_sys::Event| { + if let Some(target) = event.target() { + if let Some(pc) = target.dyn_ref::() { + let state_js = pc.ice_gathering_state(); + let state = safe_ice_gathering_state_from_js(state_js.into()); + + tracing::debug!("ICE gathering state changed to: {:?}", state); + states.borrow_mut().ice_gathering = state; + } + } + }) as Box); + + connection + .set_onicegatheringstatechange(Some(ice_gathering_callback.as_ref().unchecked_ref())); + + // Peer connection state callback + let states = self.states.clone(); + let peer_connection_callback = Closure::wrap(Box::new(move |event: web_sys::Event| { + if let Some(target) = event.target() { + if let Some(pc) = target.dyn_ref::() { + let state_js = pc.connection_state(); + let state = safe_peer_connection_state_from_js(state_js.into()); + + tracing::debug!("Peer connection state changed to: {:?}", state); + states.borrow_mut().peer_connection = state; + } + } + }) as Box); + + connection + .set_onconnectionstatechange(Some(peer_connection_callback.as_ref().unchecked_ref())); + + // Signaling state callback + let states = self.states.clone(); + let signaling_callback = Closure::wrap(Box::new(move |event: web_sys::Event| { + if let Some(target) = event.target() { + if let Some(pc) = target.dyn_ref::() { + let state_js = pc.signaling_state(); + let state = safe_signaling_state_from_js(state_js.into()); + + tracing::debug!("Signaling state changed to: {:?}", state); + states.borrow_mut().signaling = state; + } + } + }) as Box); + + connection.set_onsignalingstatechange(Some(signaling_callback.as_ref().unchecked_ref())); + + // Create the callbacks struct to keep closures alive. Callbacks will be stored in the + // Connection to be dropped when the Connection drops. + ConnectionCallbacks { + _ice_connection_callback: ice_connection_callback, + _ice_gathering_callback: ice_gathering_callback, + _peer_connection_callback: peer_connection_callback, + _signaling_callback: signaling_callback, + } + } + + /// Sets up ICE candidate collection for finite signaling + fn setup_ice_candidate_collection( + &self, + connection: &web_sys::RtcPeerConnection, + ) -> ( + Rc>, + Closure, + ) { + let ice_candidates = Rc::new(RefCell::new(IceCandidateCollection::new())); + + let ice_candidate_callback = { + let ice_candidates = ice_candidates.clone(); + Closure::wrap(Box::new(move |event: RtcPeerConnectionIceEvent| { + ice_candidates.borrow_mut().add_candidate(event.candidate()); + }) as Box) + }; + + connection.set_onicecandidate(Some(ice_candidate_callback.as_ref().unchecked_ref())); + + (ice_candidates, ice_candidate_callback) + } + + /// Waits for ICE gathering to complete + async fn wait_for_ice_gathering_complete(&self) -> Result<(), Error> { + let mut attempts = 0; + tracing::trace!("Waiting for ice gathering to complete"); + + loop { + let current_state = self.states.borrow().ice_gathering; + + match current_state { + RtcIceGatheringState::Complete => { + tracing::info!("ICE gathering completed"); + break; + } + RtcIceGatheringState::New | RtcIceGatheringState::Gathering => { + attempts += 1; + if attempts >= self.config.max_ice_gathering_attempts { + tracing::warn!("ICE gathering timeout, proceeding anyway"); + break; + } + gloo_timers::future::sleep(Duration::from_millis(100)).await; + } + _ => { + tracing::warn!("Unexpected ICE gathering state: {:?}", current_state); + break; + } + } + } + + Ok(()) + } + + /// Waits for the RtcPeerConnectionState to change to a `Connected` state for a + /// configurable max number of attempts. + async fn wait_for_established_conn(&self) -> Result<(), Error> { + let mut attempts = 0; + info!("Waiting for connection to establish"); + loop { + let current_states = self.states.borrow().clone(); + + tracing::debug!( + "Connection status check #{}: ICE={:?}, Peer={:?}, Signaling={:?}, Gathering={:?}", + attempts + 1, + current_states.ice_connection, + current_states.peer_connection, + current_states.signaling, + current_states.ice_gathering + ); + + match current_states.peer_connection { + RtcPeerConnectionState::Connected => { + tracing::info!("Peer connection state transitioned to connected"); + break; + } + RtcPeerConnectionState::Failed => { + tracing::error!("Peer connection state transitioned to failed"); + return Err(Error::Connection("Peer connection failed".to_string())); + } + _ => { + attempts += 1; + if attempts >= self.config.max_connection_establishment_checks { + tracing::error!( + "Final states: ICE={:?}, Peer={:?}, Signaling={:?}, Gathering={:?}", + current_states.ice_connection, + current_states.peer_connection, + current_states.signaling, + current_states.ice_gathering + ); + return Err(Error::Connection("Connection timeout".to_string())); + } + + gloo_timers::future::sleep( + self.config.connection_establishment_delay_in_millis, + ) + .await; + } + } + } + + let current_states = self.states.borrow().clone(); + if current_states.peer_connection != web_sys::RtcPeerConnectionState::Connected { + tracing::error!("Rtc peer connection failed. Connection not properly established."); + return Err(Error::Connection( + "Connection not properly established".to_string(), + )); + } + + Ok(()) + } + + /// Parse ICE candidate from JSON message + fn parse_ice_candidate(message: &SignalingMessage) -> Option { + if let Ok(candidate_json) = serde_json::from_str::(&message.data) { + if let Some(candidate_str) = candidate_json.get("candidate").and_then(|v| v.as_str()) { + let candidate_init = RtcIceCandidateInit::new(candidate_str); + + if let Some(sdp_mid) = candidate_json.get("sdpMid").and_then(|v| v.as_str()) { + candidate_init.set_sdp_mid(Some(sdp_mid)); + } + + if let Some(sdp_m_line_index) = + candidate_json.get("sdpMLineIndex").and_then(|v| v.as_u64()) + { + candidate_init.set_sdp_m_line_index(Some(sdp_m_line_index as u16)); + } + + return Some(candidate_init); + } + } + + None + } + + /// Sends collected ICE candidates through the signaling stream + async fn send_ice_candidates( + stream: &Arc>>, + candidates: Vec, + ) -> Result<(), Error> { + tracing::info!("Sending {} ICE candidates", candidates.len()); + + for candidate in candidates { + let candidate_json = candidate.to_json(); + let candidate_as_str = js_sys::JSON::stringify(&candidate_json) + .unwrap() + .as_string() + .unwrap_or_default(); + + let message = SignalingMessage { + type_pb: Type::ICE_CANDIDATE, + data: candidate_as_str, + }; + + stream + .lock() + .await + .write(message) + .await + .map_err(|_| Error::Js("Failed to send ICE candidate".to_string()))?; + } + + // Send end-of-candidates marker as an empty JSON object + let end_message = SignalingMessage { + type_pb: Type::ICE_CANDIDATE, + data: "{}".to_string(), + }; + + stream + .lock() + .await + .write(end_message) + .await + .map_err(|_| Error::Js("Failed to send end-of-candidates marker".to_string()))?; + + tracing::info!("ICE candidate transmission complete"); + Ok(()) + } + + /// Receives ICE candidates from the signaling stream until end marker + async fn receive_ice_candidates( + stream: &Arc>>, + connection: &web_sys::RtcPeerConnection, + ) -> Result<(), Error> { + tracing::info!("Receiving ICE candidates from remote peer"); + + loop { + let message = stream + .lock() + .await + .read() + .await + .map_err(|_| Error::Js("Failed to read ICE candidate".to_string()))?; + + if message.type_pb != Type::ICE_CANDIDATE { + return Err(Error::Connection( + "Expected ICE candidate message".to_string(), + )); + } + + // Check for end-of-candidates marker + if message.data == "{}" { + tracing::info!("Received end-of-candidates marker"); + break; + } + + tracing::trace!("Received remote ICE candidate: {}", message.data); + + if let Some(candidate_init) = Self::parse_ice_candidate(&message) { + if let Err(e) = JsFuture::from( + connection + .add_ice_candidate_with_opt_rtc_ice_candidate_init(Some(&candidate_init)), + ) + .await + { + tracing::error!("Failed to add remote ICE candidate: {:?}", e); + } + } else { + tracing::warn!("Failed to parse ICE candidate: {}", message.data); + } + } + + tracing::info!("ICE candidate reception complete"); + Ok(()) + } +} + +#[async_trait(?Send)] +impl Signaling for ProtocolHandler { + #[instrument(skip(stream), fields(initiator = false))] + async fn signaling_as_responder( + &self, + stream: SignalingStream, + ) -> Result { + tracing::info!("Starting WebRTC signaling as responder"); + + let rtc_conn = + RtcPeerConnection::new("sha-256".to_string(), self.config.stun_servers.clone()).await?; + let connection = rtc_conn.inner(); + + let pb_stream = Arc::new(Mutex::new(stream)); + let callbacks = self.setup_peer_connection_state_callbacks(connection); + let (ice_candidates, ice_callback) = self.setup_ice_candidate_collection(connection); + + // Read SDP offer + let offer_message = pb_stream.lock().await.read().await.map_err(|_| { + Error::Js("Failure to read SDP offer from signaling stream".to_string()) + })?; + + if offer_message.type_pb != Type::SDP_OFFER { + return Err(Error::Connection("Expected SDP offer".to_string())); + } + + // Set remote description with remote offer + let offer_init = RtcSessionDescriptionInit::new(RtcSdpType::Offer); + offer_init.set_sdp(&offer_message.data); + + JsFuture::from(connection.set_remote_description(&offer_init)) + .await + .map_err(|_| Error::Js("Could not set remote description".to_string()))?; + + // Create answer and set local description + let answer = JsFuture::from(connection.create_answer()).await?; + let answer_sdp = js_sys::Reflect::get(&answer, &JsValue::from_str("sdp"))? + .as_string() + .ok_or_else(|| Error::Js("Could not extract SDP from answer".to_string()))?; + + let answer_init = RtcSessionDescriptionInit::new(RtcSdpType::Answer); + answer_init.set_sdp(&answer_sdp); + + JsFuture::from(connection.set_local_description(&answer_init)) + .await + .map_err(|_| Error::Js("Could not set local description".to_string()))?; + + // Send SDP answer + let answer_message = SignalingMessage { + type_pb: Type::SDP_ANSWER, + data: answer_sdp, + }; + + pb_stream + .lock() + .await + .write(answer_message) + .await + .map_err(|_| { + Error::Js("Failure to write SDP answer to signaling stream".to_string()) + })?; + + self.wait_for_ice_gathering_complete().await?; + + let candidates = ice_candidates.borrow_mut().drain_candidates(); + Self::send_ice_candidates(&pb_stream, candidates).await?; + Self::receive_ice_candidates(&pb_stream, connection).await?; + + self.wait_for_established_conn().await?; + + tracing::info!( + "WebRTC connection established - Current state: {:?}", + connection.connection_state() + ); + + // Clean up callbacks and close signaling stream. ice_callbacks is only used during the + // signaling process so its not saved in `ConnectionCallbacks`, but dropped immediately + drop(ice_callback); + drop(pb_stream); + + tracing::info!("Completed signaling."); + Ok(Connection::new(rtc_conn, Some(callbacks))) + } + + #[instrument(skip(stream), fields(initiator = true))] + async fn signaling_as_initiator( + &self, + stream: SignalingStream, + ) -> Result { + tracing::info!("Starting WebRTC signaling as initiator"); + + let rtc_conn = + RtcPeerConnection::new("sha-256".to_string(), self.config.stun_servers.clone()).await?; + let connection = rtc_conn.inner(); + + let pb_stream = Arc::new(Mutex::new(stream)); + let callbacks = self.setup_peer_connection_state_callbacks(connection); + let (ice_candidates, _ice_callback) = self.setup_ice_candidate_collection(connection); + + // Create a data channel to ensure ICE information is shared in the SDP + tracing::trace!("Creating init data channel"); + let data_channel = connection.create_data_channel("init"); + + // Create offer + let offer = JsFuture::from(connection.create_offer()).await?; + let offer_sdp = js_sys::Reflect::get(&offer, &JsValue::from_str("sdp"))? + .as_string() + .ok_or_else(|| Error::Js("Could not extract SDP from offer".to_string()))?; + + let offer_init = RtcSessionDescriptionInit::new(RtcSdpType::Offer); + offer_init.set_sdp(&offer_sdp); + + JsFuture::from(connection.set_local_description(&offer_init)) + .await + .map_err(|_| Error::Js("Could not set local description".to_string()))?; + + // Write SDP offer to the signaling stream + let message = SignalingMessage { + type_pb: Type::SDP_OFFER, + data: offer_sdp, + }; + + pb_stream + .lock() + .await + .write(message) + .await + .map_err(|_| Error::Js("Failure to write SDP offer to signaling stream".to_string()))?; + + // Read SDP answer + let answer_message = pb_stream.lock().await.read().await.map_err(|_| { + Error::Js("Failure to read SDP answer from signaling stream".to_string()) + })?; + + if answer_message.type_pb != Type::SDP_ANSWER { + return Err(Error::Connection("Expected SDP answer".to_string())); + } + + let answer_init = RtcSessionDescriptionInit::new(RtcSdpType::Answer); + answer_init.set_sdp(&answer_message.data); + + // Set answer as remote description + JsFuture::from(connection.set_remote_description(&answer_init)) + .await + .map_err(|_| Error::Js("Could not set remote description".to_string()))?; + + self.wait_for_ice_gathering_complete().await?; + + let candidates = ice_candidates.borrow_mut().drain_candidates(); + Self::send_ice_candidates(&pb_stream, candidates).await?; + Self::receive_ice_candidates(&pb_stream, connection).await?; + + self.wait_for_established_conn().await?; + + tracing::info!( + "WebRTC connection established - Current state: {:?}", + connection.connection_state() + ); + + // Clean up callbacks, close data channel, and close signaling stream + + // Clean up callbacks, close data channel and close signaling stream. ice_callbacks is only + // used during the signaling process so its not saved in `ConnectionCallbacks`, but + // dropped immediately + data_channel.close(); + drop(_ice_callback); + drop(pb_stream); + + tracing::info!("Signaling complete."); + Ok(Connection::new(rtc_conn, Some(callbacks))) + } +} diff --git a/transports/webrtc-websys/src/browser/protocol/stream_protocol.rs b/transports/webrtc-websys/src/browser/protocol/stream_protocol.rs new file mode 100644 index 00000000000..e276502dd8b --- /dev/null +++ b/transports/webrtc-websys/src/browser/protocol/stream_protocol.rs @@ -0,0 +1,4 @@ +use libp2p_swarm::StreamProtocol; + +pub const SIGNALING_PROTOCOL_ID: &str = "/webrtc-signaling/0.0.1"; +pub const SIGNALING_STREAM_PROTOCOL: StreamProtocol = StreamProtocol::new(SIGNALING_PROTOCOL_ID); diff --git a/transports/webrtc-websys/src/browser/protocol/upgrade.rs b/transports/webrtc-websys/src/browser/protocol/upgrade.rs new file mode 100644 index 00000000000..e6f6d4d3485 --- /dev/null +++ b/transports/webrtc-websys/src/browser/protocol/upgrade.rs @@ -0,0 +1,44 @@ +use std::{future::Future, pin::Pin}; + +use futures::{AsyncRead, AsyncWrite}; +use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_swarm::StreamProtocol; + +use crate::browser::{stream::SignalingStream, SIGNALING_STREAM_PROTOCOL}; + +pub struct SignalingProtocolUpgrade; + +impl UpgradeInfo for SignalingProtocolUpgrade { + type Info = StreamProtocol; + type InfoIter = std::iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + std::iter::once(SIGNALING_STREAM_PROTOCOL) + } +} + +impl InboundUpgrade for SignalingProtocolUpgrade +where + T: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + type Output = SignalingStream; + type Error = std::io::Error; + type Future = Pin> + Send>>; + + fn upgrade_inbound(self, socket: T, _info: Self::Info) -> Self::Future { + Box::pin(async move { Ok(SignalingStream::new(socket)) }) + } +} + +impl OutboundUpgrade for SignalingProtocolUpgrade +where + T: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + type Output = SignalingStream; + type Error = std::io::Error; + type Future = Pin> + Send>>; + + fn upgrade_outbound(self, socket: T, _info: Self::Info) -> Self::Future { + Box::pin(async move { Ok(SignalingStream::new(socket)) }) + } +} diff --git a/transports/webrtc-websys/src/browser/stream.rs b/transports/webrtc-websys/src/browser/stream.rs new file mode 100644 index 00000000000..154ab7bed73 --- /dev/null +++ b/transports/webrtc-websys/src/browser/stream.rs @@ -0,0 +1,45 @@ +use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use quick_protobuf::{BytesReader, Error, MessageRead, MessageWrite, Writer}; + +use crate::browser::protocol::proto::signaling::SignalingMessage; + +/// A wrapper over an async stream for reading and writing a SignalingMessage. +pub struct SignalingStream { + inner: T, +} + +impl SignalingStream +where + T: AsyncRead + AsyncWrite + Unpin, +{ + pub fn new(inner: T) -> Self { + Self { inner } + } + + /// Encodes and writes a signaling message to the stream. + pub async fn write(&mut self, message: SignalingMessage) -> Result<(), Error> { + let mut buf = Vec::new(); + let mut writer = Writer::new(&mut buf); + message.write_message(&mut writer)?; + let len = buf.len() as u32; + + self.inner.write_all(&len.to_be_bytes()).await?; + self.inner.write_all(&buf).await?; + self.inner.flush().await?; + Ok(()) + } + + /// Reads and decodes a signaling message from the stream. + pub async fn read(&mut self) -> Result { + let mut len_buf = [0u8; 4]; + self.inner.read_exact(&mut len_buf).await?; + let len = u32::from_be_bytes(len_buf) as usize; + + let mut buf = vec![0u8; len]; + self.inner.read_exact(&mut buf).await?; + + let mut reader = BytesReader::from_bytes(&buf); + let message = SignalingMessage::from_reader(&mut reader, &buf)?; + Ok(message) + } +} diff --git a/transports/webrtc-websys/src/browser/transport.rs b/transports/webrtc-websys/src/browser/transport.rs new file mode 100644 index 00000000000..1e18add0e72 --- /dev/null +++ b/transports/webrtc-websys/src/browser/transport.rs @@ -0,0 +1,192 @@ +use std::{ + collections::{HashMap, VecDeque}, + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use futures::{channel::oneshot, lock::Mutex, task::AtomicWaker}; +use libp2p_core::{ + multiaddr::{Multiaddr, Protocol}, + muxing::StreamMuxerBox, + transport::{Boxed, DialOpts, ListenerId, Transport as _, TransportError, TransportEvent}, +}; +use libp2p_identity::{Keypair, PeerId}; + +use crate::{ + browser::{behaviour::Behaviour, SignalingConfig}, + Connection, Error, +}; + +/// Config for the [`Transport`]. +#[derive(Debug, Clone)] +pub struct Config { + pub keypair: Keypair, +} + +/// Connection request from behavior to transport representing +/// a [`PeerId`] over a [`Connection`]. +pub struct ConnectionRequest { + pub peer_id: PeerId, + pub connection: Connection, +} + +/// A WebRTC [`Transport`](libp2p_core::Transport) that facilitates a `RtcPeerConnection` via +/// web-sys. +pub struct Transport { + #[allow(unused)] + config: Config, + /// Pending connection receivers waiting for WebRTC connections to be established + pending_dials: Arc>>>, + /// Established WebRTC connections ready to be injected into the swarm + established_connections: Arc>>, + /// Listeners for incoming WebRTC connections + listeners: HashMap, + poll_waker: Arc, +} + +impl Transport { + /// Constructs a new [`Transport`] with the given [`Config`] and [`Behaviour`] for Signaling. + pub fn new( + config: Config, + signaling_config: SignalingConfig, + poll_waker: Arc, + ) -> (Self, Behaviour) { + let pending_dials = Arc::new(Mutex::new(HashMap::new())); + let established_connections = Arc::new(Mutex::new(VecDeque::new())); + + let transport = Self { + config: config.clone(), + pending_dials: pending_dials.clone(), + established_connections: established_connections.clone(), + listeners: HashMap::new(), + poll_waker: poll_waker.clone(), + }; + + let behaviour = Behaviour::new( + signaling_config, + pending_dials.clone(), + established_connections.clone(), + poll_waker, + ); + + (transport, behaviour) + } + + /// Wraps `Transport` in and makes it ready to be consumed by SwarmBuilder. + pub fn boxed(self) -> Boxed<(PeerId, StreamMuxerBox)> { + self.map(|(peer_id, muxer), _| (peer_id, StreamMuxerBox::new(muxer))) + .boxed() + } + + /// Checks if a [`Multiaddr`] is a WebRTC address + fn is_webrtc_addr(addr: &Multiaddr) -> bool { + addr.iter().any(|p| matches!(p, Protocol::WebRTC)) + } + + /// Extracts peer ID from multiaddr + fn extract_peer_id(addr: &Multiaddr) -> Option { + addr.iter().find_map(|proto| { + if let Protocol::P2p(peer_id) = proto { + Some(peer_id) + } else { + None + } + }) + } +} + +impl libp2p_core::Transport for Transport { + type Output = (PeerId, crate::Connection); + type Error = Error; + type ListenerUpgrade = Pin> + Send>>; + type Dial = Pin> + Send>>; + + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { + // Check if this is a WebRTC listener address + if Self::is_webrtc_addr(&addr) { + self.listeners.insert(id, addr.clone()); + Ok(()) + } else { + Err(TransportError::MultiaddrNotSupported(addr)) + } + } + + fn remove_listener(&mut self, id: ListenerId) -> bool { + self.listeners.remove(&id).is_some() + } + + fn dial( + &mut self, + addr: Multiaddr, + _opts: DialOpts, + ) -> Result> { + let has_circuit = addr.iter().any(|p| matches!(p, Protocol::P2pCircuit)); + if has_circuit { + return Err(TransportError::MultiaddrNotSupported(addr)); + } + + if !Self::is_webrtc_addr(&addr) { + return Err(TransportError::MultiaddrNotSupported(addr)); + } + + let peer_id = Self::extract_peer_id(&addr) + .ok_or_else(|| TransportError::Other(Error::InvalidMultiaddr(addr.to_string())))?; + + let pending_dials = self.pending_dials.clone(); + + Ok(Box::pin(async move { + let (tx, rx) = oneshot::channel(); + { + let mut dials = pending_dials.lock().await; + dials.insert(peer_id, tx); + } + + let connection = rx.await.map_err(|_| { + Error::Connection("WebRTC connection establishment cancelled".to_string()) + })?; + + Ok((peer_id, connection)) + })) + } + + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.poll_waker.register(cx.waker()); + + let established_connections = self.established_connections.clone(); + + let Some(mut connections) = established_connections.try_lock() else { + cx.waker().wake_by_ref(); + return Poll::Pending; + }; + + if let Some(conn_request) = connections.pop_front() { + if let Some((&listener_id, _)) = self.listeners.iter().next() { + let peer_id = conn_request.peer_id; + + let webrtc_addr = format!("/webrtc/p2p/{}", peer_id) + .parse::() + .expect("valid webrtc address"); + + let upgrade = Box::pin(async move { Ok((peer_id, conn_request.connection)) }); + + return Poll::Ready(TransportEvent::Incoming { + listener_id, + upgrade, + local_addr: webrtc_addr.clone(), + send_back_addr: webrtc_addr, + }); + } + } + + Poll::Pending + } +} diff --git a/transports/webrtc-websys/src/connection.rs b/transports/webrtc-websys/src/connection.rs index 01c1a8b3b60..ac82d36f957 100644 --- a/transports/webrtc-websys/src/connection.rs +++ b/transports/webrtc-websys/src/connection.rs @@ -14,16 +14,17 @@ use wasm_bindgen::prelude::*; use wasm_bindgen_futures::JsFuture; use web_sys::{ RtcConfiguration, RtcDataChannel, RtcDataChannelEvent, RtcDataChannelInit, RtcDataChannelType, - RtcSessionDescriptionInit, + RtcIceServer, RtcSessionDescriptionInit, }; use super::{Error, Stream}; -use crate::stream::DropListener; +use crate::{browser::protocol::ConnectionCallbacks, stream::DropListener}; /// A WebRTC Connection. /// /// All connections need to be [`Send`] which is why some fields are wrapped in [`SendWrapper`]. /// This is safe because WASM is single-threaded. +#[derive(Debug)] pub struct Connection { /// The [RtcPeerConnection] that is used for the WebRTC Connection inner: SendWrapper, @@ -40,11 +41,17 @@ pub struct Connection { no_drop_listeners_waker: Option, _ondatachannel_closure: SendWrapper>, + /// WebRTC event based callbacks which will be dropped once the Connection object has been + /// dropped. + _callbacks: Option>, } impl Connection { /// Create a new inner WebRTC Connection - pub(crate) fn new(peer_connection: RtcPeerConnection) -> Self { + pub(crate) fn new( + peer_connection: RtcPeerConnection, + callbacks: Option, + ) -> Self { // An ondatachannel Future enables us to poll for incoming data channel events in // poll_incoming let (mut tx_ondatachannel, rx_ondatachannel) = mpsc::channel(4); // we may get more than one data channel opened on a single peer connection @@ -74,6 +81,7 @@ impl Connection { no_drop_listeners_waker: None, inbound_data_channels: SendWrapper::new(rx_ondatachannel), _ondatachannel_closure: SendWrapper::new(ondatachannel_closure), + _callbacks: callbacks.map(SendWrapper::new), } } @@ -93,7 +101,6 @@ impl Connection { /// if they are used. fn close_connection(&mut self) { if !self.closed { - tracing::trace!("connection::close_connection"); self.inner.inner.close(); self.closed = true; } @@ -148,8 +155,6 @@ impl StreamMuxer for Connection { mut self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll> { - tracing::trace!("connection::poll_close"); - self.close_connection(); Poll::Ready(Ok(())) } @@ -173,12 +178,16 @@ impl StreamMuxer for Connection { } } +#[derive(Debug)] pub(crate) struct RtcPeerConnection { inner: web_sys::RtcPeerConnection, } impl RtcPeerConnection { - pub(crate) async fn new(algorithm: String) -> Result { + pub(crate) async fn new( + algorithm: String, + stun_servers: Option>, + ) -> Result { let algo: Object = Object::new(); Reflect::set(&algo, &"name".into(), &"ECDSA".into()).unwrap(); Reflect::set(&algo, &"namedCurve".into(), &"P-256".into()).unwrap(); @@ -196,11 +205,28 @@ impl RtcPeerConnection { certificate_arr.push(&certificate); config.set_certificates(&certificate_arr); + // configure rtc configuration with stun servers, if any + if let Some(servers) = stun_servers { + let ice_servers = js_sys::Array::new(); + for server in servers { + let stun_server = RtcIceServer::new(); + stun_server.set_url(&server); + ice_servers.push(&stun_server); + } + + config.set_ice_servers(&ice_servers.into()); + } + let inner = web_sys::RtcPeerConnection::new_with_configuration(&config)?; Ok(Self { inner }) } + /// Returns a reference to the underlying RtcPeerConnection. + pub(crate) fn inner(&self) -> &web_sys::RtcPeerConnection { + &self.inner + } + /// Creates the stream for the initial noise handshake. /// /// The underlying data channel MUST have `negotiated` set to `true` and carry the ID 0. diff --git a/transports/webrtc-websys/src/error.rs b/transports/webrtc-websys/src/error.rs index a2df1a182ea..ccb74bc63c4 100644 --- a/transports/webrtc-websys/src/error.rs +++ b/transports/webrtc-websys/src/error.rs @@ -5,7 +5,7 @@ use wasm_bindgen::{JsCast, JsValue}; #[derive(thiserror::Error, Debug)] pub enum Error { #[error("Invalid multiaddr: {0}")] - InvalidMultiaddr(&'static str), + InvalidMultiaddr(String), #[error("JavaScript error: {0}")] Js(String), diff --git a/transports/webrtc-websys/src/lib.rs b/transports/webrtc-websys/src/lib.rs index 07207eb0ae8..c2dee88faa0 100644 --- a/transports/webrtc-websys/src/lib.rs +++ b/transports/webrtc-websys/src/lib.rs @@ -1,5 +1,6 @@ #![doc = include_str!("../README.md")] +pub mod browser; mod connection; mod error; mod sdp; @@ -8,6 +9,7 @@ mod transport; mod upgrade; pub use self::{ + browser::{Config as BrowserConfig, Transport as BrowserTransport}, connection::Connection, error::Error, stream::Stream, diff --git a/transports/webrtc-websys/src/upgrade.rs b/transports/webrtc-websys/src/upgrade.rs index b1de908ae82..cb2045b2991 100644 --- a/transports/webrtc-websys/src/upgrade.rs +++ b/transports/webrtc-websys/src/upgrade.rs @@ -24,7 +24,7 @@ async fn outbound_inner( remote_fingerprint: Fingerprint, id_keys: Keypair, ) -> Result<(PeerId, Connection), Error> { - let rtc_peer_connection = RtcPeerConnection::new(remote_fingerprint.algorithm()).await?; + let rtc_peer_connection = RtcPeerConnection::new(remote_fingerprint.algorithm(), None).await?; // Create stream for Noise handshake // Must create data channel before Offer is created for it to be included in the SDP @@ -53,5 +53,5 @@ async fn outbound_inner( tracing::debug!(peer=%peer_id, "Remote peer identified"); - Ok((peer_id, Connection::new(rtc_peer_connection))) + Ok((peer_id, Connection::new(rtc_peer_connection, None))) }