Skip to content

Commit 2f343e7

Browse files
Client side code done for dupl, loss
ChaosManager at Actor TBD Co-authored-by: Brian Sajeev <Brian-1402@users.noreply.github.com>
1 parent bbf2aa2 commit 2f343e7

File tree

28 files changed

+492
-136
lines changed

28 files changed

+492
-136
lines changed

Cargo.lock

Lines changed: 37 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Makefile

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,9 @@ kill_node:
2525

2626
node: kill_node
2727
@echo "Running demo server..."
28-
cargo run --features swagger --bin reactor_nctrl -- --port 3000 /tmp
28+
cargo run --features swagger --bin reactor_nctrl -- --port 3000 /tmp
29+
30+
generate:
31+
@echo "Generating client"
32+
openapi-generator-cli generate -i http://localhost:3000/api-doc/openapi.json \
33+
-g rust -o rpc_client/ --additional-properties=packageName=reactor-client

actor/src/node_comm.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ pub enum ControlInst {
2727
StartLocalRecv(LocalChannelRx),
2828
StartTcpRecv(u16),
2929
Stop,
30+
SetMsgLoss { probability: f32 },
31+
SetMsgDuplication { factor: i32, probability: f32 },
3032
}
3133

3234
#[derive(Debug)]

actor/src/recv.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,23 @@ where
135135
.map_err(|_| ActorError::R2PErr)?;
136136
break;
137137
}
138+
ControlInst::SetMsgDuplication {
139+
factor,
140+
probability,
141+
} => {
142+
// cancel_token.cancel();
143+
// p_tx.send(R2PMsg::Exit)
144+
// .await
145+
// .map_err(|_| ActorError::R2PErr)?;
146+
break;
147+
}
148+
ControlInst::SetMsgLoss { probability } => {
149+
// cancel_token.cancel();
150+
// p_tx.send(R2PMsg::Exit)
151+
// .await
152+
// .map_err(|_| ActorError::R2PErr)?;
153+
break;
154+
}
138155
}
139156
}
140157
tcp_server_set.abort_all();

actor/src/send.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,8 @@ async fn sender_task<M, E>(
211211
Connection::CouldntResolve => {
212212
log::warn!("[ACTOR] Failed to resolve {}", send_addr);
213213
tokio::time::sleep(Duration::from_millis(100)).await;
214-
continue;
214+
// continue;
215+
break;
215216
}
216217
};
217218
}

