@@ -7,7 +7,7 @@ use tokio::time::sleep;
77use placement:: { ChaosOp , Hostname , LibInfo , LogicalOp , PhysicalOp , PlacementManager } ;
88use reactor_client:: {
99 self ,
10- models:: { RemoteActorInfo , SpawnArgs } ,
10+ models:: { ChaosConfig , ChaosType , RemoteActorInfo , SpawnArgs } ,
1111} ;
1212
1313pub mod placement;
@@ -17,7 +17,7 @@ struct NodeHandle {
1717 client_config : reactor_client:: apis:: configuration:: Configuration ,
1818 actors : Vec < RemoteActorInfo > ,
1919 loaded_libs : Vec < String > ,
20- crash_schedule : Vec < ( RemoteActorInfo , Instant ) > ,
20+ chaos_schedule : Vec < ( RemoteActorInfo , ChaosOp , Instant , Option < Instant > ) > ,
2121}
2222
2323impl NodeHandle {
@@ -52,6 +52,7 @@ impl NodeHandle {
5252 async fn place ( & mut self , logical_op : & LogicalOp , physical_op : & PhysicalOp ) -> RemoteActorInfo {
5353 let mut remote_actor_info = reactor_client:: apis:: default_api:: start_actor (
5454 & self . client_config ,
55+ //? Might need to add choas info here (instead of just payload) -> useful for msg_loss, msg_duplicate chaos ops
5556 SpawnArgs {
5657 actor_name : physical_op. actor_name . clone ( ) ,
5758 operator_name : logical_op. name . clone ( ) ,
@@ -63,11 +64,18 @@ impl NodeHandle {
6364 . unwrap ( ) ;
6465 remote_actor_info. hostname = self . hostname . to_string ( ) ;
6566 self . actors . push ( remote_actor_info. clone ( ) ) ;
66- if let Some ( ChaosOp :: Crash { start_ms } ) = & physical_op. chaos {
67+ for chaos_op in & physical_op. chaos . clone ( ) . unwrap_or_default ( ) {
6768 let now = Instant :: now ( ) ;
68- let delay = Duration :: from_millis ( * start_ms as u64 ) ;
69- self . crash_schedule
70- . push ( ( remote_actor_info. clone ( ) , now + delay) ) ;
69+ let start_time = now + Duration :: from_millis ( chaos_op. start_ms ( ) as u64 ) ;
70+ let stop_time = chaos_op
71+ . stop_ms ( )
72+ . map ( |stop_ms| now + Duration :: from_millis ( stop_ms as u64 ) ) ;
73+ self . chaos_schedule . push ( (
74+ remote_actor_info. clone ( ) ,
75+ chaos_op. clone ( ) ,
76+ start_time,
77+ stop_time,
78+ ) ) ;
7179 }
7280 remote_actor_info
7381 }
@@ -78,22 +86,59 @@ impl NodeHandle {
7886 . unwrap ( ) ;
7987 }
8088
81- async fn schedule_actor_crash ( & self ) {
89+ async fn schedule_actor_chaos ( & self ) {
8290 // also needs some logic on, what if ctrlc pressed prematurely
8391 // and this function keeps running and sends requests to non-existent nodes
84- for ( actor, when) in self . crash_schedule . clone ( ) {
85- // let this = self.clone();
92+ for ( actor, op, start, stop_opt) in self . chaos_schedule . clone ( ) {
8693 let client_config = self . client_config . clone ( ) ;
94+
95+ let chaos_config = match op {
96+ ChaosOp :: Crash { .. } => ChaosConfig {
97+ kind : ChaosType :: Crash ,
98+ actor_name : actor. name . clone ( ) ,
99+ factor : None ,
100+ probability : None ,
101+ } ,
102+
103+ ChaosOp :: MsgLoss { probability, .. } => ChaosConfig {
104+ kind : ChaosType :: MsgLoss ,
105+ actor_name : actor. name . clone ( ) ,
106+ factor : None ,
107+ probability : Some ( probability. into_inner ( ) ) ,
108+ } ,
109+
110+ ChaosOp :: MsgDuplication {
111+ factor,
112+ probability,
113+ ..
114+ } => ChaosConfig {
115+ kind : ChaosType :: MsgDuplication ,
116+ actor_name : actor. name . clone ( ) ,
117+ factor : Some ( factor) ,
118+ probability : Some ( probability. into_inner ( ) ) ,
119+ } ,
120+ } ;
87121 tokio:: spawn ( async move {
88122 let now = Instant :: now ( ) ;
89- if when > now {
90- sleep ( when - now) . await ;
123+ if start > now {
124+ sleep ( start - now) . await ;
91125 }
92- // this.stop_actor(&actor).await;
93- reactor_client:: apis:: default_api:: stop_actor ( & client_config, actor. clone ( ) )
126+ reactor_client:: apis:: default_api:: add_chaos ( & client_config, chaos_config)
94127 . await
95128 . unwrap ( ) ;
96129 } ) ;
130+ //? This is for later
131+ // if let Some(stop) = stop_opt {
132+ // tokio::spawn(async move {
133+ // let now = Instant::now();
134+ // if stop > now {
135+ // sleep(stop - now).await;
136+ // }
137+ // // reactor_client::apis::default_api::stop_actor(&client_config, actor.clone())
138+ // // .await
139+ // // .unwrap();
140+ // });
141+ // }
97142 }
98143 }
99144
@@ -124,7 +169,7 @@ impl<PM: PlacementManager> JobController<PM> {
124169 client_config : self . client_config ( hostname, port) ,
125170 actors : Vec :: new ( ) ,
126171 loaded_libs : Vec :: new ( ) ,
127- crash_schedule : Vec :: new ( ) ,
172+ chaos_schedule : Vec :: new ( ) ,
128173 } ,
129174 ) ;
130175 }
@@ -170,7 +215,7 @@ impl<PM: PlacementManager> JobController<PM> {
170215
171216 pub async fn chaos_scheduler ( & self ) {
172217 for ( _, node_handle) in self . nodes . iter ( ) {
173- node_handle. schedule_actor_crash ( ) . await ;
218+ node_handle. schedule_actor_chaos ( ) . await ;
174219 }
175220 }
176221
0 commit comments