Skip to content

Commit e79f46f

Browse files
First integration with async workers
Signed-off-by: Luca Della Vedova <[email protected]>
1 parent 4fd4059 commit e79f46f

File tree

4 files changed

+159
-19
lines changed

4 files changed

+159
-19
lines changed

rclrs/src/dynamic_message/dynamic_subscription.rs

Lines changed: 150 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,188 @@
1+
use std::any::Any;
12
use std::boxed::Box;
23
use std::ffi::{CStr, CString};
4+
use std::ops::{Deref, DerefMut};
35
use std::sync::atomic::AtomicBool;
46
use std::sync::{Arc, Mutex};
57

8+
use futures::future::BoxFuture;
9+
610
use super::{
711
get_type_support_handle, get_type_support_library, DynamicMessage, DynamicMessageMetadata,
812
MessageStructure,
913
};
1014
use crate::rcl_bindings::*;
1115
use crate::{
12-
ENTITY_LIFECYCLE_MUTEX, Waitable,
16+
ENTITY_LIFECYCLE_MUTEX, Waitable, RclPrimitive, MessageInfo, RclPrimitiveKind, RclPrimitiveHandle, WorkScope,
1317
Node, QoSProfile, RclReturnCode, RclrsError, ToResult, NodeHandle, WorkerCommands, WaitableLifecycle, SubscriptionHandle,
1418
};
1519

