Skip to content

Commit eec08cc

Browse files
author
Hugo Laloge
committed
Use a 10s poll interval to see if locators have changed instead of NETLINK
NETLINK is only available on Linux. The poll interval may be less reactive and efficient, but it is available everywhere. Moreover, this fix the case where a new locator was not deteced if an interface IP was added before the interface was UP and RUNNING.
1 parent 7378e00 commit eec08cc

File tree

7 files changed

+95
-316
lines changed

7 files changed

+95
-316
lines changed

Cargo.lock

Lines changed: 0 additions & 84 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,6 @@ regex = "1.10.6"
145145
ron = "0.8.1"
146146
ringbuffer-spsc = "0.1.9"
147147
rsa = "0.9"
148-
rtnetlink = "0.15"
149148
rustc_version = "0.4.1"
150149
rustls = { version = "0.23.13", default-features = false, features = [
151150
"logging",

zenoh/Cargo.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,6 @@ zenoh-runtime = { workspace = true }
123123
zenoh-task = { workspace = true }
124124
once_cell = { workspace = true }
125125

126-
[target.'cfg(target_os = "linux")'.dependencies]
127-
rtnetlink = { workspace = true }
128-
129126
[dev-dependencies]
130127
tokio = { workspace = true }
131128
libc = { workspace = true }

zenoh/src/net/runtime/mod.rs

Lines changed: 21 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -18,33 +18,24 @@
1818
//!
1919
//! [Click here for Zenoh's documentation](https://docs.rs/zenoh/latest/zenoh)
2020
mod adminspace;
21-
#[cfg(target_os = "linux")]
22-
mod netlink;
2321
pub mod orchestrator;
2422
mod scouting;
2523

2624
#[cfg(feature = "plugins")]
2725
use std::sync::{Mutex, MutexGuard};
28-
#[cfg(target_os = "linux")]
29-
use std::time::Duration;
3026
use std::{
3127
any::Any,
3228
collections::HashSet,
33-
net::IpAddr,
3429
sync::{
3530
atomic::{AtomicU32, Ordering},
3631
Arc, Weak,
3732
},
33+
time::Duration,
3834
};
3935

4036
pub use adminspace::AdminSpace;
4137
use async_trait::async_trait;
4238
use futures::{stream::StreamExt, Future};
43-
#[cfg(target_os = "linux")]
44-
use rtnetlink::packet_route::{
45-
address::{AddressAttribute, AddressMessage},
46-
RouteNetlinkMessage,
47-
};
4839
pub use scouting::Scouting;
4940
use tokio::task::JoinHandle;
5041
use tokio_util::sync::CancellationToken;
@@ -67,7 +58,7 @@ use zenoh_transport::{
6758
multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler,
6859
TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler,
6960
};
70-
#[cfg(target_os = "linux")]
61+
#[cfg(unix)]
7162
use zenoh_util::net::update_iface_cache;
7263

7364
use self::orchestrator::StartConditions;
@@ -86,9 +77,6 @@ use crate::{
8677
GIT_VERSION, LONG_VERSION,
8778
};
8879

89-
#[cfg(target_os = "linux")]
90-
const NETLINK_TIMEOUT: Duration = Duration::from_millis(500);
91-
9280
pub(crate) struct RuntimeState {
9381
zid: ZenohId,
9482
whatami: WhatAmI,
@@ -259,11 +247,9 @@ impl RuntimeBuilder {
259247
zenoh_config::ShmInitMode::Lazy => {}
260248
};
261249

262-
#[cfg(target_os = "linux")]
263250
runtime.spawn({
264-
let netlink_monitor = netlink::NetlinkMonitor::new()?;
265251
let runtime2 = runtime.clone();
266-
async move { runtime2.monitor_netlink_socket(netlink_monitor).await }
252+
async move { runtime2.monitor_available_addrs().await }
267253
});
268254

269255
Ok(runtime)
@@ -393,91 +379,31 @@ impl Runtime {
393379
self.state.pending_connections.lock().await.remove(zid)
394380
}
395381

396-
#[cfg(target_os = "linux")]
397-
async fn monitor_netlink_socket(&self, mut netlink: netlink::NetlinkMonitor) {
398-
fn add_addr_to_set(
399-
message: &AddressMessage,
400-
new: bool,
401-
new_addresses: &mut HashSet<IpAddr>,
402-
del_addresses: &mut HashSet<IpAddr>,
403-
) {
404-
if let Some(addr) = get_relevant_address(message) {
405-
if new {
406-
new_addresses.insert(*addr);
407-
del_addresses.remove(addr);
408-
} else {
409-
del_addresses.insert(*addr);
410-
new_addresses.remove(addr);
411-
}
412-
}
413-
}
414-
382+
async fn monitor_available_addrs(&self) {
415383
let token = self.get_cancellation_token();
384+
let update_interval = Duration::from_secs(10);
416385
loop {
417-
let mut new_addresses = HashSet::<IpAddr>::new();
418-
let mut old_addresses = HashSet::<IpAddr>::new();
419386
tokio::select! {
420-
message = netlink.next() => {
421-
tracing::trace!("NETLINK message: {:?}", message);
422-
match &message {
423-
Some(RouteNetlinkMessage::NewAddress(msg)) => add_addr_to_set(msg, true, &mut new_addresses, &mut old_addresses),
424-
Some(RouteNetlinkMessage::DelAddress(msg)) => add_addr_to_set(msg, false, &mut new_addresses, &mut old_addresses),
425-
Some(_) => (),
426-
None => { break; }
427-
}
428-
},
429-
_ = token.cancelled() => { return }
430-
}
431-
432-
// Same, but with a timeout so we collect multiple netlink messages
433-
// and restart the scouting routine only one time.
434-
loop {
435-
tokio::select! {
436-
message = netlink.next() => {
437-
tracing::trace!("NETLINK message: {:?}", message);
438-
match &message {
439-
Some(RouteNetlinkMessage::NewAddress(msg)) => add_addr_to_set(msg, true, &mut new_addresses, &mut old_addresses),
440-
Some(RouteNetlinkMessage::DelAddress(msg)) => add_addr_to_set(msg, false, &mut new_addresses, &mut old_addresses),
441-
Some(_) => (),
442-
None => { break; }
443-
}
444-
},
445-
_ = tokio::time::sleep(NETLINK_TIMEOUT) => { break; }
446-
_ = token.cancelled() => { return }
447-
}
387+
_ = tokio::time::sleep(update_interval) => self.update_available_addrs().await,
388+
_ = token.cancelled() => return,
448389
}
390+
}
391+
}
449392

450-
update_iface_cache();
451-
self.update_locators();
393+
async fn update_available_addrs(&self) {
394+
#[cfg(unix)]
395+
update_iface_cache();
452396

453-
{
454-
let tables_lock = &self.state.router.tables;
455-
let _ctrl_lock = zlock!(tables_lock.ctrl_lock);
456-
let mut tables = zwrite!(tables_lock.tables);
457-
tables_lock.hat_code.update_self_locators(&mut tables);
458-
}
397+
if self.update_locators() {
398+
let tables_lock = &self.state.router.tables;
399+
let _ctrl_lock = zlock!(tables_lock.ctrl_lock);
400+
let mut tables = zwrite!(tables_lock.tables);
401+
tables_lock.hat_code.update_self_locators(&mut tables);
402+
}
459403

460-
let scouting = self.state.scouting.lock().await;
461-
if let Some(scouting) = scouting.as_ref() {
462-
let new_addresses = new_addresses.drain().collect::<Vec<_>>();
463-
let old_addresses = old_addresses.drain().collect::<Vec<_>>();
464-
tracing::debug!(
465-
"Update scouting addresses with +{:?}, -{:?}",
466-
new_addresses,
467-
old_addresses
468-
);
469-
let res = scouting
470-
.update_addresses(&new_addresses, &old_addresses)
471-
.await;
472-
if let Err(e) = res {
473-
tracing::error!(
474-
"Could not update scouting addresses with +{:?}, -{:?}: {}",
475-
new_addresses,
476-
old_addresses,
477-
e
478-
);
479-
}
480-
}
404+
let scouting = self.state.scouting.lock().await;
405+
if let Some(scouting) = scouting.as_ref() {
406+
scouting.update_addrs_if_needed().await;
481407
}
482408
}
483409
}
@@ -679,18 +605,3 @@ impl Closeable for Runtime {
679605
self.state.clone()
680606
}
681607
}
682-
683-
/// Find the relevant IpAddr in the attributes.
684-
/// Prefer the IFA_LOCAL address, or else use the IFA_ADDRESS address
685-
/// TODO: Prove why this choice
686-
fn get_relevant_address(address_message: &AddressMessage) -> Option<&IpAddr> {
687-
let mut res = None;
688-
for attribute in &address_message.attributes {
689-
match attribute {
690-
AddressAttribute::Address(addr) => res = Some(addr),
691-
AddressAttribute::Local(addr) => return Some(addr),
692-
_ => (),
693-
}
694-
}
695-
res
696-
}

zenoh/src/net/runtime/netlink.rs

Lines changed: 0 additions & 64 deletions
This file was deleted.

0 commit comments

Comments
 (0)