Skip to content

Commit 4fd4059

Browse files
WIP working through subscriptions
Signed-off-by: Luca Della Vedova <[email protected]>
1 parent 780c660 commit 4fd4059

File tree

5 files changed

+28
-9
lines changed

5 files changed

+28
-9
lines changed

rclrs/src/dynamic_message.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
88
use std::{
99
fmt::{self, Display},
10+
ops::Deref,
1011
path::PathBuf,
1112
sync::Arc,
1213
};

rclrs/src/dynamic_message/dynamic_publisher.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use super::{
99
use crate::error::{RclrsError, ToResult};
1010
use crate::qos::QoSProfile;
1111
use crate::rcl_bindings::*;
12-
use crate::Node;
12+
use crate::NodeHandle;
1313

1414
/// Struct for sending messages of type `T`.
1515
///
@@ -23,7 +23,7 @@ use crate::Node;
2323
/// [1]: crate::spin
2424
pub struct DynamicPublisher {
2525
rcl_publisher_mtx: Mutex<rcl_publisher_t>,
26-
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
26+
node_handle: Arc<NodeHandle>,
2727
metadata: DynamicMessageMetadata,
2828
// This is the regular type support library, not the introspection one.
2929
#[allow(dead_code)]
@@ -32,11 +32,12 @@ pub struct DynamicPublisher {
3232

3333
impl Drop for DynamicPublisher {
3434
fn drop(&mut self) {
35+
let mut rcl_node = self.node_handle.rcl_node.lock().unwrap();
3536
unsafe {
3637
// SAFETY: No preconditions for this function (besides the arguments being valid).
3738
rcl_publisher_fini(
3839
self.rcl_publisher_mtx.get_mut().unwrap(),
39-
&mut *self.rcl_node_mtx.lock().unwrap(),
40+
&mut *rcl_node,
4041
);
4142
}
4243
}
@@ -54,7 +55,7 @@ impl DynamicPublisher {
5455
///
5556
/// Node and namespace changes are always applied _before_ topic remapping.
5657
pub fn new(
57-
node: &Node,
58+
node_handle: &Arc<NodeHandle>,
5859
topic: &str,
5960
topic_type: &str,
6061
qos: QoSProfile,
@@ -83,7 +84,7 @@ impl DynamicPublisher {
8384
err,
8485
s: topic.into(),
8586
})?;
86-
let rcl_node = &mut *node.rcl_node_mtx.lock().unwrap();
87+
let rcl_node = node_handle.rcl_node.lock().unwrap();
8788

8889
// SAFETY: No preconditions for this function.
8990
let mut publisher_options = unsafe { rcl_publisher_get_default_options() };
@@ -96,7 +97,7 @@ impl DynamicPublisher {
9697
// TODO: type support?
9798
rcl_publisher_init(
9899
&mut rcl_publisher,
99-
rcl_node,
100+
&*rcl_node,
100101
type_support_ptr,
101102
topic_c_string.as_ptr(),
102103
&publisher_options,
@@ -106,7 +107,7 @@ impl DynamicPublisher {
106107

107108
Ok(Self {
108109
rcl_publisher_mtx: Mutex::new(rcl_publisher),
109-
rcl_node_mtx: Arc::clone(&node.rcl_node_mtx),
110+
node_handle: node_handle.clone(),
110111
metadata,
111112
type_support_library,
112113
})

rclrs/src/dynamic_message/dynamic_subscription.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ pub struct DynamicSubscription {
2121
///
2222
/// Holding onto this sender will keep the subscription task alive. Once
2323
/// this sender is dropped, the subscription task will end itself.
24-
// callback: Arc<Mutex<AnySubscriptionCallback<T, Scope::Payload>>>,
25-
pub callback: Mutex<Box<dyn FnMut(DynamicMessage) + 'static + Send>>,
24+
pub callback: Arc<Mutex<AnySubscriptionCallback<T, Scope::Payload>>>,
25+
// pub callback: Mutex<Box<dyn FnMut(DynamicMessage) + 'static + Send>>,
2626
/// Holding onto this keeps the waiter for this subscription alive in the
2727
/// wait set of the executor.
2828
#[allow(unused)]
@@ -103,6 +103,8 @@ impl DynamicSubscription {
103103
node_handle: Arc::clone(node_handle),
104104
});
105105

106+
let callback = Arc::new(Mutex::new(callback));
107+
106108
let (waitable, lifecycle) = Waitable::new(
107109
Box::new(SubscriptionExecutable {
108110
handle: Arc::clone(&handle),

rclrs/src/error.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::{
44
fmt::{self, Display},
55
};
66

7+
#[cfg(feature = "dyn_msg")]
78
use crate::dynamic_message::DynamicMessageError;
89
use crate::{rcl_bindings::*, DeclarationError};
910

@@ -33,6 +34,7 @@ pub enum RclrsError {
3334
},
3435
/// It was attempted to add a waitable to a wait set twice.
3536
AlreadyAddedToWaitSet,
37+
#[cfg(feature = "dyn_msg")]
3638
/// An error while creating dynamic message.
3739
DynamicMessageError {
3840
/// The error containing more detailed information.
@@ -100,6 +102,7 @@ impl Display for RclrsError {
100102
"Could not add entity to wait set because it was already added to a wait set"
101103
)
102104
}
105+
#[cfg(feature = "dyn_msg")]
103106
RclrsError::DynamicMessageError { .. } => {
104107
write!(f, "Could not create dynamic message")
105108
}
@@ -131,6 +134,7 @@ impl Display for RclrsError {
131134
}
132135
}
133136

137+
#[cfg(feature = "dyn_msg")]
134138
impl From<DynamicMessageError> for RclrsError {
135139
fn from(err: DynamicMessageError) -> Self {
136140
Self::DynamicMessageError { err }
@@ -167,6 +171,7 @@ impl Error for RclrsError {
167171
// TODO(@mxgrey): We should provide source information for these other types.
168172
// It should be easy to do this using the thiserror crate.
169173
RclrsError::AlreadyAddedToWaitSet => None,
174+
#[cfg(feature = "dyn_msg")]
170175
RclrsError::DynamicMessageError { err } => Some(err).map(|e| e as &dyn Error),
171176
RclrsError::NegativeDuration(_) => None,
172177
RclrsError::UnownedGuardCondition => None,

rclrs/src/subscription.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::{
1212
WorkScope, Worker, WorkerCommands, ENTITY_LIFECYCLE_MUTEX,
1313
};
1414

15+
#[cfg(feature = "dyn_msg")]
1516
use crate::dynamic_message::DynamicMessage;
1617

1718
mod any_subscription_callback;
@@ -279,6 +280,7 @@ impl<'a> DynamicSubscriptionOptions<'a> {
279280
}
280281
}
281282

283+
/*
282284
impl<'a, T: IntoPrimitiveOptions<'a>> From<T> for DynamicSubscriptionOptions<'a> {
283285
fn from(value: T) -> Self {
284286
let primitive = value.into_primitive_options();
@@ -287,6 +289,7 @@ impl<'a, T: IntoPrimitiveOptions<'a>> From<T> for DynamicSubscriptionOptions<'a>
287289
options
288290
}
289291
}
292+
*/
290293

291294
struct SubscriptionExecutable<T: Message, Payload> {
292295
handle: Arc<SubscriptionHandle>,
@@ -361,6 +364,13 @@ impl SubscriptionHandle {
361364
Ok((T::from_rmw_message(rmw_message), message_info))
362365
}
363366

367+
/*
368+
#[cfg(feature = "dyn_msg")]
369+
fn take_dynamic(&self) -> Result<(DynamicMessage, MessageInfo), RclrsError> {
370+
371+
}
372+
*/
373+
364374
/// This is a version of take() that returns a boxed message.
365375
///
366376
/// This can be more efficient for messages containing large arrays.

0 commit comments

Comments
 (0)