Skip to content

Commit b845051

Browse files
committed
Add action client/server entities to wait set
They still aren't handled in the .wait() method, though.
1 parent ec5117d commit b845051

File tree

4 files changed

+229
-28
lines changed

4 files changed

+229
-28
lines changed

rclrs/src/action/client.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use crate::{error::ToResult, rcl_bindings::*, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX};
1+
use crate::{
2+
error::ToResult, rcl_bindings::*, wait::WaitableNumEntities, NodeHandle, RclrsError,
3+
ENTITY_LIFECYCLE_MUTEX,
4+
};
25
use std::{
36
ffi::CString,
47
marker::PhantomData,
@@ -45,16 +48,18 @@ impl Drop for ActionClientHandle {
4548
pub trait ActionClientBase: Send + Sync {
4649
/// Internal function to get a reference to the `rcl` handle.
4750
fn handle(&self) -> &ActionClientHandle;
51+
fn num_entities(&self) -> &WaitableNumEntities;
4852
// /// Tries to take a new request and run the callback with it.
4953
// fn execute(&self) -> Result<(), RclrsError>;
5054
}
5155

52-
pub struct ActionClient<T>
56+
pub struct ActionClient<ActionT>
5357
where
54-
T: rosidl_runtime_rs::Action,
58+
ActionT: rosidl_runtime_rs::Action,
5559
{
56-
_marker: PhantomData<fn() -> T>,
60+
_marker: PhantomData<fn() -> ActionT>,
5761
pub(crate) handle: Arc<ActionClientHandle>,
62+
num_entities: WaitableNumEntities,
5863
}
5964

6065
impl<T> ActionClient<T>
@@ -106,9 +111,23 @@ where
106111
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
107112
});
108113

114+
let mut num_entities = WaitableNumEntities::default();
115+
unsafe {
116+
rcl_action_client_wait_set_get_num_entities(
117+
&*handle.lock(),
118+
&mut num_entities.num_subscriptions,
119+
&mut num_entities.num_guard_conditions,
120+
&mut num_entities.num_timers,
121+
&mut num_entities.num_clients,
122+
&mut num_entities.num_services,
123+
)
124+
.ok()?;
125+
}
126+
109127
Ok(Self {
110128
_marker: Default::default(),
111129
handle,
130+
num_entities,
112131
})
113132
}
114133
}
@@ -120,4 +139,8 @@ where
120139
fn handle(&self) -> &ActionClientHandle {
121140
&self.handle
122141
}
142+
143+
fn num_entities(&self) -> &WaitableNumEntities {
144+
&self.num_entities
145+
}
123146
}

