Skip to content

Commit 83f3393

Browse files
author
Hugo Laloge
committed
Use a 10s poll interval to see if locators have changed instead of NETLINK
NETLINK is only available on Linux. The poll interval may be less reactive and efficient, but it is available everywhere. Moreover, this fix the case where a new locator was not deteced if an interface IP was added before the interface was UP and RUNNING.
1 parent b4ab8dc commit 83f3393

File tree

4 files changed

+94
-227
lines changed

4 files changed

+94
-227
lines changed

zenoh/src/net/runtime/mod.rs

Lines changed: 20 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -18,33 +18,24 @@
1818
//!
1919
//! [Click here for Zenoh's documentation](https://docs.rs/zenoh/latest/zenoh)
2020
mod adminspace;
21-
#[cfg(target_os = "linux")]
22-
mod netlink;
2321
pub mod orchestrator;
2422
mod scouting;
2523

2624
#[cfg(feature = "plugins")]
2725
use std::sync::{Mutex, MutexGuard};
28-
#[cfg(target_os = "linux")]
29-
use std::time::Duration;
3026
use std::{
3127
any::Any,
3228
collections::HashSet,
33-
net::IpAddr,
3429
sync::{
3530
atomic::{AtomicU32, Ordering},
3631
Arc, Weak,
3732
},
33+
time::Duration,
3834
};
3935

4036
pub use adminspace::AdminSpace;
4137
use async_trait::async_trait;
4238
use futures::{stream::StreamExt, Future};
43-
#[cfg(target_os = "linux")]
44-
use rtnetlink::packet_route::{
45-
address::{AddressAttribute, AddressMessage},
46-
RouteNetlinkMessage,
47-
};
4839
pub use scouting::Scouting;
4940
use tokio::task::JoinHandle;
5041
use tokio_util::sync::CancellationToken;
@@ -67,7 +58,7 @@ use zenoh_transport::{
6758
multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler,
6859
TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler,
6960
};
70-
#[cfg(target_os = "linux")]
61+
#[cfg(unix)]
7162
use zenoh_util::net::update_iface_cache;
7263

7364
use self::orchestrator::StartConditions;
@@ -86,9 +77,6 @@ use crate::{
8677
GIT_VERSION, LONG_VERSION,
8778
};
8879

