Skip to content

Commit e86707e

Browse files
committed
Merge in shared state pattern
Signed-off-by: Michael X. Grey <[email protected]>
2 parents 2a7ff60 + 2cff0b7 commit e86707e

File tree

16 files changed

+163
-88
lines changed

16 files changed

+163
-88
lines changed

examples/minimal_pub_sub/src/minimal_two_nodes.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use anyhow::{Error, Result};
77

88
struct MinimalSubscriber {
99
num_messages: AtomicU32,
10-
node: Arc<rclrs::Node>,
11-
subscription: Mutex<Option<Arc<rclrs::Subscription<std_msgs::msg::String>>>>,
10+
node: rclrs::Node,
11+
subscription: Mutex<Option<rclrs::Subscription<std_msgs::msg::String>>>,
1212
}
1313

1414
impl MinimalSubscriber {

examples/rust_pubsub/src/simple_publisher.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{sync::Arc, thread, time::Duration};
33
use std_msgs::msg::String as StringMsg;
44

55
struct SimplePublisherNode {
6-
publisher: Arc<Publisher<StringMsg>>,
6+
publisher: Publisher<StringMsg>,
77
}
88

99
impl SimplePublisherNode {
@@ -26,12 +26,11 @@ impl SimplePublisherNode {
2626

2727
fn main() -> Result<(), RclrsError> {
2828
let mut executor = Context::default_from_env().unwrap().create_basic_executor();
29-
let publisher = Arc::new(SimplePublisher::new(&executor).unwrap());
30-
let publisher_other_thread = Arc::clone(&publisher);
29+
let node = Arc::new(SimplePublisher::new(&executor).unwrap());
3130
let mut count: i32 = 0;
3231
thread::spawn(move || loop {
3332
thread::sleep(Duration::from_millis(1000));
34-
count = publisher_other_thread.publish_data(count).unwrap();
33+
count = node.publish_data(count).unwrap();
3534
});
3635
executor.spin(SpinOptions::default())
3736
}

examples/rust_pubsub/src/simple_subscriber.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,10 @@ impl SimpleSubscriptionNode {
3737
}
3838
fn main() -> Result<(), RclrsError> {
3939
let mut executor = Context::default_from_env().unwrap().create_basic_executor();
40-
let subscription = Arc::new(SimpleSubscriptionNode::new(&executor).unwrap());
41-
let subscription_other_thread = Arc::clone(&subscription);
40+
let node = Arc::new(SimpleSubscriptionNode::new(&executor).unwrap());
4241
thread::spawn(move || loop {
4342
thread::sleep(Duration::from_millis(1000));
44-
subscription_other_thread.data_callback().unwrap()
43+
node.data_callback().unwrap()
4544
});
4645
executor.spin(SpinOptions::default())
4746
}

rclrs/src/client.rs

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{
22
collections::HashMap,
33
ffi::CString,
4-
sync::{Arc, Mutex, MutexGuard},
4+
sync::{atomic::AtomicBool, Arc, Mutex, MutexGuard},
55
};
66

77
use rosidl_runtime_rs::Message;
@@ -23,12 +23,25 @@ pub use client_output::*;
2323

2424
/// Main class responsible for sending requests to a ROS service.
2525
///
26-
/// The only available way to instantiate clients is via [`Node::create_client`][1], this is to
27-
/// ensure that [`Node`][2]s can track all the clients that have been created.
26+
/// Create a client using [`Node::create_client`][1].
2827
///
29-
/// [1]: crate::Node::create_client
30-
/// [2]: crate::Node
31-
pub struct Client<T>
28+
/// Receiving responses requires the node's executor to [spin][2].
29+
///
30+
/// [1]: crate::NodeState::create_client
31+
/// [2]: crate::spin
32+
pub type Client<T> = Arc<ClientState<T>>;
33+
34+
/// The inner state of a [`Client`].
35+
///
36+
/// This is public so that you can choose to create a [`Weak`][1] reference to it
37+
/// if you want to be able to refer to a [`Client`] in a non-owning way. It is
38+
/// generally recommended to manage the `ClientState` inside of an [`Arc`],
39+
/// and [`Client`] is provided as a convenience alias for that.
40+
///
41+
/// The public API of the [`Client`] type is implemented via `ClientState`.
42+
///
43+
/// [1]: std::sync::Weak
44+
pub struct ClientState<T>
3245
where
3346
T: rosidl_runtime_rs::Service,
3447
{
@@ -38,7 +51,7 @@ where
3851
lifecycle: WaitableLifecycle,
3952
}
4053

41-
impl<T> Client<T>
54+
impl<T> ClientState<T>
4255
where
4356
T: rosidl_runtime_rs::Service,
4457
{
@@ -171,7 +184,7 @@ where
171184

172185
/// Creates a new client.
173186
pub(crate) fn create<'a>(
174-
node: &Arc<Node>,
187+
node: &Node,
175188
options: impl Into<ClientOptions<'a>>,
176189
) -> Result<Arc<Self>, RclrsError>
177190
// This uses pub(crate) visibility to avoid instantiating this struct outside
@@ -407,7 +420,7 @@ struct ClientHandle {
407420
rcl_client: Mutex<rcl_client_t>,
408421
/// We store the whole node here because we use some of its user-facing API
409422
/// in some of the Client methods.
410-
node: Arc<Node>,
423+
node: Node,
411424
}
412425

413426
impl ClientHandle {

rclrs/src/executor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ impl Executor {
3434
pub fn create_node<'a>(
3535
&'a self,
3636
options: impl IntoNodeOptions<'a>,
37-
) -> Result<Arc<Node>, RclrsError> {
37+
) -> Result<Node, RclrsError> {
3838
let options = options.into_node_options();
3939
let node = options.build(&self.commands)?;
4040
Ok(node)
@@ -128,7 +128,7 @@ impl ExecutorCommands {
128128
pub fn create_node<'a>(
129129
self: &Arc<Self>,
130130
options: impl IntoNodeOptions<'a>,
131-
) -> Result<Arc<Node>, RclrsError> {
131+
) -> Result<Node, RclrsError> {
132132
let options = options.into_node_options();
133133
options.build(self)
134134
}

rclrs/src/node.rs

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,12 @@ use async_std::future::timeout;
2929
use rosidl_runtime_rs::Message;
3030

3131
use crate::{
32-
rcl_bindings::*, Client, ClientOptions, Clock, ContextHandle, ExecutorCommands, LogParams,
33-
Logger, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Promise, Publisher,
34-
PublisherOptions, QoSProfile, RclrsError, Service, ServiceAsyncCallback, ServiceCallback,
35-
ServiceOptions, Subscription, SubscriptionAsyncCallback, SubscriptionCallback,
36-
SubscriptionOptions, TimeSource, ToLogParams, ENTITY_LIFECYCLE_MUTEX,
32+
rcl_bindings::*, Client, ClientOptions, ClientState, Clock, ContextHandle, ExecutorCommands,
33+
LogParams, Logger, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Promise,
34+
Publisher, PublisherOptions, PublisherState, QoSProfile, RclrsError, Service,
35+
ServiceAsyncCallback, ServiceCallback, ServiceOptions, ServiceState, Subscription,
36+
SubscriptionAsyncCallback, SubscriptionCallback, SubscriptionOptions, SubscriptionState,
37+
TimeSource, ToLogParams, ENTITY_LIFECYCLE_MUTEX,
3738
};
3839

3940
/// A processing unit that can communicate with other nodes.
@@ -78,7 +79,19 @@ use crate::{
7879
/// [5]: crate::NodeOptions::new
7980
/// [6]: crate::NodeOptions::namespace
8081
/// [7]: crate::Executor::create_node
81-
pub struct Node {
82+
83+
pub type Node = Arc<NodeState>;
84+
85+
/// The inner state of a [`Node`].
86+
///
87+
/// This is public so that you can choose to put it inside a [`Weak`] if you
88+
/// want to be able to refer to a [`Node`] in a non-owning way. It is generally
89+
/// recommended to manage the [`NodeState`] inside of an [`Arc`], and [`Node`]
90+
/// recommended to manage the `NodeState` inside of an [`Arc`], and [`Node`]
91+
/// is provided as convenience alias for that.
92+
///
93+
/// The public API of the [`Node`] type is implemented via `NodeState`.
94+
pub struct NodeState {
8295
time_source: TimeSource,
8396
parameter: ParameterInterface,
8497
logger: Logger,
@@ -132,23 +145,23 @@ impl Drop for NodeHandle {
132145
}
133146
}
134147

135-
impl Eq for Node {}
148+
impl Eq for NodeState {}
136149

137-
impl PartialEq for Node {
150+
impl PartialEq for NodeState {
138151
fn eq(&self, other: &Self) -> bool {
139152
Arc::ptr_eq(&self.handle, &other.handle)
140153
}
141154
}
142155

143-
impl fmt::Debug for Node {
156+
impl fmt::Debug for NodeState {
144157
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
145158
f.debug_struct("Node")
146159
.field("fully_qualified_name", &self.fully_qualified_name())
147160
.finish()
148161
}
149162
}
150163

151-
impl Node {
164+
impl NodeState {
152165
/// Returns the clock associated with this node.
153166
pub fn get_clock(&self) -> Clock {
154167
self.time_source.get_clock()
@@ -206,7 +219,7 @@ impl Node {
206219
/// Returns the fully qualified name of the node.
207220
///
208221
/// The fully qualified name of the node is the node namespace combined with the node name.
209-
/// It is subject to the remappings shown in [`Node::name()`] and [`Node::namespace()`].
222+
/// It is subject to the remappings shown in [`NodeState::name()`] and [`NodeState::namespace()`].
210223
///
211224
/// # Example
212225
/// ```
@@ -261,11 +274,11 @@ impl Node {
261274
pub fn create_client<'a, T>(
262275
self: &Arc<Self>,
263276
options: impl Into<ClientOptions<'a>>,
264-
) -> Result<Arc<Client<T>>, RclrsError>
277+
) -> Result<Client<T>, RclrsError>
265278
where
266279
T: rosidl_runtime_rs::Service,
267280
{
268-
Client::<T>::create(self, options)
281+
ClientState::<T>::create(self, options)
269282
}
270283

271284
/// Creates a [`Publisher`][1].
@@ -305,11 +318,11 @@ impl Node {
305318
pub fn create_publisher<'a, T>(
306319
&self,
307320
options: impl Into<PublisherOptions<'a>>,
308-
) -> Result<Arc<Publisher<T>>, RclrsError>
321+
) -> Result<Publisher<T>, RclrsError>
309322
where
310323
T: Message,
311324
{
312-
Publisher::<T>::create(Arc::clone(&self.handle), options)
325+
PublisherState::<T>::create(Arc::clone(&self.handle), options)
313326
}
314327

315328
/// Creates a [`Service`] with an ordinary callback.
@@ -373,11 +386,11 @@ impl Node {
373386
&self,
374387
options: impl Into<ServiceOptions<'a>>,
375388
callback: impl ServiceCallback<T, Args>,
376-
) -> Result<Arc<Service<T>>, RclrsError>
389+
) -> Result<Service<T>, RclrsError>
377390
where
378391
T: rosidl_runtime_rs::Service,
379392
{
380-
Service::<T>::create(
393+
ServiceState::<T>::create(
381394
options,
382395
callback.into_service_callback(),
383396
&self.handle,
@@ -408,11 +421,11 @@ impl Node {
408421
&self,
409422
options: impl Into<ServiceOptions<'a>>,
410423
callback: impl ServiceAsyncCallback<T, Args>,
411-
) -> Result<Arc<Service<T>>, RclrsError>
424+
) -> Result<Service<T>, RclrsError>
412425
where
413426
T: rosidl_runtime_rs::Service,
414427
{
415-
Service::<T>::create(
428+
ServiceState::<T>::create(
416429
options,
417430
callback.into_service_async_callback(),
418431
&self.handle,
@@ -480,11 +493,11 @@ impl Node {
480493
&self,
481494
options: impl Into<SubscriptionOptions<'a>>,
482495
callback: impl SubscriptionCallback<T, Args>,
483-
) -> Result<Arc<Subscription<T>>, RclrsError>
496+
) -> Result<Subscription<T>, RclrsError>
484497
where
485498
T: Message,
486499
{
487-
Subscription::<T>::create(
500+
SubscriptionState::<T>::create(
488501
options,
489502
callback.into_subscription_callback(),
490503
&self.handle,
@@ -515,11 +528,11 @@ impl Node {
515528
&self,
516529
options: impl Into<SubscriptionOptions<'a>>,
517530
callback: impl SubscriptionAsyncCallback<T, Args>,
518-
) -> Result<Arc<Subscription<T>>, RclrsError>
531+
) -> Result<Subscription<T>, RclrsError>
519532
where
520533
T: Message,
521534
{
522-
Subscription::<T>::create(
535+
SubscriptionState::<T>::create(
523536
options,
524537
callback.into_subscription_async_callback(),
525538
&self.handle,
@@ -689,7 +702,7 @@ impl Node {
689702
}
690703
}
691704

692-
impl<'a> ToLogParams<'a> for &'a Node {
705+
impl<'a> ToLogParams<'a> for &'a NodeState {
693706
fn to_log_params(self) -> LogParams<'a> {
694707
self.logger().to_log_params()
695708
}

rclrs/src/node/graph.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{
33
ffi::{CStr, CString},
44
};
55

6-
use crate::{rcl_bindings::*, Node, RclrsError, ToResult};
6+
use crate::{rcl_bindings::*, NodeState, RclrsError, ToResult};
77

88
impl Drop for rmw_names_and_types_t {
99
fn drop(&mut self) {
@@ -57,7 +57,7 @@ pub struct TopicEndpointInfo {
5757
pub topic_type: String,
5858
}
5959

60-
impl Node {
60+
impl NodeState {
6161
/// Returns a list of topic names and types for publishers associated with a node.
6262
pub fn get_publisher_names_and_types_by_node(
6363
&self,

rclrs/src/node/node_options.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ use futures::channel::mpsc::unbounded;
99
use crate::{
1010
node::node_graph_task::{node_graph_task, NodeGraphAction},
1111
rcl_bindings::*,
12-
ClockType, ExecutorCommands, GuardCondition, Logger, Node, NodeHandle, ParameterInterface,
13-
QoSProfile, RclrsError, TimeSource, ToResult, ENTITY_LIFECYCLE_MUTEX, QOS_PROFILE_CLOCK,
12+
ClockType, ExecutorCommands, GuardCondition, Logger, Node, NodeHandle, NodeState,
13+
ParameterInterface, QoSProfile, RclrsError, TimeSource, ToResult, ENTITY_LIFECYCLE_MUTEX,
14+
QOS_PROFILE_CLOCK,
1415
};
1516

1617
/// This trait helps to build [`NodeOptions`] which can be passed into
@@ -289,7 +290,7 @@ impl<'a> NodeOptions<'a> {
289290
///
290291
/// Only used internally. Downstream users should call
291292
/// [`Executor::create_node`].
292-
pub(crate) fn build(self, commands: &Arc<ExecutorCommands>) -> Result<Arc<Node>, RclrsError> {
293+
pub(crate) fn build(self, commands: &Arc<ExecutorCommands>) -> Result<Node, RclrsError> {
293294
let node_name = CString::new(self.name).map_err(|err| RclrsError::StringContainsNul {
294295
err,
295296
s: self.name.to_owned(),
@@ -388,7 +389,7 @@ impl<'a> NodeOptions<'a> {
388389
graph_change_guard_condition,
389390
));
390391

391-
let node = Arc::new(Node {
392+
let node = Arc::new(NodeState {
392393
time_source: TimeSource::builder(self.clock_type)
393394
.clock_qos(self.clock_qos)
394395
.build(),

rclrs/src/parameter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ enum DeclaredValue {
8484
}
8585

8686
/// Builder used to declare a parameter. Obtain this by calling
87-
/// [`crate::Node::declare_parameter`].
87+
/// [`crate::NodeState::declare_parameter`].
8888
#[must_use]
8989
pub struct ParameterBuilder<'a, T: ParameterVariant> {
9090
name: Arc<str>,

0 commit comments

Comments
 (0)