Skip to content

Commit a27bb89

Browse files
committed
Update docs for subscriptions
Signed-off-by: Michael X. Grey <[email protected]>
1 parent c8f61f0 commit a27bb89

File tree

1 file changed

+158
-59
lines changed

1 file changed

+158
-59
lines changed

rclrs/src/node.rs

Lines changed: 158 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,12 @@ impl NodeState {
401401
/// multiple times simultaneously with different incoming requests. This
402402
/// will depend on what kind of executor is running.
403403
///
404+
/// If you want to ensure that calls to this service can only run
405+
/// one-at-a-time then consider creating a [`Worker`] and using that to
406+
/// [create a service][worker-service].
407+
///
408+
/// [worker-service]: WorkerState::create_service
409+
///
404410
/// # Service Options
405411
///
406412
/// Pass in only the service name for the `options` argument to use all default service options:
@@ -480,9 +486,9 @@ impl NodeState {
480486
/// )?;
481487
/// # Ok::<(), RclrsError>(())
482488
/// ```
483-
/// To share internal state outside of the callback you will need to wrap it
484-
/// in [`Arc`] or `Arc<Mutex<S>>` and then clone the [`Arc`] before capturing
485-
/// it in the closure:
489+
/// To share the internal state outside of the callback you will need to
490+
/// wrap it in [`Arc`] or `Arc<Mutex<S>>` and then clone the [`Arc`] before
491+
/// capturing it in the closure:
486492
///
487493
/// ```
488494
/// # use rclrs::*;
@@ -551,46 +557,8 @@ impl NodeState {
551557
///
552558
/// # Service Options
553559
///
554-
/// Pass in only the service name for the `options` argument to use all default service options:
555-
/// ```
556-
/// # use rclrs::*;
557-
/// # let executor = Context::default().create_basic_executor();
558-
/// # let node = executor.create_node("my_node").unwrap();
559-
/// let service = node.create_service::<test_msgs::srv::Empty, _>(
560-
/// "my_service",
561-
/// |_request: test_msgs::srv::Empty_Request| {
562-
/// println!("Received request!");
563-
/// test_msgs::srv::Empty_Response::default()
564-
/// },
565-
/// );
566-
/// ```
567-
///
568-
/// Take advantage of the [`IntoPrimitiveOptions`] API to easily build up the
569-
/// service options:
570-
///
571-
/// ```
572-
/// # use rclrs::*;
573-
/// # let executor = Context::default().create_basic_executor();
574-
/// # let node = executor.create_node("my_node").unwrap();
575-
/// let service = node.create_service::<test_msgs::srv::Empty, _>(
576-
/// "my_service"
577-
/// .keep_all()
578-
/// .transient_local(),
579-
/// |_request: test_msgs::srv::Empty_Request| {
580-
/// println!("Received request!");
581-
/// test_msgs::srv::Empty_Response::default()
582-
/// },
583-
/// );
584-
/// ```
585-
///
586-
/// Any quality of service options that you explicitly specify will override
587-
/// the default service options. Any that you do not explicitly specify will
588-
/// remain the default service options. Note that services are generally
589-
/// expected to use [reliable][2], so it's best not to change the reliability
590-
/// setting unless you know what you are doing.
591-
///
592-
/// [1]: crate::Service
593-
/// [2]: crate::QoSReliabilityPolicy::Reliable
560+
/// See [`create_service`][NodeState::create_service] for examples of setting
561+
/// the service options.
594562
///
595563
/// # Async Service Callbacks
596564
///
@@ -608,7 +576,7 @@ impl NodeState {
608576
/// immediately return a [`Future`][5], so in many cases any internal state
609577
/// that needs to be mutated will still need to be wrapped in [`Arc`] and
610578
/// [`Mutex`] to ensure it is synchronized across multiple runs of the
611-
/// callback.
579+
/// `Future` that the callback produces.
612580
///
613581
/// However unlike the blocking callbacks that can be provided to
614582
/// [`NodeState::create_service`], callbacks for async services can take
@@ -684,7 +652,7 @@ impl NodeState {
684652
/// manage a changing state for the subscription, consider using
685653
/// [`WorkerState::create_subscription`] instead of this one.
686654
///
687-
/// # Usage
655+
/// # Subscription Options
688656
///
689657
/// Pass in only the topic name for the `options` argument to use all default subscription options:
690658
/// ```
@@ -724,8 +692,74 @@ impl NodeState {
724692
/// );
725693
/// ```
726694
///
727-
//
728-
// TODO(@mxgrey): Add examples showing each supported callback signatures
695+
/// # Subscription Callbacks
696+
///
697+
/// Subscription callbacks support six signatures:
698+
/// - [`Fn`] ( `Message` )
699+
/// - [`Fn`] ( `Message`, [`MessageInfo`][1] )
700+
/// - [`Fn`] ( [`Box`]<`Message`> )
701+
/// - [`Fn`] ( [`Box`]<`Message`>, [`MessageInfo`][1] )
702+
/// - [`Fn`] ( [`ReadOnlyLoanedMessage`][2]<`Message`> )
703+
/// - [`Fn`] ( [`ReadOnlyLoanedMessage`][2]<`Message`>, [`MessageInfo`][1] )
704+
///
705+
/// [1]: crate::MessageInfo
706+
/// [2]: crate::ReadOnlyLoanedMessage
707+
///
708+
/// All function signatures use [`Fn`] since the callback may be run
709+
/// multiple times simultaneously across different threads depending on the
710+
/// executor runtime that is being used. Because of this, any internal state
711+
/// captured into the callback that needs to be mutated will need to be
712+
/// wrapped in [`Mutex`] to ensure it is synchronized across multiple
713+
/// simultaneous runs of the callback. For example:
714+
///
715+
/// ```
716+
/// # use rclrs::*;
717+
/// # let executor = Context::default().create_basic_executor();
718+
/// # let node = executor.create_node("my_node").unwrap();
719+
/// use std::sync::Mutex;
720+
///
721+
/// let num_messages = Mutex::new(0usize);
722+
/// let subscription = node.create_subscription(
723+
/// "topic",
724+
/// move |msg: example_interfaces::msg::String| {
725+
/// let mut num = num_messages.lock().unwrap();
726+
/// *num += 1;
727+
/// println!("#{} | I heard: '{}'", *num, msg.data);
728+
/// },
729+
/// )?;
730+
/// # Ok::<(), RclrsError>(())
731+
/// ```
732+
///
733+
/// To share the internal state outside of the callback you will need to
734+
/// wrap it in [`Arc`] (or `Arc<Mutex<S>>` for mutability) and then clone
735+
/// the [`Arc`] before capturing it in the closure:
736+
///
737+
/// ```
738+
/// # use rclrs::*;
739+
/// # let executor = Context::default().create_basic_executor();
740+
/// # let node = executor.create_node("my_node").unwrap();
741+
/// use std::sync::{Arc, Mutex};
742+
///
743+
/// let data = Arc::new(Mutex::new(String::new()));
744+
///
745+
/// let data_in_subscription = Arc::clone(&data);
746+
/// let subscription = node.create_subscription(
747+
/// "topic",
748+
/// move |msg: example_interfaces::msg::String| {
749+
/// let mut data = data_in_subscription.lock().unwrap();
750+
/// *data = msg.data;
751+
/// },
752+
/// )?;
753+
///
754+
/// // TODO(@mxgrey): Replace this with a timer when timers become available
755+
/// std::thread::spawn(move || {
756+
/// loop {
757+
/// std::thread::sleep(std::time::Duration::from_secs(1));
758+
/// println!("Last message received: {}", data.lock().unwrap());
759+
/// }
760+
/// });
761+
/// # Ok::<(), RclrsError>(())
762+
/// ```
729763
pub fn create_subscription<'a, T, Args>(
730764
&self,
731765
options: impl Into<SubscriptionOptions<'a>>,
@@ -748,21 +782,86 @@ impl NodeState {
748782
///
749783
/// This callback may run in parallel with other callbacks. It may even run
750784
/// multiple times simultaneously with different incoming messages. This
751-
/// parallelism will depend on the executor that is being used. When the
752-
/// callback uses `.await`, it will not block anything else from running.
785+
/// parallelism will depend on the executor that is being used.
753786
///
754-
/// Any internal state that needs to be mutated will need to be wrapped in
755-
/// [`Mutex`] to ensure it is synchronized across multiple runs of the
756-
/// callback. To share internal state outside of the callback you will need
757-
/// to wrap it in [`Arc`] or `Arc<Mutex<S>>`. You could also consider storing
758-
/// the shared state inside of a [`Worker`] and calling [`WorkerState::run`]
759-
/// from inside this callback to interact with the state.
787+
/// The key advantage of an async subscription is that the callback can use
788+
/// `.await`, and other callbacks will be able to run concurrently without
789+
/// being blocked by this callback. You can also pass in an `async fn` as
790+
/// the callback, but in most cases you will probably need to use a closure
791+
/// that returns an `async { ... }` block so that you can capture some state
792+
/// into the closure.
793+
///
794+
/// If you don't need async language features for your callback, then
795+
/// consider using [`NodeState::create_subscription`] or
796+
/// [`WorkerState::create_subscription`].
797+
///
798+
/// # Subscription Options
799+
///
800+
/// See [`create_subscription`][NodeState::create_subscription] for examples
801+
/// of setting the subscription options.
802+
///
803+
/// # Async Subscription Callbacks
804+
///
805+
/// Async subscription callbacks support six signatures:
806+
/// - [`FnMut`] ( `Message` ) -> impl [`Future`][1]<Output=()>
807+
/// - [`FnMut`] ( `Message`, [`MessageInfo`][2] ) -> impl [`Future`][1]<Output=()>
808+
/// - [`FnMut`] ( [`Box`]<`Message`> ) -> impl [`Future`][1]<Output=()>
809+
/// - [`FnMut`] ( [`Box`]<`Message`>, [`MessageInfo`][2] ) -> impl [`Future`][1]<Output=()>
810+
/// - [`FnMut`] ( [`ReadOnlyLoanedMessage`][3]<`Message`> ) -> impl [`Future`][1]<Output=()>
811+
/// - [`FnMut`] ( [`ReadOnlyLoanedMessage`][3]<`Message`>, [`MessageInfo`][2] ) -> impl [`Future`][1]<Output=()>
812+
///
813+
/// [1]: std::future::Future
814+
/// [2]: crate::MessageInfo
815+
/// [3]: crate::ReadOnlyLoanedMessage
816+
///
817+
/// In this case the closure can be [`FnMut`], allowing internal state to be
818+
/// mutable, but it should be noted that the function is expected to
819+
/// immediately return a [`Future`][1], so in many cases any internal state
820+
/// that needs to be mutable will still need to be wrapped in [`Arc`] and
821+
/// [`Mutex`] to ensure it is synchronized across mutliple runs of the
822+
/// `Future` that the callback produces.
823+
///
824+
/// However unlike the blocking callbacks that can be provided to
825+
/// [`NodeState::create_subscription`], callbacks for async subscriptions
826+
/// can take advantage of `.await`. This allows you to capture [`Client`]s
827+
/// or [`Worker`]s into the closure, run tasks on them, and await the
828+
/// outcome. This allows one async subscription to share state data across
829+
/// multiple workers.
760830
///
761-
/// # Usage
831+
/// ```
832+
/// # use rclrs::*;
833+
/// # let executor = Context::default().create_basic_executor();
834+
/// # let node = executor.create_node("my_node").unwrap();
835+
/// use std::sync::Arc;
836+
///
837+
/// let count_worker = node.create_worker(0_usize);
838+
/// let data_worker = node.create_worker(String::new());
762839
///
763-
/// See [create_subscription][NodeState::create_subscription] for usage.
764-
//
765-
// TODO(@mxgrey): Add examples showing each supported signature
840+
/// let service = node.create_async_subscription::<example_interfaces::msg::String, _>(
841+
/// "topic",
842+
/// move |msg: example_interfaces::msg::String| {
843+
/// // Clone the workers so they can be captured into the async block
844+
/// let count_worker = Arc::clone(&count_worker);
845+
/// let data_worker = Arc::clone(&data_worker);
846+
/// async move {
847+
/// // Update the message count
848+
/// let current_count = count_worker.run(move |count: &mut usize| {
849+
/// *count += 1;
850+
/// *count
851+
/// }).await.unwrap();
852+
///
853+
/// // Change the data in the data_worker and get back the data
854+
/// // that was previously put in there.
855+
/// let previous = data_worker.run(move |data: &mut String| {
856+
/// std::mem::replace(data, msg.data)
857+
/// }).await.unwrap();
858+
///
859+
/// println!("Current count is {current_count}, data was previously {previous}");
860+
/// }
861+
/// }
862+
/// )?;
863+
/// # Ok::<(), RclrsError>(())
864+
/// ```
766865
pub fn create_async_subscription<'a, T, Args>(
767866
&self,
768867
options: impl Into<SubscriptionOptions<'a>>,

0 commit comments

Comments
 (0)