20+
struct DynamicSubscriptionExecutable<Payload> {
21+
handle: Arc<SubscriptionHandle>,
22+
callback: Arc<Mutex<DynamicSubscriptionCallback<Payload>>>,
23+
commands: Arc<WorkerCommands>,
24+
metadata: Arc<DynamicMessageMetadata>,
25+
}
26+
27+
// TODO(luca) consider making these enums if we want different callback types
28+
// TODO(luca) make fields private
29+
pub struct NodeDynamicSubscriptionCallback(pub Box<dyn FnMut(DynamicMessage, MessageInfo) -> BoxFuture<'static, ()> + Send>);
30+
pub struct WorkerDynamicSubscriptionCallback<Payload>(pub Box<dyn FnMut(&mut Payload, DynamicMessage, MessageInfo) + Send>);
31+
32+
impl Deref for NodeDynamicSubscriptionCallback {
33+
type Target = Box<dyn FnMut(DynamicMessage, MessageInfo)-> BoxFuture<'static, ()> + Send>;
34+
fn deref(&self) -> &Self::Target {
35+
&self.0
36+
}
37+
}
38+
39+
impl DerefMut for NodeDynamicSubscriptionCallback {
40+
fn deref_mut(&mut self) -> &mut Self::Target {
41+
&mut self.0
42+
}
43+
}
44+
45+
impl<Payload> Deref for WorkerDynamicSubscriptionCallback<Payload> {
46+
type Target = Box<dyn FnMut(&mut Payload, DynamicMessage, MessageInfo) + Send>;
47+
fn deref(&self) -> &Self::Target {
48+
&self.0
49+
}
50+
}
51+
52+
impl<Payload> DerefMut for WorkerDynamicSubscriptionCallback<Payload> {
53+
fn deref_mut(&mut self) -> &mut Self::Target {
54+
&mut self.0
55+
}
56+
}
57+
58+
//pub trait NodeDynamicSubscriptionCallback = FnMut(DynamicMessage, MessageInfo) -> BoxFuture<'static, ()> + Send;
59+
//pub trait WorkerDynamicSubscriptionCallback<Payload> = FnMut(&mut Payload, DynamicMessage, MessageInfo) + Send;
60+
61+
pub enum DynamicSubscriptionCallback<Payload> {
62+
/// A callback with the message and the message info as arguments.
63+
Node(NodeDynamicSubscriptionCallback),
64+
/// A callback with the payload, message, and the message info as arguments.
65+
Worker(WorkerDynamicSubscriptionCallback<Payload>),
66+
}
67+
68+
impl From<NodeDynamicSubscriptionCallback> for DynamicSubscriptionCallback<()> {
69+
fn from(value: NodeDynamicSubscriptionCallback) -> Self {
70+
DynamicSubscriptionCallback::Node(value)
71+
}
72+
}
73+
74+
impl<Payload> From<WorkerDynamicSubscriptionCallback<Payload>> for DynamicSubscriptionCallback<Payload> {
75+
fn from(value: WorkerDynamicSubscriptionCallback<Payload>) -> Self {
76+
DynamicSubscriptionCallback::Worker(value)
77+
}
78+
}
79+
80+
impl<Payload: 'static> DynamicSubscriptionCallback<Payload> {
81+
pub(super) fn execute(
82+
&mut self,
83+
executable: &DynamicSubscriptionExecutable<Payload>,
84+
any_payload: &mut dyn Any,
85+
commands: &WorkerCommands,
86+
) -> Result<(), RclrsError> {
87+
let Some(payload) = any_payload.downcast_mut::<Payload>() else {
88+
return Err(RclrsError::InvalidPayload {
89+
expected: std::any::TypeId::of::<Payload>(),
90+
received: (*any_payload).type_id(),
91+
});
92+
};
93+
match self {
94+
Self::Node(cb) => {
95+
let (msg, msg_info) = executable.take()?;
96+
commands.run_async(cb(msg, msg_info));
97+
}
98+
Self::Worker(cb) => {
99+
let (msg, msg_info) = executable.take()?;
100+
cb(payload, msg, msg_info);
101+
}
102+
}
103+
Ok(())
104+
}
105+
}
106+
107+
impl<Payload> DynamicSubscriptionExecutable<Payload> {
108+
pub fn take(&self) -> Result<(DynamicMessage, MessageInfo), RclrsError> {
109+
let mut dynamic_message = self.metadata.create()?;
110+
let rmw_message = dynamic_message.storage.as_mut_ptr();
111+
let mut message_info = unsafe { rmw_get_zero_initialized_message_info() };
112+
let rcl_subscription = &mut *self.handle.lock();
113+
unsafe {
114+
// SAFETY: The first two pointers are valid/initialized, and do not need to be valid
115+
// beyond the function call.
116+
// The latter two pointers are explicitly allowed to be NULL.
117+
rcl_take(
118+
rcl_subscription,
119+
rmw_message as *mut _,
120+
&mut message_info,
121+
std::ptr::null_mut(),
122+
)
123+
.ok()?
124+
};
125+
Ok((dynamic_message, MessageInfo::from_rmw_message_info(&message_info)))
126+
}
127+
}
128+
129+
impl<Payload: 'static> RclPrimitive for DynamicSubscriptionExecutable<Payload>
130+
{
131+
unsafe fn execute(&mut self, payload: &mut dyn Any) -> Result<(), RclrsError> {
132+
self.callback
133+
.lock()
134+
.unwrap()
135+
.execute(&self, payload, &self.commands)
136+
}
137+
138+
fn kind(&self) -> RclPrimitiveKind {
139+
RclPrimitiveKind::Subscription
140+
}
141+
142+
fn handle(&self) -> RclPrimitiveHandle {
143+
RclPrimitiveHandle::Subscription(self.handle.lock())
144+
}
145+
}
146+
16147
/// Struct for receiving messages whose type is only known at runtime.
17-
pub struct DynamicSubscription {
148+
pub struct DynamicSubscription<Scope>
149+
where
150+
Scope: WorkScope,
151+
{
18152
/// This handle is used to access the data that rcl holds for this subscription.
19153
handle: Arc<SubscriptionHandle>,
20154
/// This allows us to replace the callback in the subscription task.
21155
///
22156
/// Holding onto this sender will keep the subscription task alive. Once
23157
/// this sender is dropped, the subscription task will end itself.
24-
pub callback: Arc<Mutex<AnySubscriptionCallback<T, Scope::Payload>>>,
25-
// pub callback: Mutex<Box<dyn FnMut(DynamicMessage) + 'static + Send>>,
158+
// pub callback: Arc<Mutex<AnySubscriptionCallback<T, Scope::Payload>>>,
159+
// pub callback: Arc<Mutex<Box<dyn FnMut(DynamicMessage) + 'static + Send>>>,
160+
callback: Arc<Mutex<DynamicSubscriptionCallback<Scope::Payload>>>,
26161
/// Holding onto this keeps the waiter for this subscription alive in the
27162
/// wait set of the executor.
28163
#[allow(unused)]
29164
lifecycle: WaitableLifecycle,
30-
metadata: DynamicMessageMetadata,
165+
metadata: Arc<DynamicMessageMetadata>,
31166
// This is the regular type support library, not the introspection one.
32167
#[allow(dead_code)]
33168
type_support_library: Arc<libloading::Library>,
34169
}
35170

36-
impl DynamicSubscription {
171+
impl<Scope> DynamicSubscription<Scope>
172+
where
173+
Scope: WorkScope
174+
{
37175
/// Creates a new dynamic subscription.
38176
///
39177
/// This is not a public function, by the same rationale as `Subscription::new()`.
40-
pub(crate) fn new<F>(
178+
pub(crate) fn new(
41179
topic: &str,
42180
topic_type: &str,
43181
qos: QoSProfile,
44-
callback: F,
182+
callback: impl Into<DynamicSubscriptionCallback<Scope::Payload>>,
45183
node_handle: &Arc<NodeHandle>,
46184
commands: &Arc<WorkerCommands>,
47185
) -> Result<Arc<Self>, RclrsError>
48-
where
49-
F: FnMut(DynamicMessage) + 'static + Send,
50186
{
51187
// TODO(luca) a lot of duplication with nomral, refactor
52188
// This loads the introspection type support library.
@@ -103,13 +239,15 @@ impl DynamicSubscription {
103239
node_handle: Arc::clone(node_handle),
104240
});
105241

106-
let callback = Arc::new(Mutex::new(callback));
242+
let callback = Arc::new(Mutex::new(callback.into()));
243+
let metadata = Arc::new(metadata);
107244

108245
let (waitable, lifecycle) = Waitable::new(
109-
Box::new(SubscriptionExecutable {
246+
Box::new(DynamicSubscriptionExecutable {
110247
handle: Arc::clone(&handle),
111248
callback: Arc::clone(&callback),
112249
commands: Arc::clone(commands),
250+
metadata: Arc::clone(&metadata),
113251
}),
114252
Some(Arc::clone(commands.get_guard_condition())),
115253
);

rclrs/src/dynamic_message/error.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::{error::Error, fmt};
22

33
/// An error related to creating a dynamic message based on the name of the message's type.
4+
// TODO(luca) we need PartialEq (and maybe Eq?) for testing
45
#[derive(Debug)]
56
pub enum DynamicMessageError {
67
/// The type support library was not found because no matching prefix was sourced.

rclrs/src/node.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::dynamic_message::{DynamicMessage, DynamicSubscription};
1818

1919
pub use graph::*;
2020

21+
use futures::future::BoxFuture;
2122
mod node_graph_task;
2223
use node_graph_task::*;
2324

@@ -43,8 +44,8 @@ use rosidl_runtime_rs::Message;
4344
use crate::{
4445
rcl_bindings::*, Client, ClientOptions, ClientState, Clock, ContextHandle, ExecutorCommands,
4546
IntoAsyncServiceCallback, IntoAsyncSubscriptionCallback, IntoNodeServiceCallback,
46-
IntoNodeSubscriptionCallback, LogParams, Logger, ParameterBuilder, ParameterInterface,
47-
ParameterVariant, Parameters, Promise, Publisher, PublisherOptions, PublisherState, RclrsError,
47+
IntoNodeSubscriptionCallback, LogParams, Logger, ParameterBuilder, ParameterInterface, dynamic_message::NodeDynamicSubscriptionCallback,
48+
ParameterVariant, Parameters, Promise, Publisher, PublisherOptions, PublisherState, RclrsError, MessageInfo,
4849
Service, ServiceOptions, ServiceState, Subscription, SubscriptionOptions, SubscriptionState,
4950
TimeSource, ToLogParams, Worker, WorkerOptions, WorkerState, ENTITY_LIFECYCLE_MUTEX,
5051
};
@@ -806,12 +807,12 @@ impl NodeState {
806807
topic_type: &str,
807808
qos: QoSProfile,
808809
callback: F,
809-
) -> Result<Arc<DynamicSubscription>, RclrsError>
810+
) -> Result<Arc<DynamicSubscription<Node>>, RclrsError>
810811
where
811-
F: FnMut(DynamicMessage) + 'static + Send,
812+
F: FnMut(DynamicMessage, MessageInfo) -> BoxFuture<'static, ()> + Send + 'static,
812813
{
813814
let subscription = DynamicSubscription::new(
814-
topic, topic_type, qos, callback, &self.handle, self.commands.async_worker_commands(),
815+
topic, topic_type, qos, NodeDynamicSubscriptionCallback(Box::new(callback)), &self.handle, self.commands.async_worker_commands(),
815816
)?;
816817
// TODO(luca) similar API to above?
817818
Ok(subscription)

rclrs/src/subscription.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,8 +327,8 @@ unsafe impl Send for rcl_subscription_t {}
327327
///
328328
/// [1]: <https://doc.rust-lang.org/reference/destructors.html>
329329
pub(crate) struct SubscriptionHandle {
330-
rcl_subscription: Mutex<rcl_subscription_t>,
331-
node_handle: Arc<NodeHandle>,
330+
pub(crate) rcl_subscription: Mutex<rcl_subscription_t>,
331+
pub(crate) node_handle: Arc<NodeHandle>,
332332
}
333333

334334
impl SubscriptionHandle {

0 commit comments

Comments
 (0)