Skip to content

Commit d201992

Browse files
committed
Add initial tests and improve ergonomics
Signed-off-by: Michael X. Grey <[email protected]>
1 parent a9bd493 commit d201992

16 files changed

+636
-101
lines changed

rclrs/Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,18 @@ path = "src/lib.rs"
1717
# Needed for dynamically finding type support libraries
1818
ament_rs = { version = "0.2", optional = true }
1919

20+
# Needed to create the GoalClientStream
21+
async-stream = "*"
22+
2023
# Needed for uploading documentation to docs.rs
2124
cfg-if = "1.0.0"
2225

2326
# Needed for clients
2427
futures = "0.3"
2528

29+
# Needed for racing futures
30+
futures-lite = { version = "2.6", features = ["std", "race"] }
31+
2632
# Needed for the runtime-agnostic timeout feature
2733
async-std = "1.13"
2834

@@ -42,6 +48,9 @@ serde-big-array = { version = "0.5.1", optional = true }
4248
# into the tokio runtime.
4349
tokio = { version = "1", features = ["sync"] }
4450

51+
# Needed to combine concurrent streams for easier ergonomics in action clients
52+
tokio-stream = "0.1"
53+
4554
# Needed by action clients to generate UUID values for their goals
4655
uuid = { version = "1", features = ["v4"] }
4756

rclrs/src/action.rs

Lines changed: 148 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ pub(crate) mod action_server;
1010
pub use action_server::*;
1111

