Skip to content

Commit 8f8dfe9

Browse files
committed
Beginning implementation of Worker concept
Signed-off-by: Michael X. Grey <[email protected]>
1 parent 6d3f7e4 commit 8f8dfe9

19 files changed

+978
-494
lines changed

rclrs/src/client.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::{
2+
any::Any,
23
collections::HashMap,
34
ffi::CString,
45
sync::{Arc, Mutex, MutexGuard},
@@ -299,7 +300,7 @@ impl<T> RclPrimitive for ClientExecutable<T>
299300
where
300301
T: rosidl_runtime_rs::Service,
301302
{
302-
fn execute(&mut self) -> Result<(), RclrsError> {
303+
unsafe fn execute(&mut self, payload: &mut dyn Any) -> Result<(), RclrsError> {
303304
self.board.lock().unwrap().execute(&self.handle)
304305
}
305306

rclrs/src/error.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,14 @@ pub enum RclrsError {
3535
/// The guard condition that you tried to trigger is not owned by the
3636
/// [`GuardCondition`][crate::GuardCondition] instance.
3737
UnownedGuardCondition,
38+
/// The payload given to a primitive that belongs to a worker was the wrong
39+
/// type.
40+
InvalidPayload {
41+
/// The payload type expected by the primitive
42+
expected: std::any::TypeId,
43+
/// The payload type given by the worker
44+
received: std::any::TypeId,
45+
}
3846
}
3947

4048
impl Display for RclrsError {
@@ -57,6 +65,12 @@ impl Display for RclrsError {
5765
"Could not trigger guard condition because it is not owned by rclrs"
5866
)
5967
}
68+
RclrsError::InvalidPayload { expected, received } => {
69+
write!(
70+
f,
71+
"Received invalid payload: expected {expected:?}, received {received:?}",
72+
)
73+
}
6074
}
6175
}
6276
}
@@ -92,6 +106,7 @@ impl Error for RclrsError {
92106
// It should be easy to do this using the thiserror crate.
93107
RclrsError::AlreadyAddedToWaitSet => None,
94108
RclrsError::UnownedGuardCondition => None,
109+
RclrsError::InvalidPayload { .. } => None,
95110
}
96111
}
97112
}
@@ -369,6 +384,10 @@ impl ToResult for rcl_ret_t {
369384
pub trait RclrsErrorFilter {
370385
/// If the result was a timeout error, change it to `Ok(())`.
371386
fn timeout_ok(self) -> Result<(), RclrsError>;
387+
388+
/// If a subscription, service, or client take failed, change the result
389+
/// to be `Ok(())`.
390+
fn take_failed_ok(self) -> Result<(), RclrsError>;
372391
}
373392

374393
impl RclrsErrorFilter for Result<(), RclrsError> {
@@ -390,4 +409,18 @@ impl RclrsErrorFilter for Result<(), RclrsError> {
390409
}
391410
}
392411
}
412+
413+
fn take_failed_ok(self) -> Result<(), RclrsError> {
414+
match self {
415+
Err(RclrsError::RclError {
416+
code: RclReturnCode::SubscriptionTakeFailed | RclReturnCode::ServiceTakeFailed | RclReturnCode::ClientTakeFailed,
417+
..
418+
}) => {
419+
// Spurious wakeup - this may happen even when a waitset indicated that
420+
// work was ready, so we won't report it as an error
421+
Ok(())
422+
}
423+
other => other,
424+
}
425+
}
393426
}

rclrs/src/executor.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use futures::{
88
future::{select, BoxFuture, Either},
99
};
1010
use std::{
11+
any::Any,
1112
future::Future,
1213
sync::{
1314
atomic::{AtomicBool, Ordering},
@@ -90,7 +91,7 @@ impl Executor {
9091
context: Context {
9192
handle: Arc::clone(&context),
9293
},
93-
channel: runtime.channel(),
94+
channel: runtime.default_channel(),
9495
halt_spinning: Arc::new(AtomicBool::new(false)),
9596
wakeup_wait_set: Arc::new(wakeup_wait_set),
9697
});
@@ -217,20 +218,29 @@ impl ExecutorCommands {
217218
}
218219
}
219220

