Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 24 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
gen_http_client:
gen_http_client: kill_node
echo "Running demo server..."
- kill $(lsof -ti :3000)
cargo run --features swagger --bin reactor_nctrl -- --port 3000 /tmp &
SERVER_PID=$!
sleep 5
Expand All @@ -9,3 +8,26 @@ gen_http_client:
openapitools/openapi-generator-cli generate -i http://host.docker.internal:3000/api-doc/openapi.json \
-g rust -o /local/rpc_client/ --additional-properties=packageName=reactor-client
kill ${SERVER_PID}

# Expects openapi-generator-cli to be installed:
# npm install @openapitools/openapi-generator-cli -g
gen_http_client_manual: kill_node
@echo "Running demo server..."
cargo run --features swagger --bin reactor_nctrl -- --port 3000 /tmp &
sleep 5
@echo "Generating client"
openapi-generator-cli generate -i http://localhost:3000/api-doc/openapi.json \
-g rust -o rpc_client/ --additional-properties=packageName=reactor-client

kill_node:
@echo "Killing process on port 3000 if any..."
@lsof -ti :3000 | xargs --no-run-if-empty kill

node: kill_node
@echo "Running demo server..."
cargo run --features swagger --bin reactor_nctrl -- --port 3000 /tmp

generate:
@echo "Generating client"
openapi-generator-cli generate -i http://localhost:3000/api-doc/openapi.json \
-g rust -o rpc_client/ --additional-properties=packageName=reactor-client
1 change: 1 addition & 0 deletions actor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ tracing = {version = "0.1.41", features = ["std", "attributes"] }
tracing-subscriber = "0.3.19"
tracing-shared = "0.1.5"
criterion = "0.6.0"
rand = "0.9.2"

[dev-dependencies]
lazy_static = "1.5.0"
Expand Down
62 changes: 62 additions & 0 deletions actor/src/chaos_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use rand::Rng;

/// Fault-injection ("chaos") configuration applied to an actor's incoming
/// messages. Every effect is disabled (`None`) until explicitly configured
/// via `set_msg_loss` / `set_msg_duplication`.
#[derive(Debug)]
pub struct ChaosManager {
    // Probability that an incoming message is dropped entirely.
    // Must be in [0.0, 1.0]: `apply_chaos` feeds it to `Rng::random_bool`,
    // which panics for values outside that range.
    pub msg_loss_probability: Option<f32>,
    // Total number of copies emitted when duplication triggers (the original
    // message counts as one copy; `apply_chaos` pushes `factor - 1` clones).
    pub msg_duplication_factor: Option<u32>,
    // Probability that duplication triggers for a given message.
    // Same [0.0, 1.0] constraint as `msg_loss_probability`.
    pub msg_duplication_probability: Option<f32>,
}

impl ChaosManager {
    /// Creates a manager with all chaos effects disabled.
    pub fn new() -> Self {
        ChaosManager {
            msg_loss_probability: None,
            msg_duplication_factor: None,
            msg_duplication_probability: None,
        }
    }

    /// Enables message loss.
    ///
    /// `probability` is clamped to `[0.0, 1.0]` so that `apply_chaos` can
    /// never panic inside `Rng::random_bool` on an out-of-range value.
    pub fn set_msg_loss(&mut self, probability: f32) {
        self.msg_loss_probability = Some(probability.clamp(0.0, 1.0));
    }

    /// Enables message duplication: with the given (clamped) probability the
    /// message is expanded to `factor` total copies.
    pub fn set_msg_duplication(&mut self, factor: u32, probability: f32) {
        self.msg_duplication_factor = Some(factor);
        self.msg_duplication_probability = Some(probability.clamp(0.0, 1.0));
    }

    /// Applies the configured chaos effects to `msg`.
    ///
    /// Returns an empty `Vec` when the message is dropped, `factor` clones
    /// when duplication fires, and `vec![msg]` otherwise. Loss is evaluated
    /// first; a lost message is never duplicated.
    pub fn apply_chaos<T: Clone>(&self, msg: T) -> Vec<T> {
        let mut rng = rand::rng();

        // Re-clamp defensively: the fields are public, so a caller may have
        // set an out-of-range value directly, which would otherwise panic in
        // `random_bool`. NOTE(review): a NaN probability would still panic —
        // assumed not to occur in practice; confirm with callers.
        if self
            .msg_loss_probability
            .is_some_and(|p| rng.random_bool(p.clamp(0.0, 1.0) as f64))
        {
            return vec![];
        }

        let mut result = vec![msg.clone()];

        // Duplication requires both the factor and the probability to be set.
        if let (Some(factor), Some(prob)) = (
            self.msg_duplication_factor,
            self.msg_duplication_probability,
        ) {
            if rng.random_bool(prob.clamp(0.0, 1.0) as f64) {
                // `factor` is the TOTAL copy count; the original is already in
                // `result`, so push `factor - 1` extras. A factor of 0 or 1
                // adds nothing (the range `1..factor` is empty).
                for _ in 1..factor {
                    result.push(msg.clone());
                }
            }
        }

        result
    }
}

