Skip to content

Commit 98eef19

Browse files
committed
Add action client/server entities to wait set
They still aren't handled in the .wait() method, though.
1 parent f197f24 commit 98eef19

File tree

4 files changed

+233
-29
lines changed

4 files changed

+233
-29
lines changed

rclrs/src/action/client.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use crate::{error::ToResult, rcl_bindings::*, Node, RclrsError, ENTITY_LIFECYCLE_MUTEX};
1+
use crate::{
2+
error::ToResult, rcl_bindings::*, wait::WaitableNumEntities, Node, RclrsError,
3+
ENTITY_LIFECYCLE_MUTEX,
4+
};
25
use std::{
36
ffi::CString,
47
marker::PhantomData,
@@ -45,16 +48,18 @@ impl Drop for ActionClientHandle {
4548
pub trait ActionClientBase: Send + Sync {
4649
/// Internal function to get a reference to the `rcl` handle.
4750
fn handle(&self) -> &ActionClientHandle;
51+
fn num_entities(&self) -> &WaitableNumEntities;
4852
// /// Tries to take a new request and run the callback with it.
4953
// fn execute(&self) -> Result<(), RclrsError>;
5054
}
5155

52-
pub struct ActionClient<T>
56+
pub struct ActionClient<ActionT>
5357
where
54-
T: rosidl_runtime_rs::Action,
58+
ActionT: rosidl_runtime_rs::Action,
5559
{
56-
_marker: PhantomData<fn() -> T>,
60+
_marker: PhantomData<fn() -> ActionT>,
5761
pub(crate) handle: Arc<ActionClientHandle>,
62+
num_entities: WaitableNumEntities,
5863
}
5964

6065
impl<T> ActionClient<T>
@@ -106,9 +111,23 @@ where
106111
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
107112
});
108113

114+
let mut num_entities = WaitableNumEntities::default();
115+
unsafe {
116+
rcl_action_client_wait_set_get_num_entities(
117+
&*handle.lock(),
118+
&mut num_entities.num_subscriptions,
119+
&mut num_entities.num_guard_conditions,
120+
&mut num_entities.num_timers,
121+
&mut num_entities.num_clients,
122+
&mut num_entities.num_services,
123+
)
124+
.ok()?;
125+
}
126+
109127
Ok(Self {
110128
_marker: Default::default(),
111129
handle,
130+
num_entities,
112131
})
113132
}
114133
}
@@ -120,4 +139,8 @@ where
120139
fn handle(&self) -> &ActionClientHandle {
121140
&self.handle
122141
}
142+
143+
fn num_entities(&self) -> &WaitableNumEntities {
144+
&self.num_entities
145+
}
123146
}

