Skip to content

Commit 8f18ee4

Browse files
authored
support DSCP link config (#1937)
1 parent 2f955cc commit 8f18ee4

File tree

12 files changed

+150
-23
lines changed

12 files changed

+150
-23
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

DEFAULT_CONFIG.json5

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@
3030
/// E.g. tcp/192.168.0.1:7447#so_sndbuf=65000;so_rcvbuf=65000
3131
/// For TCP, UDP, Quic and TLS links, it is possible to specify a `bind` address for the local socket:
3232
/// E.g. tcp/192.168.0.1:7447#bind=192.168.0.1:0
33-
// Note!: Currently it is unsupported to specify both `bind` and `iface`.
33+
/// Note!: Currently it is unsupported to specify both `bind` and `iface`.
34+
///
35+
/// For TCP/UDP links, it's possible to specify the DSCP field of the IP header:
36+
/// E.g. tcp/192.168.0.1:7447#dscp=0x08
3437
connect: {
3538
/// timeout waiting for all endpoints connected (0: no retry, -1: infinite timeout)
3639
/// Accepts a single value (e.g. timeout_ms: 0)
@@ -77,6 +80,9 @@
7780
///
7881
/// For TCP and TLS links, it is possible to specify the TCP buffer sizes:
7982
/// E.g. tcp/192.168.0.1:7447#so_sndbuf=65000;so_rcvbuf=65000
83+
///
84+
/// For TCP/UDP links, it's possible to specify the DSCP field of the IP header:
85+
/// E.g. tcp/192.168.0.1:7447#dscp=0x08
8086
listen: {
8187
/// timeout waiting for all listen endpoints (0: no retry, -1: infinite timeout)
8288
/// Accepts a single value (e.g. timeout_ms: 0)

io/zenoh-link-commons/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ futures = { workspace = true }
3535
rustls = { workspace = true, optional = true }
3636
rustls-webpki = { workspace = true, optional = true }
3737
serde = { workspace = true, features = ["default"] }
38+
socket2 = { workspace = true }
3839
time = { workspace = true }
3940
tokio = { workspace = true, features = [
4041
"fs",

io/zenoh-link-commons/src/dscp.rs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
//
2+
// Copyright (c) 2025 ZettaScale Technology
3+
//
4+
// This program and the accompanying materials are made available under the
5+
// terms of the Eclipse Public License 2.0 which is available at
6+
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
//
9+
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
//
11+
// Contributors:
12+
// ZettaScale Zenoh Team, <[email protected]>
13+
//
14+
use std::{net::SocketAddr, num::ParseIntError};
15+
16+
use tracing::warn;
17+
use zenoh_protocol::core::Config;
18+
use zenoh_result::{zerror, ZResult};
19+
20+
use crate::DSCP;
21+
22+
fn parse_int(s: &str) -> Result<u32, ParseIntError> {
23+
if let Some(s) = ["0x", "0X"].iter().find_map(|pfx| s.strip_prefix(pfx)) {
24+
u32::from_str_radix(s, 16)
25+
} else if let Some(s) = ["0xb", "0B"].iter().find_map(|pfx| s.strip_prefix(pfx)) {
26+
u32::from_str_radix(s, 2)
27+
} else {
28+
s.parse::<u32>()
29+
}
30+
}
31+
32+
/// Parse DSCP config.
33+
///
34+
/// It supports hexa/binary prefixes, as well as `|` operator, e.g. `dscp=0x04|0x10`.
35+
pub fn parse_dscp(config: &Config) -> ZResult<Option<u32>> {
36+
let Some(dscp) = config.get(DSCP) else {
37+
return Ok(None);
38+
};
39+
Ok(Some(
40+
dscp.split('|')
41+
.map(parse_int)
42+
.map(Result::ok)
43+
.reduce(|a, b| Some(a? | b?))
44+
.flatten()
45+
.ok_or_else(|| zerror!("Unknown DSCP argument: {dscp}"))?,
46+
))
47+
}
48+
49+
/// Set DSCP option to the socket if supported by the target.
50+
///
51+
/// If the target doesn't support it, a warning is emitted.
52+
/// IPv4 uses IP_TOS, while IPv6 uses IPV6_TCLASS
53+
pub fn set_dscp<'a>(
54+
socket: impl Into<socket2::SockRef<'a>>,
55+
addr: SocketAddr,
56+
dscp: u32,
57+
) -> std::io::Result<()> {
58+
match addr {
59+
#[cfg(not(any(
60+
target_os = "fuchsia",
61+
target_os = "redox",
62+
target_os = "solaris",
63+
target_os = "illumos",
64+
)))]
65+
SocketAddr::V4(_) => socket.into().set_tos(dscp)?,
66+
#[cfg(any(
67+
target_os = "android",
68+
target_os = "dragonfly",
69+
target_os = "freebsd",
70+
target_os = "fuchsia",
71+
target_os = "linux",
72+
target_os = "macos",
73+
target_os = "netbsd",
74+
target_os = "openbsd"
75+
))]
76+
SocketAddr::V6(_) => socket.into().set_tclass_v6(dscp)?,
77+
#[allow(unreachable_patterns)]
78+
SocketAddr::V4(_) => warn!(
79+
"IPv4 DSCP is unsupported on platform {}",
80+
std::env::consts::OS
81+
),
82+
#[allow(unreachable_patterns)]
83+
SocketAddr::V6(_) => warn!(
84+
"IPv6 DSCP is unsupported on platform {}",
85+
std::env::consts::OS
86+
),
87+
}
88+
Ok(())
89+
}

