Skip to content

Commit 1f1d826

Browse files
committed
Refining merge
Signed-off-by: Michael X. Grey <[email protected]>
1 parent e86707e commit 1f1d826

File tree

9 files changed

+107
-88
lines changed

9 files changed

+107
-88
lines changed

rclrs/src/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{
22
collections::HashMap,
33
ffi::CString,
4-
sync::{atomic::AtomicBool, Arc, Mutex, MutexGuard},
4+
sync::{Arc, Mutex, MutexGuard},
55
};
66

77
use rosidl_runtime_rs::Message;

rclrs/src/executor.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ impl Executor {
4646
/// [`SpinOptions`] can be used to automatically stop the spinning when
4747
/// certain conditions are met. Use `SpinOptions::default()` to allow the
4848
/// Executor to keep spinning indefinitely.
49-
pub fn spin(&mut self, options: SpinOptions) {
49+
pub fn spin(&mut self, options: SpinOptions) -> Result<(), RclrsError> {
5050
let conditions = self.make_spin_conditions(options);
51-
self.runtime.spin(conditions);
51+
self.runtime.spin(conditions)
5252
}
5353

5454
/// Spin the Executor as an async task. This does not block the current thread.
@@ -60,20 +60,23 @@ impl Executor {
6060
/// The async task will run until the [`SpinConditions`] stop the Executor
6161
/// from spinning. The output of the async task will be the restored Executor,
6262
/// which you can use to resume spinning after the task is finished.
63-
pub async fn spin_async(self, options: SpinOptions) -> Self {
63+
pub async fn spin_async(self, options: SpinOptions) -> (Self, Result<(), RclrsError>) {
6464
let conditions = self.make_spin_conditions(options);
6565
let Self {
6666
context,
6767
commands,
6868
runtime,
6969
} = self;
7070

71-
let runtime = runtime.spin_async(conditions).await;
72-
Self {
73-
context,
74-
commands,
75-
runtime,
76-
}
71+
let (runtime, result) = runtime.spin_async(conditions).await;
72+
(
73+
Self {
74+
context,
75+
commands,
76+
runtime,
77+
},
78+
result,
79+
)
7780
}
7881

7982
/// Creates a new executor using the provided runtime. Users of rclrs should
@@ -231,7 +234,7 @@ pub trait ExecutorRuntime: Send {
231234

232235
/// Tell the runtime to spin while blocking any further execution until the
233236
/// spinning is complete.
234-
fn spin(&mut self, conditions: SpinConditions);
237+
fn spin(&mut self, conditions: SpinConditions) -> Result<(), RclrsError>;
235238

236239
/// Tell the runtime to spin asynchronously, not blocking the current
237240
/// thread. The runtime instance will be consumed by this function, but it
@@ -240,7 +243,7 @@ pub trait ExecutorRuntime: Send {
240243
fn spin_async(
241244
self: Box<Self>,
242245
conditions: SpinConditions,
243-
) -> BoxFuture<'static, Box<dyn ExecutorRuntime>>;
246+
) -> BoxFuture<'static, (Box<dyn ExecutorRuntime>, Result<(), RclrsError>)>;
244247
}
245248

246249
/// A bundle of optional conditions that a user may want to impose on how long

rclrs/src/executor/basic_executor.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::{
1414

1515
use crate::{
1616
executor::{ExecutorChannel, ExecutorRuntime, SpinConditions},
17-
Context, WaitSetRunner, Waitable,
17+
Context, RclrsError, WaitSetRunner, Waitable,
1818
};
1919

2020
/// The implementation of this runtime is based off of the async Rust reference book:
@@ -40,7 +40,7 @@ pub struct BasicExecutorRuntime {
4040
}
4141

4242
impl ExecutorRuntime for BasicExecutorRuntime {
43-
fn spin(&mut self, mut conditions: SpinConditions) {
43+
fn spin(&mut self, mut conditions: SpinConditions) -> Result<(), RclrsError> {
4444
self.process_spin_conditions(&mut conditions);
4545

4646
let wait_set_runner = self.wait_set_runner.take().expect(
@@ -92,19 +92,20 @@ impl ExecutorRuntime for BasicExecutorRuntime {
9292
}
9393
}
9494

95-
self.wait_set_runner = Some(
96-
wait_set_receiver.recv().expect(
97-
"Basic executor failed to receive the WaitSetRunner at the end of its spinning. \
98-
This is a critical bug in rclrs. \
99-
Please report this bug to the maintainers of rclrs by providing a minimum reproduction of the problem."
100-
)
95+
let (runner, result) = wait_set_receiver.recv().expect(
96+
"Basic executor failed to receive the WaitSetRunner at the end of its spinning. \
97+
This is a critical bug in rclrs. \
98+
Please report this bug to the maintainers of rclrs by providing a minimum reproduction of the problem."
10199
);
100+
101+
self.wait_set_runner = Some(runner);
102+
result
102103
}
103104

104105
fn spin_async(
105106
mut self: Box<Self>,
106107
conditions: SpinConditions,
107-
) -> BoxFuture<'static, Box<dyn ExecutorRuntime>> {
108+
) -> BoxFuture<'static, (Box<dyn ExecutorRuntime>, Result<(), RclrsError>)> {
108109
let (sender, receiver) = oneshot::channel();
109110
// Create a thread to run the executor. We should not run the executor
110111
// as an async task because it blocks its current thread while running.
@@ -117,8 +118,8 @@ impl ExecutorRuntime for BasicExecutorRuntime {
117118
// executor. But that would probably require us to introduce a new
118119
// dependency such as tokio.
119120
std::thread::spawn(move || {
120-
self.spin(conditions);
121-
sender.send(self as Box<dyn ExecutorRuntime>).ok();
121+
let result = self.spin(conditions);
122+
sender.send((self as Box<dyn ExecutorRuntime>, result)).ok();
122123
});
123124

124125
Box::pin(async move {

rclrs/src/node.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@ use rosidl_runtime_rs::Message;
3131
use crate::{
3232
rcl_bindings::*, Client, ClientOptions, ClientState, Clock, ContextHandle, ExecutorCommands,
3333
LogParams, Logger, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Promise,
34-
Publisher, PublisherOptions, PublisherState, QoSProfile, RclrsError, Service,
35-
ServiceAsyncCallback, ServiceCallback, ServiceOptions, ServiceState, Subscription,
36-
SubscriptionAsyncCallback, SubscriptionCallback, SubscriptionOptions, SubscriptionState,
37-
TimeSource, ToLogParams, ENTITY_LIFECYCLE_MUTEX,
34+
Publisher, PublisherOptions, PublisherState, RclrsError, Service, ServiceAsyncCallback,
35+
ServiceCallback, ServiceOptions, ServiceState, Subscription, SubscriptionAsyncCallback,
36+
SubscriptionCallback, SubscriptionOptions, SubscriptionState, TimeSource, ToLogParams,
37+
ENTITY_LIFECYCLE_MUTEX,
3838
};
3939

4040
/// A processing unit that can communicate with other nodes.

rclrs/src/parameter/service.rs

Lines changed: 61 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use rosidl_runtime_rs::Sequence;
99
use super::ParameterMap;
1010
use crate::{
1111
parameter::{DeclaredValue, ParameterKind, ParameterStorage},
12-
rmw_request_id_t, IntoPrimitiveOptions, Node, QoSProfile, RclrsError, Service,
12+
IntoPrimitiveOptions, Node, QoSProfile, RclrsError, Service,
1313
};
1414

1515
// The variables only exist to keep a strong reference to the services and are technically unused.
@@ -437,11 +437,13 @@ mod tests {
437437
!not_finished
438438
});
439439

440-
executor.spin(
441-
SpinOptions::new()
442-
.until_promise_resolved(promise)
443-
.timeout(Duration::from_secs(1)),
444-
);
440+
executor
441+
.spin(
442+
SpinOptions::new()
443+
.until_promise_resolved(promise)
444+
.timeout(Duration::from_secs(1)),
445+
)
446+
.unwrap();
445447

446448
Ok(())
447449
}
@@ -453,11 +455,13 @@ mod tests {
453455
client_node.create_client::<ListParameters>("/list/node/list_parameters")?;
454456

455457
// return Ok(());
456-
executor.spin(
457-
SpinOptions::default()
458-
.until_promise_resolved(list_client.notify_on_service_ready())
459-
.timeout(Duration::from_secs(2)),
460-
);
458+
executor
459+
.spin(
460+
SpinOptions::default()
461+
.until_promise_resolved(list_client.notify_on_service_ready())
462+
.timeout(Duration::from_secs(2)),
463+
)
464+
.unwrap();
461465

462466
// List all parameters
463467
let callback_ran = Arc::new(AtomicBool::new(false));
@@ -484,11 +488,13 @@ mod tests {
484488
})
485489
.unwrap();
486490

487-
executor.spin(
488-
SpinOptions::default()
489-
.until_promise_resolved(promise)
490-
.timeout(Duration::from_secs(5)),
491-
);
491+
executor
492+
.spin(
493+
SpinOptions::default()
494+
.until_promise_resolved(promise)
495+
.timeout(Duration::from_secs(5)),
496+
)
497+
.unwrap();
492498
assert!(callback_ran.load(Ordering::Acquire));
493499

494500
// Limit depth, namespaced parameter is not returned
@@ -508,7 +514,9 @@ mod tests {
508514
})
509515
.unwrap();
510516

511-
executor.spin(SpinOptions::default().until_promise_resolved(promise));
517+
executor
518+
.spin(SpinOptions::default().until_promise_resolved(promise))
519+
.unwrap();
512520
assert!(callback_ran.load(Ordering::Acquire));
513521

514522
// Filter by prefix, just return the requested one with the right prefix
@@ -529,7 +537,9 @@ mod tests {
529537
})
530538
.unwrap();
531539

532-
executor.spin(SpinOptions::default().until_promise_resolved(promise));
540+
executor
541+
.spin(SpinOptions::default().until_promise_resolved(promise))
542+
.unwrap();
533543
assert!(callback_ran.load(Ordering::Acquire));
534544

535545
// If prefix is equal to names, parameters should be returned
@@ -550,7 +560,9 @@ mod tests {
550560
})
551561
.unwrap();
552562

553-
executor.spin(SpinOptions::default().until_promise_resolved(promise));
563+
executor
564+
.spin(SpinOptions::default().until_promise_resolved(promise))
565+
.unwrap();
554566
assert!(callback_ran.load(Ordering::Acquire));
555567

556568
Ok(())
@@ -578,7 +590,9 @@ mod tests {
578590
let clients_ready = client_node
579591
.notify_on_graph_change_with_period(Duration::from_millis(1), clients_ready_condition);
580592

581-
executor.spin(SpinOptions::default().until_promise_resolved(clients_ready));
593+
executor
594+
.spin(SpinOptions::default().until_promise_resolved(clients_ready))
595+
.unwrap();
582596

583597
// Get an existing parameter
584598
let callback_ran = Arc::new(AtomicBool::new(false));
@@ -596,7 +610,9 @@ mod tests {
596610
})
597611
.unwrap();
598612

599-
executor.spin(SpinOptions::default().until_promise_resolved(promise));
613+
executor
614+
.spin(SpinOptions::default().until_promise_resolved(promise))
615+
.unwrap();
600616
assert!(callback_ran.load(Ordering::Acquire));
601617

602618
// Getting both existing and non existing parameters, missing one should return
@@ -617,7 +633,9 @@ mod tests {
617633
})
618634
.unwrap();
619635

620-
executor.spin(SpinOptions::default().until_promise_resolved(promise));
636+
executor
637+
.spin(SpinOptions::default().until_promise_resolved(promise))
638+
.unwrap();
621639
assert!(callback_ran.load(Ordering::Acquire));
622640

623641
// Set a mix of existing, non existing, dynamic and out of range parameters
@@ -717,7 +735,9 @@ mod tests {
717735
})
718736
.unwrap();
719737

720-
executor.spin(SpinOptions::default().until_promise_resolved(promise));
738+
executor
739+
.spin(SpinOptions::default().until_promise_resolved(promise))
740+
.unwrap();
721741
assert!(callback_ran.load(Ordering::Acquire));
722742

723743
// Set the node to use undeclared parameters and try to set one
@@ -746,7 +766,9 @@ mod tests {
746766
})
747767
.unwrap();
748768

749-
executor.spin(SpinOptions::default().until_promise_resolved(promise));
769+
executor
770+
.spin(SpinOptions::default().until_promise_resolved(promise))
771+
.unwrap();
750772
assert!(callback_ran.load(Ordering::Acquire));
751773

752774
// With set_parameters_atomically, if one fails all should fail
@@ -765,7 +787,9 @@ mod tests {
765787
)
766788
.unwrap();
767789

768-
executor.spin(SpinOptions::default().until_promise_resolved(promise));
790+
executor
791+
.spin(SpinOptions::default().until_promise_resolved(promise))
792+
.unwrap();
769793
assert!(callback_ran.load(Ordering::Acquire));
770794

771795
Ok(())
@@ -789,7 +813,9 @@ mod tests {
789813
let promise = client_node
790814
.notify_on_graph_change_with_period(Duration::from_millis(1), clients_ready_condition);
791815

792-
executor.spin(SpinOptions::default().until_promise_resolved(promise));
816+
executor
817+
.spin(SpinOptions::default().until_promise_resolved(promise))
818+
.unwrap();
793819

794820
// Describe all parameters
795821
let request = DescribeParameters_Request {
@@ -836,7 +862,9 @@ mod tests {
836862
})
837863
.unwrap();
838864

839-
executor.spin(SpinOptions::default().until_promise_resolved(promise));
865+
executor
866+
.spin(SpinOptions::default().until_promise_resolved(promise))
867+
.unwrap();
840868
assert!(callback_ran.load(Ordering::Acquire));
841869

842870
// If a describe parameters request is sent with a non existing parameter, an empty
@@ -860,7 +888,9 @@ mod tests {
860888
})
861889
.unwrap();
862890

863-
executor.spin(SpinOptions::default().until_promise_resolved(promise));
891+
executor
892+
.spin(SpinOptions::default().until_promise_resolved(promise))
893+
.unwrap();
864894
assert!(callback_ran.load(Ordering::Acquire));
865895

866896
// Get all parameter types, including a non existing one that will be NOT_SET
@@ -888,7 +918,9 @@ mod tests {
888918
})
889919
.unwrap();
890920

891-
executor.spin(SpinOptions::default().until_promise_resolved(promise));
921+
executor
922+
.spin(SpinOptions::default().until_promise_resolved(promise))
923+
.unwrap();
892924
assert!(callback_ran.load(Ordering::Acquire));
893925

894926
Ok(())

rclrs/src/service.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
use std::{
22
boxed::Box,
33
ffi::{CStr, CString},
4-
sync::{atomic::AtomicBool, Arc, Mutex, MutexGuard},
4+
sync::{Arc, Mutex, MutexGuard},
55
};
66

7-
use rosidl_runtime_rs::Message;
8-
97
use crate::{
108
error::ToResult, rcl_bindings::*, ExecutorCommands, IntoPrimitiveOptions, NodeHandle,
119
QoSProfile, RclPrimitive, RclPrimitiveHandle, RclPrimitiveKind, RclrsError, Waitable,

rclrs/src/test_helpers/graph_helpers.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use crate::{Context, IntoNodeOptions, Node, RclrsError};
2-
use std::sync::Arc;
32

43
pub(crate) struct TestGraph {
54
pub node1: Node,

rclrs/src/wait_set.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,9 @@ mod tests {
270270
let start = std::time::Instant::now();
271271
// This should stop spinning right away because the guard condition was
272272
// already triggered.
273-
executor.spin(SpinOptions::spin_once().timeout(Duration::from_secs(10)));
273+
executor
274+
.spin(SpinOptions::spin_once().timeout(Duration::from_secs(10)))
275+
.unwrap();
274276

275277
// If it took more than a second to finish spinning then something is
276278
// probably wrong.

0 commit comments

Comments
 (0)