Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ddf8b19
Detect change on network interface to update locators and scouting
Mar 12, 2025
67da75e
Refactor scouting out of the runtime
Mar 13, 2025
b94c464
Move net::Runtime::scout to net::Scouting
Mar 13, 2025
8a3b311
Make the scouting routine create and close appropriate sockets on net…
Mar 13, 2025
3779614
Collect multiple netlink messages before updating the locators and th…
Mar 13, 2025
b5aaab5
Reformat code
Mar 13, 2025
4f9c5fb
Run clippy
Mar 13, 2025
56daad9
Hide the netlink monitoring behind cfg(target_os = "linux")
Mar 13, 2025
8c686fe
Add TODO
Apr 1, 2025
782021d
Merge tag '1.4.0' into feature/netlink-monitor
Jun 2, 2025
e74c967
Merge remote-tracking branch 'upstream/main' into feature/endpoint-au…
Jun 26, 2025
7378e00
Send locators updates to peers&routers
Jun 12, 2025
eec08cc
Use a 10s poll interval to see if locators have changed instead of NE…
Jun 25, 2025
cfd1111
Don't assume TransportManager::get_locators will always yield the loc…
Jun 26, 2025
d5418be
Add endpoint_poll_interval_ms to configure the network change detecti…
Jun 26, 2025
3d8d2b6
Add attributes to make the linter pass when the netlink feature is no…
Jun 5, 2025
fe2256b
Fix RandomState import
Jun 26, 2025
da5345e
Only start the endpoint poll routine if the duration is positive
Jun 26, 2025
b9f1f51
Fix memory leak at Runtime cleanup
Jun 27, 2025
bceee2d
Merge remote-tracking branch 'upstream/main' into feature/endpoint-au…
Jul 1, 2025
22010b3
Merge remote-tracking branch 'upstream/main' into feature/endpoint-au…
Aug 4, 2025
fb357b2
Merge remote-tracking branch 'upstream/main' into feature/endpoint-au…
Aug 28, 2025
4fb7c12
Merge remote-tracking branch 'upstream/main' into feature/endpoint-au…
Sep 22, 2025
3f079dc
Merge remote-tracking branch 'upstream/main' into feature/endpoint-au…
Sep 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@
/// increase factor for the next timeout until next try
period_increase_factor: 2,
},
/// Interval in millisecond to check if the listening endpoints changed (i.e. when listening on 0.0.0.0 or [::]).
/// Also update the multicast scouting listening interfaces. Use -1 to disable.
endpoint_poll_interval_ms: 10000,
},

/// Configure the session open behavior.
Expand Down
3 changes: 3 additions & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub mod listen {

pub const timeout_ms: ModeDependentValue<i64> = ModeDependentValue::Unique(0);
pub const exit_on_failure: ModeDependentValue<bool> = ModeDependentValue::Unique(true);

pub const endpoint_poll_interval_ms: Option<i64> = Some(10_000);
}

#[allow(non_upper_case_globals)]
Expand Down Expand Up @@ -180,6 +182,7 @@ impl Default for ListenConfig {
peer: Some(vec![]),
client: None,
}),
endpoint_poll_interval_ms: Some(10_000),
exit_on_failure: None,
retry: None,
}
Expand Down
3 changes: 3 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,9 @@ validated_struct::validator! {
/// if connection timeout exceed, exit from application
pub exit_on_failure: Option<ModeDependentValue<bool>>,
pub retry: Option<connection_retry::ConnectionRetryModeDependentConf>,
/// Interval in millisecond to check if the listening endpoints changed (e.g. when listening on 0.0.0.0).
/// Also update the multicast scouting listening interfaces. Use -1 to disable.
pub endpoint_poll_interval_ms: Option<i64>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any specific reason to use i64 for purely positive value, can Option::None be used for disabling ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied the idea from timeout_ms, which has the same semantic if I'm not mistaken (-1 = infinite / disabled).

After verification, I disable the poll if the interval is <= 0, not just < 0 , while timeout_ms actually has a timeout of 0 if you set 0.
I don't mind changing

},
/// Configure the session open behavior.
pub open: #[derive(Default)]
Expand Down
28 changes: 19 additions & 9 deletions commons/zenoh-util/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use std::net::{IpAddr, Ipv6Addr};
#[cfg(unix)]
use std::sync::RwLock;

