Skip to content

Commit 86e3007

Browse files
committed
Add TakerSubscription
1 parent dba36cc commit 86e3007

File tree

2 files changed

+295
-42
lines changed

2 files changed

+295
-42
lines changed

rclrs/src/node.rs

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ use crate::{
3434
IntoNodeSubscriptionCallback, LogParams, Logger, ParameterBuilder, ParameterInterface,
3535
ParameterVariant, Parameters, Promise, Publisher, PublisherOptions, PublisherState, RclrsError,
3636
Service, ServiceOptions, ServiceState, Subscription, SubscriptionOptions, SubscriptionState,
37-
TimeSource, ToLogParams, Worker, WorkerOptions, WorkerState, ENTITY_LIFECYCLE_MUTEX,
37+
TakerSubscription, TimeSource, ToLogParams, Worker, WorkerOptions, WorkerState,
38+
ENTITY_LIFECYCLE_MUTEX,
3839
};
3940

4041
/// A processing unit that can communicate with other nodes. See the API of
@@ -785,6 +786,85 @@ impl NodeState {
785786
)
786787
}
787788

789+
/// Creates a [`TakerSubscription`].
790+
///
791+
/// # Behavior
792+
///
793+
/// This subscription uses no callback and calling [`spin`][1] on the
794+
/// node's executor will have no effect, nor is it required to receive
795+
/// messages.
796+
///
797+
/// In order to receive messages, use [`take`][2] or one of its variants.
798+
///
799+
/// ```no_run
800+
/// # use rclrs::*;
801+
/// # let executor = Context::default().create_basic_executor();
802+
/// # let node = executor.create_node("my_node").unwrap();
803+
/// use example_interfaces::msg::String as StringMsg;
804+
///
805+
/// let subscription =
806+
/// node.create_taker_subscription::<StringMsg>("topic".keep_last(1))?;
807+
///
808+
/// loop {
809+
/// if let Some(msg) = subscription.take()? {
810+
/// println!("{}", msg.data);
811+
/// }
812+
/// std::thread::sleep(std::time::Duration::from_millis(100));
813+
/// }
814+
/// # Ok::<(), RclrsError>(())
815+
/// ```
816+
///
817+
/// [TakerSubscription]s can also be used in a [`WaitSet`][3] to wait for
818+
/// messages from one or more subscriptions.
819+
///
820+
/// ```no_run
821+
/// # use rclrs::*;
822+
/// # let context = Context::default();
823+
/// # let executor = context.create_basic_executor();
824+
/// # let node = executor.create_node("my_node").unwrap();
825+
/// use std::sync::Arc;
826+
/// use example_interfaces::msg::String as StringMsg;
827+
///
828+
/// let subscription =
829+
/// Arc::new(node.create_taker_subscription::<StringMsg>("topic")?);
830+
///
831+
/// // `_lifecycle` must be named to avoid being dropped, which would cause
832+
/// // the waitable to be dropped from the WaitSet.
833+
/// let (waitable, _lifecycle) =
834+
/// Waitable::new(Box::new(Arc::clone(&subscription)), None);
835+
///
836+
/// let mut waitset = WaitSet::new(&context)?;
837+
/// waitset.add([waitable])?;
838+
///
839+
/// loop {
840+
/// waitset.wait(None, |_| Ok(()))?;
841+
///
842+
/// if let Some(msg) = subscription.take()? {
843+
/// println!("{}", msg.data);
844+
/// }
845+
/// }
846+
/// # Ok::<(), RclrsError>(())
847+
/// ```
848+
///
849+
/// # Subscription Options
850+
///
851+
/// See [`create_subscription`][4] for examples
852+
/// of setting the subscription options.
853+
///
854+
/// [1]: crate::Executor::spin
855+
/// [2]: crate::TakerSubscription::take
856+
/// [3]: crate::WaitSet
857+
/// [4]: crate::NodeState::create_subscription
858+
pub fn create_taker_subscription<'a, T>(
859+
&self,
860+
options: impl Into<SubscriptionOptions<'a>>,
861+
) -> Result<TakerSubscription<T>, RclrsError>
862+
where
863+
T: Message,
864+
{
865+
TakerSubscription::create(options, &self.handle)
866+
}
867+
788868
/// Creates a [`Subscription`] with an async callback.
789869
///
790870
/// # Behavior