io/zenoh-link-commons/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
//! [Click here for Zenoh's documentation](https://docs.rs/zenoh/latest/zenoh)
2020
extern crate alloc;
2121

22+
mod dscp;
2223
mod listener;
2324
mod multicast;
2425
pub mod tcp;
@@ -30,6 +31,7 @@ use alloc::{borrow::ToOwned, boxed::Box, string::String, vec, vec::Vec};
3031
use core::{cmp::PartialEq, fmt, hash::Hash};
3132

3233
use async_trait::async_trait;
34+
pub use dscp::*;
3335
pub use listener::*;
3436
pub use multicast::*;
3537
use serde::Serialize;
@@ -48,6 +50,7 @@ pub const BIND_SOCKET: &str = "bind";
4850
pub const BIND_INTERFACE: &str = "iface";
4951
pub const TCP_SO_SND_BUF: &str = "so_sndbuf";
5052
pub const TCP_SO_RCV_BUF: &str = "so_rcvbuf";
53+
pub const DSCP: &str = "dscp";
5154

5255
#[derive(Clone, Debug, Serialize, Hash, PartialEq, Eq)]
5356
pub struct Link {

io/zenoh-link-commons/src/tcp.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@ use std::net::SocketAddr;
1616
use tokio::net::{TcpListener, TcpSocket, TcpStream};
1717
use zenoh_result::{zerror, ZResult};
1818

19+
use crate::set_dscp;
20+
1921
pub struct TcpSocketConfig<'a> {
2022
tx_buffer_size: Option<u32>,
2123
rx_buffer_size: Option<u32>,
2224
iface: Option<&'a str>,
2325
bind_socket: Option<SocketAddr>,
26+
dscp: Option<u32>,
2427
}
2528