#[cfg(unix)]
use lazy_static::lazy_static;
Expand All @@ -20,6 +22,8 @@ use pnet_datalink::NetworkInterface;
use tokio::net::{TcpSocket, UdpSocket};
use zenoh_core::zconfigurable;
#[cfg(unix)]
use zenoh_core::{zread, zwrite};
#[cfg(unix)]
use zenoh_result::zerror;
use zenoh_result::{bail, ZResult};

Expand All @@ -30,7 +34,7 @@ zconfigurable! {

#[cfg(unix)]
lazy_static! {
static ref IFACES: Vec<NetworkInterface> = pnet_datalink::interfaces();
static ref IFACES: RwLock<Vec<NetworkInterface>> = RwLock::new(pnet_datalink::interfaces());
}

#[cfg(windows)]
Expand Down Expand Up @@ -68,7 +72,7 @@ unsafe fn get_adapters_addresses(af_spec: i32) -> ZResult<Vec<u8>> {
pub fn get_interface(name: &str) -> ZResult<Option<IpAddr>> {
#[cfg(unix)]
{
for iface in IFACES.iter() {
for iface in zread!(IFACES).iter() {
if iface.name == name {
for ifaddr in &iface.ips {
if ifaddr.is_ipv4() {
Expand Down Expand Up @@ -131,7 +135,7 @@ pub fn get_interface(name: &str) -> ZResult<Option<IpAddr>> {
pub fn get_multicast_interfaces() -> Vec<IpAddr> {
#[cfg(unix)]
{
IFACES
zread!(IFACES)
.iter()
.filter_map(|iface| {
if iface.is_up() && iface.is_running() && iface.is_multicast() {
Expand All @@ -155,7 +159,7 @@ pub fn get_multicast_interfaces() -> Vec<IpAddr> {
pub fn get_local_addresses(interface: Option<&str>) -> ZResult<Vec<IpAddr>> {
#[cfg(unix)]
{
Ok(IFACES
Ok(zread!(IFACES)
.iter()
.filter(|iface| {
if let Some(interface) = interface.as_ref() {
Expand Down Expand Up @@ -205,7 +209,7 @@ pub fn get_local_addresses(interface: Option<&str>) -> ZResult<Vec<IpAddr>> {
pub fn get_unicast_addresses_of_multicast_interfaces() -> Vec<IpAddr> {
#[cfg(unix)]
{
IFACES
zread!(IFACES)
.iter()
.filter(|iface| iface.is_up() && iface.is_running() && iface.is_multicast())
.flat_map(|iface| {
Expand All @@ -228,7 +232,7 @@ pub fn get_unicast_addresses_of_multicast_interfaces() -> Vec<IpAddr> {
pub fn get_unicast_addresses_of_interface(name: &str) -> ZResult<Vec<IpAddr>> {
#[cfg(unix)]
{
match IFACES.iter().find(|iface| iface.name == name) {
match zread!(IFACES).iter().find(|iface| iface.name == name) {
Some(iface) => {
if !iface.is_up() {
bail!("Interface {name} is not up");
Expand Down Expand Up @@ -282,7 +286,7 @@ pub fn get_unicast_addresses_of_interface(name: &str) -> ZResult<Vec<IpAddr>> {
pub fn get_index_of_interface(addr: IpAddr) -> ZResult<u32> {
#[cfg(unix)]
{
IFACES
zread!(IFACES)
.iter()
.find(|iface| iface.ips.iter().any(|ipnet| ipnet.ip() == addr))
.map(|iface| iface.index)
Expand Down Expand Up @@ -319,12 +323,12 @@ pub fn get_interface_names_by_addr(addr: IpAddr) -> ZResult<Vec<String>> {
#[cfg(unix)]
{
if addr.is_unspecified() {
Ok(IFACES
Ok(zread!(IFACES)
.iter()
.map(|iface| iface.name.clone())
.collect::<Vec<String>>())
} else {
Ok(IFACES
Ok(zread!(IFACES)
.iter()
.filter(|iface| iface.ips.iter().any(|ipnet| ipnet.ip() == addr))
.map(|iface| iface.name.clone())
Expand Down Expand Up @@ -435,6 +439,12 @@ pub fn get_ipv6_ipaddrs(interface: Option<&str>) -> Vec<IpAddr> {
.collect()
}

#[cfg(unix)]
pub fn update_iface_cache() {
let mut interfaces = zwrite!(IFACES);
*interfaces = pnet_datalink::interfaces();
}

#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn set_bind_to_device_tcp_socket(socket: &TcpSocket, iface: &str) -> ZResult<()> {
socket.bind_device(Some(iface.as_bytes()))?;
Expand Down
4 changes: 2 additions & 2 deletions zenoh/src/api/scouting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
builders::scouting::ScoutBuilder,
handlers::{Callback, CallbackParameter, DefaultHandler},
},
net::runtime::{orchestrator::Loop, Runtime},
net::runtime::{orchestrator::Loop, Runtime, Scouting},
Config,
};

Expand Down Expand Up @@ -181,7 +181,7 @@ pub(crate) fn _scout(
let task = TerminatableTask::spawn(
zenoh_runtime::ZRuntime::Acceptor,
async move {
let scout = Runtime::scout(&sockets, what, &addr, move |hello| {
let scout = Scouting::scout(&sockets, what, &addr, move |hello| {
let callback = callback.clone();
async move {
callback.call(hello.into());
Expand Down
15 changes: 15 additions & 0 deletions zenoh/src/net/protocol/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,21 @@ impl Network {
}
}

pub(crate) fn update_locators(&mut self) {
self.graph[self.idx].sn += 1;
self.send_on_links(
vec![(
self.idx,
Details {
zid: false,
locators: true,
links: self.full_linkstate || self.router_peers_failover_brokering,
},
)],
|link| link.transport.get_whatami().unwrap_or(WhatAmI::Peer) == WhatAmI::Router,
);
}

fn remove_detached_nodes(&mut self) -> Vec<(NodeIndex, Node)> {
let mut dfs_stack = vec![self.idx];
let mut visit_map = self.graph.visit_map();
Expand Down
6 changes: 6 additions & 0 deletions zenoh/src/net/routing/hat/linkstate_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,12 @@ impl HatBaseTrait for HatCode {
Ok(())
}

fn update_self_locators(&self, tables: &mut Tables) {
if let Some(net) = hat_mut!(tables).linkstatepeers_net.as_mut() {
net.update_locators();
}
}

fn close_face(
&self,
tables: &TablesLock,
Expand Down
2 changes: 2 additions & 0 deletions zenoh/src/net/routing/hat/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ pub(crate) trait HatBaseTrait {
send_declare: &mut SendDeclare,
) -> ZResult<()>;

fn update_self_locators(&self, _tables: &mut Tables) {}

fn handle_oam(
&self,
tables: &mut Tables,
Expand Down
15 changes: 15 additions & 0 deletions zenoh/src/net/routing/hat/p2p_peer/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,4 +605,19 @@ impl Network {
}
vec![]
}

pub(super) fn update_locators(&mut self) {
self.graph[self.idx].sn += 1;
self.send_on_links(
vec![(
self.idx,
Details {
zid: false,
locators: true,
links: self.router_peers_failover_brokering,
},
)],
|link| link.transport.get_whatami().unwrap_or(WhatAmI::Peer) == WhatAmI::Router,
);
}
}
6 changes: 6 additions & 0 deletions zenoh/src/net/routing/hat/p2p_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ impl HatBaseTrait for HatCode {
Ok(())
}

fn update_self_locators(&self, tables: &mut Tables) {
if let Some(net) = hat_mut!(tables).gossip.as_mut() {
net.update_locators();
}
}

fn close_face(
&self,
tables: &TablesLock,
Expand Down
9 changes: 9 additions & 0 deletions zenoh/src/net/routing/hat/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,15 @@ impl HatBaseTrait for HatCode {
Ok(())
}

fn update_self_locators(&self, tables: &mut Tables) {
if let Some(net) = hat_mut!(tables).routers_net.as_mut() {
net.update_locators();
}
if let Some(net) = hat_mut!(tables).linkstatepeers_net.as_mut() {
net.update_locators();
}
}

fn close_face(
&self,
tables: &TablesLock,
Expand Down
44 changes: 44 additions & 0 deletions zenoh/src/net/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! [Click here for Zenoh's documentation](https://docs.rs/zenoh/latest/zenoh)
mod adminspace;
pub mod orchestrator;
mod scouting;

#[cfg(feature = "plugins")]
use std::sync::{Mutex, MutexGuard};
Expand All @@ -29,11 +30,13 @@ use std::{
atomic::{AtomicU32, Ordering},
Arc, Weak,
},
time::Duration,
};

pub use adminspace::AdminSpace;
use async_trait::async_trait;
use futures::{stream::StreamExt, Future};
pub use scouting::Scouting;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use uhlc::{HLCBuilder, HLC};
Expand All @@ -55,6 +58,8 @@ use zenoh_transport::{
multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler,
TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler,
};
#[cfg(unix)]
use zenoh_util::net::update_iface_cache;

use self::orchestrator::StartConditions;
use super::{primitives::DeMux, routing, routing::router::Router};
Expand Down Expand Up @@ -87,6 +92,7 @@ pub(crate) struct RuntimeState {
plugins_manager: Mutex<PluginsManager>,
start_conditions: Arc<StartConditions>,
pending_connections: tokio::sync::Mutex<HashSet<ZenohIdProto>>,
scouting: tokio::sync::Mutex<Option<Scouting>>,
}

pub struct WeakRuntime {
Expand Down Expand Up @@ -175,6 +181,7 @@ impl RuntimeBuilder {
// SHM lazy init flag
#[cfg(feature = "shared-memory")]
let shm_init_mode = *config.transport.shared_memory.mode();
let endpoint_poll_interval = config.listen.endpoint_poll_interval_ms().unwrap_or(10_000);

let config = Notifier::new(crate::config::Config(config));
let runtime = Runtime {
Expand All @@ -193,6 +200,7 @@ impl RuntimeBuilder {
plugins_manager: Mutex::new(plugins_manager),
start_conditions: Arc::new(StartConditions::default()),
pending_connections: tokio::sync::Mutex::new(HashSet::new()),
scouting: tokio::sync::Mutex::new(None),
}),
};
*handler.runtime.write().unwrap() = Runtime::downgrade(&runtime);
Expand Down Expand Up @@ -240,6 +248,14 @@ impl RuntimeBuilder {
zenoh_config::ShmInitMode::Lazy => {}
};

if endpoint_poll_interval > 0 {
let poll_interval = Duration::from_millis(endpoint_poll_interval as u64);
runtime.spawn({
let runtime2 = runtime.clone();
async move { runtime2.monitor_available_addrs(poll_interval).await }
});
}

Ok(runtime)
}
}
Expand Down Expand Up @@ -366,6 +382,33 @@ impl Runtime {
pub(crate) async fn remove_pending_connection(&self, zid: &ZenohIdProto) -> bool {
self.state.pending_connections.lock().await.remove(zid)
}

async fn monitor_available_addrs(&self, poll_interval: Duration) {
let token = self.get_cancellation_token();
loop {
tokio::select! {
_ = tokio::time::sleep(poll_interval) => self.update_available_addrs().await,
_ = token.cancelled() => return,
}
}
}

async fn update_available_addrs(&self) {
#[cfg(unix)]
update_iface_cache();

if self.update_locators() {
let tables_lock = &self.state.router.tables;
let _ctrl_lock = zlock!(tables_lock.ctrl_lock);
let mut tables = zwrite!(tables_lock.tables);
tables_lock.hat_code.update_self_locators(&mut tables);
}

let scouting = self.state.scouting.lock().await;
if let Some(scouting) = scouting.as_ref() {
scouting.update_addrs_if_needed().await;
}
}
}

struct RuntimeTransportEventHandler {
Expand Down Expand Up @@ -547,6 +590,7 @@ impl Closee for Arc<RuntimeState> {
self.manager.close().await;
// clean up to break cyclic reference of self.state to itself
self.transport_handlers.write().unwrap().clear();
zasynclock!(self.scouting).take();
// TODO: the call below is needed to prevent intermittent leak
// due to not freed resource Arc, that apparently happens because
// the task responsible for resource clean up was aborted earlier than expected.
Expand Down
Loading
Loading