Skip to content

Commit 837f248

Browse files
committed
Squash async worker changes
Signed-off-by: Michael X. Grey <[email protected]> Signed-off-by: Michael X. Grey <[email protected]>
1 parent acec981 commit 837f248

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+5932
-2349
lines changed

examples/minimal_client_service/src/minimal_client.rs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,29 +8,27 @@ fn main() -> Result<(), Error> {
88

99
let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;
1010

11-
let request = example_interfaces::srv::AddTwoInts_Request { a: 41, b: 1 };
12-
1311
println!("Starting client");
1412

1513
while !client.service_is_ready()? {
1614
std::thread::sleep(std::time::Duration::from_millis(10));
1715
}
1816

19-
client.async_send_request_with_callback(
20-
&request,
21-
move |response: example_interfaces::srv::AddTwoInts_Response| {
22-
println!(
23-
"Result of {} + {} is: {}",
24-
request.a, request.b, response.sum
25-
);
26-
},
27-
)?;
17+
let request = example_interfaces::srv::AddTwoInts_Request { a: 41, b: 1 };
18+
19+
let response: Promise<example_interfaces::srv::AddTwoInts_Response> = client.call(&request).unwrap();
2820

29-
std::thread::sleep(std::time::Duration::from_millis(500));
21+
let promise = executor.commands().run(async move {
22+
let response = response.await.unwrap();
23+
println!(
24+
"Result of {} + {} is: {}",
25+
request.a, request.b, response.sum,
26+
);
27+
});
3028

3129
println!("Waiting for response");
3230
executor
33-
.spin(SpinOptions::default())
34-
.first_error()
35-
.map_err(|err| err.into())
31+
.spin(SpinOptions::new().until_promise_resolved(promise))
32+
.first_error()?;
33+
Ok(())
3634
}

examples/minimal_client_service/src/minimal_client_async.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,19 @@ async fn main() -> Result<(), Error> {
1717

1818
let request = example_interfaces::srv::AddTwoInts_Request { a: 41, b: 1 };
1919

20-
let future = client.call_async(&request);
20+
let promise = client.call_then(
21+
&request,
22+
move |response: example_interfaces::srv::AddTwoInts_Response| {
23+
println!(
24+
"Result of {} + {} is: {}",
25+
request.a, request.b, response.sum,
26+
);
27+
}
28+
).unwrap();
2129

2230
println!("Waiting for response");
23-
24-
let rclrs_spin = tokio::task::spawn_blocking(move || executor.spin(SpinOptions::default()));
25-
26-
let response = future.await?;
27-
println!(
28-
"Result of {} + {} is: {}",
29-
request.a, request.b, response.sum
30-
);
31-
32-
rclrs_spin.await.ok();
31+
executor
32+
.spin(SpinOptions::new().until_promise_resolved(promise))
33+
.first_error()?;
3334
Ok(())
3435
}

examples/minimal_client_service/src/minimal_service.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,15 @@ use anyhow::{Error, Result};
22
use rclrs::*;
33

