Skip to content

Commit 60a542a

Browse files
Add async subscription API
Signed-off-by: Luca Della Vedova <[email protected]>
1 parent 2ccec54 commit 60a542a

File tree

3 files changed

+127
-11
lines changed

3 files changed

+127
-11
lines changed

rclrs/src/dynamic_message/dynamic_subscription.rs

Lines changed: 104 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,25 @@ struct DynamicSubscriptionExecutable<Payload> {
3131
// TODO(luca) consider making these enums if we want different callback types
3232
// TODO(luca) make fields private
3333
pub struct NodeDynamicSubscriptionCallback(
34-
pub Box<dyn FnMut(DynamicMessage, MessageInfo) -> BoxFuture<'static, ()> + Send>,
34+
pub Box<dyn Fn(DynamicMessage, MessageInfo) + Send + Sync>,
35+
);
36+
pub struct NodeAsyncDynamicSubscriptionCallback(
37+
pub Box<dyn FnMut(DynamicMessage, MessageInfo) -> BoxFuture<'static, ()> + Send + Sync>,
3538
);
3639
pub struct WorkerDynamicSubscriptionCallback<Payload>(
37-
pub Box<dyn FnMut(&mut Payload, DynamicMessage, MessageInfo) + Send>,
40+
pub Box<dyn FnMut(&mut Payload, DynamicMessage, MessageInfo) + Send + Sync>,
3841
);
3942

4043
impl Deref for NodeDynamicSubscriptionCallback {
41-
type Target = Box<dyn FnMut(DynamicMessage, MessageInfo) -> BoxFuture<'static, ()> + Send>;
44+
type Target = Box<dyn Fn(DynamicMessage, MessageInfo) + 'static + Send + Sync>;
45+
fn deref(&self) -> &Self::Target {
46+
&self.0
47+
}
48+
}
49+
50+
impl Deref for NodeAsyncDynamicSubscriptionCallback {
51+
type Target =
52+
Box<dyn FnMut(DynamicMessage, MessageInfo) -> BoxFuture<'static, ()> + Send + Sync>;
4253
fn deref(&self) -> &Self::Target {
4354
&self.0
4455
}
@@ -50,8 +61,14 @@ impl DerefMut for NodeDynamicSubscriptionCallback {
5061
}
5162
}
5263

64+
impl DerefMut for NodeAsyncDynamicSubscriptionCallback {
65+
fn deref_mut(&mut self) -> &mut Self::Target {
66+
&mut self.0
67+
}
68+
}
69+
5370
impl<Payload> Deref for WorkerDynamicSubscriptionCallback<Payload> {
54-
type Target = Box<dyn FnMut(&mut Payload, DynamicMessage, MessageInfo) + Send>;
71+
type Target = Box<dyn FnMut(&mut Payload, DynamicMessage, MessageInfo) + Send + Sync>;
5572
fn deref(&self) -> &Self::Target {
5673
&self.0
5774
}
@@ -63,18 +80,29 @@ impl<Payload> DerefMut for WorkerDynamicSubscriptionCallback<Payload> {
6380
}
6481
}
6582

66-
//pub trait NodeDynamicSubscriptionCallback = FnMut(DynamicMessage, MessageInfo) -> BoxFuture<'static, ()> + Send;
67-
//pub trait WorkerDynamicSubscriptionCallback<Payload> = FnMut(&mut Payload, DynamicMessage, MessageInfo) + Send;
68-
6983
pub enum DynamicSubscriptionCallback<Payload> {
7084
/// A callback with the message and the message info as arguments.
71-
Node(NodeDynamicSubscriptionCallback),
85+
Node(NodeAsyncDynamicSubscriptionCallback),
7286
/// A callback with the payload, message, and the message info as arguments.
7387
Worker(WorkerDynamicSubscriptionCallback<Payload>),
7488
}
7589

7690
impl From<NodeDynamicSubscriptionCallback> for DynamicSubscriptionCallback<()> {
7791
fn from(value: NodeDynamicSubscriptionCallback) -> Self {
92+
let func = Arc::new(value);
93+
DynamicSubscriptionCallback::Node(NodeAsyncDynamicSubscriptionCallback(Box::new(
94+
move |message, info| {
95+
let f = Arc::clone(&func);
96+
Box::pin(async move {
97+
f(message, info);
98+
})
99+
},
100+
)))
101+
}
102+
}
103+
104+
impl From<NodeAsyncDynamicSubscriptionCallback> for DynamicSubscriptionCallback<()> {
105+
fn from(value: NodeAsyncDynamicSubscriptionCallback) -> Self {
78106
DynamicSubscriptionCallback::Node(value)
79107
}
80108
}
@@ -336,6 +364,8 @@ where
336364
#[cfg(test)]
337365
mod tests {
338366
use super::*;
367+
use crate::test_helpers::*;
368+
use test_msgs::msg;
339369

340370
fn assert_send<T: Send>() {}
341371
fn assert_sync<T: Sync>() {}
@@ -345,4 +375,70 @@ mod tests {
345375
assert_send::<DynamicSubscription>();
346376
assert_sync::<DynamicSubscription>();
347377
}
378+
379+
#[test]
380+
fn test_dynamic_subscriptions() -> Result<(), RclrsError> {
381+
use crate::TopicEndpointInfo;
382+
383+
let namespace = "/test_dynamic_subscriptions_graph";
384+
let graph = construct_test_graph(namespace)?;
385+
386+
let node_2_empty_subscription = graph
387+
.node2
388+
.create_dynamic_subscription::<_>("test_msgs/msg/Empty".try_into().unwrap(), "graph_test_topic_1", |_, _| {})?;
389+
let topic1 = node_2_empty_subscription.topic_name();
390+
/*
391+
let node_2_basic_types_subscription =
392+
graph.node2.create_subscription::<msg::BasicTypes, _>(
393+
"graph_test_topic_2",
394+
|_msg: msg::BasicTypes| {},
395+
)?;
396+
let topic2 = node_2_basic_types_subscription.topic_name();
397+
let node_1_defaults_subscription = graph.node1.create_subscription::<msg::Defaults, _>(
398+
"graph_test_topic_3",
399+
|_msg: msg::Defaults| {},
400+
)?;
401+
let topic3 = node_1_defaults_subscription.topic_name();
402+
403+
std::thread::sleep(std::time::Duration::from_millis(100));
404+
405+
// Test count_subscriptions()
406+
assert_eq!(graph.node2.count_subscriptions(&topic1)?, 1);
407+
assert_eq!(graph.node2.count_subscriptions(&topic2)?, 1);
408+
409+
// Test get_subscription_names_and_types_by_node()
410+
let node_1_subscription_names_and_types = graph
411+
.node1
412+
.get_subscription_names_and_types_by_node(&graph.node1.name(), namespace)?;
413+
414+
let types = node_1_subscription_names_and_types.get(&topic3).unwrap();
415+
assert!(types.contains(&"test_msgs/msg/Defaults".to_string()));
416+
417+
let node_2_subscription_names_and_types = graph
418+
.node2
419+
.get_subscription_names_and_types_by_node(&graph.node2.name(), namespace)?;
420+
421+
let types = node_2_subscription_names_and_types.get(&topic1).unwrap();
422+
assert!(types.contains(&"test_msgs/msg/Empty".to_string()));
423+
424+
let types = node_2_subscription_names_and_types.get(&topic2).unwrap();
425+
assert!(types.contains(&"test_msgs/msg/BasicTypes".to_string()));
426+
427+
// Test get_subscriptions_info_by_topic()
428+
let expected_subscriptions_info = vec![TopicEndpointInfo {
429+
node_name: String::from("graph_test_node_2"),
430+
node_namespace: String::from(namespace),
431+
topic_type: String::from("test_msgs/msg/Empty"),
432+
}];
433+
assert_eq!(
434+
graph.node1.get_subscriptions_info_by_topic(&topic1)?,
435+
expected_subscriptions_info
436+
);
437+
assert_eq!(
438+
graph.node2.get_subscriptions_info_by_topic(&topic1)?,
439+
expected_subscriptions_info
440+
);
441+
*/
442+
Ok(())
443+
}
348444
}

rclrs/src/node.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ mod graph;
88
#[cfg(feature = "dyn_msg")]
99
use crate::dynamic_message::{
1010
DynamicMessage, DynamicPublisher, DynamicPublisherState, DynamicSubscription,
11-
DynamicSubscriptionState, MessageTypeName, NodeDynamicSubscriptionCallback,
11+
DynamicSubscriptionState, MessageTypeName, NodeAsyncDynamicSubscriptionCallback,
12+
NodeDynamicSubscriptionCallback,
1213
};
1314

1415
pub use graph::*;
@@ -810,7 +811,7 @@ impl NodeState {
810811
callback: F,
811812
) -> Result<DynamicSubscription, RclrsError>
812813
where
813-
F: FnMut(DynamicMessage, MessageInfo) -> BoxFuture<'static, ()> + Send + 'static,
814+
F: Fn(DynamicMessage, MessageInfo) + Send + Sync + 'static,
814815
{
815816
DynamicSubscriptionState::<Node>::create(
816817
topic_type,
@@ -821,6 +822,25 @@ impl NodeState {
821822
)
822823
}
823824

825+
#[cfg(feature = "dyn_msg")]
826+
pub fn create_async_dynamic_subscription<'a, F>(
827+
&self,
828+
topic_type: MessageTypeName,
829+
options: impl Into<SubscriptionOptions<'a>>,
830+
callback: F,
831+
) -> Result<DynamicSubscription, RclrsError>
832+
where
833+
F: FnMut(DynamicMessage, MessageInfo) -> BoxFuture<'static, ()> + Send + Sync + 'static,
834+
{
835+
DynamicSubscriptionState::<Node>::create(
836+
topic_type,
837+
options,
838+
NodeAsyncDynamicSubscriptionCallback(Box::new(callback)),
839+
&self.handle,
840+
self.commands.async_worker_commands(),
841+
)
842+
}
843+
824844
/// Creates a [`Subscription`] with an async callback.
825845
///
826846
/// # Behavior

rclrs/src/worker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ impl<Payload: 'static + Send + Sync> WorkerState<Payload> {
277277
callback: F,
278278
) -> Result<WorkerDynamicSubscription<Payload>, RclrsError>
279279
where
280-
F: FnMut(&mut Payload, DynamicMessage, MessageInfo) + Send + 'static,
280+
F: FnMut(&mut Payload, DynamicMessage, MessageInfo) + Send + Sync + 'static,
281281
{
282282
DynamicSubscriptionState::<Worker<Payload>>::create(
283283
topic_type,

0 commit comments

Comments
 (0)