generic_jctrl/src/main.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,9 @@ port = 3000
109109
actor_name: "pinger".into(),
110110
payload: HashMap::from([("other".to_string(), json!("ponger"))]),
111111
replicas: None,
112-
chaos: Some(ChaosOp::Crash { start_ms: 10000 }),
112+
chaos: Some(vec![ChaosOp::Crash {
113+
start_ms: Some(10000),
114+
}]),
113115
}],
114116
),
115117
(

job_manager/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ serde_json = "1.0"
2020
cargo_toml = "0.22"
2121
thiserror = "2.0"
2222
tempfile = "3.20.0"
23+
ordered-float = {version = "5", features = ["serde"]}
2324
reactor-client = {path="../rpc_client"}
2425

2526

job_manager/src/lib.rs

Lines changed: 56 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use tokio::time::sleep;
77
use placement::{ChaosOp, Hostname, LibInfo, LogicalOp, PhysicalOp, PlacementManager};
88
use reactor_client::{
99
self,
10-
models::{RemoteActorInfo, SpawnArgs},
10+
models::{RemoteActorInfo, SpawnArgs, ChaosConfig, ChaosType},
1111
};
1212

1313
pub mod placement;
@@ -17,7 +17,7 @@ struct NodeHandle {
1717
client_config: reactor_client::apis::configuration::Configuration,
1818
actors: Vec<RemoteActorInfo>,
1919
loaded_libs: Vec<String>,
20-
crash_schedule: Vec<(RemoteActorInfo, Instant)>,
20+
chaos_schedule: Vec<(RemoteActorInfo, ChaosOp, Instant, Option<Instant>)>,
2121
}
2222

2323
impl NodeHandle {
@@ -52,6 +52,7 @@ impl NodeHandle {
5252
async fn place(&mut self, logical_op: &LogicalOp, physical_op: &PhysicalOp) -> RemoteActorInfo {
5353
let mut remote_actor_info = reactor_client::apis::default_api::start_actor(
5454
&self.client_config,
55+
//? Might need to add chaos info here (instead of just payload) -> useful for msg_loss, msg_duplicate chaos ops
5556
SpawnArgs {
5657
actor_name: physical_op.actor_name.clone(),
5758
operator_name: logical_op.name.clone(),
@@ -63,11 +64,18 @@ impl NodeHandle {
6364
.unwrap();
6465
remote_actor_info.hostname = self.hostname.to_string();
6566
self.actors.push(remote_actor_info.clone());
66-
if let Some(ChaosOp::Crash { start_ms }) = &physical_op.chaos {
67+
for chaos_op in &physical_op.chaos.clone().unwrap_or_default() {
6768
let now = Instant::now();
68-
let delay = Duration::from_millis(*start_ms as u64);
69-
self.crash_schedule
70-
.push((remote_actor_info.clone(), now + delay));
69+
let start_time = now + Duration::from_millis(chaos_op.start_ms() as u64);
70+
let stop_time = chaos_op
71+
.stop_ms()
72+
.map(|stop_ms| now + Duration::from_millis(stop_ms as u64));
73+
self.chaos_schedule.push((
74+
remote_actor_info.clone(),
75+
chaos_op.clone(),
76+
start_time,
77+
stop_time,
78+
));
7179
}
7280
remote_actor_info
7381
}
@@ -78,22 +86,55 @@ impl NodeHandle {
7886
.unwrap();
7987
}
8088

81-
async fn schedule_actor_crash(&self) {
89+
async fn schedule_actor_chaos(&self) {
8290
// also needs some logic for the case where Ctrl-C is pressed prematurely
8391
// and this function keeps running and sends requests to non-existent nodes
84-
for (actor, when) in self.crash_schedule.clone() {
85-
// let this = self.clone();
92+
for (actor, op, start, stop_opt) in self.chaos_schedule.clone() {
8693
let client_config = self.client_config.clone();
94+
95+
let chaos_config = match op {
96+
ChaosOp::Crash { .. } => ChaosConfig {
97+
kind: ChaosType::Crash,
98+
actor_name: actor.name.clone(),
99+
factor: None,
100+
probability: None,
101+
},
102+
103+
ChaosOp::MsgLoss { probability, .. } => ChaosConfig {
104+
kind: ChaosType::MsgLoss,
105+
actor_name: actor.name.clone(),
106+
factor: None,
107+
probability: Some(probability.into_inner()),
108+
},
109+
110+
ChaosOp::MsgDuplication { factor, probability, .. } => ChaosConfig {
111+
kind: ChaosType::MsgDuplication,
112+
actor_name: actor.name.clone(),
113+
factor: Some(factor),
114+
probability: Some(probability.into_inner()),
115+
},
116+
};
87117
tokio::spawn(async move {
88118
let now = Instant::now();
89-
if when > now {
90-
sleep(when - now).await;
119+
if start > now {
120+
sleep(start - now).await;
91121
}
92-
// this.stop_actor(&actor).await;
93-
reactor_client::apis::default_api::stop_actor(&client_config, actor.clone())
122+
reactor_client::apis::default_api::add_chaos(&client_config, chaos_config)
94123
.await
95124
.unwrap();
96125
});
126+
//? This is for later
127+
// if let Some(stop) = stop_opt {
128+
// tokio::spawn(async move {
129+
// let now = Instant::now();
130+
// if stop > now {
131+
// sleep(stop - now).await;
132+
// }
133+
// // reactor_client::apis::default_api::stop_actor(&client_config, actor.clone())
134+
// // .await
135+
// // .unwrap();
136+
// });
137+
// }
97138
}
98139
}
99140

@@ -124,7 +165,7 @@ impl<PM: PlacementManager> JobController<PM> {
124165
client_config: self.client_config(hostname, port),
125166
actors: Vec::new(),
126167
loaded_libs: Vec::new(),
127-
crash_schedule: Vec::new(),
168+
chaos_schedule: Vec::new(),
128169
},
129170
);
130171
}
@@ -170,7 +211,7 @@ impl<PM: PlacementManager> JobController<PM> {
170211

171212
pub async fn chaos_scheduler(&self) {
172213
for (_, node_handle) in self.nodes.iter() {
173-
node_handle.schedule_actor_crash().await;
214+
node_handle.schedule_actor_chaos().await;
174215
}
175216
}
176217

job_manager/src/placement.rs

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use ordered_float::OrderedFloat;
12
use serde::Deserialize;
23
use std::{
34
collections::{BTreeMap, HashMap},
@@ -44,24 +45,47 @@ pub struct LogicalOp {
4445
#[serde(tag = "type", rename_all = "SCREAMING_SNAKE_CASE")]
4546
pub enum ChaosOp {
4647
Crash {
47-
start_ms: u32,
48+
start_ms: Option<u32>,
4849
},
4950
MsgLoss {
50-
start_ms: u32,
51-
probability: i32, // should change to f32, but issue with derive(Eq)
51+
start_ms: Option<u32>,
52+
stop_ms: Option<u32>,
53+
probability: OrderedFloat<f32>, // OrderedFloat<f32> rather than plain f32, because of the issue with derive(Eq)
5254
},
55+
//? Add probability field for this?
56+
//? duplication factor instead of rate? (confusing terms)
5357
MsgDuplication {
54-
start_ms: u32,
55-
rate: i32, // should change to f32, but issue with derive(Eq)
58+
start_ms: Option<u32>,
59+
stop_ms: Option<u32>,
60+
factor: i32,
61+
probability: OrderedFloat<f32>,
5662
},
5763
}
5864

65+
impl ChaosOp {
66+
pub fn start_ms(&self) -> u32 {
67+
match self {
68+
ChaosOp::Crash { start_ms, .. }
69+
| ChaosOp::MsgLoss { start_ms, .. }
70+
| ChaosOp::MsgDuplication { start_ms, .. } => start_ms.unwrap_or(0),
71+
}
72+
}
73+
74+
pub fn stop_ms(&self) -> Option<u32> {
75+
match self {
76+
ChaosOp::Crash { .. } => None,
77+
ChaosOp::MsgLoss { stop_ms, .. } | ChaosOp::MsgDuplication { stop_ms, .. } => *stop_ms,
78+
}
79+
}
80+
}
81+
5982
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
6083
pub struct PhysicalOp {
6184
pub nodename: String,
6285
pub actor_name: String,
6386
pub replicas: Option<u32>,
64-
pub chaos: Option<ChaosOp>,
87+
//? Change to list (/map) of chaos ops for multiple chaos ops per actor
88+
pub chaos: Option<Vec<ChaosOp>>,
6589
#[serde(flatten)]
6690
pub payload: HashMap<String, serde_json::Value>,
6791
}

node/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ tempfile = "3.20.0"
1818
reactor-actor = { path = "../actor"}
1919
# env_logger = "0.11"
2020
thiserror.workspace = true
21+
ordered-float = {version = "5", features = ["serde"]}
2122

2223
tracing = "0.1.41"
2324
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

0 commit comments

Comments
 (0)