From f3243c9c43c2ea2bb2be70b7232832692f20c250 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Mon, 4 Aug 2025 18:08:19 +0200 Subject: [PATCH] Merge `quic` and `quic_datagram` links The approach taken in 0351b5a wasn't necessary; unreliable QUIC can be implemented within the same link. --- Cargo.lock | 178 +++++- Cargo.toml | 2 - io/zenoh-link/Cargo.toml | 2 - io/zenoh-link/src/lib.rs | 67 +- io/zenoh-links/zenoh-link-quic/Cargo.toml | 5 - io/zenoh-links/zenoh-link-quic/src/unicast.rs | 206 ++++-- io/zenoh-links/zenoh-link-quic/src/utils.rs | 522 ---------------- .../zenoh-link-quic_datagram/Cargo.toml | 50 -- .../zenoh-link-quic_datagram/README.md | 6 - .../zenoh-link-quic_datagram/src/lib.rs | 119 ---- .../zenoh-link-quic_datagram/src/unicast.rs | 585 ------------------ io/zenoh-transport/Cargo.toml | 1 - io/zenoh-transport/src/unicast/manager.rs | 46 +- io/zenoh-transport/tests/unicast_transport.rs | 45 +- zenoh/Cargo.toml | 2 - 15 files changed, 315 insertions(+), 1521 deletions(-) delete mode 100644 io/zenoh-links/zenoh-link-quic/src/utils.rs delete mode 100644 io/zenoh-links/zenoh-link-quic_datagram/Cargo.toml delete mode 100644 io/zenoh-links/zenoh-link-quic_datagram/README.md delete mode 100644 io/zenoh-links/zenoh-link-quic_datagram/src/lib.rs delete mode 100644 io/zenoh-links/zenoh-link-quic_datagram/src/unicast.rs diff --git a/Cargo.lock b/Cargo.lock index 1d0c9711a2..76a99e5c30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -586,6 +586,26 @@ dependencies = [ "serde", ] +[[package]] +name = "bindgen" +version = "0.69.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088" +dependencies = [ + "bitflags 2.6.0", + "cexpr", + "clang-sys", + "itertools 0.10.5", + "lazy_static", + "lazycell", + "proc-macro2", + "quote", + "regex", + "rustc-hash 1.1.0", + "shlex", + "syn 2.0.87", +] + [[package]] name = "bit-set" version = "0.5.3" @@ -703,6 +723,16 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" +[[package]] +name = "bzip2-sys" +version = "0.1.13+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "225bff33b2141874fe80d71e07d6eec4f85c5c216453dd96388240f96e1acc14" +dependencies = [ + "cc", + "pkg-config", +] + [[package]] name = "cache-padded" version = "1.3.0" @@ -721,6 +751,8 @@ version = "1.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be714c154be609ec7f5dad223a33bf1482fff90472de28f7362806e6d4832b8c" dependencies = [ + "jobserver", + "libc", "shlex", ] @@ -730,6 +762,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] + [[package]] name = "cfg-if" version = "0.1.10" @@ -815,6 +856,17 @@ dependencies = [ "inout", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" version = "4.5.17" @@ -1625,6 +1677,12 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "glob" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" + [[package]] name = "gloo-timers" version = "0.3.0" @@ -1977,6 +2035,15 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" +[[package]] +name = "jobserver" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.70" @@ -2061,6 +2128,12 @@ dependencies = [ "spin 0.9.8", ] +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "leb128" version = "0.2.5" @@ -2099,6 +2172,32 @@ dependencies = [ "libc", ] +[[package]] +name = "librocksdb-sys" +version = "0.17.1+9.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b7869a512ae9982f4d46ba482c2a304f1efd80c6412a3d4bf57bb79a619679f" +dependencies = [ + "bindgen", + "bzip2-sys", + "cc", + "libc", + "libz-sys", + "lz4-sys", + "zstd-sys", +] + +[[package]] +name = "libz-sys" +version = "1.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b70e7a7df205e92a1a4cd9aaae7898dac0aa555503cc0a649494d0d60e7651d" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.3.8" @@ -2131,6 +2230,16 @@ dependencies = [ "value-bag", ] +[[package]] +name = "lz4-sys" +version = "1.11.1+lz4-1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "lz4_flex" version = "0.11.3" @@ -2736,6 +2845,12 @@ dependencies = [ "spki", ] +[[package]] +name = "pkg-config" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" + [[package]] name = "plotters" version = "0.3.7" @@ -2950,7 +3065,7 @@ dependencies = [ "pin-project-lite 0.2.14", "quinn-proto", "quinn-udp", - "rustc-hash", + "rustc-hash 2.0.0", "rustls", "socket2 0.5.7", "thiserror 1.0.69", @@ -2967,7 +3082,7 @@ dependencies = [ "bytes", "rand 0.8.5", "ring", - "rustc-hash", + "rustc-hash 2.0.0", "rustls", "rustls-platform-verifier", "slab", @@ -3206,6 +3321,16 @@ dependencies = [ "cache-padded", ] +[[package]] +name = "rocksdb" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26ec73b20525cb235bad420f911473b69f9fe27cc856c5461bccd7e4af037f43" +dependencies = [ + "libc", + "librocksdb-sys", +] + [[package]] name = "ron" version = "0.8.1" @@ -3250,6 +3375,12 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc-hash" version = "2.0.0" @@ -4755,6 +4886,12 @@ dependencies = [ "sval_serde", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "vec_map" version = "0.8.2" @@ -5414,7 +5551,6 @@ dependencies = [ "zenoh-config", "zenoh-link-commons", "zenoh-link-quic", - "zenoh-link-quic_datagram", "zenoh-link-serial", "zenoh-link-tcp", "zenoh-link-tls", @@ -5462,30 +5598,6 @@ dependencies = [ [[package]] name = "zenoh-link-quic" version = "1.5.0" -dependencies = [ - "async-trait", - "base64 0.22.1", - "quinn", - "rustls", - "rustls-pemfile", - "rustls-webpki", - "secrecy", - "time 0.3.36", - "tokio", - "tokio-util", - "tracing", - "webpki-roots", - "zenoh-config", - "zenoh-core", - "zenoh-link-commons", - "zenoh-protocol", - "zenoh-result", - "zenoh-util", -] - -[[package]] -name = "zenoh-link-quic_datagram" -version = "1.5.0" dependencies = [ "async-trait", "quinn", @@ -5674,11 +5786,13 @@ dependencies = [ "base64ct", "home", "jsonschema", + "librocksdb-sys", "lz4_flex", "pest", "pest_derive", "pest_generator", "pest_meta", + "rocksdb", "static_init", "thiserror 1.0.69", "twox-hash", @@ -5974,3 +6088,13 @@ name = "zeroize" version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" + +[[package]] +name = "zstd-sys" +version = "2.0.14+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fb060d4926e4ac3a3ad15d864e99ceb5f343c6b34f5bd6d81ae6ed417311be5" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/Cargo.toml b/Cargo.toml index 3962ff6caf..190ad4ecca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,6 @@ members = [ "io/zenoh-link", "io/zenoh-link-commons", "io/zenoh-links/zenoh-link-quic/", - "io/zenoh-links/zenoh-link-quic_datagram/", "io/zenoh-links/zenoh-link-serial", "io/zenoh-links/zenoh-link-tcp/", "io/zenoh-links/zenoh-link-tls/", @@ -218,7 +217,6 @@ zenoh-keyexpr = { version = "=1.5.0", path = "commons/zenoh-keyexpr", default-fe zenoh-link = { version = "=1.5.0", path = "io/zenoh-link" } zenoh-link-commons = { version = "=1.5.0", path = "io/zenoh-link-commons" } zenoh-link-quic = { version = "=1.5.0", path = "io/zenoh-links/zenoh-link-quic" } -zenoh-link-quic_datagram = { version = "=1.5.0", path = "io/zenoh-links/zenoh-link-quic_datagram" } zenoh-link-serial = { version = "=1.5.0", path = "io/zenoh-links/zenoh-link-serial" } zenoh-link-tcp = { version = "=1.5.0", path = "io/zenoh-links/zenoh-link-tcp" } zenoh-link-tls = { version = "=1.5.0", path = "io/zenoh-links/zenoh-link-tls" } diff --git a/io/zenoh-link/Cargo.toml b/io/zenoh-link/Cargo.toml index 51d5dc1da2..21dcb99f04 100644 --- a/io/zenoh-link/Cargo.toml +++ b/io/zenoh-link/Cargo.toml @@ -26,7 +26,6 @@ version = { workspace = true } [features] transport_quic = ["zenoh-link-quic"] -transport_quic_datagram = ["zenoh-link-quic_datagram"] transport_serial = ["zenoh-link-serial"] transport_tcp = ["zenoh-config/transport_tcp", "zenoh-link-tcp"] transport_tls = ["zenoh-link-tls"] @@ -43,7 +42,6 @@ transport_ws = ["zenoh-link-ws"] zenoh-config = { workspace = true } zenoh-link-commons = { workspace = true } zenoh-link-quic = { workspace = true, optional = true } -zenoh-link-quic_datagram = { workspace = true, optional = true } zenoh-link-serial = { workspace = true, optional = true } zenoh-link-tcp = { workspace = true, optional = true } zenoh-link-tls = { workspace = true, optional = true } diff --git a/io/zenoh-link/src/lib.rs b/io/zenoh-link/src/lib.rs index 91b32ed5e9..84ab40b1b5 100644 --- a/io/zenoh-link/src/lib.rs +++ b/io/zenoh-link/src/lib.rs @@ -27,14 +27,6 @@ pub use zenoh_link_quic as quic; use zenoh_link_quic::{ LinkManagerUnicastQuic, QuicConfigurator, QuicLocatorInspector, QUIC_LOCATOR_PREFIX, }; -#[cfg(feature = "transport_quic_datagram")] -pub use zenoh_link_quic_datagram as quic_datagram; -#[cfg(all(feature = "transport_quic_datagram", not(feature = "transport_quic")))] -use zenoh_link_quic_datagram::QUIC_DATAGRAM_LOCATOR_PREFIX; -#[cfg(feature = "transport_quic_datagram")] -use zenoh_link_quic_datagram::{ - LinkManagerUnicastQuicDatagram, QuicDatagramConfigurator, QuicDatagramLocatorInspector, -}; #[cfg(feature = "transport_serial")] pub use zenoh_link_serial as serial; #[cfg(feature = "transport_serial")] @@ -84,7 +76,6 @@ use zenoh_result::{bail, ZResult}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum LinkKind { Quic, - QuicDatagram, Serial, Tcp, Tls, @@ -107,15 +98,8 @@ impl LinkKind { UDP_LOCATOR_PREFIX => supported_links.push(LinkKind::Udp), #[cfg(feature = "transport_tls")] TLS_LOCATOR_PREFIX => supported_links.push(LinkKind::Tls), - #[cfg(all(feature = "transport_quic_datagram", not(feature = "transport_quic")))] - QUIC_DATAGRAM_LOCATOR_PREFIX => supported_links.push(LinkKind::QuicDatagram), - #[cfg(all(feature = "transport_quic", not(feature = "transport_quic_datagram")))] - QUIC_LOCATOR_PREFIX => supported_links.push(LinkKind::QuicDatagram), - #[cfg(all(feature = "transport_quic", feature = "transport_quic_datagram"))] - QUIC_LOCATOR_PREFIX => { - supported_links.push(LinkKind::Quic); - supported_links.push(LinkKind::QuicDatagram); - } + #[cfg(feature = "transport_quic")] + QUIC_LOCATOR_PREFIX => supported_links.push(LinkKind::Quic), #[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))] UNIXSOCKSTREAM_LOCATOR_PREFIX => supported_links.push(LinkKind::UnixsockStream), #[cfg(feature = "transport_ws")] @@ -146,30 +130,8 @@ impl TryFrom<&Locator> for LinkKind { UDP_LOCATOR_PREFIX => Ok(LinkKind::Udp), #[cfg(feature = "transport_tls")] TLS_LOCATOR_PREFIX => Ok(LinkKind::Tls), - #[cfg(all(feature = "transport_quic_datagram", not(feature = "transport_quic")))] - QUIC_DATAGRAM_LOCATOR_PREFIX => { - if !QuicDatagramLocatorInspector.is_reliable(locator)? { - Ok(LinkKind::QuicDatagram) - } else { - Err(zenoh_result::zerror!("Attempted to use a reliable QUIC link without enabling the transport_quic feature").into()) - } - } - #[cfg(all(feature = "transport_quic", not(feature = "transport_quic_datagram")))] - QUIC_LOCATOR_PREFIX => { - if QuicLocatorInspector.is_reliable(locator)? { - Ok(LinkKind::Quic) - } else { - Err(zenoh_result::zerror!("Cannot use unreliable QUIC without enabling the transport_quic_datagram feature").into()) - } - } - #[cfg(all(feature = "transport_quic", feature = "transport_quic_datagram"))] - QUIC_LOCATOR_PREFIX => { - if QuicLocatorInspector.is_reliable(locator)? { - Ok(LinkKind::Quic) - } else { - Ok(LinkKind::QuicDatagram) - } - } + #[cfg(feature = "transport_quic")] + QUIC_LOCATOR_PREFIX => Ok(LinkKind::Quic), #[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))] UNIXSOCKSTREAM_LOCATOR_PREFIX => Ok(LinkKind::UnixsockStream), #[cfg(feature = "transport_ws")] @@ -199,8 +161,6 @@ impl TryFrom<&EndPoint> for LinkKind { pub const ALL_SUPPORTED_LINKS: &[LinkKind] = &[ #[cfg(feature = "transport_quic")] LinkKind::Quic, - #[cfg(feature = "transport_quic_datagram")] - LinkKind::QuicDatagram, #[cfg(feature = "transport_tcp")] LinkKind::Tcp, #[cfg(feature = "transport_tls")] @@ -223,8 +183,6 @@ pub const ALL_SUPPORTED_LINKS: &[LinkKind] = &[ pub struct LocatorInspector { #[cfg(feature = "transport_quic")] quic_inspector: QuicLocatorInspector, - #[cfg(feature = "transport_quic_datagram")] - quic_datagram_inspector: QuicDatagramLocatorInspector, #[cfg(feature = "transport_tcp")] tcp_inspector: TcpLocatorInspector, #[cfg(feature = "transport_tls")] @@ -255,8 +213,6 @@ impl LocatorInspector { LinkKind::Tls => self.tls_inspector.is_reliable(locator), #[cfg(feature = "transport_quic")] LinkKind::Quic => self.quic_inspector.is_reliable(locator), - #[cfg(feature = "transport_quic_datagram")] - LinkKind::QuicDatagram => self.quic_datagram_inspector.is_reliable(locator), #[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))] LinkKind::UnixsockStream => self.unixsock_stream_inspector.is_reliable(locator), #[cfg(feature = "transport_ws")] @@ -284,8 +240,6 @@ impl LocatorInspector { LinkKind::Tls => self.tls_inspector.is_multicast(locator).await, #[cfg(feature = "transport_quic")] LinkKind::Quic => self.quic_inspector.is_multicast(locator).await, - #[cfg(feature = "transport_quic_datagram")] - LinkKind::QuicDatagram => self.quic_datagram_inspector.is_multicast(locator).await, #[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))] LinkKind::UnixsockStream => self.unixsock_stream_inspector.is_multicast(locator).await, #[cfg(feature = "transport_ws")] @@ -305,8 +259,6 @@ impl LocatorInspector { pub struct LinkConfigurator { #[cfg(feature = "transport_tcp")] tcp_inspector: TcpConfigurator, - #[cfg(feature = "transport_quic_datagram")] - quic_datagram_inspector: QuicDatagramConfigurator, #[cfg(feature = "transport_quic")] quic_inspector: QuicConfigurator, #[cfg(feature = "transport_tls")] @@ -338,13 +290,6 @@ impl LinkConfigurator { { insert_config(LinkKind::Tcp, self.tcp_inspector.inspect_config(config)); } - #[cfg(feature = "transport_quic_datagram")] - { - insert_config( - LinkKind::QuicDatagram, - self.quic_datagram_inspector.inspect_config(config), - ); - } #[cfg(feature = "transport_quic")] { insert_config(LinkKind::Quic, self.quic_inspector.inspect_config(config)); @@ -384,10 +329,6 @@ impl LinkManagerBuilderUnicast { LinkKind::Udp => Ok(std::sync::Arc::new(LinkManagerUnicastUdp::new(_manager))), #[cfg(feature = "transport_tls")] LinkKind::Tls => Ok(std::sync::Arc::new(LinkManagerUnicastTls::new(_manager))), - #[cfg(feature = "transport_quic_datagram")] - LinkKind::QuicDatagram => Ok(std::sync::Arc::new(LinkManagerUnicastQuicDatagram::new( - _manager, - ))), #[cfg(feature = "transport_quic")] LinkKind::Quic => Ok(std::sync::Arc::new(LinkManagerUnicastQuic::new(_manager))), #[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))] diff --git a/io/zenoh-links/zenoh-link-quic/Cargo.toml b/io/zenoh-links/zenoh-link-quic/Cargo.toml index 653fee4eb2..e35243fe40 100644 --- a/io/zenoh-links/zenoh-link-quic/Cargo.toml +++ b/io/zenoh-links/zenoh-link-quic/Cargo.toml @@ -26,12 +26,9 @@ version = { workspace = true } [dependencies] async-trait = { workspace = true } -base64 = { workspace = true } quinn = { workspace = true } rustls = { workspace = true } -rustls-pemfile = { workspace = true } rustls-webpki = { workspace = true } -secrecy = { workspace = true } time = { workspace = true } tokio = { workspace = true, features = [ "fs", @@ -43,8 +40,6 @@ tokio = { workspace = true, features = [ # tokio-rustls = { workspace = true } tokio-util = { workspace = true, features = ["rt"] } tracing = { workspace = true } -webpki-roots = { workspace = true } -zenoh-config = { workspace = true } zenoh-core = { workspace = true } zenoh-link-commons = { workspace = true, features = ["quic"] } zenoh-protocol = { workspace = true } diff --git a/io/zenoh-links/zenoh-link-quic/src/unicast.rs b/io/zenoh-links/zenoh-link-quic/src/unicast.rs index 99097d738d..037a9be1ad 100644 --- a/io/zenoh-links/zenoh-link-quic/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-quic/src/unicast.rs @@ -25,7 +25,7 @@ use quinn::{ EndpointConfig, }; use tokio::sync::Mutex as AsyncMutex; -use tokio_util::sync::CancellationToken; +use tokio_util::{bytes::Bytes, sync::CancellationToken}; use zenoh_core::zasynclock; use zenoh_link_commons::{ get_ip_interface_names, parse_dscp, @@ -36,7 +36,7 @@ use zenoh_link_commons::{ set_dscp, tls::expiration::{LinkCertExpirationManager, LinkWithCertExpiration}, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, - NewLinkChannelSender, BIND_INTERFACE, BIND_SOCKET, + LocatorInspector, NewLinkChannelSender, BIND_INTERFACE, BIND_SOCKET, }; use zenoh_protocol::{ core::{Address, EndPoint, Locator}, @@ -45,36 +45,56 @@ use zenoh_protocol::{ use zenoh_result::{bail, zerror, ZResult}; use super::{QUIC_ACCEPT_THROTTLE_TIME, QUIC_DEFAULT_MTU, QUIC_LOCATOR_PREFIX}; +use crate::QuicLocatorInspector; pub struct LinkUnicastQuic { connection: quinn::Connection, + kind: LinkUnicastQuicKind, src_addr: SocketAddr, src_locator: Locator, dst_locator: Locator, - send: AsyncMutex, - recv: AsyncMutex, auth_identifier: LinkAuthId, expiration_manager: Option, } +#[derive(Debug)] +enum LinkUnicastQuicKind { + Unreliable, + Reliable { + send: AsyncMutex, + recv: AsyncMutex, + }, +} + +impl LinkUnicastQuicKind { + async fn finish(&self) -> ZResult<()> { + if let LinkUnicastQuicKind::Reliable { send, .. } = &self { + let mut guard = zasynclock!(send); + if let Err(e) = guard.finish() { + tracing::trace!("Error closing QUIC stream {self:?}: {e}"); + } + } + + Ok(()) + } +} + impl LinkUnicastQuic { fn new( - connection: quinn::Connection, + conn: quinn::Connection, + kind: LinkUnicastQuicKind, src_addr: SocketAddr, dst_locator: Locator, - send: quinn::SendStream, - recv: quinn::RecvStream, auth_identifier: LinkAuthId, expiration_manager: Option, ) -> LinkUnicastQuic { // Build the Quic object LinkUnicastQuic { - connection, + connection: conn, + kind, src_addr, src_locator: Locator::new(QUIC_LOCATOR_PREFIX, src_addr.to_string(), "").unwrap(), dst_locator, - send: AsyncMutex::new(send), - recv: AsyncMutex::new(recv), auth_identifier, expiration_manager, } @@ -83,10 +103,7 @@ impl LinkUnicastQuic { async fn close(&self) -> ZResult<()> { tracing::trace!("Closing QUIC link: {}", self); // Flush the QUIC stream - let mut guard = zasynclock!(self.send); - if let Err(e) = guard.finish() { - tracing::trace!("Error closing QUIC stream {}: {}", self, e); - } + self.kind.finish().await?; self.connection.close(quinn::VarInt::from_u32(0), &[0]); Ok(()) } @@ -110,49 +127,91 @@ impl LinkUnicastTrait for LinkUnicastQuic { } async fn write(&self, buffer: &[u8]) -> ZResult { - let mut guard = zasynclock!(self.send); - guard.write(buffer).await.map_err(|e| { - tracing::trace!("Write error on QUIC link {}: {}", self, e); - zerror!(e).into() - }) + match &self.kind { + LinkUnicastQuicKind::Unreliable => { + let amt = buffer.len(); + self.connection + .send_datagram(Bytes::copy_from_slice(buffer))?; + Ok(amt) + } + LinkUnicastQuicKind::Reliable { send, .. } => { + let mut guard = zasynclock!(send); + guard.write(buffer).await.map_err(|e| { + tracing::trace!("Write error on QUIC link {}: {}", self, e); + zerror!(e).into() + }) + } + } } async fn write_all(&self, buffer: &[u8]) -> ZResult<()> { - let mut guard = zasynclock!(self.send); - guard.write_all(buffer).await.map_err(|e| { - tracing::trace!("Write error on QUIC link {}: {}", self, e); - zerror!(e).into() - }) + match &self.kind { + LinkUnicastQuicKind::Unreliable => { + self.connection + .send_datagram(Bytes::copy_from_slice(buffer))?; + Ok(()) + } + LinkUnicastQuicKind::Reliable { send, .. } => { + let mut guard = zasynclock!(send); + guard.write_all(buffer).await.map_err(|e| { + tracing::trace!("Write error on QUIC link {}: {}", self, e); + zerror!(e).into() + }) + } + } } async fn read(&self, buffer: &mut [u8]) -> ZResult { - let mut guard = zasynclock!(self.recv); - guard - .read(buffer) - .await - .map_err(|e| { - let e = zerror!("Read error on QUIC link {}: {}", self, e); - tracing::trace!("{}", &e); - e - })? - .ok_or_else(|| { - let e = zerror!( - "Read error on QUIC link {}: stream {} has been closed", - self, - guard.id() - ); - tracing::trace!("{}", &e); - e.into() - }) + match &self.kind { + LinkUnicastQuicKind::Unreliable => { + let bytes = self.connection.read_datagram().await?; + buffer + .get_mut(..bytes.len()) + .map(|buffer| buffer.copy_from_slice(&bytes)) + .ok_or_else(|| { + zerror!( + "QUIC datagram of len {} cannot fit in RX buffer of len {}", + bytes.len(), + buffer.len() + ) + })?; + Ok(bytes.len()) + } + LinkUnicastQuicKind::Reliable { recv, .. } => { + let mut guard = zasynclock!(recv); + guard + .read(buffer) + .await + .map_err(|e| { + let e = zerror!("Read error on QUIC link {}: {}", self, e); + tracing::trace!("{}", &e); + e + })? + .ok_or_else(|| { + let e = zerror!( + "Read error on QUIC link {}: stream {} has been closed", + self, + guard.id() + ); + tracing::trace!("{}", &e); + e.into() + }) + } + } } async fn read_exact(&self, buffer: &mut [u8]) -> ZResult<()> { - let mut guard = zasynclock!(self.recv); - guard.read_exact(buffer).await.map_err(|e| { - let e = zerror!("Read error on QUIC link {}: {}", self, e); - tracing::trace!("{}", &e); - e.into() - }) + match &self.kind { + LinkUnicastQuicKind::Unreliable => unreachable!(), + LinkUnicastQuicKind::Reliable { recv, .. } => { + let mut guard = zasynclock!(recv); + guard.read_exact(buffer).await.map_err(|e| { + let e = zerror!("Read error on QUIC link {}: {}", self, e); + tracing::trace!("{}", &e); + e.into() + }) + } + } } #[inline(always)] @@ -177,12 +236,12 @@ impl LinkUnicastTrait for LinkUnicastQuic { #[inline(always)] fn is_reliable(&self) -> bool { - super::IS_RELIABLE + matches!(&self.kind, LinkUnicastQuicKind::Reliable { .. }) } #[inline(always)] fn is_streamed(&self) -> bool { - true + matches!(&self.kind, LinkUnicastQuicKind::Reliable { .. }) } #[inline(always)] @@ -326,10 +385,19 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic { .await .map_err(|e| zerror!("Can not create a new QUIC link bound to {}: {}", host, e))?; - let (send, recv) = quic_conn - .open_bi() - .await - .map_err(|e| zerror!("Can not open Quic bi-directional channel {}: {}", host, e))?; + let kind = if QuicLocatorInspector.is_reliable(&endpoint.to_locator())? { + let (send, recv) = quic_conn + .open_bi() + .await + .map_err(|e| zerror!("Can not open Quic bi-directional channel {}: {}", host, e))?; + + LinkUnicastQuicKind::Reliable { + send: AsyncMutex::new(send), + recv: AsyncMutex::new(recv), + } + } else { + LinkUnicastQuicKind::Unreliable + }; let auth_id = get_cert_common_name(&quic_conn)?; let certchain_expiration_time = @@ -349,10 +417,9 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic { } LinkUnicastQuic::new( quic_conn, + kind, src_addr, endpoint.into(), - send, - recv, auth_id.into(), expiration_manager, ) @@ -451,6 +518,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic { let task = { let token = token.clone(); let manager = self.manager.clone(); + let is_reliable = QuicLocatorInspector.is_reliable(&locator)?; async move { accept_task( @@ -458,6 +526,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic { token, manager, server_crypto.tls_close_link_on_expiration, + is_reliable, ) .await } @@ -491,6 +560,7 @@ async fn accept_task( token: CancellationToken, manager: NewLinkChannelSender, tls_close_link_on_expiration: bool, + is_reliable: bool, ) -> ZResult<()> { async fn accept(acceptor: quinn::Accept<'_>) -> ZResult { let qc = acceptor @@ -520,13 +590,22 @@ async fn accept_task( res = accept(quic_endpoint.accept()) => { match res { Ok(quic_conn) => { - // Get the bideractional streams. Note that we don't allow unidirectional streams. - let (send, recv) = match quic_conn.accept_bi().await { - Ok(stream) => stream, - Err(e) => { - tracing::warn!("QUIC connection has no streams: {:?}", e); - continue; + let kind = if is_reliable { + // Get the bideractional streams. Note that we don't allow unidirectional streams. + let (send, recv) = match quic_conn.accept_bi().await { + Ok(stream) => stream, + Err(e) => { + tracing::warn!("QUIC connection has no streams: {:?}", e); + continue; + } + }; + + LinkUnicastQuicKind::Reliable { + send: AsyncMutex::new(send), + recv: AsyncMutex::new(recv), } + } else { + LinkUnicastQuicKind::Unreliable }; // Get the right source address in case an unsepecified IP (i.e. 0.0.0.0 or [::]) is used @@ -571,10 +650,9 @@ async fn accept_task( } LinkUnicastQuic::new( quic_conn, + kind, src_addr, dst_locator, - send, - recv, auth_id.into(), expiration_manager, ) diff --git a/io/zenoh-links/zenoh-link-quic/src/utils.rs b/io/zenoh-links/zenoh-link-quic/src/utils.rs deleted file mode 100644 index 415c926cb3..0000000000 --- a/io/zenoh-links/zenoh-link-quic/src/utils.rs +++ /dev/null @@ -1,522 +0,0 @@ -// -// Copyright (c) 2024 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use std::{ - fs::File, - io, - io::{BufReader, Cursor}, - net::SocketAddr, - sync::Arc, -}; - -use rustls::{ - pki_types::{CertificateDer, PrivateKeyDer, TrustAnchor}, - server::WebPkiClientVerifier, - version::TLS13, - ClientConfig, RootCertStore, ServerConfig, -}; -use secrecy::ExposeSecret; -use webpki::anchor_from_trusted_cert; -use zenoh_config::Config as ZenohConfig; -use zenoh_link_commons::{ - tls::WebPkiVerifierAnyServerName, ConfigurationInspector, BIND_INTERFACE, -}; -use zenoh_protocol::core::{ - endpoint::{Address, Config}, - parameters, -}; -use zenoh_result::{bail, zerror, ZError, ZResult}; - -use crate::config::*; - -#[derive(Default, Clone, Copy, Debug)] -pub struct TlsConfigurator; - -impl ConfigurationInspector for TlsConfigurator { - fn inspect_config(&self, config: &ZenohConfig) -> ZResult { - let mut ps: Vec<(&str, &str)> = vec![]; - - let c = config.transport().link().tls(); - - match (c.root_ca_certificate(), c.root_ca_certificate_base64()) { - (Some(_), Some(_)) => { - bail!("Only one between 'root_ca_certificate' and 'root_ca_certificate_base64' can be present!") - } - (Some(ca_certificate), None) => { - ps.push((TLS_ROOT_CA_CERTIFICATE_FILE, ca_certificate)); - } - (None, Some(ca_certificate)) => { - ps.push(( - TLS_ROOT_CA_CERTIFICATE_BASE64, - ca_certificate.expose_secret(), - )); - } - _ => {} - } - - match (c.listen_private_key(), c.listen_private_key_base64()) { - (Some(_), Some(_)) => { - bail!("Only one between 'listen_private_key' and 'listen_private_key_base64' can be present!") - } - (Some(server_private_key), None) => { - ps.push((TLS_LISTEN_PRIVATE_KEY_FILE, server_private_key)); - } - (None, Some(server_private_key)) => { - ps.push(( - TLS_LISTEN_PRIVATE_KEY_BASE64, - server_private_key.expose_secret(), - )); - } - _ => {} - } - - match (c.listen_certificate(), c.listen_certificate_base64()) { - (Some(_), Some(_)) => { - bail!("Only one between 'listen_certificate' and 'listen_certificate_base64' can be present!") - } - (Some(server_certificate), None) => { - ps.push((TLS_LISTEN_CERTIFICATE_FILE, server_certificate)); - } - (None, Some(server_certificate)) => { - ps.push(( - TLS_LISTEN_CERTIFICATE_BASE64, - server_certificate.expose_secret(), - )); - } - _ => {} - } - - match c.enable_mtls().unwrap_or(TLS_ENABLE_MTLS_DEFAULT) { - true => ps.push((TLS_ENABLE_MTLS, "true")), - false => ps.push((TLS_ENABLE_MTLS, "false")), - } - - match (c.connect_private_key(), c.connect_private_key_base64()) { - (Some(_), Some(_)) => { - bail!("Only one between 'connect_private_key' and 'connect_private_key_base64' can be present!") - } - (Some(client_private_key), None) => { - ps.push((TLS_CONNECT_PRIVATE_KEY_FILE, client_private_key)); - } - (None, Some(client_private_key)) => { - ps.push(( - TLS_CONNECT_PRIVATE_KEY_BASE64, - client_private_key.expose_secret(), - )); - } - _ => {} - } - - match (c.connect_certificate(), c.connect_certificate_base64()) { - (Some(_), Some(_)) => { - bail!("Only one between 'connect_certificate' and 'connect_certificate_base64' can be present!") - } - (Some(client_certificate), None) => { - ps.push((TLS_CONNECT_CERTIFICATE_FILE, client_certificate)); - } - (None, Some(client_certificate)) => { - ps.push(( - TLS_CONNECT_CERTIFICATE_BASE64, - client_certificate.expose_secret(), - )); - } - _ => {} - } - - match c - .verify_name_on_connect() - .unwrap_or(TLS_VERIFY_NAME_ON_CONNECT_DEFAULT) - { - true => ps.push((TLS_VERIFY_NAME_ON_CONNECT, "true")), - false => ps.push((TLS_VERIFY_NAME_ON_CONNECT, "false")), - }; - - match c - .close_link_on_expiration() - .unwrap_or(TLS_CLOSE_LINK_ON_EXPIRATION_DEFAULT) - { - true => ps.push((TLS_CLOSE_LINK_ON_EXPIRATION, "true")), - false => ps.push((TLS_CLOSE_LINK_ON_EXPIRATION, "false")), - } - - Ok(parameters::from_iter(ps.drain(..))) - } -} - -pub(crate) struct TlsServerConfig<'a> { - pub(crate) server_config: ServerConfig, - pub(crate) tls_close_link_on_expiration: bool, - pub(crate) bind_iface: Option<&'a str>, -} - -impl<'a> TlsServerConfig<'a> { - pub async fn new(config: &'a Config<'_>) -> ZResult { - let tls_server_client_auth: bool = match config.get(TLS_ENABLE_MTLS) { - Some(s) => s - .parse() - .map_err(|_| zerror!("Unknown enable mTLS argument: {}", s))?, - None => TLS_ENABLE_MTLS_DEFAULT, - }; - let tls_close_link_on_expiration: bool = match config.get(TLS_CLOSE_LINK_ON_EXPIRATION) { - Some(s) => s - .parse() - .map_err(|_| zerror!("Unknown close on expiration argument: {}", s))?, - None => TLS_CLOSE_LINK_ON_EXPIRATION_DEFAULT, - }; - let tls_server_private_key = TlsServerConfig::load_tls_private_key(config).await?; - let tls_server_certificate = TlsServerConfig::load_tls_certificate(config).await?; - - let certs: Vec = - rustls_pemfile::certs(&mut Cursor::new(&tls_server_certificate)) - .collect::>() - .map_err(|err| zerror!("Error processing server certificate: {err}."))?; - - let mut keys: Vec = - rustls_pemfile::rsa_private_keys(&mut Cursor::new(&tls_server_private_key)) - .map(|x| x.map(PrivateKeyDer::from)) - .collect::>() - .map_err(|err| zerror!("Error processing server key: {err}."))?; - - if keys.is_empty() { - keys = rustls_pemfile::pkcs8_private_keys(&mut Cursor::new(&tls_server_private_key)) - .map(|x| x.map(PrivateKeyDer::from)) - .collect::>() - .map_err(|err| zerror!("Error processing server key: {err}."))?; - } - - if keys.is_empty() { - keys = rustls_pemfile::ec_private_keys(&mut Cursor::new(&tls_server_private_key)) - .map(|x| x.map(PrivateKeyDer::from)) - .collect::>() - .map_err(|err| zerror!("Error processing server key: {err}."))?; - } - - if keys.is_empty() { - bail!("No private key found for TLS server."); - } - - // Install ring based rustls CryptoProvider. - rustls::crypto::ring::default_provider() - // This can be called successfully at most once in any process execution. - // Call this early in your process to configure which provider is used for the provider. - // The configuration should happen before any use of ClientConfig::builder() or ServerConfig::builder(). - .install_default() - // Ignore the error here, because `rustls::crypto::ring::default_provider().install_default()` will inevitably be executed multiple times - // when there are multiple quic links, and all but the first execution will fail. - .ok(); - - let sc = if tls_server_client_auth { - let root_cert_store = load_trust_anchors(config)?.map_or_else( - || Err(zerror!("Missing root certificates while mTLS is enabled.")), - Ok, - )?; - let client_auth = WebPkiClientVerifier::builder(root_cert_store.into()).build()?; - ServerConfig::builder_with_protocol_versions(&[&TLS13]) - .with_client_cert_verifier(client_auth) - .with_single_cert(certs, keys.remove(0)) - .map_err(|e| zerror!(e))? - } else { - ServerConfig::builder() - .with_no_client_auth() - .with_single_cert(certs, keys.remove(0)) - .map_err(|e| zerror!(e))? - }; - Ok(TlsServerConfig { - server_config: sc, - tls_close_link_on_expiration, - bind_iface: config.get(BIND_INTERFACE), - }) - } - - async fn load_tls_private_key(config: &Config<'_>) -> ZResult> { - load_tls_key( - config, - TLS_LISTEN_PRIVATE_KEY_RAW, - TLS_LISTEN_PRIVATE_KEY_FILE, - TLS_LISTEN_PRIVATE_KEY_BASE64, - ) - .await - } - - async fn load_tls_certificate(config: &Config<'_>) -> ZResult> { - load_tls_certificate( - config, - TLS_LISTEN_CERTIFICATE_RAW, - TLS_LISTEN_CERTIFICATE_FILE, - TLS_LISTEN_CERTIFICATE_BASE64, - ) - .await - } -} - -pub(crate) struct TlsClientConfig<'a> { - pub(crate) client_config: ClientConfig, - pub(crate) tls_close_link_on_expiration: bool, - pub(crate) bind_iface: Option<&'a str>, -} - -impl<'a> TlsClientConfig<'a> { - pub async fn new(config: &'a Config<'_>) -> ZResult { - let tls_client_server_auth: bool = match config.get(TLS_ENABLE_MTLS) { - Some(s) => s - .parse() - .map_err(|_| zerror!("Unknown enable mTLS argument: {}", s))?, - None => TLS_ENABLE_MTLS_DEFAULT, - }; - - let tls_server_name_verification: bool = match config.get(TLS_VERIFY_NAME_ON_CONNECT) { - Some(s) => s - .parse() - .map_err(|_| zerror!("Unknown server name verification argument: {}", s))?, - None => TLS_VERIFY_NAME_ON_CONNECT_DEFAULT, - }; - if !tls_server_name_verification { - tracing::warn!("Skipping name verification of QUIC server"); - } - - let tls_close_link_on_expiration: bool = match config.get(TLS_CLOSE_LINK_ON_EXPIRATION) { - Some(s) => s - .parse() - .map_err(|_| zerror!("Unknown close on expiration argument: {}", s))?, - None => TLS_CLOSE_LINK_ON_EXPIRATION_DEFAULT, - }; - - // Allows mixed user-generated CA and webPKI CA - tracing::debug!("Loading default Web PKI certificates."); - let mut root_cert_store = RootCertStore { - roots: webpki_roots::TLS_SERVER_ROOTS.to_vec(), - }; - - if let Some(custom_root_cert) = load_trust_anchors(config)? { - tracing::debug!("Loading user-generated certificates."); - root_cert_store.extend(custom_root_cert.roots); - } - - // Install ring based rustls CryptoProvider. - rustls::crypto::ring::default_provider() - // This can be called successfully at most once in any process execution. - // Call this early in your process to configure which provider is used for the provider. - // The configuration should happen before any use of ClientConfig::builder() or ServerConfig::builder(). - .install_default() - // Ignore the error here, because `rustls::crypto::ring::default_provider().install_default()` will inevitably be executed multiple times - // when there are multiple quic links, and all but the first execution will fail. - .ok(); - - let cc = if tls_client_server_auth { - tracing::debug!("Loading client authentication key and certificate..."); - let tls_client_private_key = TlsClientConfig::load_tls_private_key(config).await?; - let tls_client_certificate = TlsClientConfig::load_tls_certificate(config).await?; - - let certs: Vec = - rustls_pemfile::certs(&mut Cursor::new(&tls_client_certificate)) - .collect::>() - .map_err(|err| zerror!("Error processing client certificate: {err}."))?; - - let mut keys: Vec = - rustls_pemfile::rsa_private_keys(&mut Cursor::new(&tls_client_private_key)) - .map(|x| x.map(PrivateKeyDer::from)) - .collect::>() - .map_err(|err| zerror!("Error processing client key: {err}."))?; - - if keys.is_empty() { - keys = - rustls_pemfile::pkcs8_private_keys(&mut Cursor::new(&tls_client_private_key)) - .map(|x| x.map(PrivateKeyDer::from)) - .collect::>() - .map_err(|err| zerror!("Error processing client key: {err}."))?; - } - - if keys.is_empty() { - keys = rustls_pemfile::ec_private_keys(&mut Cursor::new(&tls_client_private_key)) - .map(|x| x.map(PrivateKeyDer::from)) - .collect::>() - .map_err(|err| zerror!("Error processing client key: {err}."))?; - } - - if keys.is_empty() { - bail!("No private key found for TLS client."); - } - - let builder = ClientConfig::builder_with_protocol_versions(&[&TLS13]); - - if tls_server_name_verification { - builder - .with_root_certificates(root_cert_store) - .with_client_auth_cert(certs, keys.remove(0)) - } else { - builder - .dangerous() - .with_custom_certificate_verifier(Arc::new(WebPkiVerifierAnyServerName::new( - root_cert_store, - ))) - .with_client_auth_cert(certs, keys.remove(0)) - } - .map_err(|e| zerror!("Bad certificate/key: {}", e))? - } else { - let builder = ClientConfig::builder(); - if tls_server_name_verification { - builder - .with_root_certificates(root_cert_store) - .with_no_client_auth() - } else { - builder - .dangerous() - .with_custom_certificate_verifier(Arc::new(WebPkiVerifierAnyServerName::new( - root_cert_store, - ))) - .with_no_client_auth() - } - }; - Ok(TlsClientConfig { - client_config: cc, - tls_close_link_on_expiration, - bind_iface: config.get(BIND_INTERFACE), - }) - } - - async fn load_tls_private_key(config: &Config<'_>) -> ZResult> { - load_tls_key( - config, - TLS_CONNECT_PRIVATE_KEY_RAW, - TLS_CONNECT_PRIVATE_KEY_FILE, - TLS_CONNECT_PRIVATE_KEY_BASE64, - ) - .await - } - - async fn load_tls_certificate(config: &Config<'_>) -> ZResult> { - load_tls_certificate( - config, - TLS_CONNECT_CERTIFICATE_RAW, - TLS_CONNECT_CERTIFICATE_FILE, - TLS_CONNECT_CERTIFICATE_BASE64, - ) - .await - } -} - -fn process_pem(pem: &mut dyn io::BufRead) -> ZResult>> { - let certs: Vec = rustls_pemfile::certs(pem) - .map(|result| result.map_err(|err| zerror!("Error processing PEM certificates: {err}."))) - .collect::, ZError>>()?; - - let trust_anchors: Vec = certs - .into_iter() - .map(|cert| { - anchor_from_trusted_cert(&cert) - .map_err(|err| zerror!("Error processing trust anchor: {err}.")) - .map(|trust_anchor| trust_anchor.to_owned()) - }) - .collect::, ZError>>()?; - - Ok(trust_anchors) -} - -async fn load_tls_key( - config: &Config<'_>, - tls_private_key_raw_config_key: &str, - tls_private_key_file_config_key: &str, - tls_private_key_base64_config_key: &str, -) -> ZResult> { - if let Some(value) = config.get(tls_private_key_raw_config_key) { - return Ok(value.as_bytes().to_vec()); - } - - if let Some(b64_key) = config.get(tls_private_key_base64_config_key) { - return base64_decode(b64_key); - } - - if let Some(value) = config.get(tls_private_key_file_config_key) { - return Ok(tokio::fs::read(value) - .await - .map_err(|e| zerror!("Invalid TLS private key file: {}", e))?) - .and_then(|result| { - if result.is_empty() { - Err(zerror!("Empty TLS key.").into()) - } else { - Ok(result) - } - }); - } - Err(zerror!("Missing TLS private key.").into()) -} - -async fn load_tls_certificate( - config: &Config<'_>, - tls_certificate_raw_config_key: &str, - tls_certificate_file_config_key: &str, - tls_certificate_base64_config_key: &str, -) -> ZResult> { - if let Some(value) = config.get(tls_certificate_raw_config_key) { - return Ok(value.as_bytes().to_vec()); - } - - if let Some(b64_certificate) = config.get(tls_certificate_base64_config_key) { - return base64_decode(b64_certificate); - } - - if let Some(value) = config.get(tls_certificate_file_config_key) { - return Ok(tokio::fs::read(value) - .await - .map_err(|e| zerror!("Invalid TLS certificate file: {}", e))?); - } - Err(zerror!("Missing tls certificates.").into()) -} - -fn load_trust_anchors(config: &Config<'_>) -> ZResult> { - let mut root_cert_store = RootCertStore::empty(); - if let Some(value) = config.get(TLS_ROOT_CA_CERTIFICATE_RAW) { - let mut pem = BufReader::new(value.as_bytes()); - let trust_anchors = process_pem(&mut pem)?; - root_cert_store.extend(trust_anchors); - return Ok(Some(root_cert_store)); - } - - if let Some(b64_certificate) = config.get(TLS_ROOT_CA_CERTIFICATE_BASE64) { - let certificate_pem = base64_decode(b64_certificate)?; - let mut pem = BufReader::new(certificate_pem.as_slice()); - let trust_anchors = process_pem(&mut pem)?; - root_cert_store.extend(trust_anchors); - return Ok(Some(root_cert_store)); - } - - if let Some(filename) = config.get(TLS_ROOT_CA_CERTIFICATE_FILE) { - let mut pem = BufReader::new(File::open(filename)?); - let trust_anchors = process_pem(&mut pem)?; - root_cert_store.extend(trust_anchors); - return Ok(Some(root_cert_store)); - } - Ok(None) -} - -pub async fn get_quic_addr(address: &Address<'_>) -> ZResult { - match tokio::net::lookup_host(address.as_str()).await?.next() { - Some(addr) => Ok(addr), - None => bail!("Couldn't resolve QUIC locator address: {}", address), - } -} - -pub fn get_quic_host<'a>(address: &'a Address<'a>) -> ZResult<&'a str> { - address - .as_str() - .split(':') - .next() - .ok_or_else(|| zerror!("Invalid QUIC address").into()) -} - -pub fn base64_decode(data: &str) -> ZResult> { - use base64::{engine::general_purpose, Engine}; - Ok(general_purpose::STANDARD - .decode(data) - .map_err(|e| zerror!("Unable to perform base64 decoding: {e:?}"))?) -} diff --git a/io/zenoh-links/zenoh-link-quic_datagram/Cargo.toml b/io/zenoh-links/zenoh-link-quic_datagram/Cargo.toml deleted file mode 100644 index 57df1c8277..0000000000 --- a/io/zenoh-links/zenoh-link-quic_datagram/Cargo.toml +++ /dev/null @@ -1,50 +0,0 @@ -# -# Copyright (c) 2025 ZettaScale Technology -# -# This program and the accompanying materials are made available under the -# terms of the Eclipse Public License 2.0 which is available at -# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -# which is available at https://www.apache.org/licenses/LICENSE-2.0. -# -# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -# -# Contributors: -# ZettaScale Zenoh Team, -# -[package] -authors = { workspace = true } -categories = { workspace = true } -description = "Internal crate for zenoh." -edition = { workspace = true } -homepage = { workspace = true } -license = { workspace = true } -name = "zenoh-link-quic_datagram" -repository = { workspace = true } -rust-version = { workspace = true } -version = { workspace = true } -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -async-trait = { workspace = true } -quinn = { workspace = true } -rustls = { workspace = true } -rustls-webpki = { workspace = true } -time = { workspace = true } -tokio = { workspace = true, features = [ - "fs", - "io-util", - "net", - "sync", - "time", -] } -# tokio-rustls = { workspace = true } -tokio-util = { workspace = true, features = ["rt"] } -tracing = { workspace = true } -zenoh-core = { workspace = true } -zenoh-link-commons = { workspace = true, features = ["quic"] } -zenoh-protocol = { workspace = true } -zenoh-result = { workspace = true } -zenoh-util = { workspace = true } - -[package.metadata.cargo-machete] -ignored = ["rustls-webpki"] diff --git a/io/zenoh-links/zenoh-link-quic_datagram/README.md b/io/zenoh-links/zenoh-link-quic_datagram/README.md deleted file mode 100644 index e97cecd513..0000000000 --- a/io/zenoh-links/zenoh-link-quic_datagram/README.md +++ /dev/null @@ -1,6 +0,0 @@ -# ⚠️ WARNING ⚠️ - -This crate is intended for Zenoh's internal use. - -- [Click here for Zenoh's main repository](https://github.com/eclipse-zenoh/zenoh) -- [Click here for Zenoh's documentation](https://zenoh.io) diff --git a/io/zenoh-links/zenoh-link-quic_datagram/src/lib.rs b/io/zenoh-links/zenoh-link-quic_datagram/src/lib.rs deleted file mode 100644 index 68d84978b7..0000000000 --- a/io/zenoh-links/zenoh-link-quic_datagram/src/lib.rs +++ /dev/null @@ -1,119 +0,0 @@ -// -// Copyright (c) 2025 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -//! ⚠️ WARNING ⚠️ -//! -//! This crate is intended for Zenoh's internal use. -//! -//! [Click here for Zenoh's documentation](https://docs.rs/zenoh/latest/zenoh) -use std::{str::FromStr, sync::Arc, time::Duration}; - -use async_trait::async_trait; -use zenoh_core::{zconfigurable, zerror}; -use zenoh_link_commons::LocatorInspector; -use zenoh_protocol::{ - core::{Config, Locator, Metadata, Reliability}, - transport::BatchSize, -}; -use zenoh_result::ZResult; - -mod unicast; -pub use unicast::*; -pub use zenoh_link_commons::quic::TlsConfigurator as QuicDatagramConfigurator; - -pub const QUIC_DATAGRAM_LOCATOR_PREFIX: &str = "quic"; - -// NOTE: this was copied from `zenoh-link-udp` -#[cfg(any(target_os = "linux", target_os = "windows"))] -const QUIC_DATAGRAM_MAX_MTU: BatchSize = u16::MAX - 8 - 40; -#[cfg(target_os = "macos")] -const QUIC_DATAGRAM_MAX_MTU: BatchSize = 9_216; -#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))] -const QUIC_DATAGRAM_MAX_MTU: BatchSize = 8_192; - -const IS_RELIABLE: bool = false; - -#[derive(Default, Clone, Copy, Debug)] -pub struct QuicDatagramLocatorInspector; - -#[async_trait] -impl LocatorInspector for QuicDatagramLocatorInspector { - fn protocol(&self) -> &str { - QUIC_DATAGRAM_LOCATOR_PREFIX - } - - async fn is_multicast(&self, _locator: &Locator) -> ZResult { - Ok(false) - } - - fn is_reliable(&self, locator: &Locator) -> ZResult { - if let Some(reliability) = locator - .metadata() - .get(Metadata::RELIABILITY) - .map(Reliability::from_str) - .transpose()? - { - Ok(reliability == Reliability::Reliable) - } else { - Ok(IS_RELIABLE) - } - } -} - -zconfigurable! { - // Default MTU (QUIC PDU) in bytes. - static ref QUIC_DATAGRAM_DEFAULT_MTU: BatchSize = QUIC_DATAGRAM_MAX_MTU; - // The LINGER option causes the shutdown() call to block until (1) all application data is delivered - // to the remote end or (2) a timeout expires. The timeout is expressed in seconds. - // More info on the LINGER option and its dynamics can be found at: - // https://blog.netherlabs.nl/articles/2009/01/18/the-ultimate-so_linger-page-or-why-is-my-tcp-not-reliable - static ref QUIC_DATAGRAM_LINGER_TIMEOUT: i32 = 10; - // Amount of time in microseconds to throttle the accept loop upon an error. - // Default set to 100 ms. - static ref QUIC_DATAGRAM_ACCEPT_THROTTLE_TIME: u64 = 100_000; -} - -pub(crate) fn set_mtu_config( - config: Config, - server_config: &mut quinn::ServerConfig, -) -> ZResult<()> { - if let Some(initial_mtu) = config.get("initial_mtu") { - let initial_mtu = initial_mtu.parse::().map_err(|err| { - zerror!( - "could not parse QUIC Datagram endpoint's initial_mtu value `{initial_mtu}`: {err}" - ) - })?; - Arc::get_mut(&mut server_config.transport) - .unwrap() - .initial_mtu(initial_mtu); - } - - if let Some(mtu_discovery_interval_secs) = config.get("mtu_discovery_interval_secs") { - let mtu_discovery_interval = mtu_discovery_interval_secs - .parse::() - .map_err(|err| { - zerror!( - "could not parse QUIC Datagram endpoint's mtu_discovery_interval_secs value `{mtu_discovery_interval_secs}`: {err}" - ) - }) - .map(Duration::from_secs)?; - let mut mtu_discovery_config = quinn::MtuDiscoveryConfig::default(); - mtu_discovery_config.interval(mtu_discovery_interval); - Arc::get_mut(&mut server_config.transport) - .unwrap() - .mtu_discovery_config(Some(mtu_discovery_config)); - } - - Ok(()) -} diff --git a/io/zenoh-links/zenoh-link-quic_datagram/src/unicast.rs b/io/zenoh-links/zenoh-link-quic_datagram/src/unicast.rs deleted file mode 100644 index 77043ad323..0000000000 --- a/io/zenoh-links/zenoh-link-quic_datagram/src/unicast.rs +++ /dev/null @@ -1,585 +0,0 @@ -// -// Copyright (c) 2025 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -use std::{ - fmt, - net::{Ipv4Addr, Ipv6Addr, SocketAddr}, - sync::Arc, - time::Duration, -}; - -use async_trait::async_trait; -use quinn::{ - crypto::rustls::{QuicClientConfig, QuicServerConfig}, - EndpointConfig, -}; -use tokio_util::{bytes::Bytes, sync::CancellationToken}; -use zenoh_link_commons::{ - get_ip_interface_names, parse_dscp, - quic::{ - get_cert_chain_expiration, get_cert_common_name, get_quic_addr, get_quic_host, - TlsClientConfig, TlsServerConfig, ALPN_QUIC_HTTP, - }, - set_dscp, - tls::expiration::{LinkCertExpirationManager, LinkWithCertExpiration}, - LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, - NewLinkChannelSender, BIND_INTERFACE, BIND_SOCKET, -}; -use zenoh_protocol::{ - core::{Address, EndPoint, Locator}, - transport::BatchSize, -}; -use zenoh_result::{bail, zerror, ZResult}; - -use super::{ - set_mtu_config, QUIC_DATAGRAM_ACCEPT_THROTTLE_TIME, QUIC_DATAGRAM_DEFAULT_MTU, - QUIC_DATAGRAM_LOCATOR_PREFIX, -}; - -pub struct LinkUnicastQuicDatagram { - connection: quinn::Connection, - src_addr: SocketAddr, - src_locator: Locator, - dst_locator: Locator, - auth_identifier: LinkAuthId, - expiration_manager: Option, -} - -impl LinkUnicastQuicDatagram { - fn new( - connection: quinn::Connection, - src_addr: SocketAddr, - dst_locator: Locator, - auth_identifier: LinkAuthId, - expiration_manager: Option, - ) -> LinkUnicastQuicDatagram { - // Build the Quic object - LinkUnicastQuicDatagram { - connection, - src_addr, - src_locator: Locator::new(QUIC_DATAGRAM_LOCATOR_PREFIX, src_addr.to_string(), "rel=0") - .unwrap(), - dst_locator, - auth_identifier, - expiration_manager, - } - } - - async fn close(&self) -> ZResult<()> { - tracing::trace!("Closing QUIC link: {}", self); - self.connection.close(quinn::VarInt::from_u32(0), &[0]); - Ok(()) - } -} - -#[async_trait] -impl LinkUnicastTrait for LinkUnicastQuicDatagram { - async fn close(&self) -> ZResult<()> { - if let Some(expiration_manager) = &self.expiration_manager { - if !expiration_manager.set_closing() { - // expiration_task is closing link, return its returned ZResult to Transport - return expiration_manager.wait_for_expiration_task().await; - } - // cancel the expiration task and close link - expiration_manager.cancel_expiration_task(); - let res = self.close().await; - let _ = expiration_manager.wait_for_expiration_task().await; - return res; - } - self.close().await - } - - async fn write(&self, buffer: &[u8]) -> ZResult { - let amt = buffer.len(); - self.connection - .send_datagram(Bytes::copy_from_slice(buffer))?; - Ok(amt) - } - - async fn write_all(&self, buffer: &[u8]) -> ZResult<()> { - self.connection - .send_datagram(Bytes::copy_from_slice(buffer))?; - Ok(()) - } - - async fn read(&self, buffer: &mut [u8]) -> ZResult { - let bytes = self.connection.read_datagram().await?; - buffer - .get_mut(..bytes.len()) - .map(|buffer| buffer.copy_from_slice(&bytes)) - .ok_or_else(|| { - zerror!( - "QUIC datagram of len {} cannot fit in RX buffer of len {}", - bytes.len(), - buffer.len() - ) - })?; - Ok(bytes.len()) - } - - async fn read_exact(&self, _: &mut [u8]) -> ZResult<()> { - unreachable!() - } - - #[inline(always)] - fn get_src(&self) -> &Locator { - &self.src_locator - } - - #[inline(always)] - fn get_dst(&self) -> &Locator { - &self.dst_locator - } - - #[inline(always)] - fn get_mtu(&self) -> BatchSize { - self.connection - .max_datagram_size() - .map(|mtu| mtu as BatchSize) - .unwrap_or(*QUIC_DATAGRAM_DEFAULT_MTU) - } - - #[inline(always)] - fn get_interface_names(&self) -> Vec { - get_ip_interface_names(&self.src_addr) - } - - #[inline(always)] - fn is_reliable(&self) -> bool { - super::IS_RELIABLE - } - - #[inline(always)] - fn is_streamed(&self) -> bool { - false - } - - #[inline(always)] - fn get_auth_id(&self) -> &LinkAuthId { - &self.auth_identifier - } -} - -#[async_trait] -impl LinkWithCertExpiration for LinkUnicastQuicDatagram { - async fn expire(&self) -> ZResult<()> { - let expiration_manager = self - .expiration_manager - .as_ref() - .expect("expiration_manager should be set"); - if expiration_manager.set_closing() { - return self.close().await; - } - // Transport is already closing the link - Ok(()) - } -} - -impl Drop for LinkUnicastQuicDatagram { - fn drop(&mut self) { - self.connection.close(quinn::VarInt::from_u32(0), &[0]); - } -} - -impl fmt::Display for LinkUnicastQuicDatagram { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "{} => {}", - self.src_addr, - self.connection.remote_address() - )?; - Ok(()) - } -} - -impl fmt::Debug for LinkUnicastQuicDatagram { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("QuicDatagram") - .field("src", &self.src_addr) - .field("dst", &self.connection.remote_address()) - .finish() - } -} - -pub struct LinkManagerUnicastQuicDatagram { - manager: NewLinkChannelSender, - listeners: ListenersUnicastIP, -} - -impl LinkManagerUnicastQuicDatagram { - pub fn new(manager: NewLinkChannelSender) -> Self { - Self { - manager, - listeners: ListenersUnicastIP::new(), - } - } -} - -#[async_trait] -impl LinkManagerUnicastTrait for LinkManagerUnicastQuicDatagram { - async fn new_link(&self, endpoint: EndPoint) -> ZResult { - let epaddr = endpoint.address(); - let host = get_quic_host(&epaddr)?; - let epconf = endpoint.config(); - let dst_addr = get_quic_addr(&epaddr).await?; - - // if both `iface`, and `bind` are present, return error - if let (Some(_), Some(_)) = (epconf.get(BIND_INTERFACE), epconf.get(BIND_SOCKET)) { - bail!( - "Using Config options `iface` and `bind` in conjunction is unsupported at this time {} {:?}", - BIND_INTERFACE, - BIND_SOCKET - ) - } - - // Initialize the QUIC connection - let mut client_crypto = TlsClientConfig::new(&epconf).await.map_err(|e| { - zerror!("Cannot create a new unreliable QUIC client on {dst_addr}: {e}") - })?; - - client_crypto.client_config.alpn_protocols = - ALPN_QUIC_HTTP.iter().map(|&x| x.into()).collect(); - - let src_addr = if let Some(bind_socket_str) = epconf.get(BIND_SOCKET) { - get_quic_addr(&Address::from(bind_socket_str)).await? - } else if dst_addr.is_ipv4() { - SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0) - } else { - SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0) - }; - - let socket = tokio::net::UdpSocket::bind(src_addr).await?; - if let Some(dscp) = parse_dscp(&epconf)? { - set_dscp(&socket, src_addr, dscp)?; - } - - // Initialize the Endpoint - if let Some(iface) = client_crypto.bind_iface { - zenoh_util::net::set_bind_to_device_udp_socket(&socket, iface)?; - }; - - let mut quic_endpoint = { - // create the Endpoint with this socket - let runtime = quinn::default_runtime() - .ok_or_else(|| std::io::Error::other("no async runtime found"))?; - ZResult::Ok(quinn::Endpoint::new_with_abstract_socket( - EndpointConfig::default(), - None, - runtime.wrap_udp_socket(socket.into_std()?)?, - runtime, - )?) - } - .map_err(|e| zerror!("Can not create a new unreliable QUIC link bound to {host}: {e}"))?; - - let quic_config: QuicClientConfig = client_crypto - .client_config - .try_into() - .map_err(|e| zerror!("Can not get unreliable QUIC config {host}: {e}"))?; - quic_endpoint.set_default_client_config(quinn::ClientConfig::new(Arc::new(quic_config))); - - let src_addr = quic_endpoint.local_addr().map_err(|e| { - zerror!( - "Can not get unreliable QUIC local_addr bound to {}: {}", - host, - e - ) - })?; - - let quic_conn = quic_endpoint - .connect(dst_addr, host) - .map_err(|e| { - zerror!( - "Can not get connect quick endpoint : {} : {} : {}", - dst_addr, - host, - e - ) - })? - .await - .map_err(|e| zerror!("Can not create a new QUIC link bound to {}: {}", host, e))?; - - let auth_id = get_cert_common_name(&quic_conn)?; - let certchain_expiration_time = - get_cert_chain_expiration(&quic_conn)?.expect("server should have certificate chain"); - - let link = Arc::::new_cyclic(|weak_link| { - let mut expiration_manager = None; - if client_crypto.tls_close_link_on_expiration { - // setup expiration manager - expiration_manager = Some(LinkCertExpirationManager::new( - weak_link.clone(), - src_addr, - dst_addr, - QUIC_DATAGRAM_LOCATOR_PREFIX, - certchain_expiration_time, - )) - } - LinkUnicastQuicDatagram::new( - quic_conn, - src_addr, - endpoint.into(), - auth_id.into(), - expiration_manager, - ) - }); - - Ok(LinkUnicast(link)) - } - - async fn new_listener(&self, endpoint: EndPoint) -> ZResult { - let epaddr = endpoint.address(); - let epconf = endpoint.config(); - - if epconf.is_empty() { - bail!("No unreliable QUIC configuration provided"); - }; - - let addr = get_quic_addr(&epaddr).await?; - let host = get_quic_host(&epaddr)?; - - // Server config - let mut server_crypto = TlsServerConfig::new(&epconf) - .await - .map_err(|e| zerror!("Cannot create a new unreliable QUIC listener on {addr}: {e}"))?; - server_crypto.server_config.alpn_protocols = - ALPN_QUIC_HTTP.iter().map(|&x| x.into()).collect(); - - // Install ring based rustls CryptoProvider. - rustls::crypto::ring::default_provider() - // This can be called successfully at most once in any process execution. - // Call this early in your process to configure which provider is used for the provider. - // The configuration should happen before any use of ClientConfig::builder() or ServerConfig::builder(). - .install_default() - // Ignore the error here, because `rustls::crypto::ring::default_provider().install_default()` will inevitably be executed multiple times - // when there are multiple quic links, and all but the first execution will fail. - .ok(); - - let quic_config: QuicServerConfig = server_crypto - .server_config - .try_into() - .map_err(|e| zerror!("Can not create a new unreliable QUIC listener on {addr}: {e}"))?; - let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_config)); - - // We do not accept unidireactional streams. - Arc::get_mut(&mut server_config.transport) - .unwrap() - .max_concurrent_uni_streams(0_u8.into()); - // For the time being we only allow one bidirectional stream - Arc::get_mut(&mut server_config.transport) - .unwrap() - .max_concurrent_bidi_streams(1_u8.into()); - - set_mtu_config(epconf, &mut server_config)?; - - // Initialize the Endpoint - let quic_endpoint = if let Some(iface) = server_crypto.bind_iface { - async { - // Bind the UDP socket - let socket = tokio::net::UdpSocket::bind(addr).await?; - zenoh_util::net::set_bind_to_device_udp_socket(&socket, iface)?; - - // create the Endpoint with this socket - let runtime = quinn::default_runtime() - .ok_or_else(|| std::io::Error::other("no async runtime found"))?; - ZResult::Ok(quinn::Endpoint::new_with_abstract_socket( - EndpointConfig::default(), - Some(server_config), - runtime.wrap_udp_socket(socket.into_std()?)?, - runtime, - )?) - } - .await - } else { - quinn::Endpoint::server(server_config, addr).map_err(Into::into) - } - .map_err(|e| { - zerror!( - "Can not create a new unreliable QUIC listener on {}: {}", - addr, - e - ) - })?; - - let local_addr = quic_endpoint.local_addr().map_err(|e| { - zerror!( - "Can not create a new unreliable QUIC listener on {}: {}", - addr, - e - ) - })?; - let local_port = local_addr.port(); - - // Update the endpoint locator address - let locator = Locator::new( - endpoint.protocol(), - format!("{host}:{local_port}"), - endpoint.metadata(), - )?; - let endpoint = EndPoint::new( - locator.protocol(), - locator.address(), - locator.metadata(), - endpoint.config(), - )?; - - // Spawn the accept loop for the listener - let token = self.listeners.token.child_token(); - - let task = { - let token = token.clone(); - let manager = self.manager.clone(); - - async move { - accept_task( - quic_endpoint, - token, - manager, - server_crypto.tls_close_link_on_expiration, - ) - .await - } - }; - - // Initialize the QuicAcceptor - self.listeners - .add_listener(endpoint, local_addr, task, token) - .await?; - - Ok(locator) - } - - async fn del_listener(&self, endpoint: &EndPoint) -> ZResult<()> { - let epaddr = endpoint.address(); - let addr = get_quic_addr(&epaddr).await?; - self.listeners.del_listener(addr).await - } - - async fn get_listeners(&self) -> Vec { - self.listeners.get_endpoints() - } - - async fn get_locators(&self) -> Vec { - self.listeners.get_locators() - } -} - -async fn accept_task( - quic_endpoint: quinn::Endpoint, - token: CancellationToken, - manager: NewLinkChannelSender, - tls_close_link_on_expiration: bool, -) -> ZResult<()> { - async fn accept(acceptor: quinn::Accept<'_>) -> ZResult { - let qc = acceptor.await.ok_or_else(|| { - zerror!("Can not accept unreliable QUIC connections: acceptor closed") - })?; - - let conn = qc.await.map_err(|e| { - let e = zerror!("Unreliable QUIC acceptor failed: {:?}", e); - tracing::warn!("{}", e); - e - })?; - - Ok(conn) - } - - let src_addr = quic_endpoint - .local_addr() - .map_err(|e| zerror!("Can not accept unreliable QUIC connections: {}", e))?; - - // The accept future - tracing::trace!( - "Ready to accept unreliable QUIC connections on: {:?}", - src_addr - ); - - loop { - tokio::select! { - _ = token.cancelled() => break, - - res = accept(quic_endpoint.accept()) => { - match res { - Ok(quic_conn) => { - // Get the right source address in case an unsepecified IP (i.e. 0.0.0.0 or [::]) is used - let src_addr = match quic_conn.local_ip() { - Some(ip) => SocketAddr::new(ip, src_addr.port()), - None => { - tracing::debug!("Can not accept unreliable QUIC connection: empty local IP"); - continue; - } - }; - let dst_addr = quic_conn.remote_address(); - let dst_locator = Locator::new(QUIC_DATAGRAM_LOCATOR_PREFIX, dst_addr.to_string(), "")?; - // Get Quic auth identifier - let auth_id = get_cert_common_name(&quic_conn)?; - - // Get certificate chain expiration - let mut maybe_expiration_time = None; - if tls_close_link_on_expiration { - match get_cert_chain_expiration(&quic_conn)? { - exp @ Some(_) => maybe_expiration_time = exp, - None => tracing::warn!( - "Cannot monitor expiration for unreliable QUIC link {:?} => {:?} : client does not have certificates", - src_addr, - dst_addr, - ), - } - } - - tracing::debug!("Accepted unreliable QUIC connection on {:?}: {:?}. {:?}.", src_addr, dst_addr, auth_id); - // Create the new link object - let link = Arc::::new_cyclic(|weak_link| { - let mut expiration_manager = None; - if let Some(certchain_expiration_time) = maybe_expiration_time { - // setup expiration manager - expiration_manager = Some(LinkCertExpirationManager::new( - weak_link.clone(), - src_addr, - dst_addr, - QUIC_DATAGRAM_LOCATOR_PREFIX, - certchain_expiration_time, - )); - } - LinkUnicastQuicDatagram::new( - quic_conn, - src_addr, - dst_locator, - auth_id.into(), - expiration_manager, - ) - }); - - // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(LinkUnicast(link)).await { - tracing::error!("{}-{}: {}", file!(), line!(), e) - } - - } - Err(e) => { - tracing::warn!("{} Hint: increase the system open file limit.", e); - // Throttle the accept loop upon an error - // NOTE: This might be due to various factors. However, the most common case is that - // the process has reached the maximum number of open files in the system. On - // Linux systems this limit can be changed by using the "ulimit" command line - // tool. In case of systemd-based systems, this can be changed by using the - // "sysctl" command line tool. - tokio::time::sleep(Duration::from_micros(*QUIC_DATAGRAM_ACCEPT_THROTTLE_TIME)).await; - } - } - } - } - } - Ok(()) -} diff --git a/io/zenoh-transport/Cargo.toml b/io/zenoh-transport/Cargo.toml index 171efe1bce..909299dff2 100644 --- a/io/zenoh-transport/Cargo.toml +++ b/io/zenoh-transport/Cargo.toml @@ -40,7 +40,6 @@ transport_auth = [] transport_compression = [] transport_multilink = ["auth_pubkey"] transport_quic = ["zenoh-link/transport_quic"] -transport_quic_datagram = ["zenoh-link/transport_quic_datagram"] transport_serial = ["zenoh-link/transport_serial"] transport_tcp = ["zenoh-config/transport_tcp", "zenoh-link/transport_tcp"] transport_tls = ["zenoh-link/transport_tls"] diff --git a/io/zenoh-transport/src/unicast/manager.rs b/io/zenoh-transport/src/unicast/manager.rs index 60bc6633ac..07140c7ba0 100644 --- a/io/zenoh-transport/src/unicast/manager.rs +++ b/io/zenoh-transport/src/unicast/manager.rs @@ -30,7 +30,7 @@ use zenoh_core::{zasynclock, zcondfeat}; use zenoh_crypto::PseudoRng; use zenoh_link::*; use zenoh_protocol::{ - core::{parameters, Reliability, ZenohIdProto}, + core::{parameters, ZenohIdProto}, transport::{close, TransportSn}, }; use zenoh_result::{bail, zerror, ZResult}; @@ -76,42 +76,11 @@ pub struct TransportManagerConfigUnicast { pub is_compression: bool, } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct LinkKey { - protocol: String, - reliability: Option, -} - -impl LinkKey { - pub fn with_protocol(protocol: &str) -> Self { - LinkKey { - protocol: protocol.to_string(), - reliability: None, - } - } -} - -impl From<&EndPoint> for LinkKey { - fn from(endpoint: &EndPoint) -> Self { - let protocol = endpoint.protocol().to_string(); - let reliability = Reliability::from( - LocatorInspector::default() - .is_reliable(&endpoint.to_locator()) - .expect("endpoint protocol should be valid"), - ); - - Self { - protocol, - reliability: Some(reliability), - } - } -} - pub struct TransportManagerStateUnicast { // Incoming uninitialized transports pub(super) incoming: Arc, // Established listeners - pub(super) link_managers: Arc>>, + pub(super) link_managers: Arc>>, // Established transports pub(super) transports: Arc>>>, // Multilink @@ -382,19 +351,19 @@ impl TransportManager { } let mut w_guard = zasynclock!(self.state.unicast.link_managers); - let key = LinkKey::from(endpoint); - if let Some(lm) = w_guard.get(&key) { + if let Some(lm) = w_guard.get(&link_kind) { Ok(lm.clone()) } else { let lm = LinkManagerBuilderUnicast::make(self.new_unicast_link_sender.clone(), endpoint)?; - w_guard.insert(key, lm.clone()); + w_guard.insert(link_kind, lm.clone()); Ok(lm) } } async fn get_link_manager_unicast(&self, endpoint: &EndPoint) -> ZResult { - match zasynclock!(self.state.unicast.link_managers).get(&LinkKey::from(endpoint)) { + let link_kind = LinkKind::try_from(endpoint)?; + match zasynclock!(self.state.unicast.link_managers).get(&link_kind) { Some(manager) => Ok(manager.clone()), None => bail!( "Can not get the link manager for protocol ({}) because it has not been found", @@ -404,7 +373,8 @@ impl TransportManager { } async fn del_link_manager_unicast(&self, endpoint: &EndPoint) -> ZResult<()> { - match zasynclock!(self.state.unicast.link_managers).remove(&LinkKey::from(endpoint)) { + let link_kind = LinkKind::try_from(endpoint)?; + match zasynclock!(self.state.unicast.link_managers).remove(&link_kind) { Some(_) => Ok(()), None => bail!( "Can not delete the link manager for protocol ({}) because it has not been found.", diff --git a/io/zenoh-transport/tests/unicast_transport.rs b/io/zenoh-transport/tests/unicast_transport.rs index 07d548fd18..7604c619c4 100644 --- a/io/zenoh-transport/tests/unicast_transport.rs +++ b/io/zenoh-transport/tests/unicast_transport.rs @@ -65,11 +65,7 @@ use zenoh_transport::{ // will validate the key and certificate brought in by the server in front of the client. // #[cfg(all( - any( - feature = "transport_tls", - feature = "transport_quic", - feature = "transport_quic_datagram" - ), + any(feature = "transport_tls", feature = "transport_quic",), target_family = "unix" ))] const CLIENT_KEY: &str = "-----BEGIN RSA PRIVATE KEY----- @@ -101,11 +97,7 @@ F6/CuIw9EsAq6qIB8O88FXQqald+BZOx6AzB8Oedsz/WtMmIEmr/+Q== -----END RSA PRIVATE KEY-----"; #[cfg(all( - any( - feature = "transport_tls", - feature = "transport_quic", - feature = "transport_quic_datagram" - ), + any(feature = "transport_tls", feature = "transport_quic",), target_family = "unix" ))] const CLIENT_CERT: &str = "-----BEGIN CERTIFICATE----- @@ -130,11 +122,7 @@ abY= -----END CERTIFICATE-----"; #[cfg(all( - any( - feature = "transport_tls", - feature = "transport_quic", - feature = "transport_quic_datagram" - ), + any(feature = "transport_tls", feature = "transport_quic",), target_family = "unix" ))] const CLIENT_CA: &str = "-----BEGIN CERTIFICATE----- @@ -158,11 +146,7 @@ Ck0v2xSPAiVjg6w65rUQeW6uB5m0T2wyj+wm0At8vzhZPlgS1fKhcmT2dzOq3+oN R+IdLiXcyIkg0m9N8I17p0ljCSkbrgGMD3bbePRTfg== -----END CERTIFICATE-----"; -#[cfg(any( - feature = "transport_tls", - feature = "transport_quic", - feature = "transport_quic_datagram" -))] +#[cfg(any(feature = "transport_tls", feature = "transport_quic",))] const SERVER_KEY: &str = "-----BEGIN RSA PRIVATE KEY----- MIIEpAIBAAKCAQEAmDCySqKHPmEZShDH3ldPaV/Zsh9+HlHFLk9H10vJZj5WfzVu 5puZQ8GvBFIOtVrl0L9qLkA6bZiHHXm/8OEVvd135ZMp4NV23fdTsEASXfvGVQY8 @@ -191,11 +175,7 @@ H7HZKUCly2lCIizZdDVBkz4AWvaJlRc/3lE2Hd3Es6E52kTvROVKhdz06xuS8t5j ESElGO6qXEA821RpQp+2+uhL90+iC294cPqlS5LDmvTMypVDHzrxPQ== -----END RSA PRIVATE KEY-----"; -#[cfg(any( - feature = "transport_tls", - feature = "transport_quic", - feature = "transport_quic_datagram" -))] +#[cfg(any(feature = "transport_tls", feature = "transport_quic",))] const SERVER_CERT: &str = "-----BEGIN CERTIFICATE----- MIIDLjCCAhagAwIBAgIIW1mAtJWJAJYwDQYJKoZIhvcNAQELBQAwIDEeMBwGA1UE AxMVbWluaWNhIHJvb3QgY2EgNGRjYzJmMCAXDTIzMDMwNjE2NDEwNloYDzIxMjMw @@ -217,11 +197,7 @@ C1LSpiiQUaRSglOvYf/Zx6r+4BOS4OaaArwHkecZQqBSCcBLEAyb/FaaXdBowI0U PQ4= -----END CERTIFICATE-----"; -#[cfg(any( - feature = "transport_tls", - feature = "transport_quic", - feature = "transport_quic_datagram" -))] +#[cfg(any(feature = "transport_tls", feature = "transport_quic",))] const SERVER_CA: &str = "-----BEGIN CERTIFICATE----- MIIDSzCCAjOgAwIBAgIITcwv1N10nqEwDQYJKoZIhvcNAQELBQAwIDEeMBwGA1UE AxMVbWluaWNhIHJvb3QgY2EgNGRjYzJmMCAXDTIzMDMwNjE2NDEwNloYDzIxMjMw @@ -252,7 +228,6 @@ const MSG_SIZE_ALL: [usize; 3] = [1_024, 131_072, 100 * 1024 * 1024]; #[cfg(any( feature = "transport_tcp", feature = "transport_udp", - feature = "transport_quic_datagram", feature = "transport_unixsock-stream", ))] const MSG_SIZE_NOFRAG: [usize; 1] = [1_024]; @@ -1583,7 +1558,7 @@ fn transport_unicast_qos_and_lowlatency_failure() { assert!(good_manager2.is_ok()); } -#[cfg(all(feature = "transport_quic_datagram", target_family = "unix"))] +#[cfg(all(feature = "transport_quic", target_family = "unix"))] #[test] fn transport_unicast_quic_datagram_only_mutual_wrong_client_certs_failure() { use zenoh_link_commons::tls::config::*; @@ -1654,7 +1629,7 @@ fn transport_unicast_quic_datagram_only_mutual_wrong_client_certs_failure() { assert!(result.is_err()); } -#[cfg(all(feature = "transport_quic_datagram", target_family = "unix"))] +#[cfg(all(feature = "transport_quic", target_family = "unix"))] #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn transport_unicast_quic_datagram_only_mutual_no_client_certs_failure() { use std::vec; @@ -1720,7 +1695,7 @@ async fn transport_unicast_quic_datagram_only_mutual_no_client_certs_failure() { assert!(result.is_err()); } -#[cfg(all(feature = "transport_quic_datagram", target_family = "unix"))] +#[cfg(all(feature = "transport_quic", target_family = "unix"))] #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn transport_unicast_quic_datagram_only_mutual_success() { use zenoh_link_commons::tls::config::*; @@ -1783,7 +1758,7 @@ async fn transport_unicast_quic_datagram_only_mutual_success() { .await; } -#[cfg(feature = "transport_quic_datagram")] +#[cfg(feature = "transport_quic")] #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn transport_unicast_quic_datagram_only_server() { use zenoh_link_commons::tls::config::*; diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index 39d7470fe7..d7f055e506 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -37,7 +37,6 @@ default = [ "transport_compression", "transport_multilink", "transport_quic", - "transport_quic_datagram", "transport_tcp", "transport_tls", "transport_udp", @@ -66,7 +65,6 @@ tracing-instrument = [ transport_compression = ["zenoh-transport/transport_compression"] transport_multilink = ["zenoh-transport/transport_multilink"] transport_quic = ["zenoh-transport/transport_quic"] -transport_quic_datagram = ["zenoh-transport/transport_quic_datagram"] transport_serial = ["zenoh-transport/transport_serial"] transport_tcp = ["zenoh-config/transport_tcp", "zenoh-transport/transport_tcp"] transport_tls = ["zenoh-transport/transport_tls"]