Skip to content

Commit 965ca22

Browse files
committed
Integrated the Timer into the WaitSet.
Signed-off-by: Agustin Alba Chicar <[email protected]>
1 parent 1095351 commit 965ca22

File tree

3 files changed

+134
-13
lines changed

3 files changed

+134
-13
lines changed

rclrs/src/executor.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,11 @@ impl SingleThreadedExecutor {
4848
})
4949
{
5050
let wait_set = WaitSet::new_for_node(&node)?;
51-
let ready_entities = wait_set.wait(timeout)?;
51+
let mut ready_entities = wait_set.wait(timeout)?;
52+
53+
for ready_timer in ready_entities.timers.iter_mut() {
54+
ready_timer.execute()?;
55+
}
5256

5357
for ready_subscription in ready_entities.subscriptions {
5458
ready_subscription.execute()?;

rclrs/src/timer.rs

Lines changed: 58 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,24 @@ use crate::{
22
clock::Clock, context::Context, error::RclrsError, rcl_bindings::*, to_rclrs_result
33
};
44
// use std::fmt::Debug;
5-
use std::sync::{Arc, Mutex};
5+
use std::sync::{atomic::AtomicBool, Arc, Mutex};
66

7-
pub type TimerCallback = Box<dyn FnMut(i64) + Send + Sync>;
7+
pub type TimerCallback = Box<dyn Fn(i64) + Send + Sync>;
88

99
// #[derive(Debug)]
1010
pub struct Timer {
11-
rcl_timer: Arc<Mutex<rcl_timer_t>>,
11+
pub(crate) rcl_timer: Arc<Mutex<rcl_timer_t>>,
1212
callback: Option<TimerCallback>,
13+
pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
1314
}
1415

1516
impl Timer {
1617
/// Creates a new timer (constructor)
1718
pub fn new(clock: &Clock, context: &Context, period: i64) -> Result<Timer, RclrsError> {
18-
Self::with_callback(clock, context, period, None)
19+
Self::new_with_callback(clock, context, period, None)
1920
}
2021

21-
pub fn with_callback(clock: &Clock, context: &Context, period: i64, callback: Option<TimerCallback>) -> Result<Timer, RclrsError> {
22+
pub fn new_with_callback(clock: &Clock, context: &Context, period: i64, callback: Option<TimerCallback>) -> Result<Timer, RclrsError> {
2223
let mut rcl_timer;
2324
let timer_init_result = unsafe {
2425
// SAFETY: Getting a default value is always safe.
@@ -43,6 +44,7 @@ impl Timer {
4344
Timer {
4445
rcl_timer: Arc::new(Mutex::new(rcl_timer)),
4546
callback,
47+
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
4648
}
4749
})
4850
}
@@ -114,8 +116,7 @@ impl Timer {
114116
})
115117
}
116118

117-
/// Resets the timer, setting the last call time to now
118-
pub fn reset(&mut self) -> Result<(), RclrsError>
119+
pub fn reset(&self) -> Result<(), RclrsError>
119120
{
120121
let mut rcl_timer = self.rcl_timer.lock().unwrap();
121122
to_rclrs_result(unsafe {rcl_timer_reset(&mut *rcl_timer)})
@@ -144,7 +145,20 @@ impl Timer {
144145
is_ready
145146
})
146147
}
147-
// handle() -> RCLC Timer Type
148+
149+
pub(crate) fn execute(&self) -> Result<(), RclrsError>
150+
{
151+
if self.is_ready()?
152+
{
153+
let time_since_last_call = self.time_since_last_call()?;
154+
self.call()?;
155+
if let Some(ref callback) = self.callback
156+
{
157+
callback(time_since_last_call);
158+
}
159+
}
160+
Ok(())
161+
}
148162
}
149163

150164
/// 'Drop' trait implementation to be able to release the resources
@@ -158,6 +172,12 @@ impl Drop for rcl_timer_t {
158172
}
159173
}
160174

