Skip to content
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ddf8b19
Detect change on network interface to update locators and scouting
Mar 12, 2025
67da75e
Refactor scouting out of the runtime
Mar 13, 2025
b94c464
Move net::Runtime::scout to net::Scouting
Mar 13, 2025
8a3b311
Make the scouting routine create and close appropriate sockets on net…
Mar 13, 2025
3779614
Collect multiple netlink messages before updating the locators and th…
Mar 13, 2025
b5aaab5
Reformat code
Mar 13, 2025
4f9c5fb
Run clippy
Mar 13, 2025
56daad9
Hide the netlink monitoring behind cfg(target_os = "linux")
Mar 13, 2025
8c686fe
Add TODO
Apr 1, 2025
782021d
Merge tag '1.4.0' into feature/netlink-monitor
Jun 2, 2025
e74c967
Merge remote-tracking branch 'upstream/main' into feature/endpoint-au…
Jun 26, 2025
7378e00
Send locators updates to peers&routers
Jun 12, 2025
eec08cc
Use a 10s poll interval to see if locators have changed instead of NE…
Jun 25, 2025
cfd1111
Don't assume TransportManager::get_locators will always yield the loc…
Jun 26, 2025
d5418be
Add endpoint_poll_interval_ms to configure the network change detecti…
Jun 26, 2025
3d8d2b6
Add attributes to make the linter pass when the netlink feature is no…
Jun 5, 2025
fe2256b
Fix RandomState import
Jun 26, 2025
da5345e
Only start the endpoint poll routine if the duration is positive
Jun 26, 2025
b9f1f51
Fix memory leak at Runtime cleanup
Jun 27, 2025
bceee2d
Merge remote-tracking branch 'upstream/main' into feature/endpoint-au…
Jul 1, 2025
22010b3
Merge remote-tracking branch 'upstream/main' into feature/endpoint-au…
Aug 4, 2025
fb357b2
Merge remote-tracking branch 'upstream/main' into feature/endpoint-au…
Aug 28, 2025
4fb7c12
Merge remote-tracking branch 'upstream/main' into feature/endpoint-au…
Sep 22, 2025
3f079dc
Merge remote-tracking branch 'upstream/main' into feature/endpoint-au…
Sep 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@
/// increase factor for the next timeout until next try
period_increase_factor: 2,
},
/// Interval in millisecond to check if the listening endpoints changed (i.e. when listening on 0.0.0.0 or [::]).
/// Also update the multicast scouting listening interfaces. Use -1 to disable.
endpoint_poll_interval_ms: 10000,
},
/// Configure the session open behavior.
open: {
Expand Down
3 changes: 3 additions & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub mod listen {

pub const timeout_ms: ModeDependentValue<i64> = ModeDependentValue::Unique(0);
pub const exit_on_failure: ModeDependentValue<bool> = ModeDependentValue::Unique(true);

pub const endpoint_poll_interval_ms: Option<i64> = Some(10_000);
}

#[allow(non_upper_case_globals)]
Expand Down Expand Up @@ -180,6 +182,7 @@ impl Default for ListenConfig {
peer: Some(vec![]),
client: None,
}),
endpoint_poll_interval_ms: Some(10_000),
exit_on_failure: None,
retry: None,
}
Expand Down
3 changes: 3 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,9 @@ validated_struct::validator! {
/// if connection timeout exceed, exit from application
pub exit_on_failure: Option<ModeDependentValue<bool>>,
pub retry: Option<connection_retry::ConnectionRetryModeDependentConf>,
/// Interval in millisecond to check if the listening endpoints changed (e.g. when listening on 0.0.0.0).
/// Also update the multicast scouting listening interfaces. Use -1 to disable.
pub endpoint_poll_interval_ms: Option<i64>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any specific reason to use i64 for purely positive value, can Option::None be used for disabling ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied the idea from timeout_ms, which has the same semantic if I'm not mistaken (-1 = infinite / disabled).

After verification, I disable the poll if the interval is <= 0, not just < 0 , while timeout_ms actually has a timeout of 0 if you set 0.
I don't mind changing

},
/// Configure the session open behavior.
pub open: #[derive(Default)]
Expand Down
28 changes: 19 additions & 9 deletions commons/zenoh-util/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use std::net::{IpAddr, Ipv6Addr};
#[cfg(unix)]
use std::sync::RwLock;

