Skip to content

Commit 3fe2c38

Browse files
committed
close advanced sub with session closing callback
1 parent 663cbb5 commit 3fe2c38

File tree

1 file changed

+23
-1
lines changed

1 file changed

+23
-1
lines changed

zenoh-ext/src/advanced_subscriber.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use zenoh::{
2323
ConsolidationMode, Parameters, Selector, TimeBound, TimeExpr, TimeRange, ZenohParameters,
2424
},
2525
sample::{Locality, Sample, SampleKind, SourceSn},
26-
session::{EntityGlobalId, EntityId},
26+
session::{ClosingCallbackId, EntityGlobalId, EntityId},
2727
Resolvable, Resolve, Session, Wait, KE_ADV_PREFIX, KE_EMPTY, KE_PUB, KE_STAR, KE_STARSTAR,
2828
KE_SUB,
2929
};
@@ -409,6 +409,15 @@ struct State {
409409
callback: Callback<Sample>,
410410
miss_handlers: HashMap<usize, Callback<Miss>>,
411411
token: Option<LivelinessToken>,
412+
closing_callback_id: Option<ClosingCallbackId>,
413+
}
414+
415+
impl Drop for State {
416+
fn drop(&mut self) {
417+
if let Some(id) = self.closing_callback_id.take() {
418+
self.session.unregister_closing_callback(id);
419+
}
420+
}
412421
}
413422

414423
#[zenoh_macros::unstable]
@@ -683,8 +692,21 @@ impl<Handler> AdvancedSubscriber<Handler> {
683692
callback: callback.clone(),
684693
miss_handlers: HashMap::new(),
685694
token: None,
695+
closing_callback_id: None,
686696
}));
687697

698+
let closing_callback_id = conf
699+
.session
700+
.register_closing_callback({
701+
let statesref = statesref.clone();
702+
move || {
703+
let mut state = zlock!(statesref);
704+
state.callback = Callback::new(Arc::new(|_| ()));
705+
}
706+
})
707+
.map_err(|_| "session closed")?;
708+
zlock!(statesref).closing_callback_id = Some(closing_callback_id);
709+
688710
let sub_callback = {
689711
let statesref = statesref.clone();
690712
let session = conf.session.clone();

0 commit comments

Comments
 (0)