Skip to content

Commit 8a3b311

Browse files
author
Hugo Laloge
committed
Make the scouting routine create and close appropriate sockets on network changes
1 parent b94c464 commit 8a3b311

File tree

3 files changed

+235
-55
lines changed

3 files changed

+235
-55
lines changed

zenoh/src/net/runtime/mod.rs

Lines changed: 76 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,28 @@ mod netlink;
2323
pub mod orchestrator;
2424
mod scouting;
2525

26+
use self::orchestrator::StartConditions;
27+
use super::{primitives::DeMux, routing, routing::router::Router};
28+
#[cfg(feature = "plugins")]
29+
use crate::api::loader::{load_plugins, start_plugins};
30+
#[cfg(feature = "plugins")]
31+
use crate::api::plugins::PluginsManager;
32+
#[cfg(feature = "internal")]
33+
use crate::session::CloseBuilder;
34+
use crate::{
35+
api::{
36+
builders::close::{Closeable, Closee},
37+
config::{Config, Notifier},
38+
},
39+
GIT_VERSION, LONG_VERSION,
40+
};
41+
pub use adminspace::AdminSpace;
42+
use async_trait::async_trait;
43+
use futures::{stream::StreamExt, Future};
44+
use rtnetlink::packet_route::address::{AddressAttribute, AddressMessage};
45+
use rtnetlink::packet_route::RouteNetlinkMessage;
46+
pub use scouting::Scouting;
47+
use std::net::IpAddr;
2648
#[cfg(feature = "plugins")]
2749
use std::sync::{Mutex, MutexGuard};
2850
use std::{
@@ -33,12 +55,6 @@ use std::{
3355
Arc, Weak,
3456
},
3557
};
36-
37-
pub use adminspace::AdminSpace;
38-
use async_trait::async_trait;
39-
use futures::{stream::StreamExt, Future};
40-
use rtnetlink::packet_route::{AddressFamily, RouteNetlinkMessage};
41-
pub use scouting::Scouting;
4258
use tokio::task::JoinHandle;
4359
use tokio_util::sync::CancellationToken;
4460
use uhlc::{HLCBuilder, HLC};
@@ -61,21 +77,6 @@ use zenoh_transport::{
6177
TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler,
6278
};
6379
use zenoh_util::net::update_iface_cache;
64-
use self::orchestrator::StartConditions;
65-
use super::{primitives::DeMux, routing, routing::router::Router};
66-
#[cfg(feature = "plugins")]
67-
use crate::api::loader::{load_plugins, start_plugins};
68-
#[cfg(feature = "plugins")]
69-
use crate::api::plugins::PluginsManager;
70-
#[cfg(feature = "internal")]
71-
use crate::session::CloseBuilder;
72-
use crate::{
73-
api::{
74-
builders::close::{Closeable, Closee},
75-
config::{Config, Notifier},
76-
},
77-
GIT_VERSION, LONG_VERSION,
78-
};
7980