175+
impl PartialEq for Timer {
176+
fn eq(&self, other: &Self) -> bool {
177+
Arc::ptr_eq(&self.rcl_timer, &other.rcl_timer)
178+
}
179+
}
180+
161181
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
162182
// they are running in. Therefore, this type can be safely sent to another thread.
163183
unsafe impl Send for rcl_timer_t {}
@@ -282,7 +302,7 @@ mod tests {
282302
let clock = Clock::steady();
283303
let context = Context::new(vec![]).unwrap();
284304
let period_ns: i64 = 2e6 as i64; // 2 milliseconds.
285-
let mut dut = Timer::new(&clock, &context, period_ns).unwrap();
305+
let dut = Timer::new(&clock, &context, period_ns).unwrap();
286306
let elapsed = period_ns - dut.time_until_next_call().unwrap();
287307
assert!(elapsed < tolerance , "elapsed before reset: {}", elapsed);
288308
thread::sleep(time::Duration::from_millis(1));
@@ -297,7 +317,7 @@ mod tests {
297317
let clock = Clock::steady();
298318
let context = Context::new(vec![]).unwrap();
299319
let period_ns: i64 = 1e6 as i64; // 1 millisecond.
300-
let mut dut = Timer::new(&clock, &context, period_ns).unwrap();
320+
let dut = Timer::new(&clock, &context, period_ns).unwrap();
301321
let elapsed = period_ns - dut.time_until_next_call().unwrap();
302322
assert!(elapsed < tolerance , "elapsed before reset: {}", elapsed);
303323

@@ -337,8 +357,35 @@ mod tests {
337357
let period_ns: i64 = 1e6 as i64; // 1 millisecond.
338358
let foo = Arc::new(Mutex::new(0i64));
339359
let foo_callback = foo.clone();
340-
let dut = Timer::with_callback(&clock, &context, period_ns, Some(Box::new(move |x| *foo_callback.lock().unwrap() = x ))).unwrap();
360+
let dut = Timer::new_with_callback(&clock, &context, period_ns, Some(Box::new(move |x| *foo_callback.lock().unwrap() = x ))).unwrap();
341361
dut.callback.unwrap()(123);
342362
assert_eq!(*foo.lock().unwrap(), 123);
343363
}
364+
365+
#[test]
366+
fn test_execute_when_is_not_ready() {
367+
let clock = Clock::steady();
368+
let context = Context::new(vec![]).unwrap();
369+
let period_ns: i64 = 1e6 as i64; // 1 millisecond.
370+
let foo = Arc::new(Mutex::new(0i64));
371+
let foo_callback = foo.clone();
372+
let dut = Timer::new_with_callback(&clock, &context, period_ns, Some(Box::new(move |x| *foo_callback.lock().unwrap() = x ))).unwrap();
373+
assert!(dut.execute().is_ok());
374+
assert_eq!(*foo.lock().unwrap(), 0i64);
375+
}
376+
377+
#[test]
378+
fn test_execute_when_is_ready() {
379+
let clock = Clock::steady();
380+
let context = Context::new(vec![]).unwrap();
381+
let period_ns: i64 = 1e6 as i64; // 1 millisecond.
382+
let foo = Arc::new(Mutex::new(0i64));
383+
let foo_callback = foo.clone();
384+
let dut = Timer::new_with_callback(&clock, &context, period_ns, Some(Box::new(move |x| *foo_callback.lock().unwrap() = x ))).unwrap();
385+
thread::sleep(time::Duration::from_micros(1500));
386+
assert!(dut.execute().is_ok());
387+
let x = *foo.lock().unwrap();
388+
assert!(x > 1500000i64);
389+
assert!(x < 1600000i64);
390+
}
344391
}

rclrs/src/wait.rs

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::{sync::Arc, time::Duration, vec::Vec};
2020
use crate::{
2121
error::{to_rclrs_result, RclReturnCode, RclrsError, ToResult},
2222
rcl_bindings::*,
23-
ClientBase, Context, ContextHandle, Node, ServiceBase, SubscriptionBase,
23+
ClientBase, Context, ContextHandle, Node, ServiceBase, SubscriptionBase, Timer
2424
};
2525

2626
mod exclusivity_guard;
@@ -51,6 +51,7 @@ pub struct WaitSet {
5151
guard_conditions: Vec<ExclusivityGuard<Arc<GuardCondition>>>,
5252
services: Vec<ExclusivityGuard<Arc<dyn ServiceBase>>>,
5353
handle: WaitSetHandle,
54+
timers: Vec<ExclusivityGuard<Arc<Timer>>>,
5455
}
5556

5657
/// A list of entities that are ready, returned by [`WaitSet::wait`].
@@ -63,6 +64,8 @@ pub struct ReadyEntities {
6364
pub guard_conditions: Vec<Arc<GuardCondition>>,
6465
/// A list of services that have potentially received requests.
6566
pub services: Vec<Arc<dyn ServiceBase>>,
67+
/// TODO
68+
pub timers: Vec<Arc<Timer>>,
6669
}
6770

6871
impl Drop for rcl_wait_set_t {
@@ -127,6 +130,7 @@ impl WaitSet {
127130
rcl_wait_set,
128131
context_handle: Arc::clone(&context.handle),
129132
},
133+
timers: Vec::new(),
130134
})
131135
}
132136