rclrs/src/action/server.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
use crate::{
2-
action::{GoalResponse, GoalUuid, CancelResponse, ServerGoalHandle}, error::ToResult, rcl_bindings::*, Clock, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
2+
action::{CancelResponse, GoalResponse, GoalUuid, ServerGoalHandle},
3+
error::ToResult,
4+
rcl_bindings::*,
5+
wait::WaitableNumEntities,
6+
Clock, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
37
};
48
use std::{
59
ffi::CString,
@@ -46,6 +50,7 @@ impl Drop for ActionServerHandle {
4650
pub trait ActionServerBase: Send + Sync {
4751
/// Internal function to get a reference to the `rcl` handle.
4852
fn handle(&self) -> &ActionServerHandle;
53+
fn num_entities(&self) -> &WaitableNumEntities;
4954
// /// Tries to take a new request and run the callback with it.
5055
// fn execute(&self) -> Result<(), RclrsError>;
5156
}
@@ -59,6 +64,7 @@ where
5964
ActionT: rosidl_runtime_rs::Action,
6065
{
6166
pub(crate) handle: Arc<ActionServerHandle>,
67+
num_entities: WaitableNumEntities,
6268
goal_callback: Box<GoalCallback<ActionT>>,
6369
cancel_callback: Box<CancelCallback<ActionT>>,
6470
accepted_callback: Box<AcceptedCallback<ActionT>>,
@@ -123,8 +129,22 @@ where
123129
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
124130
});
125131

132+
let mut num_entities = WaitableNumEntities::default();
133+
unsafe {
134+
rcl_action_server_wait_set_get_num_entities(
135+
&*handle.lock(),
136+
&mut num_entities.num_subscriptions,
137+
&mut num_entities.num_guard_conditions,
138+
&mut num_entities.num_timers,
139+
&mut num_entities.num_clients,
140+
&mut num_entities.num_services,
141+
)
142+
.ok()?;
143+
}
144+
126145
Ok(Self {
127146
handle,
147+
num_entities,
128148
goal_callback: Box::new(goal_callback),
129149
cancel_callback: Box::new(cancel_callback),
130150
accepted_callback: Box::new(accepted_callback),
@@ -139,4 +159,8 @@ where
139159
fn handle(&self) -> &ActionServerHandle {
140160
&self.handle
141161
}
162+
163+
fn num_entities(&self) -> &WaitableNumEntities {
164+
&self.num_entities
165+
}
142166
}

rclrs/src/node.rs

Lines changed: 56 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ use rosidl_runtime_rs::Message;
1313

1414
pub use self::{builder::*, graph::*};
1515
use crate::{
16-
rcl_bindings::*, ActionClient, ActionClientBase, ActionServer, ActionServerBase, CancelResponse, Client, ClientBase, Clock,
17-
Context, ContextHandle, GoalResponse, GoalUuid, GuardCondition, ParameterBuilder,
18-
ParameterInterface, ParameterVariant, Parameters, Publisher, QoSProfile, RclrsError,
19-
ServerGoalHandle, Service, ServiceBase, Subscription, SubscriptionBase, SubscriptionCallback,
20-
TimeSource, ENTITY_LIFECYCLE_MUTEX,
16+
rcl_bindings::*, ActionClient, ActionClientBase, ActionServer, ActionServerBase,
17+
CancelResponse, Client, ClientBase, Clock, Context, ContextHandle, GoalResponse, GoalUuid,
18+
GuardCondition, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Publisher,
19+
QoSProfile, RclrsError, ServerGoalHandle, Service, ServiceBase, Subscription, SubscriptionBase,
20+
SubscriptionCallback, TimeSource, ENTITY_LIFECYCLE_MUTEX,
2121
};
2222

2323
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
@@ -209,7 +209,10 @@ impl Node {
209209
T: rosidl_runtime_rs::Service,
210210
{
211211
let client = Arc::new(Client::<T>::new(Arc::clone(&self.handle), topic)?);
212-
self.clients_mtx.lock().unwrap().push(Arc::downgrade(&client) as Weak<dyn ClientBase>);
212+
self.clients_mtx
213+
.lock()
214+
.unwrap()
215+
.push(Arc::downgrade(&client) as Weak<dyn ClientBase>);
213216
Ok(client)
214217
}
215218

@@ -222,7 +225,9 @@ impl Node {
222225
T: rosidl_runtime_rs::Action,
223226
{
224227
let action_client = Arc::new(ActionClient::<T>::new(Arc::clone(&self.handle), topic)?);
225-
self.action_clients_mtx.lock().unwrap()
228+
self.action_clients_mtx
229+
.lock()
230+
.unwrap()
226231
.push(Arc::downgrade(&action_client) as Weak<dyn ActionClientBase>);
227232
Ok(action_client)
228233
}
@@ -252,7 +257,9 @@ impl Node {
252257
handle_cancel,
253258
handle_accepted,
254259
)?);
255-
self.action_servers_mtx.lock().unwrap()
260+
self.action_servers_mtx
261+
.lock()
262+
.unwrap()
256263
.push(Arc::downgrade(&action_server) as Weak<dyn ActionServerBase>);
257264
Ok(action_server)
258265
}
@@ -271,7 +278,9 @@ impl Node {
271278
Arc::clone(&self.handle.context_handle),
272279
None,
273280
));
274-
self.guard_conditions_mtx.lock().unwrap()
281+
self.guard_conditions_mtx
282+
.lock()
283+
.unwrap()
275284
.push(Arc::downgrade(&guard_condition) as Weak<GuardCondition>);
276285
guard_condition
277286
}
@@ -293,7 +302,9 @@ impl Node {
293302
Arc::clone(&self.handle.context_handle),
294303
Some(Box::new(callback) as Box<dyn Fn() + Send + Sync>),
295304
));
296-
self.guard_conditions_mtx.lock().unwrap()
305+
self.guard_conditions_mtx
306+
.lock()
307+
.unwrap()
297308
.push(Arc::downgrade(&guard_condition) as Weak<GuardCondition>);
298309
guard_condition
299310
}
@@ -332,7 +343,9 @@ impl Node {
332343
topic,
333344
callback,
334345
)?);
335-
self.services_mtx.lock().unwrap()
346+
self.services_mtx
347+
.lock()
348+
.unwrap()
336349
.push(Arc::downgrade(&service) as Weak<dyn ServiceBase>);
337350
Ok(service)
338351
}
@@ -356,36 +369,63 @@ impl Node {
356369
qos,
357370
callback,
358371
)?);
359-
self.subscriptions_mtx.lock()
372+
self.subscriptions_mtx
373+
.lock()
360374
.unwrap()
361375
.push(Arc::downgrade(&subscription) as Weak<dyn SubscriptionBase>);
362376
Ok(subscription)
363377
}
364378

365379
/// Returns the subscriptions that have not been dropped yet.
366380
pub(crate) fn live_subscriptions(&self) -> Vec<Arc<dyn SubscriptionBase>> {
367-
self.subscriptions_mtx.lock().unwrap()
381+
self.subscriptions_mtx
382+
.lock()
383+
.unwrap()
368384
.iter()
369385
.filter_map(Weak::upgrade)
370386
.collect()
371387
}
372388

373389
pub(crate) fn live_clients(&self) -> Vec<Arc<dyn ClientBase>> {
374-
self.clients_mtx.lock().unwrap()
390+
self.clients_mtx
391+
.lock()
392+
.unwrap()
375393
.iter()
376394
.filter_map(Weak::upgrade)
377395
.collect()
378396
}
379397

380398
pub(crate) fn live_guard_conditions(&self) -> Vec<Arc<GuardCondition>> {
381-
self.guard_conditions_mtx.lock().unwrap()
399+
self.guard_conditions_mtx
400+
.lock()
401+
.unwrap()
382402
.iter()
383403
.filter_map(Weak::upgrade)
384404
.collect()
385405
}
386406

387407
pub(crate) fn live_services(&self) -> Vec<Arc<dyn ServiceBase>> {
388-
self.services_mtx.lock().unwrap()
408+
self.services_mtx
409+
.lock()
410+
.unwrap()
411+
.iter()
412+
.filter_map(Weak::upgrade)
413+
.collect()
414+
}
415+
416+
pub(crate) fn live_action_clients(&self) -> Vec<Arc<dyn ActionClientBase>> {
417+
self.action_clients_mtx
418+
.lock()
419+
.unwrap()
420+
.iter()
421+
.filter_map(Weak::upgrade)
422+
.collect()
423+
}
424+
425+
pub(crate) fn live_action_servers(&self) -> Vec<Arc<dyn ActionServerBase>> {
426+
self.action_servers_mtx
427+
.lock()
428+
.unwrap()
389429
.iter()
390430
.filter_map(Weak::upgrade)
391431
.collect()

0 commit comments

Comments
 (0)