1212
use crate::{
13-
rcl_bindings::RCL_ACTION_UUID_SIZE,
13+
rcl_bindings::*,
1414
vendor::builtin_interfaces::msg::Time,
15+
DropGuard,
1516
log_error,
1617
};
1718
use std::fmt;
@@ -95,7 +96,7 @@ impl CancelResponseCode {
9596

9697
impl From<i8> for CancelResponseCode {
9798
fn from(value: i8) -> Self {
98-
if value <= 0 && value <= 3 {
99+
if 0 <= value && value <= 3 {
99100
unsafe {
100101
// SAFETY: We have already ensured that the integer value is
101102
// within the acceptable range for the enum, so transmuting is
@@ -177,7 +178,7 @@ pub enum GoalStatusCode {
177178

178179
impl From<i8> for GoalStatusCode {
179180
fn from(value: i8) -> Self {
180-
if value <= 0 && value <= 6 {
181+
if 0 <= value && value <= 6 {
181182
unsafe {
182183
// SAFETY: We have already ensured that the integer value is
183184
// within the acceptable range for the enum, so transmuting is
@@ -208,3 +209,147 @@ pub struct GoalStatus {
208209
/// client, so care should be taken when using this time value.
209210
pub stamp: Time,
210211
}
212+
213+
fn empty_goal_status_array() -> DropGuard<rcl_action_goal_status_array_t> {
214+
DropGuard::new(
215+
unsafe {
216+
// SAFETY: No preconditions
217+
let mut array = rcl_action_get_zero_initialized_goal_status_array();
218+
array.allocator = rcutils_get_default_allocator();
219+
array
220+
},
221+
|mut goal_statuses| unsafe {
222+
// SAFETY: The goal_status array is either zero-initialized and empty or populated by
223+
// `rcl_action_get_goal_status_array`. In either case, it can be safely finalized.
224+
rcl_action_goal_status_array_fini(&mut goal_statuses);
225+
}
226+
)
227+
}
228+
229+
#[cfg(test)]
230+
mod tests {
231+
use crate::*;
232+
use example_interfaces::action::{Fibonacci, Fibonacci_Goal, Fibonacci_Result, Fibonacci_Feedback};
233+
use tokio::sync::mpsc::unbounded_channel;
234+
use futures::StreamExt;
235+
use std::time::Duration;
236+
237+
#[test]
238+
fn test_action_success() {
239+
let mut executor = Context::default().create_basic_executor();
240+
241+
let node = executor.create_node(&format!("test_action_success_{}", line!())).unwrap();
242+
let action_name = format!("test_action_success_{}_action", line!());
243+
let _action_server = node.create_action_server(
244+
&action_name,
245+
fibonacci_action,
246+
).unwrap();
247+
248+
let client = node.create_action_client::<Fibonacci>(&action_name).unwrap();
249+
250+
let order_10_sequence = [1, 1, 2, 3, 5, 8, 13, 21, 34, 55];
251+
252+
let request = client.request_goal(Fibonacci_Goal {
253+
order: 10,
254+
});
255+
256+
let promise = executor.commands().run(async move {
257+
let mut goal_client_stream = request.await.unwrap().stream();
258+
let mut expected_feedback_len = 0;
259+
while let Some(event) = goal_client_stream.next().await {
260+
match event {
261+
GoalEvent::Feedback(feedback) => {
262+
expected_feedback_len += 1;
263+
assert_eq!(feedback.sequence.len(), expected_feedback_len);
264+
}
265+
GoalEvent::Status(s) => {
266+
assert!(
267+
matches!(s.code, GoalStatusCode::Unknown | GoalStatusCode::Executing | GoalStatusCode::Succeeded),
268+
"Actual code: {:?}",
269+
s.code,
270+
);
271+
}
272+
GoalEvent::Result((status, result)) => {
273+
assert_eq!(status, GoalStatusCode::Succeeded);
274+
assert_eq!(result.sequence, order_10_sequence);
275+
return;
276+
}
277+
}
278+
}
279+
});
280+
281+
executor.spin(SpinOptions::default().until_promise_resolved(promise));
282+
283+
let request = client.request_goal(Fibonacci_Goal {
284+
order: 10,
285+
});
286+
287+
let promise = executor.commands().run(async move {
288+
let (status, result) = request.await.unwrap().result.await;
289+
assert_eq!(status, GoalStatusCode::Succeeded);
290+
assert_eq!(result.sequence, order_10_sequence);
291+
});
292+
293+
executor.spin(SpinOptions::default().until_promise_resolved(promise));
294+
}
295+
296+
async fn fibonacci_action(handle: RequestedGoal<Fibonacci>) -> TerminatedGoal {
297+
let goal_order = handle.goal().order;
298+
if goal_order < 0 {
299+
return handle.reject();
300+
}
301+
302+
let mut result = Fibonacci_Result::default();
303+
304+
let executing = match handle.accept().begin() {
305+
BeginAcceptedGoal::Execute(executing) => executing,
306+
BeginAcceptedGoal::Cancel(cancelling) => {
307+
return cancelling.cancelled_with(result);
308+
}
309+
};
310+
311+
let (sender, mut receiver) = unbounded_channel();
312+
std::thread::spawn(move || {
313+
314+
let mut previous = 0;
315+
let mut current = 1;
316+
317+
for _ in 0..goal_order {
318+
if let Err(_) = sender.send(current) {
319+
// The action has been cancelled early, so just drop this thread.
320+
return;
321+
}
322+
323+
let next = previous + current;
324+
previous = current;
325+
current = next;
326+
std::thread::sleep(Duration::from_micros(10));
327+
}
328+
});
329+
330+
let mut sequence = Vec::new();
331+
loop {
332+
match executing.unless_cancel_requested(receiver.recv()).await {
333+
Ok(Some(next)) => {
334+
// We have a new item in the sequence
335+
sequence.push(next);
336+
executing.publish_feedback(
337+
Fibonacci_Feedback {
338+
sequence: sequence.clone(),
339+
}
340+
);
341+
}
342+
Ok(None) => {
343+
// The sequence has finished
344+
result.sequence = sequence;
345+
return executing.succeeded_with(result);
346+
}
347+
Err(_) => {
348+
// The action has been cancelled
349+
result.sequence = sequence;
350+
return executing.begin_cancelling().cancelled_with(result);
351+
}
352+
}
353+
}
354+
}
355+
}

0 commit comments

Comments
 (0)