89-
#[cfg(target_os = "linux")]
90-
const NETLINK_TIMEOUT: Duration = Duration::from_millis(500);
91-
9280
pub(crate) struct RuntimeState {
9381
zid: ZenohId,
9482
whatami: WhatAmI,
@@ -259,11 +247,9 @@ impl RuntimeBuilder {
259247
zenoh_config::ShmInitMode::Lazy => {}
260248
};
261249

262-
#[cfg(target_os = "linux")]
263250
runtime.spawn({
264-
let netlink_monitor = netlink::NetlinkMonitor::new()?;
265251
let runtime2 = runtime.clone();
266-
async move { runtime2.monitor_netlink_socket(netlink_monitor).await }
252+
async move { runtime2.monitor_available_addrs().await }
267253
});
268254

269255
Ok(runtime)
@@ -393,90 +379,30 @@ impl Runtime {
393379
self.state.pending_connections.lock().await.remove(zid)
394380
}
395381

396-
#[cfg(target_os = "linux")]
397-
async fn monitor_netlink_socket(&self, mut netlink: netlink::NetlinkMonitor) {
398-
fn add_addr_to_set(
399-
message: &AddressMessage,
400-
new: bool,
401-
new_addresses: &mut HashSet<IpAddr>,
402-
del_addresses: &mut HashSet<IpAddr>,
403-
) {
404-
if let Some(addr) = get_relevant_address(message) {
405-
if new {
406-
new_addresses.insert(*addr);
407-
del_addresses.remove(addr);
408-
} else {
409-
del_addresses.insert(*addr);
410-
new_addresses.remove(addr);
411-
}
412-
}
413-
}
414-
382+
async fn monitor_available_addrs(&self) {
415383
let token = self.get_cancellation_token();
384+
let update_interval = Duration::from_secs(10);
416385
loop {
417-
let mut new_addresses = HashSet::<IpAddr>::new();
418-
let mut old_addresses = HashSet::<IpAddr>::new();
419386
tokio::select! {
420-
message = netlink.next() => {
421-
tracing::trace!("NETLINK message: {:?}", message);
422-
match &message {
423-
Some(RouteNetlinkMessage::NewAddress(msg)) => add_addr_to_set(msg, true, &mut new_addresses, &mut old_addresses),
424-
Some(RouteNetlinkMessage::DelAddress(msg)) => add_addr_to_set(msg, false, &mut new_addresses, &mut old_addresses),
425-
Some(_) => (),
426-
None => { break; }
427-
}
428-
},
429-
_ = token.cancelled() => { return }
430-
}
431-
432-
// Same, but with a timeout so we collect multiple netlink messages
433-
// and restart the scouting routine only one time.
434-
loop {
435-
tokio::select! {
436-
message = netlink.next() => {
437-
tracing::trace!("NETLINK message: {:?}", message);
438-
match &message {
439-
Some(RouteNetlinkMessage::NewAddress(msg)) => add_addr_to_set(msg, true, &mut new_addresses, &mut old_addresses),
440-
Some(RouteNetlinkMessage::DelAddress(msg)) => add_addr_to_set(msg, false, &mut new_addresses, &mut old_addresses),
441-
Some(_) => (),
442-
None => { break; }
443-
}
444-
},
445-
_ = tokio::time::sleep(NETLINK_TIMEOUT) => { break; }
446-
_ = token.cancelled() => { return }
447-
}
387+
_ = tokio::time::sleep(update_interval) => self.update_available_addrs().await,
388+
_ = token.cancelled() => return,
448389
}
390+
}
391+
}
449392

450-
update_iface_cache();
451-
self.update_locators();
393+
async fn update_available_addrs(&self) {
394+
#[cfg(unix)]
395+
update_iface_cache();
452396

453-
{
454-
let ctrl_lock = zlock!(self.state.router.tables.ctrl_lock);
455-
let mut tables = zwrite!(self.state.router.tables.tables);
456-
ctrl_lock.update_self_locators(&mut tables);
457-
}
397+
if self.update_locators() {
398+
let ctrl_lock = zlock!(self.state.router.tables.ctrl_lock);
399+
let mut tables = zwrite!(self.state.router.tables.tables);
400+
ctrl_lock.update_self_locators(&mut tables);
401+
}
458402

459-
let scouting = self.state.scouting.lock().await;
460-
if let Some(scouting) = scouting.as_ref() {
461-
let new_addresses = new_addresses.drain().collect::<Vec<_>>();
462-
let old_addresses = old_addresses.drain().collect::<Vec<_>>();
463-
tracing::debug!(
464-
"Update scouting addresses with +{:?}, -{:?}",
465-
new_addresses,
466-
old_addresses
467-
);
468-
let res = scouting
469-
.update_addresses(&new_addresses, &old_addresses)
470-
.await;
471-
if let Err(e) = res {
472-
tracing::error!(
473-
"Could not update scouting addresses with +{:?}, -{:?}: {}",
474-
new_addresses,
475-
old_addresses,
476-
e
477-
);
478-
}
479-
}
403+
let scouting = self.state.scouting.lock().await;
404+
if let Some(scouting) = scouting.as_ref() {
405+
scouting.update_addrs_if_needed().await;
480406
}
481407
}
482408
}
@@ -678,18 +604,3 @@ impl Closeable for Runtime {
678604
self.state.clone()
679605
}
680606
}
681-
682-
/// Find the relevant IpAddr in the attributes.
683-
/// Prefer the IFA_LOCAL address, or else use the IFA_ADDRESS address
684-
/// TODO: Prove why this choice
685-
fn get_relevant_address(address_message: &AddressMessage) -> Option<&IpAddr> {
686-
let mut res = None;
687-
for attribute in &address_message.attributes {
688-
match attribute {
689-
AddressAttribute::Address(addr) => res = Some(addr),
690-
AddressAttribute::Local(addr) => return Some(addr),
691-
_ => (),
692-
}
693-
}
694-
res
695-
}

zenoh/src/net/runtime/netlink.rs

Lines changed: 0 additions & 64 deletions
This file was deleted.

zenoh/src/net/runtime/orchestrator.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -544,9 +544,17 @@ impl Runtime {
544544
Ok(())
545545
}
546546

547-
pub fn update_locators(&self) {
547+
pub fn update_locators(&self) -> bool {
548548
let mut locators = self.state.locators.write().unwrap();
549549
let new_locators = self.manager().get_locators();
550+
if new_locators == *locators {
551+
return false;
552+
}
553+
554+
tracing::info!(
555+
"New locators: {new_locators:?}, previously: {:?}",
556+
*locators
557+
);
550558
if tracing::enabled!(tracing::Level::INFO) {
551559
for locator in &new_locators {
552560
if !locators.contains(locator) {
@@ -560,6 +568,7 @@ impl Runtime {
560568
}
561569
}
562570
*locators = new_locators;
571+
true
563572
}
564573

565574
pub fn get_interfaces(names: &str) -> Vec<IpAddr> {

0 commit comments

Comments
 (0)