Skip to content

Commit 7f5fee3

Browse files
committed
Fix RAII for subscription, client, and service
Signed-off-by: Michael X. Grey <[email protected]>
1 parent c463201 commit 7f5fee3

File tree

3 files changed

+54
-10
lines changed

3 files changed

+54
-10
lines changed

rclrs/src/client.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use rosidl_runtime_rs::Message;
1111
use crate::{
1212
error::{RclReturnCode, ToResult},
1313
rcl_bindings::*,
14-
MessageCow, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
14+
MessageCow, Node, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
1515
};
1616

1717
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
@@ -76,14 +76,17 @@ where
7676
pub(crate) handle: Arc<ClientHandle>,
7777
requests: Mutex<HashMap<RequestId, RequestValue<T::Response>>>,
7878
futures: Arc<Mutex<HashMap<RequestId, oneshot::Sender<T::Response>>>>,
79+
/// Ensure the parent node remains alive as long as the subscription is held.
80+
/// This implementation will change in the future.
81+
node: Arc<Node>,
7982
}
8083

8184
impl<T> Client<T>
8285
where
8386
T: rosidl_runtime_rs::Service,
8487
{
8588
/// Creates a new client.
86-
pub(crate) fn new(node_handle: Arc<NodeHandle>, topic: &str) -> Result<Self, RclrsError>
89+
pub(crate) fn new(node: &Arc<Node>, topic: &str) -> Result<Self, RclrsError>
8790
// This uses pub(crate) visibility to avoid instantiating this struct outside
8891
// [`Node::create_client`], see the struct's documentation for the rationale
8992
where
@@ -102,7 +105,7 @@ where
102105
let client_options = unsafe { rcl_client_get_default_options() };
103106

104107
{
105-
let rcl_node = node_handle.rcl_node.lock().unwrap();
108+
let rcl_node = node.handle.rcl_node.lock().unwrap();
106109
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
107110

108111
// SAFETY:
@@ -136,6 +139,7 @@ where
136139
futures: Arc::new(Mutex::new(
137140
HashMap::<RequestId, oneshot::Sender<T::Response>>::new(),
138141
)),
142+
node: Arc::clone(node),
139143
})
140144
}
141145

rclrs/src/service.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use rosidl_runtime_rs::Message;
99
use crate::{
1010
error::{RclReturnCode, ToResult},
1111
rcl_bindings::*,
12-
MessageCow, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
12+
MessageCow, Node, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
1313
};
1414

1515
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
@@ -73,6 +73,9 @@ where
7373
pub(crate) handle: Arc<ServiceHandle>,
7474
/// The callback function that runs when a request was received.
7575
pub callback: Mutex<ServiceCallback<T::Request, T::Response>>,
76+
/// Ensure the parent node remains alive as long as the subscription is held.
77+
/// This implementation will change in the future.
78+
node: Arc<Node>,
7679
}
7780

7881
impl<T> Service<T>
@@ -81,7 +84,7 @@ where
8184
{
8285
/// Creates a new service.
8386
pub(crate) fn new<F>(
84-
node_handle: Arc<NodeHandle>,
87+
node: &Arc<Node>,
8588
topic: &str,
8689
callback: F,
8790
) -> Result<Self, RclrsError>
@@ -104,7 +107,7 @@ where
104107
let service_options = unsafe { rcl_service_get_default_options() };
105108

106109
{
107-
let rcl_node = node_handle.rcl_node.lock().unwrap();
110+
let rcl_node = node.handle.rcl_node.lock().unwrap();
108111
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
109112
unsafe {
110113
// SAFETY:
@@ -133,6 +136,7 @@ where
133136

134137
Ok(Self {
135138
handle,
139+
node: Arc::clone(node),
136140
callback: Mutex::new(Box::new(callback)),
137141
})
138142
}

rclrs/src/subscription.rs

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{
1010
error::{RclReturnCode, ToResult},
1111
qos::QoSProfile,
1212
rcl_bindings::*,
13-
NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
13+
Node, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
1414
};
1515

1616
mod callback;
@@ -31,7 +31,7 @@ unsafe impl Send for rcl_subscription_t {}
3131
/// [1]: <https://doc.rust-lang.org/reference/destructors.html>
3232
pub struct SubscriptionHandle {
3333
rcl_subscription: Mutex<rcl_subscription_t>,
34-
node_handle: Arc<NodeHandle>,
34+
node_handle: Arc<Node>,
3535
pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
3636
}
3737

@@ -84,6 +84,9 @@ where
8484
pub(crate) handle: Arc<SubscriptionHandle>,
8585
/// The callback function that runs when a message was received.
8686
pub callback: Mutex<AnySubscriptionCallback<T>>,
87+
/// Ensure the parent node remains alive as long as the subscription is held.
88+
/// This implementation will change in the future.
89+
node: Arc<Node>,
8790
message: PhantomData<T>,
8891
}
8992

@@ -93,7 +96,7 @@ where
9396
{
9497
/// Creates a new subscription.
9598
pub(crate) fn new<Args>(
96-
node_handle: Arc<NodeHandle>,
99+
node: &Arc<Node>,
97100
topic: &str,
98101
qos: QoSProfile,
99102
callback: impl SubscriptionCallback<T, Args>,
@@ -117,7 +120,7 @@ where
117120
subscription_options.qos = qos.into();
118121

119122
{
120-
let rcl_node = node_handle.rcl_node.lock().unwrap();
123+
let rcl_node = node.handle.rcl_node.lock().unwrap();
121124
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
122125
unsafe {
123126
// SAFETY:
@@ -146,6 +149,7 @@ where
146149
Ok(Self {
147150
handle,
148151
callback: Mutex::new(callback.into_callback()),
152+
node: Arc::clone(node),
149153
message: PhantomData,
150154
})
151155
}
@@ -396,4 +400,36 @@ mod tests {
396400
);
397401
Ok(())
398402
}
403+
404+
#[test]
405+
fn test_node_subscription_raii() {
406+
use rclrs::*;
407+
use std::sync::atomic::Ordering;
408+
409+
let executor = Context::default().create_basic_executor();
410+
411+
let triggered = Arc::new(AtomicBool::new(false));
412+
let callback = |_| {
413+
triggered.store(true, Ordering::AcqRel);
414+
};
415+
416+
let (subscription, publisher) = {
417+
let node = executor
418+
.create_node(&format!("test_node_subscription_raii_{}", line!()))
419+
.unwrap();
420+
421+
let qos = QoSProfile::default().keep_all().reliable();
422+
let subscription = node.create_subscription::<msg::Empty>("test_topic", qos, callback).unwrap();
423+
let publisher = node.create_publisher::<msg::Empty>("test_topic", qos).unwrap();
424+
425+
(subscription, publisher)
426+
};
427+
428+
publisher.publish(msg::Empty {});
429+
let start_time = std::time::Instant::now();
430+
while !triggered.load(Ordering::AcqRel) {
431+
assert!(executor.spin(SpinOptions::spin_once()).is_empty());
432+
assert!(start_time.elapsed() < std::time::Duration::from_secs(10));
433+
}
434+
}
399435
}

0 commit comments

Comments
 (0)