221+
pub trait WorkerChannel: Send + Sync {
222+
/// Add new entities to the waitset of the executor.
223+
fn add_to_waitset(&self, new_entity: Waitable);
224+
}
225+
220226
/// This trait defines the interface for passing new items into an executor to
221227
/// run.
222-
pub trait ExecutorChannel: Send + Sync {
228+
pub trait ExecutorChannel: WorkerChannel {
223229
/// Add a new item for the executor to run.
224230
fn add_async_task(&self, f: BoxFuture<'static, ()>);
225231

226-
/// Add new entities to the waitset of the executor.
227-
fn add_to_waitset(&self, new_entity: Waitable);
232+
/// Create a new channel specific to a worker whose payload must be
233+
/// initialized with the given function.
234+
fn create_worker_channel(
235+
&self,
236+
payload: Box<dyn Any + Send>,
237+
) -> Box<dyn ExecutorChannel>;
228238
}
229239

230240
/// This trait defines the interface for having an executor run.
231241
pub trait ExecutorRuntime: Send {
232242
/// Get a channel that can add new items for the executor to run.
233-
fn channel(&self) -> Box<dyn ExecutorChannel>;
243+
fn default_channel(&self) -> Box<dyn ExecutorChannel>;
234244

235245
/// Tell the runtime to spin while blocking any further execution until the
236246
/// spinning is complete.

rclrs/src/executor/basic_executor.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use futures::{
44
task::{waker_ref, ArcWake},
55
};
66
use std::{
7+
any::Any,
78
sync::{
89
atomic::{AtomicBool, Ordering},
910
mpsc::{channel, Receiver, Sender},
@@ -13,7 +14,7 @@ use std::{
1314
};
1415

1516
use crate::{
16-
executor::{ExecutorChannel, ExecutorRuntime, SpinConditions},
17+
executor::{ExecutorChannel, ExecutorRuntime, SpinConditions, WorkerChannel},
1718
Context, RclrsError, WaitSetRunner, Waitable,
1819
};
1920

@@ -131,7 +132,7 @@ impl ExecutorRuntime for BasicExecutorRuntime {
131132
})
132133
}
133134

134-
fn channel(&self) -> Box<dyn ExecutorChannel> {
135+
fn default_channel(&self) -> Box<dyn ExecutorChannel> {
135136
let waitable_sender = self.wait_set_runner.as_ref().expect(
136137
"The wait set runner of the basic executor is missing while creating a channel. \
137138
This is a critical bug in rclrs. \
@@ -195,14 +196,23 @@ struct BasicExecutorChannel {
195196
waitable_sender: UnboundedSender<Waitable>,
196197
}
197198

199+
impl WorkerChannel for BasicExecutorChannel {
200+
fn add_to_waitset(&self, new_entity: Waitable) {
201+
// TODO(@mxgrey): Log errors here once logging becomes available.
202+
self.waitable_sender.unbounded_send(new_entity).ok();
203+
}
204+
}
205+
198206
impl ExecutorChannel for BasicExecutorChannel {
199207
fn add_async_task(&self, f: BoxFuture<'static, ()>) {
200208
self.task_sender.add_async_task(f);
201209
}
202210

203-
fn add_to_waitset(&self, new_entity: Waitable) {
204-
// TODO(@mxgrey): Log errors here once logging becomes available.
205-
self.waitable_sender.unbounded_send(new_entity).ok();
211+
fn create_worker_channel(
212+
&self,
213+
payload: Box<dyn Any + Send>,
214+
) -> Box<dyn ExecutorChannel> {
215+
206216
}
207217
}
208218

rclrs/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ mod time;
2222
mod time_source;
2323
mod vendor;
2424
mod wait_set;
25+
mod worker;
2526

2627
#[cfg(test)]
2728
mod test_helpers;
@@ -48,3 +49,4 @@ pub use subscription::*;
4849
pub use time::*;
4950
use time_source::*;
5051
pub use wait_set::*;
52+
pub use worker::*;

rclrs/src/node.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ use crate::{
3232
rcl_bindings::*, Client, ClientOptions, ClientState, Clock, ContextHandle, ExecutorCommands,
3333
LogParams, Logger, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Promise,
3434
Publisher, PublisherOptions, PublisherState, RclrsError, Service, ServiceAsyncCallback,
35-
ServiceCallback, ServiceOptions, ServiceState, Subscription, SubscriptionAsyncCallback,
36-
SubscriptionCallback, SubscriptionOptions, SubscriptionState, TimeSource, ToLogParams,
35+
ServiceCallback, ServiceOptions, ServiceState, Subscription, IntoAsyncSubscriptionCallback,
36+
IntoNodeSubscriptionCallback, SubscriptionOptions, SubscriptionState, TimeSource, ToLogParams,
3737
ENTITY_LIFECYCLE_MUTEX,
3838
};
3939

@@ -492,14 +492,14 @@ impl NodeState {
492492
pub fn create_subscription<'a, T, Args>(
493493
&self,
494494
options: impl Into<SubscriptionOptions<'a>>,
495-
callback: impl SubscriptionCallback<T, Args>,
495+
callback: impl IntoNodeSubscriptionCallback<T, Args>,
496496
) -> Result<Subscription<T>, RclrsError>
497497
where
498498
T: Message,
499499
{
500500
SubscriptionState::<T>::create(
501501
options,
502-
callback.into_subscription_callback(),
502+
callback.into_node_subscription_callback(),
503503
&self.handle,
504504
&self.commands,
505505
)
@@ -527,14 +527,14 @@ impl NodeState {
527527
pub fn create_async_subscription<'a, T, Args>(
528528
&self,
529529
options: impl Into<SubscriptionOptions<'a>>,
530-
callback: impl SubscriptionAsyncCallback<T, Args>,
530+
callback: impl IntoAsyncSubscriptionCallback<T, Args>,
531531
) -> Result<Subscription<T>, RclrsError>
532532
where
533533
T: Message,
534534
{
535535
SubscriptionState::<T>::create(
536536
options,
537-
callback.into_subscription_async_callback(),
537+
callback.into_async_subscription_callback(),
538538
&self.handle,
539539
&self.commands,
540540
)

rclrs/src/service.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::{
22
boxed::Box,
33
ffi::{CStr, CString},
44
sync::{Arc, Mutex, MutexGuard},
5+
any::Any,
56
};
67

78
use crate::{
@@ -51,7 +52,7 @@ pub type Service<T> = Arc<ServiceState<T>>;
5152
/// The public API of the [`Service`] type is implemented via `ServiceState`.
5253
///
5354
/// [1]: std::sync::Weak
54-
pub struct ServiceState<T>
55+
pub struct ServiceState<T, Payload = ()>
5556
where
5657
T: rosidl_runtime_rs::Service,
5758
{
@@ -212,7 +213,7 @@ impl<T> RclPrimitive for ServiceExecutable<T>
212213
where
213214
T: rosidl_runtime_rs::Service,
214215
{
215-
fn execute(&mut self) -> Result<(), RclrsError> {
216+
unsafe fn execute(&mut self, payload: &mut dyn Any) -> Result<(), RclrsError> {
216217
if let Err(err) = self
217218
.callback
218219
.lock()

0 commit comments

Comments
 (0)