Skip to content

Commit 8fed9dd

Browse files
committed
Reworking Commands API
Signed-off-by: Michael X. Grey <[email protected]>
1 parent 8f8dfe9 commit 8fed9dd

File tree

4 files changed

+50
-24
lines changed

4 files changed

+50
-24
lines changed

rclrs/src/executor.rs

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ impl Executor {
122122
/// while the executor is spinning.
123123
pub struct ExecutorCommands {
124124
context: Context,
125-
channel: Box<dyn ExecutorChannel>,
125+
channel: Arc<dyn ExecutorChannel>,
126126
halt_spinning: Arc<AtomicBool>,
127127
wakeup_wait_set: Arc<GuardCondition>,
128128
}
@@ -216,31 +216,60 @@ impl ExecutorCommands {
216216
pub(crate) fn get_guard_condition(&self) -> &Arc<GuardCondition> {
217217
&self.wakeup_wait_set
218218
}
219+
220+
pub(crate) fn as_worker_commands(&self) -> WorkerCommands {
221+
WorkerCommands::new(Arc::clone(&self.channel).as_worker_channel())
222+
}
223+
}
224+
225+
#[derive(Clone)]
226+
pub(crate) struct WorkerCommands {
227+
channel: Arc<dyn WorkerChannel>,
228+
}
229+
230+
impl WorkerCommands {
231+
pub(crate) fn new(channel: Arc<dyn WorkerChannel>) -> Self {
232+
Self { channel }
233+
}
234+
235+
pub(crate) fn add_to_wait_set(&self, waitable: Waitable) {
236+
self.channel.add_to_waitset(waitable);
237+
}
238+
239+
pub(crate) fn run<F>(&self, f: F)
240+
where
241+
F: 'static + Future<Output = ()> + Send,
242+
{
243+
self.channel.add_async_task(Box::pin(f));
244+
}
219245
}
220246

221247
pub trait WorkerChannel: Send + Sync {
248+
/// Add a new item for the executor to run.
249+
fn add_async_task(&self, f: BoxFuture<'static, ()>);
250+
222251
/// Add new entities to the waitset of the executor.
223252
fn add_to_waitset(&self, new_entity: Waitable);
253+
254+
/// Allows an ExecutorChannel to be cast to a WorkerChannel.
255+
fn as_worker_channel(self: Arc<Self>) -> Arc<dyn WorkerChannel + 'static>;
224256
}
225257

226258
/// This trait defines the interface for passing new items into an executor to
227259
/// run.
228260
pub trait ExecutorChannel: WorkerChannel {
229-
/// Add a new item for the executor to run.
230-
fn add_async_task(&self, f: BoxFuture<'static, ()>);
231-
232261
/// Create a new channel specific to a worker whose payload must be
233262
/// initialized with the given function.
234-
fn create_worker_channel(
263+
fn create_worker(
235264
&self,
236265
payload: Box<dyn Any + Send>,
237-
) -> Box<dyn ExecutorChannel>;
266+
) -> Arc<dyn WorkerChannel>;
238267
}
239268

240269
/// This trait defines the interface for having an executor run.
241270
pub trait ExecutorRuntime: Send {
242271
/// Get a channel that can add new items for the executor to run.
243-
fn default_channel(&self) -> Box<dyn ExecutorChannel>;
272+
fn default_channel(&self) -> Arc<dyn ExecutorChannel>;
244273

245274
/// Tell the runtime to spin while blocking any further execution until the
246275
/// spinning is complete.

rclrs/src/subscription.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77
use rosidl_runtime_rs::{Message, RmwMessage};
88

99
use crate::{
10-
error::ToResult, qos::QoSProfile, rcl_bindings::*, ExecutorCommands, IntoPrimitiveOptions,
10+
error::ToResult, qos::QoSProfile, rcl_bindings::*, WorkerCommands, IntoPrimitiveOptions,
1111
Node, NodeHandle, RclPrimitive, RclPrimitiveHandle, RclPrimitiveKind, RclrsError, Waitable,
1212
WaitableLifecycle, Worker, ENTITY_LIFECYCLE_MUTEX,
1313
};
@@ -121,7 +121,7 @@ where
121121
options: impl Into<SubscriptionOptions<'a>>,
122122
callback: AnySubscriptionCallback<T, Scope::Payload>,
123123
node_handle: &Arc<NodeHandle>,
124-
commands: &Arc<ExecutorCommands>,
124+
commands: WorkerCommands,
125125
) -> Result<Arc<Self>, RclrsError> {
126126
let SubscriptionOptions { topic, qos } = options.into();
127127
let callback = Arc::new(Mutex::new(callback));
@@ -169,7 +169,7 @@ where
169169
Box::new(SubscriptionExecutable {
170170
handle: Arc::clone(&handle),
171171
callback: Arc::clone(&callback),
172-
commands: Arc::clone(commands),
172+
commands: commands.clone(),
173173
}),
174174
Some(Arc::clone(commands.get_guard_condition())),
175175
);
@@ -218,7 +218,7 @@ impl<'a, T: IntoPrimitiveOptions<'a>> From<T> for SubscriptionOptions<'a> {
218218
struct SubscriptionExecutable<T: Message, Payload> {
219219
handle: Arc<SubscriptionHandle>,
220220
callback: Arc<Mutex<AnySubscriptionCallback<T, Payload>>>,
221-
commands: Arc<ExecutorCommands>,
221+
commands: WorkerCommands,
222222
}
223223

224224
impl<T, Payload: 'static> RclPrimitive for SubscriptionExecutable<T, Payload>

rclrs/src/subscription/any_subscription_callback.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use rosidl_runtime_rs::Message;
22

33
use crate::{
44
subscription::SubscriptionHandle,
5-
error::ToResult, rcl_bindings::*, ExecutorCommands, RclReturnCode, RclrsError,
5+
error::ToResult, rcl_bindings::*, WorkerCommands, RclReturnCode, RclrsError,
66
ReadOnlyLoanedMessage, MessageInfo, NodeSubscriptionCallback, WorkerSubscriptionCallback,
77
};
88

@@ -21,7 +21,7 @@ impl<T: Message, Payload: 'static> AnySubscriptionCallback<T, Payload> {
2121
&mut self,
2222
handle: &Arc<SubscriptionHandle>,
2323
payload: &mut dyn Any,
24-
commands: &Arc<ExecutorCommands>,
24+
commands: &WorkerCommands,
2525
) -> Result<(), RclrsError> {
2626
match self {
2727
Self::Node(node) => node.execute(handle, commands),

rclrs/src/subscription/node_subscription_callback.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
use rosidl_runtime_rs::Message;
22

33
use super::{MessageInfo, SubscriptionHandle};
4-
use crate::{
5-
ExecutorCommands, RclReturnCode, RclrsError,
6-
ReadOnlyLoanedMessage, RclrsErrorFilter,
7-
};
4+
use crate::{WorkerCommands, RclrsError, ReadOnlyLoanedMessage, RclrsErrorFilter};
85

96
use futures::future::BoxFuture;
107

@@ -40,33 +37,33 @@ impl<T: Message> NodeSubscriptionCallback<T> {
4037
pub(super) fn execute(
4138
&mut self,
4239
handle: &Arc<SubscriptionHandle>,
43-
commands: &Arc<ExecutorCommands>,
40+
commands: &WorkerCommands,
4441
) -> Result<(), RclrsError> {
4542
let mut evaluate = || {
4643
match self {
4744
NodeSubscriptionCallback::Regular(cb) => {
4845
let (msg, _) = handle.take::<T>()?;
49-
let _ = commands.run(cb(msg));
46+
commands.run(cb(msg));
5047
}
5148
NodeSubscriptionCallback::RegularWithMessageInfo(cb) => {
5249
let (msg, msg_info) = handle.take::<T>()?;
53-
let _ = commands.run(cb(msg, msg_info));
50+
commands.run(cb(msg, msg_info));
5451
}
5552
NodeSubscriptionCallback::Boxed(cb) => {
5653
let (msg, _) = handle.take_boxed::<T>()?;
57-
let _ = commands.run(cb(msg));
54+
commands.run(cb(msg));
5855
}
5956
NodeSubscriptionCallback::BoxedWithMessageInfo(cb) => {
6057
let (msg, msg_info) = handle.take_boxed::<T>()?;
61-
let _ = commands.run(cb(msg, msg_info));
58+
commands.run(cb(msg, msg_info));
6259
}
6360
NodeSubscriptionCallback::Loaned(cb) => {
6461
let (msg, _) = handle.take_loaned::<T>()?;
65-
let _ = commands.run(cb(msg));
62+
commands.run(cb(msg));
6663
}
6764
NodeSubscriptionCallback::LoanedWithMessageInfo(cb) => {
6865
let (msg, msg_info) = handle.take_loaned::<T>()?;
69-
let _ = commands.run(cb(msg, msg_info));
66+
commands.run(cb(msg, msg_info));
7067
}
7168
}
7269
Ok(())

0 commit comments

Comments
 (0)