rclrs/src/action/server.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
use crate::{action::{GoalResponse, GoalUuid, CancelResponse, ServerGoalHandle}, error::ToResult, rcl_bindings::*, Clock, Node, RclrsError, ENTITY_LIFECYCLE_MUTEX};
1+
use crate::{
2+
action::{CancelResponse, GoalResponse, GoalUuid, ServerGoalHandle},
3+
error::ToResult,
4+
rcl_bindings::*,
5+
wait::WaitableNumEntities,
6+
Clock, Node, RclrsError, ENTITY_LIFECYCLE_MUTEX,
7+
};
28
use std::{
39
ffi::CString,
410
sync::{atomic::AtomicBool, Arc, Mutex, MutexGuard},
@@ -44,6 +50,7 @@ impl Drop for ActionServerHandle {
4450
pub trait ActionServerBase: Send + Sync {
4551
/// Internal function to get a reference to the `rcl` handle.
4652
fn handle(&self) -> &ActionServerHandle;
53+
fn num_entities(&self) -> &WaitableNumEntities;
4754
// /// Tries to take a new request and run the callback with it.
4855
// fn execute(&self) -> Result<(), RclrsError>;
4956
}
@@ -57,6 +64,7 @@ where
5764
ActionT: rosidl_runtime_rs::Action,
5865
{
5966
pub(crate) handle: Arc<ActionServerHandle>,
67+
num_entities: WaitableNumEntities,
6068
goal_callback: Box<GoalCallback<ActionT>>,
6169
cancel_callback: Box<CancelCallback<ActionT>>,
6270
accepted_callback: Box<AcceptedCallback<ActionT>>,
@@ -121,8 +129,22 @@ where
121129
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
122130
});
123131

132+
let mut num_entities = WaitableNumEntities::default();
133+
unsafe {
134+
rcl_action_server_wait_set_get_num_entities(
135+
&*handle.lock(),
136+
&mut num_entities.num_subscriptions,
137+
&mut num_entities.num_guard_conditions,
138+
&mut num_entities.num_timers,
139+
&mut num_entities.num_clients,
140+
&mut num_entities.num_services,
141+
)
142+
.ok()?;
143+
}
144+
124145
Ok(Self {
125146
handle,
147+
num_entities,
126148
goal_callback: Box::new(goal_callback),
127149
cancel_callback: Box::new(cancel_callback),
128150
accepted_callback: Box::new(accepted_callback),
@@ -137,4 +159,8 @@ where
137159
fn handle(&self) -> &ActionServerHandle {
138160
&self.handle
139161
}
162+
163+
fn num_entities(&self) -> &WaitableNumEntities {
164+
&self.num_entities
165+
}
140166
}

rclrs/src/node.rs

Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@ use std::{
1919
use rosidl_runtime_rs::Message;
2020

2121
use crate::{
22-
rcl_bindings::*, ActionClient, ActionClientBase, ActionServer, ActionServerBase, CancelResponse, Client, ClientBase, ClientOptions,
23-
ClientState, Clock, ContextHandle, GoalResponse, GoalUuid, GuardCondition, LogParams, Logger,
24-
ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Publisher,
25-
PublisherOptions, PublisherState, RclrsError, ServerGoalHandle, Service, ServiceBase,
26-
ServiceOptions, ServiceState, Subscription, SubscriptionBase, SubscriptionCallback,
27-
SubscriptionOptions, SubscriptionState, TimeSource, ToLogParams, ENTITY_LIFECYCLE_MUTEX,
22+
rcl_bindings::*, ActionClient, ActionClientBase, ActionServer, ActionServerBase,
23+
CancelResponse, Client, ClientBase, ClientOptions, ClientState, Clock, ContextHandle,
24+
GoalResponse, GoalUuid, GuardCondition, LogParams, Logger, ParameterBuilder,
25+
ParameterInterface, ParameterVariant, Parameters, Publisher, PublisherOptions, PublisherState,
26+
RclrsError, ServerGoalHandle, Service, ServiceBase, ServiceOptions, ServiceState, Subscription,
27+
SubscriptionBase, SubscriptionCallback, SubscriptionOptions, SubscriptionState, TimeSource,
28+
ToLogParams, ENTITY_LIFECYCLE_MUTEX,
2829
};
2930

3031
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
@@ -284,7 +285,10 @@ impl NodeState {
284285
T: rosidl_runtime_rs::Service,
285286
{
286287
let client = Arc::new(ClientState::<T>::new(self, options)?);
287-
self.clients_mtx.lock().unwrap().push(Arc::downgrade(&client) as Weak<dyn ClientBase>);
288+
self.clients_mtx
289+
.lock()
290+
.unwrap()
291+
.push(Arc::downgrade(&client) as Weak<dyn ClientBase>);
288292
Ok(client)
289293
}
290294

@@ -300,7 +304,9 @@ impl NodeState {
300304
T: rosidl_runtime_rs::Action,
301305
{
302306
let action_client = Arc::new(ActionClient::<T>::new(self, topic)?);
303-
self.action_clients_mtx.lock().unwrap()
307+
self.action_clients_mtx
308+
.lock()
309+
.unwrap()
304310
.push(Arc::downgrade(&action_client) as Weak<dyn ActionClientBase>);
305311
Ok(action_client)
306312
}
@@ -330,7 +336,9 @@ impl NodeState {
330336
handle_cancel,
331337
handle_accepted,
332338
)?);
333-
self.action_servers_mtx.lock().unwrap()
339+
self.action_servers_mtx
340+
.lock()
341+
.unwrap()
334342
.push(Arc::downgrade(&action_server) as Weak<dyn ActionServerBase>);
335343
Ok(action_server)
336344
}
@@ -348,7 +356,9 @@ impl NodeState {
348356
Arc::clone(&self.handle.context_handle),
349357
None,
350358
));
351-
self.guard_conditions_mtx.lock().unwrap()
359+
self.guard_conditions_mtx
360+
.lock()
361+
.unwrap()
352362
.push(Arc::downgrade(&guard_condition) as Weak<GuardCondition>);
353363
guard_condition
354364
}
@@ -369,7 +379,9 @@ impl NodeState {
369379
Arc::clone(&self.handle.context_handle),
370380
Some(Box::new(callback) as Box<dyn Fn() + Send + Sync>),
371381
));
372-
self.guard_conditions_mtx.lock().unwrap()
382+
self.guard_conditions_mtx
383+
.lock()
384+
.unwrap()
373385
.push(Arc::downgrade(&guard_condition) as Weak<GuardCondition>);
374386
guard_condition
375387
}
@@ -469,7 +481,9 @@ impl NodeState {
469481
F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send,
470482
{
471483
let service = Arc::new(ServiceState::<T>::new(self, options, callback)?);
472-
self.services_mtx.lock().unwrap()
484+
self.services_mtx
485+
.lock()
486+
.unwrap()
473487
.push(Arc::downgrade(&service) as Weak<dyn ServiceBase>);
474488
Ok(service)
475489
}
@@ -524,36 +538,63 @@ impl NodeState {
524538
T: Message,
525539
{
526540
let subscription = Arc::new(SubscriptionState::<T>::new(self, options, callback)?);
527-
self.subscriptions_mtx.lock()
541+
self.subscriptions_mtx
542+
.lock()
528543
.unwrap()
529544
.push(Arc::downgrade(&subscription) as Weak<dyn SubscriptionBase>);
530545
Ok(subscription)
531546
}
532547

533548
/// Returns the subscriptions that have not been dropped yet.
534549
pub(crate) fn live_subscriptions(&self) -> Vec<Arc<dyn SubscriptionBase>> {
535-
self.subscriptions_mtx.lock().unwrap()
550+
self.subscriptions_mtx
551+
.lock()
552+
.unwrap()
536553
.iter()
537554
.filter_map(Weak::upgrade)
538555
.collect()
539556
}
540557

541558
pub(crate) fn live_clients(&self) -> Vec<Arc<dyn ClientBase>> {
542-
self.clients_mtx.lock().unwrap()
559+
self.clients_mtx
560+
.lock()
561+
.unwrap()
543562
.iter()
544563
.filter_map(Weak::upgrade)
545564
.collect()
546565
}
547566

548567
pub(crate) fn live_guard_conditions(&self) -> Vec<Arc<GuardCondition>> {
549-
self.guard_conditions_mtx.lock().unwrap()
568+
self.guard_conditions_mtx
569+
.lock()
570+
.unwrap()
550571
.iter()
551572
.filter_map(Weak::upgrade)
552573
.collect()
553574
}
554575

555576
pub(crate) fn live_services(&self) -> Vec<Arc<dyn ServiceBase>> {
556-
self.services_mtx.lock().unwrap()
577+
self.services_mtx
578+
.lock()
579+
.unwrap()
580+
.iter()
581+
.filter_map(Weak::upgrade)
582+
.collect()
583+
}
584+
585+
pub(crate) fn live_action_clients(&self) -> Vec<Arc<dyn ActionClientBase>> {
586+
self.action_clients_mtx
587+
.lock()
588+
.unwrap()
589+
.iter()
590+
.filter_map(Weak::upgrade)
591+
.collect()
592+
}
593+
594+
pub(crate) fn live_action_servers(&self) -> Vec<Arc<dyn ActionServerBase>> {
595+
self.action_servers_mtx
596+
.lock()
597+
.unwrap()
557598
.iter()
558599
.filter_map(Weak::upgrade)
559600
.collect()

0 commit comments

Comments
 (0)