@@ -178,6 +182,7 @@ impl WaitSet {
178182
self.guard_conditions.clear();
179183
self.clients.clear();
180184
self.services.clear();
185+
self.timers.clear();
181186
// This cannot fail – the rcl_wait_set_clear function only checks that the input handle is
182187
// valid, which it always is in our case. Hence, only debug_assert instead of returning
183188
// Result.
@@ -311,6 +316,27 @@ impl WaitSet {
311316
Ok(())
312317
}
313318

319+
/// TBD
320+
pub fn add_timer(&mut self, timer: Arc<Timer>) -> Result<(), RclrsError> {
321+
let exclusive_timer = ExclusivityGuard::new(
322+
Arc::clone(&timer),
323+
Arc::clone(&timer.in_use_by_wait_set),
324+
)?;
325+
unsafe {
326+
// SAFETY: I'm not sure if it's required, but the timer pointer will remain valid
327+
// for as long as the wait set exists, because it's stored in self.timers.
328+
// Passing in a null pointer for the third argument is explicitly allowed.
329+
rcl_wait_set_add_timer(
330+
&mut self.handle.rcl_wait_set,
331+
&* (*(*timer).rcl_timer).lock().unwrap() as *const _, // TODO :)
332+
core::ptr::null_mut(),
333+
)
334+
}
335+
.ok()?;
336+
self.timers.push(exclusive_timer);
337+
Ok(())
338+
}
339+
314340
/// Blocks until the wait set is ready, or until the timeout has been exceeded.
315341
///
316342
/// If the timeout is `None` then this function will block indefinitely until
@@ -365,6 +391,7 @@ impl WaitSet {
365391
clients: Vec::new(),
366392
guard_conditions: Vec::new(),
367393
services: Vec::new(),
394+
timers: Vec::new(),
368395
};
369396
for (i, subscription) in self.subscriptions.iter().enumerate() {
370397
// SAFETY: The `subscriptions` entry is an array of pointers, and this dereferencing is
@@ -409,13 +436,24 @@ impl WaitSet {
409436
ready_entities.services.push(Arc::clone(&service.waitable));
410437
}
411438
}
439+
440+
for (i, timer) in self.timers.iter().enumerate() {
441+
// SAFETY: The `timers` entry is an array of pointers, and this dereferencing is
442+
// equivalent to
443+
// https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
444+
let wait_set_entry = unsafe { *self.handle.rcl_wait_set.timers.add(i) };
445+
if !wait_set_entry.is_null() {
446+
ready_entities.timers.push(Arc::clone(&timer.waitable));
447+
}
448+
}
412449
Ok(ready_entities)
413450
}
414451
}
415452

416453
#[cfg(test)]
417454
mod tests {
418455
use super::*;
456+
use crate::{clock::Clock};
419457

420458
#[test]
421459
fn traits() {
@@ -440,4 +478,36 @@ mod tests {
440478

441479
Ok(())
442480
}
481+
482+
#[test]
483+
fn timer_in_wait_not_set_readies() -> Result<(), RclrsError> {
484+
let context = Context::new([])?;
485+
let clock = Clock::steady();
486+
let period: i64 = 1e6 as i64; // 1 milliseconds.
487+
let timer = Arc::new(Timer::new(&clock, &context, period)?);
488+
489+
let mut wait_set = WaitSet::new(0, 0, 1, 0, 0, 0, &context)?;
490+
wait_set.add_timer(timer.clone())?;
491+
492+
let readies = wait_set.wait(Some(std::time::Duration::from_micros(0)))?;
493+
assert!(!readies.timers.contains(&timer));
494+
495+
Ok(())
496+
}
497+
498+
#[test]
499+
fn timer_in_wait_set_readies() -> Result<(), RclrsError> {
500+
let context = Context::new([])?;
501+
let clock = Clock::steady();
502+
let period: i64 = 1e6 as i64; // 1 milliseconds.
503+
let timer = Arc::new(Timer::new(&clock, &context, period)?);
504+
505+
let mut wait_set = WaitSet::new(0, 0, 1, 0, 0, 0, &context)?;
506+
wait_set.add_timer(timer.clone())?;
507+
508+
let readies = wait_set.wait(Some(std::time::Duration::from_micros(1500)))?;
509+
assert!(readies.timers.contains(&timer));
510+
511+
Ok(())
512+
}
443513
}

0 commit comments

Comments
 (0)