Skip to content

Commit c4e0bed

Browse files
Add docs, cleanup tests
Signed-off-by: Luca Della Vedova <[email protected]>
1 parent eb5aeeb commit c4e0bed

File tree

5 files changed

+191
-8
lines changed

5 files changed

+191
-8
lines changed

rclrs/src/dynamic_message/dynamic_publisher.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ use crate::error::{RclrsError, ToResult};
99
use crate::rcl_bindings::*;
1010
use crate::{NodeHandle, PublisherHandle, PublisherOptions, ENTITY_LIFECYCLE_MUTEX};
1111

12+
/// Struct for sending dynamic messages.
13+
///
14+
/// Create a dynamic publisher using [`Node::create_dynamic_publisher`][1].
15+
/// Refer to [`crate::Publisher`] for details of the behavior.
16+
///
17+
/// [1]: crate::NodeState::create_dynamic_publisher
1218
pub type DynamicPublisher = Arc<DynamicPublisherState>;
1319

1420
/// Struct for sending messages of type `T`.
@@ -158,17 +164,17 @@ mod tests {
158164
let graph = construct_test_graph(namespace)?;
159165

160166
let node_1_empty_publisher = graph.node1.create_dynamic_publisher(
161-
"test_msgs/msg/Empty".try_into().unwrap(),
167+
"test_msgs/msg/Empty".try_into()?,
162168
"graph_test_topic_1",
163169
)?;
164170
let topic1 = node_1_empty_publisher.topic_name();
165171
let node_1_basic_types_publisher = graph.node1.create_dynamic_publisher(
166-
"test_msgs/msg/BasicTypes".try_into().unwrap(),
172+
"test_msgs/msg/BasicTypes".try_into()?,
167173
"graph_test_topic_2",
168174
)?;
169175
let topic2 = node_1_basic_types_publisher.topic_name();
170176
let node_2_default_publisher = graph.node2.create_dynamic_publisher(
171-
"test_msgs/msg/Defaults".try_into().unwrap(),
177+
"test_msgs/msg/Defaults".try_into()?,
172178
"graph_test_topic_3",
173179
)?;
174180
let topic3 = node_2_default_publisher.topic_name();

rclrs/src/dynamic_message/dynamic_subscription.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,30 @@ use crate::{
1717
Worker, WorkerCommands, ENTITY_LIFECYCLE_MUTEX,
1818
};
1919

20+
/// Struct for receiving messages whose type is not know at compile time.
21+
///
22+
/// Create a dynamic subscription using [`NodeState::create_dynamic_subscription()`][1]
23+
/// or [`NodeState::create_async_dynamic_subscription`][2].
24+
///
25+
/// There can be multiple subscriptions for the same topic, in different nodes or the same node.
26+
/// A clone of a `Subscription` will refer to the same subscription instance as the original.
27+
/// The underlying instance is tied to [`DynamicSubscriptionState`] which implements the [`DynamicSubscription`] API.
28+
///
29+
/// Receiving messages requires the node's executor to [spin][3].
30+
///
31+
/// When a subscription is created, it may take some time to get "matched" with a corresponding
32+
/// publisher.
33+
///
34+
/// [1]: crate::NodeState::create_subscription
35+
/// [2]: crate::NodeState::create_async_subscription
36+
/// [3]: crate::Executor::spin
2037
pub type DynamicSubscription = Arc<DynamicSubscriptionState<Node>>;
2138

39+
/// A [`DynamicSubscription`] that runs on a [`Worker`].
40+
///
41+
/// Create a worker dynamic subscription using [`WorkerState::create_dynamic_subscription`][1].
42+
///
43+
/// [1]: crate::WorkerState::create_dynamic_subscription
2244
pub type WorkerDynamicSubscription<Payload> = Arc<DynamicSubscriptionState<Worker<Payload>>>;
2345

2446
struct DynamicSubscriptionExecutable<Payload> {
@@ -165,7 +187,7 @@ impl<Payload: 'static> DynamicSubscriptionCallback<Payload> {
165187
}
166188

167189
impl<Payload> DynamicSubscriptionExecutable<Payload> {
168-
pub fn take(&self) -> Result<(DynamicMessage, MessageInfo), RclrsError> {
190+
fn take(&self) -> Result<(DynamicMessage, MessageInfo), RclrsError> {
169191
let mut dynamic_message = self.metadata.create()?;
170192
let rmw_message = dynamic_message.storage.as_mut_ptr();
171193
let mut message_info = unsafe { rmw_get_zero_initialized_message_info() };
@@ -357,20 +379,20 @@ mod tests {
357379
let graph = construct_test_graph(namespace)?;
358380

359381
let node_2_empty_subscription = graph.node2.create_dynamic_subscription::<_>(
360-
"test_msgs/msg/Empty".try_into().unwrap(),
382+
"test_msgs/msg/Empty".try_into()?,
361383
"graph_test_topic_1",
362384
|_, _| {},
363385
)?;
364386
let topic1 = node_2_empty_subscription.topic_name();
365387
let node_2_basic_types_subscription = graph.node2.create_dynamic_subscription::<_>(
366-
"test_msgs/msg/BasicTypes".try_into().unwrap(),
388+
"test_msgs/msg/BasicTypes".try_into()?,
367389
"graph_test_topic_2",
368390
|_, _| {},
369391
)?;
370392
let topic2 = node_2_basic_types_subscription.topic_name();
371393

372394
let node_1_defaults_subscription = graph.node1.create_dynamic_subscription::<_>(
373-
"test_msgs/msg/Defaults".try_into().unwrap(),
395+
"test_msgs/msg/Defaults".try_into()?,
374396
"graph_test_topic_3",
375397
|_, _| {},
376398
)?;

rclrs/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ mod rcl_bindings;
200200

201201
#[cfg(feature = "dyn_msg")]
202202
pub mod dynamic_message;
203+
#[cfg(feature = "dyn_msg")]
204+
pub use dynamic_message::*;
203205

204206
pub use arguments::*;
205207
pub use client::*;

rclrs/src/node.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,22 @@ impl NodeState {
403403
PublisherState::<T>::create(options, Arc::clone(&self.handle))
404404
}
405405

406+
/// Creates a [`DynamicPublisher`], a publisher whose type is only known at runtime.
407+
///
408+
/// Refer to [`Node::create_publisher`] for the API, the only key difference is that the
409+
/// publisher's message type is passed as a [`crate::MessageTypeName`] parameter.
410+
///
411+
/// Pass in only the topic name for the `options` argument to use all default publisher options:
412+
/// ```
413+
/// # use rclrs::*;
414+
/// # let executor = Context::default().create_basic_executor();
415+
/// # let node = executor.create_node("my_node").unwrap();
416+
/// let publisher = node.create_dynamic_publisher(
417+
/// "test_msgs/msg/Empty".try_into().unwrap(),
418+
/// "my_topic"
419+
/// .keep_last(100)
420+
/// )
421+
/// .unwrap();
406422
#[cfg(feature = "dyn_msg")]
407423
pub fn create_dynamic_publisher<'a>(
408424
&self,
@@ -803,6 +819,42 @@ impl NodeState {
803819
)
804820
}
805821

822+
/// Creates a [`DynamicSubscription`] with an ordinary callback.
823+
///
824+
/// For the behavior and API refer to [`Node::create_subscription`], except two key
825+
/// differences:
826+
///
827+
/// - The message type is determined at runtime through the `topic_type` function parameter.
828+
/// - Only one type of callback is supported (returning both [`crate::DynamicMessage`] and
829+
/// [`crate::MessageInfo`]).
830+
///
831+
/// # Message type passing
832+
///
833+
/// The message type can be passed as a [`crate::MessageTypeName`] struct. The struct also implements `TryFrom<&str>`
834+
/// ```
835+
/// # use rclrs::*;
836+
/// # let executor = Context::default().create_basic_executor();
837+
/// # let node = executor.create_node("my_node").unwrap();
838+
/// let subscription = node.create_dynamic_subscription(
839+
/// MessageTypeName {
840+
/// package_name: "test_msgs".to_owned(),
841+
/// type_name: "Empty".to_owned(),
842+
/// },
843+
/// "my_topic"
844+
/// .transient_local(),
845+
/// |_msg: DynamicMessage, _info: MessageInfo| {
846+
/// println!("Received message!");
847+
/// },
848+
/// );
849+
///
850+
/// let subscription = node.create_dynamic_subscription(
851+
/// "test_msgs/msg/Empty".try_into().unwrap(),
852+
/// "my_topic",
853+
/// |_msg: DynamicMessage, _info: MessageInfo| {
854+
/// println!("Received message!");
855+
/// },
856+
/// );
857+
/// ```
806858
#[cfg(feature = "dyn_msg")]
807859
pub fn create_dynamic_subscription<'a, F>(
808860
&self,
@@ -822,6 +874,60 @@ impl NodeState {
822874
)
823875
}
824876

877+
/// Creates a [`DynamicSubscription`] with an async callback.
878+
///
879+
/// For the behavior and API refer to [`Node::create_async_subscription`], except two key
880+
/// differences:
881+
///
882+
/// - The message type is determined at runtime through the `topic_type` function parameter.
883+
/// - Only one type of callback is supported (returning both [`crate::DynamicMessage`] and
884+
/// [`crate::MessageInfo`].
885+
///
886+
/// # Message type passing
887+
///
888+
/// The message type can be passed as a [`crate::MessageTypeName`] struct. The struct also implements `TryFrom<&str>`
889+
/// ```
890+
/// # use rclrs::*;
891+
/// # let executor = Context::default().create_basic_executor();
892+
/// # let node = executor.create_node("my_node").unwrap();
893+
/// use std::sync::Arc;
894+
///
895+
/// let count_worker = node.create_worker(0_usize);
896+
/// let data_worker = node.create_worker(String::new());
897+
///
898+
/// let service = node.create_async_dynamic_subscription(
899+
/// "example_interfaces/msg/String".try_into()?,
900+
/// "topic",
901+
/// move |msg: DynamicMessage, _info: MessageInfo| {
902+
/// // Clone the workers so they can be captured into the async block
903+
/// let count_worker = Arc::clone(&count_worker);
904+
/// let data_worker = Arc::clone(&data_worker);
905+
/// Box::pin(async move {
906+
/// // Update the message count
907+
/// let current_count = count_worker.run(move |count: &mut usize| {
908+
/// *count += 1;
909+
/// *count
910+
/// }).await.unwrap();
911+
///
912+
/// // Change the data in the data_worker and get back the data
913+
/// // that was previously put in there.
914+
/// let previous = data_worker.run(move |data: &mut String| {
915+
/// let value = msg.get("data").unwrap();
916+
/// let Value::Simple(value) = value else {
917+
/// panic!("Unexpected value type, expected Simple value");
918+
/// };
919+
/// let SimpleValue::String(value) = value else {
920+
/// panic!("Unexpected value type, expected String");
921+
/// };
922+
/// std::mem::replace(data, value.to_string())
923+
/// }).await.unwrap();
924+
///
925+
/// println!("Current count is {current_count}, data was previously {previous}");
926+
/// })
927+
/// }
928+
/// )?;
929+
/// # Ok::<(), RclrsError>(())
930+
/// ```
825931
#[cfg(feature = "dyn_msg")]
826932
pub fn create_async_dynamic_subscription<'a, F>(
827933
&self,

rclrs/src/worker.rs

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,45 @@ impl<Payload: 'static + Send + Sync> WorkerState<Payload> {
269269
)
270270
}
271271

272+
/// Creates a [`WorkerDynamicSubscription`], whose message type is only know at runtime.
273+
///
274+
/// Refer to ['Worker::create_subscription`] for the API and behavior except two key
275+
/// differences:
276+
///
277+
/// - The message type is determined at runtime through the `topic_type` function parameter.
278+
/// - Only one type of callback is supported (returning both [`crate::DynamicMessage`] and
279+
/// [`crate::MessageInfo`]).
280+
///
281+
/// ```
282+
/// # use rclrs::*;
283+
/// # let executor = Context::default().create_basic_executor();
284+
/// # let node = executor.create_node("my_node").unwrap();
285+
/// // The worker's payload is data that we want to share with other callbacks.
286+
/// let worker = node.create_worker::<Option<String>>(None);
287+
///
288+
/// // This variable will be the mutable internal state of the subscription
289+
/// // callback.
290+
/// let mut count = 0_usize;
291+
///
292+
/// let subscription = worker.create_dynamic_subscription(
293+
/// "example_interfaces/msg/String".try_into()?,
294+
/// "topic",
295+
/// move |data: &mut Option<String>, msg: DynamicMessage, _msg_info: MessageInfo| {
296+
/// count += 1;
297+
/// let value = msg.get("data").unwrap();
298+
/// let Value::Simple(value) = value else {
299+
/// panic!("Unexpected value type, expected Simple value");
300+
/// };
301+
/// let SimpleValue::String(value) = value else {
302+
/// panic!("Unexpected value type, expected String");
303+
/// };
304+
/// println!("#{count} | I heard: '{}'", value);
305+
///
306+
/// *data = Some(value.to_string());
307+
/// },
308+
/// )?;
309+
/// # Ok::<(), RclrsError>(())
310+
/// ```
272311
#[cfg(feature = "dyn_msg")]
273312
pub fn create_dynamic_subscription<'a, F>(
274313
&self,
@@ -535,6 +574,7 @@ mod tests {
535574
struct TestPayload {
536575
subscription_count: usize,
537576
service_count: usize,
577+
#[cfg(feature = "dyn_msg")]
538578
dynamic_subscription_count: usize,
539579
}
540580

@@ -550,6 +590,7 @@ mod tests {
550590
},
551591
);
552592

593+
#[cfg(feature = "dyn_msg")]
553594
let _count_dynamic_sub = worker.create_dynamic_subscription(
554595
"test_msgs/msg/Empty".try_into().unwrap(),
555596
"test_worker_topic",
@@ -569,8 +610,14 @@ mod tests {
569610
let promise = worker.listen_until(move |payload| {
570611
if payload.service_count > 0
571612
&& payload.subscription_count > 0
572-
&& payload.dynamic_subscription_count > 0
573613
{
614+
#[cfg(feature = "dyn_msg")]
615+
if payload.dynamic_subscription_count > 0 {
616+
Some(*payload)
617+
} else {
618+
None
619+
}
620+
#[cfg(not(feature = "dyn_msg"))]
574621
Some(*payload)
575622
} else {
576623
None

0 commit comments

Comments
 (0)