/// `Default` mirrors `new()`: no chaos configured.
/// Satisfies `clippy::new_without_default`.
impl Default for ChaosManager {
    fn default() -> Self {
        Self::new()
    }
}
63 changes: 60 additions & 3 deletions actor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tokio::{
use tokio_util::codec::{Decoder, Encoder};
pub use tracing_shared::setup_shared_logger_ref;

mod chaos_manager;
pub mod codec;
pub mod err;
mod node_comm;
Expand Down Expand Up @@ -136,6 +137,13 @@ enum R2PMsg<T> {
AddPrio(mpsc::Receiver<R2PMsg<T>>),
#[allow(dead_code)]
RemoveLowPrio,
SetMsgDuplication {
factor: u32,
probability: f32,
},
SetMsgLoss {
probability: f32,
},
}

impl<T: Clone> Clone for R2PMsg<T> {
Expand Down Expand Up @@ -164,6 +172,8 @@ impl<T: HasPriority> HasPriority for R2PMsg<T> {
R2PMsg::Exit => MAX_PRIO,
R2PMsg::AddPrio(_) => MAX_PRIO,
R2PMsg::RemoveLowPrio => MAX_PRIO,
R2PMsg::SetMsgLoss { .. } => MAX_PRIO,
R2PMsg::SetMsgDuplication { .. } => MAX_PRIO,
}
}
}
Expand Down Expand Up @@ -504,15 +514,42 @@ where
));

let _addr = ctx.addr;
let mut chaos_manager = chaos_manager::ChaosManager::new();
let proc_handle: JoinHandle<Result<(), ActorError>> =
tokio::task::spawn_blocking(move || -> Result<(), ActorError> {
tracing::info!("[ACTOR][{}] Processor Started", _addr);
loop {
match r2p_rx.recv() {
Some(R2PMsg::Msg(m, origin)) => {
let processed = processor.process(m);
for o in processed {
p2s_tx.send((o, origin)).map_err(|_| ActorError::P2SErr)?;
let chaos_out = chaos_manager.apply_chaos(m);
let chaos_out_len = chaos_out.len();
// // print len
tracing::debug!(
"[ACTOR][{}] Message Received from {} | After Chaos: {} messages",
_addr,
origin,
chaos_out_len
);
tracing::debug!(
"[ACTOR][{}] Chaos Flags - Msg Loss: {} | Msg Duplication: {}",
_addr,
chaos_manager.msg_loss_probability.unwrap_or(-1.0),
chaos_manager.msg_duplication_factor.unwrap_or(1)
);
if chaos_out_len > 1 {
tracing::warn!(
"[ACTOR][{}] Message Duplicated {} times",
_addr,
chaos_out_len
);
} else if chaos_out_len == 0 {
tracing::warn!("[ACTOR][{}] Message Lost", _addr);
}
for msg in chaos_out {
let processed = processor.process(msg);
for o in processed {
p2s_tx.send((o, origin)).map_err(|_| ActorError::P2SErr)?;
}
}
}
Some(R2PMsg::AddPrio(new_rx)) => {
Expand All @@ -524,6 +561,26 @@ where
Some(R2PMsg::Exit) => {
break;
}
Some(R2PMsg::SetMsgDuplication {
factor,
probability,
}) => {
tracing::info!(
"[ACTOR][{}] Setting Msg Duplication: factor={}, probability={}",
_addr,
factor,
probability
);
chaos_manager.set_msg_duplication(factor, probability);
}
Some(R2PMsg::SetMsgLoss { probability }) => {
tracing::info!(
"[ACTOR][{}] Setting Msg Loss: probability={}",
_addr,
probability
);
chaos_manager.set_msg_loss(probability);
}
None => {
break;
}
Expand Down
2 changes: 2 additions & 0 deletions actor/src/node_comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub enum ControlInst {
StartLocalRecv(LocalChannelRx),
StartTcpRecv(u16),
Stop,
SetMsgLoss { probability: f32 },
SetMsgDuplication { factor: u32, probability: f32 },
}

#[derive(Debug)]
Expand Down
18 changes: 18 additions & 0 deletions actor/src/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,24 @@ where
msg_transform,
));
}
ControlInst::SetMsgDuplication {
factor,
probability,
} => {
p_tx.send(R2PMsg::SetMsgDuplication {
factor,
probability,
})
.await
.map_err(|_| ActorError::R2PErr)?;
// break;
}
ControlInst::SetMsgLoss { probability } => {
p_tx.send(R2PMsg::SetMsgLoss { probability })
.await
.map_err(|_| ActorError::R2PErr)?;
// break;
}
ControlInst::Stop => {
cancel_token.cancel();
p_tx.send(R2PMsg::Exit)
Expand Down
3 changes: 2 additions & 1 deletion actor/src/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ async fn sender_task<M, E>(
Connection::CouldntResolve => {
log::warn!("[ACTOR] Failed to resolve {}", send_addr);
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
// continue;
break;
}
};
}
Expand Down
1 change: 1 addition & 0 deletions generic_jctrl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ reactor-jobm = { path = "../job_manager" }
serde = { version = "1", features = ["derive"] }
toml = "0.8.23"
clap = { version = "4", features = ["derive"] }
ordered-float = {version = "5"}

[dev-dependencies]
serde_json = "1.0"
Loading