Skip to content

Commit c99f410

Browse files
committed
Migrate subscription to use shared callback instead of an async task
Signed-off-by: Michael X. Grey <[email protected]>
1 parent 8f0f192 commit c99f410

File tree

3 files changed

+55
-78
lines changed

3 files changed

+55
-78
lines changed

rclrs/src/parameter/service.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -514,8 +514,6 @@ mod tests {
514514
dbg!();
515515
std::io::stdout().lock().flush().unwrap();
516516

517-
return Ok(());
518-
519517
// Limit depth, namespaced parameter is not returned
520518
let callback_ran = Arc::new(AtomicBool::new(false));
521519
let callback_ran_inner = Arc::clone(&callback_ran);
@@ -793,6 +791,12 @@ mod tests {
793791
let request = SetParameters_Request {
794792
parameters: seq![undeclared_bool],
795793
};
794+
795+
// Clone test.node here so that we don't move the whole test bundle into
796+
// the closure, which would cause the test node to be fully dropped
797+
// after the closure is called.
798+
let test_node = Arc::clone(&test.node);
799+
796800
let promise = set_client
797801
.call_then(
798802
&request,
@@ -801,7 +805,7 @@ mod tests {
801805
// Setting the undeclared parameter is now allowed
802806
assert!(response.results[0].successful);
803807
assert_eq!(
804-
test.node.use_undeclared_parameters().get("undeclared_bool"),
808+
test_node.use_undeclared_parameters().get("undeclared_bool"),
805809
Some(ParameterValue::Bool(true))
806810
);
807811
callback_ran_inner.store(true, Ordering::Release);

rclrs/src/subscription.rs

Lines changed: 17 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,12 @@ use std::{
55

66
use rosidl_runtime_rs::{Message, RmwMessage};
77

8-
use futures::channel::mpsc::{unbounded, UnboundedSender, TrySendError};
9-
108
use crate::{
119
error::ToResult,
1210
qos::QoSProfile,
1311
rcl_bindings::*,
1412
ExecutorCommands, NodeHandle, RclrsError, Waitable, Executable, ExecutableHandle,
15-
ExecutableKind, GuardCondition, WaitableLifecycle, ENTITY_LIFECYCLE_MUTEX,
13+
ExecutableKind, WaitableLifecycle, ENTITY_LIFECYCLE_MUTEX,
1614
};
1715

1816
mod any_subscription_callback;
@@ -30,9 +28,6 @@ pub use message_info::*;
3028
mod readonly_loaned_message;
3129
pub use readonly_loaned_message::*;
3230

33-
mod subscription_task;
34-
use subscription_task::*;
35-
3631
/// Struct for receiving messages of type `T`.
3732
///
3833
/// The only way to instantiate a subscription is via [`Node::create_subscription()`][2]
@@ -59,7 +54,7 @@ where
5954
///
6055
/// Holding onto this sender will keep the subscription task alive. Once
6156
/// this sender is dropped, the subscription task will end itself.
62-
action: UnboundedSender<SubscriptionAction<T>>,
57+
callback: Arc<Mutex<AnySubscriptionCallback<T>>>,
6358
/// Holding onto this keeps the waiter for this subscription alive in the
6459
/// wait set of the executor.
6560
lifecycle: WaitableLifecycle,
@@ -91,9 +86,9 @@ where
9186
pub fn set_callback<Args>(
9287
&self,
9388
callback: impl SubscriptionCallback<T, Args>,
94-
) -> Result<(), TrySendError<SubscriptionAction<T>>> {
89+
) {
9590
let callback = callback.into_subscription_callback();
96-
self.action.unbounded_send(SubscriptionAction::SetCallback(callback))
91+
*self.callback.lock().unwrap() = callback;
9792
}
9893

9994
/// Set the callback of this subscription, replacing the callback that was
@@ -103,9 +98,9 @@ where
10398
pub fn set_async_callback<Args>(
10499
&self,
105100
callback: impl SubscriptionAsyncCallback<T, Args>,
106-
) -> Result<(), TrySendError<SubscriptionAction<T>>> {
101+
) {
107102
let callback = callback.into_subscription_async_callback();
108-
self.action.unbounded_send(SubscriptionAction::SetCallback(callback))
103+
*self.callback.lock().unwrap() = callback;
109104
}
110105

111106
/// Used by [`Node`][crate::Node] to create a new subscription.
@@ -116,40 +111,8 @@ where
116111
node_handle: &Arc<NodeHandle>,
117112
commands: &Arc<ExecutorCommands>,
118113
) -> Result<Arc<Self>, RclrsError> {
119-
let (sender, receiver) = unbounded();
120-
let (subscription, waiter) = Self::new(
121-
topic,
122-
qos,
123-
sender,
124-
Arc::clone(&node_handle),
125-
Arc::clone(commands.get_guard_condition()),
126-
)?;
114+
let callback = Arc::new(Mutex::new(callback));
127115

128-
commands.run(subscription_task(
129-
callback,
130-
receiver,
131-
Arc::clone(&subscription.handle),
132-
Arc::clone(&commands),
133-
));
134-
135-
commands.add_to_wait_set(waiter);
136-
137-
Ok(subscription)
138-
}
139-
140-
/// Instantiate the Subscription.
141-
fn new(
142-
topic: &str,
143-
qos: QoSProfile,
144-
action: UnboundedSender<SubscriptionAction<T>>,
145-
node_handle: Arc<NodeHandle>,
146-
guard_condition: Arc<GuardCondition>,
147-
) -> Result<(Arc<Self>, Waitable), RclrsError>
148-
// This uses pub(crate) visibility to avoid instantiating this struct outside
149-
// [`Node::create_subscription`], see the struct's documentation for the rationale
150-
where
151-
T: Message,
152-
{
153116
// SAFETY: Getting a zero-initialized value is always safe.
154117
let mut rcl_subscription = unsafe { rcl_get_zero_initialized_subscription() };
155118
let type_support =
@@ -186,35 +149,35 @@ where
186149

187150
let handle = Arc::new(SubscriptionHandle {
188151
rcl_subscription: Mutex::new(rcl_subscription),
189-
node_handle,
152+
node_handle: Arc::clone(node_handle),
190153
});
191154

192-
let (waiter, lifecycle) = Waitable::new(
155+
let (waitable, lifecycle) = Waitable::new(
193156
Box::new(SubscriptionExecutable {
194157
handle: Arc::clone(&handle),
195-
action: action.clone(),
158+
callback: Arc::clone(&callback),
159+
commands: Arc::clone(commands),
196160
}),
197-
Some(guard_condition),
161+
Some(Arc::clone(commands.get_guard_condition())),
198162
);
163+
commands.add_to_wait_set(waitable);
199164

200-
let subscription = Arc::new(Self { handle, action, lifecycle });
201-
202-
Ok((subscription, waiter))
165+
Ok(Arc::new(Self { handle, callback, lifecycle }))
203166
}
204167
}
205168

206169
struct SubscriptionExecutable<T: Message> {
207170
handle: Arc<SubscriptionHandle>,
208-
action: UnboundedSender<SubscriptionAction<T>>,
171+
callback: Arc<Mutex<AnySubscriptionCallback<T>>>,
172+
commands: Arc<ExecutorCommands>,
209173
}
210174

211175
impl<T> Executable for SubscriptionExecutable<T>
212176
where
213177
T: Message,
214178
{
215179
fn execute(&mut self) -> Result<(), RclrsError> {
216-
self.action.unbounded_send(SubscriptionAction::Execute).ok();
217-
Ok(())
180+
self.callback.lock().unwrap().execute(&self.handle, &self.commands)
218181
}
219182

220183
fn kind(&self) -> crate::ExecutableKind {

rclrs/src/wait.rs

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ pub use waitable::*;
3131

3232
/// A struct for waiting on subscriptions and other waitable entities to become ready.
3333
pub struct WaitSet {
34-
entities: HashMap<ExecutableKind, Vec<Waitable>>,
35-
pub(crate) handle: WaitSetHandle,
34+
primitives: HashMap<ExecutableKind, Vec<Waitable>>,
35+
handle: WaitSetHandle,
3636
}
3737

3838
// SAFETY: While the rcl_wait_set_t does have some interior mutability (because it has
@@ -53,8 +53,8 @@ impl WaitSet {
5353
context_handle: Arc::clone(&context.handle),
5454
};
5555

56-
let mut wait_set = Self { entities: HashMap::new(), handle };
57-
wait_set.register_rcl_entities()?;
56+
let mut wait_set = Self { primitives: HashMap::new(), handle };
57+
wait_set.register_rcl_primitives()?;
5858
Ok(wait_set)
5959
}
6060

@@ -68,10 +68,10 @@ impl WaitSet {
6868
return Err(RclrsError::AlreadyAddedToWaitSet);
6969
}
7070
let kind = entity.executable.kind();
71-
self.entities.entry(kind).or_default().push(entity);
71+
self.primitives.entry(kind).or_default().push(entity);
7272
}
7373
self.resize_rcl_containers()?;
74-
self.register_rcl_entities()?;
74+
self.register_rcl_primitives()?;
7575
Ok(())
7676
}
7777

@@ -80,13 +80,8 @@ impl WaitSet {
8080
/// This effectively resets the wait set to the state it was in after being created by
8181
/// [`WaitSet::new`].
8282
pub fn clear(&mut self) {
83-
self.entities.clear();
84-
// This cannot fail – the rcl_wait_set_clear function only checks that the input handle is
85-
// valid, which it always is in our case. Hence, only debug_assert instead of returning
86-
// Result.
87-
// SAFETY: No preconditions for this function (besides passing in a valid wait set).
88-
let ret = unsafe { rcl_wait_set_clear(&mut self.handle.rcl_wait_set) };
89-
debug_assert_eq!(ret, 0);
83+
self.primitives.clear();
84+
self.rcl_clear();
9085
}
9186

9287
/// Blocks until the wait set is ready, or until the timeout has been exceeded.
@@ -147,13 +142,13 @@ impl WaitSet {
147142
}
148143

149144
// Remove any waitables that are no longer being used
150-
for waiter in self.entities.values_mut() {
151-
waiter.retain(|w| w.in_use());
145+
for waitable in self.primitives.values_mut() {
146+
waitable.retain(|w| w.in_use());
152147
}
153148

154149
// For the remaining entities, check if they were activated and then run
155150
// the callback for those that were.
156-
for waiter in self.entities.values_mut().flat_map(|v| v) {
151+
for waiter in self.primitives.values_mut().flat_map(|v| v) {
157152
if waiter.is_ready(&self.handle.rcl_wait_set) {
158153
f(&mut *waiter.executable)?;
159154
}
@@ -168,16 +163,19 @@ impl WaitSet {
168163
// Note that self.clear() will not change the allocated size of each rcl
169164
// entity container, so we do not need to resize before re-registering
170165
// the rcl entities.
171-
self.clear();
172-
self.register_rcl_entities();
166+
self.rcl_clear();
167+
if let Err(err) = self.register_rcl_primitives() {
168+
// TODO(@mxgrey): Log this error when logging is available
169+
eprintln!("Error while registering rcl primitives: {err}");
170+
}
173171

174172
Ok(())
175173
}
176174

177175
/// Get a count of the different kinds of entities in the wait set.
178176
pub fn count(&self) -> WaitableCount {
179177
let mut c = WaitableCount::new();
180-
for (kind, collection) in &self.entities {
178+
for (kind, collection) in &self.primitives {
181179
c.add(*kind, collection.len());
182180
}
183181
c
@@ -191,6 +189,18 @@ impl WaitSet {
191189
Ok(())
192190
}
193191

192+
/// Clear only the rcl_wait_set. This is done so that we can safely repopulate
193+
/// it to perform another wait. This does not effect the entities that we
194+
/// consider to still be in the wait set.
195+
fn rcl_clear(&mut self) {
196+
// This cannot fail – the rcl_wait_set_clear function only checks that the input handle is
197+
// valid, which it always is in our case. Hence, only debug_assert instead of returning
198+
// Result.
199+
// SAFETY: No preconditions for this function (besides passing in a valid wait set).
200+
let ret = unsafe { rcl_wait_set_clear(&mut self.handle.rcl_wait_set) };
201+
debug_assert_eq!(ret, 0);
202+
}
203+
194204
/// Registers all the waitable entities with the rcl wait set.
195205
///
196206
/// # Errors
@@ -199,8 +209,8 @@ impl WaitSet {
199209
/// then there is a bug in rclrs.
200210
///
201211
/// [1]: crate::RclReturnCode
202-
fn register_rcl_entities(&mut self) -> Result<(), RclrsError> {
203-
for entity in self.entities.values_mut().flat_map(|c| c) {
212+
fn register_rcl_primitives(&mut self) -> Result<(), RclrsError> {
213+
for entity in self.primitives.values_mut().flat_map(|c| c) {
204214
entity.add_to_wait_set(&mut self.handle.rcl_wait_set)?;
205215
}
206216
Ok(())

0 commit comments

Comments
 (0)