Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions rclrs/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,3 +487,30 @@ impl CreateBasicExecutor for Context {
self.create_executor(runtime)
}
}

#[cfg(test)]
mod tests {
use crate::*;
use std::time::Duration;

#[test]
fn test_timeout() {
let context = Context::default();
let mut executor = context.create_basic_executor();
let _node = executor
.create_node(&format!("test_timeout_{}", line!()))
.unwrap();

for _ in 0..10 {
let r = executor.spin(SpinOptions::default().timeout(Duration::from_millis(1)));
assert_eq!(r.len(), 1);
assert!(matches!(
r[0],
RclrsError::RclError {
code: RclReturnCode::Timeout,
..
}
));
}
}
}
37 changes: 22 additions & 15 deletions rclrs/src/wait_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,32 +114,39 @@ impl WaitSet {
})
}
};

// SAFETY: The comments in rcl mention "This function cannot operate on the same wait set
// in multiple threads, and the wait sets may not share content."
// We cannot currently guarantee that the wait sets may not share content, but it is
// mentioned in the doc comment for `add_subscription`.
// Also, the rcl_wait_set is obviously valid.
match unsafe { rcl_wait(&mut self.handle.rcl_wait_set, timeout_ns) }.ok() {
Ok(_) => (),
// * The we have exclusive access to rcl_wait_set because this is a
// mutable borrow of WaitSet, which houses rcl_wait_set.
// * We guarantee that the wait sets do not share content by funneling
// the waitable of each primitive to one (and only one) WaitSet when
// the primitive gets constructed. The waitables are never allowed to
// move between wait sets.
let r = match unsafe { rcl_wait(&mut self.handle.rcl_wait_set, timeout_ns) }.ok() {
Ok(_) => Ok(()),
Err(error) => match error {
RclrsError::RclError { code, msg } => match code {
RclReturnCode::WaitSetEmpty => (),
_ => return Err(RclrsError::RclError { code, msg }),
RclReturnCode::WaitSetEmpty => Ok(()),
_ => Err(RclrsError::RclError { code, msg }),
},
_ => return Err(error),
_ => Err(error),
},
}
};

// Remove any waitables that are no longer being used
for waitable in self.primitives.values_mut() {
waitable.retain(|w| w.in_use());
}

// For the remaining entities, check if they were activated and then run
// the callback for those that were.
for waiter in self.primitives.values_mut().flat_map(|v| v) {
if waiter.is_ready(&self.handle.rcl_wait_set) {
f(&mut *waiter.primitive)?;
// Do not check the readiness if an error was reported.
if !r.is_err() {
// For the remaining entities, check if they were activated and then run
// the callback for those that were.
for waiter in self.primitives.values_mut().flat_map(|v| v) {
if waiter.is_ready(&self.handle.rcl_wait_set) {
f(&mut *waiter.primitive)?;
}
}
}

Expand All @@ -160,7 +167,7 @@ impl WaitSet {
);
}

Ok(())
r
}

/// Get a count of the different kinds of entities in the wait set.
Expand Down
Loading