#[cfg(unix)]
use lazy_static::lazy_static;
Expand All @@ -20,6 +22,8 @@
use tokio::net::{TcpSocket, UdpSocket};
use zenoh_core::zconfigurable;
#[cfg(unix)]
use zenoh_core::{zread, zwrite};
#[cfg(unix)]
use zenoh_result::zerror;
use zenoh_result::{bail, ZResult};

Expand All @@ -30,7 +34,7 @@

#[cfg(unix)]
lazy_static! {
static ref IFACES: Vec<NetworkInterface> = pnet_datalink::interfaces();
static ref IFACES: RwLock<Vec<NetworkInterface>> = RwLock::new(pnet_datalink::interfaces());
}

#[cfg(windows)]
Expand Down Expand Up @@ -68,7 +72,7 @@
pub fn get_interface(name: &str) -> ZResult<Option<IpAddr>> {
#[cfg(unix)]
{
for iface in IFACES.iter() {
for iface in zread!(IFACES).iter() {

Check warning on line 75 in commons/zenoh-util/src/net/mod.rs

View check run for this annotation

Codecov / codecov/patch

commons/zenoh-util/src/net/mod.rs#L75

Added line #L75 was not covered by tests
if iface.name == name {
for ifaddr in &iface.ips {
if ifaddr.is_ipv4() {
Expand Down Expand Up @@ -131,7 +135,7 @@
pub fn get_multicast_interfaces() -> Vec<IpAddr> {
#[cfg(unix)]
{
IFACES
zread!(IFACES)
.iter()
.filter_map(|iface| {
if iface.is_up() && iface.is_running() && iface.is_multicast() {
Expand All @@ -155,7 +159,7 @@
pub fn get_local_addresses(interface: Option<&str>) -> ZResult<Vec<IpAddr>> {
#[cfg(unix)]
{
Ok(IFACES
Ok(zread!(IFACES)
.iter()
.filter(|iface| {
if let Some(interface) = interface.as_ref() {
Expand Down Expand Up @@ -205,7 +209,7 @@
pub fn get_unicast_addresses_of_multicast_interfaces() -> Vec<IpAddr> {
#[cfg(unix)]
{
IFACES
zread!(IFACES)
.iter()
.filter(|iface| iface.is_up() && iface.is_running() && iface.is_multicast())
.flat_map(|iface| {
Expand All @@ -228,7 +232,7 @@
pub fn get_unicast_addresses_of_interface(name: &str) -> ZResult<Vec<IpAddr>> {
#[cfg(unix)]
{
match IFACES.iter().find(|iface| iface.name == name) {
match zread!(IFACES).iter().find(|iface| iface.name == name) {

Check warning on line 235 in commons/zenoh-util/src/net/mod.rs

View check run for this annotation

Codecov / codecov/patch

commons/zenoh-util/src/net/mod.rs#L235

Added line #L235 was not covered by tests
Some(iface) => {
if !iface.is_up() {
bail!("Interface {name} is not up");
Expand Down Expand Up @@ -282,7 +286,7 @@
pub fn get_index_of_interface(addr: IpAddr) -> ZResult<u32> {
#[cfg(unix)]
{
IFACES
zread!(IFACES)

Check warning on line 289 in commons/zenoh-util/src/net/mod.rs

View check run for this annotation

Codecov / codecov/patch

commons/zenoh-util/src/net/mod.rs#L289

Added line #L289 was not covered by tests
.iter()
.find(|iface| iface.ips.iter().any(|ipnet| ipnet.ip() == addr))
.map(|iface| iface.index)
Expand Down Expand Up @@ -319,12 +323,12 @@
#[cfg(unix)]
{
if addr.is_unspecified() {
Ok(IFACES
Ok(zread!(IFACES)
.iter()
.map(|iface| iface.name.clone())
.collect::<Vec<String>>())
} else {
Ok(IFACES
Ok(zread!(IFACES)
.iter()
.filter(|iface| iface.ips.iter().any(|ipnet| ipnet.ip() == addr))
.map(|iface| iface.name.clone())
Expand Down Expand Up @@ -435,6 +439,12 @@
.collect()
}

#[cfg(unix)]
pub fn update_iface_cache() {
let mut interfaces = zwrite!(IFACES);
*interfaces = pnet_datalink::interfaces();
}

#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn set_bind_to_device_tcp_socket(socket: &TcpSocket, iface: &str) -> ZResult<()> {
socket.bind_device(Some(iface.as_bytes()))?;
Expand Down
5 changes: 3 additions & 2 deletions zenoh/src/api/scouting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ use crate::{
builders::scouting::ScoutBuilder,
handlers::{Callback, DefaultHandler},
},
net::runtime::{orchestrator::Loop, Runtime},
net::runtime::{orchestrator::Loop, Runtime, Scouting},
Config,
};

/// A scout that returns [`Hello`] messages through a callback.
///
/// # Examples
Expand Down Expand Up @@ -172,7 +173,7 @@ pub(crate) fn _scout(
let task = TerminatableTask::spawn(
zenoh_runtime::ZRuntime::Acceptor,
async move {
let scout = Runtime::scout(&sockets, what, &addr, move |hello| {
let scout = Scouting::scout(&sockets, what, &addr, move |hello| {
let callback = callback.clone();
async move {
callback.call(hello.into());
Expand Down
15 changes: 15 additions & 0 deletions zenoh/src/net/protocol/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,21 @@
}
}

pub(crate) fn update_locators(&mut self) {
self.graph[self.idx].sn += 1;
self.send_on_links(
vec![(
self.idx,

Check warning on line 994 in zenoh/src/net/protocol/network.rs

View check run for this annotation

Codecov / codecov/patch

zenoh/src/net/protocol/network.rs#L990-L994

Added lines #L990 - L994 were not covered by tests
Details {
zid: false,
locators: true,
links: self.full_linkstate || self.router_peers_failover_brokering,

Check warning on line 998 in zenoh/src/net/protocol/network.rs

View check run for this annotation

Codecov / codecov/patch

zenoh/src/net/protocol/network.rs#L998

Added line #L998 was not covered by tests
},
)],
|link| link.transport.get_whatami().unwrap_or(WhatAmI::Peer) == WhatAmI::Router,

Check warning on line 1001 in zenoh/src/net/protocol/network.rs

View check run for this annotation

Codecov / codecov/patch

zenoh/src/net/protocol/network.rs#L1001

Added line #L1001 was not covered by tests
);
}

Check warning on line 1003 in zenoh/src/net/protocol/network.rs

View check run for this annotation

Codecov / codecov/patch

zenoh/src/net/protocol/network.rs#L1003

Added line #L1003 was not covered by tests

fn remove_detached_nodes(&mut self) -> Vec<(NodeIndex, Node)> {
let mut dfs_stack = vec![self.idx];
let mut visit_map = self.graph.visit_map();
Expand Down
6 changes: 6 additions & 0 deletions zenoh/src/net/routing/hat/linkstate_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,12 @@
Ok(())
}

fn update_self_locators(&self, tables: &mut Tables) {
if let Some(net) = hat_mut!(tables).linkstatepeers_net.as_mut() {
net.update_locators();
}
}

Check warning on line 284 in zenoh/src/net/routing/hat/linkstate_peer/mod.rs

View check run for this annotation

Codecov / codecov/patch

zenoh/src/net/routing/hat/linkstate_peer/mod.rs#L280-L284

Added lines #L280 - L284 were not covered by tests

fn close_face(
&self,
tables: &TablesLock,
Expand Down
2 changes: 2 additions & 0 deletions zenoh/src/net/routing/hat/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@
send_declare: &mut SendDeclare,
) -> ZResult<()>;

fn update_self_locators(&self, _tables: &mut Tables) {}

Check warning on line 108 in zenoh/src/net/routing/hat/mod.rs

View check run for this annotation

Codecov / codecov/patch

zenoh/src/net/routing/hat/mod.rs#L108

Added line #L108 was not covered by tests

fn handle_oam(
&self,
tables: &mut Tables,
Expand Down
15 changes: 15 additions & 0 deletions zenoh/src/net/routing/hat/p2p_peer/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,4 +605,19 @@
}
vec![]
}

pub(super) fn update_locators(&mut self) {
self.graph[self.idx].sn += 1;
self.send_on_links(
vec![(
self.idx,
Details {
zid: false,
locators: true,
links: self.router_peers_failover_brokering,
},
)],
|link| link.transport.get_whatami().unwrap_or(WhatAmI::Peer) == WhatAmI::Router,

Check warning on line 620 in zenoh/src/net/routing/hat/p2p_peer/gossip.rs

View check run for this annotation

Codecov / codecov/patch

zenoh/src/net/routing/hat/p2p_peer/gossip.rs#L609-L620

Added lines #L609 - L620 were not covered by tests
);
}

Check warning on line 622 in zenoh/src/net/routing/hat/p2p_peer/gossip.rs

View check run for this annotation

Codecov / codecov/patch

zenoh/src/net/routing/hat/p2p_peer/gossip.rs#L622

Added line #L622 was not covered by tests
}
6 changes: 6 additions & 0 deletions zenoh/src/net/routing/hat/p2p_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@
Ok(())
}

fn update_self_locators(&self, tables: &mut Tables) {
if let Some(net) = hat_mut!(tables).gossip.as_mut() {
net.update_locators();
}
}

Check warning on line 226 in zenoh/src/net/routing/hat/p2p_peer/mod.rs

View check run for this annotation

Codecov / codecov/patch

zenoh/src/net/routing/hat/p2p_peer/mod.rs#L222-L226

Added lines #L222 - L226 were not covered by tests

fn close_face(
&self,
tables: &TablesLock,
Expand Down
9 changes: 9 additions & 0 deletions zenoh/src/net/routing/hat/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,15 @@
Ok(())
}

fn update_self_locators(&self, tables: &mut Tables) {
if let Some(net) = hat_mut!(tables).routers_net.as_mut() {
net.update_locators();
}
if let Some(net) = hat_mut!(tables).linkstatepeers_net.as_mut() {
net.update_locators();
}
}

Check warning on line 470 in zenoh/src/net/routing/hat/router/mod.rs

View check run for this annotation

Codecov / codecov/patch

zenoh/src/net/routing/hat/router/mod.rs#L463-L470

Added lines #L463 - L470 were not covered by tests

fn close_face(
&self,
tables: &TablesLock,
Expand Down
44 changes: 44 additions & 0 deletions zenoh/src/net/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! [Click here for Zenoh's documentation](https://docs.rs/zenoh/latest/zenoh)
mod adminspace;
pub mod orchestrator;
mod scouting;

#[cfg(feature = "plugins")]
use std::sync::{Mutex, MutexGuard};
Expand All @@ -29,11 +30,13 @@
atomic::{AtomicU32, Ordering},
Arc, Weak,
},
time::Duration,
};

pub use adminspace::AdminSpace;
use async_trait::async_trait;
use futures::{stream::StreamExt, Future};
pub use scouting::Scouting;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use uhlc::{HLCBuilder, HLC};
Expand All @@ -55,6 +58,8 @@
multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler,
TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler,
};
#[cfg(unix)]
use zenoh_util::net::update_iface_cache;

use self::orchestrator::StartConditions;
use super::{primitives::DeMux, routing, routing::router::Router};
Expand Down Expand Up @@ -87,6 +92,7 @@
plugins_manager: Mutex<PluginsManager>,
start_conditions: Arc<StartConditions>,
pending_connections: tokio::sync::Mutex<HashSet<ZenohIdProto>>,
scouting: tokio::sync::Mutex<Option<Scouting>>,
}

pub struct WeakRuntime {
Expand Down Expand Up @@ -175,6 +181,7 @@
// SHM lazy init flag
#[cfg(feature = "shared-memory")]
let shm_init_mode = *config.transport.shared_memory.mode();
let endpoint_poll_interval = config.listen.endpoint_poll_interval_ms().unwrap_or(10_000);

let config = Notifier::new(crate::config::Config(config));
let runtime = Runtime {
Expand All @@ -193,6 +200,7 @@
plugins_manager: Mutex::new(plugins_manager),
start_conditions: Arc::new(StartConditions::default()),
pending_connections: tokio::sync::Mutex::new(HashSet::new()),
scouting: tokio::sync::Mutex::new(None),
}),
};
*handler.runtime.write().unwrap() = Runtime::downgrade(&runtime);
Expand Down Expand Up @@ -240,6 +248,14 @@
zenoh_config::ShmInitMode::Lazy => {}
};

if endpoint_poll_interval > 0 {
let poll_interval = Duration::from_millis(endpoint_poll_interval as u64);
runtime.spawn({
let runtime2 = runtime.clone();
async move { runtime2.monitor_available_addrs(poll_interval).await }
});
}

Check warning on line 257 in zenoh/src/net/runtime/mod.rs

View check run for this annotation

Codecov / codecov/patch

zenoh/src/net/runtime/mod.rs#L257

Added line #L257 was not covered by tests

Ok(runtime)
}
}
Expand Down Expand Up @@ -366,6 +382,33 @@
pub(crate) async fn remove_pending_connection(&self, zid: &ZenohIdProto) -> bool {
self.state.pending_connections.lock().await.remove(zid)
}

async fn monitor_available_addrs(&self, poll_interval: Duration) {
let token = self.get_cancellation_token();
loop {
tokio::select! {
_ = tokio::time::sleep(poll_interval) => self.update_available_addrs().await,
_ = token.cancelled() => return,
}
}
}

async fn update_available_addrs(&self) {
#[cfg(unix)]
update_iface_cache();

if self.update_locators() {
let tables_lock = &self.state.router.tables;
let _ctrl_lock = zlock!(tables_lock.ctrl_lock);
let mut tables = zwrite!(tables_lock.tables);
tables_lock.hat_code.update_self_locators(&mut tables);

Check warning on line 404 in zenoh/src/net/runtime/mod.rs

View check run for this annotation

Codecov / codecov/patch

zenoh/src/net/runtime/mod.rs#L401-L404

Added lines #L401 - L404 were not covered by tests
}

let scouting = self.state.scouting.lock().await;
if let Some(scouting) = scouting.as_ref() {
scouting.update_addrs_if_needed().await;

Check warning on line 409 in zenoh/src/net/runtime/mod.rs

View check run for this annotation

Codecov / codecov/patch

zenoh/src/net/runtime/mod.rs#L409

Added line #L409 was not covered by tests
}
}
}

struct RuntimeTransportEventHandler {
Expand Down Expand Up @@ -547,6 +590,7 @@
self.manager.close().await;
// clean up to break cyclic reference of self.state to itself
self.transport_handlers.write().unwrap().clear();
zasynclock!(self.scouting).take();
// TODO: the call below is needed to prevent intermittent leak
// due to not freed resource Arc, that apparently happens because
// the task responsible for resource clean up was aborted earlier than expected.
Expand Down
Loading
Loading