Skip to content

Commit 4af6efe

Browse files
committed
Migrated all primitives to new worker architecture
Signed-off-by: Michael X. Grey <[email protected]>
1 parent 5ae2a94 commit 4af6efe

20 files changed

+795
-537
lines changed

rclrs/src/client.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,8 @@ where
185185

186186
/// Creates a new client.
187187
pub(crate) fn create<'a>(
188-
node: &Node,
189188
options: impl Into<ClientOptions<'a>>,
189+
node: &Node,
190190
) -> Result<Arc<Self>, RclrsError>
191191
// This uses pub(crate) visibility to avoid instantiating this struct outside
192192
// [`Node::create_client`], see the struct's documentation for the rationale
@@ -231,12 +231,12 @@ where
231231
}
232232
}
233233

234+
let commands = node.commands().async_worker_commands();
234235
let handle = Arc::new(ClientHandle {
235236
rcl_client: Mutex::new(rcl_client),
236237
node: Arc::clone(&node),
237238
});
238239

239-
let commands = node.commands();
240240
let board = Arc::new(Mutex::new(ClientRequestBoard::new()));
241241

242242
let (waitable, lifecycle) = Waitable::new(
@@ -300,7 +300,7 @@ impl<T> RclPrimitive for ClientExecutable<T>
300300
where
301301
T: rosidl_runtime_rs::Service,
302302
{
303-
unsafe fn execute(&mut self, payload: &mut dyn Any) -> Result<(), RclrsError> {
303+
unsafe fn execute(&mut self, _: &mut dyn Any) -> Result<(), RclrsError> {
304304
self.board.lock().unwrap().execute(&self.handle)
305305
}
306306

rclrs/src/executor.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ impl Executor {
9191
E: 'static + ExecutorRuntime + Send,
9292
{
9393
let executor_channel = runtime.channel();
94-
let node_worker_commands = ExecutorCommands::impl_create_worker_commands(
94+
let async_worker_commands = ExecutorCommands::impl_create_worker_commands(
9595
&Context { handle: Arc::clone(&context) },
9696
&*executor_channel,
9797
Box::new(()),
@@ -103,7 +103,7 @@ impl Executor {
103103
},
104104
executor_channel,
105105
halt_spinning: Arc::new(AtomicBool::new(false)),
106-
node_worker_commands,
106+
async_worker_commands,
107107
});
108108

109109
Self {
@@ -130,7 +130,7 @@ impl Executor {
130130
pub struct ExecutorCommands {
131131
context: Context,
132132
executor_channel: Arc<dyn ExecutorChannel>,
133-
node_worker_commands: Arc<WorkerCommands>,
133+
async_worker_commands: Arc<WorkerCommands>,
134134
halt_spinning: Arc<AtomicBool>,
135135
}
136136

@@ -169,7 +169,7 @@ impl ExecutorCommands {
169169
F::Output: Send,
170170
{
171171
let (mut sender, receiver) = oneshot::channel();
172-
self.node_worker_commands.channel.add_async_task(Box::pin(async move {
172+
self.async_worker_commands.channel.add_async_task(Box::pin(async move {
173173
let cancellation = sender.cancellation();
174174
let output = match select(cancellation, std::pin::pin!(f)).await {
175175
// The task was cancelled
@@ -204,7 +204,7 @@ impl ExecutorCommands {
204204
F::Output: Send,
205205
{
206206
let (sender, receiver) = oneshot::channel();
207-
self.node_worker_commands.channel.add_async_task(Box::pin(async move {
207+
self.async_worker_commands.channel.add_async_task(Box::pin(async move {
208208
sender.send(f.await).ok();
209209
}));
210210
receiver
@@ -216,23 +216,23 @@ impl ExecutorCommands {
216216
}
217217

218218
pub(crate) fn add_to_wait_set(&self, waitable: Waitable) {
219-
self.node_worker_commands.channel.add_to_waitset(waitable);
219+
self.async_worker_commands.channel.add_to_waitset(waitable);
220220
}
221221

222222
#[cfg(test)]
223223
pub(crate) fn wake_all_wait_sets(&self) {
224224
self.executor_channel.wake_all_wait_sets();
225225
}
226226

227-
pub(crate) fn node_worker_commands(&self) -> &Arc<WorkerCommands> {
228-
&self.node_worker_commands
227+
pub(crate) fn async_worker_commands(&self) -> &Arc<WorkerCommands> {
228+
&self.async_worker_commands
229229
}
230230

231231
pub(crate) fn create_worker_commands(&self, payload: Box<dyn Any + Send>) -> Arc<WorkerCommands> {
232232
Self::impl_create_worker_commands(&self.context, &*self.executor_channel, payload)
233233
}
234234

235-
/// We separate out this impl function so that we can create the node worker
235+
/// We separate out this impl function so that we can create the async worker
236236
/// before the [`ExecutorCommands`] is finished being constructed.
237237
fn impl_create_worker_commands(
238238
context: &Context,

rclrs/src/node.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ use rosidl_runtime_rs::Message;
3131
use crate::{
3232
rcl_bindings::*, Client, ClientOptions, ClientState, Clock, ContextHandle, ExecutorCommands,
3333
LogParams, Logger, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Promise,
34-
Publisher, PublisherOptions, PublisherState, RclrsError, Service, ServiceAsyncCallback,
35-
ServiceCallback, ServiceOptions, ServiceState, Subscription, IntoAsyncSubscriptionCallback,
34+
Publisher, PublisherOptions, PublisherState, RclrsError, Service, IntoAsyncServiceCallback,
35+
IntoNodeServiceCallback, ServiceOptions, ServiceState, Subscription, IntoAsyncSubscriptionCallback,
3636
IntoNodeSubscriptionCallback, SubscriptionOptions, SubscriptionState, TimeSource, ToLogParams,
3737
ENTITY_LIFECYCLE_MUTEX,
3838
};
@@ -278,7 +278,7 @@ impl NodeState {
278278
where
279279
T: rosidl_runtime_rs::Service,
280280
{
281-
ClientState::<T>::create(self, options)
281+
ClientState::<T>::create(options, self)
282282
}
283283

284284
/// Creates a [`Publisher`][1].
@@ -322,7 +322,7 @@ impl NodeState {
322322
where
323323
T: Message,
324324
{
325-
PublisherState::<T>::create(Arc::clone(&self.handle), options)
325+
PublisherState::<T>::create(options, Arc::clone(&self.handle))
326326
}
327327

328328
/// Creates a [`Service`] with an ordinary callback.
@@ -385,16 +385,16 @@ impl NodeState {
385385
pub fn create_service<'a, T, Args>(
386386
&self,
387387
options: impl Into<ServiceOptions<'a>>,
388-
callback: impl ServiceCallback<T, Args>,
388+
callback: impl IntoNodeServiceCallback<T, Args>,
389389
) -> Result<Service<T>, RclrsError>
390390
where
391391
T: rosidl_runtime_rs::Service,
392392
{
393-
ServiceState::<T>::create(
393+
ServiceState::<T, Node>::create(
394394
options,
395-
callback.into_service_callback(),
395+
callback.into_node_service_callback(),
396396
&self.handle,
397-
&self.commands,
397+
self.commands.async_worker_commands(),
398398
)
399399
}
400400

@@ -420,16 +420,16 @@ impl NodeState {
420420
pub fn create_async_service<'a, T, Args>(
421421
&self,
422422
options: impl Into<ServiceOptions<'a>>,
423-
callback: impl ServiceAsyncCallback<T, Args>,
423+
callback: impl IntoAsyncServiceCallback<T, Args>,
424424
) -> Result<Service<T>, RclrsError>
425425
where
426426
T: rosidl_runtime_rs::Service,
427427
{
428-
ServiceState::<T>::create(
428+
ServiceState::<T, Node>::create(
429429
options,
430-
callback.into_service_async_callback(),
430+
callback.into_async_service_callback(),
431431
&self.handle,
432-
&self.commands,
432+
self.commands.async_worker_commands(),
433433
)
434434
}
435435

@@ -501,7 +501,7 @@ impl NodeState {
501501
options,
502502
callback.into_node_subscription_callback(),
503503
&self.handle,
504-
self.commands.node_worker_commands(),
504+
self.commands.async_worker_commands(),
505505
)
506506
}
507507

@@ -536,7 +536,7 @@ impl NodeState {
536536
options,
537537
callback.into_async_subscription_callback(),
538538
&self.handle,
539-
self.commands.node_worker_commands(),
539+
self.commands.async_worker_commands(),
540540
)
541541
}
542542

rclrs/src/publisher.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ where
9696
///
9797
/// Node and namespace changes are always applied _before_ topic remapping.
9898
pub(crate) fn create<'a>(
99-
node_handle: Arc<NodeHandle>,
10099
options: impl Into<PublisherOptions<'a>>,
100+
node_handle: Arc<NodeHandle>,
101101
) -> Result<Arc<Self>, RclrsError>
102102
where
103103
T: Message,

0 commit comments

Comments
 (0)