rclrs/src/subscription.rs

Lines changed: 214 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
use std::{
22
any::Any,
33
ffi::{CStr, CString},
4+
marker::PhantomData,
45
sync::{Arc, Mutex, MutexGuard},
56
};
67

78
use rosidl_runtime_rs::{Message, RmwMessage};
89

910
use crate::{
1011
error::ToResult, qos::QoSProfile, rcl_bindings::*, IntoPrimitiveOptions, Node, NodeHandle,
11-
RclPrimitive, RclPrimitiveHandle, RclPrimitiveKind, RclrsError, Waitable, WaitableLifecycle,
12-
WorkScope, Worker, WorkerCommands, ENTITY_LIFECYCLE_MUTEX,
12+
RclPrimitive, RclPrimitiveHandle, RclPrimitiveKind, RclReturnCode, RclrsError, Waitable,
13+
WaitableLifecycle, WorkScope, Worker, WorkerCommands, ENTITY_LIFECYCLE_MUTEX,
1314
};
1415

1516
mod any_subscription_callback;
@@ -117,47 +118,9 @@ where
117118
node_handle: &Arc<NodeHandle>,
118119
commands: &Arc<WorkerCommands>,
119120
) -> Result<Arc<Self>, RclrsError> {
120-
let SubscriptionOptions { topic, qos } = options.into();
121121
let callback = Arc::new(Mutex::new(callback));
122122

123-
// SAFETY: Getting a zero-initialized value is always safe.
124-
let mut rcl_subscription = unsafe { rcl_get_zero_initialized_subscription() };
125-
let type_support =
126-
<T as Message>::RmwMsg::get_type_support() as *const rosidl_message_type_support_t;
127-
let topic_c_string = CString::new(topic).map_err(|err| RclrsError::StringContainsNul {
128-
err,
129-
s: topic.into(),
130-
})?;
131-
132-
// SAFETY: No preconditions for this function.
133-
let mut rcl_subscription_options = unsafe { rcl_subscription_get_default_options() };
134-
rcl_subscription_options.qos = qos.into();
135-
136-
{
137-
let rcl_node = node_handle.rcl_node.lock().unwrap();
138-
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
139-
unsafe {
140-
// SAFETY:
141-
// * The rcl_subscription is zero-initialized as mandated by this function.
142-
// * The rcl_node is kept alive by the NodeHandle because it is a dependency of the subscription.
143-
// * The topic name and the options are copied by this function, so they can be dropped afterwards.
144-
// * The entity lifecycle mutex is locked to protect against the risk of global
145-
// variables in the rmw implementation being unsafely modified during cleanup.
146-
rcl_subscription_init(
147-
&mut rcl_subscription,
148-
&*rcl_node,
149-
type_support,
150-
topic_c_string.as_ptr(),
151-
&rcl_subscription_options,
152-
)
153-
.ok()?;
154-
}
155-
}
156-
157-
let handle = Arc::new(SubscriptionHandle {
158-
rcl_subscription: Mutex::new(rcl_subscription),
159-
node_handle: Arc::clone(node_handle),
160-
});
123+
let handle = SubscriptionHandle::create::<T>(options, node_handle)?;
161124

162125
let (waitable, lifecycle) = Waitable::new(
163126
Box::new(SubscriptionExecutable {
@@ -292,6 +255,52 @@ struct SubscriptionHandle {
292255
}
293256

294257
impl SubscriptionHandle {
258+
fn create<'a, T: Message>(
259+
options: impl Into<SubscriptionOptions<'a>>,
260+
node_handle: &Arc<NodeHandle>,
261+
) -> Result<Arc<Self>, RclrsError> {
262+
let SubscriptionOptions { topic, qos } = options.into();
263+
264+
// SAFETY: Getting a zero-initialized value is always safe.
265+
let mut rcl_subscription = unsafe { rcl_get_zero_initialized_subscription() };
266+
let type_support =
267+
<T as Message>::RmwMsg::get_type_support() as *const rosidl_message_type_support_t;
268+
let topic_c_string = CString::new(topic).map_err(|err| RclrsError::StringContainsNul {
269+
err,
270+
s: topic.into(),
271+
})?;
272+
273+
// SAFETY: No preconditions for this function.
274+
let mut rcl_subscription_options = unsafe { rcl_subscription_get_default_options() };
275+
rcl_subscription_options.qos = qos.into();
276+
277+
{
278+
let rcl_node = node_handle.rcl_node.lock().unwrap();
279+
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
280+
unsafe {
281+
// SAFETY:
282+
// * The rcl_subscription is zero-initialized as mandated by this function.
283+
// * The rcl_node is kept alive by the NodeHandle because it is a dependency of the subscription.
284+
// * The topic name and the options are copied by this function, so they can be dropped afterwards.
285+
// * The entity lifecycle mutex is locked to protect against the risk of global
286+
// variables in the rmw implementation being unsafely modified during cleanup.
287+
rcl_subscription_init(
288+
&mut rcl_subscription,
289+
&*rcl_node,
290+
type_support,
291+
topic_c_string.as_ptr(),
292+
&rcl_subscription_options,
293+
)
294+
.ok()?;
295+
}
296+
}
297+
298+
Ok(Arc::new(Self {
299+
rcl_subscription: Mutex::new(rcl_subscription),
300+
node_handle: Arc::clone(node_handle),
301+
}))
302+
}
303+
295304
fn lock(&self) -> MutexGuard<rcl_subscription_t> {
296305
self.rcl_subscription.lock().unwrap()
297306
}
@@ -408,6 +417,109 @@ impl Drop for SubscriptionHandle {
408417
}
409418
}
410419

420+
/// Struct for receiving messages of type `T` without a callback.
421+
///
422+
/// Create a subscription using [`NodeState::create_taker_subscription()`][1].
423+
///
424+
/// Calling [`spin`][2] on the node's executor will have no effect on this subscription.
425+
///
426+
/// When a subscription is created, it may take some time to get "matched" with a corresponding
427+
/// publisher.
428+
///
429+
/// [1]: crate::NodeState::create_taker_subscription
430+
/// [2]: crate::Executor::spin
431+
pub struct TakerSubscription<T: Message> {
432+
handle: Arc<SubscriptionHandle>,
433+
_phantom: PhantomData<T>,
434+
}
435+
436+
impl<T: Message> TakerSubscription<T> {
437+
/// Used by [`Node`][crate::Node] to create a new taker subscription.
438+
pub(crate) fn create<'a>(
439+
options: impl Into<SubscriptionOptions<'a>>,
440+
node_handle: &Arc<NodeHandle>,
441+
) -> Result<Self, RclrsError> {
442+
let handle = SubscriptionHandle::create::<T>(options, node_handle)?;
443+
444+
Ok(Self {
445+
handle,
446+
_phantom: PhantomData,
447+
})
448+
}
449+
450+
/// Fetches a new message.
451+
///
452+
/// When there is no new message, this will return `Ok(None)`.
453+
pub fn take(&self) -> Result<Option<T>, RclrsError> {
454+
self.take_with_info().map(|res| res.map(|msg| msg.0))
455+
}
456+
457+
/// Fetches a new message and its associated [`MessageInfo`][1].
458+
///
459+
/// When there is no new message, this will return `Ok(None)`.
460+
///
461+
/// [1]: crate::MessageInfo
462+
pub fn take_with_info(&self) -> Result<Option<(T, MessageInfo)>, RclrsError> {
463+
match self.handle.take() {
464+
Ok(msg) => Ok(Some(msg)),
465+
Err(RclrsError::RclError {
466+
code: RclReturnCode::SubscriptionTakeFailed,
467+
..
468+
}) => Ok(None),
469+
Err(e) => Err(e),
470+
}
471+
}
472+
473+
/// Obtains a read-only handle to a message owned by the middleware.
474+
///
475+
/// When there is no new message, this will return `Ok(None)`.
476+
///
477+
/// This is the counterpart to [`Publisher::borrow_loaned_message()`][1]. See its documentation
478+
/// for more information.
479+
///
480+
/// [1]: crate::Publisher::borrow_loaned_message
481+
pub fn take_loaned(&self) -> Result<Option<ReadOnlyLoanedMessage<T>>, RclrsError> {
482+
self.take_loaned_with_info().map(|res| res.map(|msg| msg.0))
483+
}
484+
485+
/// Obtains a read-only handle to a message owned by the middleware and its associated
486+
/// [`MessageInfo`][1].
487+
///
488+
/// When there is no new message, this will return `Ok(None)`.
489+
///
490+
/// This is the counterpart to [`Publisher::borrow_loaned_message()`][2]. See its documentation
491+
/// for more information.
492+
///
493+
/// [1]: crate::MessageInfo
494+
/// [2]: crate::Publisher::borrow_loaned_message
495+
pub fn take_loaned_with_info(
496+
&self,
497+
) -> Result<Option<(ReadOnlyLoanedMessage<T>, MessageInfo)>, RclrsError> {
498+
match self.handle.take_loaned() {
499+
Ok(msg) => Ok(Some(msg)),
500+
Err(RclrsError::RclError {
501+
code: RclReturnCode::SubscriptionTakeFailed,
502+
..
503+
}) => Ok(None),
504+
Err(e) => Err(e),
505+
}
506+
}
507+
}
508+
509+
impl<T: Message> RclPrimitive for Arc<TakerSubscription<T>> {
510+
unsafe fn execute(&mut self, _payload: &mut dyn Any) -> Result<(), RclrsError> {
511+
Ok(())
512+
}
513+
514+
fn kind(&self) -> RclPrimitiveKind {
515+
RclPrimitiveKind::Subscription
516+
}
517+
518+
fn handle(&self) -> RclPrimitiveHandle {
519+
RclPrimitiveHandle::Subscription(self.handle.lock())
520+
}
521+
}
522+
411523
#[cfg(test)]
412524
mod tests {
413525
use super::*;
@@ -520,4 +632,65 @@ mod tests {
520632
assert!(start_time.elapsed() < std::time::Duration::from_secs(10));
521633
}
522634
}
635+
636+
#[test]
637+
fn test_taker_subscription() -> Result<(), RclrsError> {
638+
use crate::*;
639+
use std::time::Duration;
640+
641+
let context = Context::default();
642+
let executor = context.create_basic_executor();
643+
let node = executor.create_node("test_node_taker_subscription")?;
644+
645+
let publisher = node.create_publisher::<msg::Empty>("test_topic")?;
646+
let subscriber = Arc::new(node.create_taker_subscription::<msg::Empty>("test_topic")?);
647+
648+
let (waitable, _lifecycle) = Waitable::new(Box::new(Arc::clone(&subscriber)), None);
649+
let mut waitset = WaitSet::new(&context)?;
650+
waitset.add([waitable])?;
651+
652+
let timeout = Some(Duration::from_millis(100));
653+
654+
publisher.publish(msg::Empty::default())?;
655+
waitset.wait(timeout, |_| Ok(()))?;
656+
657+
assert!(subscriber.take()?.is_some());
658+
659+
assert!(subscriber.take()?.is_none());
660+
assert!(subscriber.take_with_info()?.is_none());
661+
assert!(subscriber.take_loaned()?.is_none());
662+
assert!(subscriber.take_loaned_with_info()?.is_none());
663+
664+
publisher.publish(msg::Empty::default())?;
665+
waitset.wait(timeout, |_| Ok(()))?;
666+
667+
assert!(subscriber.take_with_info()?.is_some());
668+
669+
assert!(subscriber.take()?.is_none());
670+
assert!(subscriber.take_with_info()?.is_none());
671+
assert!(subscriber.take_loaned()?.is_none());
672+
assert!(subscriber.take_loaned_with_info()?.is_none());
673+
674+
publisher.publish(msg::Empty::default())?;
675+
waitset.wait(timeout, |_| Ok(()))?;
676+
677+
assert!(subscriber.take_loaned()?.is_some());
678+
679+
assert!(subscriber.take()?.is_none());
680+
assert!(subscriber.take_with_info()?.is_none());
681+
assert!(subscriber.take_loaned()?.is_none());
682+
assert!(subscriber.take_loaned_with_info()?.is_none());
683+
684+
publisher.publish(msg::Empty::default())?;
685+
waitset.wait(timeout, |_| Ok(()))?;
686+
687+
assert!(subscriber.take_loaned_with_info()?.is_some());
688+
689+
assert!(subscriber.take()?.is_none());
690+
assert!(subscriber.take_with_info()?.is_none());
691+
assert!(subscriber.take_loaned()?.is_none());
692+
assert!(subscriber.take_loaned_with_info()?.is_none());
693+
694+
Ok(())
695+
}
523696
}

0 commit comments

Comments
 (0)