Skip to content

Commit cfac938

Browse files
committed
add session closing callback API
1 parent 6f9cfa0 commit cfac938

File tree

5 files changed

+115
-7
lines changed

5 files changed

+115
-7
lines changed

Cargo.lock

Lines changed: 4 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ serde = { version = "1.0.210", default-features = false, features = [
163163
serde_json = "1.0.128"
164164
serde_with = "3.12.0"
165165
serde_yaml = "0.9.34"
166+
slab = "0.4.10"
166167
static_init = "1.0.3"
167168
stabby = "36.1.1"
168169
sha3 = "0.10.8"

zenoh/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ rand = { workspace = true, features = ["default"] }
101101
ref-cast = { workspace = true }
102102
serde = { workspace = true, features = ["default"] }
103103
serde_json = { workspace = true }
104+
slab = { workspace = true }
104105
socket2 = { workspace = true }
105106
uhlc = { workspace = true, features = ["default"] }
106107
vec_map = { workspace = true }

zenoh/src/api/session.rs

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,8 @@ pub(crate) struct SessionState {
156156
pub(crate) aggregated_subscribers: Vec<OwnedKeyExpr>,
157157
pub(crate) aggregated_publishers: Vec<OwnedKeyExpr>,
158158
pub(crate) publisher_qos_tree: KeBoxTree<PublisherQoSConfig>,
159+
#[cfg(feature = "unstable")]
160+
pub(crate) closing_callbacks: ClosingCallbackList,
159161
}
160162

161163
impl SessionState {
@@ -190,6 +192,8 @@ impl SessionState {
190192
aggregated_subscribers,
191193
aggregated_publishers,
192194
publisher_qos_tree,
195+
#[cfg(feature = "unstable")]
196+
closing_callbacks: Default::default(),
193197
}
194198
}
195199
}
@@ -1249,6 +1253,61 @@ impl Session {
12491253
source_info: SourceInfo::empty(),
12501254
}
12511255
}
1256+
1257+
/// Registers a closing callback to the session.
1258+
///
1259+
/// The callback will be called when the session will be closed. It returns an id to unregister
1260+
/// the callback with [`unregister_closing_callback`](Self::unregister_closing_callback). If
1261+
/// the session is already closed, the callback is returned in an error.
1262+
///
1263+
/// # Examples
1264+
///
1265+
/// ```rust
1266+
/// # #[tokio::main]
1267+
/// # async fn main() {
1268+
///
1269+
/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
1270+
/// if let Err(_) = session.register_closing_callback(|| println!("session closed")) {
1271+
/// println!("session already closed");
1272+
/// }
1273+
/// # }
1274+
/// ```
1275+
#[zenoh_macros::unstable]
1276+
pub fn register_closing_callback<F: FnOnce() + Send + Sync + 'static>(
1277+
&self,
1278+
callback: F,
1279+
) -> Result<ClosingCallbackId, F> {
1280+
let mut state = zwrite!(self.0.state);
1281+
if state.primitives().is_err() {
1282+
return Err(callback);
1283+
}
1284+
Ok(state.closing_callbacks.insert_callback(Box::new(callback)))
1285+
}
1286+
1287+
/// Unregisters a closing callback.
1288+
///
1289+
/// The callback must have been registered with
1290+
/// [`register_closing_callback`](Self::register_closing_callback).
1291+
/// It will no longer be called on session closing.
1292+
///
1293+
/// # Examples
1294+
///
1295+
/// ```rust
1296+
/// # #[tokio::main]
1297+
/// # async fn main() {
1298+
///
1299+
/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
1300+
/// let Ok(id) = session.register_closing_callback(|| println!("session closed")) else {
1301+
/// panic!("session already closed");
1302+
/// };
1303+
/// session.unregister_closing_callback(id);
1304+
/// # }
1305+
/// ```
1306+
#[zenoh_macros::unstable]
1307+
pub fn unregister_closing_callback<F: FnOnce()>(&self, callback_id: ClosingCallbackId) {
1308+
let mut state = zwrite!(self.0.state);
1309+
state.closing_callbacks.remove_callback(callback_id);
1310+
}
12521311
}
12531312

12541313
impl Session {
@@ -3213,6 +3272,7 @@ impl Closee for Arc<SessionInner> {
32133272
// will be stabilized.
32143273
let mut state = zwrite!(self.state);
32153274
let _matching_listeners = std::mem::take(&mut state.matching_listeners);
3275+
state.closing_callbacks.close();
32163276
drop(state);
32173277
}
32183278
}
@@ -3225,3 +3285,51 @@ impl Closeable for Session {
32253285
self.0.clone()
32263286
}
32273287
}
3288+
3289+
#[cfg(feature = "unstable")]
3290+
#[derive(Default)]
3291+
pub(crate) struct ClosingCallbackList {
3292+
callbacks: slab::Slab<ClosingCallback>,
3293+
generation: usize,
3294+
}
3295+
3296+
#[cfg(feature = "unstable")]
3297+
impl ClosingCallbackList {
3298+
fn insert_callback(&mut self, callback: Box<dyn FnOnce() + Send + Sync>) -> ClosingCallbackId {
3299+
let generation = self.generation;
3300+
let index = self.callbacks.insert(ClosingCallback {
3301+
callback,
3302+
generation,
3303+
});
3304+
self.generation = self.generation.wrapping_add(1);
3305+
ClosingCallbackId { index, generation }
3306+
}
3307+
3308+
fn remove_callback(&mut self, id: ClosingCallbackId) {
3309+
if matches!( self.callbacks.get(id.index), Some(cb) if cb.generation == id.generation) {
3310+
self.callbacks.remove(id.index);
3311+
}
3312+
}
3313+
3314+
fn close(&mut self) {
3315+
for cb in self.callbacks.drain() {
3316+
(cb.callback)();
3317+
}
3318+
}
3319+
}
3320+
3321+
#[cfg(feature = "unstable")]
3322+
struct ClosingCallback {
3323+
callback: Box<dyn FnOnce() + Send + Sync>,
3324+
generation: usize,
3325+
}
3326+
3327+
/// The id of a registered session closing callback.
3328+
///
3329+
/// See [`Session::register_closing_callback`].
3330+
#[zenoh_macros::unstable]
3331+
#[derive(Debug, Clone, PartialEq, Eq)]
3332+
pub struct ClosingCallbackId {
3333+
index: usize,
3334+
generation: usize,
3335+
}

zenoh/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ pub mod session {
217217
session::OpenBuilder,
218218
},
219219
info::SessionInfo,
220-
session::{open, Session, SessionClosedError, Undeclarable},
220+
session::{open, ClosingCallbackId, Session, SessionClosedError, Undeclarable},
221221
};
222222
}
223223

0 commit comments

Comments
 (0)