44
fn handle_service(
5-
_request_header: &rclrs::rmw_request_id_t,
65
request: example_interfaces::srv::AddTwoInts_Request,
6+
info: ServiceInfo,
77
) -> example_interfaces::srv::AddTwoInts_Response {
8-
println!("request: {} + {}", request.a, request.b);
8+
let timestamp = info
9+
.received_timestamp
10+
.map(|t| format!(" at [{t:?}]"))
11+
.unwrap_or(String::new());
12+
13+
println!("request{timestamp}: {} + {}", request.a, request.b);
914
example_interfaces::srv::AddTwoInts_Response {
1015
sum: request.a + request.b,
1116
}

examples/minimal_pub_sub/src/minimal_subscriber.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,19 @@ fn main() -> Result<(), Error> {
77

88
let node = executor.create_node("minimal_subscriber")?;
99

10-
let mut num_messages: usize = 0;
11-
12-
let _subscription = node.create_subscription::<std_msgs::msg::String, _>(
10+
let worker = node.create_worker::<usize>(0);
11+
let _subscription = worker.create_subscription::<std_msgs::msg::String, _>(
1312
"topic",
14-
move |msg: std_msgs::msg::String| {
15-
num_messages += 1;
13+
move |num_messages: &mut usize, msg: std_msgs::msg::String| {
14+
*num_messages += 1;
1615
println!("I heard: '{}'", msg.data);
17-
println!("(Got {} messages so far)", num_messages);
16+
println!("(Got {} messages so far)", *num_messages);
1817
},
1918
)?;
2019

20+
println!("Waiting for messages...");
2121
executor
2222
.spin(SpinOptions::default())
23-
.first_error()
24-
.map_err(|err| err.into())
23+
.first_error()?;
24+
Ok(())
2525
}
Lines changed: 39 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,43 @@
11
use rclrs::*;
2-
use std::sync::{
3-
atomic::{AtomicU32, Ordering},
4-
Arc, Mutex,
5-
};
2+
use std::sync::Arc;
63

74
use anyhow::{Error, Result};
85

9-
struct MinimalSubscriber {
10-
num_messages: AtomicU32,
6+
struct MinimalSubscriberNode {
7+
#[allow(unused)]
8+
subscription: WorkerSubscription<std_msgs::msg::String, SubscriptionData>,
9+
}
10+
11+
struct SubscriptionData {
1112
node: Node,
12-
subscription: Mutex<Option<Subscription<std_msgs::msg::String>>>,
13+
num_messages: usize,
1314
}
1415

15-
impl MinimalSubscriber {
16-
pub fn new(executor: &Executor, name: &str, topic: &str) -> Result<Arc<Self>, RclrsError> {
16+
impl MinimalSubscriberNode {
17+
pub fn new(executor: &Executor, name: &str, topic: &str) -> Result<Self, RclrsError> {
1718
let node = executor.create_node(name)?;
18-
let minimal_subscriber = Arc::new(MinimalSubscriber {
19-
num_messages: 0.into(),
20-
node,
21-
subscription: None.into(),
22-
});
23-
24-
let minimal_subscriber_aux = Arc::clone(&minimal_subscriber);
25-
let subscription = minimal_subscriber
26-
.node
27-
.create_subscription::<std_msgs::msg::String, _>(
28-
topic,
29-
move |msg: std_msgs::msg::String| {
30-
minimal_subscriber_aux.callback(msg);
31-
},
32-
)?;
33-
*minimal_subscriber.subscription.lock().unwrap() = Some(subscription);
34-
Ok(minimal_subscriber)
35-
}
3619

37-
fn callback(&self, msg: std_msgs::msg::String) {
38-
self.num_messages.fetch_add(1, Ordering::SeqCst);
39-
println!("[{}] I heard: '{}'", self.node.name(), msg.data);
40-
println!(
41-
"[{}] (Got {} messages so far)",
42-
self.node.name(),
43-
self.num_messages.load(Ordering::SeqCst)
20+
let worker = node.create_worker::<SubscriptionData>(
21+
SubscriptionData {
22+
node: Arc::clone(&node),
23+
num_messages: 0,
24+
}
4425
);
26+
27+
let subscription = worker.create_subscription(
28+
topic,
29+
|data: &mut SubscriptionData, msg: std_msgs::msg::String| {
30+
data.num_messages += 1;
31+
println!("[{}] I heard: '{}'", data.node.name(), msg.data);
32+
println!(
33+
"[{}] (Got {} messages so far)",
34+
data.node.name(),
35+
data.num_messages,
36+
);
37+
}
38+
)?;
39+
40+
Ok(MinimalSubscriberNode { subscription })
4541
}
4642
}
4743

@@ -50,13 +46,15 @@ fn main() -> Result<(), Error> {
5046
let publisher_node = executor.create_node("minimal_publisher")?;
5147

5248
let _subscriber_node_one =
53-
MinimalSubscriber::new(&executor, "minimal_subscriber_one", "topic")?;
49+
MinimalSubscriberNode::new(&executor, "minimal_subscriber_one", "topic")?;
5450
let _subscriber_node_two =
55-
MinimalSubscriber::new(&executor, "minimal_subscriber_two", "topic")?;
51+
MinimalSubscriberNode::new(&executor, "minimal_subscriber_two", "topic")?;
5652

5753
let publisher = publisher_node.create_publisher::<std_msgs::msg::String>("topic")?;
5854

59-
std::thread::spawn(move || -> Result<(), RclrsError> {
55+
// TODO(@mxgrey): Replace this with a timer once we have the Timer feature
56+
// merged in.
57+
std::thread::spawn(move || -> Result<(), rclrs::RclrsError> {
6058
let mut message = std_msgs::msg::String::default();
6159
let mut publish_count: u32 = 1;
6260
loop {
@@ -69,7 +67,8 @@ fn main() -> Result<(), Error> {
6967
});
7068

7169
executor
72-
.spin(SpinOptions::default())
73-
.first_error()
74-
.map_err(|err| err.into())
70+
.spin(rclrs::SpinOptions::default())
71+
.first_error()?;
72+
Ok(())
7573
}
74+

examples/minimal_pub_sub/src/zero_copy_subscriber.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,18 @@ fn main() -> Result<(), Error> {
66

77
let node = executor.create_node("minimal_subscriber")?;
88

9-
let mut num_messages: usize = 0;
10-
11-
let _subscription = node.create_subscription::<std_msgs::msg::UInt32, _>(
9+
let worker = node.create_worker::<usize>(0);
10+
let _subscription = worker.create_subscription::<std_msgs::msg::UInt32, _>(
1211
"topic",
13-
move |msg: rclrs::ReadOnlyLoanedMessage<'_, std_msgs::msg::UInt32>| {
14-
num_messages += 1;
12+
move |num_messages: &mut usize, msg: ReadOnlyLoanedMessage<std_msgs::msg::UInt32>| {
13+
*num_messages += 1;
1514
println!("I heard: '{}'", msg.data);
16-
println!("(Got {} messages so far)", num_messages);
15+
println!("(Got {} messages so far)", *num_messages);
1716
},
1817
)?;
1918

2019
executor
2120
.spin(SpinOptions::default())
22-
.first_error()
23-
.map_err(|err| err.into())
21+
.first_error()?;
22+
Ok(())
2423
}

examples/rust_pubsub/src/simple_publisher.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,16 @@ use rclrs::*;
22
use std::{thread, time::Duration};
33
use std_msgs::msg::String as StringMsg;
44

5-
struct SimplePublisher {
5+
struct SimplePublisherNode {
66
publisher: Publisher<StringMsg>,
77
}
88

9-
impl SimplePublisher {
9+
impl SimplePublisherNode {
1010
fn new(executor: &Executor) -> Result<Self, RclrsError> {
1111
let node = executor.create_node("simple_publisher").unwrap();
12-
let publisher = node.create_publisher("publish_hello").unwrap();
12+
let publisher = node
13+
.create_publisher("publish_hello")
14+
.unwrap();
1315
Ok(Self { publisher })
1416
}
1517

@@ -24,11 +26,14 @@ impl SimplePublisher {
2426

2527
fn main() -> Result<(), RclrsError> {
2628
let mut executor = Context::default_from_env().unwrap().create_basic_executor();
27-
let publisher = SimplePublisher::new(&executor).unwrap();
29+
let node = SimplePublisherNode::new(&executor).unwrap();
2830
let mut count: i32 = 0;
31+
32+
// TODO(@mxgrey): Replace this with a timer once the Timer feature
33+
// is merged.
2934
thread::spawn(move || loop {
3035
thread::sleep(Duration::from_millis(1000));
31-
count = publisher.publish_data(count).unwrap();
36+
count = node.publish_data(count).unwrap();
3237
});
3338
executor.spin(SpinOptions::default()).first_error()
3439
}

examples/rust_pubsub/src/simple_subscriber.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ use std::{
77
use std_msgs::msg::String as StringMsg;
88

99
pub struct SimpleSubscriptionNode {
10-
_subscriber: Subscription<StringMsg>,
10+
#[allow(unused)]
11+
subscriber: Subscription<StringMsg>,
1112
data: Arc<Mutex<Option<StringMsg>>>,
1213
}
1314

@@ -16,12 +17,15 @@ impl SimpleSubscriptionNode {
1617
let node = executor.create_node("simple_subscription").unwrap();
1718
let data: Arc<Mutex<Option<StringMsg>>> = Arc::new(Mutex::new(None));
1819
let data_mut: Arc<Mutex<Option<StringMsg>>> = Arc::clone(&data);
19-
let _subscriber = node
20-
.create_subscription::<StringMsg, _>("publish_hello", move |msg: StringMsg| {
21-
*data_mut.lock().unwrap() = Some(msg);
22-
})
20+
let subscriber = node
21+
.create_subscription::<StringMsg, _>(
22+
"publish_hello",
23+
move |msg: StringMsg| {
24+
*data_mut.lock().unwrap() = Some(msg);
25+
},
26+
)
2327
.unwrap();
24-
Ok(Self { _subscriber, data })
28+
Ok(Self { subscriber, data })
2529
}
2630
fn data_callback(&self) -> Result<(), RclrsError> {
2731
if let Some(data) = self.data.lock().unwrap().as_ref() {
@@ -34,10 +38,13 @@ impl SimpleSubscriptionNode {
3438
}
3539
fn main() -> Result<(), RclrsError> {
3640
let mut executor = Context::default_from_env().unwrap().create_basic_executor();
37-
let subscription = SimpleSubscriptionNode::new(&executor).unwrap();
41+
let node = SimpleSubscriptionNode::new(&executor).unwrap();
42+
43+
// TODO(@mxgrey): Replace this thread with a timer when the Timer feature
44+
// gets merged.
3845
thread::spawn(move || loop {
3946
thread::sleep(Duration::from_millis(1000));
40-
subscription.data_callback().unwrap()
47+
node.data_callback().unwrap()
4148
});
4249
executor.spin(SpinOptions::default()).first_error()
4350
}

examples/worker_demo/Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
[package]
2+
name = "worker_demo"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
rclrs = "0.4"
8+
std_msgs = "*"

0 commit comments

Comments
 (0)