8081
pub(crate) struct RuntimeState {
8182
zid: ZenohId,
@@ -257,17 +258,7 @@ impl RuntimeBuilder {
257258
tokio::select! {
258259
message = netlink_monitor.next() => {
259260
match &message {
260-
Some(RouteNetlinkMessage::NewAddress(addr))
261-
| Some(RouteNetlinkMessage::DelAddress(addr)) => {
262-
tracing::debug!("NETLINK message: {:?}", message);
263-
// TODO Find how to handle IPv6 addresses changes without renewing too often.
264-
if addr.header.family == AddressFamily::Inet {
265-
update_iface_cache();
266-
runtime2.print_locators();
267-
// TODO Update scout if it exists.
268-
}
269-
},
270-
Some(_) => {},
261+
Some(msg) => { runtime2.on_netlink_addresses_changed(msg).await },
271262
None => { break; }
272263
}
273264
},
@@ -403,6 +394,44 @@ impl Runtime {
403394
pub(crate) async fn remove_pending_connection(&self, zid: &ZenohIdProto) -> bool {
404395
self.state.pending_connections.lock().await.remove(zid)
405396
}
397+
398+
#[cfg(unix)]
399+
async fn on_netlink_addresses_changed(&self, message: &RouteNetlinkMessage) {
400+
tracing::trace!("NETLINK message: {:?}", message);
401+
let (addr_message, is_new) = match message {
402+
RouteNetlinkMessage::NewAddress(addr) => (addr, true),
403+
RouteNetlinkMessage::DelAddress(addr) => (addr, false),
404+
_ => return,
405+
};
406+
407+
if let Some(relevant_addr) = get_relevant_address(addr_message) {
408+
// TODO Find how to handle IPv6 addresses changes without renewing too often.
409+
if relevant_addr.is_ipv4() {
410+
update_iface_cache();
411+
self.update_locators();
412+
let scouting = self.state.scouting.lock().await;
413+
if let Some(scouting) = scouting.as_ref() {
414+
let res = if is_new {
415+
scouting
416+
.update_addresses(&[relevant_addr.clone()], &[])
417+
.await
418+
} else {
419+
scouting
420+
.update_addresses(&[], &[relevant_addr.clone()])
421+
.await
422+
};
423+
if let Err(e) = res {
424+
tracing::error!(
425+
"Could not {} address {}: {}",
426+
if is_new { "add" } else { "remove" },
427+
relevant_addr,
428+
e
429+
);
430+
}
431+
}
432+
}
433+
}
434+
}
406435
}
407436

408437
struct RuntimeTransportEventHandler {
@@ -602,3 +631,18 @@ impl Closeable for Runtime {
602631
self.state.clone()
603632
}
604633
}
634+
635+
/// Find the relevant IpAddr in the attributes.
636+
/// Prefer the IFA_LOCAL address, or else use the IFA_ADDRESS address
637+
/// TODO: Prove why this choice
638+
fn get_relevant_address(address_message: &AddressMessage) -> Option<&IpAddr> {
639+
let mut res = None;
640+
for attribute in &address_message.attributes {
641+
match attribute {
642+
AddressAttribute::Address(addr) => res = Some(addr),
643+
AddressAttribute::Local(addr) => return Some(addr),
644+
_ => (),
645+
}
646+
}
647+
res
648+
}

zenoh/src/net/runtime/orchestrator.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,7 @@ impl Runtime {
503503
self.spawn_add_listener(endpoint, retry_config).await
504504
}
505505
}
506-
self.print_locators();
506+
self.update_locators();
507507
Ok(())
508508
}
509509

@@ -515,7 +515,7 @@ impl Runtime {
515515
let this = self.clone();
516516
self.spawn(async move {
517517
this.add_listener_retry(listener, retry_config).await;
518-
this.print_locators();
518+
this.update_locators();
519519
});
520520
}
521521

@@ -545,12 +545,22 @@ impl Runtime {
545545
Ok(())
546546
}
547547

548-
pub fn print_locators(&self) {
548+
pub fn update_locators(&self) {
549549
let mut locators = self.state.locators.write().unwrap();
550-
*locators = self.manager().get_locators();
551-
for locator in &*locators {
552-
tracing::info!("Zenoh can be reached at: {}", locator);
550+
let new_locators = self.manager().get_locators();
551+
if tracing::enabled!(tracing::Level::INFO) {
552+
for locator in &new_locators {
553+
if !locators.contains(locator) {
554+
tracing::info!("Zenoh can be reached at: {}", locator);
555+
}
556+
}
557+
for old_locator in &*locators {
558+
if !new_locators.contains(old_locator) {
559+
tracing::info!("Zenoh can no longer be reached at: {}", old_locator);
560+
}
561+
}
553562
}
563+
*locators = new_locators;
554564
}
555565

556566
pub fn get_interfaces(names: &str) -> Vec<IpAddr> {

0 commit comments

Comments
 (0)