Skip to content

Commit 8b7f38f

Browse files
committed
feat(testing): make sure scenario is saved during crash and ctrl-c
1 parent d598a5a commit 8b7f38f

File tree

5 files changed

+75
-24
lines changed

5 files changed

+75
-24
lines changed

node/testing/Cargo.toml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,24 +27,25 @@ axum = "0.6"
2727
tower-http = { version = "0.4.4", features = ["cors"] }
2828
strum = "0.24.1"
2929
strum_macros = "0.24.3"
30+
tracing-log = "0.2.0"
3031
documented = { version = "0.1", optional = true }
3132
redux = { git = "https://github.com/openmina/redux-rs.git", branch="feat/global-time", features = ["serde"] }
3233
ledger = { workspace = true }
3334
mina-p2p-messages = { workspace = true }
3435
libp2p = { workspace = true, features = ["macros", "serde", "tcp", "dns", "tokio", "yamux", "pnet", "noise", "gossipsub"] }
3536
vrf = { workspace = true }
3637

38+
nix = { version = "0.27.1", features = ["process", "signal"] }
39+
ctrlc = "3.4.2"
40+
temp-dir = "0.1.11"
3741
console = "0.15.5"
3842
clap = { version = "4.3", features = [ "derive", "env" ] }
43+
reqwest = { version = "0.11.22", features = ["blocking", "json"] }
3944

4045
openmina-core = { path = "../../core" }
4146
node = { path = "../../node" }
4247
openmina-node-invariants = { path = "../../node/invariants" }
4348
openmina-node-native = { path = "../../node/native" }
44-
reqwest = { version = "0.11.22", features = ["blocking", "json"] }
45-
temp-dir = "0.1.11"
46-
nix = { version = "0.27.1", features = ["process", "signal"] }
47-
tracing-log = "0.2.0"
4849

4950
[features]
5051
default = ["scenario-generators"]

node/testing/src/cluster/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ use std::collections::BTreeMap;
1111
use std::time::Duration;
1212
use std::{collections::VecDeque, sync::Arc};
1313

14-
use libp2p::futures::{stream::FuturesUnordered, StreamExt};
1514
use ledger::proofs::{VerifierIndex, VerifierSRS};
15+
use libp2p::futures::{stream::FuturesUnordered, StreamExt};
1616
use node::core::channels::mpsc;
1717
use node::core::requests::RpcId;
1818
use node::{
@@ -270,6 +270,9 @@ impl Cluster {
270270

271271
let state = node::State::new(config);
272272
fn effects(store: &mut node::Store<NodeTestingService>, action: node::ActionWithMeta) {
273+
// if action.action().kind().to_string().starts_with("BlockProducer") {
274+
// dbg!(action.action());
275+
// }
273276
store.service.dyn_effects(store.state.get(), &action);
274277
let peer_id = store.state().p2p.my_id();
275278
openmina_core::log::trace!(action.time(); "{peer_id}: {:?}", action.action().kind());

node/testing/src/main.rs

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,19 @@ impl Command {
3939
let rt = setup();
4040
let _rt_guard = rt.enter();
4141

42+
let (shutdown_tx, shutdown_rx) = openmina_core::channels::oneshot::channel();
43+
let mut shutdown_tx = Some(shutdown_tx);
44+
45+
ctrlc::set_handler(move || match shutdown_tx.take() {
46+
Some(tx) => {
47+
let _ = tx.send(());
48+
}
49+
None => {
50+
std::process::exit(1);
51+
}
52+
})
53+
.expect("Error setting Ctrl-C handler");
54+
4255
match self {
4356
Self::Server(args) => {
4457
server(args.port);
@@ -56,22 +69,32 @@ impl Command {
5669
config
5770
};
5871

59-
if let Some(name) = cmd.name {
60-
if let Some(scenario) = Scenarios::iter()
61-
.into_iter()
62-
.find(|s| <&'static str>::from(s) == name)
63-
{
64-
rt.block_on(scenario.run_and_save_from_scratch(config));
72+
let fut = async move {
73+
if let Some(name) = cmd.name {
74+
if let Some(scenario) = Scenarios::iter()
75+
.into_iter()
76+
.find(|s| <&'static str>::from(s) == name)
77+
{
78+
scenario.run_and_save_from_scratch(config).await;
79+
} else {
80+
anyhow::bail!("no such scenario: \"{name}\"");
81+
}
6582
} else {
66-
anyhow::bail!("no such scenario: \"{name}\"");
83+
for scenario in Scenarios::iter() {
84+
scenario.run_and_save_from_scratch(config.clone()).await;
85+
}
6786
}
68-
} else {
69-
for scenario in Scenarios::iter() {
70-
rt.block_on(scenario.run_and_save_from_scratch(config.clone()));
71-
}
72-
}
87+
Ok(())
88+
};
7389

74-
Ok(())
90+
rt.block_on(async {
91+
tokio::select! {
92+
res = fut => res,
93+
_ = shutdown_rx => {
94+
anyhow::bail!("Received ctrl-c signal! shutting down...");
95+
}
96+
}
97+
})
7598
}
7699
#[cfg(not(feature = "scenario-generators"))]
77100
Err("binary not compiled with `scenario-generators` feature"

node/testing/src/scenario/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,4 +101,12 @@ impl Scenario {
101101
tokio::fs::write(&tmp_file, encoded).await?;
102102
Ok(tokio::fs::rename(tmp_file, self.file_path()).await?)
103103
}
104+
105+
pub fn save_sync(&self) -> Result<(), anyhow::Error> {
106+
let tmp_file = self.tmp_file_path();
107+
let encoded = serde_json::to_vec_pretty(self)?;
108+
std::fs::create_dir_all(Self::PATH)?;
109+
std::fs::write(&tmp_file, encoded)?;
110+
Ok(std::fs::rename(tmp_file, self.file_path())?)
111+
}
104112
}

node/testing/src/scenarios/mod.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -140,14 +140,30 @@ impl Scenarios {
140140
}
141141

142142
pub async fn run_and_save(self, cluster: &mut Cluster) {
143+
struct ScenarioSaveOnExit(Scenario);
144+
145+
impl Drop for ScenarioSaveOnExit {
146+
fn drop(&mut self) {
147+
let info = self.0.info.clone();
148+
let steps = std::mem::take(&mut self.0.steps);
149+
let scenario = Scenario { info, steps };
150+
151+
eprintln!("saving scenario({}) before exit...", scenario.info.id);
152+
if let Err(err) = scenario.save_sync() {
153+
eprintln!(
154+
"failed to save scenario({})! error: {}",
155+
scenario.info.id, err
156+
);
157+
}
158+
}
159+
}
160+
143161
eprintln!("run_and_save: {}", self.to_str());
144-
let mut scenario = self.blank_scenario();
145-
self.run(cluster, |step| scenario.add_step(step.clone()).unwrap())
162+
let mut scenario = ScenarioSaveOnExit(self.blank_scenario());
163+
self.run(cluster, |step| scenario.0.add_step(step.clone()).unwrap())
146164
.await;
147-
scenario
148-
.save()
149-
.await
150-
.expect("failed to save scenario after run");
165+
// drop to save it.
166+
let _ = scenario;
151167
}
152168

153169
pub async fn run_only(self, cluster: &mut Cluster) {

0 commit comments

Comments
 (0)