Skip to content

Commit aaadbe4

Browse files
Initial stab, builds docs
Signed-off-by: Luca Della Vedova <[email protected]>
1 parent 9263652 commit aaadbe4

File tree

3 files changed

+120
-86
lines changed

3 files changed

+120
-86
lines changed

rclrs/src/dynamic_message/dynamic_subscription.rs

Lines changed: 60 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,23 @@ use super::{
99
};
1010
use crate::rcl_bindings::*;
1111
use crate::{
12-
Node, QoSProfile, RclReturnCode, RclrsError, SubscriptionBase, SubscriptionHandle, ToResult,
12+
Node, QoSProfile, RclReturnCode, RclrsError, ToResult, NodeHandle, WorkerCommands, WaitableLifecycle, SubscriptionHandle,
1313
};
1414

1515
/// Struct for receiving messages whose type is only known at runtime.
1616
pub struct DynamicSubscription {
17-
pub(crate) handle: Arc<SubscriptionHandle>,
18-
/// The callback function that runs when a message was received.
17+
/// This handle is used to access the data that rcl holds for this subscription.
18+
handle: Arc<SubscriptionHandle>,
19+
/// This allows us to replace the callback in the subscription task.
20+
///
21+
/// Holding onto this sender will keep the subscription task alive. Once
22+
/// this sender is dropped, the subscription task will end itself.
23+
// callback: Arc<Mutex<AnySubscriptionCallback<T, Scope::Payload>>>,
1924
pub callback: Mutex<Box<dyn FnMut(DynamicMessage) + 'static + Send>>,
25+
/// Holding onto this keeps the waiter for this subscription alive in the
26+
/// wait set of the executor.
27+
#[allow(unused)]
28+
lifecycle: WaitableLifecycle,
2029
metadata: DynamicMessageMetadata,
2130
// This is the regular type support library, not the introspection one.
2231
#[allow(dead_code)]
@@ -27,16 +36,18 @@ impl DynamicSubscription {
2736
/// Creates a new dynamic subscription.
2837
///
2938
/// This is not a public function, by the same rationale as `Subscription::new()`.
30-
pub(crate) fn new<F>(
31-
node: &Node,
39+
pub(crate) fn create_dynamic<F>(
3240
topic: &str,
3341
topic_type: &str,
3442
qos: QoSProfile,
3543
callback: F,
36-
) -> Result<Self, RclrsError>
44+
node_handle: &Arc<NodeHandle>,
45+
commands: &Arc<WorkerCommands>,
46+
) -> Result<Arc<Self>, RclrsError>
3747
where
3848
F: FnMut(DynamicMessage) + 'static + Send,
3949
{
50+
// TODO(luca) a lot of duplication with nomral, refactor
4051
// This loads the introspection type support library.
4152
let metadata = DynamicMessageMetadata::new(topic_type)?;
4253
// However, we also need the regular type support library –
@@ -62,38 +73,62 @@ impl DynamicSubscription {
6273
let rcl_node = &mut *node.rcl_node_mtx.lock().unwrap();
6374

6475
// SAFETY: No preconditions for this function.
65-
let mut subscription_options = unsafe { rcl_subscription_get_default_options() };
66-
subscription_options.qos = qos.into();
76+
let mut rcl_subscription_options = unsafe { rcl_subscription_get_default_options() };
77+
rcl_subscription_options.qos = qos.into();
6778
// SAFETY: Getting a zero-initialized value is always safe.
6879
let mut rcl_subscription = unsafe { rcl_get_zero_initialized_subscription() };
69-
unsafe {
70-
// SAFETY: The rcl_subscription is zero-initialized as expected by this function.
71-
// The rcl_node is kept alive because it is co-owned by the subscription.
72-
// The topic name and the options are copied by this function, so they can be dropped
73-
// afterwards.
74-
// TODO: type support?
75-
rcl_subscription_init(
76-
&mut rcl_subscription,
77-
rcl_node,
78-
type_support_ptr,
79-
topic_c_string.as_ptr(),
80-
&subscription_options,
81-
)
82-
.ok()?;
80+
{
81+
let rcl_node = node_handle.rcl_node.lock().unwrap();
82+
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
83+
unsafe {
84+
// SAFETY:
85+
// * The rcl_subscription is zero-initialized as mandated by this function.
86+
// * The rcl_node is kept alive by the NodeHandle because it is a dependency of the subscription.
87+
// * The topic name and the options are copied by this function, so they can be dropped afterwards.
88+
// * The entity lifecycle mutex is locked to protect against the risk of global
89+
// variables in the rmw implementation being unsafely modified during cleanup.
90+
rcl_subscription_init(
91+
&mut rcl_subscription,
92+
&*rcl_node,
93+
type_support_ptr,
94+
topic_c_string.as_ptr(),
95+
&rcl_subscription_options,
96+
)
97+
.ok()?;
98+
}
8399
}
84100

85101
let handle = Arc::new(SubscriptionHandle {
86-
rcl_subscription_mtx: Mutex::new(rcl_subscription),
87-
rcl_node_mtx: node.rcl_node_mtx.clone(),
88-
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
102+
rcl_subscription: Mutex::new(rcl_subscription),
103+
node_handle: Arc::clone(node_handle),
89104
});
90105

106+
let (waitable, lifecycle) = Waitable::new(
107+
Box::new(SubscriptionExecutable {
108+
handle: Arc::clone(&handle),
109+
callback: Arc::clone(&callback),
110+
commands: Arc::clone(commands),
111+
}),
112+
Some(Arc::clone(commands.get_guard_condition())),
113+
);
114+
commands.add_to_wait_set(waitable);
115+
116+
Ok(Arc::new(Self {
117+
handle,
118+
callback,
119+
lifecycle,
120+
metadata,
121+
type_support_library,
122+
}))
123+
124+
/*
91125
Ok(Self {
92126
handle,
93127
callback: Mutex::new(Box::new(callback)),
94128
metadata,
95129
type_support_library,
96130
})
131+
*/
97132
}
98133

99134
/// Returns the topic name of the subscription.
@@ -158,29 +193,6 @@ impl DynamicSubscription {
158193
}
159194
}
160195

161-
impl SubscriptionBase for DynamicSubscription {
162-
fn handle(&self) -> &SubscriptionHandle {
163-
&self.handle
164-
}
165-
166-
fn execute(&self) -> Result<(), RclrsError> {
167-
let msg = match self.take() {
168-
Ok(msg) => msg,
169-
Err(RclrsError::RclError {
170-
code: RclReturnCode::SubscriptionTakeFailed,
171-
..
172-
}) => {
173-
// Spurious wakeup – this may happen even when a waitset indicated that this
174-
// subscription was ready, so it shouldn't be an error.
175-
return Ok(());
176-
}
177-
Err(e) => return Err(e),
178-
};
179-
(*self.callback.lock().unwrap())(msg);
180-
Ok(())
181-
}
182-
}
183-
184196
#[cfg(test)]
185197
mod tests {
186198
use super::*;

rclrs/src/node.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub use self::builder::*;
1212
pub use self::graph::*;
1313
=======
1414
*/
15+
use crate::QoSProfile;
1516
#[cfg(feature = "dyn_msg")]
1617
use crate::dynamic_message::{DynamicMessage, DynamicSubscription};
1718

@@ -796,6 +797,27 @@ impl NodeState {
796797
)
797798
}
798799

800+
// TODO(luca) introduce subscription options and perhaps an into callback
801+
#[cfg(feature = "dyn_msg")]
802+
pub fn create_dynamic_subscription<F>(
803+
&self,
804+
// options: impl Into<DynamicSubscriptionOptions<'a>>,
805+
topic: &str,
806+
topic_type: &str,
807+
qos: QoSProfile,
808+
callback: F,
809+
) -> Result<Arc<DynamicSubscription>, RclrsError>
810+
where
811+
F: FnMut(DynamicMessage) + 'static + Send,
812+
{
813+
let subscription = Arc::new(DynamicSubscription::new(
814+
self, topic, topic_type, qos, callback,
815+
)?);
816+
self.subscriptions
817+
.push(Arc::downgrade(&subscription) as Weak<dyn SubscriptionBase>);
818+
Ok(subscription)
819+
}
820+
799821
/// Creates a [`DynamicSubscription`][1].
800822
///
801823
/// [1]: crate::dynamic_message::DynamicSubscription

rclrs/src/subscription.rs

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ use crate::{
1212
WorkScope, Worker, WorkerCommands, ENTITY_LIFECYCLE_MUTEX,
1313
};
1414

15+
use crate::dynamic_message::DynamicMessage;
16+
1517
mod any_subscription_callback;
1618
pub use any_subscription_callback::*;
1719

@@ -33,43 +35,6 @@ pub use message_info::*;
3335
mod readonly_loaned_message;
3436
pub use readonly_loaned_message::*;
3537

36-
/*
37-
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
38-
// they are running in. Therefore, this type can be safely sent to another thread.
39-
unsafe impl Send for rcl_subscription_t {}
40-
41-
/// Internal struct used by subscriptions.
42-
pub struct SubscriptionHandle {
43-
pub(crate) rcl_subscription_mtx: Mutex<rcl_subscription_t>,
44-
pub(crate) rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
45-
pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
46-
}
47-
48-
impl SubscriptionHandle {
49-
pub(crate) fn lock(&self) -> MutexGuard<rcl_subscription_t> {
50-
self.rcl_subscription_mtx.lock().unwrap()
51-
}
52-
}
53-
54-
impl Drop for SubscriptionHandle {
55-
fn drop(&mut self) {
56-
let rcl_subscription = self.rcl_subscription_mtx.get_mut().unwrap();
57-
let rcl_node = &mut *self.rcl_node_mtx.lock().unwrap();
58-
// SAFETY: No preconditions for this function (besides the arguments being valid).
59-
unsafe {
60-
rcl_subscription_fini(rcl_subscription, rcl_node);
61-
}
62-
}
63-
}
64-
65-
/// Trait to be implemented by concrete [`Subscription`]s.
66-
pub trait SubscriptionBase: Send + Sync {
67-
/// Internal function to get a reference to the `rcl` handle.
68-
fn handle(&self) -> &SubscriptionHandle;
69-
/// Tries to take a new message and run the callback with it.
70-
fn execute(&self) -> Result<(), RclrsError>;
71-
}
72-
*/
7338
mod worker_subscription_callback;
7439
pub use worker_subscription_callback::*;
7540

@@ -288,6 +253,41 @@ impl<'a, T: IntoPrimitiveOptions<'a>> From<T> for SubscriptionOptions<'a> {
288253
}
289254
}
290255

256+
/// `SubscriptionOptions` are used by [`Node::create_subscription`][1] to initialize
257+
/// a [`Subscription`].
258+
///
259+
/// [1]: crate::NodeState::create_subscription
260+
#[derive(Debug, Clone)]
261+
#[non_exhaustive]
262+
pub struct DynamicSubscriptionOptions<'a> {
263+
/// The topic name for the subscription.
264+
pub topic: &'a str,
265+
/// The topic type for the subscription.
266+
pub topic_type: &'a str,
267+
/// The quality of service settings for the subscription.
268+
pub qos: QoSProfile,
269+
}
270+
271+
impl<'a> DynamicSubscriptionOptions<'a> {
272+
/// Initialize a new [`SubscriptionOptions`] with default settings.
273+
pub fn new(topic: &'a str, topic_type: &'a str) -> Self {
274+
Self {
275+
topic,
276+
topic_type,
277+
qos: QoSProfile::topics_default(),
278+
}
279+
}
280+
}
281+
282+
impl<'a, T: IntoPrimitiveOptions<'a>> From<T> for DynamicSubscriptionOptions<'a> {
283+
fn from(value: T) -> Self {
284+
let primitive = value.into_primitive_options();
285+
let mut options = Self::new(primitive.name);
286+
primitive.apply_to(&mut options.qos);
287+
options
288+
}
289+
}
290+
291291
struct SubscriptionExecutable<T: Message, Payload> {
292292
handle: Arc<SubscriptionHandle>,
293293
callback: Arc<Mutex<AnySubscriptionCallback<T, Payload>>>,
@@ -323,7 +323,7 @@ unsafe impl Send for rcl_subscription_t {}
323323
/// [dropped after][1] the `rcl_subscription_t`.
324324
///
325325
/// [1]: <https://doc.rust-lang.org/reference/destructors.html>
326-
struct SubscriptionHandle {
326+
pub(crate) struct SubscriptionHandle {
327327
rcl_subscription: Mutex<rcl_subscription_t>,
328328
node_handle: Arc<NodeHandle>,
329329
}

0 commit comments

Comments
 (0)