From cfac938f5ab0326cbddcebb049ee7c0b2ccf7447 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 23 Jun 2025 10:35:17 +0200 Subject: [PATCH 1/6] add session closing callback API --- Cargo.lock | 10 ++-- Cargo.toml | 1 + zenoh/Cargo.toml | 1 + zenoh/src/api/session.rs | 108 +++++++++++++++++++++++++++++++++++++++ zenoh/src/lib.rs | 2 +- 5 files changed, 115 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dd5d2c029f..e378bf739d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3766,12 +3766,9 @@ checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" [[package]] name = "slab" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" -dependencies = [ - "autocfg", -] +checksum = "04dc19736151f35336d325007ac991178d504a119863a2fcb3758cdb5e52c50d" [[package]] name = "smallvec" @@ -4496,7 +4493,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "static_assertions", ] @@ -5193,6 +5190,7 @@ dependencies = [ "rustc_version 0.4.1", "serde", "serde_json", + "slab", "socket2 0.5.7", "tokio", "tokio-util", diff --git a/Cargo.toml b/Cargo.toml index e7f5513d27..1ed09e632e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -163,6 +163,7 @@ serde = { version = "1.0.210", default-features = false, features = [ serde_json = "1.0.128" serde_with = "3.12.0" serde_yaml = "0.9.34" +slab = "0.4.10" static_init = "1.0.3" stabby = "36.1.1" sha3 = "0.10.8" diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index b9e3f595fb..2bf4e75cbd 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -101,6 +101,7 @@ rand = { workspace = true, features = ["default"] } ref-cast = { workspace = true } serde = { workspace = true, features = ["default"] } serde_json = { workspace = true } +slab = { workspace = true } socket2 = { workspace = true } uhlc = { workspace = true, features = ["default"] } vec_map = { workspace = true } diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 834dfa02c5..181b43368c 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -156,6 +156,8 @@ pub(crate) struct SessionState { pub(crate) aggregated_subscribers: Vec, pub(crate) aggregated_publishers: Vec, pub(crate) publisher_qos_tree: KeBoxTree, + #[cfg(feature = "unstable")] + pub(crate) closing_callbacks: ClosingCallbackList, } impl SessionState { @@ -190,6 +192,8 @@ impl SessionState { aggregated_subscribers, aggregated_publishers, publisher_qos_tree, + #[cfg(feature = "unstable")] + closing_callbacks: Default::default(), } } } @@ -1249,6 +1253,61 @@ impl Session { source_info: SourceInfo::empty(), } } + + /// Registers a closing callback to the session. + /// + /// The callback will be called when the session will be closed. It returns an id to unregister + /// the callback with [`unregister_closing_callback`](Self::unregister_closing_callback). If + /// the session is already closed, the callback is returned in an error. + /// + /// # Examples + /// + /// ```rust + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// if let Err(_) = session.register_closing_callback(|| println!("session closed")) { + /// println!("session already closed"); + /// } + /// # } + /// ``` + #[zenoh_macros::unstable] + pub fn register_closing_callback( + &self, + callback: F, + ) -> Result { + let mut state = zwrite!(self.0.state); + if state.primitives().is_err() { + return Err(callback); + } + Ok(state.closing_callbacks.insert_callback(Box::new(callback))) + } + + /// Unregisters a closing callback. + /// + /// The callback must have been registered with + /// [`register_closing_callback`](Self::register_closing_callback). + /// It will no longer be called on session closing. + /// + /// # Examples + /// + /// ```rust + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let Ok(id) = session.register_closing_callback(|| println!("session closed")) else { + /// panic!("session already closed"); + /// }; + /// session.unregister_closing_callback(id); + /// # } + /// ``` + #[zenoh_macros::unstable] + pub fn unregister_closing_callback(&self, callback_id: ClosingCallbackId) { + let mut state = zwrite!(self.0.state); + state.closing_callbacks.remove_callback(callback_id); + } } impl Session { @@ -3213,6 +3272,7 @@ impl Closee for Arc { // will be stabilized. let mut state = zwrite!(self.state); let _matching_listeners = std::mem::take(&mut state.matching_listeners); + state.closing_callbacks.close(); drop(state); } } @@ -3225,3 +3285,51 @@ impl Closeable for Session { self.0.clone() } } + +#[cfg(feature = "unstable")] +#[derive(Default)] +pub(crate) struct ClosingCallbackList { + callbacks: slab::Slab, + generation: usize, +} + +#[cfg(feature = "unstable")] +impl ClosingCallbackList { + fn insert_callback(&mut self, callback: Box) -> ClosingCallbackId { + let generation = self.generation; + let index = self.callbacks.insert(ClosingCallback { + callback, + generation, + }); + self.generation = self.generation.wrapping_add(1); + ClosingCallbackId { index, generation } + } + + fn remove_callback(&mut self, id: ClosingCallbackId) { + if matches!( self.callbacks.get(id.index), Some(cb) if cb.generation == id.generation) { + self.callbacks.remove(id.index); + } + } + + fn close(&mut self) { + for cb in self.callbacks.drain() { + (cb.callback)(); + } + } +} + +#[cfg(feature = "unstable")] +struct ClosingCallback { + callback: Box, + generation: usize, +} + +/// The id of a registered session closing callback. +/// +/// See [`Session::register_closing_callback`]. +#[zenoh_macros::unstable] +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ClosingCallbackId { + index: usize, + generation: usize, +} diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index cb5e0d0f0c..adb03d19f8 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -217,7 +217,7 @@ pub mod session { session::OpenBuilder, }, info::SessionInfo, - session::{open, Session, SessionClosedError, Undeclarable}, + session::{open, ClosingCallbackId, Session, SessionClosedError, Undeclarable}, }; } From b2846273b030b8db8b3a097aae003a0bee406576 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 23 Jun 2025 10:40:49 +0200 Subject: [PATCH 2/6] fix ClosingCallbackId reexport --- zenoh/src/lib.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index adb03d19f8..2b2614eaf9 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -208,6 +208,8 @@ pub mod session { #[zenoh_macros::internal] pub use crate::api::builders::session::{init, InitBuilder}; + #[zenoh_macros::unstable] + pub use crate::api::session::ClosingCallbackId; pub use crate::api::{ builders::{ close::CloseBuilder, @@ -217,7 +219,7 @@ pub mod session { session::OpenBuilder, }, info::SessionInfo, - session::{open, ClosingCallbackId, Session, SessionClosedError, Undeclarable}, + session::{open, Session, SessionClosedError, Undeclarable}, }; } From 79d62be7eeba3822403855f8a3ebed4d3f5000d5 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 23 Jun 2025 10:42:40 +0200 Subject: [PATCH 3/6] typo --- zenoh/src/api/session.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 181b43368c..5897a9e681 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1304,7 +1304,7 @@ impl Session { /// # } /// ``` #[zenoh_macros::unstable] - pub fn unregister_closing_callback(&self, callback_id: ClosingCallbackId) { + pub fn unregister_closing_callback(&self, callback_id: ClosingCallbackId) { let mut state = zwrite!(self.0.state); state.closing_callbacks.remove_callback(callback_id); } From 82db03b7d3cd054ea40e9c2f84cf95967e32129e Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 23 Jun 2025 11:16:33 +0200 Subject: [PATCH 4/6] release the state lock before executing callbacks --- zenoh/src/api/session.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 5897a9e681..9132da2b9f 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -3272,7 +3272,7 @@ impl Closee for Arc { // will be stabilized. let mut state = zwrite!(self.state); let _matching_listeners = std::mem::take(&mut state.matching_listeners); - state.closing_callbacks.close(); + let _closing_callbacks = std::mem::take(&mut state.closing_callbacks); drop(state); } } @@ -3310,8 +3310,10 @@ impl ClosingCallbackList { self.callbacks.remove(id.index); } } +} - fn close(&mut self) { +impl Drop for ClosingCallbackList { + fn drop(&mut self) { for cb in self.callbacks.drain() { (cb.callback)(); } From 4685e866b1779ad24c155a77c8345854e4a9e028 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 23 Jun 2025 11:18:07 +0200 Subject: [PATCH 5/6] typo --- zenoh/src/api/session.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 9132da2b9f..e06a20ebb8 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -3312,6 +3312,7 @@ impl ClosingCallbackList { } } +#[cfg(feature = "unstable")] impl Drop for ClosingCallbackList { fn drop(&mut self) { for cb in self.callbacks.drain() { From 663cbb5ac10c486d1a435a98dd023dc7830bd2b3 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 23 Jun 2025 12:29:14 +0200 Subject: [PATCH 6/6] make feature internal instead of unstable --- zenoh/src/api/session.rs | 13 ++++--------- zenoh/src/lib.rs | 2 +- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index e06a20ebb8..93ff5780df 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -156,7 +156,6 @@ pub(crate) struct SessionState { pub(crate) aggregated_subscribers: Vec, pub(crate) aggregated_publishers: Vec, pub(crate) publisher_qos_tree: KeBoxTree, - #[cfg(feature = "unstable")] pub(crate) closing_callbacks: ClosingCallbackList, } @@ -192,7 +191,6 @@ impl SessionState { aggregated_subscribers, aggregated_publishers, publisher_qos_tree, - #[cfg(feature = "unstable")] closing_callbacks: Default::default(), } } @@ -1260,6 +1258,8 @@ impl Session { /// the callback with [`unregister_closing_callback`](Self::unregister_closing_callback). If /// the session is already closed, the callback is returned in an error. /// + /// Execution order of callbacks is unspecified. + /// /// # Examples /// /// ```rust @@ -1272,7 +1272,7 @@ impl Session { /// } /// # } /// ``` - #[zenoh_macros::unstable] + #[zenoh_macros::internal] pub fn register_closing_callback( &self, callback: F, @@ -1303,7 +1303,7 @@ impl Session { /// session.unregister_closing_callback(id); /// # } /// ``` - #[zenoh_macros::unstable] + #[zenoh_macros::internal] pub fn unregister_closing_callback(&self, callback_id: ClosingCallbackId) { let mut state = zwrite!(self.0.state); state.closing_callbacks.remove_callback(callback_id); @@ -3286,14 +3286,12 @@ impl Closeable for Session { } } -#[cfg(feature = "unstable")] #[derive(Default)] pub(crate) struct ClosingCallbackList { callbacks: slab::Slab, generation: usize, } -#[cfg(feature = "unstable")] impl ClosingCallbackList { fn insert_callback(&mut self, callback: Box) -> ClosingCallbackId { let generation = self.generation; @@ -3312,7 +3310,6 @@ impl ClosingCallbackList { } } -#[cfg(feature = "unstable")] impl Drop for ClosingCallbackList { fn drop(&mut self) { for cb in self.callbacks.drain() { @@ -3321,7 +3318,6 @@ impl Drop for ClosingCallbackList { } } -#[cfg(feature = "unstable")] struct ClosingCallback { callback: Box, generation: usize, @@ -3330,7 +3326,6 @@ struct ClosingCallback { /// The id of a registered session closing callback. /// /// See [`Session::register_closing_callback`]. -#[zenoh_macros::unstable] #[derive(Debug, Clone, PartialEq, Eq)] pub struct ClosingCallbackId { index: usize, diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 2b2614eaf9..f152eb1224 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -208,7 +208,7 @@ pub mod session { #[zenoh_macros::internal] pub use crate::api::builders::session::{init, InitBuilder}; - #[zenoh_macros::unstable] + #[zenoh_macros::internal] pub use crate::api::session::ClosingCallbackId; pub use crate::api::{ builders::{