2629
impl<'a> TcpSocketConfig<'a> {
@@ -29,12 +32,14 @@ impl<'a> TcpSocketConfig<'a> {
2932
rx_buffer_size: Option<u32>,
3033
iface: Option<&'a str>,
3134
bind_socket: Option<SocketAddr>,
35+
dscp: Option<u32>,
3236
) -> Self {
3337
Self {
3438
tx_buffer_size,
3539
rx_buffer_size,
3640
iface,
3741
bind_socket,
42+
dscp,
3843
}
3944
}
4045

@@ -119,6 +124,9 @@ impl<'a> TcpSocketConfig<'a> {
119124
if let Some(size) = self.rx_buffer_size {
120125
socket.set_recv_buffer_size(size)?;
121126
}
127+
if let Some(dscp) = self.dscp {
128+
set_dscp(&socket, *addr, dscp)?;
129+
}
122130

123131
Ok(socket)
124132
}

io/zenoh-links/zenoh-link-quic/src/unicast.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use tokio_util::sync::CancellationToken;
3030
use x509_parser::prelude::{FromDer, X509Certificate};
3131
use zenoh_core::zasynclock;
3232
use zenoh_link_commons::{
33-
get_ip_interface_names,
33+
get_ip_interface_names, parse_dscp, set_dscp,
3434
tls::expiration::{LinkCertExpirationManager, LinkWithCertExpiration},
3535
LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, ListenersUnicastIP,
3636
NewLinkChannelSender, BIND_INTERFACE, BIND_SOCKET,
@@ -281,6 +281,9 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic {
281281
};
282282

283283
let socket = tokio::net::UdpSocket::bind(src_addr).await?;
284+
if let Some(dscp) = parse_dscp(&epconf)? {
285+
set_dscp(&socket, src_addr, dscp)?;
286+
}
284287

285288
// Initialize the Endpoint
286289
if let Some(iface) = client_crypto.bind_iface {

io/zenoh-links/zenoh-link-tcp/src/unicast.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -257,13 +257,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp {
257257
)
258258
}
259259

260-
let link_config = TcpLinkConfig::new(&config).await?;
261-
let socket_config = TcpSocketConfig::new(
262-
link_config.tx_buffer_size,
263-
link_config.rx_buffer_size,
264-
link_config.bind_iface,
265-
link_config.bind_socket,
266-
);
260+
let socket_config = TcpSocketConfig::from(TcpLinkConfig::new(&config).await?);
267261

268262
let mut errs: Vec<ZError> = vec![];
269263
for da in dst_addrs {
@@ -294,8 +288,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp {
294288

295289
let config = endpoint.config();
296290

297-
let link_config = TcpLinkConfig::new(&config).await?;
298-
let socket_config: TcpSocketConfig<'_> = link_config.into();
291+
let socket_config = TcpSocketConfig::from(TcpLinkConfig::new(&config).await?);
299292

300293
let mut errs: Vec<ZError> = vec![];
301294
for da in addrs {

io/zenoh-links/zenoh-link-tcp/src/utils.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ use std::net::SocketAddr;
1515

1616
use zenoh_config::Config as ZenohConfig;
1717
use zenoh_link_commons::{
18-
tcp::TcpSocketConfig, ConfigurationInspector, BIND_INTERFACE, BIND_SOCKET, TCP_SO_RCV_BUF,
19-
TCP_SO_SND_BUF,
18+
parse_dscp, tcp::TcpSocketConfig, ConfigurationInspector, BIND_INTERFACE, BIND_SOCKET,
19+
TCP_SO_RCV_BUF, TCP_SO_SND_BUF,
2020
};
2121
use zenoh_protocol::core::{parameters, Address, Config};
2222
use zenoh_result::{zerror, ZResult};
@@ -51,6 +51,7 @@ pub(crate) struct TcpLinkConfig<'a> {
5151
pub(crate) tx_buffer_size: Option<u32>,
5252
pub(crate) bind_iface: Option<&'a str>,
5353
pub(crate) bind_socket: Option<SocketAddr>,
54+
pub(crate) dscp: Option<u32>,
5455
}
5556

5657
impl<'a> TcpLinkConfig<'a> {
@@ -65,6 +66,7 @@ impl<'a> TcpLinkConfig<'a> {
6566
tx_buffer_size: None,
6667
bind_iface: config.get(BIND_INTERFACE),
6768
bind_socket,
69+
dscp: parse_dscp(config)?,
6870
};
6971

7072
if let Some(size) = config.get(TCP_SO_RCV_BUF) {
@@ -91,6 +93,7 @@ impl<'a> From<TcpLinkConfig<'a>> for TcpSocketConfig<'a> {
9193
value.rx_buffer_size,
9294
value.bind_iface,
9395
value.bind_socket,
96+
value.dscp,
9497
)
9598
}
9699
}

io/zenoh-links/zenoh-link-tls/src/utils.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ use secrecy::ExposeSecret;
3232
use webpki::anchor_from_trusted_cert;
3333
use zenoh_config::Config as ZenohConfig;
3434
use zenoh_link_commons::{
35-
tcp::TcpSocketConfig, tls::WebPkiVerifierAnyServerName, ConfigurationInspector, BIND_INTERFACE,
36-
BIND_SOCKET, TCP_SO_RCV_BUF, TCP_SO_SND_BUF,
35+
parse_dscp, tcp::TcpSocketConfig, tls::WebPkiVerifierAnyServerName, ConfigurationInspector,
36+
BIND_INTERFACE, BIND_SOCKET, TCP_SO_RCV_BUF, TCP_SO_SND_BUF,
3737
};
3838
use zenoh_protocol::core::{
3939
endpoint::{Address, Config},
@@ -285,6 +285,7 @@ impl<'a> TlsServerConfig<'a> {
285285
tcp_rx_buffer_size,
286286
config.get(BIND_INTERFACE),
287287
bind_socket,
288+
parse_dscp(config)?,
288289
),
289290
})
290291
}
@@ -443,7 +444,6 @@ impl<'a> TlsClientConfig<'a> {
443444
.map_err(|_| zerror!("Unknown TCP write buffer size argument: {}", size))?,
444445
);
445446
};
446-
447447
let mut bind_socket = None;
448448
if let Some(bind_socket_str) = config.get(BIND_SOCKET) {
449449
bind_socket = Some(get_tls_addr(&Address::from(bind_socket_str)).await?);
@@ -457,6 +457,7 @@ impl<'a> TlsClientConfig<'a> {
457457
tcp_rx_buffer_size,
458458
config.get(BIND_INTERFACE),
459459
bind_socket,
460+
parse_dscp(config)?,
460461
),
461462
})
462463
}

0 commit comments

Comments
 (0)