11use futures:: {
2- channel:: { mpsc:: { UnboundedSender , UnboundedReceiver , unbounded} , oneshot} ,
3- future:: { BoxFuture , select, select_all, Either } ,
4- task:: { waker_ref, ArcWake } ,
2+ channel:: {
3+ mpsc:: { unbounded, UnboundedReceiver , UnboundedSender } ,
4+ oneshot,
5+ } ,
6+ future:: { select, select_all, BoxFuture , Either } ,
57 stream:: StreamFuture ,
8+ task:: { waker_ref, ArcWake } ,
69 StreamExt ,
710} ;
811use std:: {
@@ -16,9 +19,9 @@ use std::{
1619} ;
1720
1821use crate :: {
19- WeakActivityListener , ExecutorChannel , ExecutorRuntime , SpinConditions , WorkerChannel ,
20- RclrsError , WaitSetRunner , WaitSetRunConditions , Waitable , log_warn , log_debug , log_fatal ,
21- GuardCondition , ExecutorWorkerOptions , PayloadTask ,
22+ log_debug , log_fatal , log_warn , ExecutorChannel , ExecutorRuntime , ExecutorWorkerOptions ,
23+ GuardCondition , PayloadTask , RclrsError , SpinConditions , WaitSetRunConditions , WaitSetRunner ,
24+ Waitable , WeakActivityListener , WorkerChannel ,
2225} ;
2326
2427static FAILED_TO_SEND_WORKER : & ' static str =
@@ -73,7 +76,11 @@ impl AllGuardConditions {
7376
7477 fn push ( & self , guard_condition : Weak < GuardCondition > ) {
7578 let mut inner = self . inner . lock ( ) . unwrap ( ) ;
76- if inner. iter ( ) . find ( |other| guard_condition. ptr_eq ( other) ) . is_some ( ) {
79+ if inner
80+ . iter ( )
81+ . find ( |other| guard_condition. ptr_eq ( other) )
82+ . is_some ( )
83+ {
7784 // This guard condition is already known
7885 return ;
7986 }
@@ -115,11 +122,7 @@ impl ExecutorRuntime for BasicExecutorRuntime {
115122 // Use this to terminate the spinning once the wait set is finished.
116123 let workers_finished_clone = Arc :: clone ( & workers_finished) ;
117124 self . task_sender . add_async_task ( Box :: pin ( async move {
118- let workers = manage_workers (
119- new_workers,
120- all_guard_conditions,
121- conditions,
122- ) . await ;
125+ let workers = manage_workers ( new_workers, all_guard_conditions, conditions) . await ;
123126
124127 if let Err ( err) = worker_result_sender. send ( workers) {
125128 log_fatal ! (
@@ -271,10 +274,7 @@ struct BasicExecutorChannel {
271274}
272275
273276impl ExecutorChannel for BasicExecutorChannel {
274- fn create_worker (
275- & self ,
276- options : ExecutorWorkerOptions ,
277- ) -> Arc < dyn WorkerChannel > {
277+ fn create_worker ( & self , options : ExecutorWorkerOptions ) -> Arc < dyn WorkerChannel > {
278278 let runner = WaitSetRunner :: new ( options) ;
279279 let waitable_sender = runner. waitable_sender ( ) ;
280280 let payload_task_sender = runner. payload_task_sender ( ) ;
@@ -387,16 +387,19 @@ async fn manage_workers(
387387 mut new_workers : StreamFuture < UnboundedReceiver < WaitSetRunner > > ,
388388 all_guard_conditions : AllGuardConditions ,
389389 conditions : WaitSetRunConditions ,
390- ) -> ( Vec < WaitSetRunner > , StreamFuture < UnboundedReceiver < WaitSetRunner > > , Vec < RclrsError > ) {
391- let mut active_runners: Vec < oneshot:: Receiver < ( WaitSetRunner , Result < ( ) , RclrsError > ) > > = Vec :: new ( ) ;
390+ ) -> (
391+ Vec < WaitSetRunner > ,
392+ StreamFuture < UnboundedReceiver < WaitSetRunner > > ,
393+ Vec < RclrsError > ,
394+ ) {
395+ let mut active_runners: Vec < oneshot:: Receiver < ( WaitSetRunner , Result < ( ) , RclrsError > ) > > =
396+ Vec :: new ( ) ;
392397 let mut finished_runners: Vec < WaitSetRunner > = Vec :: new ( ) ;
393398 let mut errors: Vec < RclrsError > = Vec :: new ( ) ;
394399
395- let add_runner = |
396- new_runner : Option < WaitSetRunner > ,
397- active_runners : & mut Vec < _ > ,
398- finished_runners : & mut Vec < _ > ,
399- | {
400+ let add_runner = |new_runner : Option < WaitSetRunner > ,
401+ active_runners : & mut Vec < _ > ,
402+ finished_runners : & mut Vec < _ > | {
400403 if let Some ( runner) = new_runner {
401404 all_guard_conditions. push ( Arc :: downgrade ( runner. guard_condition ( ) ) ) ;
402405 if conditions. halt_spinning . load ( Ordering :: Acquire ) {
@@ -413,16 +416,10 @@ async fn manage_workers(
413416 add_runner ( initial_worker, & mut active_runners, & mut finished_runners) ;
414417
415418 while !active_runners. is_empty ( ) {
416- let next_event = select (
417- select_all ( active_runners) ,
418- new_workers,
419- ) ;
419+ let next_event = select ( select_all ( active_runners) , new_workers) ;
420420
421421 match next_event. await {
422- Either :: Left ( (
423- ( finished_worker, _, remaining_workers) ,
424- new_worker_stream,
425- ) ) => {
422+ Either :: Left ( ( ( finished_worker, _, remaining_workers) , new_worker_stream) ) => {
426423 match finished_worker {
427424 Ok ( ( runner, result) ) => {
428425 finished_runners. push ( runner) ;
@@ -443,16 +440,13 @@ async fn manage_workers(
443440 active_runners = remaining_workers;
444441 new_workers = new_worker_stream;
445442 }
446- Either :: Right ( (
447- ( new_worker, new_worker_receiver) ,
448- remaining_workers,
449- ) ) => {
443+ Either :: Right ( ( ( new_worker, new_worker_receiver) , remaining_workers) ) => {
450444 active_runners = remaining_workers. into_inner ( ) ;
451445 add_runner ( new_worker, & mut active_runners, & mut finished_runners) ;
452446 new_workers = new_worker_receiver. into_future ( ) ;
453447 }
454448 }
455- } ;
449+ }
456450
457451 ( finished_runners, new_workers, errors)
458452}
0 commit comments