@@ -21,6 +21,10 @@ use crate::{
21
21
GuardCondition , ExecutorWorkerOptions ,
22
22
} ;
23
23
24
+ static FAILED_TO_SEND_WORKER : & ' static str =
25
+ "Failed to send the new runner. This should never happen. \
26
+ Please report this to the rclrs maintainers with a minimal reproducible example.";
27
+
24
28
/// The implementation of this runtime is based off of the async Rust reference book:
25
29
/// https://rust-lang.github.io/async-book/02_execution/04_executor.html
26
30
///
@@ -52,7 +56,14 @@ impl AllGuardConditions {
52
56
fn trigger ( & self ) {
53
57
self . inner . lock ( ) . unwrap ( ) . retain ( |guard_condition| {
54
58
if let Some ( guard_condition) = guard_condition. upgrade ( ) {
55
- guard_condition. trigger ( ) ;
59
+ if let Err ( err) = guard_condition. trigger ( ) {
60
+ log_fatal ! (
61
+ "rclrs.executor.basic_executor" ,
62
+ "Failed to trigger a guard condition. This should never happen. \
63
+ Please report this to the rclrs maintainers with a minimal reproducible example. \
64
+ Error: {err}",
65
+ ) ;
66
+ }
56
67
true
57
68
} else {
58
69
false
@@ -61,7 +72,13 @@ impl AllGuardConditions {
61
72
}
62
73
63
74
fn push ( & self , guard_condition : Weak < GuardCondition > ) {
64
- self . inner . lock ( ) . unwrap ( ) . push ( guard_condition) ;
75
+ let mut inner = self . inner . lock ( ) . unwrap ( ) ;
76
+ if inner. iter ( ) . find ( |other| guard_condition. ptr_eq ( other) ) . is_some ( ) {
77
+ // This guard condition is already known
78
+ return ;
79
+ }
80
+
81
+ inner. push ( guard_condition) ;
65
82
}
66
83
}
67
84
@@ -81,11 +98,20 @@ impl ExecutorRuntime for BasicExecutorRuntime {
81
98
// is returned to self. Therefore we create this blocking channel to
82
99
// prevent the function from returning until the WaitSetRunner has been
83
100
// re-obtained.
84
- let ( worker_sender , worker_receiver ) = channel ( ) ;
101
+ let ( worker_result_sender , worker_result_receiver ) = channel ( ) ;
85
102
86
103
// Use this atomic bool to recognize when we should stop spinning.
87
104
let workers_finished = Arc :: new ( AtomicBool :: new ( false ) ) ;
88
105
106
+ for runner in self . wait_set_runners . drain ( ..) {
107
+ if let Err ( err) = self . new_worker_sender . unbounded_send ( runner) {
108
+ log_fatal ! (
109
+ "rclrs.executor.basic_executor" ,
110
+ "{FAILED_TO_SEND_WORKER} Error: {err}" ,
111
+ ) ;
112
+ }
113
+ }
114
+
89
115
// Use this to terminate the spinning once the wait set is finished.
90
116
let workers_finished_clone = Arc :: clone ( & workers_finished) ;
91
117
self . task_sender . add_async_task ( Box :: pin ( async move {
@@ -95,7 +121,14 @@ impl ExecutorRuntime for BasicExecutorRuntime {
95
121
conditions,
96
122
) . await ;
97
123
98
- worker_sender. send ( workers) ;
124
+ if let Err ( err) = worker_result_sender. send ( workers) {
125
+ log_fatal ! (
126
+ "rclrs.executor.basic_executor" ,
127
+ "Failed to send a runner result. This should never happen. \
128
+ Please report this to the rclrs maintainers with a minimal \
129
+ reproducible example. Error: {err}",
130
+ ) ;
131
+ }
99
132
workers_finished_clone. store ( true , Ordering :: Release ) ;
100
133
} ) ) ;
101
134
@@ -116,7 +149,7 @@ impl ExecutorRuntime for BasicExecutorRuntime {
116
149
}
117
150
}
118
151
119
- let ( runners, new_worker_receiver, errors) = worker_receiver . recv ( ) . expect (
152
+ let ( runners, new_worker_receiver, errors) = worker_result_receiver . recv ( ) . expect (
120
153
"Basic executor failed to receive the WaitSetRunner at the end of its spinning. \
121
154
This is a critical bug in rclrs. \
122
155
Please report this bug to the maintainers of rclrs by providing a minimum reproduction of the problem."
@@ -190,7 +223,7 @@ impl BasicExecutorRuntime {
190
223
// TODO(@mxgrey): We should change this to a log when logging
191
224
// becomes available.
192
225
log_warn ! (
193
- "basic_executor" ,
226
+ "rclrs.executor. basic_executor" ,
194
227
"Sender for SpinOptions::until_promise_resolved was \
195
228
dropped, so the Promise will never be fulfilled. \
196
229
Spinning will stop now. Error message: {err}"
@@ -246,7 +279,14 @@ impl ExecutorChannel for BasicExecutorChannel {
246
279
) -> Arc < dyn WorkerChannel > {
247
280
let runner = WaitSetRunner :: new ( options) ;
248
281
let waitable_sender = runner. sender ( ) ;
249
- self . new_worker_sender . unbounded_send ( runner) ;
282
+
283
+ if let Err ( err) = self . new_worker_sender . unbounded_send ( runner) {
284
+ log_fatal ! (
285
+ "rclrs.executor.basic_executor" ,
286
+ "{FAILED_TO_SEND_WORKER} Error: {err}" ,
287
+ ) ;
288
+ }
289
+
250
290
Arc :: new ( BasicWorkerChannel {
251
291
waitable_sender,
252
292
task_sender : self . task_sender . clone ( ) ,
@@ -321,6 +361,26 @@ async fn manage_workers(
321
361
let mut finished_runners: Vec < WaitSetRunner > = Vec :: new ( ) ;
322
362
let mut errors: Vec < RclrsError > = Vec :: new ( ) ;
323
363
364
+ let add_runner = |
365
+ new_runner : Option < WaitSetRunner > ,
366
+ active_runners : & mut Vec < _ > ,
367
+ finished_runners : & mut Vec < _ > ,
368
+ | {
369
+ if let Some ( runner) = new_runner {
370
+ all_guard_conditions. push( Arc :: downgrade( runner. guard_condition( ) ) ) ;
371
+ if conditions. halt_spinning. load( Ordering :: Acquire ) {
372
+ finished_runners. push ( runner) ;
373
+ } else {
374
+ active_runners. push ( runner. run ( conditions. clone ( ) ) ) ;
375
+ }
376
+ }
377
+ } ;
378
+
379
+ // We expect to start with at least one worker
380
+ let ( initial_worker, new_worker_receiver) = new_workers. await ;
381
+ new_workers = new_worker_receiver. into_future ( ) ;
382
+ add_runner ( initial_worker, & mut active_runners, & mut finished_runners) ;
383
+
324
384
while !active_runners. is_empty ( ) {
325
385
let next_event = select (
326
386
select_all ( active_runners) ,
@@ -341,7 +401,7 @@ async fn manage_workers(
341
401
}
342
402
Err ( _) => {
343
403
log_fatal ! (
344
- "basic_executor" ,
404
+ "rclrs. basic_executor" ,
345
405
"WaitSetRunner unexpectedly dropped. This should never happen. \
346
406
Please report this to the rclrs maintainers with a minimal \
347
407
reproducible example.",
@@ -353,21 +413,12 @@ async fn manage_workers(
353
413
new_workers = new_worker_stream;
354
414
}
355
415
Either :: Right ( (
356
- ( new_worker, new_worker_stream ) ,
416
+ ( new_worker, new_worker_receiver ) ,
357
417
remaining_workers,
358
418
) ) => {
359
419
active_runners = remaining_workers. into_inner ( ) ;
360
-
361
- if let Some ( runner) = new_worker {
362
- all_guard_conditions. push ( Arc :: downgrade ( runner. guard_condition ( ) ) ) ;
363
- if conditions. halt_spinning . load ( Ordering :: Acquire ) {
364
- finished_runners. push ( runner) ;
365
- } else {
366
- active_runners. push ( runner. run ( conditions. clone ( ) ) ) ;
367
- }
368
- }
369
-
370
- new_workers = new_worker_stream. into_future ( ) ;
420
+ add_runner ( new_worker, & mut active_runners, & mut finished_runners) ;
421
+ new_workers = new_worker_receiver. into_future ( ) ;
371
422
}
372
423
}
373
424
} ;
0 commit comments